0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle;
0019
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.io.InputStream;
0023 import java.io.InputStreamReader;
0024 import java.nio.charset.StandardCharsets;
0025
0026 import com.fasterxml.jackson.databind.ObjectMapper;
0027 import com.google.common.io.CharStreams;
0028 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
0029 import org.apache.spark.network.util.MapConfigProvider;
0030 import org.apache.spark.network.util.TransportConf;
0031 import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
0032 import org.junit.AfterClass;
0033 import org.junit.BeforeClass;
0034 import org.junit.Test;
0035
0036 import static org.junit.Assert.*;
0037
0038 public class ExternalShuffleBlockResolverSuite {
0039 private static final String sortBlock0 = "Hello!";
0040 private static final String sortBlock1 = "World!";
0041 private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
0042
0043 private static TestShuffleDataContext dataContext;
0044
0045 private static final TransportConf conf =
0046 new TransportConf("shuffle", MapConfigProvider.EMPTY);
0047
0048 @BeforeClass
0049 public static void beforeAll() throws IOException {
0050 dataContext = new TestShuffleDataContext(2, 5);
0051
0052 dataContext.create();
0053
0054 dataContext.insertSortShuffleData(0, 0, new byte[][] {
0055 sortBlock0.getBytes(StandardCharsets.UTF_8),
0056 sortBlock1.getBytes(StandardCharsets.UTF_8)});
0057 }
0058
0059 @AfterClass
0060 public static void afterAll() {
0061 dataContext.cleanup();
0062 }
0063
0064 @Test
0065 public void testBadRequests() throws IOException {
0066 ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
0067
0068 try {
0069 resolver.getBlockData("app0", "exec1", 1, 1, 0);
0070 fail("Should have failed");
0071 } catch (RuntimeException e) {
0072 assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
0073 }
0074
0075
0076 try {
0077 resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
0078 resolver.getBlockData("app0", "exec2", 1, 1, 0);
0079 fail("Should have failed");
0080 } catch (UnsupportedOperationException e) {
0081
0082 }
0083
0084
0085 resolver.registerExecutor("app0", "exec3",
0086 dataContext.createExecutorInfo(SORT_MANAGER));
0087 try {
0088 resolver.getBlockData("app0", "exec3", 1, 1, 0);
0089 fail("Should have failed");
0090 } catch (Exception e) {
0091
0092 }
0093 }
0094
0095 @Test
0096 public void testSortShuffleBlocks() throws IOException {
0097 ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
0098 resolver.registerExecutor("app0", "exec0",
0099 dataContext.createExecutorInfo(SORT_MANAGER));
0100
0101 try (InputStream block0Stream = resolver.getBlockData(
0102 "app0", "exec0", 0, 0, 0).createInputStream()) {
0103 String block0 =
0104 CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
0105 assertEquals(sortBlock0, block0);
0106 }
0107
0108 try (InputStream block1Stream = resolver.getBlockData(
0109 "app0", "exec0", 0, 0, 1).createInputStream()) {
0110 String block1 =
0111 CharStreams.toString(new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
0112 assertEquals(sortBlock1, block1);
0113 }
0114
0115 try (InputStream blocksStream = resolver.getContinuousBlocksData(
0116 "app0", "exec0", 0, 0, 0, 2).createInputStream()) {
0117 String blocks =
0118 CharStreams.toString(new InputStreamReader(blocksStream, StandardCharsets.UTF_8));
0119 assertEquals(sortBlock0 + sortBlock1, blocks);
0120 }
0121 }
0122
0123 @Test
0124 public void jsonSerializationOfExecutorRegistration() throws IOException {
0125 ObjectMapper mapper = new ObjectMapper();
0126 AppExecId appId = new AppExecId("foo", "bar");
0127 String appIdJson = mapper.writeValueAsString(appId);
0128 AppExecId parsedAppId = mapper.readValue(appIdJson, AppExecId.class);
0129 assertEquals(parsedAppId, appId);
0130
0131 ExecutorShuffleInfo shuffleInfo =
0132 new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, SORT_MANAGER);
0133 String shuffleJson = mapper.writeValueAsString(shuffleInfo);
0134 ExecutorShuffleInfo parsedShuffleInfo =
0135 mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
0136 assertEquals(parsedShuffleInfo, shuffleInfo);
0137
0138
0139
0140 String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
0141 assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
0142 String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
0143 "\"subDirsPerLocalDir\": 7, \"shuffleManager\": " + "\"" + SORT_MANAGER + "\"}";
0144 assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
0145 }
0146
0147 @Test
0148 public void testNormalizeAndInternPathname() {
0149 assertPathsMatch("/foo", "bar", "baz", "/foo/bar/baz");
0150 assertPathsMatch("//foo/", "bar/", "//baz", "/foo/bar/baz");
0151 assertPathsMatch("foo", "bar", "baz///", "foo/bar/baz");
0152 assertPathsMatch("/foo/", "/bar//", "/baz", "/foo/bar/baz");
0153 assertPathsMatch("/", "", "", "/");
0154 assertPathsMatch("/", "/", "/", "/");
0155 }
0156
0157 private void assertPathsMatch(String p1, String p2, String p3, String expectedPathname) {
0158 String normPathname =
0159 ExecutorDiskUtils.createNormalizedInternedPathname(p1, p2, p3);
0160 assertEquals(expectedPathname, normPathname);
0161 File file = new File(normPathname);
0162 String returnedPath = file.getPath();
0163 assertTrue(normPathname == returnedPath);
0164 }
0165 }