0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.server;
0019
0020 import java.net.SocketAddress;
0021
0022 import com.google.common.base.Throwables;
0023 import io.netty.channel.Channel;
0024 import io.netty.channel.ChannelFuture;
0025 import io.netty.channel.ChannelFutureListener;
0026 import io.netty.channel.ChannelHandlerContext;
0027 import io.netty.channel.SimpleChannelInboundHandler;
0028 import org.slf4j.Logger;
0029 import org.slf4j.LoggerFactory;
0030
0031 import org.apache.spark.network.buffer.ManagedBuffer;
0032 import org.apache.spark.network.client.TransportClient;
0033 import org.apache.spark.network.protocol.ChunkFetchFailure;
0034 import org.apache.spark.network.protocol.ChunkFetchRequest;
0035 import org.apache.spark.network.protocol.ChunkFetchSuccess;
0036 import org.apache.spark.network.protocol.Encodable;
0037
0038 import static org.apache.spark.network.util.NettyUtils.*;
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051 public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
0052 private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
0053
0054 private final TransportClient client;
0055 private final StreamManager streamManager;
0056
0057 private final long maxChunksBeingTransferred;
0058 private final boolean syncModeEnabled;
0059
0060 public ChunkFetchRequestHandler(
0061 TransportClient client,
0062 StreamManager streamManager,
0063 Long maxChunksBeingTransferred,
0064 boolean syncModeEnabled) {
0065 this.client = client;
0066 this.streamManager = streamManager;
0067 this.maxChunksBeingTransferred = maxChunksBeingTransferred;
0068 this.syncModeEnabled = syncModeEnabled;
0069 }
0070
0071 @Override
0072 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
0073 logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), cause);
0074 ctx.close();
0075 }
0076
0077 @Override
0078 protected void channelRead0(
0079 ChannelHandlerContext ctx,
0080 final ChunkFetchRequest msg) throws Exception {
0081 Channel channel = ctx.channel();
0082 processFetchRequest(channel, msg);
0083 }
0084
0085 public void processFetchRequest(
0086 final Channel channel, final ChunkFetchRequest msg) throws Exception {
0087 if (logger.isTraceEnabled()) {
0088 logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
0089 msg.streamChunkId);
0090 }
0091 long chunksBeingTransferred = streamManager.chunksBeingTransferred();
0092 if (chunksBeingTransferred >= maxChunksBeingTransferred) {
0093 logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
0094 chunksBeingTransferred, maxChunksBeingTransferred);
0095 channel.close();
0096 return;
0097 }
0098 ManagedBuffer buf;
0099 try {
0100 streamManager.checkAuthorization(client, msg.streamChunkId.streamId);
0101 buf = streamManager.getChunk(msg.streamChunkId.streamId, msg.streamChunkId.chunkIndex);
0102 if (buf == null) {
0103 throw new IllegalStateException("Chunk was not found");
0104 }
0105 } catch (Exception e) {
0106 logger.error(String.format("Error opening block %s for request from %s",
0107 msg.streamChunkId, getRemoteAddress(channel)), e);
0108 respond(channel, new ChunkFetchFailure(msg.streamChunkId,
0109 Throwables.getStackTraceAsString(e)));
0110 return;
0111 }
0112
0113 streamManager.chunkBeingSent(msg.streamChunkId.streamId);
0114 respond(channel, new ChunkFetchSuccess(msg.streamChunkId, buf)).addListener(
0115 (ChannelFutureListener) future -> streamManager.chunkSent(msg.streamChunkId.streamId));
0116 }
0117
0118
0119
0120
0121
0122
0123
0124
0125
0126
0127
0128
0129
0130
0131
0132 private ChannelFuture respond(
0133 final Channel channel,
0134 final Encodable result) throws InterruptedException {
0135 final SocketAddress remoteAddress = channel.remoteAddress();
0136 ChannelFuture channelFuture;
0137 if (syncModeEnabled) {
0138 channelFuture = channel.writeAndFlush(result).await();
0139 } else {
0140 channelFuture = channel.writeAndFlush(result);
0141 }
0142 return channelFuture.addListener((ChannelFutureListener) future -> {
0143 if (future.isSuccess()) {
0144 logger.trace("Sent result {} to client {}", result, remoteAddress);
0145 } else {
0146 logger.error(String.format("Error sending result %s to %s; closing connection",
0147 result, remoteAddress), future.cause());
0148 channel.close();
0149 }
0150 });
0151 }
0152 }