0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.shuffle.sort;
0019
0020 import java.util.Comparator;
0021
0022 import org.apache.spark.memory.MemoryConsumer;
0023 import org.apache.spark.unsafe.Platform;
0024 import org.apache.spark.unsafe.array.LongArray;
0025 import org.apache.spark.unsafe.memory.MemoryBlock;
0026 import org.apache.spark.util.collection.Sorter;
0027 import org.apache.spark.util.collection.unsafe.sort.RadixSort;
0028
0029 final class ShuffleInMemorySorter {
0030
0031 private static final class SortComparator implements Comparator<PackedRecordPointer> {
0032 @Override
0033 public int compare(PackedRecordPointer left, PackedRecordPointer right) {
0034 int leftId = left.getPartitionId();
0035 int rightId = right.getPartitionId();
0036 return leftId < rightId ? -1 : (leftId > rightId ? 1 : 0);
0037 }
0038 }
0039 private static final SortComparator SORT_COMPARATOR = new SortComparator();
0040
0041 private final MemoryConsumer consumer;
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051 private LongArray array;
0052
0053
0054
0055
0056
0057 private final boolean useRadixSort;
0058
0059
0060
0061
0062 private int pos = 0;
0063
0064
0065
0066
0067 private int usableCapacity = 0;
0068
0069 private final int initialSize;
0070
0071 ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) {
0072 this.consumer = consumer;
0073 assert (initialSize > 0);
0074 this.initialSize = initialSize;
0075 this.useRadixSort = useRadixSort;
0076 this.array = consumer.allocateArray(initialSize);
0077 this.usableCapacity = getUsableCapacity();
0078 }
0079
0080 private int getUsableCapacity() {
0081
0082
0083 return (int) (array.size() / (useRadixSort ? 2 : 1.5));
0084 }
0085
0086 public void free() {
0087 if (array != null) {
0088 consumer.freeArray(array);
0089 array = null;
0090 }
0091 }
0092
0093 public int numRecords() {
0094 return pos;
0095 }
0096
0097 public void reset() {
0098
0099 pos = 0;
0100 if (consumer != null) {
0101 consumer.freeArray(array);
0102
0103
0104
0105
0106
0107 array = null;
0108 usableCapacity = 0;
0109 array = consumer.allocateArray(initialSize);
0110 usableCapacity = getUsableCapacity();
0111 }
0112 }
0113
0114 public void expandPointerArray(LongArray newArray) {
0115 assert(newArray.size() > array.size());
0116 Platform.copyMemory(
0117 array.getBaseObject(),
0118 array.getBaseOffset(),
0119 newArray.getBaseObject(),
0120 newArray.getBaseOffset(),
0121 pos * 8L
0122 );
0123 consumer.freeArray(array);
0124 array = newArray;
0125 usableCapacity = getUsableCapacity();
0126 }
0127
0128 public boolean hasSpaceForAnotherRecord() {
0129 return pos < usableCapacity;
0130 }
0131
0132 public long getMemoryUsage() {
0133 return array.size() * 8;
0134 }
0135
0136
0137
0138
0139
0140
0141
0142
0143
0144
0145
0146 public void insertRecord(long recordPointer, int partitionId) {
0147 if (!hasSpaceForAnotherRecord()) {
0148 throw new IllegalStateException("There is no space for new record");
0149 }
0150 array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
0151 pos++;
0152 }
0153
0154
0155
0156
0157 public static final class ShuffleSorterIterator {
0158
0159 private final LongArray pointerArray;
0160 private final int limit;
0161 final PackedRecordPointer packedRecordPointer = new PackedRecordPointer();
0162 private int position = 0;
0163
0164 ShuffleSorterIterator(int numRecords, LongArray pointerArray, int startingPosition) {
0165 this.limit = numRecords + startingPosition;
0166 this.pointerArray = pointerArray;
0167 this.position = startingPosition;
0168 }
0169
0170 public boolean hasNext() {
0171 return position < limit;
0172 }
0173
0174 public void loadNext() {
0175 packedRecordPointer.set(pointerArray.get(position));
0176 position++;
0177 }
0178 }
0179
0180
0181
0182
0183 public ShuffleSorterIterator getSortedIterator() {
0184 int offset = 0;
0185 if (useRadixSort) {
0186 offset = RadixSort.sort(
0187 array, pos,
0188 PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
0189 PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
0190 } else {
0191 MemoryBlock unused = new MemoryBlock(
0192 array.getBaseObject(),
0193 array.getBaseOffset() + pos * 8L,
0194 (array.size() - pos) * 8L);
0195 LongArray buffer = new LongArray(unused);
0196 Sorter<PackedRecordPointer, LongArray> sorter =
0197 new Sorter<>(new ShuffleSortDataFormat(buffer));
0198
0199 sorter.sort(array, 0, pos, SORT_COMPARATOR);
0200 }
0201 return new ShuffleSorterIterator(pos, array, offset);
0202 }
0203 }