|
||||
0001 /* 0002 * Licensed to the Apache Software Foundation (ASF) under one or more 0003 * contributor license agreements. See the NOTICE file distributed with 0004 * this work for additional information regarding copyright ownership. 0005 * The ASF licenses this file to You under the Apache License, Version 2.0 0006 * (the "License"); you may not use this file except in compliance with 0007 * the License. You may obtain a copy of the License at 0008 * 0009 * http://www.apache.org/licenses/LICENSE-2.0 0010 * 0011 * Unless required by applicable law or agreed to in writing, software 0012 * distributed under the License is distributed on an "AS IS" BASIS, 0013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 0014 * See the License for the specific language governing permissions and 0015 * limitations under the License. 0016 */ 0017 0018 package org.apache.spark.sql.connector.write.streaming; 0019 0020 import org.apache.spark.annotation.Evolving; 0021 import org.apache.spark.sql.connector.write.DataWriter; 0022 import org.apache.spark.sql.connector.write.PhysicalWriteInfo; 0023 import org.apache.spark.sql.connector.write.WriterCommitMessage; 0024 0025 /** 0026 * An interface that defines how to write the data to data source in streaming queries. 0027 * 0028 * The writing procedure is: 0029 * 1. Create a writer factory by {@link #createStreamingWriterFactory(PhysicalWriteInfo)}, 0030 * serialize and send it to all the partitions of the input data(RDD). 0031 * 2. For each epoch in each partition, create the data writer, and write the data of the epoch in 0032 * the partition with this writer. If all the data are written successfully, call 0033 * {@link DataWriter#commit()}. If exception happens during the writing, call 0034 * {@link DataWriter#abort()}. 0035 * 3. If writers in all partitions of one epoch are successfully committed, call 0036 * {@link #commit(long, WriterCommitMessage[])}. If some writers are aborted, or the job failed 0037 * with an unknown reason, call {@link #abort(long, WriterCommitMessage[])}. 0038 * 0039 * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should 0040 * do it manually in their Spark applications if they want to retry. 0041 * 0042 * Please refer to the documentation of commit/abort methods for detailed specifications. 0043 * 0044 * @since 3.0.0 0045 */ 0046 @Evolving 0047 public interface StreamingWrite { 0048 0049 /** 0050 * Creates a writer factory which will be serialized and sent to executors. 0051 * 0052 * If this method fails (by throwing an exception), the action will fail and no Spark job will be 0053 * submitted. 0054 * 0055 * @param info Information about the RDD that will be written to this data writer 0056 */ 0057 StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info); 0058 0059 /** 0060 * Commits this writing job for the specified epoch with a list of commit messages. The commit 0061 * messages are collected from successful data writers and are produced by 0062 * {@link DataWriter#commit()}. 0063 * 0064 * If this method fails (by throwing an exception), this writing job is considered to have been 0065 * failed, and the execution engine will attempt to call 0066 * {@link #abort(long, WriterCommitMessage[])}. 0067 * 0068 * The execution engine may call `commit` multiple times for the same epoch in some circumstances. 0069 * To support exactly-once data semantics, implementations must ensure that multiple commits for 0070 * the same epoch are idempotent. 0071 */ 0072 void commit(long epochId, WriterCommitMessage[] messages); 0073 0074 /** 0075 * Aborts this writing job because some data writers are failed and keep failing when retried, or 0076 * the Spark job fails with some unknown reasons, or {@link #commit(long, WriterCommitMessage[])} 0077 * fails. 0078 * 0079 * If this method fails (by throwing an exception), the underlying data source may require manual 0080 * cleanup. 0081 * 0082 * Unless the abort is triggered by the failure of commit, the given messages will have some 0083 * null slots, as there may be only a few data writers that were committed before the abort 0084 * happens, or some data writers were committed but their commit messages haven't reached the 0085 * driver when the abort is triggered. So this is just a "best effort" for data sources to 0086 * clean up the data left by data writers. 0087 */ 0088 void abort(long epochId, WriterCommitMessage[] messages); 0089 }
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.1.0 LXR engine. The LXR team |