0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.server;
0019
0020 import java.io.Closeable;
0021 import java.net.InetSocketAddress;
0022 import java.util.List;
0023 import java.util.concurrent.TimeUnit;
0024
0025 import com.codahale.metrics.Counter;
0026 import com.codahale.metrics.MetricSet;
0027 import com.google.common.base.Preconditions;
0028 import com.google.common.collect.Lists;
0029 import io.netty.bootstrap.ServerBootstrap;
0030 import io.netty.buffer.PooledByteBufAllocator;
0031 import io.netty.channel.ChannelFuture;
0032 import io.netty.channel.ChannelInitializer;
0033 import io.netty.channel.ChannelOption;
0034 import io.netty.channel.EventLoopGroup;
0035 import io.netty.channel.socket.SocketChannel;
0036 import org.apache.commons.lang3.SystemUtils;
0037 import org.slf4j.Logger;
0038 import org.slf4j.LoggerFactory;
0039
0040 import org.apache.spark.network.TransportContext;
0041 import org.apache.spark.network.util.*;
0042
0043
0044
0045
0046 public class TransportServer implements Closeable {
0047 private static final Logger logger = LoggerFactory.getLogger(TransportServer.class);
0048
0049 private final TransportContext context;
0050 private final TransportConf conf;
0051 private final RpcHandler appRpcHandler;
0052 private final List<TransportServerBootstrap> bootstraps;
0053
0054 private ServerBootstrap bootstrap;
0055 private ChannelFuture channelFuture;
0056 private int port = -1;
0057 private final PooledByteBufAllocator pooledAllocator;
0058 private NettyMemoryMetrics metrics;
0059
0060
0061
0062
0063
0064 public TransportServer(
0065 TransportContext context,
0066 String hostToBind,
0067 int portToBind,
0068 RpcHandler appRpcHandler,
0069 List<TransportServerBootstrap> bootstraps) {
0070 this.context = context;
0071 this.conf = context.getConf();
0072 this.appRpcHandler = appRpcHandler;
0073 if (conf.sharedByteBufAllocators()) {
0074 this.pooledAllocator = NettyUtils.getSharedPooledByteBufAllocator(
0075 conf.preferDirectBufsForSharedByteBufAllocators(), true );
0076 } else {
0077 this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
0078 conf.preferDirectBufs(), true , conf.serverThreads());
0079 }
0080 this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
0081
0082 boolean shouldClose = true;
0083 try {
0084 init(hostToBind, portToBind);
0085 shouldClose = false;
0086 } finally {
0087 if (shouldClose) {
0088 JavaUtils.closeQuietly(this);
0089 }
0090 }
0091 }
0092
0093 public int getPort() {
0094 if (port == -1) {
0095 throw new IllegalStateException("Server not initialized");
0096 }
0097 return port;
0098 }
0099
0100 private void init(String hostToBind, int portToBind) {
0101
0102 IOMode ioMode = IOMode.valueOf(conf.ioMode());
0103 EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
0104 conf.getModuleName() + "-boss");
0105 EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
0106 conf.getModuleName() + "-server");
0107
0108 bootstrap = new ServerBootstrap()
0109 .group(bossGroup, workerGroup)
0110 .channel(NettyUtils.getServerChannelClass(ioMode))
0111 .option(ChannelOption.ALLOCATOR, pooledAllocator)
0112 .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
0113 .childOption(ChannelOption.ALLOCATOR, pooledAllocator);
0114
0115 this.metrics = new NettyMemoryMetrics(
0116 pooledAllocator, conf.getModuleName() + "-server", conf);
0117
0118 if (conf.backLog() > 0) {
0119 bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
0120 }
0121
0122 if (conf.receiveBuf() > 0) {
0123 bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
0124 }
0125
0126 if (conf.sendBuf() > 0) {
0127 bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
0128 }
0129
0130 if (conf.enableTcpKeepAlive()) {
0131 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
0132 }
0133
0134 bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
0135 @Override
0136 protected void initChannel(SocketChannel ch) {
0137 logger.debug("New connection accepted for remote address {}.", ch.remoteAddress());
0138
0139 RpcHandler rpcHandler = appRpcHandler;
0140 for (TransportServerBootstrap bootstrap : bootstraps) {
0141 rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
0142 }
0143 context.initializePipeline(ch, rpcHandler);
0144 }
0145 });
0146
0147 InetSocketAddress address = hostToBind == null ?
0148 new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
0149 channelFuture = bootstrap.bind(address);
0150 channelFuture.syncUninterruptibly();
0151
0152 port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
0153 logger.debug("Shuffle server started on port: {}", port);
0154 }
0155
0156 public MetricSet getAllMetrics() {
0157 return metrics;
0158 }
0159
0160 @Override
0161 public void close() {
0162 if (channelFuture != null) {
0163
0164 channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
0165 channelFuture = null;
0166 }
0167 if (bootstrap != null && bootstrap.config().group() != null) {
0168 bootstrap.config().group().shutdownGracefully();
0169 }
0170 if (bootstrap != null && bootstrap.config().childGroup() != null) {
0171 bootstrap.config().childGroup().shutdownGracefully();
0172 }
0173 bootstrap = null;
0174 }
0175
0176 public Counter getRegisteredConnections() {
0177 return context.getRegisteredConnections();
0178 }
0179 }