Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle;
0019 
0020 import java.io.IOException;
0021 import java.util.Collections;
0022 import java.util.LinkedHashSet;
0023 import java.util.concurrent.ExecutorService;
0024 import java.util.concurrent.Executors;
0025 import java.util.concurrent.TimeUnit;
0026 
0027 import com.google.common.collect.Sets;
0028 import com.google.common.util.concurrent.Uninterruptibles;
0029 import org.slf4j.Logger;
0030 import org.slf4j.LoggerFactory;
0031 
0032 import org.apache.spark.network.buffer.ManagedBuffer;
0033 import org.apache.spark.network.util.NettyUtils;
0034 import org.apache.spark.network.util.TransportConf;
0035 
0036 /**
0037  * Wraps another BlockFetcher with the ability to automatically retry fetches which fail due to
0038  * IOExceptions, which we hope are due to transient network conditions.
0039  *
0040  * This fetcher provides stronger guarantees regarding the parent BlockFetchingListener. In
0041  * particular, the listener will be invoked exactly once per blockId, with a success or failure.
0042  */
0043 public class RetryingBlockFetcher {
0044 
0045   /**
0046    * Used to initiate the first fetch for all blocks, and subsequently for retrying the fetch on any
0047    * remaining blocks.
0048    */
0049   public interface BlockFetchStarter {
0050     /**
0051      * Creates a new BlockFetcher to fetch the given block ids which may do some synchronous
0052      * bootstrapping followed by fully asynchronous block fetching.
0053      * The BlockFetcher must eventually invoke the Listener on every input blockId, or else this
0054      * method must throw an exception.
0055      *
0056      * This method should always attempt to get a new TransportClient from the
0057      * {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
0058      * issues.
0059      */
0060     void createAndStart(String[] blockIds, BlockFetchingListener listener)
0061          throws IOException, InterruptedException;
0062   }
0063 
0064   /** Shared executor service used for waiting and retrying. */
0065   private static final ExecutorService executorService = Executors.newCachedThreadPool(
0066     NettyUtils.createThreadFactory("Block Fetch Retry"));
0067 
0068   private static final Logger logger = LoggerFactory.getLogger(RetryingBlockFetcher.class);
0069 
0070   /** Used to initiate new Block Fetches on our remaining blocks. */
0071   private final BlockFetchStarter fetchStarter;
0072 
0073   /** Parent listener which we delegate all successful or permanently failed block fetches to. */
0074   private final BlockFetchingListener listener;
0075 
0076   /** Max number of times we are allowed to retry. */
0077   private final int maxRetries;
0078 
0079   /** Milliseconds to wait before each retry. */
0080   private final int retryWaitTime;
0081 
0082   // NOTE:
0083   // All of our non-final fields are synchronized under 'this' and should only be accessed/mutated
0084   // while inside a synchronized block.
0085   /** Number of times we've attempted to retry so far. */
0086   private int retryCount = 0;
0087 
0088   /**
0089    * Set of all block ids which have not been fetched successfully or with a non-IO Exception.
0090    * A retry involves requesting every outstanding block. Note that since this is a LinkedHashSet,
0091    * input ordering is preserved, so we always request blocks in the same order the user provided.
0092    */
0093   private final LinkedHashSet<String> outstandingBlocksIds;
0094 
0095   /**
0096    * The BlockFetchingListener that is active with our current BlockFetcher.
0097    * When we start a retry, we immediately replace this with a new Listener, which causes all any
0098    * old Listeners to ignore all further responses.
0099    */
0100   private RetryingBlockFetchListener currentListener;
0101 
0102   public RetryingBlockFetcher(
0103       TransportConf conf,
0104       RetryingBlockFetcher.BlockFetchStarter fetchStarter,
0105       String[] blockIds,
0106       BlockFetchingListener listener) {
0107     this.fetchStarter = fetchStarter;
0108     this.listener = listener;
0109     this.maxRetries = conf.maxIORetries();
0110     this.retryWaitTime = conf.ioRetryWaitTimeMs();
0111     this.outstandingBlocksIds = Sets.newLinkedHashSet();
0112     Collections.addAll(outstandingBlocksIds, blockIds);
0113     this.currentListener = new RetryingBlockFetchListener();
0114   }
0115 
0116   /**
0117    * Initiates the fetch of all blocks provided in the constructor, with possible retries in the
0118    * event of transient IOExceptions.
0119    */
0120   public void start() {
0121     fetchAllOutstanding();
0122   }
0123 
0124   /**
0125    * Fires off a request to fetch all blocks that have not been fetched successfully or permanently
0126    * failed (i.e., by a non-IOException).
0127    */
0128   private void fetchAllOutstanding() {
0129     // Start by retrieving our shared state within a synchronized block.
0130     String[] blockIdsToFetch;
0131     int numRetries;
0132     RetryingBlockFetchListener myListener;
0133     synchronized (this) {
0134       blockIdsToFetch = outstandingBlocksIds.toArray(new String[outstandingBlocksIds.size()]);
0135       numRetries = retryCount;
0136       myListener = currentListener;
0137     }
0138 
0139     // Now initiate the fetch on all outstanding blocks, possibly initiating a retry if that fails.
0140     try {
0141       fetchStarter.createAndStart(blockIdsToFetch, myListener);
0142     } catch (Exception e) {
0143       logger.error(String.format("Exception while beginning fetch of %s outstanding blocks %s",
0144         blockIdsToFetch.length, numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
0145 
0146       if (shouldRetry(e)) {
0147         initiateRetry();
0148       } else {
0149         for (String bid : blockIdsToFetch) {
0150           listener.onBlockFetchFailure(bid, e);
0151         }
0152       }
0153     }
0154   }
0155 
0156   /**
0157    * Lightweight method which initiates a retry in a different thread. The retry will involve
0158    * calling fetchAllOutstanding() after a configured wait time.
0159    */
0160   private synchronized void initiateRetry() {
0161     retryCount += 1;
0162     currentListener = new RetryingBlockFetchListener();
0163 
0164     logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms",
0165       retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime);
0166 
0167     executorService.submit(() -> {
0168       Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
0169       fetchAllOutstanding();
0170     });
0171   }
0172 
0173   /**
0174    * Returns true if we should retry due a block fetch failure. We will retry if and only if
0175    * the exception was an IOException and we haven't retried 'maxRetries' times already.
0176    */
0177   private synchronized boolean shouldRetry(Throwable e) {
0178     boolean isIOException = e instanceof IOException
0179       || (e.getCause() != null && e.getCause() instanceof IOException);
0180     boolean hasRemainingRetries = retryCount < maxRetries;
0181     return isIOException && hasRemainingRetries;
0182   }
0183 
0184   /**
0185    * Our RetryListener intercepts block fetch responses and forwards them to our parent listener.
0186    * Note that in the event of a retry, we will immediately replace the 'currentListener' field,
0187    * indicating that any responses from non-current Listeners should be ignored.
0188    */
0189   private class RetryingBlockFetchListener implements BlockFetchingListener {
0190     @Override
0191     public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
0192       // We will only forward this success message to our parent listener if this block request is
0193       // outstanding and we are still the active listener.
0194       boolean shouldForwardSuccess = false;
0195       synchronized (RetryingBlockFetcher.this) {
0196         if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
0197           outstandingBlocksIds.remove(blockId);
0198           shouldForwardSuccess = true;
0199         }
0200       }
0201 
0202       // Now actually invoke the parent listener, outside of the synchronized block.
0203       if (shouldForwardSuccess) {
0204         listener.onBlockFetchSuccess(blockId, data);
0205       }
0206     }
0207 
0208     @Override
0209     public void onBlockFetchFailure(String blockId, Throwable exception) {
0210       // We will only forward this failure to our parent listener if this block request is
0211       // outstanding, we are still the active listener, AND we cannot retry the fetch.
0212       boolean shouldForwardFailure = false;
0213       synchronized (RetryingBlockFetcher.this) {
0214         if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
0215           if (shouldRetry(exception)) {
0216             initiateRetry();
0217           } else {
0218             logger.error(String.format("Failed to fetch block %s, and will not retry (%s retries)",
0219               blockId, retryCount), exception);
0220             outstandingBlocksIds.remove(blockId);
0221             shouldForwardFailure = true;
0222           }
0223         }
0224       }
0225 
0226       // Now actually invoke the parent listener, outside of the synchronized block.
0227       if (shouldForwardFailure) {
0228         listener.onBlockFetchFailure(blockId, exception);
0229       }
0230     }
0231   }
0232 }