0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 """
0019 Consumes messages from one or more topics in Kafka and does wordcount.
0020 Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
0021 <bootstrap-servers> The Kafka "bootstrap.servers" configuration. A
0022 comma-separated list of host:port.
0023 <subscribe-type> There are three kinds of type, i.e. 'assign', 'subscribe',
0024 'subscribePattern'.
0025 |- <assign> Specific TopicPartitions to consume. Json string
0026 | {"topicA":[0,1],"topicB":[2,4]}.
0027 |- <subscribe> The topic list to subscribe. A comma-separated list of
0028 | topics.
0029 |- <subscribePattern> The pattern used to subscribe to topic(s).
0030 | Java regex string.
0031 |- Only one of "assign", "subscribe" or "subscribePattern" options can be
0032 | specified for Kafka source.
0033 <topics> The format of this value depends on the value of 'subscribe-type'.
0034
0035 Run the example
0036 `$ bin/spark-submit examples/src/main/python/sql/streaming/structured_kafka_wordcount.py \
0037 host1:port1,host2:port2 subscribe topic1,topic2`
0038 """
0039 from __future__ import print_function
0040
0041 import sys
0042
0043 from pyspark.sql import SparkSession
0044 from pyspark.sql.functions import explode
0045 from pyspark.sql.functions import split
0046
if __name__ == "__main__":
    # The script name plus exactly three positional arguments are required;
    # anything else prints the usage text to stderr and exits.
    if len(sys.argv) != 4:
        print("""
        Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
        """, file=sys.stderr)
        sys.exit(-1)

    bootstrapServers, subscribeType, topics = sys.argv[1:4]

    spark = (
        SparkSession.builder
        .appName("StructuredKafkaWordCount")
        .getOrCreate()
    )

    # Configure the streaming Kafka source. The subscribe type given on the
    # command line is used as the option key, with the topics as its value.
    kafkaReader = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option(subscribeType, topics)
    )

    # Keep only the record value, cast to a string column named `value`.
    lines = kafkaReader.load().selectExpr("CAST(value AS STRING)")

    # Split each value on single spaces and emit one row per piece.
    tokens = split(lines.value, ' ')
    words = lines.select(explode(tokens).alias('word'))

    # Running count for each distinct word.
    wordCounts = words.groupBy('word').count()

    # Start the streaming query with 'complete' output mode and the
    # 'console' format, then wait on it.
    query = (
        wordCounts.writeStream
        .outputMode('complete')
        .format('console')
        .start()
    )

    query.awaitTermination()