/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.write.streaming;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

/**
 * An interface that defines how to write data to a data source in streaming queries.
 *
 * The writing procedure is:
 *   1. Create a writer factory by {@link #createStreamingWriterFactory(PhysicalWriteInfo)},
 *      serialize and send it to all the partitions of the input data (RDD).
 *   2. For each epoch in each partition, create the data writer, and write the data of the epoch in
 *      the partition with this writer. If all the data is written successfully, call
 *      {@link DataWriter#commit()}. If an exception happens during writing, call
 *      {@link DataWriter#abort()}.
 *   3. If the writers in all partitions of one epoch are successfully committed, call
 *      {@link #commit(long, WriterCommitMessage[])}. If some writers are aborted, or the job fails
 *      for an unknown reason, call {@link #abort(long, WriterCommitMessage[])}.
 *
 * Spark will retry failed writing tasks, but it won't retry failed writing jobs. Users should
 * retry failed jobs manually in their Spark applications if needed.
 *
 * Please refer to the documentation of the commit/abort methods for detailed specifications.
 *
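 * For illustration only, a skeleton implementation is sketched below. The
 * {@code MySinkStreamingWrite}, {@code MySinkWriterFactory} and {@code MySinkClient} names are
 * hypothetical and not part of this API; a real implementation would substitute its own sink
 * client and writer factory.
 *
 * <pre>{@code
 * class MySinkStreamingWrite implements StreamingWrite {
 *   private final String connectionString;  // hypothetical, serializable sink settings
 *
 *   MySinkStreamingWrite(String connectionString) {
 *     this.connectionString = connectionString;
 *   }
 *
 *   public StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info) {
 *     // Runs on the driver; the returned factory is serialized and shipped to executors.
 *     return new MySinkWriterFactory(connectionString);
 *   }
 *
 *   public void commit(long epochId, WriterCommitMessage[] messages) {
 *     // All partitions of this epoch committed successfully; make the epoch visible atomically.
 *     MySinkClient.publishEpoch(epochId, messages);
 *   }
 *
 *   public void abort(long epochId, WriterCommitMessage[] messages) {
 *     // Best-effort cleanup of whatever the data writers of this epoch left behind.
 *     MySinkClient.dropEpoch(epochId, messages);
 *   }
 * }
 * }</pre>
 *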
 * @since 3.0.0
 */
@Evolving
public interface StreamingWrite {

  /**
   * Creates a writer factory which will be serialized and sent to executors.
   *
   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
   * submitted.
   *
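   * For illustration, a sketch of such a factory is shown below. {@code MySinkWriterFactory},
   * {@code MySinkDataWriter} and the connection string are hypothetical; the important point is
   * that the factory carries only serializable state, since it is created on the driver and
   * deserialized on executors, where it creates one {@link DataWriter} per partition, task and
   * epoch.
   *
   * <pre>{@code
   * class MySinkWriterFactory implements StreamingDataWriterFactory {
   *   private final String connectionString;  // serializable state only
   *
   *   MySinkWriterFactory(String connectionString) {
   *     this.connectionString = connectionString;
   *   }
   *
   *   public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
   *     // Runs on an executor: open a connection per task and write the rows of this epoch.
   *     return new MySinkDataWriter(connectionString, partitionId, epochId);
   *   }
   * }
   * }</pre>
   *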
   * @param info Information about the RDD that will be written to this data writer
   */
  StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info);

  /**
   * Commits this writing job for the specified epoch with a list of commit messages. The commit
   * messages are collected from successful data writers and are produced by
   * {@link DataWriter#commit()}.
   *
   * If this method fails (by throwing an exception), this writing job is considered to have
   * failed, and the execution engine will attempt to call
   * {@link #abort(long, WriterCommitMessage[])}.
   *
   * The execution engine may call `commit` multiple times for the same epoch in some circumstances.
   * To support exactly-once data semantics, implementations must ensure that multiple commits for
   * the same epoch are idempotent.
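   *
   * For illustration, one way to make commits idempotent is to record committed epochs in the
   * sink and skip epochs that have already been committed. {@code MySinkClient} and its methods
   * below are hypothetical, not part of this API.
   *
   * <pre>{@code
   * public void commit(long epochId, WriterCommitMessage[] messages) {
   *   // A repeated call for an already-committed epoch must have no additional effect.
   *   if (MySinkClient.isEpochCommitted(epochId)) {
   *     return;
   *   }
   *   MySinkClient.commitEpoch(epochId, messages);
   * }
   * }</pre>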
   */
  void commit(long epochId, WriterCommitMessage[] messages);

  /**
   * Aborts this writing job because some data writers failed and keep failing when retried, or
   * the Spark job fails for an unknown reason, or {@link #commit(long, WriterCommitMessage[])}
   * fails.
   *
   * If this method fails (by throwing an exception), the underlying data source may require manual
   * cleanup.
   *
   * Unless the abort is triggered by the failure of commit, the given messages will have some
   * null slots, as only a few data writers may have committed before the abort happens, or some
   * data writers may have committed but their commit messages haven't reached the driver when the
   * abort is triggered. So this is just a "best effort" for data sources to clean up the data
   * left by data writers.
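   *
   * For illustration, a best-effort cleanup could look like the sketch below; {@code MySinkClient}
   * and its {@code deleteDataFor} method are hypothetical, not part of this API.
   *
   * <pre>{@code
   * public void abort(long epochId, WriterCommitMessage[] messages) {
   *   // Null slots correspond to writers that never committed, so only the non-null messages
   *   // can be used to locate data that needs to be cleaned up.
   *   for (WriterCommitMessage message : messages) {
   *     if (message != null) {
   *       MySinkClient.deleteDataFor(epochId, message);
   *     }
   *   }
   * }
   * }</pre>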
   */
  void abort(long epochId, WriterCommitMessage[] messages);
}