Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package test.org.apache.spark.sql;
0019 
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.util.ArrayList;
0023 import java.util.HashMap;
0024 import java.util.List;
0025 import java.util.Map;
0026 
0027 import org.junit.After;
0028 import org.junit.Before;
0029 import org.junit.Test;
0030 
0031 import org.apache.spark.sql.*;
0032 import org.apache.spark.sql.types.DataTypes;
0033 import org.apache.spark.sql.types.StructField;
0034 import org.apache.spark.sql.types.StructType;
0035 import org.apache.spark.util.Utils;
0036 
0037 public class JavaSaveLoadSuite {
0038 
0039   private transient SparkSession spark;
0040 
0041   File path;
0042   Dataset<Row> df;
0043 
0044   private static void checkAnswer(Dataset<Row> actual, List<Row> expected) {
0045     QueryTest$.MODULE$.checkAnswer(actual, expected);
0046   }
0047 
0048   @Before
0049   public void setUp() throws IOException {
0050     spark = SparkSession.builder()
0051       .master("local[*]")
0052       .appName("testing")
0053       .getOrCreate();
0054 
0055     path =
0056       Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile();
0057     if (path.exists()) {
0058       path.delete();
0059     }
0060 
0061     List<String> jsonObjects = new ArrayList<>(10);
0062     for (int i = 0; i < 10; i++) {
0063       jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
0064     }
0065     Dataset<String> ds = spark.createDataset(jsonObjects, Encoders.STRING());
0066     df = spark.read().json(ds);
0067     df.createOrReplaceTempView("jsonTable");
0068   }
0069 
0070   @After
0071   public void tearDown() {
0072     spark.stop();
0073     spark = null;
0074   }
0075 
0076   @Test
0077   public void saveAndLoad() {
0078     Map<String, String> options = new HashMap<>();
0079     options.put("path", path.toString());
0080     df.write().mode(SaveMode.ErrorIfExists).format("json").options(options).save();
0081     Dataset<Row> loadedDF = spark.read().format("json").options(options).load();
0082     checkAnswer(loadedDF, df.collectAsList());
0083   }
0084 
0085   @Test
0086   public void saveAndLoadWithSchema() {
0087     Map<String, String> options = new HashMap<>();
0088     options.put("path", path.toString());
0089     df.write().format("json").mode(SaveMode.ErrorIfExists).options(options).save();
0090 
0091     List<StructField> fields = new ArrayList<>();
0092     fields.add(DataTypes.createStructField("b", DataTypes.StringType, true));
0093     StructType schema = DataTypes.createStructType(fields);
0094     Dataset<Row> loadedDF = spark.read().format("json").schema(schema).options(options).load();
0095 
0096     checkAnswer(loadedDF, spark.sql("SELECT b FROM jsonTable").collectAsList());
0097   }
0098 }