Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.util;
0019 
0020 import java.util.concurrent.ThreadFactory;
0021 
0022 import io.netty.buffer.PooledByteBufAllocator;
0023 import io.netty.channel.Channel;
0024 import io.netty.channel.EventLoopGroup;
0025 import io.netty.channel.ServerChannel;
0026 import io.netty.channel.epoll.EpollEventLoopGroup;
0027 import io.netty.channel.epoll.EpollServerSocketChannel;
0028 import io.netty.channel.epoll.EpollSocketChannel;
0029 import io.netty.channel.nio.NioEventLoopGroup;
0030 import io.netty.channel.socket.nio.NioServerSocketChannel;
0031 import io.netty.channel.socket.nio.NioSocketChannel;
0032 import io.netty.util.concurrent.DefaultThreadFactory;
0033 import io.netty.util.internal.PlatformDependent;
0034 
0035 /**
0036  * Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO.
0037  */
0038 public class NettyUtils {
0039 
0040   /**
0041    * Specifies an upper bound on the number of Netty threads that Spark requires by default.
0042    * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
0043    * that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
0044    * at a premium.
0045    *
0046    * Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
0047    * allocation. It can be overridden by setting the number of serverThreads and clientThreads
0048    * manually in Spark's configuration.
0049    */
0050   private static int MAX_DEFAULT_NETTY_THREADS = 8;
0051 
0052   private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator =
0053       new PooledByteBufAllocator[2];
0054 
0055   /** Creates a new ThreadFactory which prefixes each thread with the given name. */
0056   public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
0057     return new DefaultThreadFactory(threadPoolPrefix, true);
0058   }
0059 
0060   /** Creates a Netty EventLoopGroup based on the IOMode. */
0061   public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {
0062     ThreadFactory threadFactory = createThreadFactory(threadPrefix);
0063 
0064     switch (mode) {
0065       case NIO:
0066         return new NioEventLoopGroup(numThreads, threadFactory);
0067       case EPOLL:
0068         return new EpollEventLoopGroup(numThreads, threadFactory);
0069       default:
0070         throw new IllegalArgumentException("Unknown io mode: " + mode);
0071     }
0072   }
0073 
0074   /** Returns the correct (client) SocketChannel class based on IOMode. */
0075   public static Class<? extends Channel> getClientChannelClass(IOMode mode) {
0076     switch (mode) {
0077       case NIO:
0078         return NioSocketChannel.class;
0079       case EPOLL:
0080         return EpollSocketChannel.class;
0081       default:
0082         throw new IllegalArgumentException("Unknown io mode: " + mode);
0083     }
0084   }
0085 
0086   /** Returns the correct ServerSocketChannel class based on IOMode. */
0087   public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
0088     switch (mode) {
0089       case NIO:
0090         return NioServerSocketChannel.class;
0091       case EPOLL:
0092         return EpollServerSocketChannel.class;
0093       default:
0094         throw new IllegalArgumentException("Unknown io mode: " + mode);
0095     }
0096   }
0097 
0098   /**
0099    * Creates a LengthFieldBasedFrameDecoder where the first 8 bytes are the length of the frame.
0100    * This is used before all decoders.
0101    */
0102   public static TransportFrameDecoder createFrameDecoder() {
0103     return new TransportFrameDecoder();
0104   }
0105 
0106   /** Returns the remote address on the channel or "&lt;unknown remote&gt;" if none exists. */
0107   public static String getRemoteAddress(Channel channel) {
0108     if (channel != null && channel.remoteAddress() != null) {
0109       return channel.remoteAddress().toString();
0110     }
0111     return "<unknown remote>";
0112   }
0113 
0114   /**
0115    * Returns the default number of threads for both the Netty client and server thread pools.
0116    * If numUsableCores is 0, we will use Runtime get an approximate number of available cores.
0117    */
0118   public static int defaultNumThreads(int numUsableCores) {
0119     final int availableCores;
0120     if (numUsableCores > 0) {
0121       availableCores = numUsableCores;
0122     } else {
0123       availableCores = Runtime.getRuntime().availableProcessors();
0124     }
0125     return Math.min(availableCores, MAX_DEFAULT_NETTY_THREADS);
0126   }
0127 
0128   /**
0129    * Returns the lazily created shared pooled ByteBuf allocator for the specified allowCache
0130    * parameter value.
0131    */
0132   public static synchronized PooledByteBufAllocator getSharedPooledByteBufAllocator(
0133       boolean allowDirectBufs,
0134       boolean allowCache) {
0135     final int index = allowCache ? 0 : 1;
0136     if (_sharedPooledByteBufAllocator[index] == null) {
0137       _sharedPooledByteBufAllocator[index] =
0138         createPooledByteBufAllocator(
0139           allowDirectBufs,
0140           allowCache,
0141           defaultNumThreads(0));
0142     }
0143     return _sharedPooledByteBufAllocator[index];
0144   }
0145 
0146   /**
0147    * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
0148    * are disabled for TransportClients because the ByteBufs are allocated by the event loop thread,
0149    * but released by the executor thread rather than the event loop thread. Those thread-local
0150    * caches actually delay the recycling of buffers, leading to larger memory usage.
0151    */
0152   public static PooledByteBufAllocator createPooledByteBufAllocator(
0153       boolean allowDirectBufs,
0154       boolean allowCache,
0155       int numCores) {
0156     if (numCores == 0) {
0157       numCores = Runtime.getRuntime().availableProcessors();
0158     }
0159     return new PooledByteBufAllocator(
0160       allowDirectBufs && PlatformDependent.directBufferPreferred(),
0161       Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
0162       Math.min(PooledByteBufAllocator.defaultNumDirectArena(), allowDirectBufs ? numCores : 0),
0163       PooledByteBufAllocator.defaultPageSize(),
0164       PooledByteBufAllocator.defaultMaxOrder(),
0165       allowCache ? PooledByteBufAllocator.defaultTinyCacheSize() : 0,
0166       allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0,
0167       allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0,
0168       allowCache ? PooledByteBufAllocator.defaultUseCacheForAllThreads() : false
0169     );
0170   }
0171 }