Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.examples.streaming;
0019 
0020 import java.util.Arrays;
0021 import java.util.regex.Pattern;
0022 
0023 import scala.Tuple2;
0024 
0025 import org.apache.spark.SparkConf;
0026 import org.apache.spark.api.java.StorageLevels;
0027 import org.apache.spark.streaming.Durations;
0028 import org.apache.spark.streaming.api.java.JavaDStream;
0029 import org.apache.spark.streaming.api.java.JavaPairDStream;
0030 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
0031 import org.apache.spark.streaming.api.java.JavaStreamingContext;
0032 
0033 /**
0034  * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
0035  *
0036  * Usage: JavaNetworkWordCount <hostname> <port>
0037  * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
0038  *
0039  * To run this on your local machine, you need to first run a Netcat server
0040  *    `$ nc -lk 9999`
0041  * and then run the example
0042  *    `$ bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999`
0043  */
0044 public final class JavaNetworkWordCount {
0045   private static final Pattern SPACE = Pattern.compile(" ");
0046 
0047   public static void main(String[] args) throws Exception {
0048     if (args.length < 2) {
0049       System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
0050       System.exit(1);
0051     }
0052 
0053     StreamingExamples.setStreamingLogLevels();
0054 
0055     // Create the context with a 1 second batch size
0056     SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
0057     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
0058 
0059     // Create a JavaReceiverInputDStream on target ip:port and count the
0060     // words in input stream of \n delimited text (eg. generated by 'nc')
0061     // Note that no duplication in storage level only for running locally.
0062     // Replication necessary in distributed scenario for fault tolerance.
0063     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
0064             args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
0065     JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
0066     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
0067         .reduceByKey((i1, i2) -> i1 + i2);
0068 
0069     wordCounts.print();
0070     ssc.start();
0071     ssc.awaitTermination();
0072   }
0073 }