Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.protocol;
0019 
0020 import java.util.List;
0021 
0022 import io.netty.buffer.ByteBuf;
0023 import io.netty.channel.ChannelHandler;
0024 import io.netty.channel.ChannelHandlerContext;
0025 import io.netty.handler.codec.MessageToMessageDecoder;
0026 import org.slf4j.Logger;
0027 import org.slf4j.LoggerFactory;
0028 
0029 /**
0030  * Decoder used by the client side to encode server-to-client responses.
0031  * This encoder is stateless so it is safe to be shared by multiple threads.
0032  */
0033 @ChannelHandler.Sharable
0034 public final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
0035 
0036   private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);
0037 
0038   public static final MessageDecoder INSTANCE = new MessageDecoder();
0039 
0040   private MessageDecoder() {}
0041 
0042   @Override
0043   public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
0044     Message.Type msgType = Message.Type.decode(in);
0045     Message decoded = decode(msgType, in);
0046     assert decoded.type() == msgType;
0047     logger.trace("Received message {}: {}", msgType, decoded);
0048     out.add(decoded);
0049   }
0050 
0051   private Message decode(Message.Type msgType, ByteBuf in) {
0052     switch (msgType) {
0053       case ChunkFetchRequest:
0054         return ChunkFetchRequest.decode(in);
0055 
0056       case ChunkFetchSuccess:
0057         return ChunkFetchSuccess.decode(in);
0058 
0059       case ChunkFetchFailure:
0060         return ChunkFetchFailure.decode(in);
0061 
0062       case RpcRequest:
0063         return RpcRequest.decode(in);
0064 
0065       case RpcResponse:
0066         return RpcResponse.decode(in);
0067 
0068       case RpcFailure:
0069         return RpcFailure.decode(in);
0070 
0071       case OneWayMessage:
0072         return OneWayMessage.decode(in);
0073 
0074       case StreamRequest:
0075         return StreamRequest.decode(in);
0076 
0077       case StreamResponse:
0078         return StreamResponse.decode(in);
0079 
0080       case StreamFailure:
0081         return StreamFailure.decode(in);
0082 
0083       case UploadStream:
0084         return UploadStream.decode(in);
0085 
0086       default:
0087         throw new IllegalArgumentException("Unexpected message type: " + msgType);
0088     }
0089   }
0090 }