Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.server;
0019 
0020 import java.util.ArrayList;
0021 import java.util.Iterator;
0022 import java.util.List;
0023 
0024 import io.netty.channel.Channel;
0025 import org.junit.After;
0026 import org.junit.Assert;
0027 import org.junit.Test;
0028 import org.mockito.Mockito;
0029 
0030 import org.apache.spark.network.TestManagedBuffer;
0031 import org.apache.spark.network.buffer.ManagedBuffer;
0032 
0033 public class OneForOneStreamManagerSuite {
0034 
0035   List<ManagedBuffer> managedBuffersToRelease = new ArrayList<>();
0036 
0037   @After
0038   public void tearDown() {
0039     managedBuffersToRelease.forEach(managedBuffer -> managedBuffer.release());
0040     managedBuffersToRelease.clear();
0041   }
0042 
0043   private ManagedBuffer getChunk(OneForOneStreamManager manager, long streamId, int chunkIndex) {
0044     ManagedBuffer chunk = manager.getChunk(streamId, chunkIndex);
0045     if (chunk != null) {
0046       managedBuffersToRelease.add(chunk);
0047     }
0048     return chunk;
0049   }
0050 
0051   @Test
0052   public void testMissingChunk() {
0053     OneForOneStreamManager manager = new OneForOneStreamManager();
0054     List<ManagedBuffer> buffers = new ArrayList<>();
0055     TestManagedBuffer buffer1 = Mockito.spy(new TestManagedBuffer(10));
0056     TestManagedBuffer buffer2 = Mockito.spy(new TestManagedBuffer(20));
0057     TestManagedBuffer buffer3 = Mockito.spy(new TestManagedBuffer(20));
0058 
0059     buffers.add(buffer1);
0060     // the nulls here are to simulate a file which goes missing before being read,
0061     // just as a defensive measure
0062     buffers.add(null);
0063     buffers.add(buffer2);
0064     buffers.add(null);
0065     buffers.add(buffer3);
0066 
0067     Channel dummyChannel = Mockito.mock(Channel.class, Mockito.RETURNS_SMART_NULLS);
0068     long streamId = manager.registerStream("appId", buffers.iterator(), dummyChannel);
0069     Assert.assertEquals(1, manager.numStreamStates());
0070     Assert.assertNotNull(getChunk(manager, streamId, 0));
0071     Assert.assertNull(getChunk(manager, streamId, 1));
0072     Assert.assertNotNull(getChunk(manager, streamId, 2));
0073     manager.connectionTerminated(dummyChannel);
0074 
0075     // loaded buffers are not released yet as in production a MangedBuffer returned by getChunk()
0076     // would only be released by Netty after it is written to the network
0077     Mockito.verify(buffer1, Mockito.never()).release();
0078     Mockito.verify(buffer2, Mockito.never()).release();
0079     Mockito.verify(buffer3, Mockito.times(1)).release();
0080   }
0081 
0082   @Test
0083   public void managedBuffersAreFreedWhenConnectionIsClosed() {
0084     OneForOneStreamManager manager = new OneForOneStreamManager();
0085     List<ManagedBuffer> buffers = new ArrayList<>();
0086     TestManagedBuffer buffer1 = Mockito.spy(new TestManagedBuffer(10));
0087     TestManagedBuffer buffer2 = Mockito.spy(new TestManagedBuffer(20));
0088     buffers.add(buffer1);
0089     buffers.add(buffer2);
0090 
0091     Channel dummyChannel = Mockito.mock(Channel.class, Mockito.RETURNS_SMART_NULLS);
0092     manager.registerStream("appId", buffers.iterator(), dummyChannel);
0093     Assert.assertEquals(1, manager.numStreamStates());
0094     manager.connectionTerminated(dummyChannel);
0095 
0096     Mockito.verify(buffer1, Mockito.times(1)).release();
0097     Mockito.verify(buffer2, Mockito.times(1)).release();
0098     Assert.assertEquals(0, manager.numStreamStates());
0099   }
0100 
0101   @Test
0102   public void streamStatesAreFreedWhenConnectionIsClosedEvenIfBufferIteratorThrowsException() {
0103     OneForOneStreamManager manager = new OneForOneStreamManager();
0104 
0105     Iterator<ManagedBuffer> buffers = Mockito.mock(Iterator.class);
0106     Mockito.when(buffers.hasNext()).thenReturn(true);
0107     Mockito.when(buffers.next()).thenThrow(RuntimeException.class);
0108 
0109     ManagedBuffer mockManagedBuffer = Mockito.mock(ManagedBuffer.class);
0110 
0111     Iterator<ManagedBuffer> buffers2 = Mockito.mock(Iterator.class);
0112     Mockito.when(buffers2.hasNext()).thenReturn(true).thenReturn(true);
0113     Mockito.when(buffers2.next()).thenReturn(mockManagedBuffer).thenThrow(RuntimeException.class);
0114 
0115     Channel dummyChannel = Mockito.mock(Channel.class, Mockito.RETURNS_SMART_NULLS);
0116     manager.registerStream("appId", buffers, dummyChannel);
0117     manager.registerStream("appId", buffers2, dummyChannel);
0118 
0119     Assert.assertEquals(2, manager.numStreamStates());
0120 
0121     try {
0122       manager.connectionTerminated(dummyChannel);
0123       Assert.fail("connectionTerminated should throw exception when fails to release all buffers");
0124 
0125     } catch (RuntimeException e) {
0126 
0127       Mockito.verify(buffers, Mockito.times(1)).hasNext();
0128       Mockito.verify(buffers, Mockito.times(1)).next();
0129 
0130       Mockito.verify(buffers2, Mockito.times(2)).hasNext();
0131       Mockito.verify(buffers2, Mockito.times(2)).next();
0132 
0133       Mockito.verify(mockManagedBuffer, Mockito.times(1)).release();
0134 
0135       Assert.assertEquals(0, manager.numStreamStates());
0136     }
0137   }
0138 }