|
||||
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.catalog;

import java.util.Map;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;

/**
 * An optional mix-in for implementations of {@link TableCatalog} that support staging the
 * creation of a table before committing the table's metadata along with its contents in
 * CREATE TABLE AS SELECT or REPLACE TABLE AS SELECT operations.
 * <p>
 * It is highly recommended to implement this interface whenever possible so that CREATE TABLE AS
 * SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE
 * TABLE AS SELECT operation, if the catalog does not implement this interface, the planner will
 * first drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via
 * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform
 * the write via {@link SupportsWrite#newWriteBuilder(LogicalWriteInfo)}.
 * However, if the write operation fails, the catalog will have already dropped the table, and the
 * planner cannot roll back the dropping of the table.
 * <p>
 * If the catalog implements this interface, the catalog can implement the methods to "stage" the
 * creation and the replacement of a table. After the table's
 * {@link BatchWrite#commit(WriterCommitMessage[])} is called,
 * {@link StagedTable#commitStagedChanges()} is called, at which point the staged table can
 * complete both the data write and the metadata swap operation atomically.
 *
 * @since 3.0.0
 */
@Evolving
public interface StagingTableCatalog extends TableCatalog {

  /**
   * Stage the creation of a table, preparing it to be committed into the metastore.
   * <p>
   * When the table is committed, the contents of any writes performed by the Spark planner are
   * committed along with the metadata about the table passed into this method's arguments. If the
   * table exists when this method is called, the method should throw an exception accordingly. If
   * another process concurrently creates the table before this table's staged changes are
   * committed, an exception should be thrown by {@link StagedTable#commitStagedChanges()}.
   *
   * @param ident a table identifier
   * @param schema the schema of the new table, as a struct type
   * @param partitions transforms to use for partitioning data in the table
   * @param properties a string map of table properties
   * @return metadata for the new table
   * @throws TableAlreadyExistsException If a table or view already exists for the identifier
   * @throws UnsupportedOperationException If a requested partition transform is not supported
   * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
   */
  StagedTable stageCreate(
      Identifier ident,
      StructType schema,
      Transform[] partitions,
      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;

  /**
   * Stage the replacement of a table, preparing it to be committed into the metastore when the
   * returned table's {@link StagedTable#commitStagedChanges()} is called.
   * <p>
   * When the table is committed, the contents of any writes performed by the Spark planner are
   * committed along with the metadata about the table passed into this method's arguments. If the
   * table exists, the metadata and the contents of this table replace the metadata and contents of
   * the existing table. If a concurrent process commits changes to the table's data or metadata
   * while the write is being performed but before the staged changes are committed, the catalog
   * can decide whether to move forward with the table replacement anyway or abort the commit
   * operation.
   * <p>
   * If the table does not exist, committing the staged changes should fail with
   * {@link NoSuchTableException}. This differs from the semantics of
   * {@link #stageCreateOrReplace(Identifier, StructType, Transform[], Map)}, which should create
   * the table in the data source if the table does not exist at the time of committing the
   * operation.
   *
   * @param ident a table identifier
   * @param schema the schema of the new table, as a struct type
   * @param partitions transforms to use for partitioning data in the table
   * @param properties a string map of table properties
   * @return metadata for the new table
   * @throws UnsupportedOperationException If a requested partition transform is not supported
   * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
   * @throws NoSuchTableException If the table does not exist
   */
  StagedTable stageReplace(
      Identifier ident,
      StructType schema,
      Transform[] partitions,
      Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException;

  /**
   * Stage the creation or replacement of a table, preparing it to be committed into the metastore
   * when the returned table's {@link StagedTable#commitStagedChanges()} is called.
   * <p>
   * When the table is committed, the contents of any writes performed by the Spark planner are
   * committed along with the metadata about the table passed into this method's arguments. If the
   * table exists, the metadata and the contents of this table replace the metadata and contents of
   * the existing table. If a concurrent process commits changes to the table's data or metadata
   * while the write is being performed but before the staged changes are committed, the catalog
   * can decide whether to move forward with the table replacement anyway or abort the commit
   * operation.
   * <p>
   * If the table does not exist when the changes are committed, the table should be created in the
   * backing data source. This differs from the expected semantics of
   * {@link #stageReplace(Identifier, StructType, Transform[], Map)}, which should fail when
   * the staged changes are committed but the table doesn't exist at commit time.
   *
   * @param ident a table identifier
   * @param schema the schema of the new table, as a struct type
   * @param partitions transforms to use for partitioning data in the table
   * @param properties a string map of table properties
   * @return metadata for the new table
   * @throws UnsupportedOperationException If a requested partition transform is not supported
   * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
   */
  StagedTable stageCreateOrReplace(
      Identifier ident,
      StructType schema,
      Transform[] partitions,
      Map<String, String> properties) throws NoSuchNamespaceException;
}
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.1.0 LXR engine. The LXR team |