0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle;
0019
0020 import org.junit.Test;
0021
0022 import static org.junit.Assert.*;
0023
0024 import java.util.HashMap;
0025 import java.util.Map;
0026
0027 import org.apache.spark.network.shuffle.protocol.*;
0028
0029
0030 public class BlockTransferMessagesSuite {
0031 @Test
0032 public void serializeOpenShuffleBlocks() {
0033 checkSerializeDeserialize(new OpenBlocks("app-1", "exec-2", new String[] { "b1", "b2" }));
0034 checkSerializeDeserialize(new FetchShuffleBlocks(
0035 "app-1", "exec-2", 0, new long[] {0, 1},
0036 new int[][] {{ 0, 1 }, { 0, 1, 2 }}, false));
0037 checkSerializeDeserialize(new FetchShuffleBlocks(
0038 "app-1", "exec-2", 0, new long[] {0, 1},
0039 new int[][] {{ 0, 1 }, { 0, 2 }}, true));
0040 checkSerializeDeserialize(new RegisterExecutor("app-1", "exec-2", new ExecutorShuffleInfo(
0041 new String[] { "/local1", "/local2" }, 32, "MyShuffleManager")));
0042 checkSerializeDeserialize(new UploadBlock("app-1", "exec-2", "block-3", new byte[] { 1, 2 },
0043 new byte[] { 4, 5, 6, 7} ));
0044 checkSerializeDeserialize(new StreamHandle(12345, 16));
0045 }
0046
0047 @Test
0048 public void testLocalDirsMessages() {
0049 checkSerializeDeserialize(
0050 new GetLocalDirsForExecutors("app-1", new String[]{"exec-1", "exec-2"}));
0051
0052 Map<String, String[]> map = new HashMap<>();
0053 map.put("exec-1", new String[]{"loc1.1"});
0054 map.put("exec-22", new String[]{"loc2.1", "loc2.2"});
0055 LocalDirsForExecutors localDirsForExecs = new LocalDirsForExecutors(map);
0056 Map<String, String[]> resultMap =
0057 ((LocalDirsForExecutors)checkSerializeDeserialize(localDirsForExecs)).getLocalDirsByExec();
0058 assertEquals(resultMap.size(), map.keySet().size());
0059 for (Map.Entry<String, String[]> e: map.entrySet()) {
0060 assertTrue(resultMap.containsKey(e.getKey()));
0061 assertArrayEquals(e.getValue(), resultMap.get(e.getKey()));
0062 }
0063 }
0064
0065 private BlockTransferMessage checkSerializeDeserialize(BlockTransferMessage msg) {
0066 BlockTransferMessage msg2 = BlockTransferMessage.Decoder.fromByteBuffer(msg.toByteBuffer());
0067 assertEquals(msg, msg2);
0068 assertEquals(msg.hashCode(), msg2.hashCode());
0069 assertEquals(msg.toString(), msg2.toString());
0070 return msg2;
0071 }
0072 }