Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.server;
0019 
0020 import io.netty.channel.ChannelHandlerContext;
0021 import io.netty.channel.SimpleChannelInboundHandler;
0022 import io.netty.handler.timeout.IdleState;
0023 import io.netty.handler.timeout.IdleStateEvent;
0024 import org.apache.spark.network.TransportContext;
0025 import org.slf4j.Logger;
0026 import org.slf4j.LoggerFactory;
0027 
0028 import org.apache.spark.network.client.TransportClient;
0029 import org.apache.spark.network.client.TransportResponseHandler;
0030 import org.apache.spark.network.protocol.ChunkFetchRequest;
0031 import org.apache.spark.network.protocol.Message;
0032 import org.apache.spark.network.protocol.RequestMessage;
0033 import org.apache.spark.network.protocol.ResponseMessage;
0034 import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
0035 
0036 /**
0037  * The single Transport-level Channel handler which is used for delegating requests to the
0038  * {@link TransportRequestHandler} and responses to the {@link TransportResponseHandler}.
0039  *
0040  * All channels created in the transport layer are bidirectional. When the Client initiates a Netty
0041  * Channel with a RequestMessage (which gets handled by the Server's RequestHandler), the Server
0042  * will produce a ResponseMessage (handled by the Client's ResponseHandler). However, the Server
0043  * also gets a handle on the same Channel, so it may then begin to send RequestMessages to the
0044  * Client.
0045  * This means that the Client also needs a RequestHandler and the Server needs a ResponseHandler,
0046  * for the Client's responses to the Server's requests.
0047  *
0048  * This class also handles timeouts from a {@link io.netty.handler.timeout.IdleStateHandler}.
0049  * We consider a connection timed out if there are outstanding fetch or RPC requests but no traffic
0050  * on the channel for at least `requestTimeoutMs`. Note that this is duplex traffic; we will not
0051  * timeout if the client is continuously sending but getting no responses, for simplicity.
0052  */
0053 public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
0054   private static final Logger logger = LoggerFactory.getLogger(TransportChannelHandler.class);
0055 
0056   private final TransportClient client;
0057   private final TransportResponseHandler responseHandler;
0058   private final TransportRequestHandler requestHandler;
0059   private final long requestTimeoutNs;
0060   private final boolean closeIdleConnections;
0061   private final boolean skipChunkFetchRequest;
0062   private final TransportContext transportContext;
0063 
0064   public TransportChannelHandler(
0065       TransportClient client,
0066       TransportResponseHandler responseHandler,
0067       TransportRequestHandler requestHandler,
0068       long requestTimeoutMs,
0069       boolean skipChunkFetchRequest,
0070       boolean closeIdleConnections,
0071       TransportContext transportContext) {
0072     this.client = client;
0073     this.responseHandler = responseHandler;
0074     this.requestHandler = requestHandler;
0075     this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
0076     this.skipChunkFetchRequest = skipChunkFetchRequest;
0077     this.closeIdleConnections = closeIdleConnections;
0078     this.transportContext = transportContext;
0079   }
0080 
0081   public TransportClient getClient() {
0082     return client;
0083   }
0084 
0085   @Override
0086   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
0087     logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()),
0088       cause);
0089     requestHandler.exceptionCaught(cause);
0090     responseHandler.exceptionCaught(cause);
0091     ctx.close();
0092   }
0093 
0094   @Override
0095   public void channelActive(ChannelHandlerContext ctx) throws Exception {
0096     try {
0097       requestHandler.channelActive();
0098     } catch (RuntimeException e) {
0099       logger.error("Exception from request handler while channel is active", e);
0100     }
0101     try {
0102       responseHandler.channelActive();
0103     } catch (RuntimeException e) {
0104       logger.error("Exception from response handler while channel is active", e);
0105     }
0106     super.channelActive(ctx);
0107   }
0108 
0109   @Override
0110   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
0111     try {
0112       requestHandler.channelInactive();
0113     } catch (RuntimeException e) {
0114       logger.error("Exception from request handler while channel is inactive", e);
0115     }
0116     try {
0117       responseHandler.channelInactive();
0118     } catch (RuntimeException e) {
0119       logger.error("Exception from response handler while channel is inactive", e);
0120     }
0121     super.channelInactive(ctx);
0122   }
0123 
0124   /**
0125    * Overwrite acceptInboundMessage to properly delegate ChunkFetchRequest messages
0126    * to ChunkFetchRequestHandler.
0127    */
0128   @Override
0129   public boolean acceptInboundMessage(Object msg) throws Exception {
0130     if (skipChunkFetchRequest && msg instanceof ChunkFetchRequest) {
0131       return false;
0132     } else {
0133       return super.acceptInboundMessage(msg);
0134     }
0135   }
0136 
0137   @Override
0138   public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
0139     if (request instanceof RequestMessage) {
0140       requestHandler.handle((RequestMessage) request);
0141     } else if (request instanceof ResponseMessage) {
0142       responseHandler.handle((ResponseMessage) request);
0143     } else {
0144       ctx.fireChannelRead(request);
0145     }
0146   }
0147 
0148   /** Triggered based on events from an {@link io.netty.handler.timeout.IdleStateHandler}. */
0149   @Override
0150   public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
0151     if (evt instanceof IdleStateEvent) {
0152       IdleStateEvent e = (IdleStateEvent) evt;
0153       // See class comment for timeout semantics. In addition to ensuring we only timeout while
0154       // there are outstanding requests, we also do a secondary consistency check to ensure
0155       // there's no race between the idle timeout and incrementing the numOutstandingRequests
0156       // (see SPARK-7003).
0157       //
0158       // To avoid a race between TransportClientFactory.createClient() and this code which could
0159       // result in an inactive client being returned, this needs to run in a synchronized block.
0160       synchronized (this) {
0161         boolean hasInFlightRequests = responseHandler.numOutstandingRequests() > 0;
0162         boolean isActuallyOverdue =
0163           System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
0164         if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
0165           if (hasInFlightRequests) {
0166             String address = getRemoteAddress(ctx.channel());
0167             logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
0168               "requests. Assuming connection is dead; please adjust spark.network.timeout if " +
0169               "this is wrong.", address, requestTimeoutNs / 1000 / 1000);
0170             client.timeOut();
0171             ctx.close();
0172           } else if (closeIdleConnections) {
0173             // While CloseIdleConnections is enable, we also close idle connection
0174             client.timeOut();
0175             ctx.close();
0176           }
0177         }
0178       }
0179     }
0180     ctx.fireUserEventTriggered(evt);
0181   }
0182 
0183   public TransportResponseHandler getResponseHandler() {
0184     return responseHandler;
0185   }
0186 
0187   @Override
0188   public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
0189     transportContext.getRegisteredConnections().inc();
0190     super.channelRegistered(ctx);
0191   }
0192 
0193   @Override
0194   public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
0195     transportContext.getRegisteredConnections().dec();
0196     super.channelUnregistered(ctx);
0197   }
0198 
0199 }