/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kafka010;

import java.io.Serializable;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

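/**
 * Java test for the Kafka 0.10 direct stream: creates two direct streams on
 * separate topics, verifies that each batch exposes its Kafka offset ranges,
 * and checks that the union of both streams receives every message sent.
 */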
public class JavaDirectKafkaStreamSuite implements Serializable {
  private transient JavaStreamingContext ssc = null;
  private transient KafkaTestUtils kafkaTestUtils = null;

  @Before
  public void setUp() {
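    // KafkaTestUtils stands up a local test broker; the streaming context uses
    // a short 200 ms batch interval so batches turn around quickly.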
    kafkaTestUtils = new KafkaTestUtils();
    kafkaTestUtils.setup();
    SparkConf sparkConf = new SparkConf()
      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
    ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
  }

  @After
  public void tearDown() {
    if (ssc != null) {
      ssc.stop();
      ssc = null;
    }

    if (kafkaTestUtils != null) {
      kafkaTestUtils.teardown();
      kafkaTestUtils = null;
    }
  }

  @Test
  public void testKafkaStream() throws InterruptedException {
    final String topic1 = "topic1";
    final String topic2 = "topic2";

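    // Holds the offset ranges of the most recent batch; set inside the
    // transform() calls below so each stream's source topic can be asserted.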
    final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

    String[] topic1data = createTopicAndSendData(topic1);
    String[] topic2data = createTopicAndSendData(topic2);

    Set<String> sent = new HashSet<>();
    sent.addAll(Arrays.asList(topic1data));
    sent.addAll(Arrays.asList(topic2data));

    Random random = new Random();

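    // Consumer configuration for the first stream. A random, timestamped
    // group.id keeps this run's consumer group isolated from earlier runs.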
    final Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress());
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt() +
      "-" + System.currentTimeMillis());

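    // Direct stream subscribed to topic1. PreferConsistent distributes Kafka
    // partitions evenly across the available executors.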
    JavaInputDStream<ConsumerRecord<String, String>> istream1 = KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topic1), kafkaParams)
    );

    JavaDStream<String> stream1 = istream1.transform(
        new Function<JavaRDD<ConsumerRecord<String, String>>,
          JavaRDD<ConsumerRecord<String, String>>>() {
            @Override
            public JavaRDD<ConsumerRecord<String, String>> call(
              JavaRDD<ConsumerRecord<String, String>> rdd
            ) {
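              // The cast to HasOffsetRanges only succeeds on the RDD that comes
              // straight from the direct stream, before any other transformation.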
              OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
              offsetRanges.set(offsets);
              Assert.assertEquals(topic1, offsets[0].topic());
              return rdd;
            }
          }
      ).map(
        new Function<ConsumerRecord<String, String>, String>() {
          @Override
          public String call(ConsumerRecord<String, String> r) {
            return r.value();
          }
        }
      );

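    // The second stream gets its own group.id so its consumer group is
    // independent of the first stream's.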
    final Map<String, Object> kafkaParams2 = new HashMap<>(kafkaParams);
    kafkaParams2.put("group.id", "java-test-consumer-" + random.nextInt() +
      "-" + System.currentTimeMillis());

    JavaInputDStream<ConsumerRecord<String, String>> istream2 = KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topic2), kafkaParams2)
    );

    JavaDStream<String> stream2 = istream2.transform(
        new Function<JavaRDD<ConsumerRecord<String, String>>,
          JavaRDD<ConsumerRecord<String, String>>>() {
            @Override
            public JavaRDD<ConsumerRecord<String, String>> call(
              JavaRDD<ConsumerRecord<String, String>> rdd
            ) {
              OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
              offsetRanges.set(offsets);
              Assert.assertEquals(topic2, offsets[0].topic());
              return rdd;
            }
          }
      ).map(
        new Function<ConsumerRecord<String, String>, String>() {
          @Override
          public String call(ConsumerRecord<String, String> r) {
            return r.value();
          }
        }
      );

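    // Union the two streams so a single action observes messages from both topics.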
    JavaDStream<String> unifiedStream = stream1.union(stream2);

    final Set<String> result = Collections.synchronizedSet(new HashSet<>());
    unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
      @Override
      public void call(JavaRDD<String> rdd) {
        result.addAll(rdd.collect());
      }
    });
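    // Start the context, then poll until every sent message has been collected
    // or a 20 second timeout elapses.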
    ssc.start();
    long startTime = System.currentTimeMillis();
    boolean matches = false;
    while (!matches && System.currentTimeMillis() - startTime < 20000) {
      matches = sent.size() == result.size();
      Thread.sleep(50);
    }
    Assert.assertEquals(sent, result);
    ssc.stop();
  }

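  // Creates the topic, sends three messages to it, and returns the payloads
  // so the caller can assert that all of them were received.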
  private String[] createTopicAndSendData(String topic) {
    String[] data = { topic + "-1", topic + "-2", topic + "-3" };
    kafkaTestUtils.createTopic(topic);
    kafkaTestUtils.sendMessages(topic, data);
    return data;
  }
}