0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle;
0019
0020 import java.io.IOException;
0021 import java.util.Collections;
0022 import java.util.LinkedHashSet;
0023 import java.util.concurrent.ExecutorService;
0024 import java.util.concurrent.Executors;
0025 import java.util.concurrent.TimeUnit;
0026
0027 import com.google.common.collect.Sets;
0028 import com.google.common.util.concurrent.Uninterruptibles;
0029 import org.slf4j.Logger;
0030 import org.slf4j.LoggerFactory;
0031
0032 import org.apache.spark.network.buffer.ManagedBuffer;
0033 import org.apache.spark.network.util.NettyUtils;
0034 import org.apache.spark.network.util.TransportConf;
0035
0036
0037
0038
0039
0040
0041
0042
0043 public class RetryingBlockFetcher {
0044
0045
0046
0047
0048
0049 public interface BlockFetchStarter {
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060 void createAndStart(String[] blockIds, BlockFetchingListener listener)
0061 throws IOException, InterruptedException;
0062 }
0063
0064
0065 private static final ExecutorService executorService = Executors.newCachedThreadPool(
0066 NettyUtils.createThreadFactory("Block Fetch Retry"));
0067
0068 private static final Logger logger = LoggerFactory.getLogger(RetryingBlockFetcher.class);
0069
0070
0071 private final BlockFetchStarter fetchStarter;
0072
0073
0074 private final BlockFetchingListener listener;
0075
0076
0077 private final int maxRetries;
0078
0079
0080 private final int retryWaitTime;
0081
0082
0083
0084
0085
0086 private int retryCount = 0;
0087
0088
0089
0090
0091
0092
0093 private final LinkedHashSet<String> outstandingBlocksIds;
0094
0095
0096
0097
0098
0099
0100 private RetryingBlockFetchListener currentListener;
0101
0102 public RetryingBlockFetcher(
0103 TransportConf conf,
0104 RetryingBlockFetcher.BlockFetchStarter fetchStarter,
0105 String[] blockIds,
0106 BlockFetchingListener listener) {
0107 this.fetchStarter = fetchStarter;
0108 this.listener = listener;
0109 this.maxRetries = conf.maxIORetries();
0110 this.retryWaitTime = conf.ioRetryWaitTimeMs();
0111 this.outstandingBlocksIds = Sets.newLinkedHashSet();
0112 Collections.addAll(outstandingBlocksIds, blockIds);
0113 this.currentListener = new RetryingBlockFetchListener();
0114 }
0115
0116
0117
0118
0119
0120 public void start() {
0121 fetchAllOutstanding();
0122 }
0123
0124
0125
0126
0127
0128 private void fetchAllOutstanding() {
0129
0130 String[] blockIdsToFetch;
0131 int numRetries;
0132 RetryingBlockFetchListener myListener;
0133 synchronized (this) {
0134 blockIdsToFetch = outstandingBlocksIds.toArray(new String[outstandingBlocksIds.size()]);
0135 numRetries = retryCount;
0136 myListener = currentListener;
0137 }
0138
0139
0140 try {
0141 fetchStarter.createAndStart(blockIdsToFetch, myListener);
0142 } catch (Exception e) {
0143 logger.error(String.format("Exception while beginning fetch of %s outstanding blocks %s",
0144 blockIdsToFetch.length, numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
0145
0146 if (shouldRetry(e)) {
0147 initiateRetry();
0148 } else {
0149 for (String bid : blockIdsToFetch) {
0150 listener.onBlockFetchFailure(bid, e);
0151 }
0152 }
0153 }
0154 }
0155
0156
0157
0158
0159
0160 private synchronized void initiateRetry() {
0161 retryCount += 1;
0162 currentListener = new RetryingBlockFetchListener();
0163
0164 logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms",
0165 retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime);
0166
0167 executorService.submit(() -> {
0168 Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
0169 fetchAllOutstanding();
0170 });
0171 }
0172
0173
0174
0175
0176
0177 private synchronized boolean shouldRetry(Throwable e) {
0178 boolean isIOException = e instanceof IOException
0179 || (e.getCause() != null && e.getCause() instanceof IOException);
0180 boolean hasRemainingRetries = retryCount < maxRetries;
0181 return isIOException && hasRemainingRetries;
0182 }
0183
0184
0185
0186
0187
0188
0189 private class RetryingBlockFetchListener implements BlockFetchingListener {
0190 @Override
0191 public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
0192
0193
0194 boolean shouldForwardSuccess = false;
0195 synchronized (RetryingBlockFetcher.this) {
0196 if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
0197 outstandingBlocksIds.remove(blockId);
0198 shouldForwardSuccess = true;
0199 }
0200 }
0201
0202
0203 if (shouldForwardSuccess) {
0204 listener.onBlockFetchSuccess(blockId, data);
0205 }
0206 }
0207
0208 @Override
0209 public void onBlockFetchFailure(String blockId, Throwable exception) {
0210
0211
0212 boolean shouldForwardFailure = false;
0213 synchronized (RetryingBlockFetcher.this) {
0214 if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
0215 if (shouldRetry(exception)) {
0216 initiateRetry();
0217 } else {
0218 logger.error(String.format("Failed to fetch block %s, and will not retry (%s retries)",
0219 blockId, retryCount), exception);
0220 outstandingBlocksIds.remove(blockId);
0221 shouldForwardFailure = true;
0222 }
0223 }
0224 }
0225
0226
0227 if (shouldForwardFailure) {
0228 listener.onBlockFetchFailure(blockId, exception);
0229 }
0230 }
0231 }
0232 }