/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kafka010;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

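/**
 * Java API test for KafkaRDD: creates two topics, sends a few records to
 * each, and reads a fixed offset range back through KafkaUtils.createRDD
 * using both location strategies.
 */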
public class JavaKafkaRDDSuite implements Serializable {
  private transient JavaSparkContext sc = null;
  private transient KafkaTestUtils kafkaTestUtils = null;

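  // Starts the Kafka test fixture and a local[4] SparkContext before each test.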
  @Before
  public void setUp() {
    kafkaTestUtils = new KafkaTestUtils();
    kafkaTestUtils.setup();
    SparkConf sparkConf = new SparkConf()
      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
    sc = new JavaSparkContext(sparkConf);
  }

  @After
  public void tearDown() {
    if (sc != null) {
      sc.stop();
      sc = null;
    }

    if (kafkaTestUtils != null) {
      kafkaTestUtils.teardown();
      kafkaTestUtils = null;
    }
  }

  @Test
  public void testKafkaRDD() throws InterruptedException {
    String topic1 = "topic1";
    String topic2 = "topic2";

    Random random = new Random();

    createTopicAndSendData(topic1);
    createTopicAndSendData(topic2);

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress());
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt() +
      "-" + System.currentTimeMillis());

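    // Read exactly one record per topic: offsets [0, 1) of partition 0
    // (the until offset is exclusive).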
    OffsetRange[] offsetRanges = {
      OffsetRange.create(topic1, 0, 0, 1),
      OffsetRange.create(topic2, 0, 0, 1)
    };

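    // PreferFixed needs an explicit host per partition; use the broker's
    // host, extracted from its "host:port" address.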
    Map<TopicPartition, String> leaders = new HashMap<>();
    String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
    String broker = hostAndPort[0];
    leaders.put(offsetRanges[0].topicPartition(), broker);
    leaders.put(offsetRanges[1].topicPartition(), broker);

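    // Projection applied to both RDDs: keep only each record's value.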
    Function<ConsumerRecord<String, String>, String> handler = r -> r.value();

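    // rdd1 pins each partition's consumer to the mapped host (PreferFixed).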
    JavaRDD<String> rdd1 = KafkaUtils.<String, String>createRDD(
        sc,
        kafkaParams,
        offsetRanges,
        LocationStrategies.PreferFixed(leaders)
    ).map(handler);

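    // rdd2 reads the same ranges but lets Spark distribute consumers evenly
    // across executors (PreferConsistent).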
    JavaRDD<String> rdd2 = KafkaUtils.<String, String>createRDD(
        sc,
        kafkaParams,
        offsetRanges,
        LocationStrategies.PreferConsistent()
    ).map(handler);

    // Just making sure the Java user APIs work; the Scala tests handle the logic corner cases.
    long count1 = rdd1.count();
    long count2 = rdd2.count();
    Assert.assertTrue(count1 > 0);
    Assert.assertEquals(count1, count2);
  }

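  // Creates the topic and publishes three messages so the offset ranges
  // above have data to read.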
  private String[] createTopicAndSendData(String topic) {
    String[] data = { topic + "-1", topic + "-2", topic + "-3" };
    kafkaTestUtils.createTopic(topic);
    kafkaTestUtils.sendMessages(topic, data);
    return data;
  }
}