Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.examples.sql.streaming;
0019 
0020 import org.apache.spark.api.java.function.FlatMapFunction;
0021 import org.apache.spark.sql.Dataset;
0022 import org.apache.spark.sql.Encoders;
0023 import org.apache.spark.sql.Row;
0024 import org.apache.spark.sql.SparkSession;
0025 import org.apache.spark.sql.streaming.StreamingQuery;
0026 
0027 import java.util.Arrays;
0028 
0029 /**
0030  * Consumes messages from one or more topics in Kafka and does wordcount.
0031  * Usage: JavaStructuredKafkaWordCount <bootstrap-servers> <subscribe-type> <topics>
0032  *   <bootstrap-servers> The Kafka "bootstrap.servers" configuration. A
0033  *   comma-separated list of host:port.
0034  *   <subscribe-type> There are three kinds of type, i.e. 'assign', 'subscribe',
0035  *   'subscribePattern'.
0036  *   |- <assign> Specific TopicPartitions to consume. Json string
0037  *   |  {"topicA":[0,1],"topicB":[2,4]}.
0038  *   |- <subscribe> The topic list to subscribe. A comma-separated list of
0039  *   |  topics.
0040  *   |- <subscribePattern> The pattern used to subscribe to topic(s).
0041  *   |  Java regex string.
0042  *   |- Only one of "assign", "subscribe" or "subscribePattern" options can be
0043  *   |  specified for Kafka source.
0044  *   <topics> The value whose format depends on the chosen 'subscribe-type'.
0045  *
0046  * Example:
0047  *    `$ bin/run-example \
0048  *      sql.streaming.JavaStructuredKafkaWordCount host1:port1,host2:port2 \
0049  *      subscribe topic1,topic2`
0050  */
0051 public final class JavaStructuredKafkaWordCount {
0052 
0053   public static void main(String[] args) throws Exception {
0054     if (args.length < 3) {
0055       System.err.println("Usage: JavaStructuredKafkaWordCount <bootstrap-servers> " +
0056         "<subscribe-type> <topics>");
0057       System.exit(1);
0058     }
0059 
0060     String bootstrapServers = args[0];
0061     String subscribeType = args[1];
0062     String topics = args[2];
0063 
0064     SparkSession spark = SparkSession
0065       .builder()
0066       .appName("JavaStructuredKafkaWordCount")
0067       .getOrCreate();
0068 
0069     // Create DataSet representing the stream of input lines from kafka
0070     Dataset<String> lines = spark
0071       .readStream()
0072       .format("kafka")
0073       .option("kafka.bootstrap.servers", bootstrapServers)
0074       .option(subscribeType, topics)
0075       .load()
0076       .selectExpr("CAST(value AS STRING)")
0077       .as(Encoders.STRING());
0078 
0079     // Generate running word count
0080     Dataset<Row> wordCounts = lines.flatMap(
0081         (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
0082         Encoders.STRING()).groupBy("value").count();
0083 
0084     // Start running the query that prints the running counts to the console
0085     StreamingQuery query = wordCounts.writeStream()
0086       .outputMode("complete")
0087       .format("console")
0088       .start();
0089 
0090     query.awaitTermination();
0091   }
0092 }