Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.shuffle.sort;
0019 
0020 import java.util.Comparator;
0021 
0022 import org.apache.spark.memory.MemoryConsumer;
0023 import org.apache.spark.unsafe.Platform;
0024 import org.apache.spark.unsafe.array.LongArray;
0025 import org.apache.spark.unsafe.memory.MemoryBlock;
0026 import org.apache.spark.util.collection.Sorter;
0027 import org.apache.spark.util.collection.unsafe.sort.RadixSort;
0028 
0029 final class ShuffleInMemorySorter {
0030 
0031   private static final class SortComparator implements Comparator<PackedRecordPointer> {
0032     @Override
0033     public int compare(PackedRecordPointer left, PackedRecordPointer right) {
0034       int leftId = left.getPartitionId();
0035       int rightId = right.getPartitionId();
0036       return leftId < rightId ? -1 : (leftId > rightId ? 1 : 0);
0037     }
0038   }
0039   private static final SortComparator SORT_COMPARATOR = new SortComparator();
0040 
0041   private final MemoryConsumer consumer;
0042 
0043   /**
0044    * An array of record pointers and partition ids that have been encoded by
0045    * {@link PackedRecordPointer}. The sort operates on this array instead of directly manipulating
0046    * records.
0047    *
0048    * Only part of the array will be used to store the pointers, the rest part is preserved as
0049    * temporary buffer for sorting.
0050    */
0051   private LongArray array;
0052 
0053   /**
0054    * Whether to use radix sort for sorting in-memory partition ids. Radix sort is much faster
0055    * but requires additional memory to be reserved memory as pointers are added.
0056    */
0057   private final boolean useRadixSort;
0058 
0059   /**
0060    * The position in the pointer array where new records can be inserted.
0061    */
0062   private int pos = 0;
0063 
0064   /**
0065    * How many records could be inserted, because part of the array should be left for sorting.
0066    */
0067   private int usableCapacity = 0;
0068 
0069   private final int initialSize;
0070 
0071   ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) {
0072     this.consumer = consumer;
0073     assert (initialSize > 0);
0074     this.initialSize = initialSize;
0075     this.useRadixSort = useRadixSort;
0076     this.array = consumer.allocateArray(initialSize);
0077     this.usableCapacity = getUsableCapacity();
0078   }
0079 
0080   private int getUsableCapacity() {
0081     // Radix sort requires same amount of used memory as buffer, Tim sort requires
0082     // half of the used memory as buffer.
0083     return (int) (array.size() / (useRadixSort ? 2 : 1.5));
0084   }
0085 
0086   public void free() {
0087     if (array != null) {
0088       consumer.freeArray(array);
0089       array = null;
0090     }
0091   }
0092 
0093   public int numRecords() {
0094     return pos;
0095   }
0096 
0097   public void reset() {
0098     // Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op.
0099     pos = 0;
0100     if (consumer != null) {
0101       consumer.freeArray(array);
0102       // As `array` has been released, we should set it to  `null` to avoid accessing it before
0103       // `allocateArray` returns. `usableCapacity` is also set to `0` to avoid any codes writing
0104       // data to `ShuffleInMemorySorter` when `array` is `null` (e.g., in
0105       // ShuffleExternalSorter.growPointerArrayIfNecessary, we may try to access
0106       // `ShuffleInMemorySorter` when `allocateArray` throws SparkOutOfMemoryError).
0107       array = null;
0108       usableCapacity = 0;
0109       array = consumer.allocateArray(initialSize);
0110       usableCapacity = getUsableCapacity();
0111     }
0112   }
0113 
0114   public void expandPointerArray(LongArray newArray) {
0115     assert(newArray.size() > array.size());
0116     Platform.copyMemory(
0117       array.getBaseObject(),
0118       array.getBaseOffset(),
0119       newArray.getBaseObject(),
0120       newArray.getBaseOffset(),
0121       pos * 8L
0122     );
0123     consumer.freeArray(array);
0124     array = newArray;
0125     usableCapacity = getUsableCapacity();
0126   }
0127 
0128   public boolean hasSpaceForAnotherRecord() {
0129     return pos < usableCapacity;
0130   }
0131 
0132   public long getMemoryUsage() {
0133     return array.size() * 8;
0134   }
0135 
0136   /**
0137    * Inserts a record to be sorted.
0138    *
0139    * @param recordPointer a pointer to the record, encoded by the task memory manager. Due to
0140    *                      certain pointer compression techniques used by the sorter, the sort can
0141    *                      only operate on pointers that point to locations in the first
0142    *                      {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page.
0143    * @param partitionId the partition id, which must be less than or equal to
0144    *                    {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}.
0145    */
0146   public void insertRecord(long recordPointer, int partitionId) {
0147     if (!hasSpaceForAnotherRecord()) {
0148       throw new IllegalStateException("There is no space for new record");
0149     }
0150     array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
0151     pos++;
0152   }
0153 
0154   /**
0155    * An iterator-like class that's used instead of Java's Iterator in order to facilitate inlining.
0156    */
0157   public static final class ShuffleSorterIterator {
0158 
0159     private final LongArray pointerArray;
0160     private final int limit;
0161     final PackedRecordPointer packedRecordPointer = new PackedRecordPointer();
0162     private int position = 0;
0163 
0164     ShuffleSorterIterator(int numRecords, LongArray pointerArray, int startingPosition) {
0165       this.limit = numRecords + startingPosition;
0166       this.pointerArray = pointerArray;
0167       this.position = startingPosition;
0168     }
0169 
0170     public boolean hasNext() {
0171       return position < limit;
0172     }
0173 
0174     public void loadNext() {
0175       packedRecordPointer.set(pointerArray.get(position));
0176       position++;
0177     }
0178   }
0179 
0180   /**
0181    * Return an iterator over record pointers in sorted order.
0182    */
0183   public ShuffleSorterIterator getSortedIterator() {
0184     int offset = 0;
0185     if (useRadixSort) {
0186       offset = RadixSort.sort(
0187         array, pos,
0188         PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
0189         PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
0190     } else {
0191       MemoryBlock unused = new MemoryBlock(
0192         array.getBaseObject(),
0193         array.getBaseOffset() + pos * 8L,
0194         (array.size() - pos) * 8L);
0195       LongArray buffer = new LongArray(unused);
0196       Sorter<PackedRecordPointer, LongArray> sorter =
0197         new Sorter<>(new ShuffleSortDataFormat(buffer));
0198 
0199       sorter.sort(array, 0, pos, SORT_COMPARATOR);
0200     }
0201     return new ShuffleSorterIterator(pos, array, offset);
0202   }
0203 }