0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.shuffle.sort;
0019
0020 import org.apache.spark.unsafe.Platform;
0021 import org.apache.spark.unsafe.array.LongArray;
0022 import org.apache.spark.util.collection.SortDataFormat;
0023
0024 final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, LongArray> {
0025
0026 private final LongArray buffer;
0027
0028 ShuffleSortDataFormat(LongArray buffer) {
0029 this.buffer = buffer;
0030 }
0031
0032 @Override
0033 public PackedRecordPointer getKey(LongArray data, int pos) {
0034
0035 throw new UnsupportedOperationException();
0036 }
0037
0038 @Override
0039 public PackedRecordPointer newKey() {
0040 return new PackedRecordPointer();
0041 }
0042
0043 @Override
0044 public PackedRecordPointer getKey(LongArray data, int pos, PackedRecordPointer reuse) {
0045 reuse.set(data.get(pos));
0046 return reuse;
0047 }
0048
0049 @Override
0050 public void swap(LongArray data, int pos0, int pos1) {
0051 final long temp = data.get(pos0);
0052 data.set(pos0, data.get(pos1));
0053 data.set(pos1, temp);
0054 }
0055
0056 @Override
0057 public void copyElement(LongArray src, int srcPos, LongArray dst, int dstPos) {
0058 dst.set(dstPos, src.get(srcPos));
0059 }
0060
0061 @Override
0062 public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) {
0063 Platform.copyMemory(
0064 src.getBaseObject(),
0065 src.getBaseOffset() + srcPos * 8L,
0066 dst.getBaseObject(),
0067 dst.getBaseOffset() + dstPos * 8L,
0068 length * 8L
0069 );
0070 }
0071
0072 @Override
0073 public LongArray allocate(int length) {
0074 assert (length <= buffer.size()) :
0075 "the buffer is smaller than required: " + buffer.size() + " < " + length;
0076 return buffer;
0077 }
0078 }