0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package test.org.apache.spark.sql;
0019
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.util.ArrayList;
0023 import java.util.HashMap;
0024 import java.util.List;
0025 import java.util.Map;
0026
0027 import org.junit.After;
0028 import org.junit.Before;
0029 import org.junit.Test;
0030
0031 import org.apache.spark.sql.*;
0032 import org.apache.spark.sql.types.DataTypes;
0033 import org.apache.spark.sql.types.StructField;
0034 import org.apache.spark.sql.types.StructType;
0035 import org.apache.spark.util.Utils;
0036
0037 public class JavaSaveLoadSuite {
0038
0039 private transient SparkSession spark;
0040
0041 File path;
0042 Dataset<Row> df;
0043
0044 private static void checkAnswer(Dataset<Row> actual, List<Row> expected) {
0045 QueryTest$.MODULE$.checkAnswer(actual, expected);
0046 }
0047
0048 @Before
0049 public void setUp() throws IOException {
0050 spark = SparkSession.builder()
0051 .master("local[*]")
0052 .appName("testing")
0053 .getOrCreate();
0054
0055 path =
0056 Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile();
0057 if (path.exists()) {
0058 path.delete();
0059 }
0060
0061 List<String> jsonObjects = new ArrayList<>(10);
0062 for (int i = 0; i < 10; i++) {
0063 jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
0064 }
0065 Dataset<String> ds = spark.createDataset(jsonObjects, Encoders.STRING());
0066 df = spark.read().json(ds);
0067 df.createOrReplaceTempView("jsonTable");
0068 }
0069
0070 @After
0071 public void tearDown() {
0072 spark.stop();
0073 spark = null;
0074 }
0075
0076 @Test
0077 public void saveAndLoad() {
0078 Map<String, String> options = new HashMap<>();
0079 options.put("path", path.toString());
0080 df.write().mode(SaveMode.ErrorIfExists).format("json").options(options).save();
0081 Dataset<Row> loadedDF = spark.read().format("json").options(options).load();
0082 checkAnswer(loadedDF, df.collectAsList());
0083 }
0084
0085 @Test
0086 public void saveAndLoadWithSchema() {
0087 Map<String, String> options = new HashMap<>();
0088 options.put("path", path.toString());
0089 df.write().format("json").mode(SaveMode.ErrorIfExists).options(options).save();
0090
0091 List<StructField> fields = new ArrayList<>();
0092 fields.add(DataTypes.createStructField("b", DataTypes.StringType, true));
0093 StructType schema = DataTypes.createStructType(fields);
0094 Dataset<Row> loadedDF = spark.read().format("json").schema(schema).options(options).load();
0095
0096 checkAnswer(loadedDF, spark.sql("SELECT b FROM jsonTable").collectAsList());
0097 }
0098 }