Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed under the Apache License, Version 2.0 (the "License");
0003  * you may not use this file except in compliance with the License.
0004  * You may obtain a copy of the License at
0005  *
0006  *     http://www.apache.org/licenses/LICENSE-2.0
0007  *
0008  * Unless required by applicable law or agreed to in writing, software
0009  * distributed under the License is distributed on an "AS IS" BASIS,
0010  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0011  * See the License for the specific language governing permissions and
0012  * limitations under the License.
0013  */
0014 package org.apache.spark.io;
0015 
0016 import com.google.common.base.Preconditions;
0017 import com.google.common.base.Throwables;
0018 import org.apache.spark.util.ThreadUtils;
0019 import org.slf4j.Logger;
0020 import org.slf4j.LoggerFactory;
0021 
0022 import javax.annotation.concurrent.GuardedBy;
0023 import java.io.EOFException;
0024 import java.io.IOException;
0025 import java.io.InputStream;
0026 import java.io.InterruptedIOException;
0027 import java.nio.ByteBuffer;
0028 import java.util.concurrent.ExecutorService;
0029 import java.util.concurrent.TimeUnit;
0030 import java.util.concurrent.atomic.AtomicBoolean;
0031 import java.util.concurrent.locks.Condition;
0032 import java.util.concurrent.locks.ReentrantLock;
0033 
0034 /**
0035  * {@link InputStream} implementation which asynchronously reads ahead from the underlying input
0036  * stream when specified amount of data has been read from the current buffer. It does it by
0037  * maintaining two buffers - active buffer and read ahead buffer. Active buffer contains data
0038  * which should be returned when a read() call is issued. The read ahead buffer is used to
0039  * asynchronously read from the underlying input stream and once the current active buffer is
0040  * exhausted, we flip the two buffers so that we can start reading from the read ahead buffer
0041  * without being blocked in disk I/O.
0042  */
0043 public class ReadAheadInputStream extends InputStream {
0044 
0045   private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class);
0046 
0047   private ReentrantLock stateChangeLock = new ReentrantLock();
0048 
0049   @GuardedBy("stateChangeLock")
0050   private ByteBuffer activeBuffer;
0051 
0052   @GuardedBy("stateChangeLock")
0053   private ByteBuffer readAheadBuffer;
0054 
0055   @GuardedBy("stateChangeLock")
0056   private boolean endOfStream;
0057 
0058   @GuardedBy("stateChangeLock")
0059   // true if async read is in progress
0060   private boolean readInProgress;
0061 
0062   @GuardedBy("stateChangeLock")
0063   // true if read is aborted due to an exception in reading from underlying input stream.
0064   private boolean readAborted;
0065 
0066   @GuardedBy("stateChangeLock")
0067   private Throwable readException;
0068 
0069   @GuardedBy("stateChangeLock")
0070   // whether the close method is called.
0071   private boolean isClosed;
0072 
0073   @GuardedBy("stateChangeLock")
0074   // true when the close method will close the underlying input stream. This is valid only if
0075   // `isClosed` is true.
0076   private boolean isUnderlyingInputStreamBeingClosed;
0077 
0078   @GuardedBy("stateChangeLock")
0079   // whether there is a read ahead task running,
0080   private boolean isReading;
0081 
0082   // whether there is a reader waiting for data.
0083   private AtomicBoolean isWaiting = new AtomicBoolean(false);
0084 
0085   private final InputStream underlyingInputStream;
0086 
0087   private final ExecutorService executorService =
0088       ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
0089 
0090   private final Condition asyncReadComplete = stateChangeLock.newCondition();
0091 
0092   private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
0093 
0094   /**
0095    * Creates a <code>ReadAheadInputStream</code> with the specified buffer size and read-ahead
0096    * threshold
0097    *
0098    * @param inputStream The underlying input stream.
0099    * @param bufferSizeInBytes The buffer size.
0100    */
0101   public ReadAheadInputStream(
0102       InputStream inputStream, int bufferSizeInBytes) {
0103     Preconditions.checkArgument(bufferSizeInBytes > 0,
0104         "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
0105     activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
0106     readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
0107     this.underlyingInputStream = inputStream;
0108     activeBuffer.flip();
0109     readAheadBuffer.flip();
0110   }
0111 
0112   private boolean isEndOfStream() {
0113     return (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream);
0114   }
0115 
0116   private void checkReadException() throws IOException {
0117     if (readAborted) {
0118       Throwables.propagateIfPossible(readException, IOException.class);
0119       throw new IOException(readException);
0120     }
0121   }
0122 
0123   /** Read data from underlyingInputStream to readAheadBuffer asynchronously. */
0124   private void readAsync() throws IOException {
0125     stateChangeLock.lock();
0126     final byte[] arr = readAheadBuffer.array();
0127     try {
0128       if (endOfStream || readInProgress) {
0129         return;
0130       }
0131       checkReadException();
0132       readAheadBuffer.position(0);
0133       readAheadBuffer.flip();
0134       readInProgress = true;
0135     } finally {
0136       stateChangeLock.unlock();
0137     }
0138     executorService.execute(() -> {
0139       stateChangeLock.lock();
0140       try {
0141         if (isClosed) {
0142           readInProgress = false;
0143           return;
0144         }
0145         // Flip this so that the close method will not close the underlying input stream when we
0146         // are reading.
0147         isReading = true;
0148       } finally {
0149         stateChangeLock.unlock();
0150       }
0151 
0152       // Please note that it is safe to release the lock and read into the read ahead buffer
0153       // because either of following two conditions will hold - 1. The active buffer has
0154       // data available to read so the reader will not read from the read ahead buffer.
0155       // 2. This is the first time read is called or the active buffer is exhausted,
0156       // in that case the reader waits for this async read to complete.
0157       // So there is no race condition in both the situations.
0158       int read = 0;
0159       int off = 0, len = arr.length;
0160       Throwable exception = null;
0161       try {
0162         // try to fill the read ahead buffer.
0163         // if a reader is waiting, possibly return early.
0164         do {
0165           read = underlyingInputStream.read(arr, off, len);
0166           if (read <= 0) break;
0167           off += read;
0168           len -= read;
0169         } while (len > 0 && !isWaiting.get());
0170       } catch (Throwable ex) {
0171         exception = ex;
0172         if (ex instanceof Error) {
0173           // `readException` may not be reported to the user. Rethrow Error to make sure at least
0174           // The user can see Error in UncaughtExceptionHandler.
0175           throw (Error) ex;
0176         }
0177       } finally {
0178         stateChangeLock.lock();
0179         readAheadBuffer.limit(off);
0180         if (read < 0 || (exception instanceof EOFException)) {
0181           endOfStream = true;
0182         } else if (exception != null) {
0183           readAborted = true;
0184           readException = exception;
0185         }
0186         readInProgress = false;
0187         signalAsyncReadComplete();
0188         stateChangeLock.unlock();
0189         closeUnderlyingInputStreamIfNecessary();
0190       }
0191     });
0192   }
0193 
0194   private void closeUnderlyingInputStreamIfNecessary() {
0195     boolean needToCloseUnderlyingInputStream = false;
0196     stateChangeLock.lock();
0197     try {
0198       isReading = false;
0199       if (isClosed && !isUnderlyingInputStreamBeingClosed) {
0200         // close method cannot close underlyingInputStream because we were reading.
0201         needToCloseUnderlyingInputStream = true;
0202       }
0203     } finally {
0204       stateChangeLock.unlock();
0205     }
0206     if (needToCloseUnderlyingInputStream) {
0207       try {
0208         underlyingInputStream.close();
0209       } catch (IOException e) {
0210         logger.warn(e.getMessage(), e);
0211       }
0212     }
0213   }
0214 
0215   private void signalAsyncReadComplete() {
0216     stateChangeLock.lock();
0217     try {
0218       asyncReadComplete.signalAll();
0219     } finally {
0220       stateChangeLock.unlock();
0221     }
0222   }
0223 
0224   private void waitForAsyncReadComplete() throws IOException {
0225     stateChangeLock.lock();
0226     isWaiting.set(true);
0227     try {
0228       // There is only one reader, and one writer, so the writer should signal only once,
0229       // but a while loop checking the wake up condition is still needed to avoid spurious wakeups.
0230       while (readInProgress) {
0231         asyncReadComplete.await();
0232       }
0233     } catch (InterruptedException e) {
0234       InterruptedIOException iio = new InterruptedIOException(e.getMessage());
0235       iio.initCause(e);
0236       throw iio;
0237     } finally {
0238       isWaiting.set(false);
0239       stateChangeLock.unlock();
0240     }
0241     checkReadException();
0242   }
0243 
0244   @Override
0245   public int read() throws IOException {
0246     if (activeBuffer.hasRemaining()) {
0247       // short path - just get one byte.
0248       return activeBuffer.get() & 0xFF;
0249     } else {
0250       byte[] oneByteArray = oneByte.get();
0251       return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF;
0252     }
0253   }
0254 
0255   @Override
0256   public int read(byte[] b, int offset, int len) throws IOException {
0257     if (offset < 0 || len < 0 || len > b.length - offset) {
0258       throw new IndexOutOfBoundsException();
0259     }
0260     if (len == 0) {
0261       return 0;
0262     }
0263 
0264     if (!activeBuffer.hasRemaining()) {
0265       // No remaining in active buffer - lock and switch to write ahead buffer.
0266       stateChangeLock.lock();
0267       try {
0268         waitForAsyncReadComplete();
0269         if (!readAheadBuffer.hasRemaining()) {
0270           // The first read.
0271           readAsync();
0272           waitForAsyncReadComplete();
0273           if (isEndOfStream()) {
0274             return -1;
0275           }
0276         }
0277         // Swap the newly read read ahead buffer in place of empty active buffer.
0278         swapBuffers();
0279         // After swapping buffers, trigger another async read for read ahead buffer.
0280         readAsync();
0281       } finally {
0282         stateChangeLock.unlock();
0283       }
0284     }
0285     len = Math.min(len, activeBuffer.remaining());
0286     activeBuffer.get(b, offset, len);
0287 
0288     return len;
0289   }
0290 
0291   /**
0292    * flip the active and read ahead buffer
0293    */
0294   private void swapBuffers() {
0295     ByteBuffer temp = activeBuffer;
0296     activeBuffer = readAheadBuffer;
0297     readAheadBuffer = temp;
0298   }
0299 
0300   @Override
0301   public int available() throws IOException {
0302     stateChangeLock.lock();
0303     // Make sure we have no integer overflow.
0304     try {
0305       return (int) Math.min((long) Integer.MAX_VALUE,
0306           (long) activeBuffer.remaining() + readAheadBuffer.remaining());
0307     } finally {
0308       stateChangeLock.unlock();
0309     }
0310   }
0311 
0312   @Override
0313   public long skip(long n) throws IOException {
0314     if (n <= 0L) {
0315       return 0L;
0316     }
0317     if (n <= activeBuffer.remaining()) {
0318       // Only skipping from active buffer is sufficient
0319       activeBuffer.position((int) n + activeBuffer.position());
0320       return n;
0321     }
0322     stateChangeLock.lock();
0323     long skipped;
0324     try {
0325       skipped = skipInternal(n);
0326     } finally {
0327       stateChangeLock.unlock();
0328     }
0329     return skipped;
0330   }
0331 
0332   /**
0333    * Internal skip function which should be called only from skip() api. The assumption is that
0334    * the stateChangeLock is already acquired in the caller before calling this function.
0335    */
0336   private long skipInternal(long n) throws IOException {
0337     assert (stateChangeLock.isLocked());
0338     waitForAsyncReadComplete();
0339     if (isEndOfStream()) {
0340       return 0;
0341     }
0342     if (available() >= n) {
0343       // we can skip from the internal buffers
0344       int toSkip = (int) n;
0345       // We need to skip from both active buffer and read ahead buffer
0346       toSkip -= activeBuffer.remaining();
0347       assert(toSkip > 0); // skipping from activeBuffer already handled.
0348       activeBuffer.position(0);
0349       activeBuffer.flip();
0350       readAheadBuffer.position(toSkip + readAheadBuffer.position());
0351       swapBuffers();
0352       // Trigger async read to emptied read ahead buffer.
0353       readAsync();
0354       return n;
0355     } else {
0356       int skippedBytes = available();
0357       long toSkip = n - skippedBytes;
0358       activeBuffer.position(0);
0359       activeBuffer.flip();
0360       readAheadBuffer.position(0);
0361       readAheadBuffer.flip();
0362       long skippedFromInputStream = underlyingInputStream.skip(toSkip);
0363       readAsync();
0364       return skippedBytes + skippedFromInputStream;
0365     }
0366   }
0367 
0368   @Override
0369   public void close() throws IOException {
0370     boolean isSafeToCloseUnderlyingInputStream = false;
0371     stateChangeLock.lock();
0372     try {
0373       if (isClosed) {
0374         return;
0375       }
0376       isClosed = true;
0377       if (!isReading) {
0378         // Nobody is reading, so we can close the underlying input stream in this method.
0379         isSafeToCloseUnderlyingInputStream = true;
0380         // Flip this to make sure the read ahead task will not close the underlying input stream.
0381         isUnderlyingInputStreamBeingClosed = true;
0382       }
0383     } finally {
0384       stateChangeLock.unlock();
0385     }
0386 
0387     try {
0388       executorService.shutdownNow();
0389       executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
0390     } catch (InterruptedException e) {
0391       InterruptedIOException iio = new InterruptedIOException(e.getMessage());
0392       iio.initCause(e);
0393       throw iio;
0394     } finally {
0395       if (isSafeToCloseUnderlyingInputStream) {
0396         underlyingInputStream.close();
0397       }
0398     }
0399   }
0400 }