0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle;
0019
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.nio.charset.StandardCharsets;
0023 import java.nio.file.Files;
0024 import java.nio.file.Path;
0025 import java.util.*;
0026 import java.util.concurrent.Executor;
0027 import java.util.concurrent.atomic.AtomicBoolean;
0028 import java.util.stream.Collectors;
0029 import java.util.stream.Stream;
0030
0031 import com.google.common.collect.ImmutableMap;
0032 import com.google.common.collect.ImmutableSet;
0033 import org.junit.Test;
0034
0035 import static org.junit.Assert.assertEquals;
0036 import static org.junit.Assert.assertTrue;
0037
0038 import org.apache.spark.network.util.MapConfigProvider;
0039 import org.apache.spark.network.util.TransportConf;
0040
0041 public class CleanupNonShuffleServiceServedFilesSuite {
0042
0043
0044 private Executor sameThreadExecutor = Runnable::run;
0045
0046 private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
0047
0048 private static Set<String> expectedShuffleFilesToKeep =
0049 ImmutableSet.of("shuffle_782_450_0.index", "shuffle_782_450_0.data");
0050
0051 private static Set<String> expectedShuffleAndRddFilesToKeep =
0052 ImmutableSet.of("shuffle_782_450_0.index", "shuffle_782_450_0.data", "rdd_12_34");
0053
0054 private TransportConf getConf(boolean isFetchRddEnabled) {
0055 return new TransportConf(
0056 "shuffle",
0057 new MapConfigProvider(ImmutableMap.of(
0058 Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED,
0059 Boolean.toString(isFetchRddEnabled))));
0060 }
0061
0062 @Test
0063 public void cleanupOnRemovedExecutorWithFilesToKeepFetchRddEnabled() throws IOException {
0064 cleanupOnRemovedExecutor(true, getConf(true), expectedShuffleAndRddFilesToKeep);
0065 }
0066
0067 @Test
0068 public void cleanupOnRemovedExecutorWithFilesToKeepFetchRddDisabled() throws IOException {
0069 cleanupOnRemovedExecutor(true, getConf(false), expectedShuffleFilesToKeep);
0070 }
0071
0072 @Test
0073 public void cleanupOnRemovedExecutorWithoutFilesToKeep() throws IOException {
0074 cleanupOnRemovedExecutor(false, getConf(true), Collections.emptySet());
0075 }
0076
0077 private void cleanupOnRemovedExecutor(
0078 boolean withFilesToKeep,
0079 TransportConf conf,
0080 Set<String> expectedFilesKept) throws IOException {
0081 TestShuffleDataContext dataContext = initDataContext(withFilesToKeep);
0082
0083 ExternalShuffleBlockResolver resolver =
0084 new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
0085 resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
0086 resolver.executorRemoved("exec0", "app");
0087
0088 assertContainedFilenames(dataContext, expectedFilesKept);
0089 }
0090
0091 @Test
0092 public void cleanupUsesExecutorWithFilesToKeep() throws IOException {
0093 cleanupUsesExecutor(true);
0094 }
0095
0096 @Test
0097 public void cleanupUsesExecutorWithoutFilesToKeep() throws IOException {
0098 cleanupUsesExecutor(false);
0099 }
0100
0101 private void cleanupUsesExecutor(boolean withFilesToKeep) throws IOException {
0102 TestShuffleDataContext dataContext = initDataContext(withFilesToKeep);
0103
0104 AtomicBoolean cleanupCalled = new AtomicBoolean(false);
0105
0106
0107 Executor dummyExecutor = runnable -> cleanupCalled.set(true);
0108
0109 ExternalShuffleBlockResolver manager =
0110 new ExternalShuffleBlockResolver(getConf(true), null, dummyExecutor);
0111
0112 manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
0113 manager.executorRemoved("exec0", "app");
0114
0115 assertTrue(cleanupCalled.get());
0116 assertStillThere(dataContext);
0117 }
0118
0119 @Test
0120 public void cleanupOnlyRemovedExecutorWithFilesToKeepFetchRddEnabled() throws IOException {
0121 cleanupOnlyRemovedExecutor(true, getConf(true), expectedShuffleAndRddFilesToKeep);
0122 }
0123
0124 @Test
0125 public void cleanupOnlyRemovedExecutorWithFilesToKeepFetchRddDisabled() throws IOException {
0126 cleanupOnlyRemovedExecutor(true, getConf(false), expectedShuffleFilesToKeep);
0127 }
0128
0129 @Test
0130 public void cleanupOnlyRemovedExecutorWithoutFilesToKeep() throws IOException {
0131 cleanupOnlyRemovedExecutor(false, getConf(true) , Collections.emptySet());
0132 }
0133
0134 private void cleanupOnlyRemovedExecutor(
0135 boolean withFilesToKeep,
0136 TransportConf conf,
0137 Set<String> expectedFilesKept) throws IOException {
0138 TestShuffleDataContext dataContext0 = initDataContext(withFilesToKeep);
0139 TestShuffleDataContext dataContext1 = initDataContext(withFilesToKeep);
0140
0141 ExternalShuffleBlockResolver resolver =
0142 new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
0143 resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
0144 resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo(SORT_MANAGER));
0145
0146
0147 resolver.executorRemoved("exec-nonexistent", "app");
0148 assertStillThere(dataContext0);
0149 assertStillThere(dataContext1);
0150
0151 resolver.executorRemoved("exec0", "app");
0152 assertContainedFilenames(dataContext0, expectedFilesKept);
0153 assertStillThere(dataContext1);
0154
0155 resolver.executorRemoved("exec1", "app");
0156 assertContainedFilenames(dataContext0, expectedFilesKept);
0157 assertContainedFilenames(dataContext1, expectedFilesKept);
0158
0159
0160 resolver.executorRemoved("exec1", "app");
0161 assertContainedFilenames(dataContext0, expectedFilesKept);
0162 assertContainedFilenames(dataContext1, expectedFilesKept);
0163 }
0164
0165 @Test
0166 public void cleanupOnlyRegisteredExecutorWithFilesToKeepFetchRddEnabled() throws IOException {
0167 cleanupOnlyRegisteredExecutor(true, getConf(true), expectedShuffleAndRddFilesToKeep);
0168 }
0169
0170 @Test
0171 public void cleanupOnlyRegisteredExecutorWithFilesToKeepFetchRddDisabled() throws IOException {
0172 cleanupOnlyRegisteredExecutor(true, getConf(false), expectedShuffleFilesToKeep);
0173 }
0174
0175 @Test
0176 public void cleanupOnlyRegisteredExecutorWithoutFilesToKeep() throws IOException {
0177 cleanupOnlyRegisteredExecutor(false, getConf(true), Collections.emptySet());
0178 }
0179
0180 private void cleanupOnlyRegisteredExecutor(
0181 boolean withFilesToKeep,
0182 TransportConf conf,
0183 Set<String> expectedFilesKept) throws IOException {
0184 TestShuffleDataContext dataContext = initDataContext(withFilesToKeep);
0185
0186 ExternalShuffleBlockResolver resolver =
0187 new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
0188 resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
0189
0190 resolver.executorRemoved("exec1", "app");
0191 assertStillThere(dataContext);
0192
0193 resolver.executorRemoved("exec0", "app");
0194 assertContainedFilenames(dataContext, expectedFilesKept);
0195 }
0196
0197 private static void assertStillThere(TestShuffleDataContext dataContext) {
0198 for (String localDir : dataContext.localDirs) {
0199 assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
0200 }
0201 }
0202
0203 private static Set<String> collectFilenames(File[] files) throws IOException {
0204 Set<String> result = new HashSet<>();
0205 for (File file : files) {
0206 if (file.exists()) {
0207 try (Stream<Path> walk = Files.walk(file.toPath())) {
0208 result.addAll(walk
0209 .filter(Files::isRegularFile)
0210 .map(x -> x.toFile().getName())
0211 .collect(Collectors.toSet()));
0212 }
0213 }
0214 }
0215 return result;
0216 }
0217
0218 private static void assertContainedFilenames(
0219 TestShuffleDataContext dataContext,
0220 Set<String> expectedFilenames) throws IOException {
0221 Set<String> collectedFilenames = new HashSet<>();
0222 for (String localDir : dataContext.localDirs) {
0223 File[] dirs = new File[] { new File(localDir) };
0224 collectedFilenames.addAll(collectFilenames(dirs));
0225 }
0226 assertEquals(expectedFilenames, collectedFilenames);
0227 }
0228
0229 private static TestShuffleDataContext initDataContext(boolean withFilesToKeep)
0230 throws IOException {
0231 TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
0232 dataContext.create();
0233 if (withFilesToKeep) {
0234 createFilesToKeep(dataContext);
0235 } else {
0236 createRemovableTestFiles(dataContext);
0237 }
0238 return dataContext;
0239 }
0240
0241 private static void createFilesToKeep(TestShuffleDataContext dataContext) throws IOException {
0242 Random rand = new Random(123);
0243 dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] {
0244 "ABC".getBytes(StandardCharsets.UTF_8),
0245 "DEF".getBytes(StandardCharsets.UTF_8)});
0246 dataContext.insertCachedRddData(12, 34, new byte[] { 42 });
0247 }
0248
0249 private static void createRemovableTestFiles(TestShuffleDataContext dataContext)
0250 throws IOException {
0251 dataContext.insertSpillData();
0252 dataContext.insertBroadcastData();
0253 dataContext.insertTempShuffleData();
0254 }
0255 }