Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network;
0019 
0020 import java.io.Closeable;
0021 import java.util.ArrayList;
0022 import java.util.List;
0023 
0024 import com.codahale.metrics.Counter;
0025 import io.netty.channel.Channel;
0026 import io.netty.channel.ChannelPipeline;
0027 import io.netty.channel.EventLoopGroup;
0028 import io.netty.channel.socket.SocketChannel;
0029 import io.netty.handler.timeout.IdleStateHandler;
0030 import org.slf4j.Logger;
0031 import org.slf4j.LoggerFactory;
0032 
0033 import org.apache.spark.network.client.TransportClient;
0034 import org.apache.spark.network.client.TransportClientBootstrap;
0035 import org.apache.spark.network.client.TransportClientFactory;
0036 import org.apache.spark.network.client.TransportResponseHandler;
0037 import org.apache.spark.network.protocol.MessageDecoder;
0038 import org.apache.spark.network.protocol.MessageEncoder;
0039 import org.apache.spark.network.server.ChunkFetchRequestHandler;
0040 import org.apache.spark.network.server.RpcHandler;
0041 import org.apache.spark.network.server.TransportChannelHandler;
0042 import org.apache.spark.network.server.TransportRequestHandler;
0043 import org.apache.spark.network.server.TransportServer;
0044 import org.apache.spark.network.server.TransportServerBootstrap;
0045 import org.apache.spark.network.util.IOMode;
0046 import org.apache.spark.network.util.NettyUtils;
0047 import org.apache.spark.network.util.TransportConf;
0048 import org.apache.spark.network.util.TransportFrameDecoder;
0049 
0050 /**
0051  * Contains the context to create a {@link TransportServer}, {@link TransportClientFactory}, and to
0052  * setup Netty Channel pipelines with a
0053  * {@link org.apache.spark.network.server.TransportChannelHandler}.
0054  *
0055  * There are two communication protocols that the TransportClient provides, control-plane RPCs and
0056  * data-plane "chunk fetching". The handling of the RPCs is performed outside of the scope of the
0057  * TransportContext (i.e., by a user-provided handler), and it is responsible for setting up streams
0058  * which can be streamed through the data plane in chunks using zero-copy IO.
0059  *
0060  * The TransportServer and TransportClientFactory both create a TransportChannelHandler for each
0061  * channel. As each TransportChannelHandler contains a TransportClient, this enables server
0062  * processes to send messages back to the client on an existing channel.
0063  */
0064 public class TransportContext implements Closeable {
0065   private static final Logger logger = LoggerFactory.getLogger(TransportContext.class);
0066 
0067   private final TransportConf conf;
0068   private final RpcHandler rpcHandler;
0069   private final boolean closeIdleConnections;
0070   // Number of registered connections to the shuffle service
0071   private Counter registeredConnections = new Counter();
0072 
0073   /**
0074    * Force to create MessageEncoder and MessageDecoder so that we can make sure they will be created
0075    * before switching the current context class loader to ExecutorClassLoader.
0076    *
0077    * Netty's MessageToMessageEncoder uses Javassist to generate a matcher class and the
0078    * implementation calls "Class.forName" to check if this calls is already generated. If the
0079    * following two objects are created in "ExecutorClassLoader.findClass", it will cause
0080    * "ClassCircularityError". This is because loading this Netty generated class will call
0081    * "ExecutorClassLoader.findClass" to search this class, and "ExecutorClassLoader" will try to use
0082    * RPC to load it and cause to load the non-exist matcher class again. JVM will report
0083    * `ClassCircularityError` to prevent such infinite recursion. (See SPARK-17714)
0084    */
0085   private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
0086   private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;
0087 
0088   // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling
0089   // max number of TransportServer worker threads that are blocked on writing response
0090   // of ChunkFetchRequest message back to the client via the underlying channel.
0091   private final EventLoopGroup chunkFetchWorkers;
0092 
0093   public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
0094     this(conf, rpcHandler, false, false);
0095   }
0096 
0097   public TransportContext(
0098       TransportConf conf,
0099       RpcHandler rpcHandler,
0100       boolean closeIdleConnections) {
0101     this(conf, rpcHandler, closeIdleConnections, false);
0102   }
0103 
0104   /**
0105    * Enables TransportContext initialization for underlying client and server.
0106    *
0107    * @param conf TransportConf
0108    * @param rpcHandler RpcHandler responsible for handling requests and responses.
0109    * @param closeIdleConnections Close idle connections if it is set to true.
0110    * @param isClientOnly This config indicates the TransportContext is only used by a client.
0111    *                     This config is more important when external shuffle is enabled.
0112    *                     It stops creating extra event loop and subsequent thread pool
0113    *                     for shuffle clients to handle chunked fetch requests.
0114    */
0115   public TransportContext(
0116       TransportConf conf,
0117       RpcHandler rpcHandler,
0118       boolean closeIdleConnections,
0119       boolean isClientOnly) {
0120     this.conf = conf;
0121     this.rpcHandler = rpcHandler;
0122     this.closeIdleConnections = closeIdleConnections;
0123 
0124     if (conf.getModuleName() != null &&
0125         conf.getModuleName().equalsIgnoreCase("shuffle") &&
0126         !isClientOnly && conf.separateChunkFetchRequest()) {
0127       chunkFetchWorkers = NettyUtils.createEventLoop(
0128           IOMode.valueOf(conf.ioMode()),
0129           conf.chunkFetchHandlerThreads(),
0130           "shuffle-chunk-fetch-handler");
0131     } else {
0132       chunkFetchWorkers = null;
0133     }
0134   }
0135 
0136   /**
0137    * Initializes a ClientFactory which runs the given TransportClientBootstraps prior to returning
0138    * a new Client. Bootstraps will be executed synchronously, and must run successfully in order
0139    * to create a Client.
0140    */
0141   public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
0142     return new TransportClientFactory(this, bootstraps);
0143   }
0144 
0145   public TransportClientFactory createClientFactory() {
0146     return createClientFactory(new ArrayList<>());
0147   }
0148 
0149   /** Create a server which will attempt to bind to a specific port. */
0150   public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
0151     return new TransportServer(this, null, port, rpcHandler, bootstraps);
0152   }
0153 
0154   /** Create a server which will attempt to bind to a specific host and port. */
0155   public TransportServer createServer(
0156       String host, int port, List<TransportServerBootstrap> bootstraps) {
0157     return new TransportServer(this, host, port, rpcHandler, bootstraps);
0158   }
0159 
0160   /** Creates a new server, binding to any available ephemeral port. */
0161   public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
0162     return createServer(0, bootstraps);
0163   }
0164 
0165   public TransportServer createServer() {
0166     return createServer(0, new ArrayList<>());
0167   }
0168 
0169   public TransportChannelHandler initializePipeline(SocketChannel channel) {
0170     return initializePipeline(channel, rpcHandler);
0171   }
0172 
0173   /**
0174    * Initializes a client or server Netty Channel Pipeline which encodes/decodes messages and
0175    * has a {@link org.apache.spark.network.server.TransportChannelHandler} to handle request or
0176    * response messages.
0177    *
0178    * @param channel The channel to initialize.
0179    * @param channelRpcHandler The RPC handler to use for the channel.
0180    *
0181    * @return Returns the created TransportChannelHandler, which includes a TransportClient that can
0182    * be used to communicate on this channel. The TransportClient is directly associated with a
0183    * ChannelHandler to ensure all users of the same channel get the same TransportClient object.
0184    */
0185   public TransportChannelHandler initializePipeline(
0186       SocketChannel channel,
0187       RpcHandler channelRpcHandler) {
0188     try {
0189       TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
0190       ChannelPipeline pipeline = channel.pipeline()
0191         .addLast("encoder", ENCODER)
0192         .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
0193         .addLast("decoder", DECODER)
0194         .addLast("idleStateHandler",
0195           new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
0196         // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
0197         // would require more logic to guarantee if this were not part of the same event loop.
0198         .addLast("handler", channelHandler);
0199       // Use a separate EventLoopGroup to handle ChunkFetchRequest messages for shuffle rpcs.
0200       if (chunkFetchWorkers != null) {
0201         ChunkFetchRequestHandler chunkFetchHandler = new ChunkFetchRequestHandler(
0202           channelHandler.getClient(), rpcHandler.getStreamManager(),
0203           conf.maxChunksBeingTransferred(), true /* syncModeEnabled */);
0204         pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler);
0205       }
0206       return channelHandler;
0207     } catch (RuntimeException e) {
0208       logger.error("Error while initializing Netty pipeline", e);
0209       throw e;
0210     }
0211   }
0212 
0213   /**
0214    * Creates the server- and client-side handler which is used to handle both RequestMessages and
0215    * ResponseMessages. The channel is expected to have been successfully created, though certain
0216    * properties (such as the remoteAddress()) may not be available yet.
0217    */
0218   private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
0219     TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
0220     TransportClient client = new TransportClient(channel, responseHandler);
0221     boolean separateChunkFetchRequest = conf.separateChunkFetchRequest();
0222     ChunkFetchRequestHandler chunkFetchRequestHandler = null;
0223     if (!separateChunkFetchRequest) {
0224       chunkFetchRequestHandler = new ChunkFetchRequestHandler(
0225         client, rpcHandler.getStreamManager(),
0226         conf.maxChunksBeingTransferred(), false /* syncModeEnabled */);
0227     }
0228     TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
0229       rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler);
0230     return new TransportChannelHandler(client, responseHandler, requestHandler,
0231       conf.connectionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this);
0232   }
0233 
0234   public TransportConf getConf() { return conf; }
0235 
0236   public Counter getRegisteredConnections() {
0237     return registeredConnections;
0238   }
0239 
0240   public void close() {
0241     if (chunkFetchWorkers != null) {
0242       chunkFetchWorkers.shutdownGracefully();
0243     }
0244   }
0245 }