0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network;
0019
0020 import io.netty.channel.ChannelHandlerContext;
0021 import java.util.ArrayList;
0022 import java.util.List;
0023
0024 import io.netty.channel.Channel;
0025 import org.junit.Assert;
0026 import org.junit.Test;
0027
0028 import static org.mockito.Mockito.*;
0029
0030 import org.apache.commons.lang3.tuple.ImmutablePair;
0031 import org.apache.commons.lang3.tuple.Pair;
0032 import org.apache.spark.network.buffer.ManagedBuffer;
0033 import org.apache.spark.network.client.TransportClient;
0034 import org.apache.spark.network.protocol.*;
0035 import org.apache.spark.network.server.ChunkFetchRequestHandler;
0036 import org.apache.spark.network.server.NoOpRpcHandler;
0037 import org.apache.spark.network.server.OneForOneStreamManager;
0038 import org.apache.spark.network.server.RpcHandler;
0039
0040 public class ChunkFetchRequestHandlerSuite {
0041
0042 @Test
0043 public void handleChunkFetchRequest() throws Exception {
0044 RpcHandler rpcHandler = new NoOpRpcHandler();
0045 OneForOneStreamManager streamManager = (OneForOneStreamManager) (rpcHandler.getStreamManager());
0046 Channel channel = mock(Channel.class);
0047 ChannelHandlerContext context = mock(ChannelHandlerContext.class);
0048 when(context.channel())
0049 .thenAnswer(invocationOnMock0 -> channel);
0050
0051 List<Pair<Object, ExtendedChannelPromise>> responseAndPromisePairs =
0052 new ArrayList<>();
0053 when(channel.writeAndFlush(any()))
0054 .thenAnswer(invocationOnMock0 -> {
0055 Object response = invocationOnMock0.getArguments()[0];
0056 ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel);
0057 responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture));
0058 return channelFuture;
0059 });
0060
0061
0062 List<ManagedBuffer> managedBuffers = new ArrayList<>();
0063 managedBuffers.add(new TestManagedBuffer(10));
0064 managedBuffers.add(new TestManagedBuffer(20));
0065 managedBuffers.add(null);
0066 managedBuffers.add(new TestManagedBuffer(30));
0067 managedBuffers.add(new TestManagedBuffer(40));
0068 long streamId = streamManager.registerStream("test-app", managedBuffers.iterator(), channel);
0069 TransportClient reverseClient = mock(TransportClient.class);
0070 ChunkFetchRequestHandler requestHandler = new ChunkFetchRequestHandler(reverseClient,
0071 rpcHandler.getStreamManager(), 2L, false);
0072
0073 RequestMessage request0 = new ChunkFetchRequest(new StreamChunkId(streamId, 0));
0074 requestHandler.channelRead(context, request0);
0075 Assert.assertEquals(1, responseAndPromisePairs.size());
0076 Assert.assertTrue(responseAndPromisePairs.get(0).getLeft() instanceof ChunkFetchSuccess);
0077 Assert.assertEquals(managedBuffers.get(0),
0078 ((ChunkFetchSuccess) (responseAndPromisePairs.get(0).getLeft())).body());
0079
0080 RequestMessage request1 = new ChunkFetchRequest(new StreamChunkId(streamId, 1));
0081 requestHandler.channelRead(context, request1);
0082 Assert.assertEquals(2, responseAndPromisePairs.size());
0083 Assert.assertTrue(responseAndPromisePairs.get(1).getLeft() instanceof ChunkFetchSuccess);
0084 Assert.assertEquals(managedBuffers.get(1),
0085 ((ChunkFetchSuccess) (responseAndPromisePairs.get(1).getLeft())).body());
0086
0087
0088 responseAndPromisePairs.get(0).getRight().finish(true);
0089
0090 RequestMessage request2 = new ChunkFetchRequest(new StreamChunkId(streamId, 2));
0091 requestHandler.channelRead(context, request2);
0092 Assert.assertEquals(3, responseAndPromisePairs.size());
0093 Assert.assertTrue(responseAndPromisePairs.get(2).getLeft() instanceof ChunkFetchFailure);
0094 ChunkFetchFailure chunkFetchFailure =
0095 ((ChunkFetchFailure) (responseAndPromisePairs.get(2).getLeft()));
0096 Assert.assertEquals("java.lang.IllegalStateException: Chunk was not found",
0097 chunkFetchFailure.errorString.split("\\r?\\n")[0]);
0098
0099 RequestMessage request3 = new ChunkFetchRequest(new StreamChunkId(streamId, 3));
0100 requestHandler.channelRead(context, request3);
0101 Assert.assertEquals(4, responseAndPromisePairs.size());
0102 Assert.assertTrue(responseAndPromisePairs.get(3).getLeft() instanceof ChunkFetchSuccess);
0103 Assert.assertEquals(managedBuffers.get(3),
0104 ((ChunkFetchSuccess) (responseAndPromisePairs.get(3).getLeft())).body());
0105
0106 RequestMessage request4 = new ChunkFetchRequest(new StreamChunkId(streamId, 4));
0107 requestHandler.channelRead(context, request4);
0108 verify(channel, times(1)).close();
0109 Assert.assertEquals(4, responseAndPromisePairs.size());
0110 }
0111 }