/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kafka010;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

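// End-to-end test of the Java-friendly KafkaUtils.createRDD API against an embedded Kafka broker.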
public class JavaKafkaRDDSuite implements Serializable {
  private transient JavaSparkContext sc = null;
  private transient KafkaTestUtils kafkaTestUtils = null;

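  // Spin up an embedded Kafka broker and a local[4] SparkContext before each test.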
  @Before
  public void setUp() {
    kafkaTestUtils = new KafkaTestUtils();
    kafkaTestUtils.setup();
    SparkConf sparkConf = new SparkConf()
      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
    sc = new JavaSparkContext(sparkConf);
  }

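  // Stop the SparkContext before tearing down the embedded Kafka broker.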
  @After
  public void tearDown() {
    if (sc != null) {
      sc.stop();
      sc = null;
    }

    if (kafkaTestUtils != null) {
      kafkaTestUtils.teardown();
      kafkaTestUtils = null;
    }
  }

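  // Reads the same offset ranges twice, once per location strategy, and checks both reads agree.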
  @Test
  public void testKafkaRDD() throws InterruptedException {
    String topic1 = "topic1";
    String topic2 = "topic2";

    Random random = new Random();

    createTopicAndSendData(topic1);
    createTopicAndSendData(topic2);

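    // Minimal consumer configuration; the randomized group.id keeps repeated runs from clashing.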
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress());
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt() +
      "-" + System.currentTimeMillis());

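    // Read exactly one record (offsets [0, 1)) from partition 0 of each topic.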
    OffsetRange[] offsetRanges = {
      OffsetRange.create(topic1, 0, 0, 1),
      OffsetRange.create(topic2, 0, 0, 1)
    };

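    // Map both partitions to the broker host so PreferFixed can schedule tasks on it.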
    Map<TopicPartition, String> leaders = new HashMap<>();
    String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
    String broker = hostAndPort[0];
    leaders.put(offsetRanges[0].topicPartition(), broker);
    leaders.put(offsetRanges[1].topicPartition(), broker);

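    // Project each ConsumerRecord down to its String value for easy counting.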
    Function<ConsumerRecord<String, String>, String> handler =
      new Function<ConsumerRecord<String, String>, String>() {
        @Override
        public String call(ConsumerRecord<String, String> r) {
          return r.value();
        }
      };

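    // Build two RDDs over identical offset ranges, differing only in location strategy.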
    JavaRDD<String> rdd1 = KafkaUtils.<String, String>createRDD(
      sc,
      kafkaParams,
      offsetRanges,
      LocationStrategies.PreferFixed(leaders)
    ).map(handler);

    JavaRDD<String> rdd2 = KafkaUtils.<String, String>createRDD(
      sc,
      kafkaParams,
      offsetRanges,
      LocationStrategies.PreferConsistent()
    ).map(handler);

    long count1 = rdd1.count();
    long count2 = rdd2.count();
    Assert.assertTrue(count1 > 0);
    Assert.assertEquals(count1, count2);
  }

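  // Creates the topic and publishes three messages to it, returning the payloads.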
  private String[] createTopicAndSendData(String topic) {
    String[] data = { topic + "-1", topic + "-2", topic + "-3" };
    kafkaTestUtils.createTopic(topic);
    kafkaTestUtils.sendMessages(topic, data);
    return data;
  }
}