Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle;
0019 
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.nio.charset.StandardCharsets;
0023 import java.util.Random;
0024 import java.util.concurrent.Executor;
0025 import java.util.concurrent.atomic.AtomicBoolean;
0026 
0027 import org.junit.Test;
0028 import static org.junit.Assert.assertFalse;
0029 import static org.junit.Assert.assertTrue;
0030 
0031 import org.apache.spark.network.util.MapConfigProvider;
0032 import org.apache.spark.network.util.TransportConf;
0033 
0034 public class ExternalShuffleCleanupSuite {
0035 
0036   // Same-thread Executor used to ensure cleanup happens synchronously in test thread.
0037   private Executor sameThreadExecutor = Runnable::run;
0038   private TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
0039   private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
0040 
0041   @Test
0042   public void noCleanupAndCleanup() throws IOException {
0043     TestShuffleDataContext dataContext = createSomeData();
0044 
0045     ExternalShuffleBlockResolver resolver =
0046       new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
0047     resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
0048     resolver.applicationRemoved("app", false /* cleanup */);
0049 
0050     assertStillThere(dataContext);
0051 
0052     resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo(SORT_MANAGER));
0053     resolver.applicationRemoved("app", true /* cleanup */);
0054 
0055     assertCleanedUp(dataContext);
0056   }
0057 
0058   @Test
0059   public void cleanupUsesExecutor() throws IOException {
0060     TestShuffleDataContext dataContext = createSomeData();
0061 
0062     AtomicBoolean cleanupCalled = new AtomicBoolean(false);
0063 
0064     // Executor which does nothing to ensure we're actually using it.
0065     Executor noThreadExecutor = runnable -> cleanupCalled.set(true);
0066 
0067     ExternalShuffleBlockResolver manager =
0068       new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);
0069 
0070     manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
0071     manager.applicationRemoved("app", true);
0072 
0073     assertTrue(cleanupCalled.get());
0074     assertStillThere(dataContext);
0075 
0076     dataContext.cleanup();
0077     assertCleanedUp(dataContext);
0078   }
0079 
0080   @Test
0081   public void cleanupMultipleExecutors() throws IOException {
0082     TestShuffleDataContext dataContext0 = createSomeData();
0083     TestShuffleDataContext dataContext1 = createSomeData();
0084 
0085     ExternalShuffleBlockResolver resolver =
0086       new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
0087 
0088     resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
0089     resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo(SORT_MANAGER));
0090     resolver.applicationRemoved("app", true);
0091 
0092     assertCleanedUp(dataContext0);
0093     assertCleanedUp(dataContext1);
0094   }
0095 
0096   @Test
0097   public void cleanupOnlyRemovedApp() throws IOException {
0098     TestShuffleDataContext dataContext0 = createSomeData();
0099     TestShuffleDataContext dataContext1 = createSomeData();
0100 
0101     ExternalShuffleBlockResolver resolver =
0102       new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
0103 
0104     resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
0105     resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo(SORT_MANAGER));
0106 
0107     resolver.applicationRemoved("app-nonexistent", true);
0108     assertStillThere(dataContext0);
0109     assertStillThere(dataContext1);
0110 
0111     resolver.applicationRemoved("app-0", true);
0112     assertCleanedUp(dataContext0);
0113     assertStillThere(dataContext1);
0114 
0115     resolver.applicationRemoved("app-1", true);
0116     assertCleanedUp(dataContext0);
0117     assertCleanedUp(dataContext1);
0118 
0119     // Make sure it's not an error to cleanup multiple times
0120     resolver.applicationRemoved("app-1", true);
0121     assertCleanedUp(dataContext0);
0122     assertCleanedUp(dataContext1);
0123   }
0124 
0125   private static void assertStillThere(TestShuffleDataContext dataContext) {
0126     for (String localDir : dataContext.localDirs) {
0127       assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
0128     }
0129   }
0130 
0131   private static void assertCleanedUp(TestShuffleDataContext dataContext) {
0132     for (String localDir : dataContext.localDirs) {
0133       assertFalse(localDir + " wasn't cleaned up", new File(localDir).exists());
0134     }
0135   }
0136 
0137   private static TestShuffleDataContext createSomeData() throws IOException {
0138     Random rand = new Random(123);
0139     TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
0140 
0141     dataContext.create();
0142     dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] {
0143         "ABC".getBytes(StandardCharsets.UTF_8),
0144         "DEF".getBytes(StandardCharsets.UTF_8)});
0145     return dataContext;
0146   }
0147 }