0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.examples.streaming;
0019
0020 import java.util.HashMap;
0021 import java.util.HashSet;
0022 import java.util.Arrays;
0023 import java.util.Map;
0024 import java.util.Set;
0025 import java.util.regex.Pattern;
0026
0027 import scala.Tuple2;
0028
0029 import org.apache.kafka.clients.consumer.ConsumerConfig;
0030 import org.apache.kafka.clients.consumer.ConsumerRecord;
0031 import org.apache.kafka.common.serialization.StringDeserializer;
0032
0033 import org.apache.spark.SparkConf;
0034 import org.apache.spark.streaming.api.java.*;
0035 import org.apache.spark.streaming.kafka010.ConsumerStrategies;
0036 import org.apache.spark.streaming.kafka010.KafkaUtils;
0037 import org.apache.spark.streaming.kafka010.LocationStrategies;
0038 import org.apache.spark.streaming.Durations;
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052 public final class JavaDirectKafkaWordCount {
0053 private static final Pattern SPACE = Pattern.compile(" ");
0054
0055 public static void main(String[] args) throws Exception {
0056 if (args.length < 3) {
0057 System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <groupId> <topics>\n" +
0058 " <brokers> is a list of one or more Kafka brokers\n" +
0059 " <groupId> is a consumer group name to consume from topics\n" +
0060 " <topics> is a list of one or more kafka topics to consume from\n\n");
0061 System.exit(1);
0062 }
0063
0064 StreamingExamples.setStreamingLogLevels();
0065
0066 String brokers = args[0];
0067 String groupId = args[1];
0068 String topics = args[2];
0069
0070
0071 SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
0072 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
0073
0074 Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
0075 Map<String, Object> kafkaParams = new HashMap<>();
0076 kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
0077 kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
0078 kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
0079 kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
0080
0081
0082 JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
0083 jssc,
0084 LocationStrategies.PreferConsistent(),
0085 ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
0086
0087
0088 JavaDStream<String> lines = messages.map(ConsumerRecord::value);
0089 JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
0090 JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
0091 .reduceByKey((i1, i2) -> i1 + i2);
0092 wordCounts.print();
0093
0094
0095 jssc.start();
0096 jssc.awaitTermination();
0097 }
0098 }