0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.util.collection.unsafe.sort;
0019
0020 import com.google.common.io.ByteStreams;
0021 import com.google.common.io.Closeables;
0022 import org.apache.spark.SparkEnv;
0023 import org.apache.spark.TaskContext;
0024 import org.apache.spark.internal.config.package$;
0025 import org.apache.spark.internal.config.ConfigEntry;
0026 import org.apache.spark.io.NioBufferedFileInputStream;
0027 import org.apache.spark.io.ReadAheadInputStream;
0028 import org.apache.spark.serializer.SerializerManager;
0029 import org.apache.spark.storage.BlockId;
0030 import org.apache.spark.unsafe.Platform;
0031
0032 import java.io.*;
0033
0034
0035
0036
0037
0038 public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
0039 public static final int MAX_BUFFER_SIZE_BYTES = 16777216;
0040
0041 private InputStream in;
0042 private DataInputStream din;
0043
0044
0045 private int recordLength;
0046 private long keyPrefix;
0047 private int numRecords;
0048 private int numRecordsRemaining;
0049
0050 private byte[] arr = new byte[1024 * 1024];
0051 private Object baseObject = arr;
0052 private final TaskContext taskContext = TaskContext.get();
0053
0054 public UnsafeSorterSpillReader(
0055 SerializerManager serializerManager,
0056 File file,
0057 BlockId blockId) throws IOException {
0058 assert (file.length() > 0);
0059 final ConfigEntry<Object> bufferSizeConfigEntry =
0060 package$.MODULE$.UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE();
0061
0062 final int DEFAULT_BUFFER_SIZE_BYTES =
0063 ((Long) bufferSizeConfigEntry.defaultValue().get()).intValue();
0064 int bufferSizeBytes = SparkEnv.get() == null ? DEFAULT_BUFFER_SIZE_BYTES :
0065 ((Long) SparkEnv.get().conf().get(bufferSizeConfigEntry)).intValue();
0066
0067 final boolean readAheadEnabled = SparkEnv.get() != null && (boolean)SparkEnv.get().conf().get(
0068 package$.MODULE$.UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED());
0069
0070 final InputStream bs =
0071 new NioBufferedFileInputStream(file, bufferSizeBytes);
0072 try {
0073 if (readAheadEnabled) {
0074 this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
0075 bufferSizeBytes);
0076 } else {
0077 this.in = serializerManager.wrapStream(blockId, bs);
0078 }
0079 this.din = new DataInputStream(this.in);
0080 numRecords = numRecordsRemaining = din.readInt();
0081 } catch (IOException e) {
0082 Closeables.close(bs, true);
0083 throw e;
0084 }
0085 }
0086
0087 @Override
0088 public int getNumRecords() {
0089 return numRecords;
0090 }
0091
0092 @Override
0093 public boolean hasNext() {
0094 return (numRecordsRemaining > 0);
0095 }
0096
0097 @Override
0098 public void loadNext() throws IOException {
0099
0100
0101
0102
0103
0104 if (taskContext != null) {
0105 taskContext.killTaskIfInterrupted();
0106 }
0107 recordLength = din.readInt();
0108 keyPrefix = din.readLong();
0109 if (recordLength > arr.length) {
0110 arr = new byte[recordLength];
0111 baseObject = arr;
0112 }
0113 ByteStreams.readFully(in, arr, 0, recordLength);
0114 numRecordsRemaining--;
0115 if (numRecordsRemaining == 0) {
0116 close();
0117 }
0118 }
0119
0120 @Override
0121 public Object getBaseObject() {
0122 return baseObject;
0123 }
0124
0125 @Override
0126 public long getBaseOffset() {
0127 return Platform.BYTE_ARRAY_OFFSET;
0128 }
0129
0130 @Override
0131 public int getRecordLength() {
0132 return recordLength;
0133 }
0134
0135 @Override
0136 public long getKeyPrefix() {
0137 return keyPrefix;
0138 }
0139
0140 @Override
0141 public void close() throws IOException {
0142 if (in != null) {
0143 try {
0144 in.close();
0145 } finally {
0146 in = null;
0147 din = null;
0148 }
0149 }
0150 }
0151 }