0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 r"""
0019 Counts words in UTF8 encoded, '\n' delimited text received from the
0020 network every second.
0021
0022 Usage: stateful_network_wordcount.py <hostname> <port>
0023 <hostname> and <port> describe the TCP server that Spark Streaming
0024 would connect to receive data.
0025
0026 To run this on your local machine, you need to first run a Netcat server
0027 `$ nc -lk 9999`
0028 and then run the example
0029 `$ bin/spark-submit examples/src/main/python/streaming/stateful_network_wordcount.py \
0030 localhost 9999`
0031 """
0032 from __future__ import print_function
0033
0034 import sys
0035
0036 from pyspark import SparkContext
0037 from pyspark.streaming import StreamingContext
0038
if __name__ == "__main__":
    # The script needs exactly two arguments after its own name.
    if len(sys.argv) != 3:
        print("Usage: stateful_network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
    hostname, port = sys.argv[1:3]

    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    # Batch interval of 1 (the module docstring describes this as "every second").
    ssc = StreamingContext(sc, 1)
    # updateStateByKey keeps state across batches, so a checkpoint
    # directory is configured before the stream is defined.
    ssc.checkpoint("checkpoint")

    # Seed counts handed to updateStateByKey as the initial state.
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])

    def updateFunc(new_values, last_sum):
        """Return the updated running total for a single key.

        :param new_values: iterable of counts observed for the key in the
            current batch.
        :param last_sum: total carried over from earlier batches; treated
            as 0 when falsy (presumably None for a key with no prior state).
        :return: sum of ``new_values`` plus the carried-over total.
        """
        carried_over = last_sum or 0
        return sum(new_values) + carried_over

    # The port is converted to int only here, at the point of use.
    lines = ssc.socketTextStream(hostname, int(port))

    # Split each line on single spaces, pair every token with a count of 1,
    # then fold the per-batch counts into the running per-word totals.
    words = lines.flatMap(lambda text: text.split(" "))
    word_pairs = words.map(lambda token: (token, 1))
    running_counts = word_pairs.updateStateByKey(updateFunc, initialRDD=initialStateRDD)

    running_counts.pprint()

    ssc.start()
    ssc.awaitTermination()