0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 package org.apache.spark.examples.sql;
0018
0019
0020 import java.io.Serializable;
0021 import java.util.ArrayList;
0022 import java.util.Arrays;
0023 import java.util.List;
0024
0025 import java.util.Properties;
0026
0027
0028 import org.apache.spark.api.java.function.MapFunction;
0029 import org.apache.spark.sql.Encoders;
0030
0031
0032 import org.apache.spark.sql.Dataset;
0033 import org.apache.spark.sql.Row;
0034
0035
0036
0037 import org.apache.spark.sql.SparkSession;
0038
/**
 * End-to-end examples of the Spark SQL data source APIs: generic load/save,
 * generic file-source options, Parquet (including schema merging), JSON, and
 * JDBC. Each example method takes an active {@link SparkSession} and performs
 * its reads/writes as side effects; nothing is returned.
 *
 * <p>NOTE(review): the relative paths ("examples/src/main/resources/...") assume the
 * process working directory is a Spark source/distribution checkout — confirm before
 * running outside of bin/run-example. The JDBC example additionally assumes a reachable
 * PostgreSQL server and driver on the classpath.
 */
public class JavaSQLDataSourceExample {

  /**
   * JavaBean holding a value and its square. Used by the schema-merging example
   * to build a DataFrame via JavaBean reflection ({@code createDataFrame(list, Class)}),
   * which requires the no-arg constructor and getter/setter pairs below.
   */
  public static class Square implements Serializable {
    private int value;
    private int square;

    public int getValue() {
      return value;
    }

    public void setValue(int value) {
      this.value = value;
    }

    public int getSquare() {
      return square;
    }

    public void setSquare(int square) {
      this.square = square;
    }
  }

  /**
   * JavaBean holding a value and its cube. Deliberately shares only the "value"
   * column with {@link Square} so the schema-merging example has two partitions
   * with overlapping-but-different schemas.
   */
  public static class Cube implements Serializable {
    private int value;
    private int cube;

    public int getValue() {
      return value;
    }

    public void setValue(int value) {
      this.value = value;
    }

    public int getCube() {
      return cube;
    }

    public void setCube(int cube) {
      this.cube = cube;
    }
  }

  /**
   * Entry point: builds (or reuses) a SparkSession, runs each data-source
   * example in sequence, then stops the session.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    SparkSession spark = SparkSession
      .builder()
      .appName("Java Spark SQL data sources example")
      // Placeholder config entry kept from the original example; has no effect on behavior.
      .config("spark.some.config.option", "some-value")
      .getOrCreate();

    runBasicDataSourceExample(spark);
    runGenericFileSourceOptionsExample(spark);
    runBasicParquetExample(spark);
    runParquetSchemaMergingExample(spark);
    runJsonDatasetExample(spark);
    runJdbcDatasetExample(spark);

    // Release cluster resources.
    spark.stop();
  }

  /**
   * Demonstrates generic file-source options that apply across file formats:
   * ignoring corrupt files, recursive directory lookup, and path glob filtering.
   */
  private static void runGenericFileSourceOptionsExample(SparkSession spark) {
    // Enable ignoreCorruptFiles so that reading a directory containing a
    // non-Parquet (corrupt) file yields only the rows from the readable files
    // instead of failing the whole read.
    spark.sql("set spark.sql.files.ignoreCorruptFiles=true");

    Dataset<Row> testCorruptDF = spark.read().parquet(
      "examples/src/main/resources/dir1/",
      "examples/src/main/resources/dir1/dir2/");
    testCorruptDF.show();

    // recursiveFileLookup descends into subdirectories of the given path;
    // note it disables partition discovery (partition columns are not inferred).
    Dataset<Row> recursiveLoadedDF = spark.read().format("parquet")
      .option("recursiveFileLookup", "true")
      .load("examples/src/main/resources/dir1");
    recursiveLoadedDF.show();

    // Restore the session default so later examples fail loudly on corrupt input.
    spark.sql("set spark.sql.files.ignoreCorruptFiles=false");

    // pathGlobFilter restricts which files are read (here: only *.parquet),
    // letting the corrupt non-Parquet file be skipped without the config flag.
    Dataset<Row> testGlobFilterDF = spark.read().format("parquet")
      .option("pathGlobFilter", "*.parquet")
      .load("examples/src/main/resources/dir1");
    testGlobFilterDF.show();
  }

  /**
   * Demonstrates the generic load/save API: default format (Parquet), explicit
   * formats (JSON, CSV, ORC), running SQL directly on files, bucketing, and
   * partitioning.
   */
  private static void runBasicDataSourceExample(SparkSession spark) {
    // With no format specified, load()/save() use the default source (Parquet).
    Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
    usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

    // Manually specify the source format via format(...).
    Dataset<Row> peopleDF =
      spark.read().format("json").load("examples/src/main/resources/people.json");
    peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

    // CSV with source-specific options: custom separator, schema inference,
    // and a header row.
    Dataset<Row> peopleDFCsv = spark.read().format("csv")
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .load("examples/src/main/resources/people.csv");

    // ORC write with ORC-specific options (bloom filter, dictionary threshold,
    // direct encoding for the listed columns).
    usersDF.write().format("orc")
      .option("orc.bloom.filter.columns", "favorite_color")
      .option("orc.dictionary.key.threshold", "1.0")
      .option("orc.column.encoding.direct", "name")
      .save("users_with_options.orc");

    // Run SQL directly on a file, without loading it into a table first.
    Dataset<Row> sqlDF =
      spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");

    // Bucketing (requires saveAsTable — buckets are tracked in the metastore).
    peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");

    // Partitioning works with plain save(): one directory per favorite_color value.
    usersDF
      .write()
      .partitionBy("favorite_color")
      .format("parquet")
      .save("namesPartByColor.parquet");

    // Partitioning and bucketing can be combined for a single managed table.
    peopleDF
      .write()
      .partitionBy("favorite_color")
      .bucketBy(42, "name")
      .saveAsTable("people_partitioned_bucketed");

    // Clean up the tables created above so the example is re-runnable.
    spark.sql("DROP TABLE IF EXISTS people_bucketed");
    spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed");
  }

  /**
   * Demonstrates basic Parquet usage: writing a DataFrame to Parquet (schema is
   * preserved in the file), reading it back, and querying it via a temp view.
   */
  private static void runBasicParquetExample(SparkSession spark) {
    Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");

    // DataFrames are saved as Parquet files, maintaining the schema information.
    peopleDF.write().parquet("people.parquet");

    // Read in the Parquet file created above; the result is also a DataFrame.
    Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");

    // Parquet files can also be used to create a temporary view and then used
    // in SQL statements.
    parquetFileDF.createOrReplaceTempView("parquetFile");
    Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
    // Map each Row to a formatted String, producing a typed Dataset<String>.
    Dataset<String> namesDS = namesDF.map(
      (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
      Encoders.STRING());
    namesDS.show();
  }

  /**
   * Demonstrates Parquet schema merging: two partitions (key=1, key=2) written
   * with different-but-compatible schemas are unified into one schema when read
   * back with the mergeSchema option.
   */
  private static void runParquetSchemaMergingExample(SparkSession spark) {
    // Create a simple DataFrame of (value, square) rows and store it in a
    // partition directory (key=1 becomes a partition column on read).
    List<Square> squares = new ArrayList<>();
    for (int value = 1; value <= 5; value++) {
      Square square = new Square();
      square.setValue(value);
      square.setSquare(value * value);
      squares.add(square);
    }

    Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
    squaresDF.write().parquet("data/test_table/key=1");

    // Create another DataFrame in a new partition directory, adding a new
    // column (cube) and dropping an existing one (square).
    List<Cube> cubes = new ArrayList<>();
    for (int value = 6; value <= 10; value++) {
      Cube cube = new Cube();
      cube.setValue(value);
      cube.setCube(value * value * value);
      cubes.add(cube);
    }

    Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
    cubesDF.write().parquet("data/test_table/key=2");

    // Read the partitioned table with schema merging enabled; the merged schema
    // contains value, square, cube, and the partition column key.
    Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
    mergedDF.printSchema();
  }

  /**
   * Demonstrates reading JSON: from a file of JSON-lines (one object per line),
   * querying via a temp view, and from an in-memory Dataset of JSON strings.
   */
  private static void runJsonDatasetExample(SparkSession spark) {
    // A JSON dataset is pointed to by path; Spark infers the schema.
    Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");

    // Visualize the inferred schema.
    people.printSchema();

    // Create a temporary view so the data can be queried with SQL.
    people.createOrReplaceTempView("people");

    Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
    namesDF.show();

    // Alternatively, a DataFrame can be created from a Dataset<String> where
    // each element is a JSON object.
    List<String> jsonData = Arrays.asList(
      "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
    Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
    Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
    anotherPeople.show();
  }

  /**
   * Demonstrates reading from and writing to a JDBC source, both via reader/writer
   * options and via a {@link Properties} object, plus customizing the column types
   * of the table created on write.
   *
   * <p>NOTE(review): credentials are hard-coded placeholders; a real deployment
   * should supply them from configuration, not source code.
   */
  private static void runJdbcDatasetExample(SparkSession spark) {
    // Loading data from a JDBC source, specifying connection info as options.
    Dataset<Row> jdbcDF = spark.read()
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .load();

    // Equivalent read using java.util.Properties for the connection settings.
    Properties connectionProperties = new Properties();
    connectionProperties.put("user", "username");
    connectionProperties.put("password", "password");
    Dataset<Row> jdbcDF2 = spark.read()
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

    // Saving data to a JDBC source via writer options.
    jdbcDF.write()
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .save();

    // Equivalent write using the Properties-based API.
    jdbcDF2.write()
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

    // Specifying create table column data types on write (applies when the
    // target table is created by this write).
    jdbcDF.write()
      .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
  }
}