0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.sql.hive;
0019
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.util.ArrayList;
0023 import java.util.HashMap;
0024 import java.util.List;
0025 import java.util.Map;
0026
0027 import org.apache.hadoop.fs.FileSystem;
0028 import org.apache.hadoop.fs.Path;
0029 import org.junit.After;
0030 import org.junit.Before;
0031 import org.junit.Test;
0032
0033 import org.apache.spark.api.java.JavaSparkContext;
0034 import org.apache.spark.sql.Dataset;
0035 import org.apache.spark.sql.Encoders;
0036 import org.apache.spark.sql.QueryTest$;
0037 import org.apache.spark.sql.Row;
0038 import org.apache.spark.sql.SQLContext;
0039 import org.apache.spark.sql.hive.test.TestHive$;
0040 import org.apache.spark.sql.SaveMode;
0041 import org.apache.spark.sql.catalyst.TableIdentifier;
0042 import org.apache.spark.util.Utils;
0043
0044 public class JavaMetastoreDataSourcesSuite {
0045 private transient JavaSparkContext sc;
0046 private transient SQLContext sqlContext;
0047
0048 File path;
0049 Path hiveManagedPath;
0050 FileSystem fs;
0051 Dataset<Row> df;
0052
0053 @Before
0054 public void setUp() throws IOException {
0055 sqlContext = TestHive$.MODULE$;
0056 sc = new JavaSparkContext(sqlContext.sparkContext());
0057
0058 path =
0059 Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile();
0060 if (path.exists()) {
0061 path.delete();
0062 }
0063 HiveSessionCatalog catalog = (HiveSessionCatalog) sqlContext.sessionState().catalog();
0064 hiveManagedPath = new Path(catalog.defaultTablePath(new TableIdentifier("javaSavedTable")));
0065 fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
0066 fs.delete(hiveManagedPath, true);
0067
0068 List<String> jsonObjects = new ArrayList<>(10);
0069 for (int i = 0; i < 10; i++) {
0070 jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
0071 }
0072 Dataset<String> ds = sqlContext.createDataset(jsonObjects, Encoders.STRING());
0073 df = sqlContext.read().json(ds);
0074 df.createOrReplaceTempView("jsonTable");
0075 }
0076
0077 @After
0078 public void tearDown() throws IOException {
0079
0080 if (sqlContext != null) {
0081 sqlContext.sql("DROP TABLE IF EXISTS javaSavedTable");
0082 sqlContext.sql("DROP TABLE IF EXISTS externalTable");
0083 }
0084 }
0085
0086 @Test
0087 public void saveTableAndQueryIt() {
0088 Map<String, String> options = new HashMap<>();
0089 df.write()
0090 .format("org.apache.spark.sql.json")
0091 .mode(SaveMode.Append)
0092 .options(options)
0093 .saveAsTable("javaSavedTable");
0094
0095 QueryTest$.MODULE$.checkAnswer(
0096 sqlContext.sql("SELECT * FROM javaSavedTable"),
0097 df.collectAsList());
0098 }
0099 }