0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 package org.apache.spark.io;
0015
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.apache.spark.util.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.GuardedBy;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043 public class ReadAheadInputStream extends InputStream {
0044
0045 private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class);
0046
0047 private ReentrantLock stateChangeLock = new ReentrantLock();
0048
0049 @GuardedBy("stateChangeLock")
0050 private ByteBuffer activeBuffer;
0051
0052 @GuardedBy("stateChangeLock")
0053 private ByteBuffer readAheadBuffer;
0054
0055 @GuardedBy("stateChangeLock")
0056 private boolean endOfStream;
0057
0058 @GuardedBy("stateChangeLock")
0059
0060 private boolean readInProgress;
0061
0062 @GuardedBy("stateChangeLock")
0063
0064 private boolean readAborted;
0065
0066 @GuardedBy("stateChangeLock")
0067 private Throwable readException;
0068
0069 @GuardedBy("stateChangeLock")
0070
0071 private boolean isClosed;
0072
0073 @GuardedBy("stateChangeLock")
0074
0075
0076 private boolean isUnderlyingInputStreamBeingClosed;
0077
0078 @GuardedBy("stateChangeLock")
0079
0080 private boolean isReading;
0081
0082
0083 private AtomicBoolean isWaiting = new AtomicBoolean(false);
0084
0085 private final InputStream underlyingInputStream;
0086
0087 private final ExecutorService executorService =
0088 ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
0089
0090 private final Condition asyncReadComplete = stateChangeLock.newCondition();
0091
0092 private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
0093
0094
0095
0096
0097
0098
0099
0100
0101 public ReadAheadInputStream(
0102 InputStream inputStream, int bufferSizeInBytes) {
0103 Preconditions.checkArgument(bufferSizeInBytes > 0,
0104 "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
0105 activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
0106 readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
0107 this.underlyingInputStream = inputStream;
0108 activeBuffer.flip();
0109 readAheadBuffer.flip();
0110 }
0111
0112 private boolean isEndOfStream() {
0113 return (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream);
0114 }
0115
0116 private void checkReadException() throws IOException {
0117 if (readAborted) {
0118 Throwables.propagateIfPossible(readException, IOException.class);
0119 throw new IOException(readException);
0120 }
0121 }
0122
0123
0124 private void readAsync() throws IOException {
0125 stateChangeLock.lock();
0126 final byte[] arr = readAheadBuffer.array();
0127 try {
0128 if (endOfStream || readInProgress) {
0129 return;
0130 }
0131 checkReadException();
0132 readAheadBuffer.position(0);
0133 readAheadBuffer.flip();
0134 readInProgress = true;
0135 } finally {
0136 stateChangeLock.unlock();
0137 }
0138 executorService.execute(() -> {
0139 stateChangeLock.lock();
0140 try {
0141 if (isClosed) {
0142 readInProgress = false;
0143 return;
0144 }
0145
0146
0147 isReading = true;
0148 } finally {
0149 stateChangeLock.unlock();
0150 }
0151
0152
0153
0154
0155
0156
0157
0158 int read = 0;
0159 int off = 0, len = arr.length;
0160 Throwable exception = null;
0161 try {
0162
0163
0164 do {
0165 read = underlyingInputStream.read(arr, off, len);
0166 if (read <= 0) break;
0167 off += read;
0168 len -= read;
0169 } while (len > 0 && !isWaiting.get());
0170 } catch (Throwable ex) {
0171 exception = ex;
0172 if (ex instanceof Error) {
0173
0174
0175 throw (Error) ex;
0176 }
0177 } finally {
0178 stateChangeLock.lock();
0179 readAheadBuffer.limit(off);
0180 if (read < 0 || (exception instanceof EOFException)) {
0181 endOfStream = true;
0182 } else if (exception != null) {
0183 readAborted = true;
0184 readException = exception;
0185 }
0186 readInProgress = false;
0187 signalAsyncReadComplete();
0188 stateChangeLock.unlock();
0189 closeUnderlyingInputStreamIfNecessary();
0190 }
0191 });
0192 }
0193
0194 private void closeUnderlyingInputStreamIfNecessary() {
0195 boolean needToCloseUnderlyingInputStream = false;
0196 stateChangeLock.lock();
0197 try {
0198 isReading = false;
0199 if (isClosed && !isUnderlyingInputStreamBeingClosed) {
0200
0201 needToCloseUnderlyingInputStream = true;
0202 }
0203 } finally {
0204 stateChangeLock.unlock();
0205 }
0206 if (needToCloseUnderlyingInputStream) {
0207 try {
0208 underlyingInputStream.close();
0209 } catch (IOException e) {
0210 logger.warn(e.getMessage(), e);
0211 }
0212 }
0213 }
0214
0215 private void signalAsyncReadComplete() {
0216 stateChangeLock.lock();
0217 try {
0218 asyncReadComplete.signalAll();
0219 } finally {
0220 stateChangeLock.unlock();
0221 }
0222 }
0223
0224 private void waitForAsyncReadComplete() throws IOException {
0225 stateChangeLock.lock();
0226 isWaiting.set(true);
0227 try {
0228
0229
0230 while (readInProgress) {
0231 asyncReadComplete.await();
0232 }
0233 } catch (InterruptedException e) {
0234 InterruptedIOException iio = new InterruptedIOException(e.getMessage());
0235 iio.initCause(e);
0236 throw iio;
0237 } finally {
0238 isWaiting.set(false);
0239 stateChangeLock.unlock();
0240 }
0241 checkReadException();
0242 }
0243
0244 @Override
0245 public int read() throws IOException {
0246 if (activeBuffer.hasRemaining()) {
0247
0248 return activeBuffer.get() & 0xFF;
0249 } else {
0250 byte[] oneByteArray = oneByte.get();
0251 return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF;
0252 }
0253 }
0254
0255 @Override
0256 public int read(byte[] b, int offset, int len) throws IOException {
0257 if (offset < 0 || len < 0 || len > b.length - offset) {
0258 throw new IndexOutOfBoundsException();
0259 }
0260 if (len == 0) {
0261 return 0;
0262 }
0263
0264 if (!activeBuffer.hasRemaining()) {
0265
0266 stateChangeLock.lock();
0267 try {
0268 waitForAsyncReadComplete();
0269 if (!readAheadBuffer.hasRemaining()) {
0270
0271 readAsync();
0272 waitForAsyncReadComplete();
0273 if (isEndOfStream()) {
0274 return -1;
0275 }
0276 }
0277
0278 swapBuffers();
0279
0280 readAsync();
0281 } finally {
0282 stateChangeLock.unlock();
0283 }
0284 }
0285 len = Math.min(len, activeBuffer.remaining());
0286 activeBuffer.get(b, offset, len);
0287
0288 return len;
0289 }
0290
0291
0292
0293
0294 private void swapBuffers() {
0295 ByteBuffer temp = activeBuffer;
0296 activeBuffer = readAheadBuffer;
0297 readAheadBuffer = temp;
0298 }
0299
0300 @Override
0301 public int available() throws IOException {
0302 stateChangeLock.lock();
0303
0304 try {
0305 return (int) Math.min((long) Integer.MAX_VALUE,
0306 (long) activeBuffer.remaining() + readAheadBuffer.remaining());
0307 } finally {
0308 stateChangeLock.unlock();
0309 }
0310 }
0311
0312 @Override
0313 public long skip(long n) throws IOException {
0314 if (n <= 0L) {
0315 return 0L;
0316 }
0317 if (n <= activeBuffer.remaining()) {
0318
0319 activeBuffer.position((int) n + activeBuffer.position());
0320 return n;
0321 }
0322 stateChangeLock.lock();
0323 long skipped;
0324 try {
0325 skipped = skipInternal(n);
0326 } finally {
0327 stateChangeLock.unlock();
0328 }
0329 return skipped;
0330 }
0331
0332
0333
0334
0335
0336 private long skipInternal(long n) throws IOException {
0337 assert (stateChangeLock.isLocked());
0338 waitForAsyncReadComplete();
0339 if (isEndOfStream()) {
0340 return 0;
0341 }
0342 if (available() >= n) {
0343
0344 int toSkip = (int) n;
0345
0346 toSkip -= activeBuffer.remaining();
0347 assert(toSkip > 0);
0348 activeBuffer.position(0);
0349 activeBuffer.flip();
0350 readAheadBuffer.position(toSkip + readAheadBuffer.position());
0351 swapBuffers();
0352
0353 readAsync();
0354 return n;
0355 } else {
0356 int skippedBytes = available();
0357 long toSkip = n - skippedBytes;
0358 activeBuffer.position(0);
0359 activeBuffer.flip();
0360 readAheadBuffer.position(0);
0361 readAheadBuffer.flip();
0362 long skippedFromInputStream = underlyingInputStream.skip(toSkip);
0363 readAsync();
0364 return skippedBytes + skippedFromInputStream;
0365 }
0366 }
0367
0368 @Override
0369 public void close() throws IOException {
0370 boolean isSafeToCloseUnderlyingInputStream = false;
0371 stateChangeLock.lock();
0372 try {
0373 if (isClosed) {
0374 return;
0375 }
0376 isClosed = true;
0377 if (!isReading) {
0378
0379 isSafeToCloseUnderlyingInputStream = true;
0380
0381 isUnderlyingInputStreamBeingClosed = true;
0382 }
0383 } finally {
0384 stateChangeLock.unlock();
0385 }
0386
0387 try {
0388 executorService.shutdownNow();
0389 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
0390 } catch (InterruptedException e) {
0391 InterruptedIOException iio = new InterruptedIOException(e.getMessage());
0392 iio.initCause(e);
0393 throw iio;
0394 } finally {
0395 if (isSafeToCloseUnderlyingInputStream) {
0396 underlyingInputStream.close();
0397 }
0398 }
0399 }
0400 }