0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.util.collection.unsafe.sort;
0019
0020 import java.io.File;
0021 import java.io.IOException;
0022
0023 import scala.Tuple2;
0024
0025 import org.apache.spark.SparkConf;
0026 import org.apache.spark.serializer.SerializerManager;
0027 import org.apache.spark.executor.ShuffleWriteMetrics;
0028 import org.apache.spark.serializer.DummySerializerInstance;
0029 import org.apache.spark.storage.BlockId;
0030 import org.apache.spark.storage.BlockManager;
0031 import org.apache.spark.storage.DiskBlockObjectWriter;
0032 import org.apache.spark.storage.TempLocalBlockId;
0033 import org.apache.spark.unsafe.Platform;
0034 import org.apache.spark.internal.config.package$;
0035
0036
0037
0038
0039
0040
0041 public final class UnsafeSorterSpillWriter {
0042
0043 private final SparkConf conf = new SparkConf();
0044
0045
0046
0047
0048
0049 private final int diskWriteBufferSize =
0050 (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
0051
0052
0053
0054
0055 private byte[] writeBuffer = new byte[diskWriteBufferSize];
0056
0057 private final File file;
0058 private final BlockId blockId;
0059 private final int numRecordsToWrite;
0060 private DiskBlockObjectWriter writer;
0061 private int numRecordsSpilled = 0;
0062
0063 public UnsafeSorterSpillWriter(
0064 BlockManager blockManager,
0065 int fileBufferSize,
0066 ShuffleWriteMetrics writeMetrics,
0067 int numRecordsToWrite) throws IOException {
0068 final Tuple2<TempLocalBlockId, File> spilledFileInfo =
0069 blockManager.diskBlockManager().createTempLocalBlock();
0070 this.file = spilledFileInfo._2();
0071 this.blockId = spilledFileInfo._1();
0072 this.numRecordsToWrite = numRecordsToWrite;
0073
0074
0075
0076
0077 writer = blockManager.getDiskWriter(
0078 blockId, file, DummySerializerInstance.INSTANCE, fileBufferSize, writeMetrics);
0079
0080 writeIntToBuffer(numRecordsToWrite, 0);
0081 writer.write(writeBuffer, 0, 4);
0082 }
0083
0084
0085 private void writeLongToBuffer(long v, int offset) {
0086 writeBuffer[offset + 0] = (byte)(v >>> 56);
0087 writeBuffer[offset + 1] = (byte)(v >>> 48);
0088 writeBuffer[offset + 2] = (byte)(v >>> 40);
0089 writeBuffer[offset + 3] = (byte)(v >>> 32);
0090 writeBuffer[offset + 4] = (byte)(v >>> 24);
0091 writeBuffer[offset + 5] = (byte)(v >>> 16);
0092 writeBuffer[offset + 6] = (byte)(v >>> 8);
0093 writeBuffer[offset + 7] = (byte)(v >>> 0);
0094 }
0095
0096
0097 private void writeIntToBuffer(int v, int offset) {
0098 writeBuffer[offset + 0] = (byte)(v >>> 24);
0099 writeBuffer[offset + 1] = (byte)(v >>> 16);
0100 writeBuffer[offset + 2] = (byte)(v >>> 8);
0101 writeBuffer[offset + 3] = (byte)(v >>> 0);
0102 }
0103
0104
0105
0106
0107
0108
0109
0110
0111
0112 public void write(
0113 Object baseObject,
0114 long baseOffset,
0115 int recordLength,
0116 long keyPrefix) throws IOException {
0117 if (numRecordsSpilled == numRecordsToWrite) {
0118 throw new IllegalStateException(
0119 "Number of records written exceeded numRecordsToWrite = " + numRecordsToWrite);
0120 } else {
0121 numRecordsSpilled++;
0122 }
0123 writeIntToBuffer(recordLength, 0);
0124 writeLongToBuffer(keyPrefix, 4);
0125 int dataRemaining = recordLength;
0126 int freeSpaceInWriteBuffer = diskWriteBufferSize - 4 - 8;
0127 long recordReadPosition = baseOffset;
0128 while (dataRemaining > 0) {
0129 final int toTransfer = Math.min(freeSpaceInWriteBuffer, dataRemaining);
0130 Platform.copyMemory(
0131 baseObject,
0132 recordReadPosition,
0133 writeBuffer,
0134 Platform.BYTE_ARRAY_OFFSET + (diskWriteBufferSize - freeSpaceInWriteBuffer),
0135 toTransfer);
0136 writer.write(writeBuffer, 0, (diskWriteBufferSize - freeSpaceInWriteBuffer) + toTransfer);
0137 recordReadPosition += toTransfer;
0138 dataRemaining -= toTransfer;
0139 freeSpaceInWriteBuffer = diskWriteBufferSize;
0140 }
0141 if (freeSpaceInWriteBuffer < diskWriteBufferSize) {
0142 writer.write(writeBuffer, 0, (diskWriteBufferSize - freeSpaceInWriteBuffer));
0143 }
0144 writer.recordWritten();
0145 }
0146
0147 public void close() throws IOException {
0148 writer.commitAndGet();
0149 writer.close();
0150 writer = null;
0151 writeBuffer = null;
0152 }
0153
0154 public File getFile() {
0155 return file;
0156 }
0157
0158 public UnsafeSorterSpillReader getReader(SerializerManager serializerManager) throws IOException {
0159 return new UnsafeSorterSpillReader(serializerManager, file, blockId);
0160 }
0161
0162 public int recordsSpilled() {
0163 return numRecordsSpilled;
0164 }
0165 }