0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.examples.streaming;
0019
0020 import java.util.ArrayList;
0021 import java.util.LinkedList;
0022 import java.util.List;
0023 import java.util.Queue;
0024
0025 import scala.Tuple2;
0026
0027 import org.apache.spark.SparkConf;
0028 import org.apache.spark.api.java.JavaRDD;
0029 import org.apache.spark.streaming.Duration;
0030 import org.apache.spark.streaming.api.java.JavaDStream;
0031 import org.apache.spark.streaming.api.java.JavaPairDStream;
0032 import org.apache.spark.streaming.api.java.JavaStreamingContext;
0033
0034 public final class JavaQueueStream {
0035 private JavaQueueStream() {
0036 }
0037
0038 public static void main(String[] args) throws Exception {
0039
0040 StreamingExamples.setStreamingLogLevels();
0041 SparkConf sparkConf = new SparkConf().setAppName("JavaQueueStream");
0042
0043
0044 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
0045
0046
0047
0048
0049
0050 List<Integer> list = new ArrayList<>();
0051 for (int i = 0; i < 1000; i++) {
0052 list.add(i);
0053 }
0054
0055 Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>();
0056 for (int i = 0; i < 30; i++) {
0057 rddQueue.add(ssc.sparkContext().parallelize(list));
0058 }
0059
0060
0061 JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
0062 JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair(
0063 i -> new Tuple2<>(i % 10, 1));
0064 JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
0065 (i1, i2) -> i1 + i2);
0066
0067 reducedStream.print();
0068 ssc.start();
0069 ssc.awaitTermination();
0070 }
0071 }