0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle;
0019
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.nio.ByteBuffer;
0023 import java.util.Arrays;
0024 import java.util.Collections;
0025 import java.util.HashMap;
0026 import java.util.HashSet;
0027 import java.util.LinkedList;
0028 import java.util.List;
0029 import java.util.Random;
0030 import java.util.Set;
0031 import java.util.concurrent.Future;
0032 import java.util.concurrent.Semaphore;
0033 import java.util.concurrent.TimeUnit;
0034
0035 import com.google.common.collect.ImmutableMap;
0036 import com.google.common.collect.Sets;
0037 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
0038 import org.apache.spark.network.server.OneForOneStreamManager;
0039 import org.junit.After;
0040 import org.junit.AfterClass;
0041 import org.junit.BeforeClass;
0042 import org.junit.Test;
0043
0044 import static org.junit.Assert.*;
0045
0046 import org.apache.spark.network.TestUtils;
0047 import org.apache.spark.network.TransportContext;
0048 import org.apache.spark.network.buffer.ManagedBuffer;
0049 import org.apache.spark.network.buffer.NioManagedBuffer;
0050 import org.apache.spark.network.server.TransportServer;
0051 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
0052 import org.apache.spark.network.util.MapConfigProvider;
0053 import org.apache.spark.network.util.TransportConf;
0054
0055 public class ExternalShuffleIntegrationSuite {
0056
0057 private static final String APP_ID = "app-id";
0058 private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
0059
0060 private static final int RDD_ID = 1;
0061 private static final int SPLIT_INDEX_VALID_BLOCK = 0;
0062 private static final int SPLIT_INDEX_MISSING_FILE = 1;
0063 private static final int SPLIT_INDEX_CORRUPT_LENGTH = 2;
0064 private static final int SPLIT_INDEX_VALID_BLOCK_TO_RM = 3;
0065 private static final int SPLIT_INDEX_MISSING_BLOCK_TO_RM = 4;
0066
0067
0068 static TestShuffleDataContext dataContext0;
0069
0070 static ExternalBlockHandler handler;
0071 static TransportServer server;
0072 static TransportConf conf;
0073 static TransportContext transportContext;
0074
0075 static byte[] exec0RddBlockValid = new byte[123];
0076 static byte[] exec0RddBlockToRemove = new byte[124];
0077
0078 static byte[][] exec0Blocks = new byte[][] {
0079 new byte[123],
0080 new byte[12345],
0081 new byte[1234567],
0082 };
0083
0084 static byte[][] exec1Blocks = new byte[][] {
0085 new byte[321],
0086 new byte[54321],
0087 };
0088
0089 @BeforeClass
0090 public static void beforeAll() throws IOException {
0091 Random rand = new Random();
0092
0093 for (byte[] block : exec0Blocks) {
0094 rand.nextBytes(block);
0095 }
0096 for (byte[] block: exec1Blocks) {
0097 rand.nextBytes(block);
0098 }
0099 rand.nextBytes(exec0RddBlockValid);
0100 rand.nextBytes(exec0RddBlockToRemove);
0101
0102 dataContext0 = new TestShuffleDataContext(2, 5);
0103 dataContext0.create();
0104 dataContext0.insertSortShuffleData(0, 0, exec0Blocks);
0105 dataContext0.insertCachedRddData(RDD_ID, SPLIT_INDEX_VALID_BLOCK, exec0RddBlockValid);
0106 dataContext0.insertCachedRddData(RDD_ID, SPLIT_INDEX_VALID_BLOCK_TO_RM, exec0RddBlockToRemove);
0107
0108 HashMap<String, String> config = new HashMap<>();
0109 config.put("spark.shuffle.io.maxRetries", "0");
0110 config.put(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "true");
0111 conf = new TransportConf("shuffle", new MapConfigProvider(config));
0112 handler = new ExternalBlockHandler(
0113 new OneForOneStreamManager(),
0114 new ExternalShuffleBlockResolver(conf, null) {
0115 @Override
0116 public ManagedBuffer getRddBlockData(String appId, String execId, int rddId, int splitIdx) {
0117 ManagedBuffer res;
0118 if (rddId == RDD_ID) {
0119 switch (splitIdx) {
0120 case SPLIT_INDEX_CORRUPT_LENGTH:
0121 res = new FileSegmentManagedBuffer(conf, new File("missing.file"), 0, 12);
0122 break;
0123 default:
0124 res = super.getRddBlockData(appId, execId, rddId, splitIdx);
0125 }
0126 } else {
0127 res = super.getRddBlockData(appId, execId, rddId, splitIdx);
0128 }
0129 return res;
0130 }
0131 });
0132 transportContext = new TransportContext(conf, handler);
0133 server = transportContext.createServer();
0134 }
0135
0136 @AfterClass
0137 public static void afterAll() {
0138 dataContext0.cleanup();
0139 server.close();
0140 transportContext.close();
0141 }
0142
0143 @After
0144 public void afterEach() {
0145 handler.applicationRemoved(APP_ID, false );
0146 }
0147
0148 static class FetchResult {
0149 public Set<String> successBlocks;
0150 public Set<String> failedBlocks;
0151 public List<ManagedBuffer> buffers;
0152
0153 public void releaseBuffers() {
0154 for (ManagedBuffer buffer : buffers) {
0155 buffer.release();
0156 }
0157 }
0158 }
0159
0160
0161 private FetchResult fetchBlocks(String execId, String[] blockIds) throws Exception {
0162 return fetchBlocks(execId, blockIds, conf, server.getPort());
0163 }
0164
0165
0166
0167 private FetchResult fetchBlocks(
0168 String execId,
0169 String[] blockIds,
0170 TransportConf clientConf,
0171 int port) throws Exception {
0172 final FetchResult res = new FetchResult();
0173 res.successBlocks = Collections.synchronizedSet(new HashSet<>());
0174 res.failedBlocks = Collections.synchronizedSet(new HashSet<>());
0175 res.buffers = Collections.synchronizedList(new LinkedList<>());
0176
0177 final Semaphore requestsRemaining = new Semaphore(0);
0178
0179 try (ExternalBlockStoreClient client = new ExternalBlockStoreClient(
0180 clientConf, null, false, 5000)) {
0181 client.init(APP_ID);
0182 client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
0183 new BlockFetchingListener() {
0184 @Override
0185 public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
0186 synchronized (this) {
0187 if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
0188 data.retain();
0189 res.successBlocks.add(blockId);
0190 res.buffers.add(data);
0191 requestsRemaining.release();
0192 }
0193 }
0194 }
0195
0196 @Override
0197 public void onBlockFetchFailure(String blockId, Throwable exception) {
0198 synchronized (this) {
0199 if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
0200 res.failedBlocks.add(blockId);
0201 requestsRemaining.release();
0202 }
0203 }
0204 }
0205 }, null);
0206
0207 if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
0208 fail("Timeout getting response from the server");
0209 }
0210 }
0211 return res;
0212 }
0213
0214 @Test
0215 public void testFetchOneSort() throws Exception {
0216 registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
0217 FetchResult exec0Fetch = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0" });
0218 assertEquals(Sets.newHashSet("shuffle_0_0_0"), exec0Fetch.successBlocks);
0219 assertTrue(exec0Fetch.failedBlocks.isEmpty());
0220 assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(exec0Blocks[0]));
0221 exec0Fetch.releaseBuffers();
0222 }
0223
0224 @Test
0225 public void testFetchThreeSort() throws Exception {
0226 registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
0227 FetchResult exec0Fetch = fetchBlocks("exec-0",
0228 new String[] { "shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2" });
0229 assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2"),
0230 exec0Fetch.successBlocks);
0231 assertTrue(exec0Fetch.failedBlocks.isEmpty());
0232 assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(exec0Blocks));
0233 exec0Fetch.releaseBuffers();
0234 }
0235
0236 @Test (expected = RuntimeException.class)
0237 public void testRegisterInvalidExecutor() throws Exception {
0238 registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
0239 }
0240
0241 @Test
0242 public void testFetchWrongBlockId() throws Exception {
0243 registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
0244 FetchResult execFetch = fetchBlocks("exec-1", new String[] { "broadcast_1" });
0245 assertTrue(execFetch.successBlocks.isEmpty());
0246 assertEquals(Sets.newHashSet("broadcast_1"), execFetch.failedBlocks);
0247 }
0248
0249 @Test
0250 public void testFetchValidRddBlock() throws Exception {
0251 registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
0252 String validBlockId = "rdd_" + RDD_ID +"_" + SPLIT_INDEX_VALID_BLOCK;
0253 FetchResult execFetch = fetchBlocks("exec-1", new String[] { validBlockId });
0254 assertTrue(execFetch.failedBlocks.isEmpty());
0255 assertEquals(Sets.newHashSet(validBlockId), execFetch.successBlocks);
0256 assertBuffersEqual(new NioManagedBuffer(ByteBuffer.wrap(exec0RddBlockValid)),
0257 execFetch.buffers.get(0));
0258 }
0259
0260 @Test
0261 public void testFetchDeletedRddBlock() throws Exception {
0262 registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
0263 String missingBlockId = "rdd_" + RDD_ID +"_" + SPLIT_INDEX_MISSING_FILE;
0264 FetchResult execFetch = fetchBlocks("exec-1", new String[] { missingBlockId });
0265 assertTrue(execFetch.successBlocks.isEmpty());
0266 assertEquals(Sets.newHashSet(missingBlockId), execFetch.failedBlocks);
0267 }
0268
0269 @Test
0270 public void testRemoveRddBlocks() throws Exception {
0271 registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
0272 String validBlockIdToRemove = "rdd_" + RDD_ID +"_" + SPLIT_INDEX_VALID_BLOCK_TO_RM;
0273 String missingBlockIdToRemove = "rdd_" + RDD_ID +"_" + SPLIT_INDEX_MISSING_BLOCK_TO_RM;
0274
0275 try (ExternalBlockStoreClient client = new ExternalBlockStoreClient(conf, null, false, 5000)) {
0276 client.init(APP_ID);
0277 Future<Integer> numRemovedBlocks = client.removeBlocks(
0278 TestUtils.getLocalHost(),
0279 server.getPort(),
0280 "exec-1",
0281 new String[] { validBlockIdToRemove, missingBlockIdToRemove });
0282 assertEquals(1, numRemovedBlocks.get().intValue());
0283 }
0284 }
0285
0286 @Test
0287 public void testFetchCorruptRddBlock() throws Exception {
0288 registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
0289 String corruptBlockId = "rdd_" + RDD_ID +"_" + SPLIT_INDEX_CORRUPT_LENGTH;
0290 FetchResult execFetch = fetchBlocks("exec-1", new String[] { corruptBlockId });
0291 assertTrue(execFetch.successBlocks.isEmpty());
0292 assertEquals(Sets.newHashSet(corruptBlockId), execFetch.failedBlocks);
0293 }
0294
0295 @Test
0296 public void testFetchNonexistent() throws Exception {
0297 registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
0298 FetchResult execFetch = fetchBlocks("exec-0",
0299 new String[] { "shuffle_2_0_0" });
0300 assertTrue(execFetch.successBlocks.isEmpty());
0301 assertEquals(Sets.newHashSet("shuffle_2_0_0"), execFetch.failedBlocks);
0302 }
0303
0304 @Test
0305 public void testFetchWrongExecutor() throws Exception {
0306 registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
0307 FetchResult execFetch0 = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0" });
0308 FetchResult execFetch1 = fetchBlocks("exec-0", new String[] { "shuffle_1_0_0" });
0309 assertEquals(Sets.newHashSet("shuffle_0_0_0"), execFetch0.successBlocks);
0310 assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch1.failedBlocks);
0311 }
0312
0313 @Test
0314 public void testFetchUnregisteredExecutor() throws Exception {
0315 registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
0316 FetchResult execFetch = fetchBlocks("exec-2",
0317 new String[] { "shuffle_0_0_0", "shuffle_1_0_0" });
0318 assertTrue(execFetch.successBlocks.isEmpty());
0319 assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_1_0_0"), execFetch.failedBlocks);
0320 }
0321
0322 @Test
0323 public void testFetchNoServer() throws Exception {
0324 TransportConf clientConf = new TransportConf("shuffle",
0325 new MapConfigProvider(ImmutableMap.of("spark.shuffle.io.maxRetries", "0")));
0326 registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
0327 FetchResult execFetch = fetchBlocks("exec-0",
0328 new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}, clientConf, 1 );
0329 assertTrue(execFetch.successBlocks.isEmpty());
0330 assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
0331 }
0332
0333 private static void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
0334 throws IOException, InterruptedException {
0335 ExternalBlockStoreClient client = new ExternalBlockStoreClient(conf, null, false, 5000);
0336 client.init(APP_ID);
0337 client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
0338 executorId, executorInfo);
0339 }
0340
0341 private static void assertBufferListsEqual(List<ManagedBuffer> list0, List<byte[]> list1)
0342 throws Exception {
0343 assertEquals(list0.size(), list1.size());
0344 for (int i = 0; i < list0.size(); i ++) {
0345 assertBuffersEqual(list0.get(i), new NioManagedBuffer(ByteBuffer.wrap(list1.get(i))));
0346 }
0347 }
0348
0349 private static void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1)
0350 throws Exception {
0351 ByteBuffer nio0 = buffer0.nioByteBuffer();
0352 ByteBuffer nio1 = buffer1.nioByteBuffer();
0353
0354 int len = nio0.remaining();
0355 assertEquals(nio0.remaining(), nio1.remaining());
0356 for (int i = 0; i < len; i ++) {
0357 assertEquals(nio0.get(), nio1.get());
0358 }
0359 }
0360 }