Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.util.collection.unsafe.sort;
0019 
0020 import java.util.Comparator;
0021 import java.util.LinkedList;
0022 
0023 import org.apache.avro.reflect.Nullable;
0024 
0025 import org.apache.spark.TaskContext;
0026 import org.apache.spark.memory.MemoryConsumer;
0027 import org.apache.spark.memory.SparkOutOfMemoryError;
0028 import org.apache.spark.memory.TaskMemoryManager;
0029 import org.apache.spark.unsafe.Platform;
0030 import org.apache.spark.unsafe.UnsafeAlignedOffset;
0031 import org.apache.spark.unsafe.array.LongArray;
0032 import org.apache.spark.unsafe.memory.MemoryBlock;
0033 import org.apache.spark.util.collection.Sorter;
0034 
0035 /**
0036  * Sorts records using an AlphaSort-style key-prefix sort. This sort stores pointers to records
0037  * alongside a user-defined prefix of the record's sorting key. When the underlying sort algorithm
0038  * compares records, it will first compare the stored key prefixes; if the prefixes are not equal,
0039  * then we do not need to traverse the record pointers to compare the actual records. Avoiding these
0040  * random memory accesses improves cache hit rates.
0041  */
0042 public final class UnsafeInMemorySorter {
0043 
0044   private static final class SortComparator implements Comparator<RecordPointerAndKeyPrefix> {
0045 
0046     private final RecordComparator recordComparator;
0047     private final PrefixComparator prefixComparator;
0048     private final TaskMemoryManager memoryManager;
0049 
0050     SortComparator(
0051         RecordComparator recordComparator,
0052         PrefixComparator prefixComparator,
0053         TaskMemoryManager memoryManager) {
0054       this.recordComparator = recordComparator;
0055       this.prefixComparator = prefixComparator;
0056       this.memoryManager = memoryManager;
0057     }
0058 
0059     @Override
0060     public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {
0061       final int prefixComparisonResult = prefixComparator.compare(r1.keyPrefix, r2.keyPrefix);
0062       int uaoSize = UnsafeAlignedOffset.getUaoSize();
0063       if (prefixComparisonResult == 0) {
0064         final Object baseObject1 = memoryManager.getPage(r1.recordPointer);
0065         final long baseOffset1 = memoryManager.getOffsetInPage(r1.recordPointer) + uaoSize;
0066         final int baseLength1 = UnsafeAlignedOffset.getSize(baseObject1, baseOffset1 - uaoSize);
0067         final Object baseObject2 = memoryManager.getPage(r2.recordPointer);
0068         final long baseOffset2 = memoryManager.getOffsetInPage(r2.recordPointer) + uaoSize;
0069         final int baseLength2 = UnsafeAlignedOffset.getSize(baseObject2, baseOffset2 - uaoSize);
0070         return recordComparator.compare(baseObject1, baseOffset1, baseLength1, baseObject2,
0071           baseOffset2, baseLength2);
0072       } else {
0073         return prefixComparisonResult;
0074       }
0075     }
0076   }
0077 
0078   private final MemoryConsumer consumer;
0079   private final TaskMemoryManager memoryManager;
0080   @Nullable
0081   private final Comparator<RecordPointerAndKeyPrefix> sortComparator;
0082 
0083   /**
0084    * If non-null, specifies the radix sort parameters and that radix sort will be used.
0085    */
0086   @Nullable
0087   private final PrefixComparators.RadixSortSupport radixSortSupport;
0088 
0089   /**
0090    * Within this buffer, position {@code 2 * i} holds a pointer to the record at
0091    * index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix.
0092    *
0093    * Only part of the array will be used to store the pointers, the rest part is preserved as
0094    * temporary buffer for sorting.
0095    */
0096   private LongArray array;
0097 
0098   /**
0099    * The position in the sort buffer where new records can be inserted.
0100    */
0101   private int pos = 0;
0102 
0103   /**
0104    * If sorting with radix sort, specifies the starting position in the sort buffer where records
0105    * with non-null prefixes are kept. Positions [0..nullBoundaryPos) will contain null-prefixed
0106    * records, and positions [nullBoundaryPos..pos) non-null prefixed records. This lets us avoid
0107    * radix sorting over null values.
0108    */
0109   private int nullBoundaryPos = 0;
0110 
0111   /*
0112    * How many records could be inserted, because part of the array should be left for sorting.
0113    */
0114   private int usableCapacity = 0;
0115 
0116   private long initialSize;
0117 
0118   private long totalSortTimeNanos = 0L;
0119 
0120   public UnsafeInMemorySorter(
0121     final MemoryConsumer consumer,
0122     final TaskMemoryManager memoryManager,
0123     final RecordComparator recordComparator,
0124     final PrefixComparator prefixComparator,
0125     int initialSize,
0126     boolean canUseRadixSort) {
0127     this(consumer, memoryManager, recordComparator, prefixComparator,
0128       consumer.allocateArray(initialSize * 2L), canUseRadixSort);
0129   }
0130 
0131   public UnsafeInMemorySorter(
0132       final MemoryConsumer consumer,
0133       final TaskMemoryManager memoryManager,
0134       final RecordComparator recordComparator,
0135       final PrefixComparator prefixComparator,
0136       LongArray array,
0137       boolean canUseRadixSort) {
0138     this.consumer = consumer;
0139     this.memoryManager = memoryManager;
0140     this.initialSize = array.size();
0141     if (recordComparator != null) {
0142       this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
0143       if (canUseRadixSort && prefixComparator instanceof PrefixComparators.RadixSortSupport) {
0144         this.radixSortSupport = (PrefixComparators.RadixSortSupport)prefixComparator;
0145       } else {
0146         this.radixSortSupport = null;
0147       }
0148     } else {
0149       this.sortComparator = null;
0150       this.radixSortSupport = null;
0151     }
0152     this.array = array;
0153     this.usableCapacity = getUsableCapacity();
0154   }
0155 
0156   private int getUsableCapacity() {
0157     // Radix sort requires same amount of used memory as buffer, Tim sort requires
0158     // half of the used memory as buffer.
0159     return (int) (array.size() / (radixSortSupport != null ? 2 : 1.5));
0160   }
0161 
0162   /**
0163    * Free the memory used by pointer array.
0164    */
0165   public void free() {
0166     if (consumer != null) {
0167       if (array != null) {
0168         consumer.freeArray(array);
0169       }
0170       array = null;
0171     }
0172   }
0173 
0174   public void reset() {
0175     if (consumer != null) {
0176       consumer.freeArray(array);
0177       // the call to consumer.allocateArray may trigger a spill which in turn access this instance
0178       // and eventually re-enter this method and try to free the array again.  by setting the array
0179       // to null and its length to 0 we effectively make the spill code-path a no-op.  setting the
0180       // array to null also indicates that it has already been de-allocated which prevents a double
0181       // de-allocation in free().
0182       array = null;
0183       usableCapacity = 0;
0184       pos = 0;
0185       nullBoundaryPos = 0;
0186       array = consumer.allocateArray(initialSize);
0187       usableCapacity = getUsableCapacity();
0188     }
0189     pos = 0;
0190     nullBoundaryPos = 0;
0191   }
0192 
0193   /**
0194    * @return the number of records that have been inserted into this sorter.
0195    */
0196   public int numRecords() {
0197     return pos / 2;
0198   }
0199 
0200   /**
0201    * @return the total amount of time spent sorting data (in-memory only).
0202    */
0203   public long getSortTimeNanos() {
0204     return totalSortTimeNanos;
0205   }
0206 
0207   public long getMemoryUsage() {
0208     if (array == null) {
0209       return 0L;
0210     }
0211 
0212     return array.size() * 8;
0213   }
0214 
0215   public boolean hasSpaceForAnotherRecord() {
0216     return pos + 1 < usableCapacity;
0217   }
0218 
0219   public void expandPointerArray(LongArray newArray) {
0220     if (newArray.size() < array.size()) {
0221       // checkstyle.off: RegexpSinglelineJava
0222       throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
0223       // checkstyle.on: RegexpSinglelineJava
0224     }
0225     Platform.copyMemory(
0226       array.getBaseObject(),
0227       array.getBaseOffset(),
0228       newArray.getBaseObject(),
0229       newArray.getBaseOffset(),
0230       pos * 8L);
0231     consumer.freeArray(array);
0232     array = newArray;
0233     usableCapacity = getUsableCapacity();
0234   }
0235 
0236   /**
0237    * Inserts a record to be sorted. Assumes that the record pointer points to a record length
0238    * stored as a uaoSize(4 or 8) bytes integer, followed by the record's bytes.
0239    *
0240    * @param recordPointer pointer to a record in a data page, encoded by {@link TaskMemoryManager}.
0241    * @param keyPrefix a user-defined key prefix
0242    */
0243   public void insertRecord(long recordPointer, long keyPrefix, boolean prefixIsNull) {
0244     if (!hasSpaceForAnotherRecord()) {
0245       throw new IllegalStateException("There is no space for new record");
0246     }
0247     if (prefixIsNull && radixSortSupport != null) {
0248       // Swap forward a non-null record to make room for this one at the beginning of the array.
0249       array.set(pos, array.get(nullBoundaryPos));
0250       pos++;
0251       array.set(pos, array.get(nullBoundaryPos + 1));
0252       pos++;
0253       // Place this record in the vacated position.
0254       array.set(nullBoundaryPos, recordPointer);
0255       nullBoundaryPos++;
0256       array.set(nullBoundaryPos, keyPrefix);
0257       nullBoundaryPos++;
0258     } else {
0259       array.set(pos, recordPointer);
0260       pos++;
0261       array.set(pos, keyPrefix);
0262       pos++;
0263     }
0264   }
0265 
0266   public final class SortedIterator extends UnsafeSorterIterator implements Cloneable {
0267 
0268     private final int numRecords;
0269     private int position;
0270     private int offset;
0271     private Object baseObject;
0272     private long baseOffset;
0273     private long keyPrefix;
0274     private int recordLength;
0275     private long currentPageNumber;
0276     private final TaskContext taskContext = TaskContext.get();
0277 
0278     private SortedIterator(int numRecords, int offset) {
0279       this.numRecords = numRecords;
0280       this.position = 0;
0281       this.offset = offset;
0282     }
0283 
0284     public SortedIterator clone() {
0285       SortedIterator iter = new SortedIterator(numRecords, offset);
0286       iter.position = position;
0287       iter.baseObject = baseObject;
0288       iter.baseOffset = baseOffset;
0289       iter.keyPrefix = keyPrefix;
0290       iter.recordLength = recordLength;
0291       iter.currentPageNumber = currentPageNumber;
0292       return iter;
0293     }
0294 
0295     @Override
0296     public int getNumRecords() {
0297       return numRecords;
0298     }
0299 
0300     @Override
0301     public boolean hasNext() {
0302       return position / 2 < numRecords;
0303     }
0304 
0305     @Override
0306     public void loadNext() {
0307       // Kill the task in case it has been marked as killed. This logic is from
0308       // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
0309       // to avoid performance overhead. This check is added here in `loadNext()` instead of in
0310       // `hasNext()` because it's technically possible for the caller to be relying on
0311       // `getNumRecords()` instead of `hasNext()` to know when to stop.
0312       if (taskContext != null) {
0313         taskContext.killTaskIfInterrupted();
0314       }
0315       // This pointer points to a 4-byte record length, followed by the record's bytes
0316       final long recordPointer = array.get(offset + position);
0317       currentPageNumber = TaskMemoryManager.decodePageNumber(recordPointer);
0318       int uaoSize = UnsafeAlignedOffset.getUaoSize();
0319       baseObject = memoryManager.getPage(recordPointer);
0320       // Skip over record length
0321       baseOffset = memoryManager.getOffsetInPage(recordPointer) + uaoSize;
0322       recordLength = UnsafeAlignedOffset.getSize(baseObject, baseOffset - uaoSize);
0323       keyPrefix = array.get(offset + position + 1);
0324       position += 2;
0325     }
0326 
0327     @Override
0328     public Object getBaseObject() { return baseObject; }
0329 
0330     @Override
0331     public long getBaseOffset() { return baseOffset; }
0332 
0333     public long getCurrentPageNumber() {
0334       return currentPageNumber;
0335     }
0336 
0337     @Override
0338     public int getRecordLength() { return recordLength; }
0339 
0340     @Override
0341     public long getKeyPrefix() { return keyPrefix; }
0342   }
0343 
0344   /**
0345    * Return an iterator over record pointers in sorted order. For efficiency, all calls to
0346    * {@code next()} will return the same mutable object.
0347    */
0348   public UnsafeSorterIterator getSortedIterator() {
0349     int offset = 0;
0350     long start = System.nanoTime();
0351     if (sortComparator != null) {
0352       if (this.radixSortSupport != null) {
0353         offset = RadixSort.sortKeyPrefixArray(
0354           array, nullBoundaryPos, (pos - nullBoundaryPos) / 2L, 0, 7,
0355           radixSortSupport.sortDescending(), radixSortSupport.sortSigned());
0356       } else {
0357         MemoryBlock unused = new MemoryBlock(
0358           array.getBaseObject(),
0359           array.getBaseOffset() + pos * 8L,
0360           (array.size() - pos) * 8L);
0361         LongArray buffer = new LongArray(unused);
0362         Sorter<RecordPointerAndKeyPrefix, LongArray> sorter =
0363           new Sorter<>(new UnsafeSortDataFormat(buffer));
0364         sorter.sort(array, 0, pos / 2, sortComparator);
0365       }
0366     }
0367     totalSortTimeNanos += System.nanoTime() - start;
0368     if (nullBoundaryPos > 0) {
0369       assert radixSortSupport != null : "Nulls are only stored separately with radix sort";
0370       LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
0371 
0372       // The null order is either LAST or FIRST, regardless of sorting direction (ASC|DESC)
0373       if (radixSortSupport.nullsFirst()) {
0374         queue.add(new SortedIterator(nullBoundaryPos / 2, 0));
0375         queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset));
0376       } else {
0377         queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset));
0378         queue.add(new SortedIterator(nullBoundaryPos / 2, 0));
0379       }
0380       return new UnsafeExternalSorter.ChainedIterator(queue);
0381     } else {
0382       return new SortedIterator(pos / 2, offset);
0383     }
0384   }
0385 }