/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.util;

import java.util.Locale;
import java.util.Properties;

import com.google.common.primitives.Ints;
import io.netty.util.NettyRuntime;

/**
 * A central location that tracks all the settings we expose to users.
 */
public class TransportConf {

  private final String SPARK_NETWORK_IO_MODE_KEY;
  private final String SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY;
  private final String SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY;
  private final String SPARK_NETWORK_IO_BACKLOG_KEY;
  private final String SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY;
  private final String SPARK_NETWORK_IO_SERVERTHREADS_KEY;
  private final String SPARK_NETWORK_IO_CLIENTTHREADS_KEY;
  private final String SPARK_NETWORK_IO_RECEIVEBUFFER_KEY;
  private final String SPARK_NETWORK_IO_SENDBUFFER_KEY;
  private final String SPARK_NETWORK_SASL_TIMEOUT_KEY;
  private final String SPARK_NETWORK_IO_MAXRETRIES_KEY;
  private final String SPARK_NETWORK_IO_RETRYWAIT_KEY;
  private final String SPARK_NETWORK_IO_LAZYFD_KEY;
  private final String SPARK_NETWORK_VERBOSE_METRICS;
  private final String SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY;

  private final ConfigProvider conf;

  private final String module;

  public TransportConf(String module, ConfigProvider conf) {
    this.module = module;
    this.conf = conf;
    SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
    SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
    SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
    SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog");
    SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY = getConfKey("io.numConnectionsPerPeer");
    SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads");
    SPARK_NETWORK_IO_CLIENTTHREADS_KEY = getConfKey("io.clientThreads");
    SPARK_NETWORK_IO_RECEIVEBUFFER_KEY = getConfKey("io.receiveBuffer");
    SPARK_NETWORK_IO_SENDBUFFER_KEY = getConfKey("io.sendBuffer");
    SPARK_NETWORK_SASL_TIMEOUT_KEY = getConfKey("sasl.timeout");
    SPARK_NETWORK_IO_MAXRETRIES_KEY = getConfKey("io.maxRetries");
    SPARK_NETWORK_IO_RETRYWAIT_KEY = getConfKey("io.retryWait");
    SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
    SPARK_NETWORK_VERBOSE_METRICS = getConfKey("io.enableVerboseMetrics");
    SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY = getConfKey("io.enableTcpKeepAlive");
  }

  public int getInt(String name, int defaultValue) {
    return conf.getInt(name, defaultValue);
  }

  public String get(String name, String defaultValue) {
    return conf.get(name, defaultValue);
  }

  private String getConfKey(String suffix) {
    return "spark." + module + "." + suffix;
  }
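
  // Illustrative note: config keys are namespaced by module. With
  // module = "shuffle", getConfKey("io.mode") yields "spark.shuffle.io.mode";
  // with module = "rpc", it would yield "spark.rpc.io.mode".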

  public String getModuleName() {
    return module;
  }

  /** IO mode: nio or epoll */
  public String ioMode() {
    return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(Locale.ROOT);
  }

  /** If true, we will prefer allocating off-heap byte buffers within Netty. */
  public boolean preferDirectBufs() {
    return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
  }

  /** Connect timeout in milliseconds. Default 120 secs. */
  public int connectionTimeoutMs() {
    long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
      conf.get("spark.network.timeout", "120s"));
    long defaultTimeoutMs = JavaUtils.timeStringAsSec(
      conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
    return (int) defaultTimeoutMs;
  }
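
  // Worked example (illustrative values): with neither key set,
  // defaultNetworkTimeoutS = 120, so this returns 120 * 1000 = 120000 ms.
  // Setting the module-specific key, e.g. spark.shuffle.io.connectionTimeout = "60s",
  // takes precedence over spark.network.timeout and yields 60000 ms.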

  /** Number of concurrent connections between two nodes for fetching data. */
  public int numConnectionsPerPeer() {
    return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1);
  }

  /**
   * Requested maximum length of the queue of incoming connections. If < 1,
   * the default Netty value of {@link io.netty.util.NetUtil#SOMAXCONN} will be used.
   * Defaults to -1.
   */
  public int backLog() { return conf.getInt(SPARK_NETWORK_IO_BACKLOG_KEY, -1); }

  /** Number of threads used in the server thread pool. Defaults to 0, which means 2x #cores. */
  public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }

  /** Number of threads used in the client thread pool. Defaults to 0, which means 2x #cores. */
  public int clientThreads() { return conf.getInt(SPARK_NETWORK_IO_CLIENTTHREADS_KEY, 0); }

  /**
   * Receive buffer size (SO_RCVBUF).
   * Note: the optimal size for receive and send buffers is
   * latency * network_bandwidth. Assuming latency = 1 ms and
   * network_bandwidth = 10 Gbps, the buffer size should be ~1.25 MB.
   */
  public int receiveBuf() { return conf.getInt(SPARK_NETWORK_IO_RECEIVEBUFFER_KEY, -1); }

  /** Send buffer size (SO_SNDBUF). */
  public int sendBuf() { return conf.getInt(SPARK_NETWORK_IO_SENDBUFFER_KEY, -1); }

  /** Timeout for a single round trip of auth message exchange, in milliseconds. */
  public int authRTTimeoutMs() {
    return (int) JavaUtils.timeStringAsSec(conf.get("spark.network.auth.rpcTimeout",
      conf.get(SPARK_NETWORK_SASL_TIMEOUT_KEY, "30s"))) * 1000;
  }

  /**
   * Max number of times we will retry after IO exceptions (such as connection timeouts)
   * per request. If set to 0, we will not do any retries.
   */
  public int maxIORetries() { return conf.getInt(SPARK_NETWORK_IO_MAXRETRIES_KEY, 3); }

  /**
   * Time (in milliseconds) that we will wait in order to perform a retry after an IOException.
   * Only relevant if maxIORetries > 0.
   */
  public int ioRetryWaitTimeMs() {
    return (int) JavaUtils.timeStringAsSec(conf.get(SPARK_NETWORK_IO_RETRYWAIT_KEY, "5s")) * 1000;
  }

  /**
   * Minimum size of a block above which we should use memory mapping rather than reading it in
   * through normal IO operations. This prevents Spark from memory mapping very small blocks.
   * In general, memory mapping has high overhead for blocks close to or below the page size
   * of the OS.
   */
  public int memoryMapBytes() {
    return Ints.checkedCast(JavaUtils.byteStringAsBytes(
      conf.get("spark.storage.memoryMapThreshold", "2m")));
  }
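
  // Illustrative note: JavaUtils.byteStringAsBytes parses suffixed size strings
  // using binary units, so the default "2m" resolves to 2 * 1024 * 1024 = 2097152
  // bytes, and a value such as "64k" would resolve to 65536.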

  /**
   * Whether to initialize FileDescriptor lazily or not. If true, file descriptors are
   * created only when data is going to be transferred. This can reduce the number of open files.
   */
  public boolean lazyFileDescriptor() {
    return conf.getBoolean(SPARK_NETWORK_IO_LAZYFD_KEY, true);
  }

  /**
   * Whether to track detailed Netty memory metrics. If true, the detailed metrics of Netty's
   * PooledByteBufAllocator will be collected; otherwise only general memory usage will be
   * tracked.
   */
  public boolean verboseMetrics() {
    return conf.getBoolean(SPARK_NETWORK_VERBOSE_METRICS, false);
  }

  /**
   * Whether to enable TCP keep-alive. If true, TCP keep-alives are enabled, which helps
   * remove connections that have been idle for too long.
   */
  public boolean enableTcpKeepAlive() {
    return conf.getBoolean(SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY, false);
  }

  /**
   * Maximum number of retries when binding to a port before giving up.
   */
  public int portMaxRetries() {
    return conf.getInt("spark.port.maxRetries", 16);
  }

  /**
   * Enables strong encryption. Also enables the new auth protocol, used to negotiate keys.
   */
  public boolean encryptionEnabled() {
    return conf.getBoolean("spark.network.crypto.enabled", false);
  }

  /**
   * The cipher transformation to use for encrypting session data.
   */
  public String cipherTransformation() {
    return conf.get("spark.network.crypto.cipher", "AES/CTR/NoPadding");
  }

  /**
   * The key generation algorithm. This should be an algorithm that accepts a "PBEKeySpec"
   * as input. The default value (PBKDF2WithHmacSHA1) is available in Java 7.
   */
  public String keyFactoryAlgorithm() {
    return conf.get("spark.network.crypto.keyFactoryAlgorithm", "PBKDF2WithHmacSHA1");
  }

  /**
   * How many iterations to run when generating keys.
   *
   * See some discussion about this at: http://security.stackexchange.com/q/3959
   * The default value was picked for speed, since it assumes that the secret has good entropy
   * (128 bits by default), which is not generally the case with user passwords.
   */
  public int keyFactoryIterations() {
    return conf.getInt("spark.network.crypto.keyFactoryIterations", 1024);
  }

  /**
   * Encryption key length, in bits.
   */
  public int encryptionKeyLength() {
    return conf.getInt("spark.network.crypto.keyLength", 128);
  }

  /**
   * Initial vector length, in bytes.
   */
  public int ivLength() {
    return conf.getInt("spark.network.crypto.ivLength", 16);
  }

  /**
   * The algorithm for generated secret keys. Nobody should really need to change this,
   * but configurable just in case.
   */
  public String keyAlgorithm() {
    return conf.get("spark.network.crypto.keyAlgorithm", "AES");
  }
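
  // Sketch (illustrative): how the crypto settings above would plug into standard
  // JCE key derivation. `password` and `salt` are hypothetical placeholders, not
  // part of this class. Assumed imports: javax.crypto.SecretKeyFactory,
  // javax.crypto.SecretKey, javax.crypto.spec.PBEKeySpec, javax.crypto.spec.SecretKeySpec.
  //
  //   SecretKeyFactory factory = SecretKeyFactory.getInstance(keyFactoryAlgorithm());
  //   PBEKeySpec spec =
  //       new PBEKeySpec(password, salt, keyFactoryIterations(), encryptionKeyLength());
  //   SecretKey key =
  //       new SecretKeySpec(factory.generateSecret(spec).getEncoded(), keyAlgorithm());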

  /**
   * Whether to fall back to SASL if the new auth protocol fails. Enabled by default for
   * backwards compatibility.
   */
  public boolean saslFallback() {
    return conf.getBoolean("spark.network.crypto.saslFallback", true);
  }

  /**
   * Whether to enable SASL-based encryption when authenticating using SASL.
   */
  public boolean saslEncryption() {
    return conf.getBoolean("spark.authenticate.enableSaslEncryption", false);
  }

  /**
   * Maximum number of bytes to be encrypted at a time when SASL encryption is used.
   */
  public int maxSaslEncryptedBlockSize() {
    return Ints.checkedCast(JavaUtils.byteStringAsBytes(
      conf.get("spark.network.sasl.maxEncryptedBlockSize", "64k")));
  }

  /**
   * Whether the server should enforce encryption on SASL-authenticated connections.
   */
  public boolean saslServerAlwaysEncrypt() {
    return conf.getBoolean("spark.network.sasl.serverAlwaysEncrypt", false);
  }

  /**
   * Flag indicating whether to share the pooled ByteBuf allocators between the different Netty
   * channels. If enabled then only two pooled ByteBuf allocators are created: one where caching
   * is allowed (for transport servers) and one where it is not (for transport clients).
   * When disabled, a new allocator is created for each transport server and client.
   */
  public boolean sharedByteBufAllocators() {
    return conf.getBoolean("spark.network.sharedByteBufAllocators.enabled", true);
  }

  /**
   * If enabled then off-heap byte buffers will be preferred for the shared ByteBuf allocators.
   */
  public boolean preferDirectBufsForSharedByteBufAllocators() {
    return conf.getBoolean("spark.network.io.preferDirectBufs", true);
  }

  /**
   * The commons-crypto configuration for the module.
   */
  public Properties cryptoConf() {
    return CryptoUtils.toCryptoConf("spark.network.crypto.config.", conf.getAll());
  }

  /**
   * The max number of chunks allowed to be transferred at the same time on shuffle service.
   * Note that new incoming connections will be closed when the max number is hit. The client
   * will retry according to the shuffle retry configs (see `spark.shuffle.io.maxRetries` and
   * `spark.shuffle.io.retryWait`); if those limits are reached, the task will fail with a
   * fetch failure.
   */
  public long maxChunksBeingTransferred() {
    return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE);
  }

  /**
   * Percentage of io.serverThreads used by Netty to process ChunkFetchRequest.
   * When the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent` is set,
   * the shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest
   * messages. Even though the async writeAndFlush on the underlying channel (used to send
   * responses back to the client) is still handled by the default EventLoopGroup of
   * {@link org.apache.spark.network.server.TransportServer} registered with the Channel,
   * having the ChunkFetchRequest handler threads wait for those responses to complete
   * puts a cap on how many threads of the default EventLoopGroup can be tied up writing
   * ChunkFetchRequest responses, which are I/O intensive and can take a long time to
   * process due to disk contention. By configuring a slightly higher number of shuffle
   * server threads, we can reserve some threads for handling other RPC messages, making
   * the client less likely to experience timeouts when sending RPC messages to the
   * shuffle server.
   *
   * The number of threads used for handling chunk fetch requests is this percentage of
   * io.serverThreads (if defined), else a percentage of 2 * #cores. However, a percentage
   * of 0 means the Netty default number of threads, which is 2 * #cores, ignoring
   * io.serverThreads. The percentage is configured via
   * spark.shuffle.server.chunkFetchHandlerThreadsPercent, and the returned value is
   * rounded up to the nearest integer.
   */
  public int chunkFetchHandlerThreads() {
    if (!this.getModuleName().equalsIgnoreCase("shuffle")) {
      return 0;
    }
    int chunkFetchHandlerThreadsPercent =
      Integer.parseInt(conf.get("spark.shuffle.server.chunkFetchHandlerThreadsPercent"));
    int threads =
      this.serverThreads() > 0 ? this.serverThreads() : 2 * NettyRuntime.availableProcessors();
    return (int) Math.ceil(threads * (chunkFetchHandlerThreadsPercent / 100.0));
  }
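
  // Worked example (illustrative values): on an 8-core machine with io.serverThreads
  // unset, threads = 2 * 8 = 16; with
  // spark.shuffle.server.chunkFetchHandlerThreadsPercent = 75, the result is
  // ceil(16 * 0.75) = 12 handler threads.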

  /**
   * Whether to use a separate EventLoopGroup to process ChunkFetchRequest messages. This is
   * decided by whether the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent`
   * is set.
   */
  public boolean separateChunkFetchRequest() {
    return conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0) > 0;
  }

  /**
   * Whether to use the old protocol while doing the shuffle block fetching. It should only
   * be enabled when we need compatibility in the scenario of a job on a new Spark version
   * fetching blocks from an old-version external shuffle service.
   */
  public boolean useOldFetchProtocol() {
    return conf.getBoolean("spark.shuffle.useOldFetchProtocol", false);
  }

}
0366 }