0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.server;
0019
0020 import io.netty.channel.ChannelHandlerContext;
0021 import io.netty.channel.SimpleChannelInboundHandler;
0022 import io.netty.handler.timeout.IdleState;
0023 import io.netty.handler.timeout.IdleStateEvent;
0024 import org.apache.spark.network.TransportContext;
0025 import org.slf4j.Logger;
0026 import org.slf4j.LoggerFactory;
0027
0028 import org.apache.spark.network.client.TransportClient;
0029 import org.apache.spark.network.client.TransportResponseHandler;
0030 import org.apache.spark.network.protocol.ChunkFetchRequest;
0031 import org.apache.spark.network.protocol.Message;
0032 import org.apache.spark.network.protocol.RequestMessage;
0033 import org.apache.spark.network.protocol.ResponseMessage;
0034 import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053 public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
0054 private static final Logger logger = LoggerFactory.getLogger(TransportChannelHandler.class);
0055
0056 private final TransportClient client;
0057 private final TransportResponseHandler responseHandler;
0058 private final TransportRequestHandler requestHandler;
0059 private final long requestTimeoutNs;
0060 private final boolean closeIdleConnections;
0061 private final boolean skipChunkFetchRequest;
0062 private final TransportContext transportContext;
0063
0064 public TransportChannelHandler(
0065 TransportClient client,
0066 TransportResponseHandler responseHandler,
0067 TransportRequestHandler requestHandler,
0068 long requestTimeoutMs,
0069 boolean skipChunkFetchRequest,
0070 boolean closeIdleConnections,
0071 TransportContext transportContext) {
0072 this.client = client;
0073 this.responseHandler = responseHandler;
0074 this.requestHandler = requestHandler;
0075 this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
0076 this.skipChunkFetchRequest = skipChunkFetchRequest;
0077 this.closeIdleConnections = closeIdleConnections;
0078 this.transportContext = transportContext;
0079 }
0080
0081 public TransportClient getClient() {
0082 return client;
0083 }
0084
0085 @Override
0086 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
0087 logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()),
0088 cause);
0089 requestHandler.exceptionCaught(cause);
0090 responseHandler.exceptionCaught(cause);
0091 ctx.close();
0092 }
0093
0094 @Override
0095 public void channelActive(ChannelHandlerContext ctx) throws Exception {
0096 try {
0097 requestHandler.channelActive();
0098 } catch (RuntimeException e) {
0099 logger.error("Exception from request handler while channel is active", e);
0100 }
0101 try {
0102 responseHandler.channelActive();
0103 } catch (RuntimeException e) {
0104 logger.error("Exception from response handler while channel is active", e);
0105 }
0106 super.channelActive(ctx);
0107 }
0108
0109 @Override
0110 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
0111 try {
0112 requestHandler.channelInactive();
0113 } catch (RuntimeException e) {
0114 logger.error("Exception from request handler while channel is inactive", e);
0115 }
0116 try {
0117 responseHandler.channelInactive();
0118 } catch (RuntimeException e) {
0119 logger.error("Exception from response handler while channel is inactive", e);
0120 }
0121 super.channelInactive(ctx);
0122 }
0123
0124
0125
0126
0127
0128 @Override
0129 public boolean acceptInboundMessage(Object msg) throws Exception {
0130 if (skipChunkFetchRequest && msg instanceof ChunkFetchRequest) {
0131 return false;
0132 } else {
0133 return super.acceptInboundMessage(msg);
0134 }
0135 }
0136
0137 @Override
0138 public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
0139 if (request instanceof RequestMessage) {
0140 requestHandler.handle((RequestMessage) request);
0141 } else if (request instanceof ResponseMessage) {
0142 responseHandler.handle((ResponseMessage) request);
0143 } else {
0144 ctx.fireChannelRead(request);
0145 }
0146 }
0147
0148
0149 @Override
0150 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
0151 if (evt instanceof IdleStateEvent) {
0152 IdleStateEvent e = (IdleStateEvent) evt;
0153
0154
0155
0156
0157
0158
0159
0160 synchronized (this) {
0161 boolean hasInFlightRequests = responseHandler.numOutstandingRequests() > 0;
0162 boolean isActuallyOverdue =
0163 System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
0164 if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
0165 if (hasInFlightRequests) {
0166 String address = getRemoteAddress(ctx.channel());
0167 logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
0168 "requests. Assuming connection is dead; please adjust spark.network.timeout if " +
0169 "this is wrong.", address, requestTimeoutNs / 1000 / 1000);
0170 client.timeOut();
0171 ctx.close();
0172 } else if (closeIdleConnections) {
0173
0174 client.timeOut();
0175 ctx.close();
0176 }
0177 }
0178 }
0179 }
0180 ctx.fireUserEventTriggered(evt);
0181 }
0182
0183 public TransportResponseHandler getResponseHandler() {
0184 return responseHandler;
0185 }
0186
0187 @Override
0188 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
0189 transportContext.getRegisteredConnections().inc();
0190 super.channelRegistered(ctx);
0191 }
0192
0193 @Override
0194 public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
0195 transportContext.getRegisteredConnections().dec();
0196 super.channelUnregistered(ctx);
0197 }
0198
0199 }