Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.util.collection.unsafe.sort;
0019 
0020 import java.io.File;
0021 import java.io.IOException;
0022 
0023 import scala.Tuple2;
0024 
0025 import org.apache.spark.SparkConf;
0026 import org.apache.spark.serializer.SerializerManager;
0027 import org.apache.spark.executor.ShuffleWriteMetrics;
0028 import org.apache.spark.serializer.DummySerializerInstance;
0029 import org.apache.spark.storage.BlockId;
0030 import org.apache.spark.storage.BlockManager;
0031 import org.apache.spark.storage.DiskBlockObjectWriter;
0032 import org.apache.spark.storage.TempLocalBlockId;
0033 import org.apache.spark.unsafe.Platform;
0034 import org.apache.spark.internal.config.package$;
0035 
0036 /**
0037  * Spills a list of sorted records to disk. Spill files have the following format:
0038  *
0039  *   [# of records (int)] [[len (int)][prefix (long)][data (bytes)]...]
0040  */
0041 public final class UnsafeSorterSpillWriter {
0042 
0043   private final SparkConf conf = new SparkConf();
0044 
0045   /**
0046    * The buffer size to use when writing the sorted records to an on-disk file, and
0047    * this space used by prefix + len + recordLength must be greater than 4 + 8 bytes.
0048    */
0049   private final int diskWriteBufferSize =
0050     (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
0051 
0052   // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
0053   // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
0054   // data through a byte array.
0055   private byte[] writeBuffer = new byte[diskWriteBufferSize];
0056 
0057   private final File file;
0058   private final BlockId blockId;
0059   private final int numRecordsToWrite;
0060   private DiskBlockObjectWriter writer;
0061   private int numRecordsSpilled = 0;
0062 
0063   public UnsafeSorterSpillWriter(
0064       BlockManager blockManager,
0065       int fileBufferSize,
0066       ShuffleWriteMetrics writeMetrics,
0067       int numRecordsToWrite) throws IOException {
0068     final Tuple2<TempLocalBlockId, File> spilledFileInfo =
0069       blockManager.diskBlockManager().createTempLocalBlock();
0070     this.file = spilledFileInfo._2();
0071     this.blockId = spilledFileInfo._1();
0072     this.numRecordsToWrite = numRecordsToWrite;
0073     // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
0074     // Our write path doesn't actually use this serializer (since we end up calling the `write()`
0075     // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
0076     // around this, we pass a dummy no-op serializer.
0077     writer = blockManager.getDiskWriter(
0078       blockId, file, DummySerializerInstance.INSTANCE, fileBufferSize, writeMetrics);
0079     // Write the number of records
0080     writeIntToBuffer(numRecordsToWrite, 0);
0081     writer.write(writeBuffer, 0, 4);
0082   }
0083 
0084   // Based on DataOutputStream.writeLong.
0085   private void writeLongToBuffer(long v, int offset) {
0086     writeBuffer[offset + 0] = (byte)(v >>> 56);
0087     writeBuffer[offset + 1] = (byte)(v >>> 48);
0088     writeBuffer[offset + 2] = (byte)(v >>> 40);
0089     writeBuffer[offset + 3] = (byte)(v >>> 32);
0090     writeBuffer[offset + 4] = (byte)(v >>> 24);
0091     writeBuffer[offset + 5] = (byte)(v >>> 16);
0092     writeBuffer[offset + 6] = (byte)(v >>>  8);
0093     writeBuffer[offset + 7] = (byte)(v >>>  0);
0094   }
0095 
0096   // Based on DataOutputStream.writeInt.
0097   private void writeIntToBuffer(int v, int offset) {
0098     writeBuffer[offset + 0] = (byte)(v >>> 24);
0099     writeBuffer[offset + 1] = (byte)(v >>> 16);
0100     writeBuffer[offset + 2] = (byte)(v >>>  8);
0101     writeBuffer[offset + 3] = (byte)(v >>>  0);
0102   }
0103 
0104   /**
0105    * Write a record to a spill file.
0106    *
0107    * @param baseObject the base object / memory page containing the record
0108    * @param baseOffset the base offset which points directly to the record data.
0109    * @param recordLength the length of the record.
0110    * @param keyPrefix a sort key prefix
0111    */
0112   public void write(
0113       Object baseObject,
0114       long baseOffset,
0115       int recordLength,
0116       long keyPrefix) throws IOException {
0117     if (numRecordsSpilled == numRecordsToWrite) {
0118       throw new IllegalStateException(
0119         "Number of records written exceeded numRecordsToWrite = " + numRecordsToWrite);
0120     } else {
0121       numRecordsSpilled++;
0122     }
0123     writeIntToBuffer(recordLength, 0);
0124     writeLongToBuffer(keyPrefix, 4);
0125     int dataRemaining = recordLength;
0126     int freeSpaceInWriteBuffer = diskWriteBufferSize - 4 - 8; // space used by prefix + len
0127     long recordReadPosition = baseOffset;
0128     while (dataRemaining > 0) {
0129       final int toTransfer = Math.min(freeSpaceInWriteBuffer, dataRemaining);
0130       Platform.copyMemory(
0131         baseObject,
0132         recordReadPosition,
0133         writeBuffer,
0134         Platform.BYTE_ARRAY_OFFSET + (diskWriteBufferSize - freeSpaceInWriteBuffer),
0135         toTransfer);
0136       writer.write(writeBuffer, 0, (diskWriteBufferSize - freeSpaceInWriteBuffer) + toTransfer);
0137       recordReadPosition += toTransfer;
0138       dataRemaining -= toTransfer;
0139       freeSpaceInWriteBuffer = diskWriteBufferSize;
0140     }
0141     if (freeSpaceInWriteBuffer < diskWriteBufferSize) {
0142       writer.write(writeBuffer, 0, (diskWriteBufferSize - freeSpaceInWriteBuffer));
0143     }
0144     writer.recordWritten();
0145   }
0146 
0147   public void close() throws IOException {
0148     writer.commitAndGet();
0149     writer.close();
0150     writer = null;
0151     writeBuffer = null;
0152   }
0153 
0154   public File getFile() {
0155     return file;
0156   }
0157 
0158   public UnsafeSorterSpillReader getReader(SerializerManager serializerManager) throws IOException {
0159     return new UnsafeSorterSpillReader(serializerManager, file, blockId);
0160   }
0161 
0162   public int recordsSpilled() {
0163     return numRecordsSpilled;
0164   }
0165 }