0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.server;
0019
0020 import java.util.ArrayList;
0021 import java.util.Iterator;
0022 import java.util.List;
0023
0024 import io.netty.channel.Channel;
0025 import org.junit.After;
0026 import org.junit.Assert;
0027 import org.junit.Test;
0028 import org.mockito.Mockito;
0029
0030 import org.apache.spark.network.TestManagedBuffer;
0031 import org.apache.spark.network.buffer.ManagedBuffer;
0032
0033 public class OneForOneStreamManagerSuite {
0034
0035 List<ManagedBuffer> managedBuffersToRelease = new ArrayList<>();
0036
0037 @After
0038 public void tearDown() {
0039 managedBuffersToRelease.forEach(managedBuffer -> managedBuffer.release());
0040 managedBuffersToRelease.clear();
0041 }
0042
0043 private ManagedBuffer getChunk(OneForOneStreamManager manager, long streamId, int chunkIndex) {
0044 ManagedBuffer chunk = manager.getChunk(streamId, chunkIndex);
0045 if (chunk != null) {
0046 managedBuffersToRelease.add(chunk);
0047 }
0048 return chunk;
0049 }
0050
0051 @Test
0052 public void testMissingChunk() {
0053 OneForOneStreamManager manager = new OneForOneStreamManager();
0054 List<ManagedBuffer> buffers = new ArrayList<>();
0055 TestManagedBuffer buffer1 = Mockito.spy(new TestManagedBuffer(10));
0056 TestManagedBuffer buffer2 = Mockito.spy(new TestManagedBuffer(20));
0057 TestManagedBuffer buffer3 = Mockito.spy(new TestManagedBuffer(20));
0058
0059 buffers.add(buffer1);
0060
0061
0062 buffers.add(null);
0063 buffers.add(buffer2);
0064 buffers.add(null);
0065 buffers.add(buffer3);
0066
0067 Channel dummyChannel = Mockito.mock(Channel.class, Mockito.RETURNS_SMART_NULLS);
0068 long streamId = manager.registerStream("appId", buffers.iterator(), dummyChannel);
0069 Assert.assertEquals(1, manager.numStreamStates());
0070 Assert.assertNotNull(getChunk(manager, streamId, 0));
0071 Assert.assertNull(getChunk(manager, streamId, 1));
0072 Assert.assertNotNull(getChunk(manager, streamId, 2));
0073 manager.connectionTerminated(dummyChannel);
0074
0075
0076
0077 Mockito.verify(buffer1, Mockito.never()).release();
0078 Mockito.verify(buffer2, Mockito.never()).release();
0079 Mockito.verify(buffer3, Mockito.times(1)).release();
0080 }
0081
0082 @Test
0083 public void managedBuffersAreFreedWhenConnectionIsClosed() {
0084 OneForOneStreamManager manager = new OneForOneStreamManager();
0085 List<ManagedBuffer> buffers = new ArrayList<>();
0086 TestManagedBuffer buffer1 = Mockito.spy(new TestManagedBuffer(10));
0087 TestManagedBuffer buffer2 = Mockito.spy(new TestManagedBuffer(20));
0088 buffers.add(buffer1);
0089 buffers.add(buffer2);
0090
0091 Channel dummyChannel = Mockito.mock(Channel.class, Mockito.RETURNS_SMART_NULLS);
0092 manager.registerStream("appId", buffers.iterator(), dummyChannel);
0093 Assert.assertEquals(1, manager.numStreamStates());
0094 manager.connectionTerminated(dummyChannel);
0095
0096 Mockito.verify(buffer1, Mockito.times(1)).release();
0097 Mockito.verify(buffer2, Mockito.times(1)).release();
0098 Assert.assertEquals(0, manager.numStreamStates());
0099 }
0100
0101 @Test
0102 public void streamStatesAreFreedWhenConnectionIsClosedEvenIfBufferIteratorThrowsException() {
0103 OneForOneStreamManager manager = new OneForOneStreamManager();
0104
0105 Iterator<ManagedBuffer> buffers = Mockito.mock(Iterator.class);
0106 Mockito.when(buffers.hasNext()).thenReturn(true);
0107 Mockito.when(buffers.next()).thenThrow(RuntimeException.class);
0108
0109 ManagedBuffer mockManagedBuffer = Mockito.mock(ManagedBuffer.class);
0110
0111 Iterator<ManagedBuffer> buffers2 = Mockito.mock(Iterator.class);
0112 Mockito.when(buffers2.hasNext()).thenReturn(true).thenReturn(true);
0113 Mockito.when(buffers2.next()).thenReturn(mockManagedBuffer).thenThrow(RuntimeException.class);
0114
0115 Channel dummyChannel = Mockito.mock(Channel.class, Mockito.RETURNS_SMART_NULLS);
0116 manager.registerStream("appId", buffers, dummyChannel);
0117 manager.registerStream("appId", buffers2, dummyChannel);
0118
0119 Assert.assertEquals(2, manager.numStreamStates());
0120
0121 try {
0122 manager.connectionTerminated(dummyChannel);
0123 Assert.fail("connectionTerminated should throw exception when fails to release all buffers");
0124
0125 } catch (RuntimeException e) {
0126
0127 Mockito.verify(buffers, Mockito.times(1)).hasNext();
0128 Mockito.verify(buffers, Mockito.times(1)).next();
0129
0130 Mockito.verify(buffers2, Mockito.times(2)).hasNext();
0131 Mockito.verify(buffers2, Mockito.times(2)).next();
0132
0133 Mockito.verify(mockManagedBuffer, Mockito.times(1)).release();
0134
0135 Assert.assertEquals(0, manager.numStreamStates());
0136 }
0137 }
0138 }