Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.buffer;
0019 
0020 import java.io.File;
0021 import java.io.FileInputStream;
0022 import java.io.IOException;
0023 import java.io.InputStream;
0024 import java.io.RandomAccessFile;
0025 import java.nio.ByteBuffer;
0026 import java.nio.channels.FileChannel;
0027 import java.nio.file.StandardOpenOption;
0028 
0029 import com.google.common.io.ByteStreams;
0030 import io.netty.channel.DefaultFileRegion;
0031 import org.apache.commons.lang3.builder.ToStringBuilder;
0032 import org.apache.commons.lang3.builder.ToStringStyle;
0033 
0034 import org.apache.spark.network.util.JavaUtils;
0035 import org.apache.spark.network.util.LimitedInputStream;
0036 import org.apache.spark.network.util.TransportConf;
0037 
0038 /**
0039  * A {@link ManagedBuffer} backed by a segment in a file.
0040  */
0041 public final class FileSegmentManagedBuffer extends ManagedBuffer {
0042   private final TransportConf conf;
0043   private final File file;
0044   private final long offset;
0045   private final long length;
0046 
0047   public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
0048     this.conf = conf;
0049     this.file = file;
0050     this.offset = offset;
0051     this.length = length;
0052   }
0053 
0054   @Override
0055   public long size() {
0056     return length;
0057   }
0058 
0059   @Override
0060   public ByteBuffer nioByteBuffer() throws IOException {
0061     FileChannel channel = null;
0062     try {
0063       channel = new RandomAccessFile(file, "r").getChannel();
0064       // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
0065       if (length < conf.memoryMapBytes()) {
0066         ByteBuffer buf = ByteBuffer.allocate((int) length);
0067         channel.position(offset);
0068         while (buf.remaining() != 0) {
0069           if (channel.read(buf) == -1) {
0070             throw new IOException(String.format("Reached EOF before filling buffer\n" +
0071               "offset=%s\nfile=%s\nbuf.remaining=%s",
0072               offset, file.getAbsoluteFile(), buf.remaining()));
0073           }
0074         }
0075         buf.flip();
0076         return buf;
0077       } else {
0078         return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
0079       }
0080     } catch (IOException e) {
0081       String errorMessage = "Error in reading " + this;
0082       try {
0083         if (channel != null) {
0084           long size = channel.size();
0085           errorMessage = "Error in reading " + this + " (actual file length " + size + ")";
0086         }
0087       } catch (IOException ignored) {
0088         // ignore
0089       }
0090       throw new IOException(errorMessage, e);
0091     } finally {
0092       JavaUtils.closeQuietly(channel);
0093     }
0094   }
0095 
0096   @Override
0097   public InputStream createInputStream() throws IOException {
0098     FileInputStream is = null;
0099     boolean shouldClose = true;
0100     try {
0101       is = new FileInputStream(file);
0102       ByteStreams.skipFully(is, offset);
0103       InputStream r = new LimitedInputStream(is, length);
0104       shouldClose = false;
0105       return r;
0106     } catch (IOException e) {
0107       String errorMessage = "Error in reading " + this;
0108       if (is != null) {
0109         long size = file.length();
0110         errorMessage = "Error in reading " + this + " (actual file length " + size + ")";
0111       }
0112       throw new IOException(errorMessage, e);
0113     } finally {
0114       if (shouldClose) {
0115         JavaUtils.closeQuietly(is);
0116       }
0117     }
0118   }
0119 
0120   @Override
0121   public ManagedBuffer retain() {
0122     return this;
0123   }
0124 
0125   @Override
0126   public ManagedBuffer release() {
0127     return this;
0128   }
0129 
0130   @Override
0131   public Object convertToNetty() throws IOException {
0132     if (conf.lazyFileDescriptor()) {
0133       return new DefaultFileRegion(file, offset, length);
0134     } else {
0135       FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
0136       return new DefaultFileRegion(fileChannel, offset, length);
0137     }
0138   }
0139 
0140   public File getFile() { return file; }
0141 
0142   public long getOffset() { return offset; }
0143 
0144   public long getLength() { return length; }
0145 
0146   @Override
0147   public String toString() {
0148     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
0149       .append("file", file)
0150       .append("offset", offset)
0151       .append("length", length)
0152       .toString();
0153   }
0154 }