Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.protocol;
0019 
0020 import java.util.List;
0021 
0022 import io.netty.buffer.ByteBuf;
0023 import io.netty.channel.ChannelHandler;
0024 import io.netty.channel.ChannelHandlerContext;
0025 import io.netty.handler.codec.MessageToMessageEncoder;
0026 import org.slf4j.Logger;
0027 import org.slf4j.LoggerFactory;
0028 
0029 /**
0030  * Encoder used by the server side to encode server-to-client responses.
0031  * This encoder is stateless so it is safe to be shared by multiple threads.
0032  */
0033 @ChannelHandler.Sharable
0034 public final class MessageEncoder extends MessageToMessageEncoder<Message> {
0035 
0036   private static final Logger logger = LoggerFactory.getLogger(MessageEncoder.class);
0037 
0038   public static final MessageEncoder INSTANCE = new MessageEncoder();
0039 
0040   private MessageEncoder() {}
0041 
0042   /***
0043    * Encodes a Message by invoking its encode() method. For non-data messages, we will add one
0044    * ByteBuf to 'out' containing the total frame length, the message type, and the message itself.
0045    * In the case of a ChunkFetchSuccess, we will also add the ManagedBuffer corresponding to the
0046    * data to 'out', in order to enable zero-copy transfer.
0047    */
0048   @Override
0049   public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) throws Exception {
0050     Object body = null;
0051     long bodyLength = 0;
0052     boolean isBodyInFrame = false;
0053 
0054     // If the message has a body, take it out to enable zero-copy transfer for the payload.
0055     if (in.body() != null) {
0056       try {
0057         bodyLength = in.body().size();
0058         body = in.body().convertToNetty();
0059         isBodyInFrame = in.isBodyInFrame();
0060       } catch (Exception e) {
0061         in.body().release();
0062         if (in instanceof AbstractResponseMessage) {
0063           AbstractResponseMessage resp = (AbstractResponseMessage) in;
0064           // Re-encode this message as a failure response.
0065           String error = e.getMessage() != null ? e.getMessage() : "null";
0066           logger.error(String.format("Error processing %s for client %s",
0067             in, ctx.channel().remoteAddress()), e);
0068           encode(ctx, resp.createFailureResponse(error), out);
0069         } else {
0070           throw e;
0071         }
0072         return;
0073       }
0074     }
0075 
0076     Message.Type msgType = in.type();
0077     // All messages have the frame length, message type, and message itself. The frame length
0078     // may optionally include the length of the body data, depending on what message is being
0079     // sent.
0080     int headerLength = 8 + msgType.encodedLength() + in.encodedLength();
0081     long frameLength = headerLength + (isBodyInFrame ? bodyLength : 0);
0082     ByteBuf header = ctx.alloc().buffer(headerLength);
0083     header.writeLong(frameLength);
0084     msgType.encode(header);
0085     in.encode(header);
0086     assert header.writableBytes() == 0;
0087 
0088     if (body != null) {
0089       // We transfer ownership of the reference on in.body() to MessageWithHeader.
0090       // This reference will be freed when MessageWithHeader.deallocate() is called.
0091       out.add(new MessageWithHeader(in.body(), header, body, bodyLength));
0092     } else {
0093       out.add(header);
0094     }
0095   }
0096 
0097 }