Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle;
0019 
0020 import java.io.DataInputStream;
0021 import java.io.File;
0022 import java.io.IOException;
0023 import java.nio.ByteBuffer;
0024 import java.nio.LongBuffer;
0025 import java.nio.file.Files;
0026 
0027 /**
0028  * Keeps the index information for a particular map output
0029  * as an in-memory LongBuffer.
0030  */
0031 public class ShuffleIndexInformation {
0032   /** offsets as long buffer */
0033   private final LongBuffer offsets;
0034   private int size;
0035 
0036   public ShuffleIndexInformation(File indexFile) throws IOException {
0037     size = (int)indexFile.length();
0038     ByteBuffer buffer = ByteBuffer.allocate(size);
0039     offsets = buffer.asLongBuffer();
0040     try (DataInputStream dis = new DataInputStream(Files.newInputStream(indexFile.toPath()))) {
0041       dis.readFully(buffer.array());
0042     }
0043   }
0044 
0045   /**
0046    * Size of the index file
0047    * @return size
0048    */
0049   public int getSize() {
0050     return size;
0051   }
0052 
0053   /**
0054    * Get index offset for a particular reducer.
0055    */
0056   public ShuffleIndexRecord getIndex(int reduceId) {
0057     return getIndex(reduceId, reduceId + 1);
0058   }
0059 
0060   /**
0061    * Get index offset for the reducer range of [startReduceId, endReduceId).
0062    */
0063   public ShuffleIndexRecord getIndex(int startReduceId, int endReduceId) {
0064     long offset = offsets.get(startReduceId);
0065     long nextOffset = offsets.get(endReduceId);
0066     return new ShuffleIndexRecord(offset, nextOffset - offset);
0067   }
0068 }