Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.server;
0019 
0020 import java.net.SocketAddress;
0021 
0022 import com.google.common.base.Throwables;
0023 import io.netty.channel.Channel;
0024 import io.netty.channel.ChannelFuture;
0025 import io.netty.channel.ChannelFutureListener;
0026 import io.netty.channel.ChannelHandlerContext;
0027 import io.netty.channel.SimpleChannelInboundHandler;
0028 import org.slf4j.Logger;
0029 import org.slf4j.LoggerFactory;
0030 
0031 import org.apache.spark.network.buffer.ManagedBuffer;
0032 import org.apache.spark.network.client.TransportClient;
0033 import org.apache.spark.network.protocol.ChunkFetchFailure;
0034 import org.apache.spark.network.protocol.ChunkFetchRequest;
0035 import org.apache.spark.network.protocol.ChunkFetchSuccess;
0036 import org.apache.spark.network.protocol.Encodable;
0037 
0038 import static org.apache.spark.network.util.NettyUtils.*;
0039 
0040 /**
0041  * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response
0042  * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying
0043  * channel could potentially be blocked due to disk contentions. If several hundreds of clients
0044  * send ChunkFetchRequest to the server at the same time, it could potentially occupying all
0045  * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it
0046  * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a
0047  * result, it would leave no threads left to process other RPC messages, which takes much less
0048  * time to process, and could lead to client timing out on either performing SASL authentication,
0049  * registering executors, or waiting for response for an OpenBlocks messages.
0050  */
0051 public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
0052   private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
0053 
0054   private final TransportClient client;
0055   private final StreamManager streamManager;
0056   /** The max number of chunks being transferred and not finished yet. */
0057   private final long maxChunksBeingTransferred;
0058   private final boolean syncModeEnabled;
0059 
0060   public ChunkFetchRequestHandler(
0061       TransportClient client,
0062       StreamManager streamManager,
0063       Long maxChunksBeingTransferred,
0064       boolean syncModeEnabled) {
0065     this.client = client;
0066     this.streamManager = streamManager;
0067     this.maxChunksBeingTransferred = maxChunksBeingTransferred;
0068     this.syncModeEnabled = syncModeEnabled;
0069   }
0070 
0071   @Override
0072   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
0073     logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), cause);
0074     ctx.close();
0075   }
0076 
0077   @Override
0078   protected void channelRead0(
0079       ChannelHandlerContext ctx,
0080       final ChunkFetchRequest msg) throws Exception {
0081     Channel channel = ctx.channel();
0082     processFetchRequest(channel, msg);
0083   }
0084 
0085   public void processFetchRequest(
0086       final Channel channel, final ChunkFetchRequest msg) throws Exception {
0087     if (logger.isTraceEnabled()) {
0088       logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
0089         msg.streamChunkId);
0090     }
0091     long chunksBeingTransferred = streamManager.chunksBeingTransferred();
0092     if (chunksBeingTransferred >= maxChunksBeingTransferred) {
0093       logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
0094         chunksBeingTransferred, maxChunksBeingTransferred);
0095       channel.close();
0096       return;
0097     }
0098     ManagedBuffer buf;
0099     try {
0100       streamManager.checkAuthorization(client, msg.streamChunkId.streamId);
0101       buf = streamManager.getChunk(msg.streamChunkId.streamId, msg.streamChunkId.chunkIndex);
0102       if (buf == null) {
0103         throw new IllegalStateException("Chunk was not found");
0104       }
0105     } catch (Exception e) {
0106       logger.error(String.format("Error opening block %s for request from %s",
0107         msg.streamChunkId, getRemoteAddress(channel)), e);
0108       respond(channel, new ChunkFetchFailure(msg.streamChunkId,
0109         Throwables.getStackTraceAsString(e)));
0110       return;
0111     }
0112 
0113     streamManager.chunkBeingSent(msg.streamChunkId.streamId);
0114     respond(channel, new ChunkFetchSuccess(msg.streamChunkId, buf)).addListener(
0115       (ChannelFutureListener) future -> streamManager.chunkSent(msg.streamChunkId.streamId));
0116   }
0117 
0118   /**
0119    * The invocation to channel.writeAndFlush is async, and the actual I/O on the
0120    * channel will be handled by the EventLoop the channel is registered to. So even
0121    * though we are processing the ChunkFetchRequest in a separate thread pool, the actual I/O,
0122    * which is the potentially blocking call that could deplete server handler threads, is still
0123    * being processed by TransportServer's default EventLoopGroup.
0124    *
0125    * When syncModeEnabled is true, Spark will throttle the max number of threads that channel I/O
0126    * for sending response to ChunkFetchRequest, the thread calling channel.writeAndFlush will wait
0127    * for the completion of sending response back to client by invoking await(). This will throttle
0128    * the rate at which threads from ChunkFetchRequest dedicated EventLoopGroup submit channel I/O
0129    * requests to TransportServer's default EventLoopGroup, thus making sure that we can reserve
0130    * some threads in TransportServer's default EventLoopGroup for handling other RPC messages.
0131    */
0132   private ChannelFuture respond(
0133       final Channel channel,
0134       final Encodable result) throws InterruptedException {
0135     final SocketAddress remoteAddress = channel.remoteAddress();
0136     ChannelFuture channelFuture;
0137     if (syncModeEnabled) {
0138       channelFuture = channel.writeAndFlush(result).await();
0139     } else {
0140       channelFuture = channel.writeAndFlush(result);
0141     }
0142     return channelFuture.addListener((ChannelFutureListener) future -> {
0143       if (future.isSuccess()) {
0144         logger.trace("Sent result {} to client {}", result, remoteAddress);
0145       } else {
0146         logger.error(String.format("Error sending result %s to %s; closing connection",
0147           result, remoteAddress), future.cause());
0148         channel.close();
0149       }
0150     });
0151   }
0152 }