Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.examples.streaming;
0019 
0020 import java.util.Arrays;
0021 import java.util.regex.Pattern;
0022 
0023 import org.apache.spark.SparkConf;
0024 import org.apache.spark.api.java.JavaRDD;
0025 import org.apache.spark.sql.Dataset;
0026 import org.apache.spark.sql.Row;
0027 import org.apache.spark.sql.SparkSession;
0028 import org.apache.spark.api.java.StorageLevels;
0029 import org.apache.spark.streaming.Durations;
0030 import org.apache.spark.streaming.api.java.JavaDStream;
0031 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
0032 import org.apache.spark.streaming.api.java.JavaStreamingContext;
0033 
0034 /**
0035  * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
0036  * network every second.
0037  *
0038  * Usage: JavaSqlNetworkWordCount <hostname> <port>
0039  * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
0040  *
0041  * To run this on your local machine, you need to first run a Netcat server
0042  *    `$ nc -lk 9999`
0043  * and then run the example
0044  *    `$ bin/run-example org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost 9999`
0045  */
0046 public final class JavaSqlNetworkWordCount {
0047   private static final Pattern SPACE = Pattern.compile(" ");
0048 
0049   public static void main(String[] args) throws Exception {
0050     if (args.length < 2) {
0051       System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
0052       System.exit(1);
0053     }
0054 
0055     StreamingExamples.setStreamingLogLevels();
0056 
0057     // Create the context with a 1 second batch size
0058     SparkConf sparkConf = new SparkConf().setAppName("JavaSqlNetworkWordCount");
0059     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
0060 
0061     // Create a JavaReceiverInputDStream on target ip:port and count the
0062     // words in input stream of \n delimited text (eg. generated by 'nc')
0063     // Note that no duplication in storage level only for running locally.
0064     // Replication necessary in distributed scenario for fault tolerance.
0065     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
0066         args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
0067     JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
0068 
0069     // Convert RDDs of the words DStream to DataFrame and run SQL query
0070     words.foreachRDD((rdd, time) -> {
0071       SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
0072 
0073       // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
0074       JavaRDD<JavaRecord> rowRDD = rdd.map(word -> {
0075         JavaRecord record = new JavaRecord();
0076         record.setWord(word);
0077         return record;
0078       });
0079       Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);
0080 
0081       // Creates a temporary view using the DataFrame
0082       wordsDataFrame.createOrReplaceTempView("words");
0083 
0084       // Do word count on table using SQL and print it
0085       Dataset<Row> wordCountsDataFrame =
0086           spark.sql("select word, count(*) as total from words group by word");
0087       System.out.println("========= " + time + "=========");
0088       wordCountsDataFrame.show();
0089     });
0090 
0091     ssc.start();
0092     ssc.awaitTermination();
0093   }
0094 }
0095 
0096 /** Lazily instantiated singleton instance of SparkSession */
0097 class JavaSparkSessionSingleton {
0098   private static transient SparkSession instance = null;
0099   public static SparkSession getInstance(SparkConf sparkConf) {
0100     if (instance == null) {
0101       instance = SparkSession
0102         .builder()
0103         .config(sparkConf)
0104         .getOrCreate();
0105     }
0106     return instance;
0107   }
0108 }