/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.yarn;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.*;
import org.apache.spark.network.util.LevelDBProvider;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.crypto.AuthServerBootstrap;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.shuffle.ExternalBlockHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;

/**
 * An external shuffle service used by Spark on Yarn.
 *
 * This is intended to be a long-running auxiliary service that runs in the NodeManager process.
 * A Spark application may connect to this service by setting `spark.shuffle.service.enabled` to
 * true. The application also derives the service port from `spark.shuffle.service.port` as
 * specified in the Yarn configuration, so that the clients and the server agree on the port to
 * communicate on.
 *
 * The service also optionally supports authentication. This ensures that executors from one
 * application cannot read the shuffle files written by those from another. This feature can be
 * enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM.
 * Note that the Spark application must also set `spark.authenticate` manually and, unlike in
 * the case of the service port, will not inherit this setting from the Yarn configuration. This
 * is because an application running on the same Yarn cluster may choose not to use the external
 * shuffle service, in which case its setting of `spark.authenticate` should be independent of
 * the service's.
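 *
 * As an illustrative sketch (example values, not an exhaustive list of required settings),
 * wiring this service into a NodeManager typically involves yarn-site.xml entries along
 * these lines, using the "spark_shuffle" name registered in the constructor below:
 *
 * <pre>
 * &lt;property&gt;
 *   &lt;name&gt;yarn.nodemanager.aux-services&lt;/name&gt;
 *   &lt;value&gt;spark_shuffle&lt;/value&gt;
 * &lt;/property&gt;
 * &lt;property&gt;
 *   &lt;name&gt;yarn.nodemanager.aux-services.spark_shuffle.class&lt;/name&gt;
 *   &lt;value&gt;org.apache.spark.network.yarn.YarnShuffleService&lt;/value&gt;
 * &lt;/property&gt;
 * </pre>
 *
 * The application side then sets `spark.shuffle.service.enabled=true` (and
 * `spark.authenticate=true` if the service was started with authentication).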
 */
public class YarnShuffleService extends AuxiliaryService {
  private static final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);

  // Port on which the shuffle server listens for fetch requests
  private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
  private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;

  // Whether the shuffle server should authenticate fetch requests
  private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
  private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;

  private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
  private static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery.ldb";

  // Whether failure during service initialization should stop the NM.
  @VisibleForTesting
  static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
  private static final boolean DEFAULT_STOP_ON_FAILURE = false;

  // Just for testing when you want to find an open port
  @VisibleForTesting
  static int boundPort = -1;
  private static final ObjectMapper mapper = new ObjectMapper();
  private static final String APP_CREDS_KEY_PREFIX = "AppCreds";
  private static final LevelDBProvider.StoreVersion CURRENT_VERSION =
      new LevelDBProvider.StoreVersion(1, 0);

  // Just for integration tests that want to look at this file -- in general not sensible as
  // a static
  @VisibleForTesting
  static YarnShuffleService instance;

  // An entity that manages the shuffle secret per application
  // This is used only if authentication is enabled
  @VisibleForTesting
  ShuffleSecretManager secretManager;

  // The actual server that serves shuffle files
  private TransportServer shuffleServer = null;

  private TransportContext transportContext = null;

  private Configuration _conf = null;

  // The recovery path used for shuffle service recovery
  @VisibleForTesting
  Path _recoveryPath = null;

  // Handles registering executors and opening shuffle blocks
  @VisibleForTesting
  ExternalBlockHandler blockHandler;

  // Where to store & reload executor info for recovering state after an NM restart
  @VisibleForTesting
  File registeredExecutorFile;

  // Where to store & reload application secrets for recovering state after an NM restart
  @VisibleForTesting
  File secretsFile;

  private DB db;

  public YarnShuffleService() {
    super("spark_shuffle");
    logger.info("Initializing YARN shuffle service for Spark");
    instance = this;
  }

  /**
   * Return whether authentication is enabled as specified by the configuration.
   * If so, fetch requests will fail unless the appropriate authentication secret
   * for the application is provided.
   */
  private boolean isAuthenticationEnabled() {
    return secretManager != null;
  }

  /**
   * Start the shuffle server with the given configuration.
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    _conf = conf;

    boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);

    try {
      // In case this NM was killed while there were running Spark applications, we need to
      // restore lost state for the existing executors. We look for an existing file in the
      // NM's local dirs. If we don't find one, then we choose a file to use to save the state
      // next time. Even if an application was stopped while the NM was down, we expect YARN
      // to call stopApplication() when it comes back.
      if (_recoveryPath != null) {
        registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
      }

      TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
      blockHandler = new ExternalBlockHandler(transportConf, registeredExecutorFile);

      // If authentication is enabled, set up the shuffle server to use a
      // special RPC handler that filters out unauthenticated fetch requests
      List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
      boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
      if (authEnabled) {
        secretManager = new ShuffleSecretManager();
        if (_recoveryPath != null) {
          loadSecretsFromDb();
        }
        bootstraps.add(new AuthServerBootstrap(transportConf, secretManager));
      }

      int port = conf.getInt(
        SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
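      // The third constructor argument asks the transport layer to close idle connections,
      // which a long-running service wants so stale client connections don't accumulate.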
      transportContext = new TransportContext(transportConf, blockHandler, true);
      shuffleServer = transportContext.createServer(port, bootstraps);
      // the port should normally be fixed, but for tests it's useful to find an open port
      port = shuffleServer.getPort();
      boundPort = port;
      String authEnabledString = authEnabled ? "enabled" : "not enabled";

      // Register the block handler's metrics with the NodeManager's metrics system.
      blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections",
          shuffleServer.getRegisteredConnections());
      YarnShuffleServiceMetrics serviceMetrics =
          new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());

      MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
      metricsSystem.register(
          "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics);
      logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");

      logger.info("Started YARN shuffle service for Spark on port {}. " +
        "Authentication is {}.  Registered executor file is {}", port, authEnabledString,
        registeredExecutorFile);
    } catch (Exception e) {
      if (stopOnFailure) {
        throw e;
      } else {
        noteFailure(e);
      }
    }
  }

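  /**
   * Reload application secrets from the recovery LevelDB and re-register each application
   * with the secret manager, so that authentication state survives an NM restart. See
   * dbAppKey() for the on-disk key layout.
   */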
  private void loadSecretsFromDb() throws IOException {
    secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);

    // Make sure this is protected in case it's not in the NM recovery dir
    FileSystem fs = FileSystem.getLocal(_conf);
    fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700));

    db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
    logger.info("Recovery location is: {}", secretsFile.getPath());
    if (db != null) {
      logger.info("Going to reload spark shuffle data");
      // Close the iterator when done so the underlying LevelDB resources are released
      try (DBIterator itr = db.iterator()) {
        itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
        while (itr.hasNext()) {
          Map.Entry<byte[], byte[]> e = itr.next();
          String key = new String(e.getKey(), StandardCharsets.UTF_8);
          if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
            break;
          }
          String id = parseDbAppKey(key);
          ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
          logger.info("Reloading tokens for app: {}", id);
          secretManager.registerApp(id, secret);
        }
      }
    }
  }

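  /**
   * Inverse of {@link #dbAppKey}: strips the "AppCreds;" prefix and parses the remaining
   * JSON payload back into an {@link AppId}, returning its application id string.
   */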
  private static String parseDbAppKey(String s) throws IOException {
    if (!s.startsWith(APP_CREDS_KEY_PREFIX)) {
      throw new IllegalArgumentException("expected a string starting with " + APP_CREDS_KEY_PREFIX);
    }
    String json = s.substring(APP_CREDS_KEY_PREFIX.length() + 1);
    AppId parsed = mapper.readValue(json, AppId.class);
    return parsed.appId;
  }

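  /**
   * Build the LevelDB key under which an application's secret is stored. For a hypothetical
   * application id, the key looks like:
   * {@code AppCreds;{"appId":"application_1234_0001"}}
   */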
  private static byte[] dbAppKey(AppId appExecId) throws IOException {
    // we stick a common prefix on all the keys so we can find them in the DB
    String appExecJson = mapper.writeValueAsString(appExecId);
    String key = (APP_CREDS_KEY_PREFIX + ";" + appExecJson);
    return key.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public void initializeApplication(ApplicationInitializationContext context) {
    String appId = context.getApplicationId().toString();
    try {
      ByteBuffer shuffleSecret = context.getApplicationDataForService();
      if (isAuthenticationEnabled()) {
        AppId fullId = new AppId(appId);
        if (db != null) {
          byte[] key = dbAppKey(fullId);
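          // Store the secret via its Jackson encoding so it round-trips through
          // loadSecretsFromDb(), which reads it back with mapper.readValue(..., ByteBuffer.class)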
          byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8);
          db.put(key, value);
        }
        secretManager.registerApp(appId, shuffleSecret);
      }
    } catch (Exception e) {
      logger.error("Exception when initializing application {}", appId, e);
    }
  }

  @Override
  public void stopApplication(ApplicationTerminationContext context) {
    String appId = context.getApplicationId().toString();
    try {
      if (isAuthenticationEnabled()) {
        AppId fullId = new AppId(appId);
        if (db != null) {
          try {
            db.delete(dbAppKey(fullId));
          } catch (IOException e) {
            logger.error("Error deleting {} from executor state db", appId, e);
          }
        }
        secretManager.unregisterApp(appId);
      }
      blockHandler.applicationRemoved(appId, false /* cleanupLocalDirs */);
    } catch (Exception e) {
      logger.error("Exception when stopping application {}", appId, e);
    }
  }

  @Override
  public void initializeContainer(ContainerInitializationContext context) {
    ContainerId containerId = context.getContainerId();
    logger.info("Initializing container {}", containerId);
  }

  @Override
  public void stopContainer(ContainerTerminationContext context) {
    ContainerId containerId = context.getContainerId();
    logger.info("Stopping container {}", containerId);
  }

  /**
   * Close the shuffle server to clean up any associated state.
   */
  @Override
  protected void serviceStop() {
    try {
      if (shuffleServer != null) {
        shuffleServer.close();
      }
      if (transportContext != null) {
        transportContext.close();
      }
      if (blockHandler != null) {
        blockHandler.close();
      }
      if (db != null) {
        db.close();
      }
    } catch (Exception e) {
      logger.error("Exception when stopping service", e);
    }
  }

  // Not currently used
  @Override
  public ByteBuffer getMetaData() {
    return ByteBuffer.allocate(0);
  }

  /**
   * Set the recovery path for shuffle service recovery when the NM is restarted. This will be
   * called by the NM if NM recovery is enabled.
   */
  @Override
  public void setRecoveryPath(Path recoveryPath) {
    _recoveryPath = recoveryPath;
  }

  /**
   * Get the path specific to this auxiliary service to use for recovery.
   */
  protected Path getRecoveryPath(String fileName) {
    return _recoveryPath;
  }

  /**
   * Figure out the recovery path and, if YARN NM recovery is enabled, move any DB left in the
   * NM's local dirs by an old version of the shuffle service over to the recovery path.
   */
  protected File initRecoveryDb(String dbName) {
    Preconditions.checkNotNull(_recoveryPath,
      "recovery path should not be null if NM recovery is enabled");

    File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbName);
    if (recoveryFile.exists()) {
      return recoveryFile;
    }

    // The DB doesn't exist in the recovery path; check the NM local dirs for it
    String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
    for (String dir : localDirs) {
      File f = new File(new Path(dir).toUri().getPath(), dbName);
      if (f.exists()) {
        // If the recovery path is set then either NM recovery is enabled or another recovery
        // DB has been initialized. If NM recovery is enabled and had set the recovery path
        // make sure to move all DBs to the recovery path from the old NM local dirs.
        // If another DB was initialized first just make sure all the DBs are in the same
        // location.
        Path newLoc = new Path(_recoveryPath, dbName);
        Path copyFrom = new Path(f.toURI());
        if (!newLoc.equals(copyFrom)) {
          logger.info("Moving {} to: {}", copyFrom, newLoc);
          try {
            // The move here needs to handle moving non-empty directories across NFS mounts
            FileSystem fs = FileSystem.getLocal(_conf);
            fs.rename(copyFrom, newLoc);
          } catch (Exception e) {
            // Failed to move the recovery file to the new path; just continue with the new
            // DB location
            logger.error("Failed to move recovery file {} to the path {}",
              dbName, _recoveryPath.toString(), e);
          }
        }
        return new File(newLoc.toUri().getPath());
      }
    }

    return new File(_recoveryPath.toUri().getPath(), dbName);
  }

  /**
   * Simply encodes an application ID.
   */
  public static class AppId {
    public final String appId;

    @JsonCreator
    public AppId(@JsonProperty("appId") String appId) {
      this.appId = appId;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;

      AppId appExecId = (AppId) o;
      return Objects.equals(appId, appExecId.appId);
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(appId);
    }

    @Override
    public String toString() {
      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
          .append("appId", appId)
          .toString();
    }
  }

}