Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.util.collection.unsafe.sort;
0019 
0020 import com.google.common.io.ByteStreams;
0021 import com.google.common.io.Closeables;
0022 import org.apache.spark.SparkEnv;
0023 import org.apache.spark.TaskContext;
0024 import org.apache.spark.internal.config.package$;
0025 import org.apache.spark.internal.config.ConfigEntry;
0026 import org.apache.spark.io.NioBufferedFileInputStream;
0027 import org.apache.spark.io.ReadAheadInputStream;
0028 import org.apache.spark.serializer.SerializerManager;
0029 import org.apache.spark.storage.BlockId;
0030 import org.apache.spark.unsafe.Platform;
0031 
0032 import java.io.*;
0033 
0034 /**
0035  * Reads spill files written by {@link UnsafeSorterSpillWriter} (see that class for a description
0036  * of the file format).
0037  */
0038 public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
0039   public static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 mb
0040 
0041   private InputStream in;
0042   private DataInputStream din;
0043 
0044   // Variables that change with every record read:
0045   private int recordLength;
0046   private long keyPrefix;
0047   private int numRecords;
0048   private int numRecordsRemaining;
0049 
0050   private byte[] arr = new byte[1024 * 1024];
0051   private Object baseObject = arr;
0052   private final TaskContext taskContext = TaskContext.get();
0053 
0054   public UnsafeSorterSpillReader(
0055       SerializerManager serializerManager,
0056       File file,
0057       BlockId blockId) throws IOException {
0058     assert (file.length() > 0);
0059     final ConfigEntry<Object> bufferSizeConfigEntry =
0060         package$.MODULE$.UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE();
0061     // This value must be less than or equal to MAX_BUFFER_SIZE_BYTES. Cast to int is always safe.
0062     final int DEFAULT_BUFFER_SIZE_BYTES =
0063         ((Long) bufferSizeConfigEntry.defaultValue().get()).intValue();
0064     int bufferSizeBytes = SparkEnv.get() == null ? DEFAULT_BUFFER_SIZE_BYTES :
0065         ((Long) SparkEnv.get().conf().get(bufferSizeConfigEntry)).intValue();
0066 
0067     final boolean readAheadEnabled = SparkEnv.get() != null && (boolean)SparkEnv.get().conf().get(
0068         package$.MODULE$.UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED());
0069 
0070     final InputStream bs =
0071         new NioBufferedFileInputStream(file, bufferSizeBytes);
0072     try {
0073       if (readAheadEnabled) {
0074         this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
0075                 bufferSizeBytes);
0076       } else {
0077         this.in = serializerManager.wrapStream(blockId, bs);
0078       }
0079       this.din = new DataInputStream(this.in);
0080       numRecords = numRecordsRemaining = din.readInt();
0081     } catch (IOException e) {
0082       Closeables.close(bs, /* swallowIOException = */ true);
0083       throw e;
0084     }
0085   }
0086 
0087   @Override
0088   public int getNumRecords() {
0089     return numRecords;
0090   }
0091 
0092   @Override
0093   public boolean hasNext() {
0094     return (numRecordsRemaining > 0);
0095   }
0096 
0097   @Override
0098   public void loadNext() throws IOException {
0099     // Kill the task in case it has been marked as killed. This logic is from
0100     // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
0101     // to avoid performance overhead. This check is added here in `loadNext()` instead of in
0102     // `hasNext()` because it's technically possible for the caller to be relying on
0103     // `getNumRecords()` instead of `hasNext()` to know when to stop.
0104     if (taskContext != null) {
0105       taskContext.killTaskIfInterrupted();
0106     }
0107     recordLength = din.readInt();
0108     keyPrefix = din.readLong();
0109     if (recordLength > arr.length) {
0110       arr = new byte[recordLength];
0111       baseObject = arr;
0112     }
0113     ByteStreams.readFully(in, arr, 0, recordLength);
0114     numRecordsRemaining--;
0115     if (numRecordsRemaining == 0) {
0116       close();
0117     }
0118   }
0119 
0120   @Override
0121   public Object getBaseObject() {
0122     return baseObject;
0123   }
0124 
0125   @Override
0126   public long getBaseOffset() {
0127     return Platform.BYTE_ARRAY_OFFSET;
0128   }
0129 
0130   @Override
0131   public int getRecordLength() {
0132     return recordLength;
0133   }
0134 
0135   @Override
0136   public long getKeyPrefix() {
0137     return keyPrefix;
0138   }
0139 
0140   @Override
0141   public void close() throws IOException {
0142    if (in != null) {
0143      try {
0144        in.close();
0145      } finally {
0146        in = null;
0147        din = null;
0148      }
0149    }
0150   }
0151 }