0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.buffer;
0019
0020 import java.io.File;
0021 import java.io.FileInputStream;
0022 import java.io.IOException;
0023 import java.io.InputStream;
0024 import java.io.RandomAccessFile;
0025 import java.nio.ByteBuffer;
0026 import java.nio.channels.FileChannel;
0027 import java.nio.file.StandardOpenOption;
0028
0029 import com.google.common.io.ByteStreams;
0030 import io.netty.channel.DefaultFileRegion;
0031 import org.apache.commons.lang3.builder.ToStringBuilder;
0032 import org.apache.commons.lang3.builder.ToStringStyle;
0033
0034 import org.apache.spark.network.util.JavaUtils;
0035 import org.apache.spark.network.util.LimitedInputStream;
0036 import org.apache.spark.network.util.TransportConf;
0037
0038
0039
0040
0041 public final class FileSegmentManagedBuffer extends ManagedBuffer {
0042 private final TransportConf conf;
0043 private final File file;
0044 private final long offset;
0045 private final long length;
0046
0047 public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
0048 this.conf = conf;
0049 this.file = file;
0050 this.offset = offset;
0051 this.length = length;
0052 }
0053
0054 @Override
0055 public long size() {
0056 return length;
0057 }
0058
0059 @Override
0060 public ByteBuffer nioByteBuffer() throws IOException {
0061 FileChannel channel = null;
0062 try {
0063 channel = new RandomAccessFile(file, "r").getChannel();
0064
0065 if (length < conf.memoryMapBytes()) {
0066 ByteBuffer buf = ByteBuffer.allocate((int) length);
0067 channel.position(offset);
0068 while (buf.remaining() != 0) {
0069 if (channel.read(buf) == -1) {
0070 throw new IOException(String.format("Reached EOF before filling buffer\n" +
0071 "offset=%s\nfile=%s\nbuf.remaining=%s",
0072 offset, file.getAbsoluteFile(), buf.remaining()));
0073 }
0074 }
0075 buf.flip();
0076 return buf;
0077 } else {
0078 return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
0079 }
0080 } catch (IOException e) {
0081 String errorMessage = "Error in reading " + this;
0082 try {
0083 if (channel != null) {
0084 long size = channel.size();
0085 errorMessage = "Error in reading " + this + " (actual file length " + size + ")";
0086 }
0087 } catch (IOException ignored) {
0088
0089 }
0090 throw new IOException(errorMessage, e);
0091 } finally {
0092 JavaUtils.closeQuietly(channel);
0093 }
0094 }
0095
0096 @Override
0097 public InputStream createInputStream() throws IOException {
0098 FileInputStream is = null;
0099 boolean shouldClose = true;
0100 try {
0101 is = new FileInputStream(file);
0102 ByteStreams.skipFully(is, offset);
0103 InputStream r = new LimitedInputStream(is, length);
0104 shouldClose = false;
0105 return r;
0106 } catch (IOException e) {
0107 String errorMessage = "Error in reading " + this;
0108 if (is != null) {
0109 long size = file.length();
0110 errorMessage = "Error in reading " + this + " (actual file length " + size + ")";
0111 }
0112 throw new IOException(errorMessage, e);
0113 } finally {
0114 if (shouldClose) {
0115 JavaUtils.closeQuietly(is);
0116 }
0117 }
0118 }
0119
0120 @Override
0121 public ManagedBuffer retain() {
0122 return this;
0123 }
0124
0125 @Override
0126 public ManagedBuffer release() {
0127 return this;
0128 }
0129
0130 @Override
0131 public Object convertToNetty() throws IOException {
0132 if (conf.lazyFileDescriptor()) {
0133 return new DefaultFileRegion(file, offset, length);
0134 } else {
0135 FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
0136 return new DefaultFileRegion(fileChannel, offset, length);
0137 }
0138 }
0139
0140 public File getFile() { return file; }
0141
0142 public long getOffset() { return offset; }
0143
0144 public long getLength() { return length; }
0145
0146 @Override
0147 public String toString() {
0148 return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
0149 .append("file", file)
0150 .append("offset", offset)
0151 .append("length", length)
0152 .toString();
0153 }
0154 }