Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.sql.connector.catalog;
0019 
0020 import java.util.Map;
0021 
0022 import org.apache.spark.annotation.Evolving;
0023 import org.apache.spark.sql.connector.expressions.Transform;
0024 import org.apache.spark.sql.connector.write.LogicalWriteInfo;
0025 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
0026 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
0027 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
0028 import org.apache.spark.sql.connector.write.BatchWrite;
0029 import org.apache.spark.sql.connector.write.WriterCommitMessage;
0030 import org.apache.spark.sql.types.StructType;
0031 
0032 /**
0033  * An optional mix-in for implementations of {@link TableCatalog} that support staging creation of
0034  * the a table before committing the table's metadata along with its contents in CREATE TABLE AS
0035  * SELECT or REPLACE TABLE AS SELECT operations.
0036  * <p>
0037  * It is highly recommended to implement this trait whenever possible so that CREATE TABLE AS
0038  * SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE
0039  * TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first
0040  * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via
0041  * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform
0042  * the write via {@link SupportsWrite#newWriteBuilder(LogicalWriteInfo)}.
0043  * However, if the write operation fails, the catalog will have already dropped the table, and the
0044  * planner cannot roll back the dropping of the table.
0045  * <p>
0046  * If the catalog implements this plugin, the catalog can implement the methods to "stage" the
0047  * creation and the replacement of a table. After the table's
0048  * {@link BatchWrite#commit(WriterCommitMessage[])} is called,
0049  * {@link StagedTable#commitStagedChanges()} is called, at which point the staged table can
0050  * complete both the data write and the metadata swap operation atomically.
0051  *
0052  * @since 3.0.0
0053  */
0054 @Evolving
0055 public interface StagingTableCatalog extends TableCatalog {
0056 
0057   /**
0058    * Stage the creation of a table, preparing it to be committed into the metastore.
0059    * <p>
0060    * When the table is committed, the contents of any writes performed by the Spark planner are
0061    * committed along with the metadata about the table passed into this method's arguments. If the
0062    * table exists when this method is called, the method should throw an exception accordingly. If
0063    * another process concurrently creates the table before this table's staged changes are
0064    * committed, an exception should be thrown by {@link StagedTable#commitStagedChanges()}.
0065    *
0066    * @param ident a table identifier
0067    * @param schema the schema of the new table, as a struct type
0068    * @param partitions transforms to use for partitioning data in the table
0069    * @param properties a string map of table properties
0070    * @return metadata for the new table
0071    * @throws TableAlreadyExistsException If a table or view already exists for the identifier
0072    * @throws UnsupportedOperationException If a requested partition transform is not supported
0073    * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
0074    */
0075   StagedTable stageCreate(
0076       Identifier ident,
0077       StructType schema,
0078       Transform[] partitions,
0079       Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
0080 
0081   /**
0082    * Stage the replacement of a table, preparing it to be committed into the metastore when the
0083    * returned table's {@link StagedTable#commitStagedChanges()} is called.
0084    * <p>
0085    * When the table is committed, the contents of any writes performed by the Spark planner are
0086    * committed along with the metadata about the table passed into this method's arguments. If the
0087    * table exists, the metadata and the contents of this table replace the metadata and contents of
0088    * the existing table. If a concurrent process commits changes to the table's data or metadata
0089    * while the write is being performed but before the staged changes are committed, the catalog
0090    * can decide whether to move forward with the table replacement anyways or abort the commit
0091    * operation.
0092    * <p>
0093    * If the table does not exist, committing the staged changes should fail with
0094    * {@link NoSuchTableException}. This differs from the semantics of
0095    * {@link #stageCreateOrReplace(Identifier, StructType, Transform[], Map)}, which should create
0096    * the table in the data source if the table does not exist at the time of committing the
0097    * operation.
0098    *
0099    * @param ident a table identifier
0100    * @param schema the schema of the new table, as a struct type
0101    * @param partitions transforms to use for partitioning data in the table
0102    * @param properties a string map of table properties
0103    * @return metadata for the new table
0104    * @throws UnsupportedOperationException If a requested partition transform is not supported
0105    * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
0106    * @throws NoSuchTableException If the table does not exist
0107    */
0108   StagedTable stageReplace(
0109       Identifier ident,
0110       StructType schema,
0111       Transform[] partitions,
0112       Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException;
0113 
0114   /**
0115    * Stage the creation or replacement of a table, preparing it to be committed into the metastore
0116    * when the returned table's {@link StagedTable#commitStagedChanges()} is called.
0117    * <p>
0118    * When the table is committed, the contents of any writes performed by the Spark planner are
0119    * committed along with the metadata about the table passed into this method's arguments. If the
0120    * table exists, the metadata and the contents of this table replace the metadata and contents of
0121    * the existing table. If a concurrent process commits changes to the table's data or metadata
0122    * while the write is being performed but before the staged changes are committed, the catalog
0123    * can decide whether to move forward with the table replacement anyways or abort the commit
0124    * operation.
0125    * <p>
0126    * If the table does not exist when the changes are committed, the table should be created in the
0127    * backing data source. This differs from the expected semantics of
0128    * {@link #stageReplace(Identifier, StructType, Transform[], Map)}, which should fail when
0129    * the staged changes are committed but the table doesn't exist at commit time.
0130    *
0131    * @param ident a table identifier
0132    * @param schema the schema of the new table, as a struct type
0133    * @param partitions transforms to use for partitioning data in the table
0134    * @param properties a string map of table properties
0135    * @return metadata for the new table
0136    * @throws UnsupportedOperationException If a requested partition transform is not supported
0137    * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
0138    */
0139   StagedTable stageCreateOrReplace(
0140       Identifier ident,
0141       StructType schema,
0142       Transform[] partitions,
0143       Map<String, String> properties) throws NoSuchNamespaceException;
0144 }