0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 r"""
Shows the most positive words in UTF8 encoded, '\n' delimited text directly received from the network
0020 every 5 seconds. The streaming data is joined with a static RDD of the AFINN word list
0021 (http://neuro.imm.dtu.dk/wiki/AFINN)
0022
0023 Usage: network_wordjoinsentiments.py <hostname> <port>
0024 <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
0025
0026 To run this on your local machine, you need to first run a Netcat server
0027 `$ nc -lk 9999`
0028 and then run the example
0029 `$ bin/spark-submit examples/src/main/python/streaming/network_wordjoinsentiments.py \
0030 localhost 9999`
0031 """
0032
0033 from __future__ import print_function
0034
0035 import sys
0036
0037 from pyspark import SparkContext
0038 from pyspark.streaming import StreamingContext
0039
0040
def print_happiest_words(rdd):
    """Print the top five (happiness, word) pairs from *rdd*.

    Parameters
    ----------
    rdd : RDD of (happiness, word) tuples — assumed already sorted in
        descending happiness order by the caller's ``sortByKey(False)``.
    """
    # take(5) pulls only the first five elements to the driver; count()
    # triggers a second Spark action, kept to preserve the reported total.
    top_list = rdd.take(5)
    print("Happiest topics in the last 5 seconds (%d total):" % rdd.count())
    # Unpack each pair instead of indexing a variable named `tuple`,
    # which shadowed the builtin in the original code.
    for happiness, word in top_list:
        print("%s (%d happiness)" % (word, happiness))
0046
if __name__ == "__main__":
    # Require exactly <hostname> and <port> on the command line.
    if len(sys.argv) != 3:
        print("Usage: network_wordjoinsentiments.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="PythonStreamingNetworkWordJoinSentiments")
    ssc = StreamingContext(sc, 5)  # 5-second micro-batches

    # Static RDD mapping each AFINN word to its sentiment score (as a string).
    word_sentiments = ssc.sparkContext \
        .textFile("data/streaming/AFINN-111.txt") \
        .map(lambda entry: tuple(entry.split("\t")))

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

    # Per-batch word counts from the raw text stream.
    word_counts = lines.flatMap(lambda text: text.split(" ")) \
        .map(lambda token: (token, 1)) \
        .reduceByKey(lambda a, b: a + b)

    # Join each batch against the sentiment table, weight the sentiment by
    # the observed count, then key by happiness so sortByKey(False) yields
    # a descending ordering.
    happiest_words = word_counts \
        .transform(lambda rdd: word_sentiments.join(rdd)) \
        .map(lambda joined: (joined[0], float(joined[1][0]) * joined[1][1])) \
        .map(lambda scored: (scored[1], scored[0])) \
        .transform(lambda rdd: rdd.sortByKey(False))

    happiest_words.foreachRDD(print_happiest_words)

    ssc.start()
    ssc.awaitTermination()