Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 r"""
0019  Counts words in UTF8 encoded, '\n' delimited text received from the network.
0020  Usage: structured_network_wordcount.py <hostname> <port>
0021    <hostname> and <port> describe the TCP server that Structured Streaming
0022    would connect to receive data.
0023 
0024  To run this on your local machine, you need to first run a Netcat server
0025     `$ nc -lk 9999`
0026  and then run the example
0027     `$ bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py
0028     localhost 9999`
0029 """
0030 from __future__ import print_function
0031 
0032 import sys
0033 
0034 from pyspark.sql import SparkSession
0035 from pyspark.sql.functions import explode
0036 from pyspark.sql.functions import split
0037 
if __name__ == "__main__":
    # Exactly two command-line arguments are expected: <hostname> and <port>.
    cli_args = sys.argv[1:]
    if len(cli_args) != 2:
        print("Usage: structured_network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)

    host, port_arg = cli_args
    port = int(port_arg)

    spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()

    # Streaming DataFrame of the input lines read from the socket source at host:port
    socket_reader = spark.readStream.format('socket')
    socket_reader = socket_reader.option('host', host).option('port', port)
    lines = socket_reader.load()

    # Split each line on single spaces; explode turns each item of the resulting
    # array into a separate row, and the column is exposed under the name 'word'
    word_column = explode(split(lines.value, ' ')).alias('word')
    words = lines.select(word_column)

    # Running count per distinct word
    word_counts = words.groupBy('word').count()

    # Start the query that prints the running counts to the console
    console_writer = word_counts.writeStream.outputMode('complete').format('console')
    query = console_writer.start()

    # Wait for the streaming query to terminate
    query.awaitTermination()