Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network;
0019 
0020 import io.netty.channel.ChannelHandlerContext;
0021 import java.util.ArrayList;
0022 import java.util.List;
0023 
0024 import io.netty.channel.Channel;
0025 import org.junit.Assert;
0026 import org.junit.Test;
0027 
0028 import static org.mockito.Mockito.*;
0029 
0030 import org.apache.commons.lang3.tuple.ImmutablePair;
0031 import org.apache.commons.lang3.tuple.Pair;
0032 import org.apache.spark.network.buffer.ManagedBuffer;
0033 import org.apache.spark.network.client.TransportClient;
0034 import org.apache.spark.network.protocol.*;
0035 import org.apache.spark.network.server.ChunkFetchRequestHandler;
0036 import org.apache.spark.network.server.NoOpRpcHandler;
0037 import org.apache.spark.network.server.OneForOneStreamManager;
0038 import org.apache.spark.network.server.RpcHandler;
0039 
0040 public class ChunkFetchRequestHandlerSuite {
0041 
0042   @Test
0043   public void handleChunkFetchRequest() throws Exception {
0044     RpcHandler rpcHandler = new NoOpRpcHandler();
0045     OneForOneStreamManager streamManager = (OneForOneStreamManager) (rpcHandler.getStreamManager());
0046     Channel channel = mock(Channel.class);
0047     ChannelHandlerContext context = mock(ChannelHandlerContext.class);
0048     when(context.channel())
0049       .thenAnswer(invocationOnMock0 -> channel);
0050 
0051     List<Pair<Object, ExtendedChannelPromise>> responseAndPromisePairs =
0052       new ArrayList<>();
0053     when(channel.writeAndFlush(any()))
0054       .thenAnswer(invocationOnMock0 -> {
0055         Object response = invocationOnMock0.getArguments()[0];
0056         ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel);
0057         responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture));
0058         return channelFuture;
0059       });
0060 
0061     // Prepare the stream.
0062     List<ManagedBuffer> managedBuffers = new ArrayList<>();
0063     managedBuffers.add(new TestManagedBuffer(10));
0064     managedBuffers.add(new TestManagedBuffer(20));
0065     managedBuffers.add(null);
0066     managedBuffers.add(new TestManagedBuffer(30));
0067     managedBuffers.add(new TestManagedBuffer(40));
0068     long streamId = streamManager.registerStream("test-app", managedBuffers.iterator(), channel);
0069     TransportClient reverseClient = mock(TransportClient.class);
0070     ChunkFetchRequestHandler requestHandler = new ChunkFetchRequestHandler(reverseClient,
0071       rpcHandler.getStreamManager(), 2L, false);
0072 
0073     RequestMessage request0 = new ChunkFetchRequest(new StreamChunkId(streamId, 0));
0074     requestHandler.channelRead(context, request0);
0075     Assert.assertEquals(1, responseAndPromisePairs.size());
0076     Assert.assertTrue(responseAndPromisePairs.get(0).getLeft() instanceof ChunkFetchSuccess);
0077     Assert.assertEquals(managedBuffers.get(0),
0078       ((ChunkFetchSuccess) (responseAndPromisePairs.get(0).getLeft())).body());
0079 
0080     RequestMessage request1 = new ChunkFetchRequest(new StreamChunkId(streamId, 1));
0081     requestHandler.channelRead(context, request1);
0082     Assert.assertEquals(2, responseAndPromisePairs.size());
0083     Assert.assertTrue(responseAndPromisePairs.get(1).getLeft() instanceof ChunkFetchSuccess);
0084     Assert.assertEquals(managedBuffers.get(1),
0085       ((ChunkFetchSuccess) (responseAndPromisePairs.get(1).getLeft())).body());
0086 
0087     // Finish flushing the response for request0.
0088     responseAndPromisePairs.get(0).getRight().finish(true);
0089 
0090     RequestMessage request2 = new ChunkFetchRequest(new StreamChunkId(streamId, 2));
0091     requestHandler.channelRead(context, request2);
0092     Assert.assertEquals(3, responseAndPromisePairs.size());
0093     Assert.assertTrue(responseAndPromisePairs.get(2).getLeft() instanceof ChunkFetchFailure);
0094     ChunkFetchFailure chunkFetchFailure =
0095         ((ChunkFetchFailure) (responseAndPromisePairs.get(2).getLeft()));
0096     Assert.assertEquals("java.lang.IllegalStateException: Chunk was not found",
0097         chunkFetchFailure.errorString.split("\\r?\\n")[0]);
0098 
0099     RequestMessage request3 = new ChunkFetchRequest(new StreamChunkId(streamId, 3));
0100     requestHandler.channelRead(context, request3);
0101     Assert.assertEquals(4, responseAndPromisePairs.size());
0102     Assert.assertTrue(responseAndPromisePairs.get(3).getLeft() instanceof ChunkFetchSuccess);
0103     Assert.assertEquals(managedBuffers.get(3),
0104       ((ChunkFetchSuccess) (responseAndPromisePairs.get(3).getLeft())).body());
0105 
0106     RequestMessage request4 = new ChunkFetchRequest(new StreamChunkId(streamId, 4));
0107     requestHandler.channelRead(context, request4);
0108     verify(channel, times(1)).close();
0109     Assert.assertEquals(4, responseAndPromisePairs.size());
0110   }
0111 }