Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.server;
0019 
0020 import java.nio.ByteBuffer;
0021 
0022 import org.apache.spark.network.client.RpcResponseCallback;
0023 import org.apache.spark.network.client.StreamCallbackWithID;
0024 import org.apache.spark.network.client.TransportClient;
0025 
0026 /**
0027  * RPC Handler which performs authentication, and when it's successful, delegates further
0028  * calls to another RPC handler. The authentication handshake itself should be implemented
0029  * by subclasses.
0030  */
0031 public abstract class AbstractAuthRpcHandler extends RpcHandler {
0032   /** RpcHandler we will delegate to for authenticated connections. */
0033   private final RpcHandler delegate;
0034 
0035   private boolean isAuthenticated;
0036 
0037   protected AbstractAuthRpcHandler(RpcHandler delegate) {
0038     this.delegate = delegate;
0039   }
0040 
0041   /**
0042    * Responds to an authentication challenge.
0043    *
0044    * @return Whether the client is authenticated.
0045    */
0046   protected abstract boolean doAuthChallenge(
0047       TransportClient client,
0048       ByteBuffer message,
0049       RpcResponseCallback callback);
0050 
0051   @Override
0052   public final void receive(
0053       TransportClient client,
0054       ByteBuffer message,
0055       RpcResponseCallback callback) {
0056     if (isAuthenticated) {
0057       delegate.receive(client, message, callback);
0058     } else {
0059       isAuthenticated = doAuthChallenge(client, message, callback);
0060     }
0061   }
0062 
0063   @Override
0064   public final void receive(TransportClient client, ByteBuffer message) {
0065     if (isAuthenticated) {
0066       delegate.receive(client, message);
0067     } else {
0068       throw new SecurityException("Unauthenticated call to receive().");
0069     }
0070   }
0071 
0072   @Override
0073   public final StreamCallbackWithID receiveStream(
0074       TransportClient client,
0075       ByteBuffer message,
0076       RpcResponseCallback callback) {
0077     if (isAuthenticated) {
0078       return delegate.receiveStream(client, message, callback);
0079     } else {
0080       throw new SecurityException("Unauthenticated call to receiveStream().");
0081     }
0082   }
0083 
0084   @Override
0085   public StreamManager getStreamManager() {
0086     return delegate.getStreamManager();
0087   }
0088 
0089   @Override
0090   public void channelActive(TransportClient client) {
0091     delegate.channelActive(client);
0092   }
0093 
0094   @Override
0095   public void channelInactive(TransportClient client) {
0096     delegate.channelInactive(client);
0097   }
0098 
0099   @Override
0100   public void exceptionCaught(Throwable cause, TransportClient client) {
0101     delegate.exceptionCaught(cause, client);
0102   }
0103 
0104   public boolean isAuthenticated() {
0105     return isAuthenticated;
0106   }
0107 }