Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed under the Apache License, Version 2.0 (the "License");
0003  * you may not use this file except in compliance with the License.
0004  * You may obtain a copy of the License at
0005  *
0006  *     http://www.apache.org/licenses/LICENSE-2.0
0007  *
0008  * Unless required by applicable law or agreed to in writing, software
0009  * distributed under the License is distributed on an "AS IS" BASIS,
0010  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0011  * See the License for the specific language governing permissions and
0012  * limitations under the License.
0013  */
0014 package org.apache.spark.io;
0015 
0016 import org.apache.spark.storage.StorageUtils;
0017 
0018 import java.io.File;
0019 import java.io.IOException;
0020 import java.io.InputStream;
0021 import java.nio.ByteBuffer;
0022 import java.nio.channels.FileChannel;
0023 import java.nio.file.StandardOpenOption;
0024 
0025 /**
0026  * {@link InputStream} implementation which uses direct buffer
0027  * to read a file to avoid extra copy of data between Java and
0028  * native memory which happens when using {@link java.io.BufferedInputStream}.
0029  * Unfortunately, this is not something already available in JDK,
0030  * {@code sun.nio.ch.ChannelInputStream} supports reading a file using nio,
0031  * but does not support buffering.
0032  */
0033 public final class NioBufferedFileInputStream extends InputStream {
0034 
0035   private static final int DEFAULT_BUFFER_SIZE_BYTES = 8192;
0036 
0037   private final ByteBuffer byteBuffer;
0038 
0039   private final FileChannel fileChannel;
0040 
0041   public NioBufferedFileInputStream(File file, int bufferSizeInBytes) throws IOException {
0042     byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
0043     fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
0044     byteBuffer.flip();
0045   }
0046 
0047   public NioBufferedFileInputStream(File file) throws IOException {
0048     this(file, DEFAULT_BUFFER_SIZE_BYTES);
0049   }
0050 
0051   /**
0052    * Checks weather data is left to be read from the input stream.
0053    * @return true if data is left, false otherwise
0054    */
0055   private boolean refill() throws IOException {
0056     if (!byteBuffer.hasRemaining()) {
0057       byteBuffer.clear();
0058       int nRead = 0;
0059       while (nRead == 0) {
0060         nRead = fileChannel.read(byteBuffer);
0061       }
0062       byteBuffer.flip();
0063       if (nRead < 0) {
0064         return false;
0065       }
0066     }
0067     return true;
0068   }
0069 
0070   @Override
0071   public synchronized int read() throws IOException {
0072     if (!refill()) {
0073       return -1;
0074     }
0075     return byteBuffer.get() & 0xFF;
0076   }
0077 
0078   @Override
0079   public synchronized int read(byte[] b, int offset, int len) throws IOException {
0080     if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
0081       throw new IndexOutOfBoundsException();
0082     }
0083     if (!refill()) {
0084       return -1;
0085     }
0086     len = Math.min(len, byteBuffer.remaining());
0087     byteBuffer.get(b, offset, len);
0088     return len;
0089   }
0090 
0091   @Override
0092   public synchronized int available() throws IOException {
0093     return byteBuffer.remaining();
0094   }
0095 
0096   @Override
0097   public synchronized long skip(long n) throws IOException {
0098     if (n <= 0L) {
0099       return 0L;
0100     }
0101     if (byteBuffer.remaining() >= n) {
0102       // The buffered content is enough to skip
0103       byteBuffer.position(byteBuffer.position() + (int) n);
0104       return n;
0105     }
0106     long skippedFromBuffer = byteBuffer.remaining();
0107     long toSkipFromFileChannel = n - skippedFromBuffer;
0108     // Discard everything we have read in the buffer.
0109     byteBuffer.position(0);
0110     byteBuffer.flip();
0111     return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
0112   }
0113 
0114   private long skipFromFileChannel(long n) throws IOException {
0115     long currentFilePosition = fileChannel.position();
0116     long size = fileChannel.size();
0117     if (n > size - currentFilePosition) {
0118       fileChannel.position(size);
0119       return size - currentFilePosition;
0120     } else {
0121       fileChannel.position(currentFilePosition + n);
0122       return n;
0123     }
0124   }
0125 
0126   @Override
0127   public synchronized void close() throws IOException {
0128     fileChannel.close();
0129     StorageUtils.dispose(byteBuffer);
0130   }
0131 
0132   @SuppressWarnings("deprecation")
0133   @Override
0134   protected void finalize() throws IOException {
0135     close();
0136   }
0137 }