Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 /*
0019  * Based on LimitedInputStream.java from Google Guava
0020  *
0021  * Copyright (C) 2007 The Guava Authors
0022  *
0023  *    Licensed under the Apache License, Version 2.0 (the "License");
0024  *    you may not use this file except in compliance with the License.
0025  *    You may obtain a copy of the License at
0026  *
0027  *    http://www.apache.org/licenses/LICENSE-2.0
0028  *
0029  *    Unless required by applicable law or agreed to in writing, software
0030  *    distributed under the License is distributed on an "AS IS" BASIS,
0031  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0032  *    See the License for the specific language governing permissions and
0033  *    limitations under the License.
0034  */
0035 
0036 package org.apache.spark.network.util;
0037 
0038 import java.io.FilterInputStream;
0039 import java.io.IOException;
0040 import java.io.InputStream;
0041 
0042 import com.google.common.base.Preconditions;
0043 
0044 /**
0045  * Wraps a {@link InputStream}, limiting the number of bytes which can be read.
0046  *
0047  * This code is from Guava's 14.0 source code, because there is no compatible way to
0048  * use this functionality in both a Guava 11 environment and a Guava >14 environment.
0049  */
0050 public final class LimitedInputStream extends FilterInputStream {
0051   private final boolean closeWrappedStream;
0052   private long left;
0053   private long mark = -1;
0054 
0055   public LimitedInputStream(InputStream in, long limit) {
0056     this(in, limit, true);
0057   }
0058 
0059   /**
0060    * Create a LimitedInputStream that will read {@code limit} bytes from {@code in}.
0061    * <p>
0062    * If {@code closeWrappedStream} is true, this will close {@code in} when it is closed.
0063    * Otherwise, the stream is left open for reading its remaining content.
0064    *
0065    * @param in a {@link InputStream} to read from
0066    * @param limit the number of bytes to read
0067    * @param closeWrappedStream whether to close {@code in} when {@link #close} is called
0068      */
0069   public LimitedInputStream(InputStream in, long limit, boolean closeWrappedStream) {
0070     super(in);
0071     this.closeWrappedStream = closeWrappedStream;
0072     Preconditions.checkNotNull(in);
0073     Preconditions.checkArgument(limit >= 0, "limit must be non-negative");
0074     left = limit;
0075   }
0076   @Override public int available() throws IOException {
0077     return (int) Math.min(in.available(), left);
0078   }
0079   // it's okay to mark even if mark isn't supported, as reset won't work
0080   @Override public synchronized void mark(int readLimit) {
0081     in.mark(readLimit);
0082     mark = left;
0083   }
0084   @Override public int read() throws IOException {
0085     if (left == 0) {
0086       return -1;
0087     }
0088     int result = in.read();
0089     if (result != -1) {
0090       --left;
0091     }
0092     return result;
0093   }
0094   @Override public int read(byte[] b, int off, int len) throws IOException {
0095     if (left == 0) {
0096       return -1;
0097     }
0098     len = (int) Math.min(len, left);
0099     int result = in.read(b, off, len);
0100     if (result != -1) {
0101       left -= result;
0102     }
0103     return result;
0104   }
0105   @Override public synchronized void reset() throws IOException {
0106     if (!in.markSupported()) {
0107       throw new IOException("Mark not supported");
0108     }
0109     if (mark == -1) {
0110       throw new IOException("Mark not set");
0111     }
0112     in.reset();
0113     left = mark;
0114   }
0115   @Override public long skip(long n) throws IOException {
0116     n = Math.min(n, left);
0117     long skipped = in.skip(n);
0118     left -= skipped;
0119     return skipped;
0120   }
0121 
0122   @Override
0123   public void close() throws IOException {
0124     if (closeWrappedStream) {
0125       super.close();
0126     }
0127   }
0128 }