Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.server;
0019 
0020 import java.io.Closeable;
0021 import java.net.InetSocketAddress;
0022 import java.util.List;
0023 import java.util.concurrent.TimeUnit;
0024 
0025 import com.codahale.metrics.Counter;
0026 import com.codahale.metrics.MetricSet;
0027 import com.google.common.base.Preconditions;
0028 import com.google.common.collect.Lists;
0029 import io.netty.bootstrap.ServerBootstrap;
0030 import io.netty.buffer.PooledByteBufAllocator;
0031 import io.netty.channel.ChannelFuture;
0032 import io.netty.channel.ChannelInitializer;
0033 import io.netty.channel.ChannelOption;
0034 import io.netty.channel.EventLoopGroup;
0035 import io.netty.channel.socket.SocketChannel;
0036 import org.apache.commons.lang3.SystemUtils;
0037 import org.slf4j.Logger;
0038 import org.slf4j.LoggerFactory;
0039 
0040 import org.apache.spark.network.TransportContext;
0041 import org.apache.spark.network.util.*;
0042 
0043 /**
0044  * Server for the efficient, low-level streaming service.
0045  */
0046 public class TransportServer implements Closeable {
0047   private static final Logger logger = LoggerFactory.getLogger(TransportServer.class);
0048 
0049   private final TransportContext context;
0050   private final TransportConf conf;
0051   private final RpcHandler appRpcHandler;
0052   private final List<TransportServerBootstrap> bootstraps;
0053 
0054   private ServerBootstrap bootstrap;
0055   private ChannelFuture channelFuture;
0056   private int port = -1;
0057   private final PooledByteBufAllocator pooledAllocator;
0058   private NettyMemoryMetrics metrics;
0059 
0060   /**
0061    * Creates a TransportServer that binds to the given host and the given port, or to any available
0062    * if 0. If you don't want to bind to any special host, set "hostToBind" to null.
0063    * */
0064   public TransportServer(
0065       TransportContext context,
0066       String hostToBind,
0067       int portToBind,
0068       RpcHandler appRpcHandler,
0069       List<TransportServerBootstrap> bootstraps) {
0070     this.context = context;
0071     this.conf = context.getConf();
0072     this.appRpcHandler = appRpcHandler;
0073     if (conf.sharedByteBufAllocators()) {
0074       this.pooledAllocator = NettyUtils.getSharedPooledByteBufAllocator(
0075           conf.preferDirectBufsForSharedByteBufAllocators(), true /* allowCache */);
0076     } else {
0077       this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
0078           conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
0079     }
0080     this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
0081 
0082     boolean shouldClose = true;
0083     try {
0084       init(hostToBind, portToBind);
0085       shouldClose = false;
0086     } finally {
0087       if (shouldClose) {
0088         JavaUtils.closeQuietly(this);
0089       }
0090     }
0091   }
0092 
0093   public int getPort() {
0094     if (port == -1) {
0095       throw new IllegalStateException("Server not initialized");
0096     }
0097     return port;
0098   }
0099 
0100   private void init(String hostToBind, int portToBind) {
0101 
0102     IOMode ioMode = IOMode.valueOf(conf.ioMode());
0103     EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
0104       conf.getModuleName() + "-boss");
0105     EventLoopGroup workerGroup =  NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
0106       conf.getModuleName() + "-server");
0107 
0108     bootstrap = new ServerBootstrap()
0109       .group(bossGroup, workerGroup)
0110       .channel(NettyUtils.getServerChannelClass(ioMode))
0111       .option(ChannelOption.ALLOCATOR, pooledAllocator)
0112       .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
0113       .childOption(ChannelOption.ALLOCATOR, pooledAllocator);
0114 
0115     this.metrics = new NettyMemoryMetrics(
0116       pooledAllocator, conf.getModuleName() + "-server", conf);
0117 
0118     if (conf.backLog() > 0) {
0119       bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
0120     }
0121 
0122     if (conf.receiveBuf() > 0) {
0123       bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
0124     }
0125 
0126     if (conf.sendBuf() > 0) {
0127       bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
0128     }
0129 
0130     if (conf.enableTcpKeepAlive()) {
0131       bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
0132     }
0133 
0134     bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
0135       @Override
0136       protected void initChannel(SocketChannel ch) {
0137         logger.debug("New connection accepted for remote address {}.", ch.remoteAddress());
0138 
0139         RpcHandler rpcHandler = appRpcHandler;
0140         for (TransportServerBootstrap bootstrap : bootstraps) {
0141           rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
0142         }
0143         context.initializePipeline(ch, rpcHandler);
0144       }
0145     });
0146 
0147     InetSocketAddress address = hostToBind == null ?
0148         new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
0149     channelFuture = bootstrap.bind(address);
0150     channelFuture.syncUninterruptibly();
0151 
0152     port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
0153     logger.debug("Shuffle server started on port: {}", port);
0154   }
0155 
0156   public MetricSet getAllMetrics() {
0157     return metrics;
0158   }
0159 
0160   @Override
0161   public void close() {
0162     if (channelFuture != null) {
0163       // close is a local operation and should finish within milliseconds; timeout just to be safe
0164       channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
0165       channelFuture = null;
0166     }
0167     if (bootstrap != null && bootstrap.config().group() != null) {
0168       bootstrap.config().group().shutdownGracefully();
0169     }
0170     if (bootstrap != null && bootstrap.config().childGroup() != null) {
0171       bootstrap.config().childGroup().shutdownGracefully();
0172     }
0173     bootstrap = null;
0174   }
0175 
0176   public Counter getRegisteredConnections() {
0177     return context.getRegisteredConnections();
0178   }
0179 }