0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.util;
0019
0020 import java.util.LinkedList;
0021
0022 import com.google.common.annotations.VisibleForTesting;
0023 import com.google.common.base.Preconditions;
0024 import io.netty.buffer.ByteBuf;
0025 import io.netty.buffer.CompositeByteBuf;
0026 import io.netty.buffer.Unpooled;
0027 import io.netty.channel.ChannelHandlerContext;
0028 import io.netty.channel.ChannelInboundHandlerAdapter;
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046 public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
0047
0048 public static final String HANDLER_NAME = "frameDecoder";
0049 private static final int LENGTH_SIZE = 8;
0050 private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
0051 private static final int UNKNOWN_FRAME_SIZE = -1;
0052 private static final long CONSOLIDATE_THRESHOLD = 20 * 1024 * 1024;
0053
0054 private final LinkedList<ByteBuf> buffers = new LinkedList<>();
0055 private final ByteBuf frameLenBuf = Unpooled.buffer(LENGTH_SIZE, LENGTH_SIZE);
0056 private final long consolidateThreshold;
0057
0058 private CompositeByteBuf frameBuf = null;
0059 private long consolidatedFrameBufSize = 0;
0060 private int consolidatedNumComponents = 0;
0061
0062 private long totalSize = 0;
0063 private long nextFrameSize = UNKNOWN_FRAME_SIZE;
0064 private int frameRemainingBytes = UNKNOWN_FRAME_SIZE;
0065 private volatile Interceptor interceptor;
0066
0067 public TransportFrameDecoder() {
0068 this(CONSOLIDATE_THRESHOLD);
0069 }
0070
0071 @VisibleForTesting
0072 TransportFrameDecoder(long consolidateThreshold) {
0073 this.consolidateThreshold = consolidateThreshold;
0074 }
0075
0076 @Override
0077 public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
0078 ByteBuf in = (ByteBuf) data;
0079 buffers.add(in);
0080 totalSize += in.readableBytes();
0081
0082 while (!buffers.isEmpty()) {
0083
0084 if (interceptor != null) {
0085 ByteBuf first = buffers.getFirst();
0086 int available = first.readableBytes();
0087 if (feedInterceptor(first)) {
0088 assert !first.isReadable() : "Interceptor still active but buffer has data.";
0089 }
0090
0091 int read = available - first.readableBytes();
0092 if (read == available) {
0093 buffers.removeFirst().release();
0094 }
0095 totalSize -= read;
0096 } else {
0097
0098 ByteBuf frame = decodeNext();
0099 if (frame == null) {
0100 break;
0101 }
0102 ctx.fireChannelRead(frame);
0103 }
0104 }
0105 }
0106
0107 private long decodeFrameSize() {
0108 if (nextFrameSize != UNKNOWN_FRAME_SIZE || totalSize < LENGTH_SIZE) {
0109 return nextFrameSize;
0110 }
0111
0112
0113
0114
0115
0116 ByteBuf first = buffers.getFirst();
0117 if (first.readableBytes() >= LENGTH_SIZE) {
0118 nextFrameSize = first.readLong() - LENGTH_SIZE;
0119 totalSize -= LENGTH_SIZE;
0120 if (!first.isReadable()) {
0121 buffers.removeFirst().release();
0122 }
0123 return nextFrameSize;
0124 }
0125
0126 while (frameLenBuf.readableBytes() < LENGTH_SIZE) {
0127 ByteBuf next = buffers.getFirst();
0128 int toRead = Math.min(next.readableBytes(), LENGTH_SIZE - frameLenBuf.readableBytes());
0129 frameLenBuf.writeBytes(next, toRead);
0130 if (!next.isReadable()) {
0131 buffers.removeFirst().release();
0132 }
0133 }
0134
0135 nextFrameSize = frameLenBuf.readLong() - LENGTH_SIZE;
0136 totalSize -= LENGTH_SIZE;
0137 frameLenBuf.clear();
0138 return nextFrameSize;
0139 }
0140
0141 private ByteBuf decodeNext() {
0142 long frameSize = decodeFrameSize();
0143 if (frameSize == UNKNOWN_FRAME_SIZE) {
0144 return null;
0145 }
0146
0147 if (frameBuf == null) {
0148 Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE,
0149 "Too large frame: %s", frameSize);
0150 Preconditions.checkArgument(frameSize > 0,
0151 "Frame length should be positive: %s", frameSize);
0152 frameRemainingBytes = (int) frameSize;
0153
0154
0155 if (buffers.isEmpty()) {
0156 return null;
0157 }
0158
0159
0160 if (buffers.getFirst().readableBytes() >= frameRemainingBytes) {
0161
0162 frameBuf = null;
0163 nextFrameSize = UNKNOWN_FRAME_SIZE;
0164 return nextBufferForFrame(frameRemainingBytes);
0165 }
0166
0167 frameBuf = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
0168 }
0169
0170 while (frameRemainingBytes > 0 && !buffers.isEmpty()) {
0171 ByteBuf next = nextBufferForFrame(frameRemainingBytes);
0172 frameRemainingBytes -= next.readableBytes();
0173 frameBuf.addComponent(true, next);
0174 }
0175
0176
0177 if (frameBuf.capacity() - consolidatedFrameBufSize > consolidateThreshold) {
0178 int newNumComponents = frameBuf.numComponents() - consolidatedNumComponents;
0179 frameBuf.consolidate(consolidatedNumComponents, newNumComponents);
0180 consolidatedFrameBufSize = frameBuf.capacity();
0181 consolidatedNumComponents = frameBuf.numComponents();
0182 }
0183 if (frameRemainingBytes > 0) {
0184 return null;
0185 }
0186
0187 return consumeCurrentFrameBuf();
0188 }
0189
0190 private ByteBuf consumeCurrentFrameBuf() {
0191 ByteBuf frame = frameBuf;
0192
0193 frameBuf = null;
0194 consolidatedFrameBufSize = 0;
0195 consolidatedNumComponents = 0;
0196 nextFrameSize = UNKNOWN_FRAME_SIZE;
0197 return frame;
0198 }
0199
0200
0201
0202
0203
0204 private ByteBuf nextBufferForFrame(int bytesToRead) {
0205 ByteBuf buf = buffers.getFirst();
0206 ByteBuf frame;
0207
0208 if (buf.readableBytes() > bytesToRead) {
0209 frame = buf.retain().readSlice(bytesToRead);
0210 totalSize -= bytesToRead;
0211 } else {
0212 frame = buf;
0213 buffers.removeFirst();
0214 totalSize -= frame.readableBytes();
0215 }
0216
0217 return frame;
0218 }
0219
0220 @Override
0221 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
0222 if (interceptor != null) {
0223 interceptor.channelInactive();
0224 }
0225 super.channelInactive(ctx);
0226 }
0227
0228 @Override
0229 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
0230 if (interceptor != null) {
0231 interceptor.exceptionCaught(cause);
0232 }
0233 super.exceptionCaught(ctx, cause);
0234 }
0235
0236 @Override
0237 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
0238
0239
0240
0241
0242 for (ByteBuf b : buffers) {
0243 b.release();
0244 }
0245 buffers.clear();
0246 frameLenBuf.release();
0247 ByteBuf frame = consumeCurrentFrameBuf();
0248 if (frame != null) {
0249 frame.release();
0250 }
0251 super.handlerRemoved(ctx);
0252 }
0253
0254 public void setInterceptor(Interceptor interceptor) {
0255 Preconditions.checkState(this.interceptor == null, "Already have an interceptor.");
0256 this.interceptor = interceptor;
0257 }
0258
0259
0260
0261
0262 private boolean feedInterceptor(ByteBuf buf) throws Exception {
0263 if (interceptor != null && !interceptor.handle(buf)) {
0264 interceptor = null;
0265 }
0266 return interceptor != null;
0267 }
0268
0269 public interface Interceptor {
0270
0271
0272
0273
0274
0275
0276
0277 boolean handle(ByteBuf data) throws Exception;
0278
0279
0280 void exceptionCaught(Throwable cause) throws Exception;
0281
0282
0283 void channelInactive() throws Exception;
0284
0285 }
0286
0287 }