Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle;
0019 
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.nio.charset.StandardCharsets;
0023 import java.nio.file.Files;
0024 import java.nio.file.Path;
0025 import java.util.*;
0026 import java.util.concurrent.Executor;
0027 import java.util.concurrent.atomic.AtomicBoolean;
0028 import java.util.stream.Collectors;
0029 import java.util.stream.Stream;
0030 
0031 import com.google.common.collect.ImmutableMap;
0032 import com.google.common.collect.ImmutableSet;
0033 import org.junit.Test;
0034 
0035 import static org.junit.Assert.assertEquals;
0036 import static org.junit.Assert.assertTrue;
0037 
0038 import org.apache.spark.network.util.MapConfigProvider;
0039 import org.apache.spark.network.util.TransportConf;
0040 
0041 public class CleanupNonShuffleServiceServedFilesSuite {
0042 
0043   // Same-thread Executor used to ensure cleanup happens synchronously in test thread.
0044   private Executor sameThreadExecutor = Runnable::run;
0045 
0046   private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
0047 
0048   private static Set<String> expectedShuffleFilesToKeep =
0049     ImmutableSet.of("shuffle_782_450_0.index", "shuffle_782_450_0.data");
0050 
0051   private static Set<String> expectedShuffleAndRddFilesToKeep =
0052     ImmutableSet.of("shuffle_782_450_0.index", "shuffle_782_450_0.data", "rdd_12_34");
0053 
0054   private TransportConf getConf(boolean isFetchRddEnabled) {
0055     return new TransportConf(
0056       "shuffle",
0057       new MapConfigProvider(ImmutableMap.of(
0058         Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED,
0059         Boolean.toString(isFetchRddEnabled))));
0060   }
0061 
0062   @Test
0063   public void cleanupOnRemovedExecutorWithFilesToKeepFetchRddEnabled() throws IOException {
0064     cleanupOnRemovedExecutor(true, getConf(true), expectedShuffleAndRddFilesToKeep);
0065   }
0066 
0067   @Test
0068   public void cleanupOnRemovedExecutorWithFilesToKeepFetchRddDisabled() throws IOException {
0069     cleanupOnRemovedExecutor(true, getConf(false), expectedShuffleFilesToKeep);
0070   }
0071 
0072   @Test
0073   public void cleanupOnRemovedExecutorWithoutFilesToKeep() throws IOException {
0074     cleanupOnRemovedExecutor(false, getConf(true), Collections.emptySet());
0075   }
0076 
0077   private void cleanupOnRemovedExecutor(
0078       boolean withFilesToKeep,
0079       TransportConf conf,
0080       Set<String> expectedFilesKept) throws IOException {
0081     TestShuffleDataContext dataContext = initDataContext(withFilesToKeep);
0082 
0083     ExternalShuffleBlockResolver resolver =
0084       new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
0085     resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
0086     resolver.executorRemoved("exec0", "app");
0087 
0088     assertContainedFilenames(dataContext, expectedFilesKept);
0089   }
0090 
0091   @Test
0092   public void cleanupUsesExecutorWithFilesToKeep() throws IOException {
0093     cleanupUsesExecutor(true);
0094   }
0095 
0096   @Test
0097   public void cleanupUsesExecutorWithoutFilesToKeep() throws IOException {
0098     cleanupUsesExecutor(false);
0099   }
0100 
0101   private void cleanupUsesExecutor(boolean withFilesToKeep) throws IOException {
0102     TestShuffleDataContext dataContext = initDataContext(withFilesToKeep);
0103 
0104     AtomicBoolean cleanupCalled = new AtomicBoolean(false);
0105 
0106     // Executor which only captures whether it's being used, without executing anything.
0107     Executor dummyExecutor = runnable -> cleanupCalled.set(true);
0108 
0109     ExternalShuffleBlockResolver manager =
0110       new ExternalShuffleBlockResolver(getConf(true), null, dummyExecutor);
0111 
0112     manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
0113     manager.executorRemoved("exec0", "app");
0114 
0115     assertTrue(cleanupCalled.get());
0116     assertStillThere(dataContext);
0117   }
0118 
0119   @Test
0120   public void cleanupOnlyRemovedExecutorWithFilesToKeepFetchRddEnabled() throws IOException {
0121     cleanupOnlyRemovedExecutor(true, getConf(true), expectedShuffleAndRddFilesToKeep);
0122   }
0123 
0124   @Test
0125   public void cleanupOnlyRemovedExecutorWithFilesToKeepFetchRddDisabled() throws IOException {
0126     cleanupOnlyRemovedExecutor(true, getConf(false), expectedShuffleFilesToKeep);
0127   }
0128 
0129   @Test
0130   public void cleanupOnlyRemovedExecutorWithoutFilesToKeep() throws IOException {
0131     cleanupOnlyRemovedExecutor(false, getConf(true) , Collections.emptySet());
0132   }
0133 
0134   private void cleanupOnlyRemovedExecutor(
0135       boolean withFilesToKeep,
0136       TransportConf conf,
0137       Set<String> expectedFilesKept) throws IOException {
0138     TestShuffleDataContext dataContext0 = initDataContext(withFilesToKeep);
0139     TestShuffleDataContext dataContext1 = initDataContext(withFilesToKeep);
0140 
0141     ExternalShuffleBlockResolver resolver =
0142       new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
0143     resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
0144     resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo(SORT_MANAGER));
0145 
0146 
0147     resolver.executorRemoved("exec-nonexistent", "app");
0148     assertStillThere(dataContext0);
0149     assertStillThere(dataContext1);
0150 
0151     resolver.executorRemoved("exec0", "app");
0152     assertContainedFilenames(dataContext0, expectedFilesKept);
0153     assertStillThere(dataContext1);
0154 
0155     resolver.executorRemoved("exec1", "app");
0156     assertContainedFilenames(dataContext0, expectedFilesKept);
0157     assertContainedFilenames(dataContext1, expectedFilesKept);
0158 
0159     // Make sure it's not an error to cleanup multiple times
0160     resolver.executorRemoved("exec1", "app");
0161     assertContainedFilenames(dataContext0, expectedFilesKept);
0162     assertContainedFilenames(dataContext1, expectedFilesKept);
0163   }
0164 
0165   @Test
0166   public void cleanupOnlyRegisteredExecutorWithFilesToKeepFetchRddEnabled() throws IOException {
0167     cleanupOnlyRegisteredExecutor(true, getConf(true), expectedShuffleAndRddFilesToKeep);
0168   }
0169 
0170   @Test
0171   public void cleanupOnlyRegisteredExecutorWithFilesToKeepFetchRddDisabled() throws IOException {
0172     cleanupOnlyRegisteredExecutor(true, getConf(false), expectedShuffleFilesToKeep);
0173   }
0174 
0175   @Test
0176   public void cleanupOnlyRegisteredExecutorWithoutFilesToKeep() throws IOException {
0177     cleanupOnlyRegisteredExecutor(false, getConf(true), Collections.emptySet());
0178   }
0179 
0180   private void cleanupOnlyRegisteredExecutor(
0181       boolean withFilesToKeep,
0182       TransportConf conf,
0183       Set<String> expectedFilesKept) throws IOException {
0184     TestShuffleDataContext dataContext = initDataContext(withFilesToKeep);
0185 
0186     ExternalShuffleBlockResolver resolver =
0187       new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
0188     resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
0189 
0190     resolver.executorRemoved("exec1", "app");
0191     assertStillThere(dataContext);
0192 
0193     resolver.executorRemoved("exec0", "app");
0194     assertContainedFilenames(dataContext, expectedFilesKept);
0195   }
0196 
0197   private static void assertStillThere(TestShuffleDataContext dataContext) {
0198     for (String localDir : dataContext.localDirs) {
0199       assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
0200     }
0201   }
0202 
0203   private static Set<String> collectFilenames(File[] files) throws IOException {
0204     Set<String> result = new HashSet<>();
0205     for (File file : files) {
0206       if (file.exists()) {
0207         try (Stream<Path> walk = Files.walk(file.toPath())) {
0208           result.addAll(walk
0209             .filter(Files::isRegularFile)
0210             .map(x -> x.toFile().getName())
0211             .collect(Collectors.toSet()));
0212         }
0213       }
0214     }
0215     return result;
0216   }
0217 
0218   private static void assertContainedFilenames(
0219       TestShuffleDataContext dataContext,
0220       Set<String> expectedFilenames) throws IOException {
0221     Set<String> collectedFilenames = new HashSet<>();
0222     for (String localDir : dataContext.localDirs) {
0223       File[] dirs = new File[] { new File(localDir) };
0224       collectedFilenames.addAll(collectFilenames(dirs));
0225     }
0226     assertEquals(expectedFilenames, collectedFilenames);
0227   }
0228 
0229   private static TestShuffleDataContext initDataContext(boolean withFilesToKeep)
0230       throws IOException {
0231     TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
0232     dataContext.create();
0233     if (withFilesToKeep) {
0234       createFilesToKeep(dataContext);
0235     } else {
0236       createRemovableTestFiles(dataContext);
0237     }
0238     return dataContext;
0239   }
0240 
0241   private static void createFilesToKeep(TestShuffleDataContext dataContext) throws IOException {
0242     Random rand = new Random(123);
0243     dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] {
0244         "ABC".getBytes(StandardCharsets.UTF_8),
0245         "DEF".getBytes(StandardCharsets.UTF_8)});
0246     dataContext.insertCachedRddData(12, 34, new byte[] { 42 });
0247   }
0248 
0249   private static void createRemovableTestFiles(TestShuffleDataContext dataContext)
0250       throws IOException {
0251     dataContext.insertSpillData();
0252     dataContext.insertBroadcastData();
0253     dataContext.insertTempShuffleData();
0254   }
0255 }