Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark.network.shuffle;
0018 
0019 import java.io.File;
0020 import java.io.FileNotFoundException;
0021 import java.io.FileOutputStream;
0022 import java.io.IOException;
0023 import java.nio.ByteBuffer;
0024 import java.nio.channels.Channels;
0025 import java.nio.channels.WritableByteChannel;
0026 
0027 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
0028 import org.apache.spark.network.buffer.ManagedBuffer;
0029 import org.apache.spark.network.util.TransportConf;
0030 
0031 /**
0032  * A DownloadFile that does not take any encryption settings into account for reading and
0033  * writing data.
0034  *
0035  * This does *not* mean the data in the file is un-encrypted -- it could be that the data is
0036  * already encrypted when its written, and subsequent layer is responsible for decrypting.
0037  */
0038 public class SimpleDownloadFile implements DownloadFile {
0039 
0040   private final File file;
0041   private final TransportConf transportConf;
0042 
0043   public SimpleDownloadFile(File file, TransportConf transportConf) {
0044     this.file = file;
0045     this.transportConf = transportConf;
0046   }
0047 
0048   @Override
0049   public boolean delete() {
0050     return file.delete();
0051   }
0052 
0053   @Override
0054   public DownloadFileWritableChannel openForWriting() throws IOException {
0055     return new SimpleDownloadWritableChannel();
0056   }
0057 
0058   @Override
0059   public String path() {
0060     return file.getAbsolutePath();
0061   }
0062 
0063   private class SimpleDownloadWritableChannel implements DownloadFileWritableChannel {
0064 
0065     private final WritableByteChannel channel;
0066 
0067     SimpleDownloadWritableChannel() throws FileNotFoundException {
0068       channel = Channels.newChannel(new FileOutputStream(file));
0069     }
0070 
0071     @Override
0072     public ManagedBuffer closeAndRead() {
0073       return new FileSegmentManagedBuffer(transportConf, file, 0, file.length());
0074     }
0075 
0076     @Override
0077     public int write(ByteBuffer src) throws IOException {
0078       return channel.write(src);
0079     }
0080 
0081     @Override
0082     public boolean isOpen() {
0083       return channel.isOpen();
0084     }
0085 
0086     @Override
0087     public void close() throws IOException {
0088       channel.close();
0089     }
0090   }
0091 }