/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
0017 package org.apache.spark.examples.sql.hive;
0018
0019
0020 import java.io.File;
0021 import java.io.Serializable;
0022 import java.util.ArrayList;
0023 import java.util.List;
0024
0025 import org.apache.spark.api.java.function.MapFunction;
0026 import org.apache.spark.sql.Dataset;
0027 import org.apache.spark.sql.Encoders;
0028 import org.apache.spark.sql.Row;
0029 import org.apache.spark.sql.SparkSession;
0030
0031
0032 public class JavaSparkHiveExample {
0033
0034
0035 public static class Record implements Serializable {
0036 private int key;
0037 private String value;
0038
0039 public int getKey() {
0040 return key;
0041 }
0042
0043 public void setKey(int key) {
0044 this.key = key;
0045 }
0046
0047 public String getValue() {
0048 return value;
0049 }
0050
0051 public void setValue(String value) {
0052 this.value = value;
0053 }
0054 }
0055
0056
0057 public static void main(String[] args) {
0058
0059
0060 String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
0061 SparkSession spark = SparkSession
0062 .builder()
0063 .appName("Java Spark Hive Example")
0064 .config("spark.sql.warehouse.dir", warehouseLocation)
0065 .enableHiveSupport()
0066 .getOrCreate();
0067
0068 spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
0069 spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
0070
0071
0072 spark.sql("SELECT * FROM src").show();
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082 spark.sql("SELECT COUNT(*) FROM src").show();
0083
0084
0085
0086
0087
0088
0089
0090 Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
0091
0092
0093 Dataset<String> stringsDS = sqlDF.map(
0094 (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
0095 Encoders.STRING());
0096 stringsDS.show();
0097
0098
0099
0100
0101
0102
0103
0104
0105
0106 List<Record> records = new ArrayList<>();
0107 for (int key = 1; key < 100; key++) {
0108 Record record = new Record();
0109 record.setKey(key);
0110 record.setValue("val_" + key);
0111 records.add(record);
0112 }
0113 Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
0114 recordsDF.createOrReplaceTempView("records");
0115
0116
0117 spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
0118
0119
0120
0121
0122
0123
0124
0125
0126
0127 spark.stop();
0128 }
0129 }