Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 """
0019  Counts words in text encoded with UTF8 received from the network every second.
0020 
0021  Usage: recoverable_network_wordcount.py <hostname> <port> <checkpoint-directory> <output-file>
0022    <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
0023    data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
0024    <output-file> file to which the word counts will be appended
0025 
0026  To run this on your local machine, you need to first run a Netcat server
0027     `$ nc -lk 9999`
0028 
0029  and then run the example
0030     `$ bin/spark-submit examples/src/main/python/streaming/recoverable_network_wordcount.py \
0031         localhost 9999 ~/checkpoint/ ~/out`
0032 
0033  If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
0034  a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
0035  checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
0036  the checkpoint data.
0037 """
0038 from __future__ import print_function
0039 
0040 import os
0041 import sys
0042 
0043 from pyspark import SparkContext
0044 from pyspark.streaming import StreamingContext
0045 
0046 
0047 # Get or register a Broadcast variable
0048 def getWordBlacklist(sparkContext):
0049     if ('wordBlacklist' not in globals()):
0050         globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
0051     return globals()['wordBlacklist']
0052 
0053 
0054 # Get or register an Accumulator
0055 def getDroppedWordsCounter(sparkContext):
0056     if ('droppedWordsCounter' not in globals()):
0057         globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
0058     return globals()['droppedWordsCounter']
0059 
0060 
0061 def createContext(host, port, outputPath):
0062     # If you do not see this printed, that means the StreamingContext has been loaded
0063     # from the new checkpoint
0064     print("Creating new context")
0065     if os.path.exists(outputPath):
0066         os.remove(outputPath)
0067     sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
0068     ssc = StreamingContext(sc, 1)
0069 
0070     # Create a socket stream on target ip:port and count the
0071     # words in input stream of \n delimited text (eg. generated by 'nc')
0072     lines = ssc.socketTextStream(host, port)
0073     words = lines.flatMap(lambda line: line.split(" "))
0074     wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
0075 
0076     def echo(time, rdd):
0077         # Get or register the blacklist Broadcast
0078         blacklist = getWordBlacklist(rdd.context)
0079         # Get or register the droppedWordsCounter Accumulator
0080         droppedWordsCounter = getDroppedWordsCounter(rdd.context)
0081 
0082         # Use blacklist to drop words and use droppedWordsCounter to count them
0083         def filterFunc(wordCount):
0084             if wordCount[0] in blacklist.value:
0085                 droppedWordsCounter.add(wordCount[1])
0086                 return False
0087             else:
0088                 return True
0089 
0090         counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
0091         print(counts)
0092         print("Dropped %d word(s) totally" % droppedWordsCounter.value)
0093         print("Appending to " + os.path.abspath(outputPath))
0094         with open(outputPath, 'a') as f:
0095             f.write(counts + "\n")
0096 
0097     wordCounts.foreachRDD(echo)
0098     return ssc
0099 
0100 if __name__ == "__main__":
0101     if len(sys.argv) != 5:
0102         print("Usage: recoverable_network_wordcount.py <hostname> <port> "
0103               "<checkpoint-directory> <output-file>", file=sys.stderr)
0104         sys.exit(-1)
0105     host, port, checkpoint, output = sys.argv[1:]
0106     ssc = StreamingContext.getOrCreate(checkpoint,
0107                                        lambda: createContext(host, int(port), output))
0108     ssc.start()
0109     ssc.awaitTermination()