Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 r"""
0019  Counts words in UTF8 encoded, '\n' delimited text received from the network over a
0020  sliding window of configurable duration. Each line from the network is tagged
0021  with a timestamp that is used to determine the windows into which it falls.
0022 
0023  Usage: structured_network_wordcount_windowed.py <hostname> <port> <window duration>
0024    [<slide duration>]
0025  <hostname> and <port> describe the TCP server that Structured Streaming
0026  would connect to receive data.
0027  <window duration> gives the size of window, specified as integer number of seconds
0028  <slide duration> gives the amount of time successive windows are offset from one another,
0029  given in the same units as above. <slide duration> should be less than or equal to
0030  <window duration>. If the two are equal, successive windows have no overlap. If
0031  <slide duration> is not provided, it defaults to <window duration>.
0032 
0033  To run this on your local machine, you need to first run a Netcat server
0034     `$ nc -lk 9999`
0035  and then run the example
0036     `$ bin/spark-submit
0037     examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
0038     localhost 9999 <window duration> [<slide duration>]`
0039 
0040  One recommended <window duration>, <slide duration> pair is 10, 5
0041 """
0042 from __future__ import print_function
0043 
0044 import sys
0045 
0046 from pyspark.sql import SparkSession
0047 from pyspark.sql.functions import explode
0048 from pyspark.sql.functions import split
0049 from pyspark.sql.functions import window
0050 
0051 if __name__ == "__main__":
0052     if len(sys.argv) != 5 and len(sys.argv) != 4:
0053         msg = ("Usage: structured_network_wordcount_windowed.py <hostname> <port> "
0054                "<window duration in seconds> [<slide duration in seconds>]")
0055         print(msg, file=sys.stderr)
0056         sys.exit(-1)
0057 
0058     host = sys.argv[1]
0059     port = int(sys.argv[2])
0060     windowSize = int(sys.argv[3])
0061     slideSize = int(sys.argv[4]) if (len(sys.argv) == 5) else windowSize
0062     if slideSize > windowSize:
0063         print("<slide duration> must be less than or equal to <window duration>", file=sys.stderr)
0064     windowDuration = '{} seconds'.format(windowSize)
0065     slideDuration = '{} seconds'.format(slideSize)
0066 
0067     spark = SparkSession\
0068         .builder\
0069         .appName("StructuredNetworkWordCountWindowed")\
0070         .getOrCreate()
0071 
0072     # Create DataFrame representing the stream of input lines from connection to host:port
0073     lines = spark\
0074         .readStream\
0075         .format('socket')\
0076         .option('host', host)\
0077         .option('port', port)\
0078         .option('includeTimestamp', 'true')\
0079         .load()
0080 
0081     # Split the lines into words, retaining timestamps
0082     # split() splits each line into an array, and explode() turns the array into multiple rows
0083     words = lines.select(
0084         explode(split(lines.value, ' ')).alias('word'),
0085         lines.timestamp
0086     )
0087 
0088     # Group the data by window and word and compute the count of each group
0089     windowedCounts = words.groupBy(
0090         window(words.timestamp, windowDuration, slideDuration),
0091         words.word
0092     ).count().orderBy('window')
0093 
0094     # Start running the query that prints the windowed word counts to the console
0095     query = windowedCounts\
0096         .writeStream\
0097         .outputMode('complete')\
0098         .format('console')\
0099         .option('truncate', 'false')\
0100         .start()
0101 
0102     query.awaitTermination()