0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.streaming.kafka010;
0019
0020 import java.io.Serializable;
0021 import java.util.*;
0022 import java.util.regex.Pattern;
0023
0024 import scala.collection.JavaConverters;
0025
0026 import org.apache.kafka.common.TopicPartition;
0027
0028 import org.junit.Assert;
0029 import org.junit.Test;
0030
0031 public class JavaConsumerStrategySuite implements Serializable {
0032
0033 @Test
0034 public void testConsumerStrategyConstructors() {
0035 final String topic1 = "topic1";
0036 final Pattern pat = Pattern.compile("top.*");
0037 final Collection<String> topics = Arrays.asList(topic1);
0038 final scala.collection.Iterable<String> sTopics =
0039 JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
0040 final TopicPartition tp1 = new TopicPartition(topic1, 0);
0041 final TopicPartition tp2 = new TopicPartition(topic1, 1);
0042 final Collection<TopicPartition> parts = Arrays.asList(tp1, tp2);
0043 final scala.collection.Iterable<TopicPartition> sParts =
0044 JavaConverters.collectionAsScalaIterableConverter(parts).asScala();
0045 final Map<String, Object> kafkaParams = new HashMap<>();
0046 kafkaParams.put("bootstrap.servers", "not used");
0047 final scala.collection.Map<String, Object> sKafkaParams =
0048 JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
0049 final Map<TopicPartition, Long> offsets = new HashMap<>();
0050 offsets.put(tp1, 23L);
0051 final scala.collection.Map<TopicPartition, Object> sOffsets =
0052 JavaConverters.mapAsScalaMapConverter(offsets).asScala().mapValues(
0053 new scala.runtime.AbstractFunction1<Long, Object>() {
0054 @Override
0055 public Object apply(Long x) {
0056 return (Object) x;
0057 }
0058 }
0059 );
0060
0061 final ConsumerStrategy<String, String> sub1 =
0062 ConsumerStrategies.Subscribe(sTopics, sKafkaParams, sOffsets);
0063 final ConsumerStrategy<String, String> sub2 =
0064 ConsumerStrategies.Subscribe(sTopics, sKafkaParams);
0065 final ConsumerStrategy<String, String> sub3 =
0066 ConsumerStrategies.Subscribe(topics, kafkaParams, offsets);
0067 final ConsumerStrategy<String, String> sub4 =
0068 ConsumerStrategies.Subscribe(topics, kafkaParams);
0069
0070 Assert.assertEquals(
0071 sub1.executorKafkaParams().get("bootstrap.servers"),
0072 sub3.executorKafkaParams().get("bootstrap.servers"));
0073
0074 final ConsumerStrategy<String, String> psub1 =
0075 ConsumerStrategies.SubscribePattern(pat, sKafkaParams, sOffsets);
0076 final ConsumerStrategy<String, String> psub2 =
0077 ConsumerStrategies.SubscribePattern(pat, sKafkaParams);
0078 final ConsumerStrategy<String, String> psub3 =
0079 ConsumerStrategies.SubscribePattern(pat, kafkaParams, offsets);
0080 final ConsumerStrategy<String, String> psub4 =
0081 ConsumerStrategies.SubscribePattern(pat, kafkaParams);
0082
0083 Assert.assertEquals(
0084 psub1.executorKafkaParams().get("bootstrap.servers"),
0085 psub3.executorKafkaParams().get("bootstrap.servers"));
0086
0087 final ConsumerStrategy<String, String> asn1 =
0088 ConsumerStrategies.Assign(sParts, sKafkaParams, sOffsets);
0089 final ConsumerStrategy<String, String> asn2 =
0090 ConsumerStrategies.Assign(sParts, sKafkaParams);
0091 final ConsumerStrategy<String, String> asn3 =
0092 ConsumerStrategies.Assign(parts, kafkaParams, offsets);
0093 final ConsumerStrategy<String, String> asn4 =
0094 ConsumerStrategies.Assign(parts, kafkaParams);
0095
0096 Assert.assertEquals(
0097 asn1.executorKafkaParams().get("bootstrap.servers"),
0098 asn3.executorKafkaParams().get("bootstrap.servers"));
0099 }
0100
0101 }