Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.sql.execution;
0019 
0020 import javax.annotation.Nullable;
0021 import java.io.IOException;
0022 import java.util.function.Supplier;
0023 
0024 import com.google.common.annotations.VisibleForTesting;
0025 
0026 import org.apache.spark.SparkEnv;
0027 import org.apache.spark.TaskContext;
0028 import org.apache.spark.internal.config.package$;
0029 import org.apache.spark.memory.TaskMemoryManager;
0030 import org.apache.spark.serializer.SerializerManager;
0031 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
0032 import org.apache.spark.sql.catalyst.expressions.BaseOrdering;
0033 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering;
0034 import org.apache.spark.sql.types.StructType;
0035 import org.apache.spark.storage.BlockManager;
0036 import org.apache.spark.unsafe.KVIterator;
0037 import org.apache.spark.unsafe.Platform;
0038 import org.apache.spark.unsafe.UnsafeAlignedOffset;
0039 import org.apache.spark.unsafe.array.LongArray;
0040 import org.apache.spark.unsafe.map.BytesToBytesMap;
0041 import org.apache.spark.unsafe.memory.MemoryBlock;
0042 import org.apache.spark.util.collection.unsafe.sort.*;
0043 
0044 /**
0045  * A class for performing external sorting on key-value records. Both key and value are UnsafeRows.
0046  *
0047  * Note that this class allows optionally passing in a {@link BytesToBytesMap} directly in order
0048  * to perform in-place sorting of records in the map.
0049  */
0050 public final class UnsafeKVExternalSorter {
0051 
0052   private final StructType keySchema;
0053   private final StructType valueSchema;
0054   private final UnsafeExternalRowSorter.PrefixComputer prefixComputer;
0055   private final UnsafeExternalSorter sorter;
0056 
0057   public UnsafeKVExternalSorter(
0058       StructType keySchema,
0059       StructType valueSchema,
0060       BlockManager blockManager,
0061       SerializerManager serializerManager,
0062       long pageSizeBytes,
0063       int numElementsForSpillThreshold) throws IOException {
0064     this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes,
0065       numElementsForSpillThreshold, null);
0066   }
0067 
0068   public UnsafeKVExternalSorter(
0069       StructType keySchema,
0070       StructType valueSchema,
0071       BlockManager blockManager,
0072       SerializerManager serializerManager,
0073       long pageSizeBytes,
0074       int numElementsForSpillThreshold,
0075       @Nullable BytesToBytesMap map) throws IOException {
0076     this.keySchema = keySchema;
0077     this.valueSchema = valueSchema;
0078     final TaskContext taskContext = TaskContext.get();
0079 
0080     prefixComputer = SortPrefixUtils.createPrefixGenerator(keySchema);
0081     PrefixComparator prefixComparator = SortPrefixUtils.getPrefixComparator(keySchema);
0082     BaseOrdering ordering = GenerateOrdering.create(keySchema);
0083     Supplier<RecordComparator> comparatorSupplier =
0084       () -> new KVComparator(ordering, keySchema.length());
0085     boolean canUseRadixSort = keySchema.length() == 1 &&
0086       SortPrefixUtils.canSortFullyWithPrefix(keySchema.apply(0));
0087 
0088     TaskMemoryManager taskMemoryManager = taskContext.taskMemoryManager();
0089 
0090     if (map == null) {
0091       sorter = UnsafeExternalSorter.create(
0092         taskMemoryManager,
0093         blockManager,
0094         serializerManager,
0095         taskContext,
0096         comparatorSupplier,
0097         prefixComparator,
0098         (int) (long) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
0099         pageSizeBytes,
0100         numElementsForSpillThreshold,
0101         canUseRadixSort);
0102     } else {
0103       // During spilling, the pointer array in `BytesToBytesMap` will not be used, so we can borrow
0104       // that and use it as the pointer array for `UnsafeInMemorySorter`.
0105       LongArray pointerArray = map.getArray();
0106       // `BytesToBytesMap`'s pointer array is only guaranteed to hold all the distinct keys, but
0107       // `UnsafeInMemorySorter`'s pointer array need to hold all the entries. Since
0108       // `BytesToBytesMap` can have duplicated keys, here we need a check to make sure the pointer
0109       // array can hold all the entries in `BytesToBytesMap`.
0110       // The pointer array will be used to do in-place sort, which requires half of the space to be
0111       // empty. Note: each record in the map takes two entries in the pointer array, one is record
0112       // pointer, another is key prefix. So the required size of pointer array is `numRecords * 4`.
0113       // TODO: It's possible to change UnsafeInMemorySorter to have multiple entries with same key,
0114       // so that we can always reuse the pointer array.
0115       if (map.numValues() > pointerArray.size() / 4) {
0116         // Here we ask the map to allocate memory, so that the memory manager won't ask the map
0117         // to spill, if the memory is not enough.
0118         pointerArray = map.allocateArray(map.numValues() * 4L);
0119       }
0120 
0121       // Since the pointer array(either reuse the one in the map, or create a new one) is guaranteed
0122       // to be large enough, it's fine to pass `null` as consumer because we won't allocate more
0123       // memory.
0124       final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
0125         null,
0126         taskMemoryManager,
0127         comparatorSupplier.get(),
0128         prefixComparator,
0129         pointerArray,
0130         canUseRadixSort);
0131 
0132       // We cannot use the destructive iterator here because we are reusing the existing memory
0133       // pages in BytesToBytesMap to hold records during sorting.
0134       // The only new memory we are allocating is the pointer/prefix array.
0135       BytesToBytesMap.MapIterator iter = map.iterator();
0136       final int numKeyFields = keySchema.size();
0137       UnsafeRow row = new UnsafeRow(numKeyFields);
0138       while (iter.hasNext()) {
0139         final BytesToBytesMap.Location loc = iter.next();
0140         final Object baseObject = loc.getKeyBase();
0141         final long baseOffset = loc.getKeyOffset();
0142 
0143         // Get encoded memory address
0144         // baseObject + baseOffset point to the beginning of the key data in the map, but that
0145         // the KV-pair's length data is stored at 2 * uaoSize bytes immediately before that address
0146         MemoryBlock page = loc.getMemoryPage();
0147         long address = taskMemoryManager.encodePageNumberAndOffset(page,
0148             baseOffset - 2 * UnsafeAlignedOffset.getUaoSize());
0149 
0150         // Compute prefix
0151         row.pointTo(baseObject, baseOffset, loc.getKeyLength());
0152         final UnsafeExternalRowSorter.PrefixComputer.Prefix prefix =
0153           prefixComputer.computePrefix(row);
0154 
0155         inMemSorter.insertRecord(address, prefix.value, prefix.isNull);
0156       }
0157 
0158       sorter = UnsafeExternalSorter.createWithExistingInMemorySorter(
0159         taskMemoryManager,
0160         blockManager,
0161         serializerManager,
0162         taskContext,
0163         comparatorSupplier,
0164         prefixComparator,
0165         (int) (long) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
0166         pageSizeBytes,
0167         numElementsForSpillThreshold,
0168         inMemSorter);
0169 
0170       // reset the map, so we can re-use it to insert new records. the inMemSorter will not used
0171       // anymore, so the underline array could be used by map again.
0172       map.reset();
0173     }
0174   }
0175 
0176   /**
0177    * Inserts a key-value record into the sorter. If the sorter no longer has enough memory to hold
0178    * the record, the sorter sorts the existing records in-memory, writes them out as partially
0179    * sorted runs, and then reallocates memory to hold the new record.
0180    */
0181   public void insertKV(UnsafeRow key, UnsafeRow value) throws IOException {
0182     final UnsafeExternalRowSorter.PrefixComputer.Prefix prefix =
0183       prefixComputer.computePrefix(key);
0184     sorter.insertKVRecord(
0185       key.getBaseObject(), key.getBaseOffset(), key.getSizeInBytes(),
0186       value.getBaseObject(), value.getBaseOffset(), value.getSizeInBytes(),
0187       prefix.value, prefix.isNull);
0188   }
0189 
0190   /**
0191    * Merges another UnsafeKVExternalSorter into `this`, the other one will be emptied.
0192    *
0193    * @throws IOException
0194    */
0195   public void merge(UnsafeKVExternalSorter other) throws IOException {
0196     sorter.merge(other.sorter);
0197   }
0198 
0199   /**
0200    * Returns a sorted iterator. It is the caller's responsibility to call `cleanupResources()`
0201    * after consuming this iterator.
0202    */
0203   public KVSorterIterator sortedIterator() throws IOException {
0204     try {
0205       final UnsafeSorterIterator underlying = sorter.getSortedIterator();
0206       if (!underlying.hasNext()) {
0207         // Since we won't ever call next() on an empty iterator, we need to clean up resources
0208         // here in order to prevent memory leaks.
0209         cleanupResources();
0210       }
0211       return new KVSorterIterator(underlying);
0212     } catch (IOException e) {
0213       cleanupResources();
0214       throw e;
0215     }
0216   }
0217 
0218   /**
0219    * Return the total number of bytes that has been spilled into disk so far.
0220    */
0221   public long getSpillSize() {
0222     return sorter.getSpillSize();
0223   }
0224 
0225   /**
0226    * Return the peak memory used so far, in bytes.
0227    */
0228   public long getPeakMemoryUsedBytes() {
0229     return sorter.getPeakMemoryUsedBytes();
0230   }
0231 
0232   /**
0233    * Marks the current page as no-more-space-available, and as a result, either allocate a
0234    * new page or spill when we see the next record.
0235    */
0236   @VisibleForTesting
0237   void closeCurrentPage() {
0238     sorter.closeCurrentPage();
0239   }
0240 
0241   /**
0242    * Frees this sorter's in-memory data structures and cleans up its spill files.
0243    */
0244   public void cleanupResources() {
0245     sorter.cleanupResources();
0246   }
0247 
0248   private static final class KVComparator extends RecordComparator {
0249     private final BaseOrdering ordering;
0250     private final UnsafeRow row1;
0251     private final UnsafeRow row2;
0252 
0253     KVComparator(BaseOrdering ordering, int numKeyFields) {
0254       this.row1 = new UnsafeRow(numKeyFields);
0255       this.row2 = new UnsafeRow(numKeyFields);
0256       this.ordering = ordering;
0257     }
0258 
0259     @Override
0260     public int compare(
0261         Object baseObj1,
0262         long baseOff1,
0263         int baseLen1,
0264         Object baseObj2,
0265         long baseOff2,
0266         int baseLen2) {
0267       int uaoSize = UnsafeAlignedOffset.getUaoSize();
0268       // Note that since ordering doesn't need the total length of the record, we just pass 0
0269       // into the row.
0270       row1.pointTo(baseObj1, baseOff1 + uaoSize, 0);
0271       row2.pointTo(baseObj2, baseOff2 + uaoSize, 0);
0272       return ordering.compare(row1, row2);
0273     }
0274   }
0275 
0276   public class KVSorterIterator extends KVIterator<UnsafeRow, UnsafeRow> {
0277     private UnsafeRow key = new UnsafeRow(keySchema.size());
0278     private UnsafeRow value = new UnsafeRow(valueSchema.size());
0279     private final UnsafeSorterIterator underlying;
0280 
0281     private KVSorterIterator(UnsafeSorterIterator underlying) {
0282       this.underlying = underlying;
0283     }
0284 
0285     @Override
0286     public boolean next() throws IOException {
0287       try {
0288         if (underlying.hasNext()) {
0289           underlying.loadNext();
0290 
0291           Object baseObj = underlying.getBaseObject();
0292           long recordOffset = underlying.getBaseOffset();
0293           int recordLen = underlying.getRecordLength();
0294 
0295           // Note that recordLen = keyLen + valueLen + uaoSize (for the keyLen itself)
0296           int uaoSize = UnsafeAlignedOffset.getUaoSize();
0297           int keyLen = Platform.getInt(baseObj, recordOffset);
0298           int valueLen = recordLen - keyLen - uaoSize;
0299           key.pointTo(baseObj, recordOffset + uaoSize, keyLen);
0300           value.pointTo(baseObj, recordOffset + uaoSize + keyLen, valueLen);
0301 
0302           return true;
0303         } else {
0304           key = null;
0305           value = null;
0306           cleanupResources();
0307           return false;
0308         }
0309       } catch (IOException e) {
0310         cleanupResources();
0311         throw e;
0312       }
0313     }
0314 
0315     @Override
0316     public UnsafeRow getKey() {
0317       return key;
0318     }
0319 
0320     @Override
0321     public UnsafeRow getValue() {
0322       return value;
0323     }
0324 
0325     @Override
0326     public void close() {
0327       cleanupResources();
0328     }
0329   }
0330 }