0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle;
0019
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.nio.charset.StandardCharsets;
0023 import java.util.Random;
0024 import java.util.concurrent.Executor;
0025 import java.util.concurrent.atomic.AtomicBoolean;
0026
0027 import org.junit.Test;
0028 import static org.junit.Assert.assertFalse;
0029 import static org.junit.Assert.assertTrue;
0030
0031 import org.apache.spark.network.util.MapConfigProvider;
0032 import org.apache.spark.network.util.TransportConf;
0033
0034 public class ExternalShuffleCleanupSuite {
0035
0036
0037 private Executor sameThreadExecutor = Runnable::run;
0038 private TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
0039 private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
0040
0041 @Test
0042 public void noCleanupAndCleanup() throws IOException {
0043 TestShuffleDataContext dataContext = createSomeData();
0044
0045 ExternalShuffleBlockResolver resolver =
0046 new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
0047 resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
0048 resolver.applicationRemoved("app", false );
0049
0050 assertStillThere(dataContext);
0051
0052 resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo(SORT_MANAGER));
0053 resolver.applicationRemoved("app", true );
0054
0055 assertCleanedUp(dataContext);
0056 }
0057
0058 @Test
0059 public void cleanupUsesExecutor() throws IOException {
0060 TestShuffleDataContext dataContext = createSomeData();
0061
0062 AtomicBoolean cleanupCalled = new AtomicBoolean(false);
0063
0064
0065 Executor noThreadExecutor = runnable -> cleanupCalled.set(true);
0066
0067 ExternalShuffleBlockResolver manager =
0068 new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);
0069
0070 manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
0071 manager.applicationRemoved("app", true);
0072
0073 assertTrue(cleanupCalled.get());
0074 assertStillThere(dataContext);
0075
0076 dataContext.cleanup();
0077 assertCleanedUp(dataContext);
0078 }
0079
0080 @Test
0081 public void cleanupMultipleExecutors() throws IOException {
0082 TestShuffleDataContext dataContext0 = createSomeData();
0083 TestShuffleDataContext dataContext1 = createSomeData();
0084
0085 ExternalShuffleBlockResolver resolver =
0086 new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
0087
0088 resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
0089 resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo(SORT_MANAGER));
0090 resolver.applicationRemoved("app", true);
0091
0092 assertCleanedUp(dataContext0);
0093 assertCleanedUp(dataContext1);
0094 }
0095
0096 @Test
0097 public void cleanupOnlyRemovedApp() throws IOException {
0098 TestShuffleDataContext dataContext0 = createSomeData();
0099 TestShuffleDataContext dataContext1 = createSomeData();
0100
0101 ExternalShuffleBlockResolver resolver =
0102 new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
0103
0104 resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
0105 resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo(SORT_MANAGER));
0106
0107 resolver.applicationRemoved("app-nonexistent", true);
0108 assertStillThere(dataContext0);
0109 assertStillThere(dataContext1);
0110
0111 resolver.applicationRemoved("app-0", true);
0112 assertCleanedUp(dataContext0);
0113 assertStillThere(dataContext1);
0114
0115 resolver.applicationRemoved("app-1", true);
0116 assertCleanedUp(dataContext0);
0117 assertCleanedUp(dataContext1);
0118
0119
0120 resolver.applicationRemoved("app-1", true);
0121 assertCleanedUp(dataContext0);
0122 assertCleanedUp(dataContext1);
0123 }
0124
0125 private static void assertStillThere(TestShuffleDataContext dataContext) {
0126 for (String localDir : dataContext.localDirs) {
0127 assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
0128 }
0129 }
0130
0131 private static void assertCleanedUp(TestShuffleDataContext dataContext) {
0132 for (String localDir : dataContext.localDirs) {
0133 assertFalse(localDir + " wasn't cleaned up", new File(localDir).exists());
0134 }
0135 }
0136
0137 private static TestShuffleDataContext createSomeData() throws IOException {
0138 Random rand = new Random(123);
0139 TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
0140
0141 dataContext.create();
0142 dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] {
0143 "ABC".getBytes(StandardCharsets.UTF_8),
0144 "DEF".getBytes(StandardCharsets.UTF_8)});
0145 return dataContext;
0146 }
0147 }