Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.server;
0019 
0020 import io.netty.channel.Channel;
0021 
0022 import org.apache.spark.network.buffer.ManagedBuffer;
0023 import org.apache.spark.network.client.TransportClient;
0024 
0025 /**
0026  * The StreamManager is used to fetch individual chunks from a stream. This is used in
0027  * {@link TransportRequestHandler} in order to respond to fetchChunk() requests. Creation of the
0028  * stream is outside the scope of the transport layer, but a given stream is guaranteed to be read
0029  * by only one client connection, meaning that getChunk() for a particular stream will be called
0030  * serially and that once the connection associated with the stream is closed, that stream will
0031  * never be used again.
0032  */
0033 public abstract class StreamManager {
0034   /**
0035    * Called in response to a fetchChunk() request. The returned buffer will be passed as-is to the
0036    * client. A single stream will be associated with a single TCP connection, so this method
0037    * will not be called in parallel for a particular stream.
0038    *
0039    * Chunks may be requested in any order, and requests may be repeated, but it is not required
0040    * that implementations support this behavior.
0041    *
0042    * The returned ManagedBuffer will be release()'d after being written to the network.
0043    *
0044    * @param streamId id of a stream that has been previously registered with the StreamManager.
0045    * @param chunkIndex 0-indexed chunk of the stream that's requested
0046    */
0047   public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
0048 
0049   /**
0050    * Called in response to a stream() request. The returned data is streamed to the client
0051    * through a single TCP connection.
0052    *
0053    * Note the <code>streamId</code> argument is not related to the similarly named argument in the
0054    * {@link #getChunk(long, int)} method.
0055    *
0056    * @param streamId id of a stream that has been previously registered with the StreamManager.
0057    * @return A managed buffer for the stream, or null if the stream was not found.
0058    */
0059   public ManagedBuffer openStream(String streamId) {
0060     throw new UnsupportedOperationException();
0061   }
0062 
0063   /**
0064    * Indicates that the given channel has been terminated. After this occurs, we are guaranteed not
0065    * to read from the associated streams again, so any state can be cleaned up.
0066    */
0067   public void connectionTerminated(Channel channel) { }
0068 
0069   /**
0070    * Verify that the client is authorized to read from the given stream.
0071    *
0072    * @throws SecurityException If client is not authorized.
0073    */
0074   public void checkAuthorization(TransportClient client, long streamId) { }
0075 
0076   /**
0077    * Return the number of chunks being transferred and not finished yet in this StreamManager.
0078    */
0079   public long chunksBeingTransferred() {
0080     return 0;
0081   }
0082 
0083   /**
0084    * Called when start sending a chunk.
0085    */
0086   public void chunkBeingSent(long streamId) { }
0087 
0088   /**
0089    * Called when start sending a stream.
0090    */
0091   public void streamBeingSent(String streamId) { }
0092 
0093   /**
0094    * Called when a chunk is successfully sent.
0095    */
0096   public void chunkSent(long streamId) { }
0097 
0098   /**
0099    * Called when a stream is successfully sent.
0100    */
0101   public void streamSent(String streamId) { }
0102 
0103 }