|
||||
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.examples.sql.streaming;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;

/**
 * Structured Streaming example that keeps a running count of the words found in
 * UTF8 encoded, '\n' delimited text read from a TCP socket.
 *
 * Usage: JavaStructuredNetworkWordCount <hostname> <port>
 * where <hostname> and <port> identify the TCP server that Structured Streaming
 * connects to in order to receive data.
 *
 * To try it on a local machine, first start a Netcat server
 *    `$ nc -lk 9999`
 * and then launch the example
 *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCount
 *    localhost 9999`
 */
public final class JavaStructuredNetworkWordCount {

  /**
   * Entry point: reads lines from the given socket, splits them on single spaces,
   * and prints the running per-word counts to the console until the query terminates.
   *
   * @param args command-line arguments; {@code args[0]} is the host name and
   *             {@code args[1]} is the port number of the TCP server
   * @throws Exception declared so that any failure raised while starting or
   *                   awaiting the streaming query propagates to the caller
   */
  public static void main(String[] args) throws Exception {
    // Both a host and a port are required; print the usage line and exit otherwise.
    boolean missingArguments = args.length < 2;
    if (missingArguments) {
      System.err.println("Usage: JavaStructuredNetworkWordCount <hostname> <port>");
      System.exit(1);
    }

    String hostname = args[0];
    // Parsed before the session is created, so a malformed port fails first.
    int portNumber = Integer.parseInt(args[1]);

    SparkSession.Builder sessionBuilder = SparkSession.builder();
    SparkSession session = sessionBuilder
      .appName("JavaStructuredNetworkWordCount")
      .getOrCreate();

    // Streaming DataFrame holding the lines received from hostname:portNumber.
    Dataset<Row> inputLines = session
      .readStream()
      .format("socket")
      .option("host", hostname)
      .option("port", portNumber)
      .load();

    // Turns one input line into its space-separated tokens.
    FlatMapFunction<String, String> tokenize =
      line -> Arrays.asList(line.split(" ")).iterator();

    // View each row as a plain string, then explode every line into individual words.
    Dataset<String> lineStrings = inputLines.as(Encoders.STRING());
    Dataset<String> tokens = lineStrings.flatMap(tokenize, Encoders.STRING());

    // Running count per distinct word; "value" is the column the words are grouped by.
    Dataset<Row> runningCounts = tokens.groupBy("value").count();

    // Start the query that prints the complete set of running counts to the console.
    StreamingQuery consoleQuery = runningCounts
      .writeStream()
      .outputMode("complete")
      .format("console")
      .start();

    // Block until the streaming query terminates.
    consoleQuery.awaitTermination();
  }
}
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.1.0 LXR engine. The LXR team |