Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package test.org.apache.spark.sql;
0019 
0020 import org.apache.spark.sql.Dataset;
0021 import org.apache.spark.sql.Row;
0022 import org.apache.spark.sql.SparkSession;
0023 import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException;
0024 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
0025 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
0026 import org.apache.spark.sql.connector.InMemoryTableCatalog;
0027 import org.apache.spark.sql.test.TestSparkSession;
0028 import org.apache.spark.sql.types.StructType;
0029 import org.junit.After;
0030 import org.junit.Before;
0031 import org.junit.Test;
0032 
0033 import static org.apache.spark.sql.functions.*;
0034 
0035 public class JavaDataFrameWriterV2Suite {
0036   private static StructType schema = new StructType().add("s", "string");
0037   private SparkSession spark = null;
0038 
0039   public Dataset<Row> df() {
0040     return spark.read().schema(schema).text();
0041   }
0042 
0043   @Before
0044   public void createTestTable() {
0045     this.spark = new TestSparkSession();
0046     spark.conf().set("spark.sql.catalog.testcat", InMemoryTableCatalog.class.getName());
0047     spark.sql("CREATE TABLE testcat.t (s string) USING foo");
0048   }
0049 
0050   @After
0051   public void dropTestTable() {
0052     spark.sql("DROP TABLE testcat.t");
0053     spark.stop();
0054   }
0055 
0056   @Test
0057   public void testAppendAPI() throws NoSuchTableException {
0058     df().writeTo("testcat.t").append();
0059     df().writeTo("testcat.t").option("property", "value").append();
0060   }
0061 
0062   @Test
0063   public void testOverwritePartitionsAPI() throws NoSuchTableException {
0064     df().writeTo("testcat.t").overwritePartitions();
0065     df().writeTo("testcat.t").option("property", "value").overwritePartitions();
0066   }
0067 
0068   @Test
0069   public void testOverwriteAPI() throws NoSuchTableException {
0070     df().writeTo("testcat.t").overwrite(lit(true));
0071     df().writeTo("testcat.t").option("property", "value").overwrite(lit(true));
0072   }
0073 
0074   @Test
0075   public void testCreateAPI() throws TableAlreadyExistsException {
0076     df().writeTo("testcat.t2").create();
0077     spark.sql("DROP TABLE testcat.t2");
0078 
0079     df().writeTo("testcat.t2").option("property", "value").create();
0080     spark.sql("DROP TABLE testcat.t2");
0081 
0082     df().writeTo("testcat.t2").tableProperty("property", "value").create();
0083     spark.sql("DROP TABLE testcat.t2");
0084 
0085     df().writeTo("testcat.t2").using("v2format").create();
0086     spark.sql("DROP TABLE testcat.t2");
0087 
0088     df().writeTo("testcat.t2").partitionedBy(col("s")).create();
0089     spark.sql("DROP TABLE testcat.t2");
0090   }
0091 
0092   @Test
0093   public void testReplaceAPI() throws CannotReplaceMissingTableException {
0094     df().writeTo("testcat.t").replace();
0095     df().writeTo("testcat.t").option("property", "value").replace();
0096     df().writeTo("testcat.t").tableProperty("property", "value").replace();
0097     df().writeTo("testcat.t").using("v2format").replace();
0098     df().writeTo("testcat.t").partitionedBy(col("s")).replace();
0099   }
0100 
0101   @Test
0102   public void testCreateOrReplaceAPI() {
0103     df().writeTo("testcat.t").createOrReplace();
0104     df().writeTo("testcat.t").option("property", "value").createOrReplace();
0105     df().writeTo("testcat.t").tableProperty("property", "value").createOrReplace();
0106     df().writeTo("testcat.t").using("v2format").createOrReplace();
0107     df().writeTo("testcat.t").partitionedBy(col("s")).createOrReplace();
0108   }
0109 }