0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.util;
0019
0020 import java.util.concurrent.ThreadFactory;
0021
0022 import io.netty.buffer.PooledByteBufAllocator;
0023 import io.netty.channel.Channel;
0024 import io.netty.channel.EventLoopGroup;
0025 import io.netty.channel.ServerChannel;
0026 import io.netty.channel.epoll.EpollEventLoopGroup;
0027 import io.netty.channel.epoll.EpollServerSocketChannel;
0028 import io.netty.channel.epoll.EpollSocketChannel;
0029 import io.netty.channel.nio.NioEventLoopGroup;
0030 import io.netty.channel.socket.nio.NioServerSocketChannel;
0031 import io.netty.channel.socket.nio.NioSocketChannel;
0032 import io.netty.util.concurrent.DefaultThreadFactory;
0033 import io.netty.util.internal.PlatformDependent;
0034
0035
0036
0037
0038 public class NettyUtils {
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050 private static int MAX_DEFAULT_NETTY_THREADS = 8;
0051
0052 private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator =
0053 new PooledByteBufAllocator[2];
0054
0055
0056 public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
0057 return new DefaultThreadFactory(threadPoolPrefix, true);
0058 }
0059
0060
0061 public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {
0062 ThreadFactory threadFactory = createThreadFactory(threadPrefix);
0063
0064 switch (mode) {
0065 case NIO:
0066 return new NioEventLoopGroup(numThreads, threadFactory);
0067 case EPOLL:
0068 return new EpollEventLoopGroup(numThreads, threadFactory);
0069 default:
0070 throw new IllegalArgumentException("Unknown io mode: " + mode);
0071 }
0072 }
0073
0074
0075 public static Class<? extends Channel> getClientChannelClass(IOMode mode) {
0076 switch (mode) {
0077 case NIO:
0078 return NioSocketChannel.class;
0079 case EPOLL:
0080 return EpollSocketChannel.class;
0081 default:
0082 throw new IllegalArgumentException("Unknown io mode: " + mode);
0083 }
0084 }
0085
0086
0087 public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
0088 switch (mode) {
0089 case NIO:
0090 return NioServerSocketChannel.class;
0091 case EPOLL:
0092 return EpollServerSocketChannel.class;
0093 default:
0094 throw new IllegalArgumentException("Unknown io mode: " + mode);
0095 }
0096 }
0097
0098
0099
0100
0101
0102 public static TransportFrameDecoder createFrameDecoder() {
0103 return new TransportFrameDecoder();
0104 }
0105
0106
0107 public static String getRemoteAddress(Channel channel) {
0108 if (channel != null && channel.remoteAddress() != null) {
0109 return channel.remoteAddress().toString();
0110 }
0111 return "<unknown remote>";
0112 }
0113
0114
0115
0116
0117
0118 public static int defaultNumThreads(int numUsableCores) {
0119 final int availableCores;
0120 if (numUsableCores > 0) {
0121 availableCores = numUsableCores;
0122 } else {
0123 availableCores = Runtime.getRuntime().availableProcessors();
0124 }
0125 return Math.min(availableCores, MAX_DEFAULT_NETTY_THREADS);
0126 }
0127
0128
0129
0130
0131
0132 public static synchronized PooledByteBufAllocator getSharedPooledByteBufAllocator(
0133 boolean allowDirectBufs,
0134 boolean allowCache) {
0135 final int index = allowCache ? 0 : 1;
0136 if (_sharedPooledByteBufAllocator[index] == null) {
0137 _sharedPooledByteBufAllocator[index] =
0138 createPooledByteBufAllocator(
0139 allowDirectBufs,
0140 allowCache,
0141 defaultNumThreads(0));
0142 }
0143 return _sharedPooledByteBufAllocator[index];
0144 }
0145
0146
0147
0148
0149
0150
0151
0152 public static PooledByteBufAllocator createPooledByteBufAllocator(
0153 boolean allowDirectBufs,
0154 boolean allowCache,
0155 int numCores) {
0156 if (numCores == 0) {
0157 numCores = Runtime.getRuntime().availableProcessors();
0158 }
0159 return new PooledByteBufAllocator(
0160 allowDirectBufs && PlatformDependent.directBufferPreferred(),
0161 Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
0162 Math.min(PooledByteBufAllocator.defaultNumDirectArena(), allowDirectBufs ? numCores : 0),
0163 PooledByteBufAllocator.defaultPageSize(),
0164 PooledByteBufAllocator.defaultMaxOrder(),
0165 allowCache ? PooledByteBufAllocator.defaultTinyCacheSize() : 0,
0166 allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0,
0167 allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0,
0168 allowCache ? PooledByteBufAllocator.defaultUseCacheForAllThreads() : false
0169 );
0170 }
0171 }