0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 """
0019 Counts words in text encoded with UTF8 received from the network every second.
0020
0021 Usage: recoverable_network_wordcount.py <hostname> <port> <checkpoint-directory> <output-file>
0022 <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
0023 data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
0024 <output-file> file to which the word counts will be appended
0025
0026 To run this on your local machine, you need to first run a Netcat server
0027 `$ nc -lk 9999`
0028
0029 and then run the example
0030 `$ bin/spark-submit examples/src/main/python/streaming/recoverable_network_wordcount.py \
0031 localhost 9999 ~/checkpoint/ ~/out`
0032
0033 If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
0034 a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
0035 checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
0036 the checkpoint data.
0037 """
0038 from __future__ import print_function
0039
0040 import os
0041 import sys
0042
0043 from pyspark import SparkContext
0044 from pyspark.streaming import StreamingContext
0045
0046
0047
def getWordBlacklist(sparkContext):
    """Lazily create and cache a broadcast variable holding the word blacklist.

    The broadcast is stashed in module globals so it can be re-created on
    demand after a checkpoint-based restart (closures recovered from a
    checkpoint cannot carry live broadcast objects).
    """
    module_state = globals()
    if 'wordBlacklist' not in module_state:
        module_state['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
    return module_state['wordBlacklist']
0052
0053
0054
def getDroppedWordsCounter(sparkContext):
    """Lazily create and cache an accumulator that counts dropped words.

    Kept in module globals for the same reason as the broadcast blacklist:
    it must be reconstructible after recovery from checkpoint data.
    """
    module_state = globals()
    if 'droppedWordsCounter' not in module_state:
        module_state['droppedWordsCounter'] = sparkContext.accumulator(0)
    return module_state['droppedWordsCounter']
0059
0060
def createContext(host, port, outputPath):
    """Build a fresh StreamingContext running the word-count pipeline.

    Invoked by ``StreamingContext.getOrCreate`` only when no checkpoint data
    exists; prints "Creating new context" so a fresh start is visible.

    host/port: TCP endpoint to read UTF-8 text lines from.
    outputPath: file per-batch counts are appended to; any previous file is
        deleted so a brand-new run starts with clean output.
    """
    print("Creating new context")
    if os.path.exists(outputPath):
        os.remove(outputPath)

    spark_ctx = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
    streaming_ctx = StreamingContext(spark_ctx, 1)

    # Split incoming lines into words and count occurrences per batch.
    word_pairs = (streaming_ctx.socketTextStream(host, port)
                  .flatMap(lambda line: line.split(" "))
                  .map(lambda word: (word, 1)))
    wordCounts = word_pairs.reduceByKey(lambda a, b: a + b)

    def echo(time, rdd):
        # Fetch the broadcast blacklist and dropped-word accumulator lazily
        # so they can be re-created after a checkpoint-based restart.
        blacklist = getWordBlacklist(rdd.context)
        droppedWordsCounter = getDroppedWordsCounter(rdd.context)

        def filterFunc(wordCount):
            # Reject blacklisted words, tallying how many occurrences we drop.
            if wordCount[0] in blacklist.value:
                droppedWordsCounter.add(wordCount[1])
                return False
            return True

        counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
        print(counts)
        print("Dropped %d word(s) totally" % droppedWordsCounter.value)
        print("Appending to " + os.path.abspath(outputPath))
        with open(outputPath, 'a') as f:
            f.write(counts + "\n")

    wordCounts.foreachRDD(echo)
    return streaming_ctx
0099
if __name__ == "__main__":
    # Exactly four arguments are required; otherwise print usage and bail.
    if len(sys.argv) != 5:
        print("Usage: recoverable_network_wordcount.py <hostname> <port> "
              "<checkpoint-directory> <output-file>", file=sys.stderr)
        sys.exit(-1)
    host, port, checkpoint_dir, output_path = sys.argv[1:]

    def _build_context():
        # Deferred factory: only runs when no checkpoint data exists.
        return createContext(host, int(port), output_path)

    context = StreamingContext.getOrCreate(checkpoint_dir, _build_context)
    context.start()
    context.awaitTermination()