Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.examples.streaming;
0019 
0020 import java.util.HashMap;
0021 import java.util.HashSet;
0022 import java.util.Arrays;
0023 import java.util.Map;
0024 import java.util.Set;
0025 import java.util.regex.Pattern;
0026 
0027 import scala.Tuple2;
0028 
0029 import org.apache.kafka.clients.CommonClientConfigs;
0030 import org.apache.kafka.common.security.auth.SecurityProtocol;
0031 import org.apache.kafka.clients.consumer.ConsumerConfig;
0032 import org.apache.kafka.clients.consumer.ConsumerRecord;
0033 import org.apache.kafka.common.serialization.StringDeserializer;
0034 
0035 import org.apache.spark.SparkConf;
0036 import org.apache.spark.streaming.api.java.*;
0037 import org.apache.spark.streaming.kafka010.ConsumerStrategies;
0038 import org.apache.spark.streaming.kafka010.KafkaUtils;
0039 import org.apache.spark.streaming.kafka010.LocationStrategies;
0040 import org.apache.spark.streaming.Durations;
0041 
0042 /**
0043  * Consumes messages from one or more topics in Kafka and does wordcount.
0044  * Usage: JavaDirectKerberizedKafkaWordCount <brokers> <groupId> <topics>
0045  *   <brokers> is a list of one or more Kafka brokers
0046  *   <groupId> is a consumer group name to consume from topics
0047  *   <topics> is a list of one or more kafka topics to consume from
0048  *
0049  * Example:
0050  *   Yarn client:
0051  *    $ bin/run-example --files ${jaas_path}/kafka_jaas.conf,${keytab_path}/kafka.service.keytab \
0052  *      --driver-java-options "-Djava.security.auth.login.config=${path}/kafka_driver_jaas.conf" \
0053  *      --conf \
0054  *      "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_jaas.conf" \
0055  *      --master yarn \
0056  *      streaming.JavaDirectKerberizedKafkaWordCount broker1-host:port,broker2-host:port \
0057  *      consumer-group topic1,topic2
0058  *   Yarn cluster:
0059  *    $ bin/run-example --files \
0060  *      ${jaas_path}/kafka_jaas.conf,${keytab_path}/kafka.service.keytab,${krb5_path}/krb5.conf \
0061  *      --driver-java-options \
0062  *      "-Djava.security.auth.login.config=./kafka_jaas.conf \
0063  *      -Djava.security.krb5.conf=./krb5.conf" \
0064  *      --conf \
0065  *      "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_jaas.conf" \
0066  *      --master yarn --deploy-mode cluster \
0067  *      streaming.JavaDirectKerberizedKafkaWordCount broker1-host:port,broker2-host:port \
0068  *      consumer-group topic1,topic2
0069  *
0070  * kafka_jaas.conf can be created manually, using the following template:
0071  *   KafkaClient {
0072  *     com.sun.security.auth.module.Krb5LoginModule required
0073  *     keyTab="./kafka.service.keytab"
0074  *     useKeyTab=true
0075  *     storeKey=true
0076  *     useTicketCache=false
0077  *     serviceName="kafka"
0078  *     principal="kafka/host@EXAMPLE.COM";
0079  *   };
0080  * kafka_driver_jaas.conf (used by yarn client) and kafka_jaas.conf are basically the same;
0081  * they differ only in the value of 'keyTab'. In kafka_driver_jaas.conf, 'keyTab' should be
0082  * "${keytab_path}/kafka.service.keytab".
0083  * In addition, for IBM JVMs, please use 'com.ibm.security.auth.module.Krb5LoginModule'
0084  * instead of 'com.sun.security.auth.module.Krb5LoginModule'.
0085  *
0086  * Note that this example uses SASL_PLAINTEXT for simplicity; however,
0087  * SASL_PLAINTEXT provides no SSL encryption and is likely to be less secure. Please
0088  * consider using SASL_SSL in production.
0089  */
0090 
0091 public final class JavaDirectKerberizedKafkaWordCount {
0092   private static final Pattern SPACE = Pattern.compile(" ");
0093 
0094   public static void main(String[] args) throws Exception {
0095     if (args.length < 3) {
0096       System.err.println(
0097         "Usage: JavaDirectKerberizedKafkaWordCount <brokers> <groupId> <topics>\n" +
0098         "  <brokers> is a list of one or more Kafka brokers\n" +
0099         "  <groupId> is a consumer group name to consume from topics\n" +
0100         "  <topics> is a list of one or more kafka topics to consume from\n\n");
0101       System.exit(1);
0102     }
0103 
0104     StreamingExamples.setStreamingLogLevels();
0105 
0106     String brokers = args[0];
0107     String groupId = args[1];
0108     String topics = args[2];
0109 
0110     // Create context with a 2 seconds batch interval
0111     SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKerberizedKafkaWordCount");
0112     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
0113 
0114     Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
0115     Map<String, Object> kafkaParams = new HashMap<>();
0116     kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
0117     kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
0118     kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
0119     kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
0120     kafkaParams.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
0121                                                   SecurityProtocol.SASL_PLAINTEXT.name);
0122 
0123     // Create direct kafka stream with brokers and topics
0124     JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
0125       jssc,
0126       LocationStrategies.PreferConsistent(),
0127       ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
0128 
0129     // Get the lines, split them into words, count the words and print
0130     JavaDStream<String> lines = messages.map(ConsumerRecord::value);
0131     JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
0132     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
0133       .reduceByKey((i1, i2) -> i1 + i2);
0134     wordCounts.print();
0135 
0136     // Start the computation
0137     jssc.start();
0138     jssc.awaitTermination();
0139   }
0140 }