Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package test.org.apache.spark.streaming;
0019 
0020 import java.io.Serializable;
0021 import java.util.*;
0022 
0023 import org.apache.spark.api.java.function.Function3;
0024 import org.apache.spark.api.java.function.Function4;
0025 import org.apache.spark.streaming.Duration;
0026 import org.apache.spark.streaming.Durations;
0027 import org.apache.spark.streaming.JavaTestUtils;
0028 import org.apache.spark.streaming.LocalJavaStreamingContext;
0029 import org.apache.spark.streaming.State;
0030 import org.apache.spark.streaming.StateSpec;
0031 import org.apache.spark.streaming.Time;
0032 import scala.Tuple2;
0033 
0034 import com.google.common.collect.Sets;
0035 import org.junit.Assert;
0036 import org.junit.Test;
0037 
0038 import org.apache.spark.HashPartitioner;
0039 import org.apache.spark.api.java.Optional;
0040 import org.apache.spark.api.java.JavaPairRDD;
0041 import org.apache.spark.api.java.JavaRDD;
0042 import org.apache.spark.api.java.function.PairFunction;
0043 import org.apache.spark.streaming.api.java.JavaDStream;
0044 import org.apache.spark.streaming.api.java.JavaPairDStream;
0045 import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
0046 
0047 /**
0048  * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8
0049  * lambda syntax.
0050  */
0051 @SuppressWarnings("unchecked")
0052 public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
0053 
0054   @Test
0055   public void testMap() {
0056     List<List<String>> inputData = Arrays.asList(
0057       Arrays.asList("hello", "world"),
0058       Arrays.asList("goodnight", "moon"));
0059 
0060     List<List<Integer>> expected = Arrays.asList(
0061       Arrays.asList(5, 5),
0062       Arrays.asList(9, 4));
0063 
0064     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0065     JavaDStream<Integer> letterCount = stream.map(String::length);
0066     JavaTestUtils.attachTestOutputStream(letterCount);
0067     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0068 
0069     assertOrderInvariantEquals(expected, result);
0070   }
0071 
0072   @Test
0073   public void testFilter() {
0074     List<List<String>> inputData = Arrays.asList(
0075       Arrays.asList("giants", "dodgers"),
0076       Arrays.asList("yankees", "red sox"));
0077 
0078     List<List<String>> expected = Arrays.asList(
0079       Arrays.asList("giants"),
0080       Arrays.asList("yankees"));
0081 
0082     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0083     JavaDStream<String> filtered = stream.filter(s -> s.contains("a"));
0084     JavaTestUtils.attachTestOutputStream(filtered);
0085     List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0086 
0087     assertOrderInvariantEquals(expected, result);
0088   }
0089 
0090   @Test
0091   public void testMapPartitions() {
0092     List<List<String>> inputData = Arrays.asList(
0093       Arrays.asList("giants", "dodgers"),
0094       Arrays.asList("yankees", "red sox"));
0095 
0096     List<List<String>> expected = Arrays.asList(
0097       Arrays.asList("GIANTSDODGERS"),
0098       Arrays.asList("YANKEESRED SOX"));
0099 
0100     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0101     JavaDStream<String> mapped = stream.mapPartitions(in -> {
0102       String out = "";
0103       while (in.hasNext()) {
0104         out = out + in.next().toUpperCase(Locale.ROOT);
0105       }
0106       return Arrays.asList(out).iterator();
0107     });
0108     JavaTestUtils.attachTestOutputStream(mapped);
0109     List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0110 
0111     Assert.assertEquals(expected, result);
0112   }
0113 
0114   @Test
0115   public void testReduce() {
0116     List<List<Integer>> inputData = Arrays.asList(
0117       Arrays.asList(1, 2, 3),
0118       Arrays.asList(4, 5, 6),
0119       Arrays.asList(7, 8, 9));
0120 
0121     List<List<Integer>> expected = Arrays.asList(
0122       Arrays.asList(6),
0123       Arrays.asList(15),
0124       Arrays.asList(24));
0125 
0126     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0127     JavaDStream<Integer> reduced = stream.reduce((x, y) -> x + y);
0128     JavaTestUtils.attachTestOutputStream(reduced);
0129     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0130 
0131     Assert.assertEquals(expected, result);
0132   }
0133 
0134   @Test
0135   public void testReduceByWindow() {
0136     List<List<Integer>> inputData = Arrays.asList(
0137       Arrays.asList(1, 2, 3),
0138       Arrays.asList(4, 5, 6),
0139       Arrays.asList(7, 8, 9));
0140 
0141     List<List<Integer>> expected = Arrays.asList(
0142       Arrays.asList(6),
0143       Arrays.asList(21),
0144       Arrays.asList(39),
0145       Arrays.asList(24));
0146 
0147     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0148     JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(
0149       (x, y) -> x + y, (x, y) -> x - y, new Duration(2000), new Duration(1000));
0150     JavaTestUtils.attachTestOutputStream(reducedWindowed);
0151     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
0152 
0153     Assert.assertEquals(expected, result);
0154   }
0155 
0156   @Test
0157   public void testTransform() {
0158     List<List<Integer>> inputData = Arrays.asList(
0159       Arrays.asList(1, 2, 3),
0160       Arrays.asList(4, 5, 6),
0161       Arrays.asList(7, 8, 9));
0162 
0163     List<List<Integer>> expected = Arrays.asList(
0164       Arrays.asList(3, 4, 5),
0165       Arrays.asList(6, 7, 8),
0166       Arrays.asList(9, 10, 11));
0167 
0168     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0169     JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2));
0170 
0171     JavaTestUtils.attachTestOutputStream(transformed);
0172     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0173 
0174     assertOrderInvariantEquals(expected, result);
0175   }
0176 
0177   @Test
0178   public void testVariousTransform() {
0179     // tests whether all variations of transform can be called from Java
0180 
0181     List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
0182     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0183 
0184     List<List<Tuple2<String, Integer>>> pairInputData =
0185       Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
0186     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
0187       JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
0188 
0189     JavaDStream<Integer> transformed1 = stream.transform(in -> null);
0190     JavaDStream<Integer> transformed2 = stream.transform((x, time) -> null);
0191     JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(x -> null);
0192     JavaPairDStream<String, Integer> transformed4 = stream.transformToPair((x, time) -> null);
0193     JavaDStream<Integer> pairTransformed1 = pairStream.transform(x -> null);
0194     JavaDStream<Integer> pairTransformed2 = pairStream.transform((x, time) -> null);
0195     JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(x -> null);
0196     JavaPairDStream<String, String> pairTransformed4 =
0197       pairStream.transformToPair((x, time) -> null);
0198 
0199   }
0200 
0201   @Test
0202   public void testTransformWith() {
0203     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
0204       Arrays.asList(
0205         new Tuple2<>("california", "dodgers"),
0206         new Tuple2<>("new york", "yankees")),
0207       Arrays.asList(
0208         new Tuple2<>("california", "sharks"),
0209         new Tuple2<>("new york", "rangers")));
0210 
0211     List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
0212       Arrays.asList(
0213         new Tuple2<>("california", "giants"),
0214         new Tuple2<>("new york", "mets")),
0215       Arrays.asList(
0216         new Tuple2<>("california", "ducks"),
0217         new Tuple2<>("new york", "islanders")));
0218 
0219 
0220     List<Set<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
0221       Sets.newHashSet(
0222         new Tuple2<>("california",
0223           new Tuple2<>("dodgers", "giants")),
0224         new Tuple2<>("new york",
0225           new Tuple2<>("yankees", "mets"))),
0226       Sets.newHashSet(
0227         new Tuple2<>("california",
0228           new Tuple2<>("sharks", "ducks")),
0229         new Tuple2<>("new york",
0230           new Tuple2<>("rangers", "islanders"))));
0231 
0232     JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
0233       ssc, stringStringKVStream1, 1);
0234     JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
0235 
0236     JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
0237       ssc, stringStringKVStream2, 1);
0238     JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
0239 
0240     JavaPairDStream<String, Tuple2<String, String>> joined =
0241       pairStream1.transformWithToPair(pairStream2,(x, y, z) -> x.join(y));
0242 
0243     JavaTestUtils.attachTestOutputStream(joined);
0244     List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0245     List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = new ArrayList<>();
0246     for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
0247       unorderedResult.add(Sets.newHashSet(res));
0248     }
0249 
0250     Assert.assertEquals(expected, unorderedResult);
0251   }
0252 
0253 
0254   @Test
0255   public void testVariousTransformWith() {
0256     // tests whether all variations of transformWith can be called from Java
0257 
0258     List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
0259     List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
0260     JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
0261     JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
0262 
0263     List<List<Tuple2<String, Integer>>> pairInputData1 =
0264       Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
0265     List<List<Tuple2<Double, Character>>> pairInputData2 =
0266       Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
0267     JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
0268       JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
0269     JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
0270       JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
0271 
0272     JavaDStream<Double> transformed1 = stream1.transformWith(stream2, (x, y, z) -> null);
0273     JavaDStream<Double> transformed2 = stream1.transformWith(pairStream1,(x, y, z) -> null);
0274 
0275     JavaPairDStream<Double, Double> transformed3 =
0276       stream1.transformWithToPair(stream2,(x, y, z) -> null);
0277 
0278     JavaPairDStream<Double, Double> transformed4 =
0279       stream1.transformWithToPair(pairStream1,(x, y, z) -> null);
0280 
0281     JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(stream2,(x, y, z) -> null);
0282 
0283     JavaDStream<Double> pairTransformed2_ =
0284       pairStream1.transformWith(pairStream1,(x, y, z) -> null);
0285 
0286     JavaPairDStream<Double, Double> pairTransformed3 =
0287       pairStream1.transformWithToPair(stream2,(x, y, z) -> null);
0288 
0289     JavaPairDStream<Double, Double> pairTransformed4 =
0290       pairStream1.transformWithToPair(pairStream2,(x, y, z) -> null);
0291   }
0292 
0293   @Test
0294   public void testStreamingContextTransform() {
0295     List<List<Integer>> stream1input = Arrays.asList(
0296       Arrays.asList(1),
0297       Arrays.asList(2)
0298     );
0299 
0300     List<List<Integer>> stream2input = Arrays.asList(
0301       Arrays.asList(3),
0302       Arrays.asList(4)
0303     );
0304 
0305     List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
0306       Arrays.asList(new Tuple2<>(1, "x")),
0307       Arrays.asList(new Tuple2<>(2, "y"))
0308     );
0309 
0310     List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
0311       Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
0312       Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
0313     );
0314 
0315     JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
0316     JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
0317     JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
0318       JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
0319 
0320     List<JavaDStream<?>> listOfDStreams1 = Arrays.asList(stream1, stream2);
0321 
0322     // This is just to test whether this transform to JavaStream compiles
0323     JavaDStream<Long> transformed1 = ssc.transform(
0324       listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
0325       Assert.assertEquals(2, listOfRDDs.size());
0326       return null;
0327     });
0328 
0329     List<JavaDStream<?>> listOfDStreams2 =
0330       Arrays.asList(stream1, stream2, pairStream1.toJavaDStream());
0331 
0332     JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
0333       listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
0334       Assert.assertEquals(3, listOfRDDs.size());
0335       JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0);
0336       JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1);
0337       JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2);
0338       JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
0339       PairFunction<Integer, Integer, Integer> mapToTuple =
0340         (Integer i) -> new Tuple2<>(i, i);
0341       return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
0342     });
0343     JavaTestUtils.attachTestOutputStream(transformed2);
0344     List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
0345       JavaTestUtils.runStreams(ssc, 2, 2);
0346     Assert.assertEquals(expected, result);
0347   }
0348 
0349   @Test
0350   public void testFlatMap() {
0351     List<List<String>> inputData = Arrays.asList(
0352       Arrays.asList("go", "giants"),
0353       Arrays.asList("boo", "dodgers"),
0354       Arrays.asList("athletics"));
0355 
0356     List<List<String>> expected = Arrays.asList(
0357       Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"),
0358       Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"),
0359       Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s"));
0360 
0361     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0362     JavaDStream<String> flatMapped = stream.flatMap(
0363         s -> Arrays.asList(s.split("(?!^)")).iterator());
0364     JavaTestUtils.attachTestOutputStream(flatMapped);
0365     List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0366 
0367     assertOrderInvariantEquals(expected, result);
0368   }
0369 
0370   @Test
0371   public void testPairFlatMap() {
0372     List<List<String>> inputData = Arrays.asList(
0373       Arrays.asList("giants"),
0374       Arrays.asList("dodgers"),
0375       Arrays.asList("athletics"));
0376 
0377     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0378       Arrays.asList(
0379         new Tuple2<>(6, "g"),
0380         new Tuple2<>(6, "i"),
0381         new Tuple2<>(6, "a"),
0382         new Tuple2<>(6, "n"),
0383         new Tuple2<>(6, "t"),
0384         new Tuple2<>(6, "s")),
0385       Arrays.asList(
0386         new Tuple2<>(7, "d"),
0387         new Tuple2<>(7, "o"),
0388         new Tuple2<>(7, "d"),
0389         new Tuple2<>(7, "g"),
0390         new Tuple2<>(7, "e"),
0391         new Tuple2<>(7, "r"),
0392         new Tuple2<>(7, "s")),
0393       Arrays.asList(
0394         new Tuple2<>(9, "a"),
0395         new Tuple2<>(9, "t"),
0396         new Tuple2<>(9, "h"),
0397         new Tuple2<>(9, "l"),
0398         new Tuple2<>(9, "e"),
0399         new Tuple2<>(9, "t"),
0400         new Tuple2<>(9, "i"),
0401         new Tuple2<>(9, "c"),
0402         new Tuple2<>(9, "s")));
0403 
0404     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0405     JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
0406       List<Tuple2<Integer, String>> out = new ArrayList<>();
0407       for (String letter : s.split("(?!^)")) {
0408         out.add(new Tuple2<>(s.length(), letter));
0409       }
0410       return out.iterator();
0411     });
0412 
0413     JavaTestUtils.attachTestOutputStream(flatMapped);
0414     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0415 
0416     Assert.assertEquals(expected, result);
0417   }
0418 
0419   /*
0420    * Performs an order-invariant comparison of lists representing two RDD streams. This allows
0421    * us to account for ordering variation within individual RDD's which occurs during windowing.
0422    */
0423   public static <T extends Comparable<T>> void assertOrderInvariantEquals(
0424     List<List<T>> expected, List<List<T>> actual) {
0425     expected.forEach(Collections::sort);
0426     List<List<T>> sortedActual = new ArrayList<>();
0427     actual.forEach(list -> {
0428         List<T> sortedList = new ArrayList<>(list);
0429         Collections.sort(sortedList);
0430         sortedActual.add(sortedList);
0431     });
0432     Assert.assertEquals(expected, sortedActual);
0433   }
0434 
0435   @Test
0436   public void testPairFilter() {
0437     List<List<String>> inputData = Arrays.asList(
0438       Arrays.asList("giants", "dodgers"),
0439       Arrays.asList("yankees", "red sox"));
0440 
0441     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0442       Arrays.asList(new Tuple2<>("giants", 6)),
0443       Arrays.asList(new Tuple2<>("yankees", 7)));
0444 
0445     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0446     JavaPairDStream<String, Integer> pairStream =
0447       stream.mapToPair(x -> new Tuple2<>(x, x.length()));
0448     JavaPairDStream<String, Integer> filtered = pairStream.filter(x -> x._1().contains("a"));
0449     JavaTestUtils.attachTestOutputStream(filtered);
0450     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0451 
0452     Assert.assertEquals(expected, result);
0453   }
0454 
0455   List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
0456     Arrays.asList(new Tuple2<>("california", "dodgers"),
0457       new Tuple2<>("california", "giants"),
0458       new Tuple2<>("new york", "yankees"),
0459       new Tuple2<>("new york", "mets")),
0460     Arrays.asList(new Tuple2<>("california", "sharks"),
0461       new Tuple2<>("california", "ducks"),
0462       new Tuple2<>("new york", "rangers"),
0463       new Tuple2<>("new york", "islanders")));
0464 
0465   List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
0466     Arrays.asList(
0467       new Tuple2<>("california", 1),
0468       new Tuple2<>("california", 3),
0469       new Tuple2<>("new york", 4),
0470       new Tuple2<>("new york", 1)),
0471     Arrays.asList(
0472       new Tuple2<>("california", 5),
0473       new Tuple2<>("california", 5),
0474       new Tuple2<>("new york", 3),
0475       new Tuple2<>("new york", 1)));
0476 
0477   @Test
0478   public void testPairMap() { // Maps pair -> pair of different type
0479     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0480 
0481     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0482       Arrays.asList(
0483         new Tuple2<>(1, "california"),
0484         new Tuple2<>(3, "california"),
0485         new Tuple2<>(4, "new york"),
0486         new Tuple2<>(1, "new york")),
0487       Arrays.asList(
0488         new Tuple2<>(5, "california"),
0489         new Tuple2<>(5, "california"),
0490         new Tuple2<>(3, "new york"),
0491         new Tuple2<>(1, "new york")));
0492 
0493     JavaDStream<Tuple2<String, Integer>> stream =
0494       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0495     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0496     JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(Tuple2::swap);
0497     JavaTestUtils.attachTestOutputStream(reversed);
0498     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0499 
0500     Assert.assertEquals(expected, result);
0501   }
0502 
0503   @Test
0504   public void testPairMapPartitions() { // Maps pair -> pair of different type
0505     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0506 
0507     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0508       Arrays.asList(
0509         new Tuple2<>(1, "california"),
0510         new Tuple2<>(3, "california"),
0511         new Tuple2<>(4, "new york"),
0512         new Tuple2<>(1, "new york")),
0513       Arrays.asList(
0514         new Tuple2<>(5, "california"),
0515         new Tuple2<>(5, "california"),
0516         new Tuple2<>(3, "new york"),
0517         new Tuple2<>(1, "new york")));
0518 
0519     JavaDStream<Tuple2<String, Integer>> stream =
0520       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0521     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0522     JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
0523       LinkedList<Tuple2<Integer, String>> out = new LinkedList<>();
0524       while (in.hasNext()) {
0525         Tuple2<String, Integer> next = in.next();
0526         out.add(next.swap());
0527       }
0528       return out.iterator();
0529     });
0530 
0531     JavaTestUtils.attachTestOutputStream(reversed);
0532     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0533 
0534     Assert.assertEquals(expected, result);
0535   }
0536 
0537   @Test
0538   public void testPairMap2() { // Maps pair -> single
0539     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0540 
0541     List<List<Integer>> expected = Arrays.asList(
0542       Arrays.asList(1, 3, 4, 1),
0543       Arrays.asList(5, 5, 3, 1));
0544 
0545     JavaDStream<Tuple2<String, Integer>> stream =
0546       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0547     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0548     JavaDStream<Integer> reversed = pairStream.map(Tuple2::_2);
0549     JavaTestUtils.attachTestOutputStream(reversed);
0550     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0551 
0552     Assert.assertEquals(expected, result);
0553   }
0554 
0555   @Test
0556   public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
0557     List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
0558       Arrays.asList(
0559         new Tuple2<>("hi", 1),
0560         new Tuple2<>("ho", 2)),
0561       Arrays.asList(
0562         new Tuple2<>("hi", 1),
0563         new Tuple2<>("ho", 2)));
0564 
0565     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0566       Arrays.asList(
0567         new Tuple2<>(1, "h"),
0568         new Tuple2<>(1, "i"),
0569         new Tuple2<>(2, "h"),
0570         new Tuple2<>(2, "o")),
0571       Arrays.asList(
0572         new Tuple2<>(1, "h"),
0573         new Tuple2<>(1, "i"),
0574         new Tuple2<>(2, "h"),
0575         new Tuple2<>(2, "o")));
0576 
0577     JavaDStream<Tuple2<String, Integer>> stream =
0578       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0579     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0580     JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
0581       List<Tuple2<Integer, String>> out = new LinkedList<>();
0582       for (Character s : in._1().toCharArray()) {
0583         out.add(new Tuple2<>(in._2(), s.toString()));
0584       }
0585       return out.iterator();
0586     });
0587 
0588     JavaTestUtils.attachTestOutputStream(flatMapped);
0589     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0590 
0591     Assert.assertEquals(expected, result);
0592   }
0593 
0594   @Test
0595   public void testPairReduceByKey() {
0596     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0597 
0598     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0599       Arrays.asList(
0600         new Tuple2<>("california", 4),
0601         new Tuple2<>("new york", 5)),
0602       Arrays.asList(
0603         new Tuple2<>("california", 10),
0604         new Tuple2<>("new york", 4)));
0605 
0606     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
0607       ssc, inputData, 1);
0608     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0609 
0610     JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey((x, y) -> x + y);
0611 
0612     JavaTestUtils.attachTestOutputStream(reduced);
0613     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0614 
0615     Assert.assertEquals(expected, result);
0616   }
0617 
0618   @Test
0619   public void testCombineByKey() {
0620     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0621 
0622     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0623       Arrays.asList(
0624         new Tuple2<>("california", 4),
0625         new Tuple2<>("new york", 5)),
0626       Arrays.asList(
0627         new Tuple2<>("california", 10),
0628         new Tuple2<>("new york", 4)));
0629 
0630     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
0631       ssc, inputData, 1);
0632     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0633 
0634     JavaPairDStream<String, Integer> combined = pairStream.combineByKey(i -> i,
0635       (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2));
0636 
0637     JavaTestUtils.attachTestOutputStream(combined);
0638     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0639 
0640     Assert.assertEquals(expected, result);
0641   }
0642 
0643   @Test
0644   public void testReduceByKeyAndWindow() {
0645     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0646 
0647     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0648       Arrays.asList(new Tuple2<>("california", 4),
0649         new Tuple2<>("new york", 5)),
0650       Arrays.asList(new Tuple2<>("california", 14),
0651         new Tuple2<>("new york", 9)),
0652       Arrays.asList(new Tuple2<>("california", 10),
0653         new Tuple2<>("new york", 4)));
0654 
0655     JavaDStream<Tuple2<String, Integer>> stream =
0656       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0657     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0658 
0659     JavaPairDStream<String, Integer> reduceWindowed =
0660       pairStream.reduceByKeyAndWindow((x, y) -> x + y, new Duration(2000), new Duration(1000));
0661     JavaTestUtils.attachTestOutputStream(reduceWindowed);
0662     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0663 
0664     Assert.assertEquals(expected, result);
0665   }
0666 
0667   @Test
0668   public void testUpdateStateByKey() {
0669     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0670 
0671     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0672       Arrays.asList(new Tuple2<>("california", 4),
0673         new Tuple2<>("new york", 5)),
0674       Arrays.asList(new Tuple2<>("california", 14),
0675         new Tuple2<>("new york", 9)),
0676       Arrays.asList(new Tuple2<>("california", 14),
0677         new Tuple2<>("new york", 9)));
0678 
0679     JavaDStream<Tuple2<String, Integer>> stream =
0680       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0681     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0682 
0683     JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
0684       int out = 0;
0685       if (state.isPresent()) {
0686         out = out + state.get();
0687       }
0688       for (Integer v : values) {
0689         out = out + v;
0690       }
0691       return Optional.of(out);
0692     });
0693 
0694     JavaTestUtils.attachTestOutputStream(updated);
0695     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0696 
0697     Assert.assertEquals(expected, result);
0698   }
0699 
0700   @Test
0701   public void testReduceByKeyAndWindowWithInverse() {
0702     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0703 
0704     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0705       Arrays.asList(new Tuple2<>("california", 4),
0706         new Tuple2<>("new york", 5)),
0707       Arrays.asList(new Tuple2<>("california", 14),
0708         new Tuple2<>("new york", 9)),
0709       Arrays.asList(new Tuple2<>("california", 10),
0710         new Tuple2<>("new york", 4)));
0711 
0712     JavaDStream<Tuple2<String, Integer>> stream =
0713       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0714     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0715 
0716     JavaPairDStream<String, Integer> reduceWindowed =
0717       pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000),
0718         new Duration(1000));
0719     JavaTestUtils.attachTestOutputStream(reduceWindowed);
0720     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0721 
0722     Assert.assertEquals(expected, result);
0723   }
0724 
0725   @Test
0726   public void testPairTransform() {
0727     List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
0728       Arrays.asList(
0729         new Tuple2<>(3, 5),
0730         new Tuple2<>(1, 5),
0731         new Tuple2<>(4, 5),
0732         new Tuple2<>(2, 5)),
0733       Arrays.asList(
0734         new Tuple2<>(2, 5),
0735         new Tuple2<>(3, 5),
0736         new Tuple2<>(4, 5),
0737         new Tuple2<>(1, 5)));
0738 
0739     List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
0740       Arrays.asList(
0741         new Tuple2<>(1, 5),
0742         new Tuple2<>(2, 5),
0743         new Tuple2<>(3, 5),
0744         new Tuple2<>(4, 5)),
0745       Arrays.asList(
0746         new Tuple2<>(1, 5),
0747         new Tuple2<>(2, 5),
0748         new Tuple2<>(3, 5),
0749         new Tuple2<>(4, 5)));
0750 
0751     JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
0752       ssc, inputData, 1);
0753     JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0754 
0755     JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey());
0756 
0757     JavaTestUtils.attachTestOutputStream(sorted);
0758     List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0759 
0760     Assert.assertEquals(expected, result);
0761   }
0762 
0763   @Test
0764   public void testPairToNormalRDDTransform() {
0765     List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
0766       Arrays.asList(
0767         new Tuple2<>(3, 5),
0768         new Tuple2<>(1, 5),
0769         new Tuple2<>(4, 5),
0770         new Tuple2<>(2, 5)),
0771       Arrays.asList(
0772         new Tuple2<>(2, 5),
0773         new Tuple2<>(3, 5),
0774         new Tuple2<>(4, 5),
0775         new Tuple2<>(1, 5)));
0776 
0777     List<List<Integer>> expected = Arrays.asList(
0778       Arrays.asList(3, 1, 4, 2),
0779       Arrays.asList(2, 3, 4, 1));
0780 
0781     JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
0782       ssc, inputData, 1);
0783     JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0784     JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(x -> x._1()));
0785     JavaTestUtils.attachTestOutputStream(firstParts);
0786     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0787 
0788     Assert.assertEquals(expected, result);
0789   }
0790 
0791   @Test
0792   public void testMapValues() {
0793     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
0794 
0795     List<List<Tuple2<String, String>>> expected = Arrays.asList(
0796       Arrays.asList(new Tuple2<>("california", "DODGERS"),
0797         new Tuple2<>("california", "GIANTS"),
0798         new Tuple2<>("new york", "YANKEES"),
0799         new Tuple2<>("new york", "METS")),
0800       Arrays.asList(new Tuple2<>("california", "SHARKS"),
0801         new Tuple2<>("california", "DUCKS"),
0802         new Tuple2<>("new york", "RANGERS"),
0803         new Tuple2<>("new york", "ISLANDERS")));
0804 
0805     JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
0806       ssc, inputData, 1);
0807     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
0808 
0809     JavaPairDStream<String, String> mapped =
0810         pairStream.mapValues(s -> s.toUpperCase(Locale.ROOT));
0811     JavaTestUtils.attachTestOutputStream(mapped);
0812     List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0813 
0814     Assert.assertEquals(expected, result);
0815   }
0816 
0817   @Test
0818   public void testFlatMapValues() {
0819     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
0820 
0821     List<List<Tuple2<String, String>>> expected = Arrays.asList(
0822       Arrays.asList(new Tuple2<>("california", "dodgers1"),
0823         new Tuple2<>("california", "dodgers2"),
0824         new Tuple2<>("california", "giants1"),
0825         new Tuple2<>("california", "giants2"),
0826         new Tuple2<>("new york", "yankees1"),
0827         new Tuple2<>("new york", "yankees2"),
0828         new Tuple2<>("new york", "mets1"),
0829         new Tuple2<>("new york", "mets2")),
0830       Arrays.asList(new Tuple2<>("california", "sharks1"),
0831         new Tuple2<>("california", "sharks2"),
0832         new Tuple2<>("california", "ducks1"),
0833         new Tuple2<>("california", "ducks2"),
0834         new Tuple2<>("new york", "rangers1"),
0835         new Tuple2<>("new york", "rangers2"),
0836         new Tuple2<>("new york", "islanders1"),
0837         new Tuple2<>("new york", "islanders2")));
0838 
0839     JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
0840       ssc, inputData, 1);
0841     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
0842 
0843     JavaPairDStream<String, String> flatMapped =
0844       pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2").iterator());
0845     JavaTestUtils.attachTestOutputStream(flatMapped);
0846     List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0847     Assert.assertEquals(expected, result);
0848   }
0849 
0850   /**
0851    * This test is only for testing the APIs. It's not necessary to run it.
0852    */
0853   public void testMapWithStateAPI() {
0854     JavaPairRDD<String, Boolean> initialRDD = null;
0855     JavaPairDStream<String, Integer> wordsDstream = null;
0856 
0857     Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mapFn =
0858       (time, key, value, state) -> {
0859         // Use all State's methods here
0860         state.exists();
0861         state.get();
0862         state.isTimingOut();
0863         state.remove();
0864         state.update(true);
0865         return Optional.of(2.0);
0866       };
0867 
0868     JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
0869       wordsDstream.mapWithState(
0870         StateSpec.function(mapFn)
0871           .initialState(initialRDD)
0872           .numPartitions(10)
0873           .partitioner(new HashPartitioner(10))
0874           .timeout(Durations.seconds(10)));
0875 
0876     JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots();
0877 
0878     Function3<String, Optional<Integer>, State<Boolean>, Double> mapFn2 =
0879       (key, value, state) -> {
0880         state.exists();
0881         state.get();
0882         state.isTimingOut();
0883         state.remove();
0884         state.update(true);
0885         return 2.0;
0886       };
0887 
0888     JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
0889       wordsDstream.mapWithState(
0890         StateSpec.function(mapFn2)
0891           .initialState(initialRDD)
0892           .numPartitions(10)
0893           .partitioner(new HashPartitioner(10))
0894           .timeout(Durations.seconds(10)));
0895 
0896     JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots();
0897   }
0898 }