Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.streaming.kafka010;
0019 
0020 import java.io.Serializable;
0021 import java.util.*;
0022 import java.util.regex.Pattern;
0023 
0024 import scala.collection.JavaConverters;
0025 
0026 import org.apache.kafka.common.TopicPartition;
0027 
0028 import org.junit.Assert;
0029 import org.junit.Test;
0030 
0031 public class JavaConsumerStrategySuite implements Serializable {
0032 
0033   @Test
0034   public void testConsumerStrategyConstructors() {
0035     final String topic1 = "topic1";
0036     final Pattern pat = Pattern.compile("top.*");
0037     final Collection<String> topics = Arrays.asList(topic1);
0038     final scala.collection.Iterable<String> sTopics =
0039       JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
0040     final TopicPartition tp1 = new TopicPartition(topic1, 0);
0041     final TopicPartition tp2 = new TopicPartition(topic1, 1);
0042     final Collection<TopicPartition> parts = Arrays.asList(tp1, tp2);
0043     final scala.collection.Iterable<TopicPartition> sParts =
0044       JavaConverters.collectionAsScalaIterableConverter(parts).asScala();
0045     final Map<String, Object> kafkaParams = new HashMap<>();
0046     kafkaParams.put("bootstrap.servers", "not used");
0047     final scala.collection.Map<String, Object> sKafkaParams =
0048       JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
0049     final Map<TopicPartition, Long> offsets = new HashMap<>();
0050     offsets.put(tp1, 23L);
0051     final scala.collection.Map<TopicPartition, Object> sOffsets =
0052       JavaConverters.mapAsScalaMapConverter(offsets).asScala().mapValues(
0053         new scala.runtime.AbstractFunction1<Long, Object>() {
0054           @Override
0055           public Object apply(Long x) {
0056             return (Object) x;
0057           }
0058         }
0059       );
0060 
0061     final ConsumerStrategy<String, String> sub1 =
0062       ConsumerStrategies.Subscribe(sTopics, sKafkaParams, sOffsets);
0063     final ConsumerStrategy<String, String> sub2 =
0064       ConsumerStrategies.Subscribe(sTopics, sKafkaParams);
0065     final ConsumerStrategy<String, String> sub3 =
0066       ConsumerStrategies.Subscribe(topics, kafkaParams, offsets);
0067     final ConsumerStrategy<String, String> sub4 =
0068       ConsumerStrategies.Subscribe(topics, kafkaParams);
0069 
0070     Assert.assertEquals(
0071       sub1.executorKafkaParams().get("bootstrap.servers"),
0072       sub3.executorKafkaParams().get("bootstrap.servers"));
0073 
0074     final ConsumerStrategy<String, String> psub1 =
0075       ConsumerStrategies.SubscribePattern(pat, sKafkaParams, sOffsets);
0076     final ConsumerStrategy<String, String> psub2 =
0077       ConsumerStrategies.SubscribePattern(pat, sKafkaParams);
0078     final ConsumerStrategy<String, String> psub3 =
0079       ConsumerStrategies.SubscribePattern(pat, kafkaParams, offsets);
0080     final ConsumerStrategy<String, String> psub4 =
0081       ConsumerStrategies.SubscribePattern(pat, kafkaParams);
0082 
0083     Assert.assertEquals(
0084       psub1.executorKafkaParams().get("bootstrap.servers"),
0085       psub3.executorKafkaParams().get("bootstrap.servers"));
0086 
0087     final ConsumerStrategy<String, String> asn1 =
0088       ConsumerStrategies.Assign(sParts, sKafkaParams, sOffsets);
0089     final ConsumerStrategy<String, String> asn2 =
0090       ConsumerStrategies.Assign(sParts, sKafkaParams);
0091     final ConsumerStrategy<String, String> asn3 =
0092       ConsumerStrategies.Assign(parts, kafkaParams, offsets);
0093     final ConsumerStrategy<String, String> asn4 =
0094       ConsumerStrategies.Assign(parts, kafkaParams);
0095 
0096     Assert.assertEquals(
0097       asn1.executorKafkaParams().get("bootstrap.servers"),
0098       asn3.executorKafkaParams().get("bootstrap.servers"));
0099   }
0100 
0101 }