0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle.protocol;
0019
0020 import java.nio.ByteBuffer;
0021
0022 import io.netty.buffer.ByteBuf;
0023 import io.netty.buffer.Unpooled;
0024
0025 import org.apache.spark.network.protocol.Encodable;
0026 import org.apache.spark.network.shuffle.ExternalBlockHandler;
0027 import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
0028 import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043 public abstract class BlockTransferMessage implements Encodable {
0044 protected abstract Type type();
0045
0046
0047 public enum Type {
0048 OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4),
0049 HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6), REMOVE_BLOCKS(7), BLOCKS_REMOVED(8),
0050 FETCH_SHUFFLE_BLOCKS(9), GET_LOCAL_DIRS_FOR_EXECUTORS(10), LOCAL_DIRS_FOR_EXECUTORS(11);
0051
0052 private final byte id;
0053
0054 Type(int id) {
0055 assert id < 128 : "Cannot have more than 128 message types";
0056 this.id = (byte) id;
0057 }
0058
0059 public byte id() { return id; }
0060 }
0061
0062
0063 public static class Decoder {
0064
0065 public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) {
0066 ByteBuf buf = Unpooled.wrappedBuffer(msg);
0067 byte type = buf.readByte();
0068 switch (type) {
0069 case 0: return OpenBlocks.decode(buf);
0070 case 1: return UploadBlock.decode(buf);
0071 case 2: return RegisterExecutor.decode(buf);
0072 case 3: return StreamHandle.decode(buf);
0073 case 4: return RegisterDriver.decode(buf);
0074 case 5: return ShuffleServiceHeartbeat.decode(buf);
0075 case 6: return UploadBlockStream.decode(buf);
0076 case 7: return RemoveBlocks.decode(buf);
0077 case 8: return BlocksRemoved.decode(buf);
0078 case 9: return FetchShuffleBlocks.decode(buf);
0079 case 10: return GetLocalDirsForExecutors.decode(buf);
0080 case 11: return LocalDirsForExecutors.decode(buf);
0081 default: throw new IllegalArgumentException("Unknown message type: " + type);
0082 }
0083 }
0084 }
0085
0086
0087 public ByteBuffer toByteBuffer() {
0088
0089 ByteBuf buf = Unpooled.buffer(encodedLength() + 1);
0090 buf.writeByte(type().id);
0091 encode(buf);
0092 assert buf.writableBytes() == 0 : "Writable bytes remain: " + buf.writableBytes();
0093 return buf.nioBuffer();
0094 }
0095 }