0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.examples.streaming;
0019
0020 import java.util.Arrays;
0021 import java.util.regex.Pattern;
0022
0023 import scala.Tuple2;
0024
0025 import org.apache.spark.SparkConf;
0026 import org.apache.spark.api.java.StorageLevels;
0027 import org.apache.spark.streaming.Durations;
0028 import org.apache.spark.streaming.api.java.JavaDStream;
0029 import org.apache.spark.streaming.api.java.JavaPairDStream;
0030 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
0031 import org.apache.spark.streaming.api.java.JavaStreamingContext;
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044 public final class JavaNetworkWordCount {
0045 private static final Pattern SPACE = Pattern.compile(" ");
0046
0047 public static void main(String[] args) throws Exception {
0048 if (args.length < 2) {
0049 System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
0050 System.exit(1);
0051 }
0052
0053 StreamingExamples.setStreamingLogLevels();
0054
0055
0056 SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
0057 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
0058
0059
0060
0061
0062
0063 JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
0064 args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
0065 JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
0066 JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
0067 .reduceByKey((i1, i2) -> i1 + i2);
0068
0069 wordCounts.print();
0070 ssc.start();
0071 ssc.awaitTermination();
0072 }
0073 }