0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.protocol;
0019
0020 import java.util.List;
0021
0022 import io.netty.buffer.ByteBuf;
0023 import io.netty.channel.ChannelHandler;
0024 import io.netty.channel.ChannelHandlerContext;
0025 import io.netty.handler.codec.MessageToMessageEncoder;
0026 import org.slf4j.Logger;
0027 import org.slf4j.LoggerFactory;
0028
0029
0030
0031
0032
0033 @ChannelHandler.Sharable
0034 public final class MessageEncoder extends MessageToMessageEncoder<Message> {
0035
0036 private static final Logger logger = LoggerFactory.getLogger(MessageEncoder.class);
0037
0038 public static final MessageEncoder INSTANCE = new MessageEncoder();
0039
0040 private MessageEncoder() {}
0041
0042
0043
0044
0045
0046
0047
0048 @Override
0049 public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) throws Exception {
0050 Object body = null;
0051 long bodyLength = 0;
0052 boolean isBodyInFrame = false;
0053
0054
0055 if (in.body() != null) {
0056 try {
0057 bodyLength = in.body().size();
0058 body = in.body().convertToNetty();
0059 isBodyInFrame = in.isBodyInFrame();
0060 } catch (Exception e) {
0061 in.body().release();
0062 if (in instanceof AbstractResponseMessage) {
0063 AbstractResponseMessage resp = (AbstractResponseMessage) in;
0064
0065 String error = e.getMessage() != null ? e.getMessage() : "null";
0066 logger.error(String.format("Error processing %s for client %s",
0067 in, ctx.channel().remoteAddress()), e);
0068 encode(ctx, resp.createFailureResponse(error), out);
0069 } else {
0070 throw e;
0071 }
0072 return;
0073 }
0074 }
0075
0076 Message.Type msgType = in.type();
0077
0078
0079
0080 int headerLength = 8 + msgType.encodedLength() + in.encodedLength();
0081 long frameLength = headerLength + (isBodyInFrame ? bodyLength : 0);
0082 ByteBuf header = ctx.alloc().buffer(headerLength);
0083 header.writeLong(frameLength);
0084 msgType.encode(header);
0085 in.encode(header);
0086 assert header.writableBytes() == 0;
0087
0088 if (body != null) {
0089
0090
0091 out.add(new MessageWithHeader(in.body(), header, body, bodyLength));
0092 } else {
0093 out.add(header);
0094 }
0095 }
0096
0097 }