0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.examples.streaming;
0019
0020 import java.util.HashMap;
0021 import java.util.HashSet;
0022 import java.util.Arrays;
0023 import java.util.Map;
0024 import java.util.Set;
0025 import java.util.regex.Pattern;
0026
0027 import scala.Tuple2;
0028
0029 import org.apache.kafka.clients.CommonClientConfigs;
0030 import org.apache.kafka.common.security.auth.SecurityProtocol;
0031 import org.apache.kafka.clients.consumer.ConsumerConfig;
0032 import org.apache.kafka.clients.consumer.ConsumerRecord;
0033 import org.apache.kafka.common.serialization.StringDeserializer;
0034
0035 import org.apache.spark.SparkConf;
0036 import org.apache.spark.streaming.api.java.*;
0037 import org.apache.spark.streaming.kafka010.ConsumerStrategies;
0038 import org.apache.spark.streaming.kafka010.KafkaUtils;
0039 import org.apache.spark.streaming.kafka010.LocationStrategies;
0040 import org.apache.spark.streaming.Durations;
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082
0083
0084
0085
0086
0087
0088
0089
0090
0091 public final class JavaDirectKerberizedKafkaWordCount {
0092 private static final Pattern SPACE = Pattern.compile(" ");
0093
0094 public static void main(String[] args) throws Exception {
0095 if (args.length < 3) {
0096 System.err.println(
0097 "Usage: JavaDirectKerberizedKafkaWordCount <brokers> <groupId> <topics>\n" +
0098 " <brokers> is a list of one or more Kafka brokers\n" +
0099 " <groupId> is a consumer group name to consume from topics\n" +
0100 " <topics> is a list of one or more kafka topics to consume from\n\n");
0101 System.exit(1);
0102 }
0103
0104 StreamingExamples.setStreamingLogLevels();
0105
0106 String brokers = args[0];
0107 String groupId = args[1];
0108 String topics = args[2];
0109
0110
0111 SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKerberizedKafkaWordCount");
0112 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
0113
0114 Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
0115 Map<String, Object> kafkaParams = new HashMap<>();
0116 kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
0117 kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
0118 kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
0119 kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
0120 kafkaParams.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
0121 SecurityProtocol.SASL_PLAINTEXT.name);
0122
0123
0124 JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
0125 jssc,
0126 LocationStrategies.PreferConsistent(),
0127 ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
0128
0129
0130 JavaDStream<String> lines = messages.map(ConsumerRecord::value);
0131 JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
0132 JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
0133 .reduceByKey((i1, i2) -> i1 + i2);
0134 wordCounts.print();
0135
0136
0137 jssc.start();
0138 jssc.awaitTermination();
0139 }
0140 }