0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle;
0019
0020
0021 import java.io.IOException;
0022 import java.nio.ByteBuffer;
0023 import java.util.Arrays;
0024 import java.util.LinkedHashSet;
0025 import java.util.List;
0026 import java.util.Map;
0027
0028 import com.google.common.collect.ImmutableMap;
0029 import com.google.common.collect.Sets;
0030 import org.junit.Test;
0031 import org.mockito.stubbing.Answer;
0032 import org.mockito.stubbing.Stubber;
0033
0034 import static org.junit.Assert.*;
0035 import static org.mockito.Mockito.*;
0036
0037 import org.apache.spark.network.buffer.ManagedBuffer;
0038 import org.apache.spark.network.buffer.NioManagedBuffer;
0039 import org.apache.spark.network.util.MapConfigProvider;
0040 import org.apache.spark.network.util.TransportConf;
0041 import static org.apache.spark.network.shuffle.RetryingBlockFetcher.BlockFetchStarter;
0042
0043
0044
0045
0046
0047 public class RetryingBlockFetcherSuite {
0048
0049 private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
0050 private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
0051 private final ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
0052
0053 @Test
0054 public void testNoFailures() throws IOException, InterruptedException {
0055 BlockFetchingListener listener = mock(BlockFetchingListener.class);
0056
0057 List<? extends Map<String, Object>> interactions = Arrays.asList(
0058
0059 ImmutableMap.<String, Object>builder()
0060 .put("b0", block0)
0061 .put("b1", block1)
0062 .build()
0063 );
0064
0065 performInteractions(interactions, listener);
0066
0067 verify(listener).onBlockFetchSuccess("b0", block0);
0068 verify(listener).onBlockFetchSuccess("b1", block1);
0069 verifyNoMoreInteractions(listener);
0070 }
0071
0072 @Test
0073 public void testUnrecoverableFailure() throws IOException, InterruptedException {
0074 BlockFetchingListener listener = mock(BlockFetchingListener.class);
0075
0076 List<? extends Map<String, Object>> interactions = Arrays.asList(
0077
0078 ImmutableMap.<String, Object>builder()
0079 .put("b0", new RuntimeException("Ouch!"))
0080 .put("b1", block1)
0081 .build()
0082 );
0083
0084 performInteractions(interactions, listener);
0085
0086 verify(listener).onBlockFetchFailure(eq("b0"), any());
0087 verify(listener).onBlockFetchSuccess("b1", block1);
0088 verifyNoMoreInteractions(listener);
0089 }
0090
0091 @Test
0092 public void testSingleIOExceptionOnFirst() throws IOException, InterruptedException {
0093 BlockFetchingListener listener = mock(BlockFetchingListener.class);
0094
0095 List<? extends Map<String, Object>> interactions = Arrays.asList(
0096
0097 ImmutableMap.<String, Object>builder()
0098 .put("b0", new IOException("Connection failed or something"))
0099 .put("b1", block1)
0100 .build(),
0101 ImmutableMap.<String, Object>builder()
0102 .put("b0", block0)
0103 .put("b1", block1)
0104 .build()
0105 );
0106
0107 performInteractions(interactions, listener);
0108
0109 verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
0110 verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
0111 verifyNoMoreInteractions(listener);
0112 }
0113
0114 @Test
0115 public void testSingleIOExceptionOnSecond() throws IOException, InterruptedException {
0116 BlockFetchingListener listener = mock(BlockFetchingListener.class);
0117
0118 List<? extends Map<String, Object>> interactions = Arrays.asList(
0119
0120 ImmutableMap.<String, Object>builder()
0121 .put("b0", block0)
0122 .put("b1", new IOException("Connection failed or something"))
0123 .build(),
0124 ImmutableMap.<String, Object>builder()
0125 .put("b1", block1)
0126 .build()
0127 );
0128
0129 performInteractions(interactions, listener);
0130
0131 verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
0132 verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
0133 verifyNoMoreInteractions(listener);
0134 }
0135
0136 @Test
0137 public void testTwoIOExceptions() throws IOException, InterruptedException {
0138 BlockFetchingListener listener = mock(BlockFetchingListener.class);
0139
0140 List<? extends Map<String, Object>> interactions = Arrays.asList(
0141
0142 ImmutableMap.<String, Object>builder()
0143 .put("b0", new IOException())
0144 .put("b1", new IOException())
0145 .build(),
0146
0147 ImmutableMap.<String, Object>builder()
0148 .put("b0", block0)
0149 .put("b1", new IOException())
0150 .build(),
0151
0152 ImmutableMap.<String, Object>builder()
0153 .put("b1", block1)
0154 .build()
0155 );
0156
0157 performInteractions(interactions, listener);
0158
0159 verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
0160 verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
0161 verifyNoMoreInteractions(listener);
0162 }
0163
0164 @Test
0165 public void testThreeIOExceptions() throws IOException, InterruptedException {
0166 BlockFetchingListener listener = mock(BlockFetchingListener.class);
0167
0168 List<? extends Map<String, Object>> interactions = Arrays.asList(
0169
0170 ImmutableMap.<String, Object>builder()
0171 .put("b0", new IOException())
0172 .put("b1", new IOException())
0173 .build(),
0174
0175 ImmutableMap.<String, Object>builder()
0176 .put("b0", block0)
0177 .put("b1", new IOException())
0178 .build(),
0179
0180 ImmutableMap.<String, Object>builder()
0181 .put("b1", new IOException())
0182 .build(),
0183
0184 ImmutableMap.<String, Object>builder()
0185 .put("b1", block1)
0186 .build()
0187 );
0188
0189 performInteractions(interactions, listener);
0190
0191 verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
0192 verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any());
0193 verifyNoMoreInteractions(listener);
0194 }
0195
0196 @Test
0197 public void testRetryAndUnrecoverable() throws IOException, InterruptedException {
0198 BlockFetchingListener listener = mock(BlockFetchingListener.class);
0199
0200 List<? extends Map<String, Object>> interactions = Arrays.asList(
0201
0202 ImmutableMap.<String, Object>builder()
0203 .put("b0", new IOException())
0204 .put("b1", new RuntimeException())
0205 .put("b2", block2)
0206 .build(),
0207
0208 ImmutableMap.<String, Object>builder()
0209 .put("b0", block0)
0210 .put("b1", new RuntimeException())
0211 .put("b2", new IOException())
0212 .build(),
0213
0214 ImmutableMap.<String, Object>builder()
0215 .put("b2", block2)
0216 .build()
0217 );
0218
0219 performInteractions(interactions, listener);
0220
0221 verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
0222 verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any());
0223 verify(listener, timeout(5000)).onBlockFetchSuccess("b2", block2);
0224 verifyNoMoreInteractions(listener);
0225 }
0226
0227
0228
0229
0230
0231
0232
0233
0234
0235
0236
0237 @SuppressWarnings("unchecked")
0238 private static void performInteractions(List<? extends Map<String, Object>> interactions,
0239 BlockFetchingListener listener)
0240 throws IOException, InterruptedException {
0241
0242 MapConfigProvider provider = new MapConfigProvider(ImmutableMap.of(
0243 "spark.shuffle.io.maxRetries", "2",
0244 "spark.shuffle.io.retryWait", "0"));
0245 TransportConf conf = new TransportConf("shuffle", provider);
0246 BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);
0247
0248 Stubber stub = null;
0249
0250
0251 LinkedHashSet<String> blockIds = Sets.newLinkedHashSet();
0252
0253 for (Map<String, Object> interaction : interactions) {
0254 blockIds.addAll(interaction.keySet());
0255
0256 Answer<Void> answer = invocationOnMock -> {
0257 try {
0258
0259 String[] requestedBlockIds = (String[]) invocationOnMock.getArguments()[0];
0260 String[] desiredBlockIds = interaction.keySet().toArray(new String[interaction.size()]);
0261 assertArrayEquals(desiredBlockIds, requestedBlockIds);
0262
0263
0264 BlockFetchingListener retryListener =
0265 (BlockFetchingListener) invocationOnMock.getArguments()[1];
0266 for (Map.Entry<String, Object> block : interaction.entrySet()) {
0267 String blockId = block.getKey();
0268 Object blockValue = block.getValue();
0269
0270 if (blockValue instanceof ManagedBuffer) {
0271 retryListener.onBlockFetchSuccess(blockId, (ManagedBuffer) blockValue);
0272 } else if (blockValue instanceof Exception) {
0273 retryListener.onBlockFetchFailure(blockId, (Exception) blockValue);
0274 } else {
0275 fail("Can only handle ManagedBuffers and Exceptions, got " + blockValue);
0276 }
0277 }
0278 return null;
0279 } catch (Throwable e) {
0280 e.printStackTrace();
0281 throw e;
0282 }
0283 };
0284
0285
0286 if (stub == null) {
0287 stub = doAnswer(answer);
0288 } else {
0289 stub.doAnswer(answer);
0290 }
0291 }
0292
0293 assertNotNull(stub);
0294 stub.when(fetchStarter).createAndStart(any(), any());
0295 String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
0296 new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
0297 }
0298 }