Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.crypto;
0019 
0020 import java.io.IOException;
0021 import java.nio.ByteBuffer;
0022 import java.security.GeneralSecurityException;
0023 import java.util.concurrent.TimeoutException;
0024 
0025 import com.google.common.base.Throwables;
0026 import io.netty.buffer.ByteBuf;
0027 import io.netty.buffer.Unpooled;
0028 import io.netty.channel.Channel;
0029 import org.slf4j.Logger;
0030 import org.slf4j.LoggerFactory;
0031 
0032 import org.apache.spark.network.client.TransportClient;
0033 import org.apache.spark.network.client.TransportClientBootstrap;
0034 import org.apache.spark.network.sasl.SaslClientBootstrap;
0035 import org.apache.spark.network.sasl.SecretKeyHolder;
0036 import org.apache.spark.network.util.TransportConf;
0037 
0038 /**
0039  * Bootstraps a {@link TransportClient} by performing authentication using Spark's auth protocol.
0040  *
0041  * This bootstrap falls back to using the SASL bootstrap if the server throws an error during
0042  * authentication, and the configuration allows it. This is used for backwards compatibility
0043  * with external shuffle services that do not support the new protocol.
0044  *
0045  * It also automatically falls back to SASL if the new encryption backend is disabled, so that
0046  * callers only need to install this bootstrap when authentication is enabled.
0047  */
0048 public class AuthClientBootstrap implements TransportClientBootstrap {
0049 
0050   private static final Logger LOG = LoggerFactory.getLogger(AuthClientBootstrap.class);
0051 
0052   private final TransportConf conf;
0053   private final String appId;
0054   private final SecretKeyHolder secretKeyHolder;
0055 
0056   public AuthClientBootstrap(
0057       TransportConf conf,
0058       String appId,
0059       SecretKeyHolder secretKeyHolder) {
0060     this.conf = conf;
0061     // TODO: right now this behaves like the SASL backend, because when executors start up
0062     // they don't necessarily know the app ID. So they send a hardcoded "user" that is defined
0063     // in the SecurityManager, which will also always return the same secret (regardless of the
0064     // user name). All that's needed here is for this "user" to match on both sides, since that's
0065     // required by the protocol. At some point, though, it would be better for the actual app ID
0066     // to be provided here.
0067     this.appId = appId;
0068     this.secretKeyHolder = secretKeyHolder;
0069   }
0070 
0071   @Override
0072   public void doBootstrap(TransportClient client, Channel channel) {
0073     if (!conf.encryptionEnabled()) {
0074       LOG.debug("AES encryption disabled, using old auth protocol.");
0075       doSaslAuth(client, channel);
0076       return;
0077     }
0078 
0079     try {
0080       doSparkAuth(client, channel);
0081       client.setClientId(appId);
0082     } catch (GeneralSecurityException | IOException e) {
0083       throw Throwables.propagate(e);
0084     } catch (RuntimeException e) {
0085       // There isn't a good exception that can be caught here to know whether it's really
0086       // OK to switch back to SASL (because the server doesn't speak the new protocol). So
0087       // try it anyway, unless it's a timeout, which is locally fatal. In the worst case
0088       // things will fail again.
0089       if (!conf.saslFallback() || e.getCause() instanceof TimeoutException) {
0090         throw e;
0091       }
0092 
0093       if (LOG.isDebugEnabled()) {
0094         Throwable cause = e.getCause() != null ? e.getCause() : e;
0095         LOG.debug("New auth protocol failed, trying SASL.", cause);
0096       } else {
0097         LOG.info("New auth protocol failed, trying SASL.");
0098       }
0099       doSaslAuth(client, channel);
0100     }
0101   }
0102 
0103   private void doSparkAuth(TransportClient client, Channel channel)
0104     throws GeneralSecurityException, IOException {
0105 
0106     String secretKey = secretKeyHolder.getSecretKey(appId);
0107     try (AuthEngine engine = new AuthEngine(appId, secretKey, conf)) {
0108       ClientChallenge challenge = engine.challenge();
0109       ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength());
0110       challenge.encode(challengeData);
0111 
0112       ByteBuffer responseData =
0113           client.sendRpcSync(challengeData.nioBuffer(), conf.authRTTimeoutMs());
0114       ServerResponse response = ServerResponse.decodeMessage(responseData);
0115 
0116       engine.validate(response);
0117       engine.sessionCipher().addToChannel(channel);
0118     }
0119   }
0120 
0121   private void doSaslAuth(TransportClient client, Channel channel) {
0122     SaslClientBootstrap sasl = new SaslClientBootstrap(conf, appId, secretKeyHolder);
0123     sasl.doBootstrap(client, channel);
0124   }
0125 
0126 }