0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.util;
0019
0020 import java.util.Locale;
0021 import java.util.Properties;
0022
0023 import com.google.common.primitives.Ints;
0024 import io.netty.util.NettyRuntime;
0025
0026
0027
0028
0029 public class TransportConf {
0030
0031 private final String SPARK_NETWORK_IO_MODE_KEY;
0032 private final String SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY;
0033 private final String SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY;
0034 private final String SPARK_NETWORK_IO_BACKLOG_KEY;
0035 private final String SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY;
0036 private final String SPARK_NETWORK_IO_SERVERTHREADS_KEY;
0037 private final String SPARK_NETWORK_IO_CLIENTTHREADS_KEY;
0038 private final String SPARK_NETWORK_IO_RECEIVEBUFFER_KEY;
0039 private final String SPARK_NETWORK_IO_SENDBUFFER_KEY;
0040 private final String SPARK_NETWORK_SASL_TIMEOUT_KEY;
0041 private final String SPARK_NETWORK_IO_MAXRETRIES_KEY;
0042 private final String SPARK_NETWORK_IO_RETRYWAIT_KEY;
0043 private final String SPARK_NETWORK_IO_LAZYFD_KEY;
0044 private final String SPARK_NETWORK_VERBOSE_METRICS;
0045 private final String SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY;
0046
0047 private final ConfigProvider conf;
0048
0049 private final String module;
0050
0051 public TransportConf(String module, ConfigProvider conf) {
0052 this.module = module;
0053 this.conf = conf;
0054 SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
0055 SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
0056 SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
0057 SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog");
0058 SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY = getConfKey("io.numConnectionsPerPeer");
0059 SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads");
0060 SPARK_NETWORK_IO_CLIENTTHREADS_KEY = getConfKey("io.clientThreads");
0061 SPARK_NETWORK_IO_RECEIVEBUFFER_KEY = getConfKey("io.receiveBuffer");
0062 SPARK_NETWORK_IO_SENDBUFFER_KEY = getConfKey("io.sendBuffer");
0063 SPARK_NETWORK_SASL_TIMEOUT_KEY = getConfKey("sasl.timeout");
0064 SPARK_NETWORK_IO_MAXRETRIES_KEY = getConfKey("io.maxRetries");
0065 SPARK_NETWORK_IO_RETRYWAIT_KEY = getConfKey("io.retryWait");
0066 SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
0067 SPARK_NETWORK_VERBOSE_METRICS = getConfKey("io.enableVerboseMetrics");
0068 SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY = getConfKey("io.enableTcpKeepAlive");
0069 }
0070
0071 public int getInt(String name, int defaultValue) {
0072 return conf.getInt(name, defaultValue);
0073 }
0074
0075 public String get(String name, String defaultValue) {
0076 return conf.get(name, defaultValue);
0077 }
0078
0079 private String getConfKey(String suffix) {
0080 return "spark." + module + "." + suffix;
0081 }
0082
0083 public String getModuleName() {
0084 return module;
0085 }
0086
0087
0088 public String ioMode() {
0089 return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(Locale.ROOT);
0090 }
0091
0092
0093 public boolean preferDirectBufs() {
0094 return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
0095 }
0096
0097
0098 public int connectionTimeoutMs() {
0099 long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
0100 conf.get("spark.network.timeout", "120s"));
0101 long defaultTimeoutMs = JavaUtils.timeStringAsSec(
0102 conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
0103 return (int) defaultTimeoutMs;
0104 }
0105
0106
0107 public int numConnectionsPerPeer() {
0108 return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1);
0109 }
0110
0111
0112
0113
0114
0115
0116 public int backLog() { return conf.getInt(SPARK_NETWORK_IO_BACKLOG_KEY, -1); }
0117
0118
0119 public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }
0120
0121
0122 public int clientThreads() { return conf.getInt(SPARK_NETWORK_IO_CLIENTTHREADS_KEY, 0); }
0123
0124
0125
0126
0127
0128
0129
0130
0131 public int receiveBuf() { return conf.getInt(SPARK_NETWORK_IO_RECEIVEBUFFER_KEY, -1); }
0132
0133
0134 public int sendBuf() { return conf.getInt(SPARK_NETWORK_IO_SENDBUFFER_KEY, -1); }
0135
0136
0137 public int authRTTimeoutMs() {
0138 return (int) JavaUtils.timeStringAsSec(conf.get("spark.network.auth.rpcTimeout",
0139 conf.get(SPARK_NETWORK_SASL_TIMEOUT_KEY, "30s"))) * 1000;
0140 }
0141
0142
0143
0144
0145
0146 public int maxIORetries() { return conf.getInt(SPARK_NETWORK_IO_MAXRETRIES_KEY, 3); }
0147
0148
0149
0150
0151
0152 public int ioRetryWaitTimeMs() {
0153 return (int) JavaUtils.timeStringAsSec(conf.get(SPARK_NETWORK_IO_RETRYWAIT_KEY, "5s")) * 1000;
0154 }
0155
0156
0157
0158
0159
0160
0161 public int memoryMapBytes() {
0162 return Ints.checkedCast(JavaUtils.byteStringAsBytes(
0163 conf.get("spark.storage.memoryMapThreshold", "2m")));
0164 }
0165
0166
0167
0168
0169
0170 public boolean lazyFileDescriptor() {
0171 return conf.getBoolean(SPARK_NETWORK_IO_LAZYFD_KEY, true);
0172 }
0173
0174
0175
0176
0177
0178 public boolean verboseMetrics() {
0179 return conf.getBoolean(SPARK_NETWORK_VERBOSE_METRICS, false);
0180 }
0181
0182
0183
0184
0185
0186 public boolean enableTcpKeepAlive() {
0187 return conf.getBoolean(SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY, false);
0188 }
0189
0190
0191
0192
0193 public int portMaxRetries() {
0194 return conf.getInt("spark.port.maxRetries", 16);
0195 }
0196
0197
0198
0199
0200 public boolean encryptionEnabled() {
0201 return conf.getBoolean("spark.network.crypto.enabled", false);
0202 }
0203
0204
0205
0206
0207 public String cipherTransformation() {
0208 return conf.get("spark.network.crypto.cipher", "AES/CTR/NoPadding");
0209 }
0210
0211
0212
0213
0214
0215 public String keyFactoryAlgorithm() {
0216 return conf.get("spark.network.crypto.keyFactoryAlgorithm", "PBKDF2WithHmacSHA1");
0217 }
0218
0219
0220
0221
0222
0223
0224
0225
0226 public int keyFactoryIterations() {
0227 return conf.getInt("spark.network.crypto.keyFactoryIterations", 1024);
0228 }
0229
0230
0231
0232
0233 public int encryptionKeyLength() {
0234 return conf.getInt("spark.network.crypto.keyLength", 128);
0235 }
0236
0237
0238
0239
0240 public int ivLength() {
0241 return conf.getInt("spark.network.crypto.ivLength", 16);
0242 }
0243
0244
0245
0246
0247
0248 public String keyAlgorithm() {
0249 return conf.get("spark.network.crypto.keyAlgorithm", "AES");
0250 }
0251
0252
0253
0254
0255
0256 public boolean saslFallback() {
0257 return conf.getBoolean("spark.network.crypto.saslFallback", true);
0258 }
0259
0260
0261
0262
0263 public boolean saslEncryption() {
0264 return conf.getBoolean("spark.authenticate.enableSaslEncryption", false);
0265 }
0266
0267
0268
0269
0270 public int maxSaslEncryptedBlockSize() {
0271 return Ints.checkedCast(JavaUtils.byteStringAsBytes(
0272 conf.get("spark.network.sasl.maxEncryptedBlockSize", "64k")));
0273 }
0274
0275
0276
0277
0278 public boolean saslServerAlwaysEncrypt() {
0279 return conf.getBoolean("spark.network.sasl.serverAlwaysEncrypt", false);
0280 }
0281
0282
0283
0284
0285
0286
0287
0288 public boolean sharedByteBufAllocators() {
0289 return conf.getBoolean("spark.network.sharedByteBufAllocators.enabled", true);
0290 }
0291
0292
0293
0294
0295 public boolean preferDirectBufsForSharedByteBufAllocators() {
0296 return conf.getBoolean("spark.network.io.preferDirectBufs", true);
0297 }
0298
0299
0300
0301
0302 public Properties cryptoConf() {
0303 return CryptoUtils.toCryptoConf("spark.network.crypto.config.", conf.getAll());
0304 }
0305
0306
0307
0308
0309
0310
0311
0312
0313 public long maxChunksBeingTransferred() {
0314 return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE);
0315 }
0316
0317
0318
0319
0320
0321
0322
0323
0324
0325
0326
0327
0328
0329
0330
0331
0332
0333
0334
0335
0336
0337
0338 public int chunkFetchHandlerThreads() {
0339 if (!this.getModuleName().equalsIgnoreCase("shuffle")) {
0340 return 0;
0341 }
0342 int chunkFetchHandlerThreadsPercent =
0343 Integer.parseInt(conf.get("spark.shuffle.server.chunkFetchHandlerThreadsPercent"));
0344 int threads =
0345 this.serverThreads() > 0 ? this.serverThreads() : 2 * NettyRuntime.availableProcessors();
0346 return (int) Math.ceil(threads * (chunkFetchHandlerThreadsPercent / 100.0));
0347 }
0348
0349
0350
0351
0352
0353 public boolean separateChunkFetchRequest() {
0354 return conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0) > 0;
0355 }
0356
0357
0358
0359
0360
0361
0362 public boolean useOldFetchProtocol() {
0363 return conf.getBoolean("spark.shuffle.useOldFetchProtocol", false);
0364 }
0365
0366 }