0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.yarn;
0019
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.nio.charset.StandardCharsets;
0023 import java.nio.ByteBuffer;
0024 import java.util.List;
0025 import java.util.Map;
0026 import java.util.Objects;
0027
0028 import com.fasterxml.jackson.annotation.JsonCreator;
0029 import com.fasterxml.jackson.annotation.JsonProperty;
0030 import com.fasterxml.jackson.databind.ObjectMapper;
0031 import com.google.common.annotations.VisibleForTesting;
0032 import com.google.common.base.Preconditions;
0033 import com.google.common.collect.Lists;
0034 import org.apache.commons.lang3.builder.ToStringBuilder;
0035 import org.apache.commons.lang3.builder.ToStringStyle;
0036 import org.apache.hadoop.conf.Configuration;
0037 import org.apache.hadoop.fs.FileSystem;
0038 import org.apache.hadoop.fs.Path;
0039 import org.apache.hadoop.fs.permission.FsPermission;
0040 import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
0041 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
0042 import org.apache.hadoop.yarn.api.records.ContainerId;
0043 import org.apache.hadoop.yarn.server.api.*;
0044 import org.apache.spark.network.util.LevelDBProvider;
0045 import org.iq80.leveldb.DB;
0046 import org.iq80.leveldb.DBIterator;
0047
0048 import org.slf4j.Logger;
0049 import org.slf4j.LoggerFactory;
0050
0051 import org.apache.spark.network.TransportContext;
0052 import org.apache.spark.network.crypto.AuthServerBootstrap;
0053 import org.apache.spark.network.sasl.ShuffleSecretManager;
0054 import org.apache.spark.network.server.TransportServer;
0055 import org.apache.spark.network.server.TransportServerBootstrap;
0056 import org.apache.spark.network.shuffle.ExternalBlockHandler;
0057 import org.apache.spark.network.util.TransportConf;
0058 import org.apache.spark.network.yarn.util.HadoopConfigProvider;
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077
0078 public class YarnShuffleService extends AuxiliaryService {
0079 private static final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);
0080
0081
0082 private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
0083 private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
0084
0085
0086 private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
0087 private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
0088
0089 private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
0090 private static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery.ldb";
0091
0092
0093 @VisibleForTesting
0094 static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
0095 private static final boolean DEFAULT_STOP_ON_FAILURE = false;
0096
0097
0098 @VisibleForTesting
0099 static int boundPort = -1;
0100 private static final ObjectMapper mapper = new ObjectMapper();
0101 private static final String APP_CREDS_KEY_PREFIX = "AppCreds";
0102 private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider
0103 .StoreVersion(1, 0);
0104
0105
0106
0107 @VisibleForTesting
0108 static YarnShuffleService instance;
0109
0110
0111
0112 @VisibleForTesting
0113 ShuffleSecretManager secretManager;
0114
0115
0116 private TransportServer shuffleServer = null;
0117
0118 private TransportContext transportContext = null;
0119
0120 private Configuration _conf = null;
0121
0122
0123 @VisibleForTesting
0124 Path _recoveryPath = null;
0125
0126
0127 @VisibleForTesting
0128 ExternalBlockHandler blockHandler;
0129
0130
0131 @VisibleForTesting
0132 File registeredExecutorFile;
0133
0134
0135 @VisibleForTesting
0136 File secretsFile;
0137
0138 private DB db;
0139
0140 public YarnShuffleService() {
0141 super("spark_shuffle");
0142 logger.info("Initializing YARN shuffle service for Spark");
0143 instance = this;
0144 }
0145
0146
0147
0148
0149
0150
0151 private boolean isAuthenticationEnabled() {
0152 return secretManager != null;
0153 }
0154
0155
0156
0157
0158 @Override
0159 protected void serviceInit(Configuration conf) throws Exception {
0160 _conf = conf;
0161
0162 boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
0163
0164 try {
0165
0166
0167
0168
0169
0170 if (_recoveryPath != null) {
0171 registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
0172 }
0173
0174 TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
0175 blockHandler = new ExternalBlockHandler(transportConf, registeredExecutorFile);
0176
0177
0178
0179 List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
0180 boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
0181 if (authEnabled) {
0182 secretManager = new ShuffleSecretManager();
0183 if (_recoveryPath != null) {
0184 loadSecretsFromDb();
0185 }
0186 bootstraps.add(new AuthServerBootstrap(transportConf, secretManager));
0187 }
0188
0189 int port = conf.getInt(
0190 SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
0191 transportContext = new TransportContext(transportConf, blockHandler, true);
0192 shuffleServer = transportContext.createServer(port, bootstraps);
0193
0194 port = shuffleServer.getPort();
0195 boundPort = port;
0196 String authEnabledString = authEnabled ? "enabled" : "not enabled";
0197
0198
0199 blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections",
0200 shuffleServer.getRegisteredConnections());
0201 YarnShuffleServiceMetrics serviceMetrics =
0202 new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());
0203
0204 MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
0205 metricsSystem.register(
0206 "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics);
0207 logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
0208
0209 logger.info("Started YARN shuffle service for Spark on port {}. " +
0210 "Authentication is {}. Registered executor file is {}", port, authEnabledString,
0211 registeredExecutorFile);
0212 } catch (Exception e) {
0213 if (stopOnFailure) {
0214 throw e;
0215 } else {
0216 noteFailure(e);
0217 }
0218 }
0219 }
0220
0221 private void loadSecretsFromDb() throws IOException {
0222 secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
0223
0224
0225 FileSystem fs = FileSystem.getLocal(_conf);
0226 fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700));
0227
0228 db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
0229 logger.info("Recovery location is: " + secretsFile.getPath());
0230 if (db != null) {
0231 logger.info("Going to reload spark shuffle data");
0232 DBIterator itr = db.iterator();
0233 itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
0234 while (itr.hasNext()) {
0235 Map.Entry<byte[], byte[]> e = itr.next();
0236 String key = new String(e.getKey(), StandardCharsets.UTF_8);
0237 if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
0238 break;
0239 }
0240 String id = parseDbAppKey(key);
0241 ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
0242 logger.info("Reloading tokens for app: " + id);
0243 secretManager.registerApp(id, secret);
0244 }
0245 }
0246 }
0247
0248 private static String parseDbAppKey(String s) throws IOException {
0249 if (!s.startsWith(APP_CREDS_KEY_PREFIX)) {
0250 throw new IllegalArgumentException("expected a string starting with " + APP_CREDS_KEY_PREFIX);
0251 }
0252 String json = s.substring(APP_CREDS_KEY_PREFIX.length() + 1);
0253 AppId parsed = mapper.readValue(json, AppId.class);
0254 return parsed.appId;
0255 }
0256
0257 private static byte[] dbAppKey(AppId appExecId) throws IOException {
0258
0259 String appExecJson = mapper.writeValueAsString(appExecId);
0260 String key = (APP_CREDS_KEY_PREFIX + ";" + appExecJson);
0261 return key.getBytes(StandardCharsets.UTF_8);
0262 }
0263
0264 @Override
0265 public void initializeApplication(ApplicationInitializationContext context) {
0266 String appId = context.getApplicationId().toString();
0267 try {
0268 ByteBuffer shuffleSecret = context.getApplicationDataForService();
0269 if (isAuthenticationEnabled()) {
0270 AppId fullId = new AppId(appId);
0271 if (db != null) {
0272 byte[] key = dbAppKey(fullId);
0273 byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8);
0274 db.put(key, value);
0275 }
0276 secretManager.registerApp(appId, shuffleSecret);
0277 }
0278 } catch (Exception e) {
0279 logger.error("Exception when initializing application {}", appId, e);
0280 }
0281 }
0282
0283 @Override
0284 public void stopApplication(ApplicationTerminationContext context) {
0285 String appId = context.getApplicationId().toString();
0286 try {
0287 if (isAuthenticationEnabled()) {
0288 AppId fullId = new AppId(appId);
0289 if (db != null) {
0290 try {
0291 db.delete(dbAppKey(fullId));
0292 } catch (IOException e) {
0293 logger.error("Error deleting {} from executor state db", appId, e);
0294 }
0295 }
0296 secretManager.unregisterApp(appId);
0297 }
0298 blockHandler.applicationRemoved(appId, false );
0299 } catch (Exception e) {
0300 logger.error("Exception when stopping application {}", appId, e);
0301 }
0302 }
0303
0304 @Override
0305 public void initializeContainer(ContainerInitializationContext context) {
0306 ContainerId containerId = context.getContainerId();
0307 logger.info("Initializing container {}", containerId);
0308 }
0309
0310 @Override
0311 public void stopContainer(ContainerTerminationContext context) {
0312 ContainerId containerId = context.getContainerId();
0313 logger.info("Stopping container {}", containerId);
0314 }
0315
0316
0317
0318
0319 @Override
0320 protected void serviceStop() {
0321 try {
0322 if (shuffleServer != null) {
0323 shuffleServer.close();
0324 }
0325 if (transportContext != null) {
0326 transportContext.close();
0327 }
0328 if (blockHandler != null) {
0329 blockHandler.close();
0330 }
0331 if (db != null) {
0332 db.close();
0333 }
0334 } catch (Exception e) {
0335 logger.error("Exception when stopping service", e);
0336 }
0337 }
0338
0339
0340 @Override
0341 public ByteBuffer getMetaData() {
0342 return ByteBuffer.allocate(0);
0343 }
0344
0345
0346
0347
0348
0349 @Override
0350 public void setRecoveryPath(Path recoveryPath) {
0351 _recoveryPath = recoveryPath;
0352 }
0353
0354
0355
0356
0357 protected Path getRecoveryPath(String fileName) {
0358 return _recoveryPath;
0359 }
0360
0361
0362
0363
0364
0365 protected File initRecoveryDb(String dbName) {
0366 Preconditions.checkNotNull(_recoveryPath,
0367 "recovery path should not be null if NM recovery is enabled");
0368
0369 File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbName);
0370 if (recoveryFile.exists()) {
0371 return recoveryFile;
0372 }
0373
0374
0375 String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
0376 for (String dir : localDirs) {
0377 File f = new File(new Path(dir).toUri().getPath(), dbName);
0378 if (f.exists()) {
0379
0380
0381
0382
0383
0384 Path newLoc = new Path(_recoveryPath, dbName);
0385 Path copyFrom = new Path(f.toURI());
0386 if (!newLoc.equals(copyFrom)) {
0387 logger.info("Moving " + copyFrom + " to: " + newLoc);
0388 try {
0389
0390 FileSystem fs = FileSystem.getLocal(_conf);
0391 fs.rename(copyFrom, newLoc);
0392 } catch (Exception e) {
0393
0394 logger.error("Failed to move recovery file {} to the path {}",
0395 dbName, _recoveryPath.toString(), e);
0396 }
0397 }
0398 return new File(newLoc.toUri().getPath());
0399 }
0400 }
0401
0402 return new File(_recoveryPath.toUri().getPath(), dbName);
0403 }
0404
0405
0406
0407
0408 public static class AppId {
0409 public final String appId;
0410
0411 @JsonCreator
0412 public AppId(@JsonProperty("appId") String appId) {
0413 this.appId = appId;
0414 }
0415
0416 @Override
0417 public boolean equals(Object o) {
0418 if (this == o) return true;
0419 if (o == null || getClass() != o.getClass()) return false;
0420
0421 AppId appExecId = (AppId) o;
0422 return Objects.equals(appId, appExecId.appId);
0423 }
0424
0425 @Override
0426 public int hashCode() {
0427 return Objects.hashCode(appId);
0428 }
0429
0430 @Override
0431 public String toString() {
0432 return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
0433 .append("appId", appId)
0434 .toString();
0435 }
0436 }
0437
0438 }