0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.sasl;
0019
0020 import java.io.IOException;
0021 import java.nio.ByteBuffer;
0022 import javax.security.sasl.Sasl;
0023 import javax.security.sasl.SaslException;
0024
0025 import io.netty.buffer.ByteBuf;
0026 import io.netty.buffer.Unpooled;
0027 import io.netty.channel.Channel;
0028 import org.slf4j.Logger;
0029 import org.slf4j.LoggerFactory;
0030
0031 import org.apache.spark.network.client.TransportClient;
0032 import org.apache.spark.network.client.TransportClientBootstrap;
0033 import org.apache.spark.network.util.JavaUtils;
0034 import org.apache.spark.network.util.TransportConf;
0035
0036
0037
0038
0039
0040 public class SaslClientBootstrap implements TransportClientBootstrap {
0041 private static final Logger logger = LoggerFactory.getLogger(SaslClientBootstrap.class);
0042
0043 private final TransportConf conf;
0044 private final String appId;
0045 private final SecretKeyHolder secretKeyHolder;
0046
0047 public SaslClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder) {
0048 this.conf = conf;
0049 this.appId = appId;
0050 this.secretKeyHolder = secretKeyHolder;
0051 }
0052
0053
0054
0055
0056
0057
0058 @Override
0059 public void doBootstrap(TransportClient client, Channel channel) {
0060 SparkSaslClient saslClient = new SparkSaslClient(appId, secretKeyHolder, conf.saslEncryption());
0061 try {
0062 byte[] payload = saslClient.firstToken();
0063
0064 while (!saslClient.isComplete()) {
0065 SaslMessage msg = new SaslMessage(appId, payload);
0066 ByteBuf buf = Unpooled.buffer(msg.encodedLength() + (int) msg.body().size());
0067 msg.encode(buf);
0068 buf.writeBytes(msg.body().nioByteBuffer());
0069
0070 ByteBuffer response = client.sendRpcSync(buf.nioBuffer(), conf.authRTTimeoutMs());
0071 payload = saslClient.response(JavaUtils.bufferToArray(response));
0072 }
0073
0074 client.setClientId(appId);
0075
0076 if (conf.saslEncryption()) {
0077 if (!SparkSaslServer.QOP_AUTH_CONF.equals(saslClient.getNegotiatedProperty(Sasl.QOP))) {
0078 throw new RuntimeException(
0079 new SaslException("Encryption requests by negotiated non-encrypted connection."));
0080 }
0081
0082 SaslEncryption.addToChannel(channel, saslClient, conf.maxSaslEncryptedBlockSize());
0083 saslClient = null;
0084 logger.debug("Channel {} configured for encryption.", client);
0085 }
0086 } catch (IOException ioe) {
0087 throw new RuntimeException(ioe);
0088 } finally {
0089 if (saslClient != null) {
0090 try {
0091
0092 saslClient.dispose();
0093 } catch (RuntimeException e) {
0094 logger.error("Error while disposing SASL client", e);
0095 }
0096 }
0097 }
0098 }
0099
0100 }