0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.sql.execution;
0019
0020 import javax.annotation.Nullable;
0021 import java.io.IOException;
0022 import java.util.function.Supplier;
0023
0024 import com.google.common.annotations.VisibleForTesting;
0025
0026 import org.apache.spark.SparkEnv;
0027 import org.apache.spark.TaskContext;
0028 import org.apache.spark.internal.config.package$;
0029 import org.apache.spark.memory.TaskMemoryManager;
0030 import org.apache.spark.serializer.SerializerManager;
0031 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
0032 import org.apache.spark.sql.catalyst.expressions.BaseOrdering;
0033 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering;
0034 import org.apache.spark.sql.types.StructType;
0035 import org.apache.spark.storage.BlockManager;
0036 import org.apache.spark.unsafe.KVIterator;
0037 import org.apache.spark.unsafe.Platform;
0038 import org.apache.spark.unsafe.UnsafeAlignedOffset;
0039 import org.apache.spark.unsafe.array.LongArray;
0040 import org.apache.spark.unsafe.map.BytesToBytesMap;
0041 import org.apache.spark.unsafe.memory.MemoryBlock;
0042 import org.apache.spark.util.collection.unsafe.sort.*;
0043
0044
0045
0046
0047
0048
0049
0050 public final class UnsafeKVExternalSorter {
0051
0052 private final StructType keySchema;
0053 private final StructType valueSchema;
0054 private final UnsafeExternalRowSorter.PrefixComputer prefixComputer;
0055 private final UnsafeExternalSorter sorter;
0056
0057 public UnsafeKVExternalSorter(
0058 StructType keySchema,
0059 StructType valueSchema,
0060 BlockManager blockManager,
0061 SerializerManager serializerManager,
0062 long pageSizeBytes,
0063 int numElementsForSpillThreshold) throws IOException {
0064 this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes,
0065 numElementsForSpillThreshold, null);
0066 }
0067
0068 public UnsafeKVExternalSorter(
0069 StructType keySchema,
0070 StructType valueSchema,
0071 BlockManager blockManager,
0072 SerializerManager serializerManager,
0073 long pageSizeBytes,
0074 int numElementsForSpillThreshold,
0075 @Nullable BytesToBytesMap map) throws IOException {
0076 this.keySchema = keySchema;
0077 this.valueSchema = valueSchema;
0078 final TaskContext taskContext = TaskContext.get();
0079
0080 prefixComputer = SortPrefixUtils.createPrefixGenerator(keySchema);
0081 PrefixComparator prefixComparator = SortPrefixUtils.getPrefixComparator(keySchema);
0082 BaseOrdering ordering = GenerateOrdering.create(keySchema);
0083 Supplier<RecordComparator> comparatorSupplier =
0084 () -> new KVComparator(ordering, keySchema.length());
0085 boolean canUseRadixSort = keySchema.length() == 1 &&
0086 SortPrefixUtils.canSortFullyWithPrefix(keySchema.apply(0));
0087
0088 TaskMemoryManager taskMemoryManager = taskContext.taskMemoryManager();
0089
0090 if (map == null) {
0091 sorter = UnsafeExternalSorter.create(
0092 taskMemoryManager,
0093 blockManager,
0094 serializerManager,
0095 taskContext,
0096 comparatorSupplier,
0097 prefixComparator,
0098 (int) (long) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
0099 pageSizeBytes,
0100 numElementsForSpillThreshold,
0101 canUseRadixSort);
0102 } else {
0103
0104
0105 LongArray pointerArray = map.getArray();
0106
0107
0108
0109
0110
0111
0112
0113
0114
0115 if (map.numValues() > pointerArray.size() / 4) {
0116
0117
0118 pointerArray = map.allocateArray(map.numValues() * 4L);
0119 }
0120
0121
0122
0123
0124 final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
0125 null,
0126 taskMemoryManager,
0127 comparatorSupplier.get(),
0128 prefixComparator,
0129 pointerArray,
0130 canUseRadixSort);
0131
0132
0133
0134
0135 BytesToBytesMap.MapIterator iter = map.iterator();
0136 final int numKeyFields = keySchema.size();
0137 UnsafeRow row = new UnsafeRow(numKeyFields);
0138 while (iter.hasNext()) {
0139 final BytesToBytesMap.Location loc = iter.next();
0140 final Object baseObject = loc.getKeyBase();
0141 final long baseOffset = loc.getKeyOffset();
0142
0143
0144
0145
0146 MemoryBlock page = loc.getMemoryPage();
0147 long address = taskMemoryManager.encodePageNumberAndOffset(page,
0148 baseOffset - 2 * UnsafeAlignedOffset.getUaoSize());
0149
0150
0151 row.pointTo(baseObject, baseOffset, loc.getKeyLength());
0152 final UnsafeExternalRowSorter.PrefixComputer.Prefix prefix =
0153 prefixComputer.computePrefix(row);
0154
0155 inMemSorter.insertRecord(address, prefix.value, prefix.isNull);
0156 }
0157
0158 sorter = UnsafeExternalSorter.createWithExistingInMemorySorter(
0159 taskMemoryManager,
0160 blockManager,
0161 serializerManager,
0162 taskContext,
0163 comparatorSupplier,
0164 prefixComparator,
0165 (int) (long) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
0166 pageSizeBytes,
0167 numElementsForSpillThreshold,
0168 inMemSorter);
0169
0170
0171
0172 map.reset();
0173 }
0174 }
0175
0176
0177
0178
0179
0180
0181 public void insertKV(UnsafeRow key, UnsafeRow value) throws IOException {
0182 final UnsafeExternalRowSorter.PrefixComputer.Prefix prefix =
0183 prefixComputer.computePrefix(key);
0184 sorter.insertKVRecord(
0185 key.getBaseObject(), key.getBaseOffset(), key.getSizeInBytes(),
0186 value.getBaseObject(), value.getBaseOffset(), value.getSizeInBytes(),
0187 prefix.value, prefix.isNull);
0188 }
0189
0190
0191
0192
0193
0194
0195 public void merge(UnsafeKVExternalSorter other) throws IOException {
0196 sorter.merge(other.sorter);
0197 }
0198
0199
0200
0201
0202
0203 public KVSorterIterator sortedIterator() throws IOException {
0204 try {
0205 final UnsafeSorterIterator underlying = sorter.getSortedIterator();
0206 if (!underlying.hasNext()) {
0207
0208
0209 cleanupResources();
0210 }
0211 return new KVSorterIterator(underlying);
0212 } catch (IOException e) {
0213 cleanupResources();
0214 throw e;
0215 }
0216 }
0217
0218
0219
0220
0221 public long getSpillSize() {
0222 return sorter.getSpillSize();
0223 }
0224
0225
0226
0227
0228 public long getPeakMemoryUsedBytes() {
0229 return sorter.getPeakMemoryUsedBytes();
0230 }
0231
0232
0233
0234
0235
0236 @VisibleForTesting
0237 void closeCurrentPage() {
0238 sorter.closeCurrentPage();
0239 }
0240
0241
0242
0243
0244 public void cleanupResources() {
0245 sorter.cleanupResources();
0246 }
0247
0248 private static final class KVComparator extends RecordComparator {
0249 private final BaseOrdering ordering;
0250 private final UnsafeRow row1;
0251 private final UnsafeRow row2;
0252
0253 KVComparator(BaseOrdering ordering, int numKeyFields) {
0254 this.row1 = new UnsafeRow(numKeyFields);
0255 this.row2 = new UnsafeRow(numKeyFields);
0256 this.ordering = ordering;
0257 }
0258
0259 @Override
0260 public int compare(
0261 Object baseObj1,
0262 long baseOff1,
0263 int baseLen1,
0264 Object baseObj2,
0265 long baseOff2,
0266 int baseLen2) {
0267 int uaoSize = UnsafeAlignedOffset.getUaoSize();
0268
0269
0270 row1.pointTo(baseObj1, baseOff1 + uaoSize, 0);
0271 row2.pointTo(baseObj2, baseOff2 + uaoSize, 0);
0272 return ordering.compare(row1, row2);
0273 }
0274 }
0275
0276 public class KVSorterIterator extends KVIterator<UnsafeRow, UnsafeRow> {
0277 private UnsafeRow key = new UnsafeRow(keySchema.size());
0278 private UnsafeRow value = new UnsafeRow(valueSchema.size());
0279 private final UnsafeSorterIterator underlying;
0280
0281 private KVSorterIterator(UnsafeSorterIterator underlying) {
0282 this.underlying = underlying;
0283 }
0284
0285 @Override
0286 public boolean next() throws IOException {
0287 try {
0288 if (underlying.hasNext()) {
0289 underlying.loadNext();
0290
0291 Object baseObj = underlying.getBaseObject();
0292 long recordOffset = underlying.getBaseOffset();
0293 int recordLen = underlying.getRecordLength();
0294
0295
0296 int uaoSize = UnsafeAlignedOffset.getUaoSize();
0297 int keyLen = Platform.getInt(baseObj, recordOffset);
0298 int valueLen = recordLen - keyLen - uaoSize;
0299 key.pointTo(baseObj, recordOffset + uaoSize, keyLen);
0300 value.pointTo(baseObj, recordOffset + uaoSize + keyLen, valueLen);
0301
0302 return true;
0303 } else {
0304 key = null;
0305 value = null;
0306 cleanupResources();
0307 return false;
0308 }
0309 } catch (IOException e) {
0310 cleanupResources();
0311 throw e;
0312 }
0313 }
0314
0315 @Override
0316 public UnsafeRow getKey() {
0317 return key;
0318 }
0319
0320 @Override
0321 public UnsafeRow getValue() {
0322 return value;
0323 }
0324
0325 @Override
0326 public void close() {
0327 cleanupResources();
0328 }
0329 }
0330 }