0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.protocol;
0019
0020 import java.util.List;
0021
0022 import io.netty.buffer.ByteBuf;
0023 import io.netty.channel.ChannelHandler;
0024 import io.netty.channel.ChannelHandlerContext;
0025 import io.netty.handler.codec.MessageToMessageDecoder;
0026 import org.slf4j.Logger;
0027 import org.slf4j.LoggerFactory;
0028
0029
0030
0031
0032
0033 @ChannelHandler.Sharable
0034 public final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
0035
0036 private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);
0037
0038 public static final MessageDecoder INSTANCE = new MessageDecoder();
0039
0040 private MessageDecoder() {}
0041
0042 @Override
0043 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
0044 Message.Type msgType = Message.Type.decode(in);
0045 Message decoded = decode(msgType, in);
0046 assert decoded.type() == msgType;
0047 logger.trace("Received message {}: {}", msgType, decoded);
0048 out.add(decoded);
0049 }
0050
0051 private Message decode(Message.Type msgType, ByteBuf in) {
0052 switch (msgType) {
0053 case ChunkFetchRequest:
0054 return ChunkFetchRequest.decode(in);
0055
0056 case ChunkFetchSuccess:
0057 return ChunkFetchSuccess.decode(in);
0058
0059 case ChunkFetchFailure:
0060 return ChunkFetchFailure.decode(in);
0061
0062 case RpcRequest:
0063 return RpcRequest.decode(in);
0064
0065 case RpcResponse:
0066 return RpcResponse.decode(in);
0067
0068 case RpcFailure:
0069 return RpcFailure.decode(in);
0070
0071 case OneWayMessage:
0072 return OneWayMessage.decode(in);
0073
0074 case StreamRequest:
0075 return StreamRequest.decode(in);
0076
0077 case StreamResponse:
0078 return StreamResponse.decode(in);
0079
0080 case StreamFailure:
0081 return StreamFailure.decode(in);
0082
0083 case UploadStream:
0084 return UploadStream.decode(in);
0085
0086 default:
0087 throw new IllegalArgumentException("Unexpected message type: " + msgType);
0088 }
0089 }
0090 }