Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.examples.streaming;
0019 
0020 import java.util.ArrayList;
0021 import java.util.LinkedList;
0022 import java.util.List;
0023 import java.util.Queue;
0024 
0025 import scala.Tuple2;
0026 
0027 import org.apache.spark.SparkConf;
0028 import org.apache.spark.api.java.JavaRDD;
0029 import org.apache.spark.streaming.Duration;
0030 import org.apache.spark.streaming.api.java.JavaDStream;
0031 import org.apache.spark.streaming.api.java.JavaPairDStream;
0032 import org.apache.spark.streaming.api.java.JavaStreamingContext;
0033 
0034 public final class JavaQueueStream {
0035   private JavaQueueStream() {
0036   }
0037 
0038   public static void main(String[] args) throws Exception {
0039 
0040     StreamingExamples.setStreamingLogLevels();
0041     SparkConf sparkConf = new SparkConf().setAppName("JavaQueueStream");
0042 
0043     // Create the context
0044     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
0045 
0046     // Create the queue through which RDDs can be pushed to
0047     // a QueueInputDStream
0048 
0049     // Create and push some RDDs into the queue
0050     List<Integer> list = new ArrayList<>();
0051     for (int i = 0; i < 1000; i++) {
0052       list.add(i);
0053     }
0054 
0055     Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>();
0056     for (int i = 0; i < 30; i++) {
0057       rddQueue.add(ssc.sparkContext().parallelize(list));
0058     }
0059 
0060     // Create the QueueInputDStream and use it do some processing
0061     JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
0062     JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair(
0063         i -> new Tuple2<>(i % 10, 1));
0064     JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
0065         (i1, i2) -> i1 + i2);
0066 
0067     reducedStream.print();
0068     ssc.start();
0069     ssc.awaitTermination();
0070   }
0071 }