Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.sasl;
0019 
0020 import java.nio.ByteBuffer;
0021 import java.util.concurrent.ConcurrentHashMap;
0022 
0023 import org.slf4j.Logger;
0024 import org.slf4j.LoggerFactory;
0025 
0026 import org.apache.spark.network.util.JavaUtils;
0027 
0028 /**
0029  * A class that manages shuffle secret used by the external shuffle service.
0030  */
0031 public class ShuffleSecretManager implements SecretKeyHolder {
0032   private static final Logger logger = LoggerFactory.getLogger(ShuffleSecretManager.class);
0033 
0034   private final ConcurrentHashMap<String, String> shuffleSecretMap;
0035 
0036   // Spark user used for authenticating SASL connections
0037   // Note that this must match the value in org.apache.spark.SecurityManager
0038   private static final String SPARK_SASL_USER = "sparkSaslUser";
0039 
0040   public ShuffleSecretManager() {
0041     shuffleSecretMap = new ConcurrentHashMap<>();
0042   }
0043 
0044   /**
0045    * Register an application with its secret.
0046    * Executors need to first authenticate themselves with the same secret before
0047    * fetching shuffle files written by other executors in this application.
0048    */
0049   public void registerApp(String appId, String shuffleSecret) {
0050     // Always put the new secret information to make sure it's the most up to date.
0051     // Otherwise we have to specifically look at the application attempt in addition
0052     // to the applicationId since the secrets change between application attempts on yarn.
0053     shuffleSecretMap.put(appId, shuffleSecret);
0054     logger.info("Registered shuffle secret for application {}", appId);
0055   }
0056 
0057   /**
0058    * Register an application with its secret specified as a byte buffer.
0059    */
0060   public void registerApp(String appId, ByteBuffer shuffleSecret) {
0061     registerApp(appId, JavaUtils.bytesToString(shuffleSecret));
0062   }
0063 
0064   /**
0065    * Unregister an application along with its secret.
0066    * This is called when the application terminates.
0067    */
0068   public void unregisterApp(String appId) {
0069     shuffleSecretMap.remove(appId);
0070     logger.info("Unregistered shuffle secret for application {}", appId);
0071   }
0072 
0073   /**
0074    * Return the Spark user for authenticating SASL connections.
0075    */
0076   @Override
0077   public String getSaslUser(String appId) {
0078     return SPARK_SASL_USER;
0079   }
0080 
0081   /**
0082    * Return the secret key registered with the given application.
0083    * This key is used to authenticate the executors before they can fetch shuffle files
0084    * written by this application from the external shuffle service. If the specified
0085    * application is not registered, return null.
0086    */
0087   @Override
0088   public String getSecretKey(String appId) {
0089     return shuffleSecretMap.get(appId);
0090   }
0091 }