0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 package org.apache.spark.examples.sql;
0018
0019
0020 import java.util.ArrayList;
0021 import java.util.List;
0022
0023
0024 import java.util.Arrays;
0025 import java.util.Collections;
0026 import java.io.Serializable;
0027
0028
0029
0030
0031 import org.apache.spark.api.java.JavaRDD;
0032 import org.apache.spark.api.java.function.Function;
0033
0034
0035 import org.apache.spark.api.java.function.MapFunction;
0036
0037
0038
0039 import org.apache.spark.sql.Dataset;
0040 import org.apache.spark.sql.Row;
0041
0042
0043
0044 import org.apache.spark.sql.Encoder;
0045 import org.apache.spark.sql.Encoders;
0046
0047
0048 import org.apache.spark.sql.RowFactory;
0049
0050 import org.apache.spark.sql.SparkSession;
0051
0052
0053 import org.apache.spark.sql.types.DataTypes;
0054 import org.apache.spark.sql.types.StructField;
0055 import org.apache.spark.sql.types.StructType;
0056
0057 import org.apache.spark.sql.AnalysisException;
0058
0059
0060
0061 import static org.apache.spark.sql.functions.col;
0062
0063
0064 public class JavaSparkSQLExample {
0065
0066 public static class Person implements Serializable {
0067 private String name;
0068 private int age;
0069
0070 public String getName() {
0071 return name;
0072 }
0073
0074 public void setName(String name) {
0075 this.name = name;
0076 }
0077
0078 public int getAge() {
0079 return age;
0080 }
0081
0082 public void setAge(int age) {
0083 this.age = age;
0084 }
0085 }
0086
0087
0088 public static void main(String[] args) throws AnalysisException {
0089
0090 SparkSession spark = SparkSession
0091 .builder()
0092 .appName("Java Spark SQL basic example")
0093 .config("spark.some.config.option", "some-value")
0094 .getOrCreate();
0095
0096
0097 runBasicDataFrameExample(spark);
0098 runDatasetCreationExample(spark);
0099 runInferSchemaExample(spark);
0100 runProgrammaticSchemaExample(spark);
0101
0102 spark.stop();
0103 }
0104
0105 private static void runBasicDataFrameExample(SparkSession spark) throws AnalysisException {
0106
0107 Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
0108
0109
0110 df.show();
0111
0112
0113
0114
0115
0116
0117
0118
0119
0120
0121
0122 df.printSchema();
0123
0124
0125
0126
0127
0128 df.select("name").show();
0129
0130
0131
0132
0133
0134
0135
0136
0137
0138 df.select(col("name"), col("age").plus(1)).show();
0139
0140
0141
0142
0143
0144
0145
0146
0147
0148 df.filter(col("age").gt(21)).show();
0149
0150
0151
0152
0153
0154
0155
0156 df.groupBy("age").count().show();
0157
0158
0159
0160
0161
0162
0163
0164
0165
0166
0167
0168 df.createOrReplaceTempView("people");
0169
0170 Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
0171 sqlDF.show();
0172
0173
0174
0175
0176
0177
0178
0179
0180
0181
0182
0183 df.createGlobalTempView("people");
0184
0185
0186 spark.sql("SELECT * FROM global_temp.people").show();
0187
0188
0189
0190
0191
0192
0193
0194
0195
0196 spark.newSession().sql("SELECT * FROM global_temp.people").show();
0197
0198
0199
0200
0201
0202
0203
0204
0205 }
0206
0207 private static void runDatasetCreationExample(SparkSession spark) {
0208
0209
0210 Person person = new Person();
0211 person.setName("Andy");
0212 person.setAge(32);
0213
0214
0215 Encoder<Person> personEncoder = Encoders.bean(Person.class);
0216 Dataset<Person> javaBeanDS = spark.createDataset(
0217 Collections.singletonList(person),
0218 personEncoder
0219 );
0220 javaBeanDS.show();
0221
0222
0223
0224
0225
0226
0227
0228 Encoder<Integer> integerEncoder = Encoders.INT();
0229 Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
0230 Dataset<Integer> transformedDS = primitiveDS.map(
0231 (MapFunction<Integer, Integer>) value -> value + 1,
0232 integerEncoder);
0233 transformedDS.collect();
0234
0235
0236 String path = "examples/src/main/resources/people.json";
0237 Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
0238 peopleDS.show();
0239
0240
0241
0242
0243
0244
0245
0246
0247 }
0248
0249 private static void runInferSchemaExample(SparkSession spark) {
0250
0251
0252 JavaRDD<Person> peopleRDD = spark.read()
0253 .textFile("examples/src/main/resources/people.txt")
0254 .javaRDD()
0255 .map(line -> {
0256 String[] parts = line.split(",");
0257 Person person = new Person();
0258 person.setName(parts[0]);
0259 person.setAge(Integer.parseInt(parts[1].trim()));
0260 return person;
0261 });
0262
0263
0264 Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
0265
0266 peopleDF.createOrReplaceTempView("people");
0267
0268
0269 Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
0270
0271
0272 Encoder<String> stringEncoder = Encoders.STRING();
0273 Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
0274 (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
0275 stringEncoder);
0276 teenagerNamesByIndexDF.show();
0277
0278
0279
0280
0281
0282
0283
0284 Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
0285 (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
0286 stringEncoder);
0287 teenagerNamesByFieldDF.show();
0288
0289
0290
0291
0292
0293
0294 }
0295
0296 private static void runProgrammaticSchemaExample(SparkSession spark) {
0297
0298
0299 JavaRDD<String> peopleRDD = spark.sparkContext()
0300 .textFile("examples/src/main/resources/people.txt", 1)
0301 .toJavaRDD();
0302
0303
0304 String schemaString = "name age";
0305
0306
0307 List<StructField> fields = new ArrayList<>();
0308 for (String fieldName : schemaString.split(" ")) {
0309 StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
0310 fields.add(field);
0311 }
0312 StructType schema = DataTypes.createStructType(fields);
0313
0314
0315 JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
0316 String[] attributes = record.split(",");
0317 return RowFactory.create(attributes[0], attributes[1].trim());
0318 });
0319
0320
0321 Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
0322
0323
0324 peopleDataFrame.createOrReplaceTempView("people");
0325
0326
0327 Dataset<Row> results = spark.sql("SELECT name FROM people");
0328
0329
0330
0331 Dataset<String> namesDS = results.map(
0332 (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
0333 Encoders.STRING());
0334 namesDS.show();
0335
0336
0337
0338
0339
0340
0341
0342
0343 }
0344 }