Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 """
0019  Consumes messages from one or more topics in Kafka and does wordcount.
0020  Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
0021    <bootstrap-servers> The Kafka "bootstrap.servers" configuration. A
0022    comma-separated list of host:port.
0023    <subscribe-type> There are three kinds of type, i.e. 'assign', 'subscribe',
0024    'subscribePattern'.
0025    |- <assign> Specific TopicPartitions to consume. Json string
0026    |  {"topicA":[0,1],"topicB":[2,4]}.
0027    |- <subscribe> The topic list to subscribe. A comma-separated list of
0028    |  topics.
0029    |- <subscribePattern> The pattern used to subscribe to topic(s).
0030    |  Java regex string.
0031    |- Only one of "assign, "subscribe" or "subscribePattern" options can be
0032    |  specified for Kafka source.
0033    <topics> Different value format depends on the value of 'subscribe-type'.
0034 
0035  Run the example
0036     `$ bin/spark-submit examples/src/main/python/sql/streaming/structured_kafka_wordcount.py \
0037     host1:port1,host2:port2 subscribe topic1,topic2`
0038 """
0039 from __future__ import print_function
0040 
0041 import sys
0042 
0043 from pyspark.sql import SparkSession
0044 from pyspark.sql.functions import explode
0045 from pyspark.sql.functions import split
0046 
0047 if __name__ == "__main__":
0048     if len(sys.argv) != 4:
0049         print("""
0050         Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
0051         """, file=sys.stderr)
0052         sys.exit(-1)
0053 
0054     bootstrapServers = sys.argv[1]
0055     subscribeType = sys.argv[2]
0056     topics = sys.argv[3]
0057 
0058     spark = SparkSession\
0059         .builder\
0060         .appName("StructuredKafkaWordCount")\
0061         .getOrCreate()
0062 
0063     # Create DataSet representing the stream of input lines from kafka
0064     lines = spark\
0065         .readStream\
0066         .format("kafka")\
0067         .option("kafka.bootstrap.servers", bootstrapServers)\
0068         .option(subscribeType, topics)\
0069         .load()\
0070         .selectExpr("CAST(value AS STRING)")
0071 
0072     # Split the lines into words
0073     words = lines.select(
0074         # explode turns each item in an array into a separate row
0075         explode(
0076             split(lines.value, ' ')
0077         ).alias('word')
0078     )
0079 
0080     # Generate running word count
0081     wordCounts = words.groupBy('word').count()
0082 
0083     # Start running the query that prints the running counts to the console
0084     query = wordCounts\
0085         .writeStream\
0086         .outputMode('complete')\
0087         .format('console')\
0088         .start()
0089 
0090     query.awaitTermination()