0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 r"""
0019 Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
0020 network every second.
0021
0022 Usage: sql_network_wordcount.py <hostname> <port>
0023 <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
0024
0025 To run this on your local machine, you need to first run a Netcat server
0026 `$ nc -lk 9999`
0027 and then run the example
0028 `$ bin/spark-submit examples/src/main/python/streaming/sql_network_wordcount.py localhost 9999`
0029 """
0030 from __future__ import print_function
0031
0032 import sys
0033
0034 from pyspark import SparkContext
0035 from pyspark.streaming import StreamingContext
0036 from pyspark.sql import Row, SparkSession
0037
0038
def getSparkSessionInstance(sparkConf):
    """Return the lazily created, module-wide SparkSession.

    The session is stored in this module's globals under the key
    'sparkSessionSingletonInstance'.  On the first call it is built from
    ``sparkConf`` via ``SparkSession.builder``; every later call returns the
    stored object and does not look at ``sparkConf`` again.
    """
    module_scope = globals()
    singleton_key = 'sparkSessionSingletonInstance'

    # Fast path: a session was already stored by an earlier call.
    if singleton_key in module_scope:
        return module_scope[singleton_key]

    session_builder = SparkSession.builder.config(conf=sparkConf)
    module_scope[singleton_key] = session_builder.getOrCreate()
    return module_scope[singleton_key]
0046
0047
if __name__ == "__main__":
    # Script entry point: expects exactly two arguments, <hostname> <port>.
    if len(sys.argv) != 3:
        print("Usage: sql_network_wordcount.py <hostname> <port> ", file=sys.stderr)
        sys.exit(-1)
    host, port = sys.argv[1:]
    sc = SparkContext(appName="PythonSqlNetworkWordCount")
    # Second argument is the batch interval in seconds.
    ssc = StreamingContext(sc, 1)

    # Create a socket stream on the target host:port and split every received
    # line into words on single spaces.
    lines = ssc.socketTextStream(host, int(port))
    words = lines.flatMap(lambda line: line.split(" "))

    def process(time, rdd):
        """Print the per-word counts of one batch using DataFrames and SQL.

        :param time: batch time handed over by ``foreachRDD``; only printed.
        :param rdd: RDD holding the words received in this batch.

        Processing is best-effort: a failing batch is reported on stderr and
        skipped so that the streaming job keeps running.
        """
        print("========= %s =========" % str(time))

        try:
            # Nothing to count for an empty batch; skip it explicitly rather
            # than relying on DataFrame creation failing for it.
            if rdd.isEmpty():
                return

            # Get the singleton instance of SparkSession.
            spark = getSparkSessionInstance(rdd.context.getConf())

            # Convert RDD[String] to RDD[Row] and then to a DataFrame.
            rowRdd = rdd.map(lambda w: Row(word=w))
            wordsDataFrame = spark.createDataFrame(rowRdd)

            # Register the DataFrame as a temporary view so SQL can query it.
            wordsDataFrame.createOrReplaceTempView("words")

            # Do the word count with SQL and print the result.
            wordCountsDataFrame = \
                spark.sql("select word, count(*) as total from words group by word")
            wordCountsDataFrame.show()
        except Exception as err:
            # Keep the stream alive, but do not hide the failure.  A bare
            # ``except:`` would also swallow KeyboardInterrupt/SystemExit.
            print("Failed to process batch %s: %s" % (str(time), err), file=sys.stderr)

    words.foreachRDD(process)
    ssc.start()
    ssc.awaitTermination()