0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.server;
0019
0020 import java.nio.ByteBuffer;
0021
0022 import org.apache.spark.network.client.RpcResponseCallback;
0023 import org.apache.spark.network.client.StreamCallbackWithID;
0024 import org.apache.spark.network.client.TransportClient;
0025
0026
0027
0028
0029
0030
0031 public abstract class AbstractAuthRpcHandler extends RpcHandler {
0032
0033 private final RpcHandler delegate;
0034
0035 private boolean isAuthenticated;
0036
0037 protected AbstractAuthRpcHandler(RpcHandler delegate) {
0038 this.delegate = delegate;
0039 }
0040
0041
0042
0043
0044
0045
0046 protected abstract boolean doAuthChallenge(
0047 TransportClient client,
0048 ByteBuffer message,
0049 RpcResponseCallback callback);
0050
0051 @Override
0052 public final void receive(
0053 TransportClient client,
0054 ByteBuffer message,
0055 RpcResponseCallback callback) {
0056 if (isAuthenticated) {
0057 delegate.receive(client, message, callback);
0058 } else {
0059 isAuthenticated = doAuthChallenge(client, message, callback);
0060 }
0061 }
0062
0063 @Override
0064 public final void receive(TransportClient client, ByteBuffer message) {
0065 if (isAuthenticated) {
0066 delegate.receive(client, message);
0067 } else {
0068 throw new SecurityException("Unauthenticated call to receive().");
0069 }
0070 }
0071
0072 @Override
0073 public final StreamCallbackWithID receiveStream(
0074 TransportClient client,
0075 ByteBuffer message,
0076 RpcResponseCallback callback) {
0077 if (isAuthenticated) {
0078 return delegate.receiveStream(client, message, callback);
0079 } else {
0080 throw new SecurityException("Unauthenticated call to receiveStream().");
0081 }
0082 }
0083
0084 @Override
0085 public StreamManager getStreamManager() {
0086 return delegate.getStreamManager();
0087 }
0088
0089 @Override
0090 public void channelActive(TransportClient client) {
0091 delegate.channelActive(client);
0092 }
0093
0094 @Override
0095 public void channelInactive(TransportClient client) {
0096 delegate.channelInactive(client);
0097 }
0098
0099 @Override
0100 public void exceptionCaught(Throwable cause, TransportClient client) {
0101 delegate.exceptionCaught(cause, client);
0102 }
0103
0104 public boolean isAuthenticated() {
0105 return isAuthenticated;
0106 }
0107 }