/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.sort;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;

import scala.Tuple2;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.internal.config.package$;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.memory.TooLargePageException;
import org.apache.spark.serializer.DummySerializerInstance;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.DiskBlockObjectWriter;
import org.apache.spark.storage.FileSegment;
import org.apache.spark.storage.TempShuffleBlockId;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.Utils;

/**
 * An external sorter that is specialized for sort-based shuffle.
 * <p>
 * Incoming records are appended to data pages. When all records have been inserted (or when the
 * current thread's shuffle memory limit is reached), the in-memory records are sorted according to
 * their partition ids (using a {@link ShuffleInMemorySorter}). The sorted records are then
 * written to a single output file (or multiple files, if we've spilled). The format of the output
 * files is the same as the format of the final output file written by
 * {@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output partition's records are
 * written as a single serialized, compressed stream that can be read with a new decompression and
 * deserialization stream.
 * <p>
 * Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its
 * spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a
 * specialized merge procedure that avoids extra serialization/deserialization.
 */
final class ShuffleExternalSorter extends MemoryConsumer {

  private static final Logger logger = LoggerFactory.getLogger(ShuffleExternalSorter.class);

  @VisibleForTesting
  static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
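  // (1 MB; presumably mirrors the default of the spark.shuffle.spill.diskWriteBufferSize config,
  // whose runtime value is read into diskWriteBufferSize below.)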

  private final int numPartitions;
  private final TaskMemoryManager taskMemoryManager;
  private final BlockManager blockManager;
  private final TaskContext taskContext;
  private final ShuffleWriteMetricsReporter writeMetrics;

  /**
   * Force this sorter to spill when there are this many elements in memory.
   */
  private final int numElementsForSpillThreshold;

  /** The buffer size to use when writing spills using DiskBlockObjectWriter */
  private final int fileBufferSizeBytes;

  /** The buffer size to use when writing the sorted records to an on-disk file */
  private final int diskWriteBufferSize;

  /**
   * Memory pages that hold the records being sorted. The pages in this list are freed when
   * spilling, although in principle we could recycle these pages across spills (on the other hand,
   * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
   * itself).
   */
  private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<>();

  private final LinkedList<SpillInfo> spills = new LinkedList<>();

  /** Peak memory used by this sorter so far, in bytes. */
  private long peakMemoryUsedBytes;

  // These variables are reset after spilling:
  @Nullable private ShuffleInMemorySorter inMemSorter;
  @Nullable private MemoryBlock currentPage = null;
  private long pageCursor = -1;

  ShuffleExternalSorter(
      TaskMemoryManager memoryManager,
      BlockManager blockManager,
      TaskContext taskContext,
      int initialSize,
      int numPartitions,
      SparkConf conf,
      ShuffleWriteMetricsReporter writeMetrics) {
    super(memoryManager,
      (int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()),
      memoryManager.getTungstenMemoryMode());
    this.taskMemoryManager = memoryManager;
    this.blockManager = blockManager;
    this.taskContext = taskContext;
    this.numPartitions = numPartitions;
    // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
    this.fileBufferSizeBytes =
        (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
    this.numElementsForSpillThreshold =
        (int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
    this.writeMetrics = writeMetrics;
    this.inMemSorter = new ShuffleInMemorySorter(
      this, initialSize, (boolean) conf.get(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT()));
    this.peakMemoryUsedBytes = getMemoryUsage();
    this.diskWriteBufferSize =
        (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
  }
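
  // A sketch of why the page size is capped at PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES in the
  // super() call above: the in-memory sorter stores one packed long per record, roughly
  //
  //   [24-bit partition id | 13-bit page number | 27-bit offset in page]
  //
  // so a page offset must fit in 27 bits, limiting pages to 2^27 bytes (128 MB). See
  // PackedRecordPointer for the authoritative bit layout.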

  /**
   * Sorts the in-memory records and writes the sorted records to an on-disk file.
   * This method does not free the sort data structures.
   *
   * @param isLastFile if true, this indicates that we're writing the final output file and that
   *                   the bytes written should be counted towards shuffle write metrics rather
   *                   than shuffle spill metrics.
   */
  private void writeSortedFile(boolean isLastFile) {

    // This call performs the actual sort.
    final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
      inMemSorter.getSortedIterator();

    // If there are no sorted records, then we don't need to create an empty spill file.
    if (!sortedRecords.hasNext()) {
      return;
    }

    final ShuffleWriteMetricsReporter writeMetricsToUse;

    if (isLastFile) {
      // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
      writeMetricsToUse = writeMetrics;
    } else {
      // We're spilling, so bytes written should be counted towards spill rather than write.
      // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
      // them towards shuffle bytes written.
      writeMetricsToUse = new ShuffleWriteMetrics();
    }
    // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem
    // to be an API to directly transfer bytes from managed memory to the disk writer, we buffer
    // data through a byte array. This array does not need to be large enough to hold a single
    // record; records larger than the buffer are copied to the writer in chunks of at most
    // diskWriteBufferSize bytes each.
    final byte[] writeBuffer = new byte[diskWriteBufferSize];

    // Because this output will be read during shuffle, its compression codec must be controlled by
    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
    // createTempShuffleBlock here; see SPARK-3426 for more details.
    final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
      blockManager.diskBlockManager().createTempShuffleBlock();
    final File file = spilledFileInfo._2();
    final TempShuffleBlockId blockId = spilledFileInfo._1();
    final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);

    // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
    // Our write path doesn't actually use this serializer (since we end up calling the `write()`
    // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
    // around this, we pass a dummy no-op serializer.
    final SerializerInstance ser = DummySerializerInstance.INSTANCE;

    int currentPartition = -1;
    final FileSegment committedSegment;
    try (DiskBlockObjectWriter writer =
        blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) {

      final int uaoSize = UnsafeAlignedOffset.getUaoSize();
      while (sortedRecords.hasNext()) {
        sortedRecords.loadNext();
        final int partition = sortedRecords.packedRecordPointer.getPartitionId();
        assert (partition >= currentPartition);
        if (partition != currentPartition) {
          // Switch to the new partition
          if (currentPartition != -1) {
            final FileSegment fileSegment = writer.commitAndGet();
            spillInfo.partitionLengths[currentPartition] = fileSegment.length();
          }
          currentPartition = partition;
        }

        final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
        final Object recordPage = taskMemoryManager.getPage(recordPointer);
        final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
        int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
        long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
        while (dataRemaining > 0) {
          final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
          Platform.copyMemory(
            recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
          writer.write(writeBuffer, 0, toTransfer);
          recordReadPosition += toTransfer;
          dataRemaining -= toTransfer;
        }
        writer.recordWritten();
      }

      committedSegment = writer.commitAndGet();
    }
    // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
    // then the file might be empty. Note that it might be better to avoid calling
    // writeSortedFile() in that case.
    if (currentPartition != -1) {
      spillInfo.partitionLengths[currentPartition] = committedSegment.length();
      spills.add(spillInfo);
    }

    if (!isLastFile) {  // i.e. this is a spill file
      // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
      // are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
      // relies on its `recordWritten()` method being called in order to trigger periodic updates
      // to `shuffleBytesWritten`. If we were to remove the `recordWritten()` call and increment
      // that counter at a higher level, then the in-progress metrics for records written and
      // bytes written would get out of sync.
      //
      // When writing the last file, we pass `writeMetrics` directly to the DiskBlockObjectWriter;
      // in all other cases, we pass in a dummy write metrics object to capture metrics, then copy
      // those metrics to the true write metrics here. The reason for performing this copying is
      // so that we can avoid reporting spilled bytes as shuffle write bytes.
      //
      // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
      // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
      // SPARK-3577 tracks the spill time separately.

      // `writeMetricsToUse` is guaranteed to be a ShuffleWriteMetrics here, based on the if check
      // at the beginning of this method.
      writeMetrics.incRecordsWritten(
        ((ShuffleWriteMetrics) writeMetricsToUse).recordsWritten());
      taskContext.taskMetrics().incDiskBytesSpilled(
        ((ShuffleWriteMetrics) writeMetricsToUse).bytesWritten());
    }
  }
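
  // A sketch of the resulting file layout: each spill (or final) file written above is simply the
  // partitions' serialized records laid out back to back,
  //
  //   [partition 0 bytes][partition 1 bytes]...[partition numPartitions-1 bytes]
  //
  // with per-partition segment lengths recorded in spillInfo.partitionLengths. UnsafeShuffleWriter
  // later concatenates the matching segments of all spill files to produce the final output.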

  /**
   * Sort and spill the current records in response to memory pressure.
   */
  @Override
  public long spill(long size, MemoryConsumer trigger) throws IOException {
    if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {
      return 0L;
    }

    logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
      Thread.currentThread().getId(),
      Utils.bytesToString(getMemoryUsage()),
      spills.size(),
      spills.size() != 1 ? "times" : "time");

    writeSortedFile(false);
    final long spillSize = freeMemory();
    // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding
    // the records. Otherwise, if the task is over-allocated memory, then without freeing the
    // memory pages we might not be able to get memory for the pointer array.
    inMemSorter.reset();
    taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
    return spillSize;
  }
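
  // Note that the two spill metrics measure different things: memoryBytesSpilled (above) is the
  // in-memory size of the pages that were freed, while diskBytesSpilled (updated in
  // writeSortedFile()) is the serialized, possibly compressed size written to disk.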

  private long getMemoryUsage() {
    long totalPageSize = 0;
    for (MemoryBlock page : allocatedPages) {
      totalPageSize += page.size();
    }
    return ((inMemSorter == null) ? 0 : inMemSorter.getMemoryUsage()) + totalPageSize;
  }

  private void updatePeakMemoryUsed() {
    long mem = getMemoryUsage();
    if (mem > peakMemoryUsedBytes) {
      peakMemoryUsedBytes = mem;
    }
  }

  /**
   * Return the peak memory used so far, in bytes.
   */
  long getPeakMemoryUsedBytes() {
    updatePeakMemoryUsed();
    return peakMemoryUsedBytes;
  }

  private long freeMemory() {
    updatePeakMemoryUsed();
    long memoryFreed = 0;
    for (MemoryBlock block : allocatedPages) {
      memoryFreed += block.size();
      freePage(block);
    }
    allocatedPages.clear();
    currentPage = null;
    pageCursor = 0;
    return memoryFreed;
  }

  /**
   * Force all memory and spill files to be deleted; called by shuffle error-handling code.
   */
  public void cleanupResources() {
    freeMemory();
    if (inMemSorter != null) {
      inMemSorter.free();
      inMemSorter = null;
    }
    for (SpillInfo spill : spills) {
      if (spill.file.exists() && !spill.file.delete()) {
        logger.error("Unable to delete spill file {}", spill.file.getPath());
      }
    }
  }

  /**
   * Checks whether there is enough space to insert an additional record into the sort pointer
   * array and grows the array if additional space is required. If the required space cannot be
   * obtained, then the in-memory data will be spilled to disk.
   */
  private void growPointerArrayIfNecessary() throws IOException {
    assert(inMemSorter != null);
    if (!inMemSorter.hasSpaceForAnotherRecord()) {
      long used = inMemSorter.getMemoryUsage();
      LongArray array;
      try {
        // Allocating the larger array could itself trigger spilling.
        array = allocateArray(used / 8 * 2);
      } catch (TooLargePageException e) {
        // The pointer array is too big to fit in a single page, so just spill.
        spill();
        return;
      } catch (SparkOutOfMemoryError e) {
        // The allocation failure should already have triggered spilling.
        if (!inMemSorter.hasSpaceForAnotherRecord()) {
          logger.error("Unable to grow the pointer array");
          throw e;
        }
        return;
      }
      // Check whether spilling was triggered during the allocation: if it was, the sorter was
      // reset and already has space, so the new array is not needed.
      if (inMemSorter.hasSpaceForAnotherRecord()) {
        freeArray(array);
      } else {
        inMemSorter.expandPointerArray(array);
      }
    }
  }
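
  // Sizing sketch: the pointer array uses 8 bytes per record, so an array currently using `used`
  // bytes holds used / 8 entries, and allocateArray(used / 8 * 2) requests room for twice as
  // many records (i.e. double the memory).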

  /**
   * Allocates more memory in order to insert an additional record. This will request additional
   * memory from the memory manager and spill if the requested memory cannot be obtained.
   *
   * @param required the required space in the data page, in bytes, including space for storing
   *                 the record size. This must be less than or equal to the page size (records
   *                 that exceed the page size are handled via a different code path which uses
   *                 special overflow pages).
   */
  private void acquireNewPageIfNecessary(int required) {
    if (currentPage == null ||
      pageCursor + required > currentPage.getBaseOffset() + currentPage.size()) {
      // TODO: try to find space in previous pages
      currentPage = allocatePage(required);
      pageCursor = currentPage.getBaseOffset();
      allocatedPages.add(currentPage);
    }
  }
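
  // Example: if the current page has only 16 bytes of headroom and the next record needs
  // required = 64 bytes, a fresh page is allocated and those 16 leftover bytes are simply
  // wasted; this is the fragmentation the TODO above refers to.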

  /**
   * Write a record to the shuffle sorter.
   */
  public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
    throws IOException {

    // for tests
    assert(inMemSorter != null);
    if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
      logger.info("Spilling data because the number of in-memory records crossed the threshold {}",
        numElementsForSpillThreshold);
      spill();
    }

    growPointerArrayIfNecessary();
    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    // Need 4 or 8 bytes to store the record length.
    final int required = length + uaoSize;
    acquireNewPageIfNecessary(required);

    assert(currentPage != null);
    final Object base = currentPage.getBaseObject();
    final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
    UnsafeAlignedOffset.putSize(base, pageCursor, length);
    pageCursor += uaoSize;
    Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
    pageCursor += length;
    inMemSorter.insertRecord(recordAddress, partitionId);
  }
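
  // After insertRecord() returns, the record occupies the current page as
  //
  //   [record length (uaoSize = 4 or 8 bytes)][record bytes]
  //
  // and the in-memory sorter holds one packed (partitionId, recordAddress) long for it.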

  /**
   * Close the sorter, causing any buffered data to be sorted and written out to disk.
   *
   * @return metadata for the spill files written by this sorter. If no records were ever inserted
   *         into this sorter, then this will return an empty array.
   */
  public SpillInfo[] closeAndGetSpills() throws IOException {
    if (inMemSorter != null) {
      // Do not count the final file towards the spill count.
      writeSortedFile(true);
      freeMemory();
      inMemSorter.free();
      inMemSorter = null;
    }
    return spills.toArray(new SpillInfo[spills.size()]);
  }

}
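
// A minimal usage sketch, loosely following how UnsafeShuffleWriter drives this class
// (identifiers such as `records` and `r` are illustrative, not real APIs; error handling
// omitted):
//
//   ShuffleExternalSorter sorter = new ShuffleExternalSorter(
//     memoryManager, blockManager, taskContext, initialSize, numPartitions, conf, writeMetrics);
//   for (Record r : records) {
//     sorter.insertRecord(r.base, r.offset, r.length, r.partitionId);
//   }
//   SpillInfo[] spills = sorter.closeAndGetSpills();  // sorts and writes remaining records
//   // ...merge per-partition segments of the spill files into the final output...
//   // on failure: sorter.cleanupResources();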