0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 package org.apache.spark.io;
0015
0016 import org.apache.spark.storage.StorageUtils;
0017
0018 import java.io.File;
0019 import java.io.IOException;
0020 import java.io.InputStream;
0021 import java.nio.ByteBuffer;
0022 import java.nio.channels.FileChannel;
0023 import java.nio.file.StandardOpenOption;
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033 public final class NioBufferedFileInputStream extends InputStream {
0034
0035 private static final int DEFAULT_BUFFER_SIZE_BYTES = 8192;
0036
0037 private final ByteBuffer byteBuffer;
0038
0039 private final FileChannel fileChannel;
0040
0041 public NioBufferedFileInputStream(File file, int bufferSizeInBytes) throws IOException {
0042 byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
0043 fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
0044 byteBuffer.flip();
0045 }
0046
0047 public NioBufferedFileInputStream(File file) throws IOException {
0048 this(file, DEFAULT_BUFFER_SIZE_BYTES);
0049 }
0050
0051
0052
0053
0054
0055 private boolean refill() throws IOException {
0056 if (!byteBuffer.hasRemaining()) {
0057 byteBuffer.clear();
0058 int nRead = 0;
0059 while (nRead == 0) {
0060 nRead = fileChannel.read(byteBuffer);
0061 }
0062 byteBuffer.flip();
0063 if (nRead < 0) {
0064 return false;
0065 }
0066 }
0067 return true;
0068 }
0069
0070 @Override
0071 public synchronized int read() throws IOException {
0072 if (!refill()) {
0073 return -1;
0074 }
0075 return byteBuffer.get() & 0xFF;
0076 }
0077
0078 @Override
0079 public synchronized int read(byte[] b, int offset, int len) throws IOException {
0080 if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
0081 throw new IndexOutOfBoundsException();
0082 }
0083 if (!refill()) {
0084 return -1;
0085 }
0086 len = Math.min(len, byteBuffer.remaining());
0087 byteBuffer.get(b, offset, len);
0088 return len;
0089 }
0090
0091 @Override
0092 public synchronized int available() throws IOException {
0093 return byteBuffer.remaining();
0094 }
0095
0096 @Override
0097 public synchronized long skip(long n) throws IOException {
0098 if (n <= 0L) {
0099 return 0L;
0100 }
0101 if (byteBuffer.remaining() >= n) {
0102
0103 byteBuffer.position(byteBuffer.position() + (int) n);
0104 return n;
0105 }
0106 long skippedFromBuffer = byteBuffer.remaining();
0107 long toSkipFromFileChannel = n - skippedFromBuffer;
0108
0109 byteBuffer.position(0);
0110 byteBuffer.flip();
0111 return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
0112 }
0113
0114 private long skipFromFileChannel(long n) throws IOException {
0115 long currentFilePosition = fileChannel.position();
0116 long size = fileChannel.size();
0117 if (n > size - currentFilePosition) {
0118 fileChannel.position(size);
0119 return size - currentFilePosition;
0120 } else {
0121 fileChannel.position(currentFilePosition + n);
0122 return n;
0123 }
0124 }
0125
0126 @Override
0127 public synchronized void close() throws IOException {
0128 fileChannel.close();
0129 StorageUtils.dispose(byteBuffer);
0130 }
0131
0132 @SuppressWarnings("deprecation")
0133 @Override
0134 protected void finalize() throws IOException {
0135 close();
0136 }
0137 }