Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.sql.connector.write;
0019 
0020 import org.apache.spark.annotation.Evolving;
0021 
0022 /**
0023  * An interface that defines how to write the data to data source for batch processing.
0024  *
0025  * The writing procedure is:
0026  *   1. Create a writer factory by {@link #createBatchWriterFactory(PhysicalWriteInfo)}, serialize
0027  *      and send it to all the partitions of the input data(RDD).
0028  *   2. For each partition, create the data writer, and write the data of the partition with this
0029  *      writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
0030  *      exception happens during the writing, call {@link DataWriter#abort()}.
0031  *   3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
0032  *      some writers are aborted, or the job failed with an unknown reason, call
0033  *      {@link #abort(WriterCommitMessage[])}.
0034  *
0035  * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should
0036  * do it manually in their Spark applications if they want to retry.
0037  *
0038  * Please refer to the documentation of commit/abort methods for detailed specifications.
0039  *
0040  * @since 3.0.0
0041  */
0042 @Evolving
0043 public interface BatchWrite {
0044 
0045   /**
0046    * Creates a writer factory which will be serialized and sent to executors.
0047    *
0048    * If this method fails (by throwing an exception), the action will fail and no Spark job will be
0049    * submitted.
0050    *
0051    * @param info Physical information about the input data that will be written to this table.
0052    */
0053   DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info);
0054 
0055   /**
0056    * Returns whether Spark should use the commit coordinator to ensure that at most one task for
0057    * each partition commits.
0058    *
0059    * @return true if commit coordinator should be used, false otherwise.
0060    */
0061   default boolean useCommitCoordinator() {
0062     return true;
0063   }
0064 
0065   /**
0066    * Handles a commit message on receiving from a successful data writer.
0067    *
0068    * If this method fails (by throwing an exception), this writing job is considered to to have been
0069    * failed, and {@link #abort(WriterCommitMessage[])} would be called.
0070    */
0071   default void onDataWriterCommit(WriterCommitMessage message) {}
0072 
0073   /**
0074    * Commits this writing job with a list of commit messages. The commit messages are collected from
0075    * successful data writers and are produced by {@link DataWriter#commit()}.
0076    *
0077    * If this method fails (by throwing an exception), this writing job is considered to to have been
0078    * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
0079    * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
0080    *
0081    * Note that speculative execution may cause multiple tasks to run for a partition. By default,
0082    * Spark uses the commit coordinator to allow at most one task to commit. Implementations can
0083    * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
0084    * tasks may have committed successfully and one successful commit message per task will be
0085    * passed to this commit method. The remaining commit messages are ignored by Spark.
0086    */
0087   void commit(WriterCommitMessage[] messages);
0088 
0089   /**
0090    * Aborts this writing job because some data writers are failed and keep failing when retry,
0091    * or the Spark job fails with some unknown reasons,
0092    * or {@link #onDataWriterCommit(WriterCommitMessage)} fails,
0093    * or {@link #commit(WriterCommitMessage[])} fails.
0094    *
0095    * If this method fails (by throwing an exception), the underlying data source may require manual
0096    * cleanup.
0097    *
0098    * Unless the abort is triggered by the failure of commit, the given messages should have some
0099    * null slots as there maybe only a few data writers that are committed before the abort
0100    * happens, or some data writers were committed but their commit messages haven't reached the
0101    * driver when the abort is triggered. So this is just a "best effort" for data sources to
0102    * clean up the data left by data writers.
0103    */
0104   void abort(WriterCommitMessage[] messages);
0105 }