0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.streaming.kafka010;
0019
0020 import java.io.Serializable;
0021 import java.util.*;
0022
0023 import scala.collection.JavaConverters;
0024
0025 import org.apache.kafka.common.TopicPartition;
0026
0027 import org.junit.Assert;
0028 import org.junit.Test;
0029
0030 public class JavaLocationStrategySuite implements Serializable {
0031
0032 @Test
0033 public void testLocationStrategyConstructors() {
0034 final String topic1 = "topic1";
0035 final TopicPartition tp1 = new TopicPartition(topic1, 0);
0036 final TopicPartition tp2 = new TopicPartition(topic1, 1);
0037 final Map<TopicPartition, String> hosts = new HashMap<>();
0038 hosts.put(tp1, "node1");
0039 hosts.put(tp2, "node2");
0040 final scala.collection.Map<TopicPartition, String> sHosts =
0041 JavaConverters.mapAsScalaMapConverter(hosts).asScala();
0042
0043
0044 final LocationStrategy c1 = LocationStrategies.PreferConsistent();
0045 final LocationStrategy c2 = LocationStrategies.PreferConsistent();
0046 Assert.assertSame(c1, c2);
0047
0048 final LocationStrategy c3 = LocationStrategies.PreferBrokers();
0049 final LocationStrategy c4 = LocationStrategies.PreferBrokers();
0050 Assert.assertSame(c3, c4);
0051
0052 Assert.assertNotSame(c1, c3);
0053
0054 final LocationStrategy c5 = LocationStrategies.PreferFixed(hosts);
0055 final LocationStrategy c6 = LocationStrategies.PreferFixed(sHosts);
0056 Assert.assertEquals(c5, c6);
0057 }
0058
0059 }