Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle;
0019 
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.nio.ByteBuffer;
0023 import java.util.Arrays;
0024 import java.util.Collections;
0025 import java.util.HashMap;
0026 import java.util.HashSet;
0027 import java.util.LinkedList;
0028 import java.util.List;
0029 import java.util.Random;
0030 import java.util.Set;
0031 import java.util.concurrent.Future;
0032 import java.util.concurrent.Semaphore;
0033 import java.util.concurrent.TimeUnit;
0034 
0035 import com.google.common.collect.ImmutableMap;
0036 import com.google.common.collect.Sets;
0037 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
0038 import org.apache.spark.network.server.OneForOneStreamManager;
0039 import org.junit.After;
0040 import org.junit.AfterClass;
0041 import org.junit.BeforeClass;
0042 import org.junit.Test;
0043 
0044 import static org.junit.Assert.*;
0045 
0046 import org.apache.spark.network.TestUtils;
0047 import org.apache.spark.network.TransportContext;
0048 import org.apache.spark.network.buffer.ManagedBuffer;
0049 import org.apache.spark.network.buffer.NioManagedBuffer;
0050 import org.apache.spark.network.server.TransportServer;
0051 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
0052 import org.apache.spark.network.util.MapConfigProvider;
0053 import org.apache.spark.network.util.TransportConf;
0054 
0055 public class ExternalShuffleIntegrationSuite {
0056 
0057   private static final String APP_ID = "app-id";
0058   private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
0059 
0060   private static final int RDD_ID = 1;
0061   private static final int SPLIT_INDEX_VALID_BLOCK = 0;
0062   private static final int SPLIT_INDEX_MISSING_FILE = 1;
0063   private static final int SPLIT_INDEX_CORRUPT_LENGTH = 2;
0064   private static final int SPLIT_INDEX_VALID_BLOCK_TO_RM = 3;
0065   private static final int SPLIT_INDEX_MISSING_BLOCK_TO_RM = 4;
0066 
0067   // Executor 0 is sort-based
0068   static TestShuffleDataContext dataContext0;
0069 
0070   static ExternalBlockHandler handler;
0071   static TransportServer server;
0072   static TransportConf conf;
0073   static TransportContext transportContext;
0074 
0075   static byte[] exec0RddBlockValid = new byte[123];
0076   static byte[] exec0RddBlockToRemove = new byte[124];
0077 
0078   static byte[][] exec0Blocks = new byte[][] {
0079     new byte[123],
0080     new byte[12345],
0081     new byte[1234567],
0082   };
0083 
0084   static byte[][] exec1Blocks = new byte[][] {
0085     new byte[321],
0086     new byte[54321],
0087   };
0088 
0089   @BeforeClass
0090   public static void beforeAll() throws IOException {
0091     Random rand = new Random();
0092 
0093     for (byte[] block : exec0Blocks) {
0094       rand.nextBytes(block);
0095     }
0096     for (byte[] block: exec1Blocks) {
0097       rand.nextBytes(block);
0098     }
0099     rand.nextBytes(exec0RddBlockValid);
0100     rand.nextBytes(exec0RddBlockToRemove);
0101 
0102     dataContext0 = new TestShuffleDataContext(2, 5);
0103     dataContext0.create();
0104     dataContext0.insertSortShuffleData(0, 0, exec0Blocks);
0105     dataContext0.insertCachedRddData(RDD_ID, SPLIT_INDEX_VALID_BLOCK, exec0RddBlockValid);
0106     dataContext0.insertCachedRddData(RDD_ID, SPLIT_INDEX_VALID_BLOCK_TO_RM, exec0RddBlockToRemove);
0107 
0108     HashMap<String, String> config = new HashMap<>();
0109     config.put("spark.shuffle.io.maxRetries", "0");
0110     config.put(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "true");
0111     conf = new TransportConf("shuffle", new MapConfigProvider(config));
0112     handler = new ExternalBlockHandler(
0113       new OneForOneStreamManager(),
0114       new ExternalShuffleBlockResolver(conf, null) {
0115         @Override
0116         public ManagedBuffer getRddBlockData(String appId, String execId, int rddId, int splitIdx) {
0117           ManagedBuffer res;
0118           if (rddId == RDD_ID) {
0119             switch (splitIdx) {
0120               case SPLIT_INDEX_CORRUPT_LENGTH:
0121                 res = new FileSegmentManagedBuffer(conf, new File("missing.file"), 0, 12);
0122                 break;
0123               default:
0124                 res = super.getRddBlockData(appId, execId, rddId, splitIdx);
0125             }
0126           } else {
0127             res = super.getRddBlockData(appId, execId, rddId, splitIdx);
0128           }
0129           return res;
0130         }
0131       });
0132     transportContext = new TransportContext(conf, handler);
0133     server = transportContext.createServer();
0134   }
0135 
0136   @AfterClass
0137   public static void afterAll() {
0138     dataContext0.cleanup();
0139     server.close();
0140     transportContext.close();
0141   }
0142 
0143   @After
0144   public void afterEach() {
0145     handler.applicationRemoved(APP_ID, false /* cleanupLocalDirs */);
0146   }
0147 
0148   static class FetchResult {
0149     public Set<String> successBlocks;
0150     public Set<String> failedBlocks;
0151     public List<ManagedBuffer> buffers;
0152 
0153     public void releaseBuffers() {
0154       for (ManagedBuffer buffer : buffers) {
0155         buffer.release();
0156       }
0157     }
0158   }
0159 
0160   // Fetch a set of blocks from a pre-registered executor.
0161   private FetchResult fetchBlocks(String execId, String[] blockIds) throws Exception {
0162     return fetchBlocks(execId, blockIds, conf, server.getPort());
0163   }
0164 
0165   // Fetch a set of blocks from a pre-registered executor. Connects to the server on the given port,
0166   // to allow connecting to invalid servers.
0167   private FetchResult fetchBlocks(
0168       String execId,
0169       String[] blockIds,
0170       TransportConf clientConf,
0171       int port) throws Exception {
0172     final FetchResult res = new FetchResult();
0173     res.successBlocks = Collections.synchronizedSet(new HashSet<>());
0174     res.failedBlocks = Collections.synchronizedSet(new HashSet<>());
0175     res.buffers = Collections.synchronizedList(new LinkedList<>());
0176 
0177     final Semaphore requestsRemaining = new Semaphore(0);
0178 
0179     try (ExternalBlockStoreClient client = new ExternalBlockStoreClient(
0180         clientConf, null, false, 5000)) {
0181       client.init(APP_ID);
0182       client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
0183         new BlockFetchingListener() {
0184           @Override
0185           public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
0186             synchronized (this) {
0187               if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
0188                 data.retain();
0189                 res.successBlocks.add(blockId);
0190                 res.buffers.add(data);
0191                 requestsRemaining.release();
0192               }
0193             }
0194           }
0195 
0196           @Override
0197           public void onBlockFetchFailure(String blockId, Throwable exception) {
0198             synchronized (this) {
0199               if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
0200                 res.failedBlocks.add(blockId);
0201                 requestsRemaining.release();
0202               }
0203             }
0204           }
0205         }, null);
0206 
0207       if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
0208         fail("Timeout getting response from the server");
0209       }
0210     }
0211     return res;
0212   }
0213 
0214   @Test
0215   public void testFetchOneSort() throws Exception {
0216     registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
0217     FetchResult exec0Fetch = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0" });
0218     assertEquals(Sets.newHashSet("shuffle_0_0_0"), exec0Fetch.successBlocks);
0219     assertTrue(exec0Fetch.failedBlocks.isEmpty());
0220     assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(exec0Blocks[0]));
0221     exec0Fetch.releaseBuffers();
0222   }
0223 
0224   @Test
0225   public void testFetchThreeSort() throws Exception {
0226     registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
0227     FetchResult exec0Fetch = fetchBlocks("exec-0",
0228       new String[] { "shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2" });
0229     assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2"),
0230       exec0Fetch.successBlocks);
0231     assertTrue(exec0Fetch.failedBlocks.isEmpty());
0232     assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(exec0Blocks));
0233     exec0Fetch.releaseBuffers();
0234   }
0235 
0236   @Test (expected = RuntimeException.class)
0237   public void testRegisterInvalidExecutor() throws Exception {
0238     registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
0239   }
0240 
0241   @Test
0242   public void testFetchWrongBlockId() throws Exception {
0243     registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
0244     FetchResult execFetch = fetchBlocks("exec-1", new String[] { "broadcast_1" });
0245     assertTrue(execFetch.successBlocks.isEmpty());
0246     assertEquals(Sets.newHashSet("broadcast_1"), execFetch.failedBlocks);
0247   }
0248 
0249   @Test
0250   public void testFetchValidRddBlock() throws Exception {
0251     registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
0252     String validBlockId = "rdd_" + RDD_ID +"_" + SPLIT_INDEX_VALID_BLOCK;
0253     FetchResult execFetch = fetchBlocks("exec-1", new String[] { validBlockId });
0254     assertTrue(execFetch.failedBlocks.isEmpty());
0255     assertEquals(Sets.newHashSet(validBlockId), execFetch.successBlocks);
0256     assertBuffersEqual(new NioManagedBuffer(ByteBuffer.wrap(exec0RddBlockValid)),
0257       execFetch.buffers.get(0));
0258   }
0259 
0260   @Test
0261   public void testFetchDeletedRddBlock() throws Exception {
0262     registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
0263     String missingBlockId = "rdd_" + RDD_ID +"_" + SPLIT_INDEX_MISSING_FILE;
0264     FetchResult execFetch = fetchBlocks("exec-1", new String[] { missingBlockId });
0265     assertTrue(execFetch.successBlocks.isEmpty());
0266     assertEquals(Sets.newHashSet(missingBlockId), execFetch.failedBlocks);
0267   }
0268 
0269   @Test
0270   public void testRemoveRddBlocks() throws Exception {
0271     registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
0272     String validBlockIdToRemove = "rdd_" + RDD_ID +"_" + SPLIT_INDEX_VALID_BLOCK_TO_RM;
0273     String missingBlockIdToRemove = "rdd_" + RDD_ID +"_" + SPLIT_INDEX_MISSING_BLOCK_TO_RM;
0274 
0275     try (ExternalBlockStoreClient client = new ExternalBlockStoreClient(conf, null, false, 5000)) {
0276       client.init(APP_ID);
0277       Future<Integer> numRemovedBlocks = client.removeBlocks(
0278         TestUtils.getLocalHost(),
0279         server.getPort(),
0280         "exec-1",
0281           new String[] { validBlockIdToRemove, missingBlockIdToRemove });
0282       assertEquals(1, numRemovedBlocks.get().intValue());
0283     }
0284   }
0285 
0286   @Test
0287   public void testFetchCorruptRddBlock() throws Exception {
0288     registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
0289     String corruptBlockId = "rdd_" + RDD_ID +"_" + SPLIT_INDEX_CORRUPT_LENGTH;
0290     FetchResult execFetch = fetchBlocks("exec-1", new String[] { corruptBlockId });
0291     assertTrue(execFetch.successBlocks.isEmpty());
0292     assertEquals(Sets.newHashSet(corruptBlockId), execFetch.failedBlocks);
0293   }
0294 
0295   @Test
0296   public void testFetchNonexistent() throws Exception {
0297     registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
0298     FetchResult execFetch = fetchBlocks("exec-0",
0299       new String[] { "shuffle_2_0_0" });
0300     assertTrue(execFetch.successBlocks.isEmpty());
0301     assertEquals(Sets.newHashSet("shuffle_2_0_0"), execFetch.failedBlocks);
0302   }
0303 
0304   @Test
0305   public void testFetchWrongExecutor() throws Exception {
0306     registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
0307     FetchResult execFetch0 = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0" /* right */});
0308     FetchResult execFetch1 = fetchBlocks("exec-0", new String[] { "shuffle_1_0_0" /* wrong */ });
0309     assertEquals(Sets.newHashSet("shuffle_0_0_0"), execFetch0.successBlocks);
0310     assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch1.failedBlocks);
0311   }
0312 
0313   @Test
0314   public void testFetchUnregisteredExecutor() throws Exception {
0315     registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
0316     FetchResult execFetch = fetchBlocks("exec-2",
0317       new String[] { "shuffle_0_0_0", "shuffle_1_0_0" });
0318     assertTrue(execFetch.successBlocks.isEmpty());
0319     assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_1_0_0"), execFetch.failedBlocks);
0320   }
0321 
0322   @Test
0323   public void testFetchNoServer() throws Exception {
0324     TransportConf clientConf = new TransportConf("shuffle",
0325       new MapConfigProvider(ImmutableMap.of("spark.shuffle.io.maxRetries", "0")));
0326     registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
0327     FetchResult execFetch = fetchBlocks("exec-0",
0328       new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}, clientConf, 1 /* port */);
0329     assertTrue(execFetch.successBlocks.isEmpty());
0330     assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
0331   }
0332 
0333   private static void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
0334       throws IOException, InterruptedException {
0335     ExternalBlockStoreClient client = new ExternalBlockStoreClient(conf, null, false, 5000);
0336     client.init(APP_ID);
0337     client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
0338       executorId, executorInfo);
0339   }
0340 
0341   private static void assertBufferListsEqual(List<ManagedBuffer> list0, List<byte[]> list1)
0342     throws Exception {
0343     assertEquals(list0.size(), list1.size());
0344     for (int i = 0; i < list0.size(); i ++) {
0345       assertBuffersEqual(list0.get(i), new NioManagedBuffer(ByteBuffer.wrap(list1.get(i))));
0346     }
0347   }
0348 
0349   private static void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1)
0350       throws Exception {
0351     ByteBuffer nio0 = buffer0.nioByteBuffer();
0352     ByteBuffer nio1 = buffer1.nioByteBuffer();
0353 
0354     int len = nio0.remaining();
0355     assertEquals(nio0.remaining(), nio1.remaining());
0356     for (int i = 0; i < len; i ++) {
0357       assertEquals(nio0.get(), nio1.get());
0358     }
0359   }
0360 }