Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle;
0019 
0020 import java.io.*;
0021 import java.nio.charset.StandardCharsets;
0022 import java.util.*;
0023 import java.util.concurrent.ConcurrentMap;
0024 import java.util.concurrent.ExecutionException;
0025 import java.util.concurrent.Executor;
0026 import java.util.concurrent.Executors;
0027 import java.util.regex.Pattern;
0028 import java.util.stream.Collectors;
0029 
0030 import org.apache.commons.lang3.builder.ToStringBuilder;
0031 import org.apache.commons.lang3.builder.ToStringStyle;
0032 import org.apache.commons.lang3.tuple.Pair;
0033 import com.fasterxml.jackson.annotation.JsonCreator;
0034 import com.fasterxml.jackson.annotation.JsonProperty;
0035 import com.fasterxml.jackson.databind.ObjectMapper;
0036 import com.google.common.annotations.VisibleForTesting;
0037 import com.google.common.cache.CacheBuilder;
0038 import com.google.common.cache.CacheLoader;
0039 import com.google.common.cache.LoadingCache;
0040 import com.google.common.cache.Weigher;
0041 import com.google.common.collect.Maps;
0042 import org.iq80.leveldb.DB;
0043 import org.iq80.leveldb.DBIterator;
0044 import org.slf4j.Logger;
0045 import org.slf4j.LoggerFactory;
0046 
0047 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
0048 import org.apache.spark.network.buffer.ManagedBuffer;
0049 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
0050 import org.apache.spark.network.util.LevelDBProvider;
0051 import org.apache.spark.network.util.LevelDBProvider.StoreVersion;
0052 import org.apache.spark.network.util.JavaUtils;
0053 import org.apache.spark.network.util.NettyUtils;
0054 import org.apache.spark.network.util.TransportConf;
0055 
0056 /**
0057  * Manages converting shuffle BlockIds into physical segments of local files, from a process outside
0058  * of Executors. Each Executor must register its own configuration about where it stores its files
0059  * (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
0060  * from Spark's IndexShuffleBlockResolver.
0061  */
0062 public class ExternalShuffleBlockResolver {
0063   private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
0064 
0065   private static final ObjectMapper mapper = new ObjectMapper();
0066 
0067   /**
0068    * This a common prefix to the key for each app registration we stick in leveldb, so they
0069    * are easy to find, since leveldb lets you search based on prefix.
0070    */
0071   private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";
0072   private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);
0073 
0074   private static final Pattern MULTIPLE_SEPARATORS = Pattern.compile(File.separator + "{2,}");
0075 
0076   // Map containing all registered executors' metadata.
0077   @VisibleForTesting
0078   final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
0079 
0080   /**
0081    *  Caches index file information so that we can avoid open/close the index files
0082    *  for each block fetch.
0083    */
0084   private final LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache;
0085 
0086   // Single-threaded Java executor used to perform expensive recursive directory deletion.
0087   private final Executor directoryCleaner;
0088 
0089   private final TransportConf conf;
0090 
0091   private final boolean rddFetchEnabled;
0092 
0093   @VisibleForTesting
0094   final File registeredExecutorFile;
0095   @VisibleForTesting
0096   final DB db;
0097 
0098   private final List<String> knownManagers = Arrays.asList(
0099     "org.apache.spark.shuffle.sort.SortShuffleManager",
0100     "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");
0101 
0102   public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
0103       throws IOException {
0104     this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
0105         // Add `spark` prefix because it will run in NM in Yarn mode.
0106         NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
0107   }
0108 
0109   // Allows tests to have more control over when directories are cleaned up.
0110   @VisibleForTesting
0111   ExternalShuffleBlockResolver(
0112       TransportConf conf,
0113       File registeredExecutorFile,
0114       Executor directoryCleaner) throws IOException {
0115     this.conf = conf;
0116     this.rddFetchEnabled =
0117       Boolean.valueOf(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false"));
0118     this.registeredExecutorFile = registeredExecutorFile;
0119     String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");
0120     CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
0121         new CacheLoader<File, ShuffleIndexInformation>() {
0122           public ShuffleIndexInformation load(File file) throws IOException {
0123             return new ShuffleIndexInformation(file);
0124           }
0125         };
0126     shuffleIndexCache = CacheBuilder.newBuilder()
0127       .maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
0128       .weigher(new Weigher<File, ShuffleIndexInformation>() {
0129         public int weigh(File file, ShuffleIndexInformation indexInfo) {
0130           return indexInfo.getSize();
0131         }
0132       })
0133       .build(indexCacheLoader);
0134     db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
0135     if (db != null) {
0136       executors = reloadRegisteredExecutors(db);
0137     } else {
0138       executors = Maps.newConcurrentMap();
0139     }
0140     this.directoryCleaner = directoryCleaner;
0141   }
0142 
0143   public int getRegisteredExecutorsSize() {
0144     return executors.size();
0145   }
0146 
0147   /** Registers a new Executor with all the configuration we need to find its shuffle files. */
0148   public void registerExecutor(
0149       String appId,
0150       String execId,
0151       ExecutorShuffleInfo executorInfo) {
0152     AppExecId fullId = new AppExecId(appId, execId);
0153     logger.info("Registered executor {} with {}", fullId, executorInfo);
0154     if (!knownManagers.contains(executorInfo.shuffleManager)) {
0155       throw new UnsupportedOperationException(
0156         "Unsupported shuffle manager of executor: " + executorInfo);
0157     }
0158     try {
0159       if (db != null) {
0160         byte[] key = dbAppExecKey(fullId);
0161         byte[] value = mapper.writeValueAsString(executorInfo).getBytes(StandardCharsets.UTF_8);
0162         db.put(key, value);
0163       }
0164     } catch (Exception e) {
0165       logger.error("Error saving registered executors", e);
0166     }
0167     executors.put(fullId, executorInfo);
0168   }
0169 
0170   /**
0171    * Obtains a FileSegmentManagedBuffer from a single block (shuffleId, mapId, reduceId).
0172    */
0173   public ManagedBuffer getBlockData(
0174       String appId,
0175       String execId,
0176       int shuffleId,
0177       long mapId,
0178       int reduceId) {
0179     return getContinuousBlocksData(appId, execId, shuffleId, mapId, reduceId, reduceId + 1);
0180   }
0181 
0182   /**
0183    * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, [startReduceId, endReduceId)).
0184    * We make assumptions about how the hash and sort based shuffles store their data.
0185    */
0186   public ManagedBuffer getContinuousBlocksData(
0187       String appId,
0188       String execId,
0189       int shuffleId,
0190       long mapId,
0191       int startReduceId,
0192       int endReduceId) {
0193     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
0194     if (executor == null) {
0195       throw new RuntimeException(
0196         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
0197     }
0198     return getSortBasedShuffleBlockData(executor, shuffleId, mapId, startReduceId, endReduceId);
0199   }
0200 
0201   public ManagedBuffer getRddBlockData(
0202       String appId,
0203       String execId,
0204       int rddId,
0205       int splitIndex) {
0206     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
0207     if (executor == null) {
0208       throw new RuntimeException(
0209         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
0210     }
0211     return getDiskPersistedRddBlockData(executor, rddId, splitIndex);
0212   }
0213   /**
0214    * Removes our metadata of all executors registered for the given application, and optionally
0215    * also deletes the local directories associated with the executors of that application in a
0216    * separate thread.
0217    *
0218    * It is not valid to call registerExecutor() for an executor with this appId after invoking
0219    * this method.
0220    */
0221   public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
0222     logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
0223     Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator();
0224     while (it.hasNext()) {
0225       Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next();
0226       AppExecId fullId = entry.getKey();
0227       final ExecutorShuffleInfo executor = entry.getValue();
0228 
0229       // Only touch executors associated with the appId that was removed.
0230       if (appId.equals(fullId.appId)) {
0231         it.remove();
0232         if (db != null) {
0233           try {
0234             db.delete(dbAppExecKey(fullId));
0235           } catch (IOException e) {
0236             logger.error("Error deleting {} from executor state db", appId, e);
0237           }
0238         }
0239 
0240         if (cleanupLocalDirs) {
0241           logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
0242 
0243           // Execute the actual deletion in a different thread, as it may take some time.
0244           directoryCleaner.execute(() -> deleteExecutorDirs(executor.localDirs));
0245         }
0246       }
0247     }
0248   }
0249 
0250   /**
0251    * Removes all the files which cannot be served by the external shuffle service (non-shuffle and
0252    * non-RDD files) in any local directories associated with the finished executor.
0253    */
0254   public void executorRemoved(String executorId, String appId) {
0255     logger.info("Clean up non-shuffle and non-RDD files associated with the finished executor {}",
0256       executorId);
0257     AppExecId fullId = new AppExecId(appId, executorId);
0258     final ExecutorShuffleInfo executor = executors.get(fullId);
0259     if (executor == null) {
0260       // Executor not registered, skip clean up of the local directories.
0261       logger.info("Executor is not registered (appId={}, execId={})", appId, executorId);
0262     } else {
0263       logger.info("Cleaning up non-shuffle and non-RDD files in executor {}'s {} local dirs",
0264         fullId, executor.localDirs.length);
0265 
0266       // Execute the actual deletion in a different thread, as it may take some time.
0267       directoryCleaner.execute(() -> deleteNonShuffleServiceServedFiles(executor.localDirs));
0268     }
0269   }
0270 
0271   /**
0272    * Synchronously deletes each directory one at a time.
0273    * Should be executed in its own thread, as this may take a long time.
0274    */
0275   private void deleteExecutorDirs(String[] dirs) {
0276     for (String localDir : dirs) {
0277       try {
0278         JavaUtils.deleteRecursively(new File(localDir));
0279         logger.debug("Successfully cleaned up directory: {}", localDir);
0280       } catch (Exception e) {
0281         logger.error("Failed to delete directory: " + localDir, e);
0282       }
0283     }
0284   }
0285 
0286   /**
0287    * Synchronously deletes files not served by shuffle service in each directory recursively.
0288    * Should be executed in its own thread, as this may take a long time.
0289    */
0290   private void deleteNonShuffleServiceServedFiles(String[] dirs) {
0291     FilenameFilter filter = (dir, name) -> {
0292       // Don't delete shuffle data, shuffle index files or cached RDD files.
0293       return !name.endsWith(".index") && !name.endsWith(".data")
0294         && (!rddFetchEnabled || !name.startsWith("rdd_"));
0295     };
0296 
0297     for (String localDir : dirs) {
0298       try {
0299         JavaUtils.deleteRecursively(new File(localDir), filter);
0300         logger.debug("Successfully cleaned up files not served by shuffle service in directory: {}",
0301           localDir);
0302       } catch (Exception e) {
0303         logger.error("Failed to delete files not served by shuffle service in directory: "
0304           + localDir, e);
0305       }
0306     }
0307   }
0308 
0309   /**
0310    * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
0311    * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
0312    * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
0313    */
0314   private ManagedBuffer getSortBasedShuffleBlockData(
0315     ExecutorShuffleInfo executor, int shuffleId, long mapId, int startReduceId, int endReduceId) {
0316     File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
0317       "shuffle_" + shuffleId + "_" + mapId + "_0.index");
0318 
0319     try {
0320       ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
0321       ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(
0322         startReduceId, endReduceId);
0323       return new FileSegmentManagedBuffer(
0324         conf,
0325         ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
0326           "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
0327         shuffleIndexRecord.getOffset(),
0328         shuffleIndexRecord.getLength());
0329     } catch (ExecutionException e) {
0330       throw new RuntimeException("Failed to open file: " + indexFile, e);
0331     }
0332   }
0333 
0334   public ManagedBuffer getDiskPersistedRddBlockData(
0335       ExecutorShuffleInfo executor, int rddId, int splitIndex) {
0336     File file = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
0337       "rdd_" + rddId + "_" + splitIndex);
0338     long fileLength = file.length();
0339     ManagedBuffer res = null;
0340     if (file.exists()) {
0341       res = new FileSegmentManagedBuffer(conf, file, 0, fileLength);
0342     }
0343     return res;
0344   }
0345 
0346   void close() {
0347     if (db != null) {
0348       try {
0349         db.close();
0350       } catch (IOException e) {
0351         logger.error("Exception closing leveldb with registered executors", e);
0352       }
0353     }
0354   }
0355 
0356   public int removeBlocks(String appId, String execId, String[] blockIds) {
0357     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
0358     if (executor == null) {
0359       throw new RuntimeException(
0360         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
0361     }
0362     int numRemovedBlocks = 0;
0363     for (String blockId : blockIds) {
0364       File file =
0365         ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
0366       if (file.delete()) {
0367         numRemovedBlocks++;
0368       } else {
0369         logger.warn("Failed to delete block: " + file.getAbsolutePath());
0370       }
0371     }
0372     return numRemovedBlocks;
0373   }
0374 
0375   public Map<String, String[]> getLocalDirs(String appId, String[] execIds) {
0376     return Arrays.stream(execIds)
0377       .map(exec -> {
0378         ExecutorShuffleInfo info = executors.get(new AppExecId(appId, exec));
0379         if (info == null) {
0380           throw new RuntimeException(
0381             String.format("Executor is not registered (appId=%s, execId=%s)", appId, exec));
0382         }
0383         return Pair.of(exec, info.localDirs);
0384       })
0385       .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
0386   }
0387 
0388   /** Simply encodes an executor's full ID, which is appId + execId. */
0389   public static class AppExecId {
0390     public final String appId;
0391     public final String execId;
0392 
0393     @JsonCreator
0394     public AppExecId(@JsonProperty("appId") String appId, @JsonProperty("execId") String execId) {
0395       this.appId = appId;
0396       this.execId = execId;
0397     }
0398 
0399     @Override
0400     public boolean equals(Object o) {
0401       if (this == o) return true;
0402       if (o == null || getClass() != o.getClass()) return false;
0403 
0404       AppExecId appExecId = (AppExecId) o;
0405       return Objects.equals(appId, appExecId.appId) && Objects.equals(execId, appExecId.execId);
0406     }
0407 
0408     @Override
0409     public int hashCode() {
0410       return Objects.hash(appId, execId);
0411     }
0412 
0413     @Override
0414     public String toString() {
0415       return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
0416         .append("appId", appId)
0417         .append("execId", execId)
0418         .toString();
0419     }
0420   }
0421 
0422   private static byte[] dbAppExecKey(AppExecId appExecId) throws IOException {
0423     // we stick a common prefix on all the keys so we can find them in the DB
0424     String appExecJson = mapper.writeValueAsString(appExecId);
0425     String key = (APP_KEY_PREFIX + ";" + appExecJson);
0426     return key.getBytes(StandardCharsets.UTF_8);
0427   }
0428 
0429   private static AppExecId parseDbAppExecKey(String s) throws IOException {
0430     if (!s.startsWith(APP_KEY_PREFIX)) {
0431       throw new IllegalArgumentException("expected a string starting with " + APP_KEY_PREFIX);
0432     }
0433     String json = s.substring(APP_KEY_PREFIX.length() + 1);
0434     AppExecId parsed = mapper.readValue(json, AppExecId.class);
0435     return parsed;
0436   }
0437 
0438   @VisibleForTesting
0439   static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(DB db)
0440       throws IOException {
0441     ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors = Maps.newConcurrentMap();
0442     if (db != null) {
0443       DBIterator itr = db.iterator();
0444       itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
0445       while (itr.hasNext()) {
0446         Map.Entry<byte[], byte[]> e = itr.next();
0447         String key = new String(e.getKey(), StandardCharsets.UTF_8);
0448         if (!key.startsWith(APP_KEY_PREFIX)) {
0449           break;
0450         }
0451         AppExecId id = parseDbAppExecKey(key);
0452         logger.info("Reloading registered executors: " +  id.toString());
0453         ExecutorShuffleInfo shuffleInfo = mapper.readValue(e.getValue(), ExecutorShuffleInfo.class);
0454         registeredExecutors.put(id, shuffleInfo);
0455       }
0456     }
0457     return registeredExecutors;
0458   }
0459 }