Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark.examples.sql.hive;
0018 
0019 // $example on:spark_hive$
0020 import java.io.File;
0021 import java.io.Serializable;
0022 import java.util.ArrayList;
0023 import java.util.List;
0024 
0025 import org.apache.spark.api.java.function.MapFunction;
0026 import org.apache.spark.sql.Dataset;
0027 import org.apache.spark.sql.Encoders;
0028 import org.apache.spark.sql.Row;
0029 import org.apache.spark.sql.SparkSession;
0030 // $example off:spark_hive$
0031 
0032 public class JavaSparkHiveExample {
0033 
0034   // $example on:spark_hive$
0035   public static class Record implements Serializable {
0036     private int key;
0037     private String value;
0038 
0039     public int getKey() {
0040       return key;
0041     }
0042 
0043     public void setKey(int key) {
0044       this.key = key;
0045     }
0046 
0047     public String getValue() {
0048       return value;
0049     }
0050 
0051     public void setValue(String value) {
0052       this.value = value;
0053     }
0054   }
0055   // $example off:spark_hive$
0056 
0057   public static void main(String[] args) {
0058     // $example on:spark_hive$
0059     // warehouseLocation points to the default location for managed databases and tables
0060     String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
0061     SparkSession spark = SparkSession
0062       .builder()
0063       .appName("Java Spark Hive Example")
0064       .config("spark.sql.warehouse.dir", warehouseLocation)
0065       .enableHiveSupport()
0066       .getOrCreate();
0067 
0068     spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
0069     spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
0070 
0071     // Queries are expressed in HiveQL
0072     spark.sql("SELECT * FROM src").show();
0073     // +---+-------+
0074     // |key|  value|
0075     // +---+-------+
0076     // |238|val_238|
0077     // | 86| val_86|
0078     // |311|val_311|
0079     // ...
0080 
0081     // Aggregation queries are also supported.
0082     spark.sql("SELECT COUNT(*) FROM src").show();
0083     // +--------+
0084     // |count(1)|
0085     // +--------+
0086     // |    500 |
0087     // +--------+
0088 
0089     // The results of SQL queries are themselves DataFrames and support all normal functions.
0090     Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
0091 
0092     // The items in DataFrames are of type Row, which lets you to access each column by ordinal.
0093     Dataset<String> stringsDS = sqlDF.map(
0094         (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
0095         Encoders.STRING());
0096     stringsDS.show();
0097     // +--------------------+
0098     // |               value|
0099     // +--------------------+
0100     // |Key: 0, Value: val_0|
0101     // |Key: 0, Value: val_0|
0102     // |Key: 0, Value: val_0|
0103     // ...
0104 
0105     // You can also use DataFrames to create temporary views within a SparkSession.
0106     List<Record> records = new ArrayList<>();
0107     for (int key = 1; key < 100; key++) {
0108       Record record = new Record();
0109       record.setKey(key);
0110       record.setValue("val_" + key);
0111       records.add(record);
0112     }
0113     Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
0114     recordsDF.createOrReplaceTempView("records");
0115 
0116     // Queries can then join DataFrames data with data stored in Hive.
0117     spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
0118     // +---+------+---+------+
0119     // |key| value|key| value|
0120     // +---+------+---+------+
0121     // |  2| val_2|  2| val_2|
0122     // |  2| val_2|  2| val_2|
0123     // |  4| val_4|  4| val_4|
0124     // ...
0125     // $example off:spark_hive$
0126 
0127     spark.stop();
0128   }
0129 }