0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.util.collection.unsafe.sort;
0019
0020 import java.util.Comparator;
0021 import java.util.LinkedList;
0022
0023 import org.apache.avro.reflect.Nullable;
0024
0025 import org.apache.spark.TaskContext;
0026 import org.apache.spark.memory.MemoryConsumer;
0027 import org.apache.spark.memory.SparkOutOfMemoryError;
0028 import org.apache.spark.memory.TaskMemoryManager;
0029 import org.apache.spark.unsafe.Platform;
0030 import org.apache.spark.unsafe.UnsafeAlignedOffset;
0031 import org.apache.spark.unsafe.array.LongArray;
0032 import org.apache.spark.unsafe.memory.MemoryBlock;
0033 import org.apache.spark.util.collection.Sorter;
0034
0035
0036
0037
0038
0039
0040
0041
0042 public final class UnsafeInMemorySorter {
0043
0044 private static final class SortComparator implements Comparator<RecordPointerAndKeyPrefix> {
0045
0046 private final RecordComparator recordComparator;
0047 private final PrefixComparator prefixComparator;
0048 private final TaskMemoryManager memoryManager;
0049
0050 SortComparator(
0051 RecordComparator recordComparator,
0052 PrefixComparator prefixComparator,
0053 TaskMemoryManager memoryManager) {
0054 this.recordComparator = recordComparator;
0055 this.prefixComparator = prefixComparator;
0056 this.memoryManager = memoryManager;
0057 }
0058
0059 @Override
0060 public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {
0061 final int prefixComparisonResult = prefixComparator.compare(r1.keyPrefix, r2.keyPrefix);
0062 int uaoSize = UnsafeAlignedOffset.getUaoSize();
0063 if (prefixComparisonResult == 0) {
0064 final Object baseObject1 = memoryManager.getPage(r1.recordPointer);
0065 final long baseOffset1 = memoryManager.getOffsetInPage(r1.recordPointer) + uaoSize;
0066 final int baseLength1 = UnsafeAlignedOffset.getSize(baseObject1, baseOffset1 - uaoSize);
0067 final Object baseObject2 = memoryManager.getPage(r2.recordPointer);
0068 final long baseOffset2 = memoryManager.getOffsetInPage(r2.recordPointer) + uaoSize;
0069 final int baseLength2 = UnsafeAlignedOffset.getSize(baseObject2, baseOffset2 - uaoSize);
0070 return recordComparator.compare(baseObject1, baseOffset1, baseLength1, baseObject2,
0071 baseOffset2, baseLength2);
0072 } else {
0073 return prefixComparisonResult;
0074 }
0075 }
0076 }
0077
/**
 * Allocates and frees the pointer array. May be null, in which case free() and reset()
 * never release or reallocate the array.
 */
private final MemoryConsumer consumer;
/** Resolves record pointers into a page object and an offset within that page. */
private final TaskMemoryManager memoryManager;
/**
 * Comparator used when sorting; null when no record comparator was supplied, in which
 * case getSortedIterator() performs no sorting.
 */
@Nullable
private final Comparator<RecordPointerAndKeyPrefix> sortComparator;

/**
 * If non-null, radix sort is used by getSortedIterator() and this object supplies its
 * parameters (descending, signed, nulls-first).
 */
@Nullable
private final PrefixComparators.RadixSortSupport radixSortSupport;

/**
 * Buffer of longs holding two slots per record: the record pointer followed by its key
 * prefix. Only part of the buffer holds records; when sorting without radix sort, the
 * region past {@code pos} is handed to the sorter as a scratch buffer.
 */
private LongArray array;

/**
 * Index into {@code array} of the next free slot; advances by two for every inserted
 * record.
 */
private int pos = 0;

/**
 * Slots [0, nullBoundaryPos) hold records inserted with a null prefix while radix sort is
 * in use; slots [nullBoundaryPos, pos) hold all other records. Stays 0 when radix sort is
 * not in use.
 */
private int nullBoundaryPos = 0;

/**
 * Number of slots of {@code array} that may be filled with records, as computed by
 * getUsableCapacity(); the remainder is kept free.
 */
private int usableCapacity = 0;

/** Size of the array supplied at construction; reset() allocates a new array of this size. */
private long initialSize;

/** Accumulated time spent inside the sorting step of getSortedIterator(), in nanoseconds. */
private long totalSortTimeNanos = 0L;
0119
/**
 * Creates a sorter whose pointer array is allocated from {@code consumer}.
 *
 * @param consumer allocates (and later frees) the pointer array
 * @param memoryManager resolves record pointers to pages and offsets
 * @param recordComparator full-record comparator, or null to disable sorting
 * @param prefixComparator key-prefix comparator
 * @param initialSize the array is allocated with {@code initialSize * 2} long slots, since
 *                    every record occupies two slots (pointer and key prefix)
 * @param canUseRadixSort whether radix sort may be used if the prefix comparator supports it
 */
public UnsafeInMemorySorter(
    final MemoryConsumer consumer,
    final TaskMemoryManager memoryManager,
    final RecordComparator recordComparator,
    final PrefixComparator prefixComparator,
    int initialSize,
    boolean canUseRadixSort) {
  this(
      consumer,
      memoryManager,
      recordComparator,
      prefixComparator,
      // Multiply as long so that large initial sizes do not overflow int.
      consumer.allocateArray(2L * initialSize),
      canUseRadixSort);
}
0130
0131 public UnsafeInMemorySorter(
0132 final MemoryConsumer consumer,
0133 final TaskMemoryManager memoryManager,
0134 final RecordComparator recordComparator,
0135 final PrefixComparator prefixComparator,
0136 LongArray array,
0137 boolean canUseRadixSort) {
0138 this.consumer = consumer;
0139 this.memoryManager = memoryManager;
0140 this.initialSize = array.size();
0141 if (recordComparator != null) {
0142 this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
0143 if (canUseRadixSort && prefixComparator instanceof PrefixComparators.RadixSortSupport) {
0144 this.radixSortSupport = (PrefixComparators.RadixSortSupport)prefixComparator;
0145 } else {
0146 this.radixSortSupport = null;
0147 }
0148 } else {
0149 this.sortComparator = null;
0150 this.radixSortSupport = null;
0151 }
0152 this.array = array;
0153 this.usableCapacity = getUsableCapacity();
0154 }
0155
0156 private int getUsableCapacity() {
0157
0158
0159 return (int) (array.size() / (radixSortSupport != null ? 2 : 1.5));
0160 }
0161
0162
0163
0164
0165 public void free() {
0166 if (consumer != null) {
0167 if (array != null) {
0168 consumer.freeArray(array);
0169 }
0170 array = null;
0171 }
0172 }
0173
0174 public void reset() {
0175 if (consumer != null) {
0176 consumer.freeArray(array);
0177
0178
0179
0180
0181
0182 array = null;
0183 usableCapacity = 0;
0184 pos = 0;
0185 nullBoundaryPos = 0;
0186 array = consumer.allocateArray(initialSize);
0187 usableCapacity = getUsableCapacity();
0188 }
0189 pos = 0;
0190 nullBoundaryPos = 0;
0191 }
0192
0193
0194
0195
0196 public int numRecords() {
0197 return pos / 2;
0198 }
0199
0200
0201
0202
0203 public long getSortTimeNanos() {
0204 return totalSortTimeNanos;
0205 }
0206
0207 public long getMemoryUsage() {
0208 if (array == null) {
0209 return 0L;
0210 }
0211
0212 return array.size() * 8;
0213 }
0214
0215 public boolean hasSpaceForAnotherRecord() {
0216 return pos + 1 < usableCapacity;
0217 }
0218
0219 public void expandPointerArray(LongArray newArray) {
0220 if (newArray.size() < array.size()) {
0221
0222 throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
0223
0224 }
0225 Platform.copyMemory(
0226 array.getBaseObject(),
0227 array.getBaseOffset(),
0228 newArray.getBaseObject(),
0229 newArray.getBaseOffset(),
0230 pos * 8L);
0231 consumer.freeArray(array);
0232 array = newArray;
0233 usableCapacity = getUsableCapacity();
0234 }
0235
0236
0237
0238
0239
0240
0241
0242
0243 public void insertRecord(long recordPointer, long keyPrefix, boolean prefixIsNull) {
0244 if (!hasSpaceForAnotherRecord()) {
0245 throw new IllegalStateException("There is no space for new record");
0246 }
0247 if (prefixIsNull && radixSortSupport != null) {
0248
0249 array.set(pos, array.get(nullBoundaryPos));
0250 pos++;
0251 array.set(pos, array.get(nullBoundaryPos + 1));
0252 pos++;
0253
0254 array.set(nullBoundaryPos, recordPointer);
0255 nullBoundaryPos++;
0256 array.set(nullBoundaryPos, keyPrefix);
0257 nullBoundaryPos++;
0258 } else {
0259 array.set(pos, recordPointer);
0260 pos++;
0261 array.set(pos, keyPrefix);
0262 pos++;
0263 }
0264 }
0265
0266 public final class SortedIterator extends UnsafeSorterIterator implements Cloneable {
0267
0268 private final int numRecords;
0269 private int position;
0270 private int offset;
0271 private Object baseObject;
0272 private long baseOffset;
0273 private long keyPrefix;
0274 private int recordLength;
0275 private long currentPageNumber;
0276 private final TaskContext taskContext = TaskContext.get();
0277
0278 private SortedIterator(int numRecords, int offset) {
0279 this.numRecords = numRecords;
0280 this.position = 0;
0281 this.offset = offset;
0282 }
0283
0284 public SortedIterator clone() {
0285 SortedIterator iter = new SortedIterator(numRecords, offset);
0286 iter.position = position;
0287 iter.baseObject = baseObject;
0288 iter.baseOffset = baseOffset;
0289 iter.keyPrefix = keyPrefix;
0290 iter.recordLength = recordLength;
0291 iter.currentPageNumber = currentPageNumber;
0292 return iter;
0293 }
0294
0295 @Override
0296 public int getNumRecords() {
0297 return numRecords;
0298 }
0299
0300 @Override
0301 public boolean hasNext() {
0302 return position / 2 < numRecords;
0303 }
0304
0305 @Override
0306 public void loadNext() {
0307
0308
0309
0310
0311
0312 if (taskContext != null) {
0313 taskContext.killTaskIfInterrupted();
0314 }
0315
0316 final long recordPointer = array.get(offset + position);
0317 currentPageNumber = TaskMemoryManager.decodePageNumber(recordPointer);
0318 int uaoSize = UnsafeAlignedOffset.getUaoSize();
0319 baseObject = memoryManager.getPage(recordPointer);
0320
0321 baseOffset = memoryManager.getOffsetInPage(recordPointer) + uaoSize;
0322 recordLength = UnsafeAlignedOffset.getSize(baseObject, baseOffset - uaoSize);
0323 keyPrefix = array.get(offset + position + 1);
0324 position += 2;
0325 }
0326
0327 @Override
0328 public Object getBaseObject() { return baseObject; }
0329
0330 @Override
0331 public long getBaseOffset() { return baseOffset; }
0332
0333 public long getCurrentPageNumber() {
0334 return currentPageNumber;
0335 }
0336
0337 @Override
0338 public int getRecordLength() { return recordLength; }
0339
0340 @Override
0341 public long getKeyPrefix() { return keyPrefix; }
0342 }
0343
0344
0345
0346
0347
0348 public UnsafeSorterIterator getSortedIterator() {
0349 int offset = 0;
0350 long start = System.nanoTime();
0351 if (sortComparator != null) {
0352 if (this.radixSortSupport != null) {
0353 offset = RadixSort.sortKeyPrefixArray(
0354 array, nullBoundaryPos, (pos - nullBoundaryPos) / 2L, 0, 7,
0355 radixSortSupport.sortDescending(), radixSortSupport.sortSigned());
0356 } else {
0357 MemoryBlock unused = new MemoryBlock(
0358 array.getBaseObject(),
0359 array.getBaseOffset() + pos * 8L,
0360 (array.size() - pos) * 8L);
0361 LongArray buffer = new LongArray(unused);
0362 Sorter<RecordPointerAndKeyPrefix, LongArray> sorter =
0363 new Sorter<>(new UnsafeSortDataFormat(buffer));
0364 sorter.sort(array, 0, pos / 2, sortComparator);
0365 }
0366 }
0367 totalSortTimeNanos += System.nanoTime() - start;
0368 if (nullBoundaryPos > 0) {
0369 assert radixSortSupport != null : "Nulls are only stored separately with radix sort";
0370 LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
0371
0372
0373 if (radixSortSupport.nullsFirst()) {
0374 queue.add(new SortedIterator(nullBoundaryPos / 2, 0));
0375 queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset));
0376 } else {
0377 queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset));
0378 queue.add(new SortedIterator(nullBoundaryPos / 2, 0));
0379 }
0380 return new UnsafeExternalSorter.ChainedIterator(queue);
0381 } else {
0382 return new SortedIterator(pos / 2, offset);
0383 }
0384 }
0385 }