0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle;
0019
0020 import java.io.*;
0021 import java.nio.charset.StandardCharsets;
0022 import java.util.*;
0023 import java.util.concurrent.ConcurrentMap;
0024 import java.util.concurrent.ExecutionException;
0025 import java.util.concurrent.Executor;
0026 import java.util.concurrent.Executors;
0027 import java.util.regex.Pattern;
0028 import java.util.stream.Collectors;
0029
0030 import org.apache.commons.lang3.builder.ToStringBuilder;
0031 import org.apache.commons.lang3.builder.ToStringStyle;
0032 import org.apache.commons.lang3.tuple.Pair;
0033 import com.fasterxml.jackson.annotation.JsonCreator;
0034 import com.fasterxml.jackson.annotation.JsonProperty;
0035 import com.fasterxml.jackson.databind.ObjectMapper;
0036 import com.google.common.annotations.VisibleForTesting;
0037 import com.google.common.cache.CacheBuilder;
0038 import com.google.common.cache.CacheLoader;
0039 import com.google.common.cache.LoadingCache;
0040 import com.google.common.cache.Weigher;
0041 import com.google.common.collect.Maps;
0042 import org.iq80.leveldb.DB;
0043 import org.iq80.leveldb.DBIterator;
0044 import org.slf4j.Logger;
0045 import org.slf4j.LoggerFactory;
0046
0047 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
0048 import org.apache.spark.network.buffer.ManagedBuffer;
0049 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
0050 import org.apache.spark.network.util.LevelDBProvider;
0051 import org.apache.spark.network.util.LevelDBProvider.StoreVersion;
0052 import org.apache.spark.network.util.JavaUtils;
0053 import org.apache.spark.network.util.NettyUtils;
0054 import org.apache.spark.network.util.TransportConf;
0055
0056
0057
0058
0059
0060
0061
0062 public class ExternalShuffleBlockResolver {
/** Class-wide SLF4J logger. */
private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);

/**
 * Jackson mapper used to serialize the keys and values stored in {@link #db} and handed to
 * LevelDBProvider.initLevelDB when the store is opened.
 */
private static final ObjectMapper mapper = new ObjectMapper();

/**
 * Prefix of every executor-registration key written to {@link #db}. dbAppExecKey builds keys
 * as this prefix, a ';' and the JSON form of the AppExecId; reloadRegisteredExecutors seeks to
 * this prefix and stops at the first key that does not start with it.
 */
private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";
/** Store version passed to LevelDBProvider.initLevelDB in the constructor. */
private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);

// Regex built from File.separator followed by the quantifier "{2,}".
// NOTE(review): this constant is not referenced anywhere in the visible file. When
// File.separator is a backslash the pattern becomes "\{2,}", i.e. an escaped brace rather than
// "two or more separators" -- confirm whether it is still needed and, if so, quote the separator.
private static final Pattern MULTIPLE_SEPARATORS = Pattern.compile(File.separator + "{2,}");

/**
 * Registered executors keyed by (appId, execId). Filled by registerExecutor, pruned by
 * applicationRemoved, and initialised from {@link #db} in the constructor when a DB is present.
 */
@VisibleForTesting
final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;

/**
 * Cache from a shuffle index file to its parsed ShuffleIndexInformation. Entries are weighed
 * by ShuffleIndexInformation.getSize() and the maximum total weight comes from the
 * "spark.shuffle.service.index.cache.size" setting (default "100m").
 */
private final LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache;

/** Executor on which the local-directory deletion tasks of this class are run. */
private final Executor directoryCleaner;

/** Transport configuration; read in the constructor and passed to FileSegmentManagedBuffer. */
private final TransportConf conf;

/**
 * Value of Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED (default "false"). When true,
 * deleteNonShuffleServiceServedFiles leaves files whose name starts with "rdd_" out of its filter.
 */
private final boolean rddFetchEnabled;

/** File handed to LevelDBProvider.initLevelDB to open {@link #db}. */
@VisibleForTesting
final File registeredExecutorFile;
/** Store of registered executors; every use in this class is guarded by a null check. */
@VisibleForTesting
final DB db;

/** Shuffle manager class names that registerExecutor accepts. */
private final List<String> knownManagers = Arrays.asList(
  "org.apache.spark.shuffle.sort.SortShuffleManager",
  "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");
0101
/**
 * Creates a resolver whose directory-cleanup tasks run on a newly created single-thread
 * executor using the "spark-shuffle-directory-cleaner" thread factory.
 *
 * @param conf transport configuration forwarded to the three-argument constructor
 * @param registeredExecutorFile file forwarded to the three-argument constructor, which uses it
 *        to open the registered-executor DB
 * @throws IOException if the delegated constructor fails
 */
public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
    throws IOException {
  this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
    // Thread factory supplied by NettyUtils for the named cleaner thread.
    NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
}
0108
0109
/**
 * Creates a resolver with a caller-supplied executor for directory cleanup.
 *
 * <p>Reads the RDD-fetch flag and the index cache size from {@code conf}, builds the
 * size-weighted shuffle index cache, opens the registered-executor DB and, when a DB is
 * available, reloads the previously registered executors from it.
 *
 * @param conf transport configuration
 * @param registeredExecutorFile file passed to LevelDBProvider.initLevelDB
 * @param directoryCleaner executor that will run directory deletion tasks
 * @throws IOException if opening the DB or reloading the registered executors fails
 */
@VisibleForTesting
ExternalShuffleBlockResolver(
    TransportConf conf,
    File registeredExecutorFile,
    Executor directoryCleaner) throws IOException {
  this.conf = conf;
  this.registeredExecutorFile = registeredExecutorFile;
  this.directoryCleaner = directoryCleaner;
  this.rddFetchEnabled =
    Boolean.parseBoolean(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false"));

  String configuredCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");

  // Loads the index information for a file the first time it is requested from the cache.
  CacheLoader<File, ShuffleIndexInformation> loader =
    new CacheLoader<File, ShuffleIndexInformation>() {
      @Override
      public ShuffleIndexInformation load(File indexFile) throws IOException {
        return new ShuffleIndexInformation(indexFile);
      }
    };
  // Each cache entry weighs as much as the size reported by its index information.
  Weigher<File, ShuffleIndexInformation> bySize = (indexFile, info) -> info.getSize();

  this.shuffleIndexCache = CacheBuilder.newBuilder()
    .maximumWeight(JavaUtils.byteStringAsBytes(configuredCacheSize))
    .weigher(bySize)
    .build(loader);

  this.db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
  if (this.db == null) {
    // No DB available: start with an empty in-memory registry.
    this.executors = Maps.newConcurrentMap();
  } else {
    this.executors = reloadRegisteredExecutors(this.db);
  }
}
0142
/**
 * Returns the number of entries currently held in the {@code executors} map.
 *
 * @return the size of the registered-executor map
 */
public int getRegisteredExecutorsSize() {
  return executors.size();
}
0146
0147
0148 public void registerExecutor(
0149 String appId,
0150 String execId,
0151 ExecutorShuffleInfo executorInfo) {
0152 AppExecId fullId = new AppExecId(appId, execId);
0153 logger.info("Registered executor {} with {}", fullId, executorInfo);
0154 if (!knownManagers.contains(executorInfo.shuffleManager)) {
0155 throw new UnsupportedOperationException(
0156 "Unsupported shuffle manager of executor: " + executorInfo);
0157 }
0158 try {
0159 if (db != null) {
0160 byte[] key = dbAppExecKey(fullId);
0161 byte[] value = mapper.writeValueAsString(executorInfo).getBytes(StandardCharsets.UTF_8);
0162 db.put(key, value);
0163 }
0164 } catch (Exception e) {
0165 logger.error("Error saving registered executors", e);
0166 }
0167 executors.put(fullId, executorInfo);
0168 }
0169
0170
0171
0172
0173 public ManagedBuffer getBlockData(
0174 String appId,
0175 String execId,
0176 int shuffleId,
0177 long mapId,
0178 int reduceId) {
0179 return getContinuousBlocksData(appId, execId, shuffleId, mapId, reduceId, reduceId + 1);
0180 }
0181
0182
0183
0184
0185
0186 public ManagedBuffer getContinuousBlocksData(
0187 String appId,
0188 String execId,
0189 int shuffleId,
0190 long mapId,
0191 int startReduceId,
0192 int endReduceId) {
0193 ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
0194 if (executor == null) {
0195 throw new RuntimeException(
0196 String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
0197 }
0198 return getSortBasedShuffleBlockData(executor, shuffleId, mapId, startReduceId, endReduceId);
0199 }
0200
0201 public ManagedBuffer getRddBlockData(
0202 String appId,
0203 String execId,
0204 int rddId,
0205 int splitIndex) {
0206 ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
0207 if (executor == null) {
0208 throw new RuntimeException(
0209 String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
0210 }
0211 return getDiskPersistedRddBlockData(executor, rddId, splitIndex);
0212 }
0213
0214
0215
0216
0217
0218
0219
0220
0221 public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
0222 logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
0223 Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator();
0224 while (it.hasNext()) {
0225 Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next();
0226 AppExecId fullId = entry.getKey();
0227 final ExecutorShuffleInfo executor = entry.getValue();
0228
0229
0230 if (appId.equals(fullId.appId)) {
0231 it.remove();
0232 if (db != null) {
0233 try {
0234 db.delete(dbAppExecKey(fullId));
0235 } catch (IOException e) {
0236 logger.error("Error deleting {} from executor state db", appId, e);
0237 }
0238 }
0239
0240 if (cleanupLocalDirs) {
0241 logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
0242
0243
0244 directoryCleaner.execute(() -> deleteExecutorDirs(executor.localDirs));
0245 }
0246 }
0247 }
0248 }
0249
0250
0251
0252
0253
0254 public void executorRemoved(String executorId, String appId) {
0255 logger.info("Clean up non-shuffle and non-RDD files associated with the finished executor {}",
0256 executorId);
0257 AppExecId fullId = new AppExecId(appId, executorId);
0258 final ExecutorShuffleInfo executor = executors.get(fullId);
0259 if (executor == null) {
0260
0261 logger.info("Executor is not registered (appId={}, execId={})", appId, executorId);
0262 } else {
0263 logger.info("Cleaning up non-shuffle and non-RDD files in executor {}'s {} local dirs",
0264 fullId, executor.localDirs.length);
0265
0266
0267 directoryCleaner.execute(() -> deleteNonShuffleServiceServedFiles(executor.localDirs));
0268 }
0269 }
0270
0271
0272
0273
0274
0275 private void deleteExecutorDirs(String[] dirs) {
0276 for (String localDir : dirs) {
0277 try {
0278 JavaUtils.deleteRecursively(new File(localDir));
0279 logger.debug("Successfully cleaned up directory: {}", localDir);
0280 } catch (Exception e) {
0281 logger.error("Failed to delete directory: " + localDir, e);
0282 }
0283 }
0284 }
0285
0286
0287
0288
0289
0290 private void deleteNonShuffleServiceServedFiles(String[] dirs) {
0291 FilenameFilter filter = (dir, name) -> {
0292
0293 return !name.endsWith(".index") && !name.endsWith(".data")
0294 && (!rddFetchEnabled || !name.startsWith("rdd_"));
0295 };
0296
0297 for (String localDir : dirs) {
0298 try {
0299 JavaUtils.deleteRecursively(new File(localDir), filter);
0300 logger.debug("Successfully cleaned up files not served by shuffle service in directory: {}",
0301 localDir);
0302 } catch (Exception e) {
0303 logger.error("Failed to delete files not served by shuffle service in directory: "
0304 + localDir, e);
0305 }
0306 }
0307 }
0308
0309
0310
0311
0312
0313
0314 private ManagedBuffer getSortBasedShuffleBlockData(
0315 ExecutorShuffleInfo executor, int shuffleId, long mapId, int startReduceId, int endReduceId) {
0316 File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
0317 "shuffle_" + shuffleId + "_" + mapId + "_0.index");
0318
0319 try {
0320 ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
0321 ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(
0322 startReduceId, endReduceId);
0323 return new FileSegmentManagedBuffer(
0324 conf,
0325 ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
0326 "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
0327 shuffleIndexRecord.getOffset(),
0328 shuffleIndexRecord.getLength());
0329 } catch (ExecutionException e) {
0330 throw new RuntimeException("Failed to open file: " + indexFile, e);
0331 }
0332 }
0333
0334 public ManagedBuffer getDiskPersistedRddBlockData(
0335 ExecutorShuffleInfo executor, int rddId, int splitIndex) {
0336 File file = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
0337 "rdd_" + rddId + "_" + splitIndex);
0338 long fileLength = file.length();
0339 ManagedBuffer res = null;
0340 if (file.exists()) {
0341 res = new FileSegmentManagedBuffer(conf, file, 0, fileLength);
0342 }
0343 return res;
0344 }
0345
0346 void close() {
0347 if (db != null) {
0348 try {
0349 db.close();
0350 } catch (IOException e) {
0351 logger.error("Exception closing leveldb with registered executors", e);
0352 }
0353 }
0354 }
0355
0356 public int removeBlocks(String appId, String execId, String[] blockIds) {
0357 ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
0358 if (executor == null) {
0359 throw new RuntimeException(
0360 String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
0361 }
0362 int numRemovedBlocks = 0;
0363 for (String blockId : blockIds) {
0364 File file =
0365 ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
0366 if (file.delete()) {
0367 numRemovedBlocks++;
0368 } else {
0369 logger.warn("Failed to delete block: " + file.getAbsolutePath());
0370 }
0371 }
0372 return numRemovedBlocks;
0373 }
0374
0375 public Map<String, String[]> getLocalDirs(String appId, String[] execIds) {
0376 return Arrays.stream(execIds)
0377 .map(exec -> {
0378 ExecutorShuffleInfo info = executors.get(new AppExecId(appId, exec));
0379 if (info == null) {
0380 throw new RuntimeException(
0381 String.format("Executor is not registered (appId=%s, execId=%s)", appId, exec));
0382 }
0383 return Pair.of(exec, info.localDirs);
0384 })
0385 .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
0386 }
0387
0388
0389 public static class AppExecId {
0390 public final String appId;
0391 public final String execId;
0392
0393 @JsonCreator
0394 public AppExecId(@JsonProperty("appId") String appId, @JsonProperty("execId") String execId) {
0395 this.appId = appId;
0396 this.execId = execId;
0397 }
0398
0399 @Override
0400 public boolean equals(Object o) {
0401 if (this == o) return true;
0402 if (o == null || getClass() != o.getClass()) return false;
0403
0404 AppExecId appExecId = (AppExecId) o;
0405 return Objects.equals(appId, appExecId.appId) && Objects.equals(execId, appExecId.execId);
0406 }
0407
0408 @Override
0409 public int hashCode() {
0410 return Objects.hash(appId, execId);
0411 }
0412
0413 @Override
0414 public String toString() {
0415 return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
0416 .append("appId", appId)
0417 .append("execId", execId)
0418 .toString();
0419 }
0420 }
0421
0422 private static byte[] dbAppExecKey(AppExecId appExecId) throws IOException {
0423
0424 String appExecJson = mapper.writeValueAsString(appExecId);
0425 String key = (APP_KEY_PREFIX + ";" + appExecJson);
0426 return key.getBytes(StandardCharsets.UTF_8);
0427 }
0428
0429 private static AppExecId parseDbAppExecKey(String s) throws IOException {
0430 if (!s.startsWith(APP_KEY_PREFIX)) {
0431 throw new IllegalArgumentException("expected a string starting with " + APP_KEY_PREFIX);
0432 }
0433 String json = s.substring(APP_KEY_PREFIX.length() + 1);
0434 AppExecId parsed = mapper.readValue(json, AppExecId.class);
0435 return parsed;
0436 }
0437
0438 @VisibleForTesting
0439 static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(DB db)
0440 throws IOException {
0441 ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors = Maps.newConcurrentMap();
0442 if (db != null) {
0443 DBIterator itr = db.iterator();
0444 itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
0445 while (itr.hasNext()) {
0446 Map.Entry<byte[], byte[]> e = itr.next();
0447 String key = new String(e.getKey(), StandardCharsets.UTF_8);
0448 if (!key.startsWith(APP_KEY_PREFIX)) {
0449 break;
0450 }
0451 AppExecId id = parseDbAppExecKey(key);
0452 logger.info("Reloading registered executors: " + id.toString());
0453 ExecutorShuffleInfo shuffleInfo = mapper.readValue(e.getValue(), ExecutorShuffleInfo.class);
0454 registeredExecutors.put(id, shuffleInfo);
0455 }
0456 }
0457 return registeredExecutors;
0458 }
0459 }