/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kafka010;

import java.io.Serializable;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

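/**
 * End-to-end test of the Kafka 0.10 direct stream Java API: sends messages
 * to two topics on an embedded broker, consumes each through its own direct
 * stream, verifies that offset ranges are exposed on the underlying RDDs,
 * and checks that the union of both streams delivers every message sent.
 */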
public class JavaDirectKafkaStreamSuite implements Serializable {
  private transient JavaStreamingContext ssc = null;
  private transient KafkaTestUtils kafkaTestUtils = null;

  @Before
  public void setUp() {
    kafkaTestUtils = new KafkaTestUtils();
    kafkaTestUtils.setup();
    SparkConf sparkConf = new SparkConf()
      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
    ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
  }

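  // Stop the streaming context first, then shut down the embedded broker, so
  // no in-flight batch polls a broker that is already gone.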
  @After
  public void tearDown() {
    if (ssc != null) {
      ssc.stop();
      ssc = null;
    }

    if (kafkaTestUtils != null) {
      kafkaTestUtils.teardown();
      kafkaTestUtils = null;
    }
  }

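  // Scenario: send three messages to each of two topics, read each topic with
  // its own direct stream (and its own consumer group), union the two streams,
  // and poll until every sent message has been collected.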
  @Test
  public void testKafkaStream() throws InterruptedException {
    final String topic1 = "topic1";
    final String topic2 = "topic2";
    // hold a reference to the current offset ranges, so they can be used downstream
    final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

    String[] topic1data = createTopicAndSendData(topic1);
    String[] topic2data = createTopicAndSendData(topic2);

    Set<String> sent = new HashSet<>();
    sent.addAll(Arrays.asList(topic1data));
    sent.addAll(Arrays.asList(topic2data));

    Random random = new Random();

    final Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress());
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt() +
      "-" + System.currentTimeMillis());

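    // "earliest" makes the consumer read messages that were sent before the
    // stream started; the randomized group.id keeps repeated test runs from
    // picking up committed offsets left behind by earlier runs.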
    JavaInputDStream<ConsumerRecord<String, String>> istream1 = KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topic1), kafkaParams)
    );

    JavaDStream<String> stream1 = istream1.transform(
      // Make sure you can get offset ranges from the rdd
      new Function<JavaRDD<ConsumerRecord<String, String>>,
        JavaRDD<ConsumerRecord<String, String>>>() {
          @Override
          public JavaRDD<ConsumerRecord<String, String>> call(
            JavaRDD<ConsumerRecord<String, String>> rdd
          ) {
            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            offsetRanges.set(offsets);
            Assert.assertEquals(topic1, offsets[0].topic());
            return rdd;
          }
        }
    ).map(
        new Function<ConsumerRecord<String, String>, String>() {
          @Override
          public String call(ConsumerRecord<String, String> r) {
            return r.value();
          }
        }
    );
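
    // Note: the HasOffsetRanges cast above only succeeds on the RDD produced
    // directly by the Kafka stream, before any repartition or shuffle. The
    // captured ranges could then be committed back to Kafka via the module's
    // CanCommitOffsets interface, e.g. (a sketch, not something this test does):
    //   ((CanCommitOffsets) istream1.inputDStream()).commitAsync(offsetRanges.get());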

    final Map<String, Object> kafkaParams2 = new HashMap<>(kafkaParams);
    kafkaParams2.put("group.id", "java-test-consumer-" + random.nextInt() +
      "-" + System.currentTimeMillis());

    JavaInputDStream<ConsumerRecord<String, String>> istream2 = KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topic2), kafkaParams2)
    );

    JavaDStream<String> stream2 = istream2.transform(
      // Make sure you can get offset ranges from the rdd
      new Function<JavaRDD<ConsumerRecord<String, String>>,
        JavaRDD<ConsumerRecord<String, String>>>() {
          @Override
          public JavaRDD<ConsumerRecord<String, String>> call(
            JavaRDD<ConsumerRecord<String, String>> rdd
          ) {
            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            offsetRanges.set(offsets);
            Assert.assertEquals(topic2, offsets[0].topic());
            return rdd;
          }
        }
    ).map(
        new Function<ConsumerRecord<String, String>, String>() {
          @Override
          public String call(ConsumerRecord<String, String> r) {
            return r.value();
          }
        }
    );
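
    // On Java 8 the same transform/map chain could be written with lambdas
    // (a sketch, not part of the original suite):
    //   JavaDStream<String> stream2 = istream2
    //     .transform(rdd -> {
    //       OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    //       offsetRanges.set(offsets);
    //       Assert.assertEquals(topic2, offsets[0].topic());
    //       return rdd;
    //     })
    //     .map(ConsumerRecord::value);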

    JavaDStream<String> unifiedStream = stream1.union(stream2);

    final Set<String> result = Collections.synchronizedSet(new HashSet<>());
    unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
          @Override
          public void call(JavaRDD<String> rdd) {
            result.addAll(rdd.collect());
          }
        }
    );
    ssc.start();
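    // Poll for up to 20 seconds until as many distinct values have been
    // collected as were sent, then assert that the sets match exactly.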
    long startTime = System.currentTimeMillis();
    boolean matches = false;
    while (!matches && System.currentTimeMillis() - startTime < 20000) {
      matches = sent.size() == result.size();
      Thread.sleep(50);
    }
    Assert.assertEquals(sent, result);
    ssc.stop();
  }

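  // Creates the topic via the embedded KafkaTestUtils helper and synchronously
  // sends three messages whose values are derived from the topic name.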
  private String[] createTopicAndSendData(String topic) {
    String[] data = { topic + "-1", topic + "-2", topic + "-3" };
    kafkaTestUtils.createTopic(topic);
    kafkaTestUtils.sendMessages(topic, data);
    return data;
  }
}