0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package test.org.apache.spark.sql;
0019
0020 import org.apache.spark.sql.Dataset;
0021 import org.apache.spark.sql.Row;
0022 import org.apache.spark.sql.SparkSession;
0023 import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException;
0024 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
0025 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
0026 import org.apache.spark.sql.connector.InMemoryTableCatalog;
0027 import org.apache.spark.sql.test.TestSparkSession;
0028 import org.apache.spark.sql.types.StructType;
0029 import org.junit.After;
0030 import org.junit.Before;
0031 import org.junit.Test;
0032
0033 import static org.apache.spark.sql.functions.*;
0034
0035 public class JavaDataFrameWriterV2Suite {
0036 private static StructType schema = new StructType().add("s", "string");
0037 private SparkSession spark = null;
0038
0039 public Dataset<Row> df() {
0040 return spark.read().schema(schema).text();
0041 }
0042
0043 @Before
0044 public void createTestTable() {
0045 this.spark = new TestSparkSession();
0046 spark.conf().set("spark.sql.catalog.testcat", InMemoryTableCatalog.class.getName());
0047 spark.sql("CREATE TABLE testcat.t (s string) USING foo");
0048 }
0049
0050 @After
0051 public void dropTestTable() {
0052 spark.sql("DROP TABLE testcat.t");
0053 spark.stop();
0054 }
0055
0056 @Test
0057 public void testAppendAPI() throws NoSuchTableException {
0058 df().writeTo("testcat.t").append();
0059 df().writeTo("testcat.t").option("property", "value").append();
0060 }
0061
0062 @Test
0063 public void testOverwritePartitionsAPI() throws NoSuchTableException {
0064 df().writeTo("testcat.t").overwritePartitions();
0065 df().writeTo("testcat.t").option("property", "value").overwritePartitions();
0066 }
0067
0068 @Test
0069 public void testOverwriteAPI() throws NoSuchTableException {
0070 df().writeTo("testcat.t").overwrite(lit(true));
0071 df().writeTo("testcat.t").option("property", "value").overwrite(lit(true));
0072 }
0073
0074 @Test
0075 public void testCreateAPI() throws TableAlreadyExistsException {
0076 df().writeTo("testcat.t2").create();
0077 spark.sql("DROP TABLE testcat.t2");
0078
0079 df().writeTo("testcat.t2").option("property", "value").create();
0080 spark.sql("DROP TABLE testcat.t2");
0081
0082 df().writeTo("testcat.t2").tableProperty("property", "value").create();
0083 spark.sql("DROP TABLE testcat.t2");
0084
0085 df().writeTo("testcat.t2").using("v2format").create();
0086 spark.sql("DROP TABLE testcat.t2");
0087
0088 df().writeTo("testcat.t2").partitionedBy(col("s")).create();
0089 spark.sql("DROP TABLE testcat.t2");
0090 }
0091
0092 @Test
0093 public void testReplaceAPI() throws CannotReplaceMissingTableException {
0094 df().writeTo("testcat.t").replace();
0095 df().writeTo("testcat.t").option("property", "value").replace();
0096 df().writeTo("testcat.t").tableProperty("property", "value").replace();
0097 df().writeTo("testcat.t").using("v2format").replace();
0098 df().writeTo("testcat.t").partitionedBy(col("s")).replace();
0099 }
0100
0101 @Test
0102 public void testCreateOrReplaceAPI() {
0103 df().writeTo("testcat.t").createOrReplace();
0104 df().writeTo("testcat.t").option("property", "value").createOrReplace();
0105 df().writeTo("testcat.t").tableProperty("property", "value").createOrReplace();
0106 df().writeTo("testcat.t").using("v2format").createOrReplace();
0107 df().writeTo("testcat.t").partitionedBy(col("s")).createOrReplace();
0108 }
0109 }