Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle;
0019 
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.io.InputStream;
0023 import java.io.InputStreamReader;
0024 import java.nio.charset.StandardCharsets;
0025 
0026 import com.fasterxml.jackson.databind.ObjectMapper;
0027 import com.google.common.io.CharStreams;
0028 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
0029 import org.apache.spark.network.util.MapConfigProvider;
0030 import org.apache.spark.network.util.TransportConf;
0031 import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
0032 import org.junit.AfterClass;
0033 import org.junit.BeforeClass;
0034 import org.junit.Test;
0035 
0036 import static org.junit.Assert.*;
0037 
0038 public class ExternalShuffleBlockResolverSuite {
0039   private static final String sortBlock0 = "Hello!";
0040   private static final String sortBlock1 = "World!";
0041   private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
0042 
0043   private static TestShuffleDataContext dataContext;
0044 
0045   private static final TransportConf conf =
0046       new TransportConf("shuffle", MapConfigProvider.EMPTY);
0047 
0048   @BeforeClass
0049   public static void beforeAll() throws IOException {
0050     dataContext = new TestShuffleDataContext(2, 5);
0051 
0052     dataContext.create();
0053     // Write some sort data.
0054     dataContext.insertSortShuffleData(0, 0, new byte[][] {
0055         sortBlock0.getBytes(StandardCharsets.UTF_8),
0056         sortBlock1.getBytes(StandardCharsets.UTF_8)});
0057   }
0058 
0059   @AfterClass
0060   public static void afterAll() {
0061     dataContext.cleanup();
0062   }
0063 
0064   @Test
0065   public void testBadRequests() throws IOException {
0066     ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
0067     // Unregistered executor
0068     try {
0069       resolver.getBlockData("app0", "exec1", 1, 1, 0);
0070       fail("Should have failed");
0071     } catch (RuntimeException e) {
0072       assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
0073     }
0074 
0075     // Invalid shuffle manager
0076     try {
0077       resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
0078       resolver.getBlockData("app0", "exec2", 1, 1, 0);
0079       fail("Should have failed");
0080     } catch (UnsupportedOperationException e) {
0081       // pass
0082     }
0083 
0084     // Nonexistent shuffle block
0085     resolver.registerExecutor("app0", "exec3",
0086       dataContext.createExecutorInfo(SORT_MANAGER));
0087     try {
0088       resolver.getBlockData("app0", "exec3", 1, 1, 0);
0089       fail("Should have failed");
0090     } catch (Exception e) {
0091       // pass
0092     }
0093   }
0094 
0095   @Test
0096   public void testSortShuffleBlocks() throws IOException {
0097     ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
0098     resolver.registerExecutor("app0", "exec0",
0099       dataContext.createExecutorInfo(SORT_MANAGER));
0100 
0101     try (InputStream block0Stream = resolver.getBlockData(
0102         "app0", "exec0", 0, 0, 0).createInputStream()) {
0103       String block0 =
0104         CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
0105       assertEquals(sortBlock0, block0);
0106     }
0107 
0108     try (InputStream block1Stream = resolver.getBlockData(
0109         "app0", "exec0", 0, 0, 1).createInputStream()) {
0110       String block1 =
0111         CharStreams.toString(new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
0112       assertEquals(sortBlock1, block1);
0113     }
0114 
0115     try (InputStream blocksStream = resolver.getContinuousBlocksData(
0116         "app0", "exec0", 0, 0, 0, 2).createInputStream()) {
0117       String blocks =
0118         CharStreams.toString(new InputStreamReader(blocksStream, StandardCharsets.UTF_8));
0119       assertEquals(sortBlock0 + sortBlock1, blocks);
0120     }
0121   }
0122 
0123   @Test
0124   public void jsonSerializationOfExecutorRegistration() throws IOException {
0125     ObjectMapper mapper = new ObjectMapper();
0126     AppExecId appId = new AppExecId("foo", "bar");
0127     String appIdJson = mapper.writeValueAsString(appId);
0128     AppExecId parsedAppId = mapper.readValue(appIdJson, AppExecId.class);
0129     assertEquals(parsedAppId, appId);
0130 
0131     ExecutorShuffleInfo shuffleInfo =
0132       new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, SORT_MANAGER);
0133     String shuffleJson = mapper.writeValueAsString(shuffleInfo);
0134     ExecutorShuffleInfo parsedShuffleInfo =
0135       mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
0136     assertEquals(parsedShuffleInfo, shuffleInfo);
0137 
0138     // Intentionally keep these hard-coded strings in here, to check backwards-compatibility.
0139     // its not legacy yet, but keeping this here in case anybody changes it
0140     String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
0141     assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
0142     String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
0143       "\"subDirsPerLocalDir\": 7, \"shuffleManager\": " + "\"" + SORT_MANAGER + "\"}";
0144     assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
0145   }
0146 
0147   @Test
0148   public void testNormalizeAndInternPathname() {
0149     assertPathsMatch("/foo", "bar", "baz", "/foo/bar/baz");
0150     assertPathsMatch("//foo/", "bar/", "//baz", "/foo/bar/baz");
0151     assertPathsMatch("foo", "bar", "baz///", "foo/bar/baz");
0152     assertPathsMatch("/foo/", "/bar//", "/baz", "/foo/bar/baz");
0153     assertPathsMatch("/", "", "", "/");
0154     assertPathsMatch("/", "/", "/", "/");
0155   }
0156 
0157   private void assertPathsMatch(String p1, String p2, String p3, String expectedPathname) {
0158     String normPathname =
0159       ExecutorDiskUtils.createNormalizedInternedPathname(p1, p2, p3);
0160     assertEquals(expectedPathname, normPathname);
0161     File file = new File(normPathname);
0162     String returnedPath = file.getPath();
0163     assertTrue(normPathname == returnedPath);
0164   }
0165 }