0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 package org.apache.spark.network.shuffle;
0018
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
0026
0027 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
0028 import org.apache.spark.network.buffer.ManagedBuffer;
0029 import org.apache.spark.network.util.TransportConf;
0030
0031
0032
0033
0034
0035
0036
0037
0038 public class SimpleDownloadFile implements DownloadFile {
0039
0040 private final File file;
0041 private final TransportConf transportConf;
0042
0043 public SimpleDownloadFile(File file, TransportConf transportConf) {
0044 this.file = file;
0045 this.transportConf = transportConf;
0046 }
0047
0048 @Override
0049 public boolean delete() {
0050 return file.delete();
0051 }
0052
0053 @Override
0054 public DownloadFileWritableChannel openForWriting() throws IOException {
0055 return new SimpleDownloadWritableChannel();
0056 }
0057
0058 @Override
0059 public String path() {
0060 return file.getAbsolutePath();
0061 }
0062
0063 private class SimpleDownloadWritableChannel implements DownloadFileWritableChannel {
0064
0065 private final WritableByteChannel channel;
0066
0067 SimpleDownloadWritableChannel() throws FileNotFoundException {
0068 channel = Channels.newChannel(new FileOutputStream(file));
0069 }
0070
0071 @Override
0072 public ManagedBuffer closeAndRead() {
0073 return new FileSegmentManagedBuffer(transportConf, file, 0, file.length());
0074 }
0075
0076 @Override
0077 public int write(ByteBuffer src) throws IOException {
0078 return channel.write(src);
0079 }
0080
0081 @Override
0082 public boolean isOpen() {
0083 return channel.isOpen();
0084 }
0085
0086 @Override
0087 public void close() throws IOException {
0088 channel.close();
0089 }
0090 }
0091 }