Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.shuffle.api;
0019 
0020 import java.io.IOException;
0021 import java.util.Optional;
0022 import java.io.OutputStream;
0023 
0024 import org.apache.spark.annotation.Private;
0025 
0026 /**
0027  * :: Private ::
0028  * An interface for opening streams to persist partition bytes to a backing data store.
0029  * <p>
0030  * This writer stores bytes for one (mapper, reducer) pair, corresponding to one shuffle
0031  * block.
0032  *
0033  * @since 3.0.0
0034  */
0035 @Private
0036 public interface ShufflePartitionWriter {
0037 
0038   /**
0039    * Open and return an {@link OutputStream} that can write bytes to the underlying
0040    * data store.
0041    * <p>
0042    * This method will only be called once on this partition writer in the map task, to write the
0043    * bytes to the partition. The output stream will only be used to write the bytes for this
0044    * partition. The map task closes this output stream upon writing all the bytes for this
0045    * block, or if the write fails for any reason.
0046    * <p>
0047    * Implementations that intend on combining the bytes for all the partitions written by this
0048    * map task should reuse the same OutputStream instance across all the partition writers provided
0049    * by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
0050    * {@link OutputStream#close()} does not close the resource, since it will be reused across
0051    * partition writes. The underlying resources should be cleaned up in
0052    * {@link ShuffleMapOutputWriter#commitAllPartitions()} and
0053    * {@link ShuffleMapOutputWriter#abort(Throwable)}.
0054    */
0055   OutputStream openStream() throws IOException;
0056 
0057   /**
0058    * Opens and returns a {@link WritableByteChannelWrapper} for transferring bytes from
0059    * input byte channels to the underlying shuffle data store.
0060    * <p>
0061    * This method will only be called once on this partition writer in the map task, to write the
0062    * bytes to the partition. The channel will only be used to write the bytes for this
0063    * partition. The map task closes this channel upon writing all the bytes for this
0064    * block, or if the write fails for any reason.
0065    * <p>
0066    * Implementations that intend on combining the bytes for all the partitions written by this
0067    * map task should reuse the same channel instance across all the partition writers provided
0068    * by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
0069    * {@link WritableByteChannelWrapper#close()} does not close the resource, since the channel
0070    * will be reused across partition writes. The underlying resources should be cleaned up in
0071    * {@link ShuffleMapOutputWriter#commitAllPartitions()} and
0072    * {@link ShuffleMapOutputWriter#abort(Throwable)}.
0073    * <p>
0074    * This method is primarily for advanced optimizations where bytes can be copied from the input
0075    * spill files to the output channel without copying data into memory. If such optimizations are
0076    * not supported, the implementation should return {@link Optional#empty()}. By default, the
0077    * implementation returns {@link Optional#empty()}.
0078    * <p>
0079    * Note that the returned {@link WritableByteChannelWrapper} itself is closed, but not the
0080    * underlying channel that is returned by {@link WritableByteChannelWrapper#channel()}. Ensure
0081    * that the underlying channel is cleaned up in {@link WritableByteChannelWrapper#close()},
0082    * {@link ShuffleMapOutputWriter#commitAllPartitions()}, or
0083    * {@link ShuffleMapOutputWriter#abort(Throwable)}.
0084    */
0085   default Optional<WritableByteChannelWrapper> openChannelWrapper() throws IOException {
0086     return Optional.empty();
0087   }
0088 
0089   /**
0090    * Returns the number of bytes written either by this writer's output stream opened by
0091    * {@link #openStream()} or the byte channel opened by {@link #openChannelWrapper()}.
0092    * <p>
0093    * This can be different from the number of bytes given by the caller. For example, the
0094    * stream might compress or encrypt the bytes before persisting the data to the backing
0095    * data store.
0096    */
0097   long getNumBytesWritten();
0098 }