Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle;
0019 
0020 import org.junit.Test;
0021 
0022 import static org.junit.Assert.*;
0023 
0024 import java.util.HashMap;
0025 import java.util.Map;
0026 
0027 import org.apache.spark.network.shuffle.protocol.*;
0028 
0029 /** Verifies that all BlockTransferMessages can be serialized correctly. */
0030 public class BlockTransferMessagesSuite {
0031   @Test
0032   public void serializeOpenShuffleBlocks() {
0033     checkSerializeDeserialize(new OpenBlocks("app-1", "exec-2", new String[] { "b1", "b2" }));
0034     checkSerializeDeserialize(new FetchShuffleBlocks(
0035       "app-1", "exec-2", 0, new long[] {0, 1},
0036       new int[][] {{ 0, 1 }, { 0, 1, 2 }}, false));
0037     checkSerializeDeserialize(new FetchShuffleBlocks(
0038       "app-1", "exec-2", 0, new long[] {0, 1},
0039       new int[][] {{ 0, 1 }, { 0, 2 }}, true));
0040     checkSerializeDeserialize(new RegisterExecutor("app-1", "exec-2", new ExecutorShuffleInfo(
0041       new String[] { "/local1", "/local2" }, 32, "MyShuffleManager")));
0042     checkSerializeDeserialize(new UploadBlock("app-1", "exec-2", "block-3", new byte[] { 1, 2 },
0043       new byte[] { 4, 5, 6, 7} ));
0044     checkSerializeDeserialize(new StreamHandle(12345, 16));
0045   }
0046 
0047   @Test
0048   public void testLocalDirsMessages() {
0049     checkSerializeDeserialize(
0050       new GetLocalDirsForExecutors("app-1", new String[]{"exec-1", "exec-2"}));
0051 
0052     Map<String, String[]> map = new HashMap<>();
0053     map.put("exec-1", new String[]{"loc1.1"});
0054     map.put("exec-22", new String[]{"loc2.1", "loc2.2"});
0055     LocalDirsForExecutors localDirsForExecs = new LocalDirsForExecutors(map);
0056     Map<String, String[]> resultMap =
0057       ((LocalDirsForExecutors)checkSerializeDeserialize(localDirsForExecs)).getLocalDirsByExec();
0058     assertEquals(resultMap.size(), map.keySet().size());
0059     for (Map.Entry<String, String[]> e: map.entrySet()) {
0060       assertTrue(resultMap.containsKey(e.getKey()));
0061       assertArrayEquals(e.getValue(), resultMap.get(e.getKey()));
0062     }
0063   }
0064 
0065   private BlockTransferMessage checkSerializeDeserialize(BlockTransferMessage msg) {
0066     BlockTransferMessage msg2 = BlockTransferMessage.Decoder.fromByteBuffer(msg.toByteBuffer());
0067     assertEquals(msg, msg2);
0068     assertEquals(msg.hashCode(), msg2.hashCode());
0069     assertEquals(msg.toString(), msg2.toString());
0070     return msg2;
0071   }
0072 }