Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.server;
0019 
0020 import java.nio.ByteBuffer;
0021 
0022 import org.slf4j.Logger;
0023 import org.slf4j.LoggerFactory;
0024 
0025 import org.apache.spark.network.client.RpcResponseCallback;
0026 import org.apache.spark.network.client.StreamCallbackWithID;
0027 import org.apache.spark.network.client.TransportClient;
0028 
0029 /**
0030  * Handler for sendRPC() messages sent by {@link org.apache.spark.network.client.TransportClient}s.
0031  */
0032 public abstract class RpcHandler {
0033 
0034   private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();
0035 
0036   /**
0037    * Receive a single RPC message. Any exception thrown while in this method will be sent back to
0038    * the client in string form as a standard RPC failure.
0039    *
0040    * Neither this method nor #receiveStream will be called in parallel for a single
0041    * TransportClient (i.e., channel).
0042    *
0043    * @param client A channel client which enables the handler to make requests back to the sender
0044    *               of this RPC. This will always be the exact same object for a particular channel.
0045    * @param message The serialized bytes of the RPC.
0046    * @param callback Callback which should be invoked exactly once upon success or failure of the
0047    *                 RPC.
0048    */
0049   public abstract void receive(
0050       TransportClient client,
0051       ByteBuffer message,
0052       RpcResponseCallback callback);
0053 
0054   /**
0055    * Receive a single RPC message which includes data that is to be received as a stream. Any
0056    * exception thrown while in this method will be sent back to the client in string form as a
0057    * standard RPC failure.
0058    *
0059    * Neither this method nor #receive will be called in parallel for a single TransportClient
0060    * (i.e., channel).
0061    *
0062    * An error while reading data from the stream
0063    * ({@link org.apache.spark.network.client.StreamCallback#onData(String, ByteBuffer)})
0064    * will fail the entire channel.  A failure in "post-processing" the stream in
0065    * {@link org.apache.spark.network.client.StreamCallback#onComplete(String)} will result in an
0066    * rpcFailure, but the channel will remain active.
0067    *
0068    * @param client A channel client which enables the handler to make requests back to the sender
0069    *               of this RPC. This will always be the exact same object for a particular channel.
0070    * @param messageHeader The serialized bytes of the header portion of the RPC.  This is in meant
0071    *                      to be relatively small, and will be buffered entirely in memory, to
0072    *                      facilitate how the streaming portion should be received.
0073    * @param callback Callback which should be invoked exactly once upon success or failure of the
0074    *                 RPC.
0075    * @return a StreamCallback for handling the accompanying streaming data
0076    */
0077   public StreamCallbackWithID receiveStream(
0078       TransportClient client,
0079       ByteBuffer messageHeader,
0080       RpcResponseCallback callback) {
0081     throw new UnsupportedOperationException();
0082   }
0083 
0084   /**
0085    * Returns the StreamManager which contains the state about which streams are currently being
0086    * fetched by a TransportClient.
0087    */
0088   public abstract StreamManager getStreamManager();
0089 
0090   /**
0091    * Receives an RPC message that does not expect a reply. The default implementation will
0092    * call "{@link #receive(TransportClient, ByteBuffer, RpcResponseCallback)}" and log a warning if
0093    * any of the callback methods are called.
0094    *
0095    * @param client A channel client which enables the handler to make requests back to the sender
0096    *               of this RPC. This will always be the exact same object for a particular channel.
0097    * @param message The serialized bytes of the RPC.
0098    */
0099   public void receive(TransportClient client, ByteBuffer message) {
0100     receive(client, message, ONE_WAY_CALLBACK);
0101   }
0102 
0103   /**
0104    * Invoked when the channel associated with the given client is active.
0105    */
0106   public void channelActive(TransportClient client) { }
0107 
0108   /**
0109    * Invoked when the channel associated with the given client is inactive.
0110    * No further requests will come from this client.
0111    */
0112   public void channelInactive(TransportClient client) { }
0113 
0114   public void exceptionCaught(Throwable cause, TransportClient client) { }
0115 
0116   private static class OneWayRpcCallback implements RpcResponseCallback {
0117 
0118     private static final Logger logger = LoggerFactory.getLogger(OneWayRpcCallback.class);
0119 
0120     @Override
0121     public void onSuccess(ByteBuffer response) {
0122       logger.warn("Response provided for one-way RPC.");
0123     }
0124 
0125     @Override
0126     public void onFailure(Throwable e) {
0127       logger.error("Error response provided for one-way RPC.", e);
0128     }
0129 
0130   }
0131 
0132 }