Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark.examples.sql;
0018 
0019 // $example on:programmatic_schema$
0020 import java.util.ArrayList;
0021 import java.util.List;
0022 // $example off:programmatic_schema$
0023 // $example on:create_ds$
0024 import java.util.Arrays;
0025 import java.util.Collections;
0026 import java.io.Serializable;
0027 // $example off:create_ds$
0028 
0029 // $example on:schema_inferring$
0030 // $example on:programmatic_schema$
0031 import org.apache.spark.api.java.JavaRDD;
0032 import org.apache.spark.api.java.function.Function;
0033 // $example off:programmatic_schema$
0034 // $example on:create_ds$
0035 import org.apache.spark.api.java.function.MapFunction;
0036 // $example on:create_df$
0037 // $example on:run_sql$
0038 // $example on:programmatic_schema$
0039 import org.apache.spark.sql.Dataset;
0040 import org.apache.spark.sql.Row;
0041 // $example off:programmatic_schema$
0042 // $example off:create_df$
0043 // $example off:run_sql$
0044 import org.apache.spark.sql.Encoder;
0045 import org.apache.spark.sql.Encoders;
0046 // $example off:create_ds$
0047 // $example off:schema_inferring$
0048 import org.apache.spark.sql.RowFactory;
0049 // $example on:init_session$
0050 import org.apache.spark.sql.SparkSession;
0051 // $example off:init_session$
0052 // $example on:programmatic_schema$
0053 import org.apache.spark.sql.types.DataTypes;
0054 import org.apache.spark.sql.types.StructField;
0055 import org.apache.spark.sql.types.StructType;
0056 // $example off:programmatic_schema$
0057 import org.apache.spark.sql.AnalysisException;
0058 
0059 // $example on:untyped_ops$
0060 // col("...") is preferable to df.col("...")
0061 import static org.apache.spark.sql.functions.col;
0062 // $example off:untyped_ops$
0063 
0064 public class JavaSparkSQLExample {
0065   // $example on:create_ds$
0066   public static class Person implements Serializable {
0067     private String name;
0068     private int age;
0069 
0070     public String getName() {
0071       return name;
0072     }
0073 
0074     public void setName(String name) {
0075       this.name = name;
0076     }
0077 
0078     public int getAge() {
0079       return age;
0080     }
0081 
0082     public void setAge(int age) {
0083       this.age = age;
0084     }
0085   }
0086   // $example off:create_ds$
0087 
0088   public static void main(String[] args) throws AnalysisException {
0089     // $example on:init_session$
0090     SparkSession spark = SparkSession
0091       .builder()
0092       .appName("Java Spark SQL basic example")
0093       .config("spark.some.config.option", "some-value")
0094       .getOrCreate();
0095     // $example off:init_session$
0096 
0097     runBasicDataFrameExample(spark);
0098     runDatasetCreationExample(spark);
0099     runInferSchemaExample(spark);
0100     runProgrammaticSchemaExample(spark);
0101 
0102     spark.stop();
0103   }
0104 
0105   private static void runBasicDataFrameExample(SparkSession spark) throws AnalysisException {
0106     // $example on:create_df$
0107     Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
0108 
0109     // Displays the content of the DataFrame to stdout
0110     df.show();
0111     // +----+-------+
0112     // | age|   name|
0113     // +----+-------+
0114     // |null|Michael|
0115     // |  30|   Andy|
0116     // |  19| Justin|
0117     // +----+-------+
0118     // $example off:create_df$
0119 
0120     // $example on:untyped_ops$
0121     // Print the schema in a tree format
0122     df.printSchema();
0123     // root
0124     // |-- age: long (nullable = true)
0125     // |-- name: string (nullable = true)
0126 
0127     // Select only the "name" column
0128     df.select("name").show();
0129     // +-------+
0130     // |   name|
0131     // +-------+
0132     // |Michael|
0133     // |   Andy|
0134     // | Justin|
0135     // +-------+
0136 
0137     // Select everybody, but increment the age by 1
0138     df.select(col("name"), col("age").plus(1)).show();
0139     // +-------+---------+
0140     // |   name|(age + 1)|
0141     // +-------+---------+
0142     // |Michael|     null|
0143     // |   Andy|       31|
0144     // | Justin|       20|
0145     // +-------+---------+
0146 
0147     // Select people older than 21
0148     df.filter(col("age").gt(21)).show();
0149     // +---+----+
0150     // |age|name|
0151     // +---+----+
0152     // | 30|Andy|
0153     // +---+----+
0154 
0155     // Count people by age
0156     df.groupBy("age").count().show();
0157     // +----+-----+
0158     // | age|count|
0159     // +----+-----+
0160     // |  19|    1|
0161     // |null|    1|
0162     // |  30|    1|
0163     // +----+-----+
0164     // $example off:untyped_ops$
0165 
0166     // $example on:run_sql$
0167     // Register the DataFrame as a SQL temporary view
0168     df.createOrReplaceTempView("people");
0169 
0170     Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
0171     sqlDF.show();
0172     // +----+-------+
0173     // | age|   name|
0174     // +----+-------+
0175     // |null|Michael|
0176     // |  30|   Andy|
0177     // |  19| Justin|
0178     // +----+-------+
0179     // $example off:run_sql$
0180 
0181     // $example on:global_temp_view$
0182     // Register the DataFrame as a global temporary view
0183     df.createGlobalTempView("people");
0184 
0185     // Global temporary view is tied to a system preserved database `global_temp`
0186     spark.sql("SELECT * FROM global_temp.people").show();
0187     // +----+-------+
0188     // | age|   name|
0189     // +----+-------+
0190     // |null|Michael|
0191     // |  30|   Andy|
0192     // |  19| Justin|
0193     // +----+-------+
0194 
0195     // Global temporary view is cross-session
0196     spark.newSession().sql("SELECT * FROM global_temp.people").show();
0197     // +----+-------+
0198     // | age|   name|
0199     // +----+-------+
0200     // |null|Michael|
0201     // |  30|   Andy|
0202     // |  19| Justin|
0203     // +----+-------+
0204     // $example off:global_temp_view$
0205   }
0206 
0207   private static void runDatasetCreationExample(SparkSession spark) {
0208     // $example on:create_ds$
0209     // Create an instance of a Bean class
0210     Person person = new Person();
0211     person.setName("Andy");
0212     person.setAge(32);
0213 
0214     // Encoders are created for Java beans
0215     Encoder<Person> personEncoder = Encoders.bean(Person.class);
0216     Dataset<Person> javaBeanDS = spark.createDataset(
0217       Collections.singletonList(person),
0218       personEncoder
0219     );
0220     javaBeanDS.show();
0221     // +---+----+
0222     // |age|name|
0223     // +---+----+
0224     // | 32|Andy|
0225     // +---+----+
0226 
0227     // Encoders for most common types are provided in class Encoders
0228     Encoder<Integer> integerEncoder = Encoders.INT();
0229     Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
0230     Dataset<Integer> transformedDS = primitiveDS.map(
0231         (MapFunction<Integer, Integer>) value -> value + 1,
0232         integerEncoder);
0233     transformedDS.collect(); // Returns [2, 3, 4]
0234 
0235     // DataFrames can be converted to a Dataset by providing a class. Mapping based on name
0236     String path = "examples/src/main/resources/people.json";
0237     Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
0238     peopleDS.show();
0239     // +----+-------+
0240     // | age|   name|
0241     // +----+-------+
0242     // |null|Michael|
0243     // |  30|   Andy|
0244     // |  19| Justin|
0245     // +----+-------+
0246     // $example off:create_ds$
0247   }
0248 
0249   private static void runInferSchemaExample(SparkSession spark) {
0250     // $example on:schema_inferring$
0251     // Create an RDD of Person objects from a text file
0252     JavaRDD<Person> peopleRDD = spark.read()
0253       .textFile("examples/src/main/resources/people.txt")
0254       .javaRDD()
0255       .map(line -> {
0256         String[] parts = line.split(",");
0257         Person person = new Person();
0258         person.setName(parts[0]);
0259         person.setAge(Integer.parseInt(parts[1].trim()));
0260         return person;
0261       });
0262 
0263     // Apply a schema to an RDD of JavaBeans to get a DataFrame
0264     Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
0265     // Register the DataFrame as a temporary view
0266     peopleDF.createOrReplaceTempView("people");
0267 
0268     // SQL statements can be run by using the sql methods provided by spark
0269     Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
0270 
0271     // The columns of a row in the result can be accessed by field index
0272     Encoder<String> stringEncoder = Encoders.STRING();
0273     Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
0274         (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
0275         stringEncoder);
0276     teenagerNamesByIndexDF.show();
0277     // +------------+
0278     // |       value|
0279     // +------------+
0280     // |Name: Justin|
0281     // +------------+
0282 
0283     // or by field name
0284     Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
0285         (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
0286         stringEncoder);
0287     teenagerNamesByFieldDF.show();
0288     // +------------+
0289     // |       value|
0290     // +------------+
0291     // |Name: Justin|
0292     // +------------+
0293     // $example off:schema_inferring$
0294   }
0295 
0296   private static void runProgrammaticSchemaExample(SparkSession spark) {
0297     // $example on:programmatic_schema$
0298     // Create an RDD
0299     JavaRDD<String> peopleRDD = spark.sparkContext()
0300       .textFile("examples/src/main/resources/people.txt", 1)
0301       .toJavaRDD();
0302 
0303     // The schema is encoded in a string
0304     String schemaString = "name age";
0305 
0306     // Generate the schema based on the string of schema
0307     List<StructField> fields = new ArrayList<>();
0308     for (String fieldName : schemaString.split(" ")) {
0309       StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
0310       fields.add(field);
0311     }
0312     StructType schema = DataTypes.createStructType(fields);
0313 
0314     // Convert records of the RDD (people) to Rows
0315     JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
0316       String[] attributes = record.split(",");
0317       return RowFactory.create(attributes[0], attributes[1].trim());
0318     });
0319 
0320     // Apply the schema to the RDD
0321     Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
0322 
0323     // Creates a temporary view using the DataFrame
0324     peopleDataFrame.createOrReplaceTempView("people");
0325 
0326     // SQL can be run over a temporary view created using DataFrames
0327     Dataset<Row> results = spark.sql("SELECT name FROM people");
0328 
0329     // The results of SQL queries are DataFrames and support all the normal RDD operations
0330     // The columns of a row in the result can be accessed by field index or by field name
0331     Dataset<String> namesDS = results.map(
0332         (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
0333         Encoders.STRING());
0334     namesDS.show();
0335     // +-------------+
0336     // |        value|
0337     // +-------------+
0338     // |Name: Michael|
0339     // |   Name: Andy|
0340     // | Name: Justin|
0341     // +-------------+
0342     // $example off:programmatic_schema$
0343   }
0344 }