Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark.examples.sql.streaming;
0018 
0019 import org.apache.spark.api.java.function.FlatMapFunction;
0020 import org.apache.spark.sql.*;
0021 import org.apache.spark.sql.streaming.StreamingQuery;
0022 
0023 import java.util.Arrays;
0024 
0025 /**
0026  * Counts words in UTF8 encoded, '\n' delimited text received from the network.
0027  *
0028  * Usage: JavaStructuredNetworkWordCount <hostname> <port>
0029  * <hostname> and <port> describe the TCP server that Structured Streaming
0030  * would connect to receive data.
0031  *
0032  * To run this on your local machine, you need to first run a Netcat server
0033  *    `$ nc -lk 9999`
0034  * and then run the example
0035  *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCount
0036  *    localhost 9999`
0037  */
0038 public final class JavaStructuredNetworkWordCount {
0039 
0040   public static void main(String[] args) throws Exception {
0041     if (args.length < 2) {
0042       System.err.println("Usage: JavaStructuredNetworkWordCount <hostname> <port>");
0043       System.exit(1);
0044     }
0045 
0046     String host = args[0];
0047     int port = Integer.parseInt(args[1]);
0048 
0049     SparkSession spark = SparkSession
0050       .builder()
0051       .appName("JavaStructuredNetworkWordCount")
0052       .getOrCreate();
0053 
0054     // Create DataFrame representing the stream of input lines from connection to host:port
0055     Dataset<Row> lines = spark
0056       .readStream()
0057       .format("socket")
0058       .option("host", host)
0059       .option("port", port)
0060       .load();
0061 
0062     // Split the lines into words
0063     Dataset<String> words = lines.as(Encoders.STRING()).flatMap(
0064         (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
0065         Encoders.STRING());
0066 
0067     // Generate running word count
0068     Dataset<Row> wordCounts = words.groupBy("value").count();
0069 
0070     // Start running the query that prints the running counts to the console
0071     StreamingQuery query = wordCounts.writeStream()
0072       .outputMode("complete")
0073       .format("console")
0074       .start();
0075 
0076     query.awaitTermination();
0077   }
0078 }