Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle;
0019 
0020 
0021 import java.io.IOException;
0022 import java.nio.ByteBuffer;
0023 import java.util.Arrays;
0024 import java.util.LinkedHashSet;
0025 import java.util.List;
0026 import java.util.Map;
0027 
0028 import com.google.common.collect.ImmutableMap;
0029 import com.google.common.collect.Sets;
0030 import org.junit.Test;
0031 import org.mockito.stubbing.Answer;
0032 import org.mockito.stubbing.Stubber;
0033 
0034 import static org.junit.Assert.*;
0035 import static org.mockito.Mockito.*;
0036 
0037 import org.apache.spark.network.buffer.ManagedBuffer;
0038 import org.apache.spark.network.buffer.NioManagedBuffer;
0039 import org.apache.spark.network.util.MapConfigProvider;
0040 import org.apache.spark.network.util.TransportConf;
0041 import static org.apache.spark.network.shuffle.RetryingBlockFetcher.BlockFetchStarter;
0042 
0043 /**
0044  * Tests retry logic by throwing IOExceptions and ensuring that subsequent attempts are made to
0045  * fetch the lost blocks.
0046  */
0047 public class RetryingBlockFetcherSuite {
0048 
0049   private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
0050   private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
0051   private final ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
0052 
0053   @Test
0054   public void testNoFailures() throws IOException, InterruptedException {
0055     BlockFetchingListener listener = mock(BlockFetchingListener.class);
0056 
0057     List<? extends Map<String, Object>> interactions = Arrays.asList(
0058       // Immediately return both blocks successfully.
0059       ImmutableMap.<String, Object>builder()
0060         .put("b0", block0)
0061         .put("b1", block1)
0062         .build()
0063       );
0064 
0065     performInteractions(interactions, listener);
0066 
0067     verify(listener).onBlockFetchSuccess("b0", block0);
0068     verify(listener).onBlockFetchSuccess("b1", block1);
0069     verifyNoMoreInteractions(listener);
0070   }
0071 
0072   @Test
0073   public void testUnrecoverableFailure() throws IOException, InterruptedException {
0074     BlockFetchingListener listener = mock(BlockFetchingListener.class);
0075 
0076     List<? extends Map<String, Object>> interactions = Arrays.asList(
0077       // b0 throws a non-IOException error, so it will be failed without retry.
0078       ImmutableMap.<String, Object>builder()
0079         .put("b0", new RuntimeException("Ouch!"))
0080         .put("b1", block1)
0081         .build()
0082     );
0083 
0084     performInteractions(interactions, listener);
0085 
0086     verify(listener).onBlockFetchFailure(eq("b0"), any());
0087     verify(listener).onBlockFetchSuccess("b1", block1);
0088     verifyNoMoreInteractions(listener);
0089   }
0090 
0091   @Test
0092   public void testSingleIOExceptionOnFirst() throws IOException, InterruptedException {
0093     BlockFetchingListener listener = mock(BlockFetchingListener.class);
0094 
0095     List<? extends Map<String, Object>> interactions = Arrays.asList(
0096       // IOException will cause a retry. Since b0 fails, we will retry both.
0097       ImmutableMap.<String, Object>builder()
0098         .put("b0", new IOException("Connection failed or something"))
0099         .put("b1", block1)
0100         .build(),
0101       ImmutableMap.<String, Object>builder()
0102         .put("b0", block0)
0103         .put("b1", block1)
0104         .build()
0105     );
0106 
0107     performInteractions(interactions, listener);
0108 
0109     verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
0110     verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
0111     verifyNoMoreInteractions(listener);
0112   }
0113 
0114   @Test
0115   public void testSingleIOExceptionOnSecond() throws IOException, InterruptedException {
0116     BlockFetchingListener listener = mock(BlockFetchingListener.class);
0117 
0118     List<? extends Map<String, Object>> interactions = Arrays.asList(
0119       // IOException will cause a retry. Since b1 fails, we will not retry b0.
0120       ImmutableMap.<String, Object>builder()
0121         .put("b0", block0)
0122         .put("b1", new IOException("Connection failed or something"))
0123         .build(),
0124       ImmutableMap.<String, Object>builder()
0125         .put("b1", block1)
0126         .build()
0127     );
0128 
0129     performInteractions(interactions, listener);
0130 
0131     verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
0132     verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
0133     verifyNoMoreInteractions(listener);
0134   }
0135 
0136   @Test
0137   public void testTwoIOExceptions() throws IOException, InterruptedException {
0138     BlockFetchingListener listener = mock(BlockFetchingListener.class);
0139 
0140     List<? extends Map<String, Object>> interactions = Arrays.asList(
0141       // b0's IOException will trigger retry, b1's will be ignored.
0142       ImmutableMap.<String, Object>builder()
0143         .put("b0", new IOException())
0144         .put("b1", new IOException())
0145         .build(),
0146       // Next, b0 is successful and b1 errors again, so we just request that one.
0147       ImmutableMap.<String, Object>builder()
0148         .put("b0", block0)
0149         .put("b1", new IOException())
0150         .build(),
0151       // b1 returns successfully within 2 retries.
0152       ImmutableMap.<String, Object>builder()
0153         .put("b1", block1)
0154         .build()
0155     );
0156 
0157     performInteractions(interactions, listener);
0158 
0159     verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
0160     verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
0161     verifyNoMoreInteractions(listener);
0162   }
0163 
0164   @Test
0165   public void testThreeIOExceptions() throws IOException, InterruptedException {
0166     BlockFetchingListener listener = mock(BlockFetchingListener.class);
0167 
0168     List<? extends Map<String, Object>> interactions = Arrays.asList(
0169       // b0's IOException will trigger retry, b1's will be ignored.
0170       ImmutableMap.<String, Object>builder()
0171         .put("b0", new IOException())
0172         .put("b1", new IOException())
0173         .build(),
0174       // Next, b0 is successful and b1 errors again, so we just request that one.
0175       ImmutableMap.<String, Object>builder()
0176         .put("b0", block0)
0177         .put("b1", new IOException())
0178         .build(),
0179       // b1 errors again, but this was the last retry
0180       ImmutableMap.<String, Object>builder()
0181         .put("b1", new IOException())
0182         .build(),
0183       // This is not reached -- b1 has failed.
0184       ImmutableMap.<String, Object>builder()
0185         .put("b1", block1)
0186         .build()
0187     );
0188 
0189     performInteractions(interactions, listener);
0190 
0191     verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
0192     verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any());
0193     verifyNoMoreInteractions(listener);
0194   }
0195 
0196   @Test
0197   public void testRetryAndUnrecoverable() throws IOException, InterruptedException {
0198     BlockFetchingListener listener = mock(BlockFetchingListener.class);
0199 
0200     List<? extends Map<String, Object>> interactions = Arrays.asList(
0201       // b0's IOException will trigger retry, subsequent messages will be ignored.
0202       ImmutableMap.<String, Object>builder()
0203         .put("b0", new IOException())
0204         .put("b1", new RuntimeException())
0205         .put("b2", block2)
0206         .build(),
0207       // Next, b0 is successful, b1 errors unrecoverably, and b2 triggers a retry.
0208       ImmutableMap.<String, Object>builder()
0209         .put("b0", block0)
0210         .put("b1", new RuntimeException())
0211         .put("b2", new IOException())
0212         .build(),
0213       // b2 succeeds in its last retry.
0214       ImmutableMap.<String, Object>builder()
0215         .put("b2", block2)
0216         .build()
0217     );
0218 
0219     performInteractions(interactions, listener);
0220 
0221     verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
0222     verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any());
0223     verify(listener, timeout(5000)).onBlockFetchSuccess("b2", block2);
0224     verifyNoMoreInteractions(listener);
0225   }
0226 
0227   /**
0228    * Performs a set of interactions in response to block requests from a RetryingBlockFetcher.
0229    * Each interaction is a Map from BlockId to either ManagedBuffer or Exception. This interaction
0230    * means "respond to the next block fetch request with these Successful buffers and these Failure
0231    * exceptions". We verify that the expected block ids are exactly the ones requested.
0232    *
0233    * If multiple interactions are supplied, they will be used in order. This is useful for encoding
0234    * retries -- the first interaction may include an IOException, which causes a retry of some
0235    * subset of the original blocks in a second interaction.
0236    */
0237   @SuppressWarnings("unchecked")
0238   private static void performInteractions(List<? extends Map<String, Object>> interactions,
0239                                           BlockFetchingListener listener)
0240     throws IOException, InterruptedException {
0241 
0242     MapConfigProvider provider = new MapConfigProvider(ImmutableMap.of(
0243       "spark.shuffle.io.maxRetries", "2",
0244       "spark.shuffle.io.retryWait", "0"));
0245     TransportConf conf = new TransportConf("shuffle", provider);
0246     BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);
0247 
0248     Stubber stub = null;
0249 
0250     // Contains all blockIds that are referenced across all interactions.
0251     LinkedHashSet<String> blockIds = Sets.newLinkedHashSet();
0252 
0253     for (Map<String, Object> interaction : interactions) {
0254       blockIds.addAll(interaction.keySet());
0255 
0256       Answer<Void> answer = invocationOnMock -> {
0257         try {
0258           // Verify that the RetryingBlockFetcher requested the expected blocks.
0259           String[] requestedBlockIds = (String[]) invocationOnMock.getArguments()[0];
0260           String[] desiredBlockIds = interaction.keySet().toArray(new String[interaction.size()]);
0261           assertArrayEquals(desiredBlockIds, requestedBlockIds);
0262 
0263           // Now actually invoke the success/failure callbacks on each block.
0264           BlockFetchingListener retryListener =
0265             (BlockFetchingListener) invocationOnMock.getArguments()[1];
0266           for (Map.Entry<String, Object> block : interaction.entrySet()) {
0267             String blockId = block.getKey();
0268             Object blockValue = block.getValue();
0269 
0270             if (blockValue instanceof ManagedBuffer) {
0271               retryListener.onBlockFetchSuccess(blockId, (ManagedBuffer) blockValue);
0272             } else if (blockValue instanceof Exception) {
0273               retryListener.onBlockFetchFailure(blockId, (Exception) blockValue);
0274             } else {
0275               fail("Can only handle ManagedBuffers and Exceptions, got " + blockValue);
0276             }
0277           }
0278           return null;
0279         } catch (Throwable e) {
0280           e.printStackTrace();
0281           throw e;
0282         }
0283       };
0284 
0285       // This is either the first stub, or should be chained behind the prior ones.
0286       if (stub == null) {
0287         stub = doAnswer(answer);
0288       } else {
0289         stub.doAnswer(answer);
0290       }
0291     }
0292 
0293     assertNotNull(stub);
0294     stub.when(fetchStarter).createAndStart(any(), any());
0295     String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
0296     new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
0297   }
0298 }