/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.streaming;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.*;

/**
 * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network
 * every second, starting from an initial set of word counts.
 * Usage: JavaStatefulNetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming connects to in order
 * to receive data.
 * <p>
 * To run this on your local machine, you need to first run a Netcat server
 * `$ nc -lk 9999`
 * and then run the example
 * `$ bin/run-example
 * org.apache.spark.examples.streaming.JavaStatefulNetworkWordCount localhost 9999`
 */
public class JavaStatefulNetworkWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: JavaStatefulNetworkWordCount <hostname> <port>");
      System.exit(1);
    }

    StreamingExamples.setStreamingLogLevels();

    // Create the context with a 1 second batch size
    SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
    ssc.checkpoint(".");
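    // mapWithState is a stateful transformation, so a checkpoint directory is required.
    // "." keeps the example self-contained; a real deployment would normally point this
    // at a fault-tolerant location such as an HDFS path.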

    // Initial state RDD input to mapWithState
    @SuppressWarnings("unchecked")
    List<Tuple2<String, Integer>> tuples =
        Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
    JavaPairRDD<String, Integer> initialRDD = ssc.sparkContext().parallelizePairs(tuples);
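    // The initial state seeds the counts for "hello" and "world" at 1, so their
    // cumulative totals start from 1 rather than 0.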

    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
            args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2);
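    // The receiver stores incoming lines serialized, spilling to disk when memory is
    // tight and keeping a second replica (MEMORY_AND_DISK_SER_2) so the data can
    // survive the loss of a single executor.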

    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());

    JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(s -> new Tuple2<>(s, 1));
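    // Each word is paired with a count of 1; mapWithState below folds these ones into
    // the running total kept per word.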

    // Mapping function that updates the cumulative count for each word
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
        (word, one, state) -> {
          int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
          Tuple2<String, Integer> output = new Tuple2<>(word, sum);
          state.update(sum);
          return output;
        };
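    // For example, if the state for "hello" currently holds 3 and a new ("hello", 1)
    // record arrives, the function emits ("hello", 4) and updates the stored state to 4.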

    // DStream of cumulative counts that are updated in every batch
    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
        wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));
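    // stateDstream emits one updated (word, count) tuple per input record. To see the
    // complete word-count state once per batch instead, one could also print the state
    // snapshots:
    //   stateDstream.stateSnapshots().print();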

    stateDstream.print();
    ssc.start();
    ssc.awaitTermination();
  }
}