0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.protocol;
0019
0020 import io.netty.buffer.ByteBuf;
0021
0022 import org.apache.spark.network.buffer.ManagedBuffer;
0023
0024
0025 public interface Message extends Encodable {
0026
0027 Type type();
0028
0029
0030 ManagedBuffer body();
0031
0032
0033 boolean isBodyInFrame();
0034
0035
0036 enum Type implements Encodable {
0037 ChunkFetchRequest(0), ChunkFetchSuccess(1), ChunkFetchFailure(2),
0038 RpcRequest(3), RpcResponse(4), RpcFailure(5),
0039 StreamRequest(6), StreamResponse(7), StreamFailure(8),
0040 OneWayMessage(9), UploadStream(10), User(-1);
0041
0042 private final byte id;
0043
0044 Type(int id) {
0045 assert id < 128 : "Cannot have more than 128 message types";
0046 this.id = (byte) id;
0047 }
0048
0049 public byte id() { return id; }
0050
0051 @Override public int encodedLength() { return 1; }
0052
0053 @Override public void encode(ByteBuf buf) { buf.writeByte(id); }
0054
0055 public static Type decode(ByteBuf buf) {
0056 byte id = buf.readByte();
0057 switch (id) {
0058 case 0: return ChunkFetchRequest;
0059 case 1: return ChunkFetchSuccess;
0060 case 2: return ChunkFetchFailure;
0061 case 3: return RpcRequest;
0062 case 4: return RpcResponse;
0063 case 5: return RpcFailure;
0064 case 6: return StreamRequest;
0065 case 7: return StreamResponse;
0066 case 8: return StreamFailure;
0067 case 9: return OneWayMessage;
0068 case 10: return UploadStream;
0069 case -1: throw new IllegalArgumentException("User type messages cannot be decoded.");
0070 default: throw new IllegalArgumentException("Unknown message type: " + id);
0071 }
0072 }
0073 }
0074 }