0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.sasl;
0019
0020 import io.netty.buffer.ByteBuf;
0021 import io.netty.buffer.Unpooled;
0022
0023 import org.apache.spark.network.buffer.NettyManagedBuffer;
0024 import org.apache.spark.network.protocol.Encoders;
0025 import org.apache.spark.network.protocol.AbstractMessage;
0026 import org.apache.spark.network.protocol.Message;
0027
0028
0029
0030
0031
0032
0033 class SaslMessage extends AbstractMessage {
0034
0035
0036 private static final byte TAG_BYTE = (byte) 0xEA;
0037
0038 public final String appId;
0039
0040 SaslMessage(String appId, byte[] message) {
0041 this(appId, Unpooled.wrappedBuffer(message));
0042 }
0043
0044 SaslMessage(String appId, ByteBuf message) {
0045 super(new NettyManagedBuffer(message), true);
0046 this.appId = appId;
0047 }
0048
0049 @Override
0050 public Message.Type type() { return Type.User; }
0051
0052 @Override
0053 public int encodedLength() {
0054
0055
0056
0057 return 1 + Encoders.Strings.encodedLength(appId) + 4;
0058 }
0059
0060 @Override
0061 public void encode(ByteBuf buf) {
0062 buf.writeByte(TAG_BYTE);
0063 Encoders.Strings.encode(buf, appId);
0064
0065 buf.writeInt((int) body().size());
0066 }
0067
0068 public static SaslMessage decode(ByteBuf buf) {
0069 if (buf.readByte() != TAG_BYTE) {
0070 throw new IllegalStateException("Expected SaslMessage, received something else"
0071 + " (maybe your client does not have SASL enabled?)");
0072 }
0073
0074 String appId = Encoders.Strings.decode(buf);
0075
0076 buf.readInt();
0077 return new SaslMessage(appId, buf.retain());
0078 }
0079 }