Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.sasl;
0019 
0020 import java.io.IOException;
0021 import java.nio.ByteBuffer;
0022 import javax.security.sasl.Sasl;
0023 import javax.security.sasl.SaslException;
0024 
0025 import io.netty.buffer.ByteBuf;
0026 import io.netty.buffer.Unpooled;
0027 import io.netty.channel.Channel;
0028 import org.slf4j.Logger;
0029 import org.slf4j.LoggerFactory;
0030 
0031 import org.apache.spark.network.client.TransportClient;
0032 import org.apache.spark.network.client.TransportClientBootstrap;
0033 import org.apache.spark.network.util.JavaUtils;
0034 import org.apache.spark.network.util.TransportConf;
0035 
0036 /**
0037  * Bootstraps a {@link TransportClient} by performing SASL authentication on the connection. The
0038  * server should be setup with a {@link SaslRpcHandler} with matching keys for the given appId.
0039  */
0040 public class SaslClientBootstrap implements TransportClientBootstrap {
0041   private static final Logger logger = LoggerFactory.getLogger(SaslClientBootstrap.class);
0042 
0043   private final TransportConf conf;
0044   private final String appId;
0045   private final SecretKeyHolder secretKeyHolder;
0046 
0047   public SaslClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder) {
0048     this.conf = conf;
0049     this.appId = appId;
0050     this.secretKeyHolder = secretKeyHolder;
0051   }
0052 
0053   /**
0054    * Performs SASL authentication by sending a token, and then proceeding with the SASL
0055    * challenge-response tokens until we either successfully authenticate or throw an exception
0056    * due to mismatch.
0057    */
0058   @Override
0059   public void doBootstrap(TransportClient client, Channel channel) {
0060     SparkSaslClient saslClient = new SparkSaslClient(appId, secretKeyHolder, conf.saslEncryption());
0061     try {
0062       byte[] payload = saslClient.firstToken();
0063 
0064       while (!saslClient.isComplete()) {
0065         SaslMessage msg = new SaslMessage(appId, payload);
0066         ByteBuf buf = Unpooled.buffer(msg.encodedLength() + (int) msg.body().size());
0067         msg.encode(buf);
0068         buf.writeBytes(msg.body().nioByteBuffer());
0069 
0070         ByteBuffer response = client.sendRpcSync(buf.nioBuffer(), conf.authRTTimeoutMs());
0071         payload = saslClient.response(JavaUtils.bufferToArray(response));
0072       }
0073 
0074       client.setClientId(appId);
0075 
0076       if (conf.saslEncryption()) {
0077         if (!SparkSaslServer.QOP_AUTH_CONF.equals(saslClient.getNegotiatedProperty(Sasl.QOP))) {
0078           throw new RuntimeException(
0079             new SaslException("Encryption requests by negotiated non-encrypted connection."));
0080         }
0081 
0082         SaslEncryption.addToChannel(channel, saslClient, conf.maxSaslEncryptedBlockSize());
0083         saslClient = null;
0084         logger.debug("Channel {} configured for encryption.", client);
0085       }
0086     } catch (IOException ioe) {
0087       throw new RuntimeException(ioe);
0088     } finally {
0089       if (saslClient != null) {
0090         try {
0091           // Once authentication is complete, the server will trust all remaining communication.
0092           saslClient.dispose();
0093         } catch (RuntimeException e) {
0094           logger.error("Error while disposing SASL client", e);
0095         }
0096       }
0097     }
0098   }
0099 
0100 }