0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 r"""
Counts words in UTF-8 encoded, '\n' delimited text received from the network.
Usage: structured_network_wordcount.py <hostname> <port>
  <hostname> and <port> describe the TCP server that Structured Streaming
  connects to in order to receive data.
0023
0024 To run this on your local machine, you need to first run a Netcat server
0025 `$ nc -lk 9999`
0026 and then run the example
0027 `$ bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py
0028 localhost 9999`
0029 """
0030 from __future__ import print_function
0031
0032 import sys
0033
0034 from pyspark.sql import SparkSession
0035 from pyspark.sql.functions import explode
0036 from pyspark.sql.functions import split
0037
if __name__ == "__main__":
    # Expect exactly two positional arguments: the host and port of the
    # newline-delimited text socket to read from.
    if len(sys.argv) != 3:
        print("Usage: structured_network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)

    host = sys.argv[1]

    # Validate the port up front so a typo produces a usage-style message
    # rather than a raw ValueError traceback from int().
    try:
        port = int(sys.argv[2])
    except ValueError:
        port = -1
    if not 0 < port < 65536:
        print("Invalid port: %r (expected an integer in 1-65535)" % sys.argv[2],
              file=sys.stderr)
        print("Usage: structured_network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession\
        .builder\
        .appName("StructuredNetworkWordCount")\
        .getOrCreate()

    try:
        # Streaming DataFrame with one row per line received from host:port;
        # the socket source exposes each line in the 'value' column.
        lines = spark\
            .readStream\
            .format('socket')\
            .option('host', host)\
            .option('port', port)\
            .load()

        # Split each line on runs of whitespace (the pattern is a Java regex)
        # and explode the resulting array into one row per word.
        words = lines.select(
            explode(
                split(lines.value, r'\s+')
            ).alias('word')
        )
        # Spark's split keeps empty tokens (e.g. from leading whitespace or
        # empty lines); drop them so "" is never counted as a word.
        words = words.filter(words.word != '')

        # Running count of each distinct word.
        wordCounts = words.groupBy('word').count()

        # 'complete' mode re-emits the full aggregated table to the console
        # on every trigger.
        query = wordCounts\
            .writeStream\
            .outputMode('complete')\
            .format('console')\
            .start()

        # Block until the query stops (or fails / is interrupted).
        query.awaitTermination()
    finally:
        # Release cluster resources however the query ended.
        spark.stop()