Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark.examples.sql;
0018 
0019 // $example on:schema_merging$
0020 import java.io.Serializable;
0021 import java.util.ArrayList;
0022 import java.util.Arrays;
0023 import java.util.List;
0024 // $example off:schema_merging$
0025 import java.util.Properties;
0026 
0027 // $example on:basic_parquet_example$
0028 import org.apache.spark.api.java.function.MapFunction;
0029 import org.apache.spark.sql.Encoders;
0030 // $example on:schema_merging$
0031 // $example on:json_dataset$
0032 import org.apache.spark.sql.Dataset;
0033 import org.apache.spark.sql.Row;
0034 // $example off:json_dataset$
0035 // $example off:schema_merging$
0036 // $example off:basic_parquet_example$
0037 import org.apache.spark.sql.SparkSession;
0038 
0039 public class JavaSQLDataSourceExample {
0040 
0041   // $example on:schema_merging$
0042   public static class Square implements Serializable {
0043     private int value;
0044     private int square;
0045 
0046     // Getters and setters...
0047     // $example off:schema_merging$
0048     public int getValue() {
0049       return value;
0050     }
0051 
0052     public void setValue(int value) {
0053       this.value = value;
0054     }
0055 
0056     public int getSquare() {
0057       return square;
0058     }
0059 
0060     public void setSquare(int square) {
0061       this.square = square;
0062     }
0063     // $example on:schema_merging$
0064   }
0065   // $example off:schema_merging$
0066 
0067   // $example on:schema_merging$
0068   public static class Cube implements Serializable {
0069     private int value;
0070     private int cube;
0071 
0072     // Getters and setters...
0073     // $example off:schema_merging$
0074     public int getValue() {
0075       return value;
0076     }
0077 
0078     public void setValue(int value) {
0079       this.value = value;
0080     }
0081 
0082     public int getCube() {
0083       return cube;
0084     }
0085 
0086     public void setCube(int cube) {
0087       this.cube = cube;
0088     }
0089     // $example on:schema_merging$
0090   }
0091   // $example off:schema_merging$
0092 
0093   public static void main(String[] args) {
0094     SparkSession spark = SparkSession
0095       .builder()
0096       .appName("Java Spark SQL data sources example")
0097       .config("spark.some.config.option", "some-value")
0098       .getOrCreate();
0099 
0100     runBasicDataSourceExample(spark);
0101     runGenericFileSourceOptionsExample(spark);
0102     runBasicParquetExample(spark);
0103     runParquetSchemaMergingExample(spark);
0104     runJsonDatasetExample(spark);
0105     runJdbcDatasetExample(spark);
0106 
0107     spark.stop();
0108   }
0109 
0110   private static void runGenericFileSourceOptionsExample(SparkSession spark) {
0111     // $example on:ignore_corrupt_files$
0112     // enable ignore corrupt files
0113     spark.sql("set spark.sql.files.ignoreCorruptFiles=true");
0114     // dir1/file3.json is corrupt from parquet's view
0115     Dataset<Row> testCorruptDF = spark.read().parquet(
0116             "examples/src/main/resources/dir1/",
0117             "examples/src/main/resources/dir1/dir2/");
0118     testCorruptDF.show();
0119     // +-------------+
0120     // |         file|
0121     // +-------------+
0122     // |file1.parquet|
0123     // |file2.parquet|
0124     // +-------------+
0125     // $example off:ignore_corrupt_files$
0126     // $example on:recursive_file_lookup$
0127     Dataset<Row> recursiveLoadedDF = spark.read().format("parquet")
0128             .option("recursiveFileLookup", "true")
0129             .load("examples/src/main/resources/dir1");
0130     recursiveLoadedDF.show();
0131     // +-------------+
0132     // |         file|
0133     // +-------------+
0134     // |file1.parquet|
0135     // |file2.parquet|
0136     // +-------------+
0137     // $example off:recursive_file_lookup$
0138     spark.sql("set spark.sql.files.ignoreCorruptFiles=false");
0139     // $example on:load_with_path_glob_filter$
0140     Dataset<Row> testGlobFilterDF = spark.read().format("parquet")
0141             .option("pathGlobFilter", "*.parquet") // json file should be filtered out
0142             .load("examples/src/main/resources/dir1");
0143     testGlobFilterDF.show();
0144     // +-------------+
0145     // |         file|
0146     // +-------------+
0147     // |file1.parquet|
0148     // +-------------+
0149     // $example off:load_with_path_glob_filter$
0150   }
0151 
0152   private static void runBasicDataSourceExample(SparkSession spark) {
0153     // $example on:generic_load_save_functions$
0154     Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
0155     usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
0156     // $example off:generic_load_save_functions$
0157     // $example on:manual_load_options$
0158     Dataset<Row> peopleDF =
0159       spark.read().format("json").load("examples/src/main/resources/people.json");
0160     peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
0161     // $example off:manual_load_options$
0162     // $example on:manual_load_options_csv$
0163     Dataset<Row> peopleDFCsv = spark.read().format("csv")
0164       .option("sep", ";")
0165       .option("inferSchema", "true")
0166       .option("header", "true")
0167       .load("examples/src/main/resources/people.csv");
0168     // $example off:manual_load_options_csv$
0169     // $example on:manual_save_options_orc$
0170     usersDF.write().format("orc")
0171       .option("orc.bloom.filter.columns", "favorite_color")
0172       .option("orc.dictionary.key.threshold", "1.0")
0173       .option("orc.column.encoding.direct", "name")
0174       .save("users_with_options.orc");
0175     // $example off:manual_save_options_orc$
0176     // $example on:direct_sql$
0177     Dataset<Row> sqlDF =
0178       spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
0179     // $example off:direct_sql$
0180     // $example on:write_sorting_and_bucketing$
0181     peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
0182     // $example off:write_sorting_and_bucketing$
0183     // $example on:write_partitioning$
0184     usersDF
0185       .write()
0186       .partitionBy("favorite_color")
0187       .format("parquet")
0188       .save("namesPartByColor.parquet");
0189     // $example off:write_partitioning$
0190     // $example on:write_partition_and_bucket$
0191     peopleDF
0192       .write()
0193       .partitionBy("favorite_color")
0194       .bucketBy(42, "name")
0195       .saveAsTable("people_partitioned_bucketed");
0196     // $example off:write_partition_and_bucket$
0197 
0198     spark.sql("DROP TABLE IF EXISTS people_bucketed");
0199     spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed");
0200   }
0201 
0202   private static void runBasicParquetExample(SparkSession spark) {
0203     // $example on:basic_parquet_example$
0204     Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");
0205 
0206     // DataFrames can be saved as Parquet files, maintaining the schema information
0207     peopleDF.write().parquet("people.parquet");
0208 
0209     // Read in the Parquet file created above.
0210     // Parquet files are self-describing so the schema is preserved
0211     // The result of loading a parquet file is also a DataFrame
0212     Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");
0213 
0214     // Parquet files can also be used to create a temporary view and then used in SQL statements
0215     parquetFileDF.createOrReplaceTempView("parquetFile");
0216     Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
0217     Dataset<String> namesDS = namesDF.map(
0218         (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
0219         Encoders.STRING());
0220     namesDS.show();
0221     // +------------+
0222     // |       value|
0223     // +------------+
0224     // |Name: Justin|
0225     // +------------+
0226     // $example off:basic_parquet_example$
0227   }
0228 
0229   private static void runParquetSchemaMergingExample(SparkSession spark) {
0230     // $example on:schema_merging$
0231     List<Square> squares = new ArrayList<>();
0232     for (int value = 1; value <= 5; value++) {
0233       Square square = new Square();
0234       square.setValue(value);
0235       square.setSquare(value * value);
0236       squares.add(square);
0237     }
0238 
0239     // Create a simple DataFrame, store into a partition directory
0240     Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
0241     squaresDF.write().parquet("data/test_table/key=1");
0242 
0243     List<Cube> cubes = new ArrayList<>();
0244     for (int value = 6; value <= 10; value++) {
0245       Cube cube = new Cube();
0246       cube.setValue(value);
0247       cube.setCube(value * value * value);
0248       cubes.add(cube);
0249     }
0250 
0251     // Create another DataFrame in a new partition directory,
0252     // adding a new column and dropping an existing column
0253     Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
0254     cubesDF.write().parquet("data/test_table/key=2");
0255 
0256     // Read the partitioned table
0257     Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
0258     mergedDF.printSchema();
0259 
0260     // The final schema consists of all 3 columns in the Parquet files together
0261     // with the partitioning column appeared in the partition directory paths
0262     // root
0263     //  |-- value: int (nullable = true)
0264     //  |-- square: int (nullable = true)
0265     //  |-- cube: int (nullable = true)
0266     //  |-- key: int (nullable = true)
0267     // $example off:schema_merging$
0268   }
0269 
0270   private static void runJsonDatasetExample(SparkSession spark) {
0271     // $example on:json_dataset$
0272     // A JSON dataset is pointed to by path.
0273     // The path can be either a single text file or a directory storing text files
0274     Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");
0275 
0276     // The inferred schema can be visualized using the printSchema() method
0277     people.printSchema();
0278     // root
0279     //  |-- age: long (nullable = true)
0280     //  |-- name: string (nullable = true)
0281 
0282     // Creates a temporary view using the DataFrame
0283     people.createOrReplaceTempView("people");
0284 
0285     // SQL statements can be run by using the sql methods provided by spark
0286     Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
0287     namesDF.show();
0288     // +------+
0289     // |  name|
0290     // +------+
0291     // |Justin|
0292     // +------+
0293 
0294     // Alternatively, a DataFrame can be created for a JSON dataset represented by
0295     // a Dataset<String> storing one JSON object per string.
0296     List<String> jsonData = Arrays.asList(
0297             "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
0298     Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
0299     Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
0300     anotherPeople.show();
0301     // +---------------+----+
0302     // |        address|name|
0303     // +---------------+----+
0304     // |[Columbus,Ohio]| Yin|
0305     // +---------------+----+
0306     // $example off:json_dataset$
0307   }
0308 
0309   private static void runJdbcDatasetExample(SparkSession spark) {
0310     // $example on:jdbc_dataset$
0311     // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
0312     // Loading data from a JDBC source
0313     Dataset<Row> jdbcDF = spark.read()
0314       .format("jdbc")
0315       .option("url", "jdbc:postgresql:dbserver")
0316       .option("dbtable", "schema.tablename")
0317       .option("user", "username")
0318       .option("password", "password")
0319       .load();
0320 
0321     Properties connectionProperties = new Properties();
0322     connectionProperties.put("user", "username");
0323     connectionProperties.put("password", "password");
0324     Dataset<Row> jdbcDF2 = spark.read()
0325       .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
0326 
0327     // Saving data to a JDBC source
0328     jdbcDF.write()
0329       .format("jdbc")
0330       .option("url", "jdbc:postgresql:dbserver")
0331       .option("dbtable", "schema.tablename")
0332       .option("user", "username")
0333       .option("password", "password")
0334       .save();
0335 
0336     jdbcDF2.write()
0337       .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
0338 
0339     // Specifying create table column data types on write
0340     jdbcDF.write()
0341       .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
0342       .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
0343     // $example off:jdbc_dataset$
0344   }
0345 }