0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network;
0019
0020 import java.io.Closeable;
0021 import java.util.ArrayList;
0022 import java.util.List;
0023
0024 import com.codahale.metrics.Counter;
0025 import io.netty.channel.Channel;
0026 import io.netty.channel.ChannelPipeline;
0027 import io.netty.channel.EventLoopGroup;
0028 import io.netty.channel.socket.SocketChannel;
0029 import io.netty.handler.timeout.IdleStateHandler;
0030 import org.slf4j.Logger;
0031 import org.slf4j.LoggerFactory;
0032
0033 import org.apache.spark.network.client.TransportClient;
0034 import org.apache.spark.network.client.TransportClientBootstrap;
0035 import org.apache.spark.network.client.TransportClientFactory;
0036 import org.apache.spark.network.client.TransportResponseHandler;
0037 import org.apache.spark.network.protocol.MessageDecoder;
0038 import org.apache.spark.network.protocol.MessageEncoder;
0039 import org.apache.spark.network.server.ChunkFetchRequestHandler;
0040 import org.apache.spark.network.server.RpcHandler;
0041 import org.apache.spark.network.server.TransportChannelHandler;
0042 import org.apache.spark.network.server.TransportRequestHandler;
0043 import org.apache.spark.network.server.TransportServer;
0044 import org.apache.spark.network.server.TransportServerBootstrap;
0045 import org.apache.spark.network.util.IOMode;
0046 import org.apache.spark.network.util.NettyUtils;
0047 import org.apache.spark.network.util.TransportConf;
0048 import org.apache.spark.network.util.TransportFrameDecoder;
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064 public class TransportContext implements Closeable {
0065 private static final Logger logger = LoggerFactory.getLogger(TransportContext.class);
0066
0067 private final TransportConf conf;
0068 private final RpcHandler rpcHandler;
0069 private final boolean closeIdleConnections;
0070
0071 private Counter registeredConnections = new Counter();
0072
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082
0083
0084
0085 private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
0086 private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;
0087
0088
0089
0090
0091 private final EventLoopGroup chunkFetchWorkers;
0092
0093 public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
0094 this(conf, rpcHandler, false, false);
0095 }
0096
0097 public TransportContext(
0098 TransportConf conf,
0099 RpcHandler rpcHandler,
0100 boolean closeIdleConnections) {
0101 this(conf, rpcHandler, closeIdleConnections, false);
0102 }
0103
0104
0105
0106
0107
0108
0109
0110
0111
0112
0113
0114
0115 public TransportContext(
0116 TransportConf conf,
0117 RpcHandler rpcHandler,
0118 boolean closeIdleConnections,
0119 boolean isClientOnly) {
0120 this.conf = conf;
0121 this.rpcHandler = rpcHandler;
0122 this.closeIdleConnections = closeIdleConnections;
0123
0124 if (conf.getModuleName() != null &&
0125 conf.getModuleName().equalsIgnoreCase("shuffle") &&
0126 !isClientOnly && conf.separateChunkFetchRequest()) {
0127 chunkFetchWorkers = NettyUtils.createEventLoop(
0128 IOMode.valueOf(conf.ioMode()),
0129 conf.chunkFetchHandlerThreads(),
0130 "shuffle-chunk-fetch-handler");
0131 } else {
0132 chunkFetchWorkers = null;
0133 }
0134 }
0135
0136
0137
0138
0139
0140
0141 public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
0142 return new TransportClientFactory(this, bootstraps);
0143 }
0144
0145 public TransportClientFactory createClientFactory() {
0146 return createClientFactory(new ArrayList<>());
0147 }
0148
0149
0150 public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
0151 return new TransportServer(this, null, port, rpcHandler, bootstraps);
0152 }
0153
0154
0155 public TransportServer createServer(
0156 String host, int port, List<TransportServerBootstrap> bootstraps) {
0157 return new TransportServer(this, host, port, rpcHandler, bootstraps);
0158 }
0159
0160
0161 public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
0162 return createServer(0, bootstraps);
0163 }
0164
0165 public TransportServer createServer() {
0166 return createServer(0, new ArrayList<>());
0167 }
0168
0169 public TransportChannelHandler initializePipeline(SocketChannel channel) {
0170 return initializePipeline(channel, rpcHandler);
0171 }
0172
0173
0174
0175
0176
0177
0178
0179
0180
0181
0182
0183
0184
0185 public TransportChannelHandler initializePipeline(
0186 SocketChannel channel,
0187 RpcHandler channelRpcHandler) {
0188 try {
0189 TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
0190 ChannelPipeline pipeline = channel.pipeline()
0191 .addLast("encoder", ENCODER)
0192 .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
0193 .addLast("decoder", DECODER)
0194 .addLast("idleStateHandler",
0195 new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
0196
0197
0198 .addLast("handler", channelHandler);
0199
0200 if (chunkFetchWorkers != null) {
0201 ChunkFetchRequestHandler chunkFetchHandler = new ChunkFetchRequestHandler(
0202 channelHandler.getClient(), rpcHandler.getStreamManager(),
0203 conf.maxChunksBeingTransferred(), true );
0204 pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler);
0205 }
0206 return channelHandler;
0207 } catch (RuntimeException e) {
0208 logger.error("Error while initializing Netty pipeline", e);
0209 throw e;
0210 }
0211 }
0212
0213
0214
0215
0216
0217
0218 private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
0219 TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
0220 TransportClient client = new TransportClient(channel, responseHandler);
0221 boolean separateChunkFetchRequest = conf.separateChunkFetchRequest();
0222 ChunkFetchRequestHandler chunkFetchRequestHandler = null;
0223 if (!separateChunkFetchRequest) {
0224 chunkFetchRequestHandler = new ChunkFetchRequestHandler(
0225 client, rpcHandler.getStreamManager(),
0226 conf.maxChunksBeingTransferred(), false );
0227 }
0228 TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
0229 rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler);
0230 return new TransportChannelHandler(client, responseHandler, requestHandler,
0231 conf.connectionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this);
0232 }
0233
0234 public TransportConf getConf() { return conf; }
0235
0236 public Counter getRegisteredConnections() {
0237 return registeredConnections;
0238 }
0239
0240 public void close() {
0241 if (chunkFetchWorkers != null) {
0242 chunkFetchWorkers.shutdownGracefully();
0243 }
0244 }
0245 }