|
||||
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.sql.streaming;

import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;

/**
 * Consumes messages from one or more topics in a Kerberized Kafka cluster and does wordcount.
 * Usage: JavaStructuredKerberizedKafkaWordCount <bootstrap-servers> <subscribe-type> <topics>
 *   <bootstrap-servers> The Kafka "bootstrap.servers" configuration: a comma-separated
 *   list of host:port pairs.
 *   <subscribe-type> One of 'assign', 'subscribe', 'subscribePattern'.
 *   |- <assign> Specific TopicPartitions to consume, as a JSON string, e.g.
 *   |  {"topicA":[0,1],"topicB":[2,4]}.
 *   |- <subscribe> A comma-separated list of topics to subscribe to.
 *   |- <subscribePattern> A Java regex matching the topic(s) to subscribe to.
 *   |- Exactly one of "assign", "subscribe" or "subscribePattern" may be specified
 *   |  for the Kafka source.
 *   <topics> Interpreted according to the value of 'subscribe-type' above.
 *
 * Example:
 *  Yarn client:
 *   $ bin/run-example --files ${jaas_path}/kafka_jaas.conf,${keytab_path}/kafka.service.keytab \
 *     --driver-java-options "-Djava.security.auth.login.config=${path}/kafka_driver_jaas.conf" \
 *     --conf \
 *     "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_jaas.conf" \
 *     --master yarn \
 *     sql.streaming.JavaStructuredKerberizedKafkaWordCount broker1-host:port,broker2-host:port \
 *     subscribe topic1,topic2
 *  Yarn cluster:
 *   $ bin/run-example --files \
 *     ${jaas_path}/kafka_jaas.conf,${keytab_path}/kafka.service.keytab,${krb5_path}/krb5.conf \
 *     --driver-java-options \
 *     "-Djava.security.auth.login.config=./kafka_jaas.conf \
 *     -Djava.security.krb5.conf=./krb5.conf" \
 *     --conf \
 *     "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_jaas.conf" \
 *     --master yarn --deploy-mode cluster \
 *     sql.streaming.JavaStructuredKerberizedKafkaWordCount broker1-host:port,broker2-host:port \
 *     subscribe topic1,topic2
 *
 * kafka_jaas.conf can be created manually; a template follows:
 * KafkaClient {
 *   com.sun.security.auth.module.Krb5LoginModule required
 *   keyTab="./kafka.service.keytab"
 *   useKeyTab=true
 *   storeKey=true
 *   useTicketCache=false
 *   serviceName="kafka"
 *   principal="kafka/host@EXAMPLE.COM";
 * };
 * kafka_driver_jaas.conf (used in yarn client mode) is essentially the same as
 * kafka_jaas.conf except for 'keyTab': in kafka_driver_jaas.conf, 'keyTab' should be
 * "${keytab_path}/kafka.service.keytab".
 * In addition, on IBM JVMs use 'com.ibm.security.auth.module.Krb5LoginModule'
 * instead of 'com.sun.security.auth.module.Krb5LoginModule'.
 *
 * Note that this example uses SASL_PLAINTEXT for simplicity; SASL_PLAINTEXT has no
 * SSL encryption and is likely to be less secure. Please consider using SASL_SSL
 * in production.
 */
public final class JavaStructuredKerberizedKafkaWordCount {
  public static void main(String[] args) throws Exception {
    // Three positional arguments are required; anything less is a usage error.
    if (args.length < 3) {
      System.err.println("Usage: JavaStructuredKerberizedKafkaWordCount <bootstrap-servers> " +
        "<subscribe-type> <topics>");
      System.exit(1);
    }

    String servers = args[0];
    String sourceType = args[1];
    String topicSpec = args[2];

    SparkSession spark =
      SparkSession.builder().appName("JavaStructuredKerberizedKafkaWordCount").getOrCreate();

    // Read the Kafka stream; each record's value column, cast to a string, is one input line.
    // The security protocol selects SASL without TLS (see the class javadoc caveat).
    Dataset<Row> kafkaRows = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option(sourceType, topicSpec)
      .option("kafka.security.protocol", SecurityProtocol.SASL_PLAINTEXT.name)
      .load();
    Dataset<String> lines = kafkaRows.selectExpr("CAST(value AS STRING)").as(Encoders.STRING());

    // Tokenize each line on single spaces, then keep a running count per word.
    FlatMapFunction<String, String> tokenize =
      line -> Arrays.asList(line.split(" ")).iterator();
    Dataset<Row> wordCounts =
      lines.flatMap(tokenize, Encoders.STRING()).groupBy("value").count();

    // Print the complete set of running counts to the console on every trigger,
    // and block until the streaming query terminates.
    StreamingQuery query = wordCounts.writeStream()
      .outputMode("complete")
      .format("console")
      .start();

    query.awaitTermination();
  }
}
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.1.0 LXR engine. The LXR team |