|
||||
0001 /* 0002 * Licensed to the Apache Software Foundation (ASF) under one or more 0003 * contributor license agreements. See the NOTICE file distributed with 0004 * this work for additional information regarding copyright ownership. 0005 * The ASF licenses this file to You under the Apache License, Version 2.0 0006 * (the "License"); you may not use this file except in compliance with 0007 * the License. You may obtain a copy of the License at 0008 * 0009 * http://www.apache.org/licenses/LICENSE-2.0 0010 * 0011 * Unless required by applicable law or agreed to in writing, software 0012 * distributed under the License is distributed on an "AS IS" BASIS, 0013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 0014 * See the License for the specific language governing permissions and 0015 * limitations under the License. 0016 */ 0017 0018 package org.apache.spark.network.sasl; 0019 0020 import java.nio.ByteBuffer; 0021 import java.util.concurrent.ConcurrentHashMap; 0022 0023 import org.slf4j.Logger; 0024 import org.slf4j.LoggerFactory; 0025 0026 import org.apache.spark.network.util.JavaUtils; 0027 0028 /** 0029 * A class that manages shuffle secret used by the external shuffle service. 0030 */ 0031 public class ShuffleSecretManager implements SecretKeyHolder { 0032 private static final Logger logger = LoggerFactory.getLogger(ShuffleSecretManager.class); 0033 0034 private final ConcurrentHashMap<String, String> shuffleSecretMap; 0035 0036 // Spark user used for authenticating SASL connections 0037 // Note that this must match the value in org.apache.spark.SecurityManager 0038 private static final String SPARK_SASL_USER = "sparkSaslUser"; 0039 0040 public ShuffleSecretManager() { 0041 shuffleSecretMap = new ConcurrentHashMap<>(); 0042 } 0043 0044 /** 0045 * Register an application with its secret. 0046 * Executors need to first authenticate themselves with the same secret before 0047 * fetching shuffle files written by other executors in this application. 0048 */ 0049 public void registerApp(String appId, String shuffleSecret) { 0050 // Always put the new secret information to make sure it's the most up to date. 0051 // Otherwise we have to specifically look at the application attempt in addition 0052 // to the applicationId since the secrets change between application attempts on yarn. 0053 shuffleSecretMap.put(appId, shuffleSecret); 0054 logger.info("Registered shuffle secret for application {}", appId); 0055 } 0056 0057 /** 0058 * Register an application with its secret specified as a byte buffer. 0059 */ 0060 public void registerApp(String appId, ByteBuffer shuffleSecret) { 0061 registerApp(appId, JavaUtils.bytesToString(shuffleSecret)); 0062 } 0063 0064 /** 0065 * Unregister an application along with its secret. 0066 * This is called when the application terminates. 0067 */ 0068 public void unregisterApp(String appId) { 0069 shuffleSecretMap.remove(appId); 0070 logger.info("Unregistered shuffle secret for application {}", appId); 0071 } 0072 0073 /** 0074 * Return the Spark user for authenticating SASL connections. 0075 */ 0076 @Override 0077 public String getSaslUser(String appId) { 0078 return SPARK_SASL_USER; 0079 } 0080 0081 /** 0082 * Return the secret key registered with the given application. 0083 * This key is used to authenticate the executors before they can fetch shuffle files 0084 * written by this application from the external shuffle service. If the specified 0085 * application is not registered, return null. 0086 */ 0087 @Override 0088 public String getSecretKey(String appId) { 0089 return shuffleSecretMap.get(appId); 0090 } 0091 }
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.1.0 LXR engine. The LXR team |