/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.streaming;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import scala.Tuple2;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.Durations;
/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: JavaDirectKafkaWordCount <brokers> <groupId> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <groupId> is a consumer group name to consume from topics
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port \
 *      consumer-group topic1,topic2
 */

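// Note: when run outside bin/run-example, the spark-streaming-kafka-0-10 artifact matching
// your Spark and Scala versions must also be on the classpath. A possible invocation
// (versions and jar path are illustrative only):
//
//   $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0 \
//       --class org.apache.spark.examples.streaming.JavaDirectKafkaWordCount \
//       examples/jars/spark-examples_2.12-3.3.0.jar \
//       broker1-host:port consumer-group topic1,topic2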
public final class JavaDirectKafkaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    if (args.length < 3) {
      System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <groupId> <topics>\n" +
                         "  <brokers> is a list of one or more Kafka brokers\n" +
                         "  <groupId> is a consumer group name to consume from topics\n" +
                         "  <topics> is a list of one or more kafka topics to consume from\n\n");
      System.exit(1);
    }

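    // Quiet Spark's default INFO logging so the per-batch word counts stand out in the console.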
    StreamingExamples.setStreamingLogLevels();

    String brokers = args[0];
    String groupId = args[1];
    String topics = args[2];

    // Create context with a 2-second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

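    // Split the comma-separated topic list into a set and build the Kafka consumer configuration.
    // These properties are passed through to the underlying KafkaConsumer; record keys and
    // values are decoded as UTF-8 strings by StringDeserializer.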
    Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    // Create direct kafka stream with brokers and topics
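    // PreferConsistent spreads Kafka partitions evenly across the available executors;
    // Subscribe consumes from the fixed set of topics built above.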
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(ConsumerRecord::value);
    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((i1, i2) -> i1 + i2);
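    // print() writes the first ten (word, count) pairs of each batch to the driver's stdout.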
    wordCounts.print();

    // Start the computation
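    // start() begins scheduling micro-batches; awaitTermination() blocks until the context
    // is stopped or fails with an exception.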
    jssc.start();
    jssc.awaitTermination();
  }
}