0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 package org.apache.spark.examples.sql.streaming;
0018
0019 import org.apache.spark.api.java.function.FlatMapFunction;
0020 import org.apache.spark.sql.*;
0021 import org.apache.spark.sql.streaming.StreamingQuery;
0022 import scala.Tuple2;
0023
0024 import java.sql.Timestamp;
0025 import java.util.ArrayList;
0026 import java.util.List;
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051 public final class JavaStructuredNetworkWordCountWindowed {
0052
0053 public static void main(String[] args) throws Exception {
0054 if (args.length < 3) {
0055 System.err.println("Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port>" +
0056 " <window duration in seconds> [<slide duration in seconds>]");
0057 System.exit(1);
0058 }
0059
0060 String host = args[0];
0061 int port = Integer.parseInt(args[1]);
0062 int windowSize = Integer.parseInt(args[2]);
0063 int slideSize = (args.length == 3) ? windowSize : Integer.parseInt(args[3]);
0064 if (slideSize > windowSize) {
0065 System.err.println("<slide duration> must be less than or equal to <window duration>");
0066 }
0067 String windowDuration = windowSize + " seconds";
0068 String slideDuration = slideSize + " seconds";
0069
0070 SparkSession spark = SparkSession
0071 .builder()
0072 .appName("JavaStructuredNetworkWordCountWindowed")
0073 .getOrCreate();
0074
0075
0076 Dataset<Row> lines = spark
0077 .readStream()
0078 .format("socket")
0079 .option("host", host)
0080 .option("port", port)
0081 .option("includeTimestamp", true)
0082 .load();
0083
0084
0085 Dataset<Row> words = lines
0086 .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
0087 .flatMap((FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>) t -> {
0088 List<Tuple2<String, Timestamp>> result = new ArrayList<>();
0089 for (String word : t._1.split(" ")) {
0090 result.add(new Tuple2<>(word, t._2));
0091 }
0092 return result.iterator();
0093 },
0094 Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
0095 ).toDF("word", "timestamp");
0096
0097
0098 Dataset<Row> windowedCounts = words.groupBy(
0099 functions.window(words.col("timestamp"), windowDuration, slideDuration),
0100 words.col("word")
0101 ).count().orderBy("window");
0102
0103
0104 StreamingQuery query = windowedCounts.writeStream()
0105 .outputMode("complete")
0106 .format("console")
0107 .option("truncate", "false")
0108 .start();
0109
0110 query.awaitTermination();
0111 }
0112 }