0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.shuffle.sort;
0019
0020 import java.io.File;
0021 import java.io.FileInputStream;
0022 import java.io.IOException;
0023 import java.io.OutputStream;
0024 import java.nio.channels.FileChannel;
0025 import java.util.Optional;
0026 import javax.annotation.Nullable;
0027
0028 import scala.None$;
0029 import scala.Option;
0030 import scala.Product2;
0031 import scala.Tuple2;
0032 import scala.collection.Iterator;
0033
0034 import com.google.common.annotations.VisibleForTesting;
0035 import com.google.common.io.Closeables;
0036 import org.slf4j.Logger;
0037 import org.slf4j.LoggerFactory;
0038
0039 import org.apache.spark.Partitioner;
0040 import org.apache.spark.ShuffleDependency;
0041 import org.apache.spark.SparkConf;
0042 import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
0043 import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
0044 import org.apache.spark.shuffle.api.ShufflePartitionWriter;
0045 import org.apache.spark.shuffle.api.WritableByteChannelWrapper;
0046 import org.apache.spark.internal.config.package$;
0047 import org.apache.spark.scheduler.MapStatus;
0048 import org.apache.spark.scheduler.MapStatus$;
0049 import org.apache.spark.serializer.Serializer;
0050 import org.apache.spark.serializer.SerializerInstance;
0051 import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
0052 import org.apache.spark.shuffle.ShuffleWriter;
0053 import org.apache.spark.storage.*;
0054 import org.apache.spark.util.Utils;
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077 final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
0078
0079 private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);
0080
0081 private final int fileBufferSize;
0082 private final boolean transferToEnabled;
0083 private final int numPartitions;
0084 private final BlockManager blockManager;
0085 private final Partitioner partitioner;
0086 private final ShuffleWriteMetricsReporter writeMetrics;
0087 private final int shuffleId;
0088 private final long mapId;
0089 private final Serializer serializer;
0090 private final ShuffleExecutorComponents shuffleExecutorComponents;
0091
0092
0093 private DiskBlockObjectWriter[] partitionWriters;
0094 private FileSegment[] partitionWriterSegments;
0095 @Nullable private MapStatus mapStatus;
0096 private long[] partitionLengths;
0097
0098
0099
0100
0101
0102
0103 private boolean stopping = false;
0104
0105 BypassMergeSortShuffleWriter(
0106 BlockManager blockManager,
0107 BypassMergeSortShuffleHandle<K, V> handle,
0108 long mapId,
0109 SparkConf conf,
0110 ShuffleWriteMetricsReporter writeMetrics,
0111 ShuffleExecutorComponents shuffleExecutorComponents) {
0112
0113 this.fileBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
0114 this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
0115 this.blockManager = blockManager;
0116 final ShuffleDependency<K, V, V> dep = handle.dependency();
0117 this.mapId = mapId;
0118 this.shuffleId = dep.shuffleId();
0119 this.partitioner = dep.partitioner();
0120 this.numPartitions = partitioner.numPartitions();
0121 this.writeMetrics = writeMetrics;
0122 this.serializer = dep.serializer();
0123 this.shuffleExecutorComponents = shuffleExecutorComponents;
0124 }
0125
0126 @Override
0127 public void write(Iterator<Product2<K, V>> records) throws IOException {
0128 assert (partitionWriters == null);
0129 ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
0130 .createMapOutputWriter(shuffleId, mapId, numPartitions);
0131 try {
0132 if (!records.hasNext()) {
0133 partitionLengths = mapOutputWriter.commitAllPartitions();
0134 mapStatus = MapStatus$.MODULE$.apply(
0135 blockManager.shuffleServerId(), partitionLengths, mapId);
0136 return;
0137 }
0138 final SerializerInstance serInstance = serializer.newInstance();
0139 final long openStartTime = System.nanoTime();
0140 partitionWriters = new DiskBlockObjectWriter[numPartitions];
0141 partitionWriterSegments = new FileSegment[numPartitions];
0142 for (int i = 0; i < numPartitions; i++) {
0143 final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
0144 blockManager.diskBlockManager().createTempShuffleBlock();
0145 final File file = tempShuffleBlockIdPlusFile._2();
0146 final BlockId blockId = tempShuffleBlockIdPlusFile._1();
0147 partitionWriters[i] =
0148 blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
0149 }
0150
0151
0152
0153 writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
0154
0155 while (records.hasNext()) {
0156 final Product2<K, V> record = records.next();
0157 final K key = record._1();
0158 partitionWriters[partitioner.getPartition(key)].write(key, record._2());
0159 }
0160
0161 for (int i = 0; i < numPartitions; i++) {
0162 try (DiskBlockObjectWriter writer = partitionWriters[i]) {
0163 partitionWriterSegments[i] = writer.commitAndGet();
0164 }
0165 }
0166
0167 partitionLengths = writePartitionedData(mapOutputWriter);
0168 mapStatus = MapStatus$.MODULE$.apply(
0169 blockManager.shuffleServerId(), partitionLengths, mapId);
0170 } catch (Exception e) {
0171 try {
0172 mapOutputWriter.abort(e);
0173 } catch (Exception e2) {
0174 logger.error("Failed to abort the writer after failing to write map output.", e2);
0175 e.addSuppressed(e2);
0176 }
0177 throw e;
0178 }
0179 }
0180
0181 @VisibleForTesting
0182 long[] getPartitionLengths() {
0183 return partitionLengths;
0184 }
0185
0186
0187
0188
0189
0190
0191 private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
0192
0193 if (partitionWriters != null) {
0194 final long writeStartTime = System.nanoTime();
0195 try {
0196 for (int i = 0; i < numPartitions; i++) {
0197 final File file = partitionWriterSegments[i].file();
0198 ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
0199 if (file.exists()) {
0200 if (transferToEnabled) {
0201
0202
0203 Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
0204 if (maybeOutputChannel.isPresent()) {
0205 writePartitionedDataWithChannel(file, maybeOutputChannel.get());
0206 } else {
0207 writePartitionedDataWithStream(file, writer);
0208 }
0209 } else {
0210 writePartitionedDataWithStream(file, writer);
0211 }
0212 if (!file.delete()) {
0213 logger.error("Unable to delete file for partition {}", i);
0214 }
0215 }
0216 }
0217 } finally {
0218 writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
0219 }
0220 partitionWriters = null;
0221 }
0222 return mapOutputWriter.commitAllPartitions();
0223 }
0224
0225 private void writePartitionedDataWithChannel(
0226 File file,
0227 WritableByteChannelWrapper outputChannel) throws IOException {
0228 boolean copyThrewException = true;
0229 try {
0230 FileInputStream in = new FileInputStream(file);
0231 try (FileChannel inputChannel = in.getChannel()) {
0232 Utils.copyFileStreamNIO(
0233 inputChannel, outputChannel.channel(), 0L, inputChannel.size());
0234 copyThrewException = false;
0235 } finally {
0236 Closeables.close(in, copyThrewException);
0237 }
0238 } finally {
0239 Closeables.close(outputChannel, copyThrewException);
0240 }
0241 }
0242
0243 private void writePartitionedDataWithStream(File file, ShufflePartitionWriter writer)
0244 throws IOException {
0245 boolean copyThrewException = true;
0246 FileInputStream in = new FileInputStream(file);
0247 OutputStream outputStream;
0248 try {
0249 outputStream = writer.openStream();
0250 try {
0251 Utils.copyStream(in, outputStream, false, false);
0252 copyThrewException = false;
0253 } finally {
0254 Closeables.close(outputStream, copyThrewException);
0255 }
0256 } finally {
0257 Closeables.close(in, copyThrewException);
0258 }
0259 }
0260
0261 @Override
0262 public Option<MapStatus> stop(boolean success) {
0263 if (stopping) {
0264 return None$.empty();
0265 } else {
0266 stopping = true;
0267 if (success) {
0268 if (mapStatus == null) {
0269 throw new IllegalStateException("Cannot call stop(true) without having called write()");
0270 }
0271 return Option.apply(mapStatus);
0272 } else {
0273
0274 if (partitionWriters != null) {
0275 try {
0276 for (DiskBlockObjectWriter writer : partitionWriters) {
0277
0278 File file = writer.revertPartialWritesAndClose();
0279 if (!file.delete()) {
0280 logger.error("Error while deleting file {}", file.getAbsolutePath());
0281 }
0282 }
0283 } finally {
0284 partitionWriters = null;
0285 }
0286 }
0287 return None$.empty();
0288 }
0289 }
0290 }
0291 }