0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.examples.streaming;
0019
0020 import java.util.Arrays;
0021 import java.util.List;
0022 import java.util.regex.Pattern;
0023
0024 import scala.Tuple2;
0025
0026 import org.apache.spark.SparkConf;
0027 import org.apache.spark.api.java.function.*;
0028 import org.apache.spark.api.java.JavaPairRDD;
0029 import org.apache.spark.api.java.Optional;
0030 import org.apache.spark.api.java.StorageLevels;
0031 import org.apache.spark.streaming.Durations;
0032 import org.apache.spark.streaming.State;
0033 import org.apache.spark.streaming.StateSpec;
0034 import org.apache.spark.streaming.api.java.*;
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049 public class JavaStatefulNetworkWordCount {
0050 private static final Pattern SPACE = Pattern.compile(" ");
0051
0052 public static void main(String[] args) throws Exception {
0053 if (args.length < 2) {
0054 System.err.println("Usage: JavaStatefulNetworkWordCount <hostname> <port>");
0055 System.exit(1);
0056 }
0057
0058 StreamingExamples.setStreamingLogLevels();
0059
0060
0061 SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount");
0062 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
0063 ssc.checkpoint(".");
0064
0065
0066 @SuppressWarnings("unchecked")
0067 List<Tuple2<String, Integer>> tuples =
0068 Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
0069 JavaPairRDD<String, Integer> initialRDD = ssc.sparkContext().parallelizePairs(tuples);
0070
0071 JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
0072 args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2);
0073
0074 JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
0075
0076 JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(s -> new Tuple2<>(s, 1));
0077
0078
0079 Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
0080 (word, one, state) -> {
0081 int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
0082 Tuple2<String, Integer> output = new Tuple2<>(word, sum);
0083 state.update(sum);
0084 return output;
0085 };
0086
0087
0088 JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
0089 wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));
0090
0091 stateDstream.print();
0092 ssc.start();
0093 ssc.awaitTermination();
0094 }
0095 }