Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle.protocol;
0019 
0020 import java.nio.ByteBuffer;
0021 
0022 import io.netty.buffer.ByteBuf;
0023 import io.netty.buffer.Unpooled;
0024 
0025 import org.apache.spark.network.protocol.Encodable;
0026 import org.apache.spark.network.shuffle.ExternalBlockHandler;
0027 import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
0028 import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;
0029 
0030 /**
0031  * Messages handled by the {@link ExternalBlockHandler}, or
0032  * by Spark's NettyBlockTransferService.
0033  *
0034  * At a high level:
0035  *   - OpenBlock is logically only handled by the NettyBlockTransferService, but for the capability
0036  *     for old version Spark, we still keep it in external shuffle service.
0037  *     It returns a StreamHandle.
0038  *   - UploadBlock is only handled by the NettyBlockTransferService.
0039  *   - RegisterExecutor is only handled by the external shuffle service.
0040  *   - RemoveBlocks is only handled by the external shuffle service.
0041  *   - FetchShuffleBlocks is handled by both services for shuffle files. It returns a StreamHandle.
0042  */
0043 public abstract class BlockTransferMessage implements Encodable {
0044   protected abstract Type type();
0045 
0046   /** Preceding every serialized message is its type, which allows us to deserialize it. */
0047   public enum Type {
0048     OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4),
0049     HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6), REMOVE_BLOCKS(7), BLOCKS_REMOVED(8),
0050     FETCH_SHUFFLE_BLOCKS(9), GET_LOCAL_DIRS_FOR_EXECUTORS(10), LOCAL_DIRS_FOR_EXECUTORS(11);
0051 
0052     private final byte id;
0053 
0054     Type(int id) {
0055       assert id < 128 : "Cannot have more than 128 message types";
0056       this.id = (byte) id;
0057     }
0058 
0059     public byte id() { return id; }
0060   }
0061 
0062   // NB: Java does not support static methods in interfaces, so we must put this in a static class.
0063   public static class Decoder {
0064     /** Deserializes the 'type' byte followed by the message itself. */
0065     public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) {
0066       ByteBuf buf = Unpooled.wrappedBuffer(msg);
0067       byte type = buf.readByte();
0068       switch (type) {
0069         case 0: return OpenBlocks.decode(buf);
0070         case 1: return UploadBlock.decode(buf);
0071         case 2: return RegisterExecutor.decode(buf);
0072         case 3: return StreamHandle.decode(buf);
0073         case 4: return RegisterDriver.decode(buf);
0074         case 5: return ShuffleServiceHeartbeat.decode(buf);
0075         case 6: return UploadBlockStream.decode(buf);
0076         case 7: return RemoveBlocks.decode(buf);
0077         case 8: return BlocksRemoved.decode(buf);
0078         case 9: return FetchShuffleBlocks.decode(buf);
0079         case 10: return GetLocalDirsForExecutors.decode(buf);
0080         case 11: return LocalDirsForExecutors.decode(buf);
0081         default: throw new IllegalArgumentException("Unknown message type: " + type);
0082       }
0083     }
0084   }
0085 
0086   /** Serializes the 'type' byte followed by the message itself. */
0087   public ByteBuffer toByteBuffer() {
0088     // Allow room for encoded message, plus the type byte
0089     ByteBuf buf = Unpooled.buffer(encodedLength() + 1);
0090     buf.writeByte(type().id);
0091     encode(buf);
0092     assert buf.writableBytes() == 0 : "Writable bytes remain: " + buf.writableBytes();
0093     return buf.nioBuffer();
0094   }
0095 }