Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.util;
0019 
0020 import java.util.LinkedList;
0021 
0022 import com.google.common.annotations.VisibleForTesting;
0023 import com.google.common.base.Preconditions;
0024 import io.netty.buffer.ByteBuf;
0025 import io.netty.buffer.CompositeByteBuf;
0026 import io.netty.buffer.Unpooled;
0027 import io.netty.channel.ChannelHandlerContext;
0028 import io.netty.channel.ChannelInboundHandlerAdapter;
0029 
0030 /**
0031  * A customized frame decoder that allows intercepting raw data.
0032  * <p>
0033  * This behaves like Netty's frame decoder (with hard coded parameters that match this library's
0034  * needs), except it allows an interceptor to be installed to read data directly before it's
0035  * framed.
0036  * <p>
0037  * Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's
0038  * decoded, instead of building as many frames as the current buffer allows and dispatching
0039  * all of them. This allows a child handler to install an interceptor if needed.
0040  * <p>
0041  * If an interceptor is installed, framing stops, and data is instead fed directly to the
0042  * interceptor. When the interceptor indicates that it doesn't need to read any more data,
0043  * framing resumes. Interceptors should not hold references to the data buffers provided
0044  * to their handle() method.
0045  */
0046 public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
0047 
0048   public static final String HANDLER_NAME = "frameDecoder";
0049   private static final int LENGTH_SIZE = 8;
0050   private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
0051   private static final int UNKNOWN_FRAME_SIZE = -1;
0052   private static final long CONSOLIDATE_THRESHOLD = 20 * 1024 * 1024;
0053 
0054   private final LinkedList<ByteBuf> buffers = new LinkedList<>();
0055   private final ByteBuf frameLenBuf = Unpooled.buffer(LENGTH_SIZE, LENGTH_SIZE);
0056   private final long consolidateThreshold;
0057 
0058   private CompositeByteBuf frameBuf = null;
0059   private long consolidatedFrameBufSize = 0;
0060   private int consolidatedNumComponents = 0;
0061 
0062   private long totalSize = 0;
0063   private long nextFrameSize = UNKNOWN_FRAME_SIZE;
0064   private int frameRemainingBytes = UNKNOWN_FRAME_SIZE;
0065   private volatile Interceptor interceptor;
0066 
0067   public TransportFrameDecoder() {
0068     this(CONSOLIDATE_THRESHOLD);
0069   }
0070 
0071   @VisibleForTesting
0072   TransportFrameDecoder(long consolidateThreshold) {
0073     this.consolidateThreshold = consolidateThreshold;
0074   }
0075 
0076   @Override
0077   public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
0078     ByteBuf in = (ByteBuf) data;
0079     buffers.add(in);
0080     totalSize += in.readableBytes();
0081 
0082     while (!buffers.isEmpty()) {
0083       // First, feed the interceptor, and if it's still, active, try again.
0084       if (interceptor != null) {
0085         ByteBuf first = buffers.getFirst();
0086         int available = first.readableBytes();
0087         if (feedInterceptor(first)) {
0088           assert !first.isReadable() : "Interceptor still active but buffer has data.";
0089         }
0090 
0091         int read = available - first.readableBytes();
0092         if (read == available) {
0093           buffers.removeFirst().release();
0094         }
0095         totalSize -= read;
0096       } else {
0097         // Interceptor is not active, so try to decode one frame.
0098         ByteBuf frame = decodeNext();
0099         if (frame == null) {
0100           break;
0101         }
0102         ctx.fireChannelRead(frame);
0103       }
0104     }
0105   }
0106 
0107   private long decodeFrameSize() {
0108     if (nextFrameSize != UNKNOWN_FRAME_SIZE || totalSize < LENGTH_SIZE) {
0109       return nextFrameSize;
0110     }
0111 
0112     // We know there's enough data. If the first buffer contains all the data, great. Otherwise,
0113     // hold the bytes for the frame length in a composite buffer until we have enough data to read
0114     // the frame size. Normally, it should be rare to need more than one buffer to read the frame
0115     // size.
0116     ByteBuf first = buffers.getFirst();
0117     if (first.readableBytes() >= LENGTH_SIZE) {
0118       nextFrameSize = first.readLong() - LENGTH_SIZE;
0119       totalSize -= LENGTH_SIZE;
0120       if (!first.isReadable()) {
0121         buffers.removeFirst().release();
0122       }
0123       return nextFrameSize;
0124     }
0125 
0126     while (frameLenBuf.readableBytes() < LENGTH_SIZE) {
0127       ByteBuf next = buffers.getFirst();
0128       int toRead = Math.min(next.readableBytes(), LENGTH_SIZE - frameLenBuf.readableBytes());
0129       frameLenBuf.writeBytes(next, toRead);
0130       if (!next.isReadable()) {
0131         buffers.removeFirst().release();
0132       }
0133     }
0134 
0135     nextFrameSize = frameLenBuf.readLong() - LENGTH_SIZE;
0136     totalSize -= LENGTH_SIZE;
0137     frameLenBuf.clear();
0138     return nextFrameSize;
0139   }
0140 
0141   private ByteBuf decodeNext() {
0142     long frameSize = decodeFrameSize();
0143     if (frameSize == UNKNOWN_FRAME_SIZE) {
0144       return null;
0145     }
0146 
0147     if (frameBuf == null) {
0148       Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE,
0149           "Too large frame: %s", frameSize);
0150       Preconditions.checkArgument(frameSize > 0,
0151           "Frame length should be positive: %s", frameSize);
0152       frameRemainingBytes = (int) frameSize;
0153 
0154       // If buffers is empty, then return immediately for more input data.
0155       if (buffers.isEmpty()) {
0156         return null;
0157       }
0158       // Otherwise, if the first buffer holds the entire frame, we attempt to
0159       // build frame with it and return.
0160       if (buffers.getFirst().readableBytes() >= frameRemainingBytes) {
0161         // Reset buf and size for next frame.
0162         frameBuf = null;
0163         nextFrameSize = UNKNOWN_FRAME_SIZE;
0164         return nextBufferForFrame(frameRemainingBytes);
0165       }
0166       // Other cases, create a composite buffer to manage all the buffers.
0167       frameBuf = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
0168     }
0169 
0170     while (frameRemainingBytes > 0 && !buffers.isEmpty()) {
0171       ByteBuf next = nextBufferForFrame(frameRemainingBytes);
0172       frameRemainingBytes -= next.readableBytes();
0173       frameBuf.addComponent(true, next);
0174     }
0175     // If the delta size of frameBuf exceeds the threshold, then we do consolidation
0176     // to reduce memory consumption.
0177     if (frameBuf.capacity() - consolidatedFrameBufSize > consolidateThreshold) {
0178       int newNumComponents = frameBuf.numComponents() - consolidatedNumComponents;
0179       frameBuf.consolidate(consolidatedNumComponents, newNumComponents);
0180       consolidatedFrameBufSize = frameBuf.capacity();
0181       consolidatedNumComponents = frameBuf.numComponents();
0182     }
0183     if (frameRemainingBytes > 0) {
0184       return null;
0185     }
0186 
0187     return consumeCurrentFrameBuf();
0188   }
0189 
0190   private ByteBuf consumeCurrentFrameBuf() {
0191     ByteBuf frame = frameBuf;
0192     // Reset buf and size for next frame.
0193     frameBuf = null;
0194     consolidatedFrameBufSize = 0;
0195     consolidatedNumComponents = 0;
0196     nextFrameSize = UNKNOWN_FRAME_SIZE;
0197     return frame;
0198   }
0199 
0200   /**
0201    * Takes the first buffer in the internal list, and either adjust it to fit in the frame
0202    * (by taking a slice out of it) or remove it from the internal list.
0203    */
0204   private ByteBuf nextBufferForFrame(int bytesToRead) {
0205     ByteBuf buf = buffers.getFirst();
0206     ByteBuf frame;
0207 
0208     if (buf.readableBytes() > bytesToRead) {
0209       frame = buf.retain().readSlice(bytesToRead);
0210       totalSize -= bytesToRead;
0211     } else {
0212       frame = buf;
0213       buffers.removeFirst();
0214       totalSize -= frame.readableBytes();
0215     }
0216 
0217     return frame;
0218   }
0219 
0220   @Override
0221   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
0222     if (interceptor != null) {
0223       interceptor.channelInactive();
0224     }
0225     super.channelInactive(ctx);
0226   }
0227 
0228   @Override
0229   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
0230     if (interceptor != null) {
0231       interceptor.exceptionCaught(cause);
0232     }
0233     super.exceptionCaught(ctx, cause);
0234   }
0235 
0236   @Override
0237   public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
0238     // Release all buffers that are still in our ownership.
0239     // Doing this in handlerRemoved(...) guarantees that this will happen in all cases:
0240     //     - When the Channel becomes inactive
0241     //     - When the decoder is removed from the ChannelPipeline
0242     for (ByteBuf b : buffers) {
0243       b.release();
0244     }
0245     buffers.clear();
0246     frameLenBuf.release();
0247     ByteBuf frame = consumeCurrentFrameBuf();
0248     if (frame != null) {
0249       frame.release();
0250     }
0251     super.handlerRemoved(ctx);
0252   }
0253 
0254   public void setInterceptor(Interceptor interceptor) {
0255     Preconditions.checkState(this.interceptor == null, "Already have an interceptor.");
0256     this.interceptor = interceptor;
0257   }
0258 
0259   /**
0260    * @return Whether the interceptor is still active after processing the data.
0261    */
0262   private boolean feedInterceptor(ByteBuf buf) throws Exception {
0263     if (interceptor != null && !interceptor.handle(buf)) {
0264       interceptor = null;
0265     }
0266     return interceptor != null;
0267   }
0268 
0269   public interface Interceptor {
0270 
0271     /**
0272      * Handles data received from the remote end.
0273      *
0274      * @param data Buffer containing data.
0275      * @return "true" if the interceptor expects more data, "false" to uninstall the interceptor.
0276      */
0277     boolean handle(ByteBuf data) throws Exception;
0278 
0279     /** Called if an exception is thrown in the channel pipeline. */
0280     void exceptionCaught(Throwable cause) throws Exception;
0281 
0282     /** Called if the channel is closed and the interceptor is still installed. */
0283     void channelInactive() throws Exception;
0284 
0285   }
0286 
0287 }