0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 r"""
0019 Counts words in UTF8 encoded, '\n' delimited text received from the network over a
0020 sliding window of configurable duration. Each line from the network is tagged
0021 with a timestamp that is used to determine the windows into which it falls.
0022
0023 Usage: structured_network_wordcount_windowed.py <hostname> <port> <window duration>
0024 [<slide duration>]
0025 <hostname> and <port> describe the TCP server that Structured Streaming
0026 would connect to receive data.
0027 <window duration> gives the size of the window, specified as an integer number of seconds
0028 <slide duration> gives the amount of time successive windows are offset from one another,
0029 given in the same units as above. <slide duration> should be less than or equal to
0030 <window duration>. If the two are equal, successive windows have no overlap. If
0031 <slide duration> is not provided, it defaults to <window duration>.
0032
0033 To run this on your local machine, you need to first run a Netcat server
0034 `$ nc -lk 9999`
0035 and then run the example
0036 `$ bin/spark-submit
0037 examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
0038 localhost 9999 <window duration> [<slide duration>]`
0039
0040 One recommended <window duration>, <slide duration> pair is 10, 5
0041 """
0042 from __future__ import print_function
0043
0044 import sys
0045
0046 from pyspark.sql import SparkSession
0047 from pyspark.sql.functions import explode
0048 from pyspark.sql.functions import split
0049 from pyspark.sql.functions import window
0050
if __name__ == "__main__":
    # Expected arguments: <hostname> <port> <window duration> [<slide duration>].
    if len(sys.argv) not in (4, 5):
        msg = ("Usage: structured_network_wordcount_windowed.py <hostname> <port> "
               "<window duration in seconds> [<slide duration in seconds>]")
        print(msg, file=sys.stderr)
        sys.exit(-1)

    host = sys.argv[1]
    port = int(sys.argv[2])
    windowSize = int(sys.argv[3])
    # The slide duration is optional; when omitted it equals the window size.
    slideSize = int(sys.argv[4]) if (len(sys.argv) == 5) else windowSize
    if slideSize > windowSize:
        print("<slide duration> must be less than or equal to <window duration>", file=sys.stderr)
        # Stop here, like the usage check above, instead of continuing on to
        # build a streaming query with an argument combination already
        # reported as invalid.
        sys.exit(-1)
    windowDuration = '{} seconds'.format(windowSize)
    slideDuration = '{} seconds'.format(slideSize)

    spark = SparkSession\
        .builder\
        .appName("StructuredNetworkWordCountWindowed")\
        .getOrCreate()

    # Create a streaming DataFrame of the lines read from the socket at
    # host:port. 'includeTimestamp' is enabled because the `timestamp` column
    # is used below to assign each word to its windows.
    lines = spark\
        .readStream\
        .format('socket')\
        .option('host', host)\
        .option('port', port)\
        .option('includeTimestamp', 'true')\
        .load()

    # Split each line on single spaces into one row per word, keeping the
    # line's timestamp alongside every word.
    words = lines.select(
        explode(split(lines.value, ' ')).alias('word'),
        lines.timestamp
    )

    # Group by (window, word) and count the rows in each group, ordered by window.
    windowedCounts = words.groupBy(
        window(words.timestamp, windowDuration, slideDuration),
        words.word
    ).count().orderBy('window')

    # Start the query that writes the windowed word counts to the console.
    query = windowedCounts\
        .writeStream\
        .outputMode('complete')\
        .format('console')\
        .option('truncate', 'false')\
        .start()

    # Block until the streaming query terminates.
    query.awaitTermination()