0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle;
0019
0020 import java.io.DataInputStream;
0021 import java.io.File;
0022 import java.io.IOException;
0023 import java.nio.ByteBuffer;
0024 import java.nio.LongBuffer;
0025 import java.nio.file.Files;
0026
0027
0028
0029
0030
0031 public class ShuffleIndexInformation {
0032
0033 private final LongBuffer offsets;
0034 private int size;
0035
0036 public ShuffleIndexInformation(File indexFile) throws IOException {
0037 size = (int)indexFile.length();
0038 ByteBuffer buffer = ByteBuffer.allocate(size);
0039 offsets = buffer.asLongBuffer();
0040 try (DataInputStream dis = new DataInputStream(Files.newInputStream(indexFile.toPath()))) {
0041 dis.readFully(buffer.array());
0042 }
0043 }
0044
0045
0046
0047
0048
0049 public int getSize() {
0050 return size;
0051 }
0052
0053
0054
0055
0056 public ShuffleIndexRecord getIndex(int reduceId) {
0057 return getIndex(reduceId, reduceId + 1);
0058 }
0059
0060
0061
0062
0063 public ShuffleIndexRecord getIndex(int startReduceId, int endReduceId) {
0064 long offset = offsets.get(startReduceId);
0065 long nextOffset = offsets.get(endReduceId);
0066 return new ShuffleIndexRecord(offset, nextOffset - offset);
0067 }
0068 }