/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.util.Optional;
import javax.annotation.Nullable;

import scala.None$;
import scala.Option;
import scala.Product2;
import scala.Tuple2;
import scala.collection.Iterator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Closeables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
import org.apache.spark.shuffle.api.ShufflePartitionWriter;
import org.apache.spark.shuffle.api.WritableByteChannelWrapper;
import org.apache.spark.internal.config.package$;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.storage.*;
import org.apache.spark.util.Utils;

/**
 * This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
 * writes incoming records to separate files, one file per reduce partition, then concatenates these
 * per-partition files to form a single output file, regions of which are served to reducers.
 * Records are not buffered in memory. It writes output in a format
 * that can be served / consumed via {@link org.apache.spark.shuffle.IndexShuffleBlockResolver}.
 * <p>
 * This write path is inefficient for shuffles with large numbers of reduce partitions because it
 * simultaneously opens separate serializers and file streams for all partitions. As a result,
 * {@link SortShuffleManager} only selects this write path when
 * <ul>
 *    <li>no map-side combine is specified, and</li>
 *    <li>the number of partitions is less than or equal to
 *      <code>spark.shuffle.sort.bypassMergeThreshold</code>.</li>
 * </ul>
 *
 * This code used to be part of {@link org.apache.spark.util.collection.ExternalSorter} but was
 * refactored into its own class in order to reduce code complexity; see SPARK-7855 for details.
 * <p>
 * There have been proposals to completely remove this code path; see SPARK-6026 for details.
 */
final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
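  // Illustrative note (not part of the original source): the shuffle manager falls back to this
  // writer roughly when the dependency declares no map-side combine and
  //   dep.partitioner().numPartitions() <= spark.shuffle.sort.bypassMergeThreshold  (default 200);
  // otherwise a sort-based writer (SortShuffleWriter or UnsafeShuffleWriter) is used instead.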

  private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);

  private final int fileBufferSize;
  private final boolean transferToEnabled;
  private final int numPartitions;
  private final BlockManager blockManager;
  private final Partitioner partitioner;
  private final ShuffleWriteMetricsReporter writeMetrics;
  private final int shuffleId;
  private final long mapId;
  private final Serializer serializer;
  private final ShuffleExecutorComponents shuffleExecutorComponents;

  /** Array of file writers, one for each partition */
  private DiskBlockObjectWriter[] partitionWriters;
  private FileSegment[] partitionWriterSegments;
  @Nullable private MapStatus mapStatus;
  private long[] partitionLengths;

  /**
   * Are we in the process of stopping? Because map tasks can call stop() with success = true
   * and then call stop() with success = false if they get an exception, we want to make sure
   * we don't try deleting files, etc. twice.
   */
  private boolean stopping = false;

  BypassMergeSortShuffleWriter(
      BlockManager blockManager,
      BypassMergeSortShuffleHandle<K, V> handle,
      long mapId,
      SparkConf conf,
      ShuffleWriteMetricsReporter writeMetrics,
      ShuffleExecutorComponents shuffleExecutorComponents) {
    // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
    this.fileBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
    this.blockManager = blockManager;
    final ShuffleDependency<K, V, V> dep = handle.dependency();
    this.mapId = mapId;
    this.shuffleId = dep.shuffleId();
    this.partitioner = dep.partitioner();
    this.numPartitions = partitioner.numPartitions();
    this.writeMetrics = writeMetrics;
    this.serializer = dep.serializer();
    this.shuffleExecutorComponents = shuffleExecutorComponents;
  }

  @Override
  public void write(Iterator<Product2<K, V>> records) throws IOException {
    assert (partitionWriters == null);
    ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
        .createMapOutputWriter(shuffleId, mapId, numPartitions);
    try {
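      // Fast path: there are no records to write, so commit all (empty) partitions and report a
      // map status whose partition lengths are all zero.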
      if (!records.hasNext()) {
        partitionLengths = mapOutputWriter.commitAllPartitions();
        mapStatus = MapStatus$.MODULE$.apply(
          blockManager.shuffleServerId(), partitionLengths, mapId);
        return;
      }
      final SerializerInstance serInstance = serializer.newInstance();
      final long openStartTime = System.nanoTime();
      partitionWriters = new DiskBlockObjectWriter[numPartitions];
      partitionWriterSegments = new FileSegment[numPartitions];
      for (int i = 0; i < numPartitions; i++) {
        final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
            blockManager.diskBlockManager().createTempShuffleBlock();
        final File file = tempShuffleBlockIdPlusFile._2();
        final BlockId blockId = tempShuffleBlockIdPlusFile._1();
        partitionWriters[i] =
            blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
      }
      // Creating the file to write to and creating a disk writer both involve interacting with
      // the disk, and can take a long time in aggregate when we open many files, so should be
      // included in the shuffle write time.
      writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

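      // Route each record straight to the disk writer for the reduce partition chosen by the
      // partitioner; there is no in-memory sort or aggregation on this path.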
      while (records.hasNext()) {
        final Product2<K, V> record = records.next();
        final K key = record._1();
        partitionWriters[partitioner.getPartition(key)].write(key, record._2());
      }

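      // Flush and commit each per-partition writer, capturing the file segment it produced; the
      // try-with-resources block closes each writer afterwards.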
      for (int i = 0; i < numPartitions; i++) {
        try (DiskBlockObjectWriter writer = partitionWriters[i]) {
          partitionWriterSegments[i] = writer.commitAndGet();
        }
      }

      partitionLengths = writePartitionedData(mapOutputWriter);
      mapStatus = MapStatus$.MODULE$.apply(
        blockManager.shuffleServerId(), partitionLengths, mapId);
    } catch (Exception e) {
      try {
        mapOutputWriter.abort(e);
      } catch (Exception e2) {
        logger.error("Failed to abort the writer after failing to write map output.", e2);
        e.addSuppressed(e2);
      }
      throw e;
    }
  }

  @VisibleForTesting
  long[] getPartitionLengths() {
    return partitionLengths;
  }

  /**
   * Concatenate all of the per-partition files into a single combined file.
   *
   * @return array of lengths, in bytes, of each partition of the file (used by map output tracker).
   */
  private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
    // Track location of the partition starts in the output file
    if (partitionWriters != null) {
      final long writeStartTime = System.nanoTime();
      try {
        for (int i = 0; i < numPartitions; i++) {
          final File file = partitionWriterSegments[i].file();
          ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
          if (file.exists()) {
            if (transferToEnabled) {
              // Using WritableByteChannelWrapper to make resource closing consistent between
              // this implementation and UnsafeShuffleWriter.
              Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
              if (maybeOutputChannel.isPresent()) {
                writePartitionedDataWithChannel(file, maybeOutputChannel.get());
              } else {
                writePartitionedDataWithStream(file, writer);
              }
            } else {
              writePartitionedDataWithStream(file, writer);
            }
            if (!file.delete()) {
              logger.error("Unable to delete file for partition {}", i);
            }
          }
        }
      } finally {
        writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
      }
      partitionWriters = null;
    }
    return mapOutputWriter.commitAllPartitions();
  }

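  /**
   * Copies one partition's temporary file into its shuffle output partition over an NIO channel,
   * allowing a transferTo-style copy where the underlying channels support it.
   */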
  private void writePartitionedDataWithChannel(
      File file,
      WritableByteChannelWrapper outputChannel) throws IOException {
    boolean copyThrewException = true;
    try {
      FileInputStream in = new FileInputStream(file);
      try (FileChannel inputChannel = in.getChannel()) {
        Utils.copyFileStreamNIO(
            inputChannel, outputChannel.channel(), 0L, inputChannel.size());
        copyThrewException = false;
      } finally {
        Closeables.close(in, copyThrewException);
      }
    } finally {
      Closeables.close(outputChannel, copyThrewException);
    }
  }

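  /**
   * Copies one partition's temporary file into its shuffle output partition with a plain stream
   * copy. Used when transferTo is disabled or the partition writer does not expose a channel.
   */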
  private void writePartitionedDataWithStream(File file, ShufflePartitionWriter writer)
      throws IOException {
    boolean copyThrewException = true;
    FileInputStream in = new FileInputStream(file);
    OutputStream outputStream;
    try {
      outputStream = writer.openStream();
      try {
        Utils.copyStream(in, outputStream, false, false);
        copyThrewException = false;
      } finally {
        Closeables.close(outputStream, copyThrewException);
      }
    } finally {
      Closeables.close(in, copyThrewException);
    }
  }

  @Override
  public Option<MapStatus> stop(boolean success) {
    if (stopping) {
      return None$.empty();
    } else {
      stopping = true;
      if (success) {
        if (mapStatus == null) {
          throw new IllegalStateException("Cannot call stop(true) without having called write()");
        }
        return Option.apply(mapStatus);
      } else {
        // The map task failed, so delete our output data.
        if (partitionWriters != null) {
          try {
            for (DiskBlockObjectWriter writer : partitionWriters) {
              // This method explicitly does _not_ throw exceptions:
              File file = writer.revertPartialWritesAndClose();
              if (!file.delete()) {
                logger.error("Error while deleting file {}", file.getAbsolutePath());
              }
            }
          } finally {
            partitionWriters = null;
          }
        }
        return None$.empty();
      }
    }
  }
}