Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package test.org.apache.spark.streaming;
0019 
0020 import java.io.*;
0021 import java.nio.charset.StandardCharsets;
0022 import java.util.*;
0023 import java.util.concurrent.atomic.AtomicBoolean;
0024 
0025 import org.apache.spark.streaming.Duration;
0026 import org.apache.spark.streaming.JavaCheckpointTestUtils;
0027 import org.apache.spark.streaming.JavaTestUtils;
0028 import org.apache.spark.streaming.LocalJavaStreamingContext;
0029 import org.apache.spark.streaming.Seconds;
0030 import org.apache.spark.streaming.StreamingContextState;
0031 import org.apache.spark.streaming.StreamingContextSuite;
0032 import scala.Tuple2;
0033 
0034 import org.apache.hadoop.conf.Configuration;
0035 import org.apache.hadoop.io.LongWritable;
0036 import org.apache.hadoop.io.Text;
0037 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
0038 
0039 import org.junit.Assert;
0040 import org.junit.Test;
0041 
0042 import com.google.common.io.Files;
0043 import com.google.common.collect.Sets;
0044 
0045 import org.apache.spark.HashPartitioner;
0046 import org.apache.spark.SparkConf;
0047 import org.apache.spark.api.java.JavaPairRDD;
0048 import org.apache.spark.api.java.JavaRDD;
0049 import org.apache.spark.api.java.JavaSparkContext;
0050 import org.apache.spark.api.java.Optional;
0051 import org.apache.spark.api.java.function.*;
0052 import org.apache.spark.storage.StorageLevel;
0053 import org.apache.spark.streaming.api.java.*;
0054 import org.apache.spark.util.LongAccumulator;
0055 import org.apache.spark.util.Utils;
0056 
0057 // The test suite itself is Serializable so that anonymous Function implementations can be
0058 // serialized, as an alternative to converting these anonymous classes to static inner classes;
0059 // see http://stackoverflow.com/questions/758570/.
0060 public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
0061 
0062   public static void equalIterator(Iterator<?> a, Iterator<?> b) {
0063     while (a.hasNext() && b.hasNext()) {
0064       Assert.assertEquals(a.next(), b.next());
0065     }
0066     Assert.assertEquals(a.hasNext(), b.hasNext());
0067   }
0068 
0069   public static void equalIterable(Iterable<?> a, Iterable<?> b) {
0070       equalIterator(a.iterator(), b.iterator());
0071   }
0072 
0073   @Test
0074   public void testInitialization() {
0075     Assert.assertNotNull(ssc.sparkContext());
0076   }
0077 
0078   @SuppressWarnings("unchecked")
0079   @Test
0080   public void testContextState() {
0081     List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
0082     Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
0083     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0084     JavaTestUtils.attachTestOutputStream(stream);
0085     Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
0086     ssc.start();
0087     Assert.assertEquals(StreamingContextState.ACTIVE, ssc.getState());
0088     ssc.stop();
0089     Assert.assertEquals(StreamingContextState.STOPPED, ssc.getState());
0090   }
0091 
0092   @SuppressWarnings("unchecked")
0093   @Test
0094   public void testCount() {
0095     List<List<Integer>> inputData = Arrays.asList(
0096         Arrays.asList(1,2,3,4),
0097         Arrays.asList(3,4,5),
0098         Arrays.asList(3));
0099 
0100     List<List<Long>> expected = Arrays.asList(
0101         Arrays.asList(4L),
0102         Arrays.asList(3L),
0103         Arrays.asList(1L));
0104 
0105     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0106     JavaDStream<Long> count = stream.count();
0107     JavaTestUtils.attachTestOutputStream(count);
0108     List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0109     assertOrderInvariantEquals(expected, result);
0110   }
0111 
0112   @SuppressWarnings("unchecked")
0113   @Test
0114   public void testMap() {
0115     List<List<String>> inputData = Arrays.asList(
0116         Arrays.asList("hello", "world"),
0117         Arrays.asList("goodnight", "moon"));
0118 
0119     List<List<Integer>> expected = Arrays.asList(
0120         Arrays.asList(5,5),
0121         Arrays.asList(9,4));
0122 
0123     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0124     JavaDStream<Integer> letterCount = stream.map(String::length);
0125     JavaTestUtils.attachTestOutputStream(letterCount);
0126     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0127 
0128     assertOrderInvariantEquals(expected, result);
0129   }
0130 
0131   @SuppressWarnings("unchecked")
0132   @Test
0133   public void testWindow() {
0134     List<List<Integer>> inputData = Arrays.asList(
0135         Arrays.asList(1,2,3),
0136         Arrays.asList(4,5,6),
0137         Arrays.asList(7,8,9));
0138 
0139     List<List<Integer>> expected = Arrays.asList(
0140         Arrays.asList(1,2,3),
0141         Arrays.asList(4,5,6,1,2,3),
0142         Arrays.asList(7,8,9,4,5,6),
0143         Arrays.asList(7,8,9));
0144 
0145     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0146     JavaDStream<Integer> windowed = stream.window(new Duration(2000));
0147     JavaTestUtils.attachTestOutputStream(windowed);
0148     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
0149 
0150     assertOrderInvariantEquals(expected, result);
0151   }
0152 
0153   @SuppressWarnings("unchecked")
0154   @Test
0155   public void testWindowWithSlideDuration() {
0156     List<List<Integer>> inputData = Arrays.asList(
0157         Arrays.asList(1,2,3),
0158         Arrays.asList(4,5,6),
0159         Arrays.asList(7,8,9),
0160         Arrays.asList(10,11,12),
0161         Arrays.asList(13,14,15),
0162         Arrays.asList(16,17,18));
0163 
0164     List<List<Integer>> expected = Arrays.asList(
0165         Arrays.asList(1,2,3,4,5,6),
0166         Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12),
0167         Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18),
0168         Arrays.asList(13,14,15,16,17,18));
0169 
0170     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0171     JavaDStream<Integer> windowed = stream.window(new Duration(4000), new Duration(2000));
0172     JavaTestUtils.attachTestOutputStream(windowed);
0173     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4);
0174 
0175     assertOrderInvariantEquals(expected, result);
0176   }
0177 
0178   @SuppressWarnings("unchecked")
0179   @Test
0180   public void testFilter() {
0181     List<List<String>> inputData = Arrays.asList(
0182         Arrays.asList("giants", "dodgers"),
0183         Arrays.asList("yankees", "red sox"));
0184 
0185     List<List<String>> expected = Arrays.asList(
0186         Arrays.asList("giants"),
0187         Arrays.asList("yankees"));
0188 
0189     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0190     JavaDStream<String> filtered = stream.filter(s -> s.contains("a"));
0191     JavaTestUtils.attachTestOutputStream(filtered);
0192     List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0193 
0194     assertOrderInvariantEquals(expected, result);
0195   }
0196 
0197   @SuppressWarnings("unchecked")
0198   @Test
0199   public void testRepartitionMorePartitions() {
0200     List<List<Integer>> inputData = Arrays.asList(
0201       Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
0202       Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
0203     JavaDStream<Integer> stream =
0204         JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
0205     JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
0206         stream.repartition(4);
0207     JavaTestUtils.attachTestOutputStream(repartitioned);
0208     List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
0209     Assert.assertEquals(2, result.size());
0210     for (List<List<Integer>> rdd : result) {
0211       Assert.assertEquals(4, rdd.size());
0212       Assert.assertEquals(
0213         10, rdd.get(0).size() + rdd.get(1).size() + rdd.get(2).size() + rdd.get(3).size());
0214     }
0215   }
0216 
0217   @SuppressWarnings("unchecked")
0218   @Test
0219   public void testRepartitionFewerPartitions() {
0220     List<List<Integer>> inputData = Arrays.asList(
0221       Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
0222       Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
0223     JavaDStream<Integer> stream =
0224         JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
0225     JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
0226         stream.repartition(2);
0227     JavaTestUtils.attachTestOutputStream(repartitioned);
0228     List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
0229     Assert.assertEquals(2, result.size());
0230     for (List<List<Integer>> rdd : result) {
0231       Assert.assertEquals(2, rdd.size());
0232       Assert.assertEquals(10, rdd.get(0).size() + rdd.get(1).size());
0233     }
0234   }
0235 
0236   @SuppressWarnings("unchecked")
0237   @Test
0238   public void testGlom() {
0239     List<List<String>> inputData = Arrays.asList(
0240         Arrays.asList("giants", "dodgers"),
0241         Arrays.asList("yankees", "red sox"));
0242 
0243     List<List<List<String>>> expected = Arrays.asList(
0244         Arrays.asList(Arrays.asList("giants", "dodgers")),
0245         Arrays.asList(Arrays.asList("yankees", "red sox")));
0246 
0247     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0248     JavaDStream<List<String>> glommed = stream.glom();
0249     JavaTestUtils.attachTestOutputStream(glommed);
0250     List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0251 
0252     Assert.assertEquals(expected, result);
0253   }
0254 
0255   @SuppressWarnings("unchecked")
0256   @Test
0257   public void testMapPartitions() {
0258     List<List<String>> inputData = Arrays.asList(
0259         Arrays.asList("giants", "dodgers"),
0260         Arrays.asList("yankees", "red sox"));
0261 
0262     List<List<String>> expected = Arrays.asList(
0263         Arrays.asList("GIANTSDODGERS"),
0264         Arrays.asList("YANKEESRED SOX"));
0265 
0266     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0267     JavaDStream<String> mapped = stream.mapPartitions(in -> {
0268         StringBuilder out = new StringBuilder();
0269         while (in.hasNext()) {
0270           out.append(in.next().toUpperCase(Locale.ROOT));
0271         }
0272         return Arrays.asList(out.toString()).iterator();
0273       });
0274     JavaTestUtils.attachTestOutputStream(mapped);
0275     List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0276 
0277     Assert.assertEquals(expected, result);
0278   }
0279 
0280   private static class IntegerSum implements Function2<Integer, Integer, Integer> {
0281     @Override
0282     public Integer call(Integer i1, Integer i2) {
0283       return i1 + i2;
0284     }
0285   }
0286 
0287   private static class IntegerDifference implements Function2<Integer, Integer, Integer> {
0288     @Override
0289     public Integer call(Integer i1, Integer i2) {
0290       return i1 - i2;
0291     }
0292   }
0293 
0294   @SuppressWarnings("unchecked")
0295   @Test
0296   public void testReduce() {
0297     List<List<Integer>> inputData = Arrays.asList(
0298         Arrays.asList(1,2,3),
0299         Arrays.asList(4,5,6),
0300         Arrays.asList(7,8,9));
0301 
0302     List<List<Integer>> expected = Arrays.asList(
0303         Arrays.asList(6),
0304         Arrays.asList(15),
0305         Arrays.asList(24));
0306 
0307     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0308     JavaDStream<Integer> reduced = stream.reduce(new IntegerSum());
0309     JavaTestUtils.attachTestOutputStream(reduced);
0310     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0311 
0312     Assert.assertEquals(expected, result);
0313   }
0314 
0315   @SuppressWarnings("unchecked")
0316   @Test
0317   public void testReduceByWindowWithInverse() {
0318     testReduceByWindow(true);
0319   }
0320 
0321   @SuppressWarnings("unchecked")
0322   @Test
0323   public void testReduceByWindowWithoutInverse() {
0324     testReduceByWindow(false);
0325   }
0326 
0327   @SuppressWarnings("unchecked")
0328   private void testReduceByWindow(boolean withInverse) {
0329     List<List<Integer>> inputData = Arrays.asList(
0330         Arrays.asList(1,2,3),
0331         Arrays.asList(4,5,6),
0332         Arrays.asList(7,8,9));
0333 
0334     List<List<Integer>> expected = Arrays.asList(
0335         Arrays.asList(6),
0336         Arrays.asList(21),
0337         Arrays.asList(39),
0338         Arrays.asList(24));
0339 
0340     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0341     JavaDStream<Integer> reducedWindowed;
0342     if (withInverse) {
0343       reducedWindowed = stream.reduceByWindow(new IntegerSum(),
0344                                               new IntegerDifference(),
0345                                               new Duration(2000),
0346                                               new Duration(1000));
0347     } else {
0348       reducedWindowed = stream.reduceByWindow(new IntegerSum(),
0349                                               new Duration(2000), new Duration(1000));
0350     }
0351     JavaTestUtils.attachTestOutputStream(reducedWindowed);
0352     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
0353 
0354     Assert.assertEquals(expected, result);
0355   }
0356 
0357   @SuppressWarnings("unchecked")
0358   @Test
0359   public void testQueueStream() {
0360     ssc.stop();
0361     // Create a new JavaStreamingContext without checkpointing
0362     SparkConf conf = new SparkConf()
0363         .setMaster("local[2]")
0364         .setAppName("test")
0365         .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
0366     ssc = new JavaStreamingContext(conf, new Duration(1000));
0367 
0368     List<List<Integer>> expected = Arrays.asList(
0369         Arrays.asList(1,2,3),
0370         Arrays.asList(4,5,6),
0371         Arrays.asList(7,8,9));
0372 
0373     JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc());
0374     JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3));
0375     JavaRDD<Integer> rdd2 = jsc.parallelize(Arrays.asList(4, 5, 6));
0376     JavaRDD<Integer> rdd3 = jsc.parallelize(Arrays.asList(7,8,9));
0377 
0378     Queue<JavaRDD<Integer>> rdds = new LinkedList<>();
0379     rdds.add(rdd1);
0380     rdds.add(rdd2);
0381     rdds.add(rdd3);
0382 
0383     JavaDStream<Integer> stream = ssc.queueStream(rdds);
0384     JavaTestUtils.attachTestOutputStream(stream);
0385     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0386     Assert.assertEquals(expected, result);
0387   }
0388 
0389   @SuppressWarnings("unchecked")
0390   @Test
0391   public void testTransform() {
0392     List<List<Integer>> inputData = Arrays.asList(
0393         Arrays.asList(1,2,3),
0394         Arrays.asList(4,5,6),
0395         Arrays.asList(7,8,9));
0396 
0397     List<List<Integer>> expected = Arrays.asList(
0398         Arrays.asList(3,4,5),
0399         Arrays.asList(6,7,8),
0400         Arrays.asList(9,10,11));
0401 
0402     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0403     JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2));
0404 
0405     JavaTestUtils.attachTestOutputStream(transformed);
0406     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0407 
0408     assertOrderInvariantEquals(expected, result);
0409   }
0410 
0411   @SuppressWarnings("unchecked")
0412   @Test
0413   public void testVariousTransform() {
0414     // tests whether all variations of transform can be called from Java
0415 
0416     List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
0417     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0418 
0419     List<List<Tuple2<String, Integer>>> pairInputData =
0420         Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
0421     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
0422         JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
0423 
0424     stream.transform(in -> null);
0425 
0426     stream.transform((in, time) -> null);
0427 
0428     stream.transformToPair(in -> null);
0429 
0430     stream.transformToPair((in, time) -> null);
0431 
0432     pairStream.transform(in -> null);
0433 
0434     pairStream.transform((in, time) -> null);
0435 
0436     pairStream.transformToPair(in -> null);
0437 
0438     pairStream.transformToPair((in, time) -> null);
0439 
0440   }
0441 
0442   @SuppressWarnings("unchecked")
0443   @Test
0444   public void testTransformWith() {
0445     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
0446         Arrays.asList(
0447             new Tuple2<>("california", "dodgers"),
0448             new Tuple2<>("new york", "yankees")),
0449         Arrays.asList(
0450             new Tuple2<>("california", "sharks"),
0451             new Tuple2<>("new york", "rangers")));
0452 
0453     List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
0454         Arrays.asList(
0455             new Tuple2<>("california", "giants"),
0456             new Tuple2<>("new york", "mets")),
0457         Arrays.asList(
0458             new Tuple2<>("california", "ducks"),
0459             new Tuple2<>("new york", "islanders")));
0460 
0461 
0462     List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
0463         Sets.newHashSet(
0464             new Tuple2<>("california",
0465                          new Tuple2<>("dodgers", "giants")),
0466             new Tuple2<>("new york",
0467                          new Tuple2<>("yankees", "mets"))),
0468         Sets.newHashSet(
0469             new Tuple2<>("california",
0470                          new Tuple2<>("sharks", "ducks")),
0471             new Tuple2<>("new york",
0472                          new Tuple2<>("rangers", "islanders"))));
0473 
0474     JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
0475         ssc, stringStringKVStream1, 1);
0476     JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
0477 
0478     JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
0479         ssc, stringStringKVStream2, 1);
0480     JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
0481 
0482     JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWithToPair(
0483         pairStream2,
0484         (rdd1, rdd2, time) -> rdd1.join(rdd2)
0485     );
0486 
0487     JavaTestUtils.attachTestOutputStream(joined);
0488     List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0489     List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = new ArrayList<>();
0490     for (List<Tuple2<String, Tuple2<String, String>>> res: result) {
0491       unorderedResult.add(Sets.newHashSet(res));
0492     }
0493 
0494     Assert.assertEquals(expected, unorderedResult);
0495   }
0496 
0497 
0498   @SuppressWarnings("unchecked")
0499   @Test
0500   public void testVariousTransformWith() {
0501     // tests whether all variations of transformWith can be called from Java
0502 
0503     List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
0504     List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
0505     JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
0506     JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
0507 
0508     List<List<Tuple2<String, Integer>>> pairInputData1 =
0509         Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
0510     List<List<Tuple2<Double, Character>>> pairInputData2 =
0511         Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
0512     JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
0513         JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
0514     JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
0515         JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
0516 
0517     stream1.transformWith(stream2, (rdd1, rdd2, time) -> null);
0518 
0519     stream1.transformWith(pairStream1, (rdd1, rdd2, time) -> null);
0520 
0521     stream1.transformWithToPair(stream2, (rdd1, rdd2, time) -> null);
0522 
0523     stream1.transformWithToPair(pairStream1, (rdd1, rdd2, time) -> null);
0524 
0525     pairStream1.transformWith(stream2, (rdd1, rdd2, time) -> null);
0526 
0527     pairStream1.transformWith(pairStream1, (rdd1, rdd2, time) -> null);
0528 
0529     pairStream1.transformWithToPair(stream2, (rdd1, rdd2, time) -> null);
0530 
0531     pairStream1.transformWithToPair(pairStream2, (rdd1, rdd2, time) -> null);
0532   }
0533 
0534   @SuppressWarnings("unchecked")
0535   @Test
0536   public void testStreamingContextTransform(){
0537     List<List<Integer>> stream1input = Arrays.asList(
0538         Arrays.asList(1),
0539         Arrays.asList(2)
0540     );
0541 
0542     List<List<Integer>> stream2input = Arrays.asList(
0543         Arrays.asList(3),
0544         Arrays.asList(4)
0545     );
0546 
0547     List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
0548         Arrays.asList(new Tuple2<>(1, "x")),
0549         Arrays.asList(new Tuple2<>(2, "y"))
0550     );
0551 
0552     List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
0553         Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
0554         Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
0555     );
0556 
0557     JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
0558     JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
0559     JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
0560         JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
0561 
0562     List<JavaDStream<?>> listOfDStreams1 = Arrays.asList(stream1, stream2);
0563 
0564     // This is just to test whether this transform to JavaStream compiles
0565     ssc.transform(
0566       listOfDStreams1,
0567       (listOfRDDs, time) -> {
0568         Assert.assertEquals(2, listOfRDDs.size());
0569         return null;
0570       }
0571     );
0572 
0573     List<JavaDStream<?>> listOfDStreams2 =
0574         Arrays.asList(stream1, stream2, pairStream1.toJavaDStream());
0575 
0576     JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
0577       listOfDStreams2,
0578       (listOfRDDs, time) -> {
0579         Assert.assertEquals(3, listOfRDDs.size());
0580         JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
0581         JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
0582         JavaRDD<Tuple2<Integer, String>> rdd3 =
0583           (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
0584         JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
0585         PairFunction<Integer, Integer, Integer> mapToTuple =
0586             (PairFunction<Integer, Integer, Integer>) i -> new Tuple2<>(i, i);
0587         return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
0588       }
0589     );
0590     JavaTestUtils.attachTestOutputStream(transformed2);
0591     List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
0592       JavaTestUtils.runStreams(ssc, 2, 2);
0593     Assert.assertEquals(expected, result);
0594   }
0595 
0596   @SuppressWarnings("unchecked")
0597   @Test
0598   public void testFlatMap() {
0599     List<List<String>> inputData = Arrays.asList(
0600         Arrays.asList("go", "giants"),
0601         Arrays.asList("boo", "dodgers"),
0602         Arrays.asList("athletics"));
0603 
0604     List<List<String>> expected = Arrays.asList(
0605         Arrays.asList("g","o","g","i","a","n","t","s"),
0606         Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"),
0607         Arrays.asList("a","t","h","l","e","t","i","c","s"));
0608 
0609     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0610     JavaDStream<String> flatMapped =
0611       stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator());
0612     JavaTestUtils.attachTestOutputStream(flatMapped);
0613     List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0614 
0615     assertOrderInvariantEquals(expected, result);
0616   }
0617 
0618   @SuppressWarnings("unchecked")
0619   @Test
0620   public void testForeachRDD() {
0621     final LongAccumulator accumRdd = ssc.sparkContext().sc().longAccumulator();
0622     final LongAccumulator accumEle = ssc.sparkContext().sc().longAccumulator();
0623     List<List<Integer>> inputData = Arrays.asList(
0624         Arrays.asList(1,1,1),
0625         Arrays.asList(1,1,1));
0626 
0627     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0628     JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
0629 
0630     stream.foreachRDD(rdd -> {
0631       accumRdd.add(1);
0632       rdd.foreach(i -> accumEle.add(1));
0633     });
0634 
0635     // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
0636     stream.foreachRDD((rdd, time) -> {});
0637 
0638     JavaTestUtils.runStreams(ssc, 2, 2);
0639 
0640     Assert.assertEquals(2, accumRdd.value().intValue());
0641     Assert.assertEquals(6, accumEle.value().intValue());
0642   }
0643 
0644   @SuppressWarnings("unchecked")
0645   @Test
0646   public void testPairFlatMap() {
0647     List<List<String>> inputData = Arrays.asList(
0648         Arrays.asList("giants"),
0649         Arrays.asList("dodgers"),
0650         Arrays.asList("athletics"));
0651 
0652     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0653         Arrays.asList(
0654             new Tuple2<>(6, "g"),
0655             new Tuple2<>(6, "i"),
0656             new Tuple2<>(6, "a"),
0657             new Tuple2<>(6, "n"),
0658             new Tuple2<>(6, "t"),
0659             new Tuple2<>(6, "s")),
0660         Arrays.asList(
0661             new Tuple2<>(7, "d"),
0662             new Tuple2<>(7, "o"),
0663             new Tuple2<>(7, "d"),
0664             new Tuple2<>(7, "g"),
0665             new Tuple2<>(7, "e"),
0666             new Tuple2<>(7, "r"),
0667             new Tuple2<>(7, "s")),
0668         Arrays.asList(
0669             new Tuple2<>(9, "a"),
0670             new Tuple2<>(9, "t"),
0671             new Tuple2<>(9, "h"),
0672             new Tuple2<>(9, "l"),
0673             new Tuple2<>(9, "e"),
0674             new Tuple2<>(9, "t"),
0675             new Tuple2<>(9, "i"),
0676             new Tuple2<>(9, "c"),
0677             new Tuple2<>(9, "s")));
0678 
0679     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0680     JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(in -> {
0681         List<Tuple2<Integer, String>> out = new ArrayList<>();
0682         for (String letter : in.split("(?!^)")) {
0683           out.add(new Tuple2<>(in.length(), letter));
0684         }
0685         return out.iterator();
0686       });
0687     JavaTestUtils.attachTestOutputStream(flatMapped);
0688     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0689 
0690     Assert.assertEquals(expected, result);
0691   }
0692 
0693   @SuppressWarnings("unchecked")
0694   @Test
0695   public void testUnion() {
0696     List<List<Integer>> inputData1 = Arrays.asList(
0697         Arrays.asList(1,1),
0698         Arrays.asList(2,2),
0699         Arrays.asList(3,3));
0700 
0701     List<List<Integer>> inputData2 = Arrays.asList(
0702         Arrays.asList(4,4),
0703         Arrays.asList(5,5),
0704         Arrays.asList(6,6));
0705 
0706     List<List<Integer>> expected = Arrays.asList(
0707         Arrays.asList(1,1,4,4),
0708         Arrays.asList(2,2,5,5),
0709         Arrays.asList(3,3,6,6));
0710 
0711     JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2);
0712     JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2);
0713 
0714     JavaDStream<Integer> unioned = stream1.union(stream2);
0715     JavaTestUtils.attachTestOutputStream(unioned);
0716     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0717 
0718     assertOrderInvariantEquals(expected, result);
0719   }
0720 
0721   /*
0722    * Performs an order-invariant comparison of lists representing two RDD streams. This allows
0723    * us to account for ordering variation within individual RDD's which occurs during windowing.
0724    */
0725   public static <T> void assertOrderInvariantEquals(
0726       List<List<T>> expected, List<List<T>> actual) {
0727     List<Set<T>> expectedSets = new ArrayList<>();
0728     for (List<T> list: expected) {
0729       expectedSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
0730     }
0731     List<Set<T>> actualSets = new ArrayList<>();
0732     for (List<T> list: actual) {
0733       actualSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
0734     }
0735     Assert.assertEquals(expectedSets, actualSets);
0736   }
0737 
0738 
0739   // PairDStream Functions
0740   @SuppressWarnings("unchecked")
0741   @Test
0742   public void testPairFilter() {
0743     List<List<String>> inputData = Arrays.asList(
0744         Arrays.asList("giants", "dodgers"),
0745         Arrays.asList("yankees", "red sox"));
0746 
0747     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0748         Arrays.asList(new Tuple2<>("giants", 6)),
0749         Arrays.asList(new Tuple2<>("yankees", 7)));
0750 
0751     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0752     JavaPairDStream<String, Integer> pairStream =
0753         stream.mapToPair(in -> new Tuple2<>(in, in.length()));
0754 
0755     JavaPairDStream<String, Integer> filtered = pairStream.filter(in -> in._1().contains("a"));
0756     JavaTestUtils.attachTestOutputStream(filtered);
0757     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0758 
0759     Assert.assertEquals(expected, result);
0760   }
0761 
  // Shared two-batch fixture of (String key, String value) pairs. Each batch holds two values
  // for "california" followed by two values for "new york".
  @SuppressWarnings("unchecked")
  private final List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
      Arrays.asList(new Tuple2<>("california", "dodgers"),
                    new Tuple2<>("california", "giants"),
                    new Tuple2<>("new york", "yankees"),
                    new Tuple2<>("new york", "mets")),
      Arrays.asList(new Tuple2<>("california", "sharks"),
                    new Tuple2<>("california", "ducks"),
                    new Tuple2<>("new york", "rangers"),
                    new Tuple2<>("new york", "islanders")));
0772 
  // Shared two-batch fixture of (String key, Integer value) pairs. The per-batch sums per key
  // are california=4, new york=5 for the first batch and california=10, new york=4 for the
  // second.
  @SuppressWarnings("unchecked")
  private final List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
      Arrays.asList(
          new Tuple2<>("california", 1),
          new Tuple2<>("california", 3),
          new Tuple2<>("new york", 4),
          new Tuple2<>("new york", 1)),
      Arrays.asList(
          new Tuple2<>("california", 5),
          new Tuple2<>("california", 5),
          new Tuple2<>("new york", 3),
          new Tuple2<>("new york", 1)));
0785 
0786   @SuppressWarnings("unchecked")
0787   @Test
0788   public void testPairMap() { // Maps pair -> pair of different type
0789     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0790 
0791     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0792         Arrays.asList(
0793             new Tuple2<>(1, "california"),
0794             new Tuple2<>(3, "california"),
0795             new Tuple2<>(4, "new york"),
0796             new Tuple2<>(1, "new york")),
0797         Arrays.asList(
0798             new Tuple2<>(5, "california"),
0799             new Tuple2<>(5, "california"),
0800             new Tuple2<>(3, "new york"),
0801             new Tuple2<>(1, "new york")));
0802 
0803     JavaDStream<Tuple2<String, Integer>> stream =
0804       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0805     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0806     JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(Tuple2::swap);
0807 
0808     JavaTestUtils.attachTestOutputStream(reversed);
0809     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0810 
0811     Assert.assertEquals(expected, result);
0812   }
0813 
0814   @SuppressWarnings("unchecked")
0815   @Test
0816   public void testPairMapPartitions() { // Maps pair -> pair of different type
0817     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0818 
0819     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0820         Arrays.asList(
0821             new Tuple2<>(1, "california"),
0822             new Tuple2<>(3, "california"),
0823             new Tuple2<>(4, "new york"),
0824             new Tuple2<>(1, "new york")),
0825         Arrays.asList(
0826             new Tuple2<>(5, "california"),
0827             new Tuple2<>(5, "california"),
0828             new Tuple2<>(3, "new york"),
0829             new Tuple2<>(1, "new york")));
0830 
0831     JavaDStream<Tuple2<String, Integer>> stream =
0832       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0833     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0834     JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
0835         List<Tuple2<Integer, String>> out = new LinkedList<>();
0836         while (in.hasNext()) {
0837           Tuple2<String, Integer> next = in.next();
0838           out.add(next.swap());
0839         }
0840         return out.iterator();
0841       });
0842 
0843     JavaTestUtils.attachTestOutputStream(reversed);
0844     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0845 
0846     Assert.assertEquals(expected, result);
0847   }
0848 
0849   @SuppressWarnings("unchecked")
0850   @Test
0851   public void testPairMap2() { // Maps pair -> single
0852     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0853 
0854     List<List<Integer>> expected = Arrays.asList(
0855             Arrays.asList(1, 3, 4, 1),
0856             Arrays.asList(5, 5, 3, 1));
0857 
0858     JavaDStream<Tuple2<String, Integer>> stream =
0859       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0860     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0861     JavaDStream<Integer> reversed = pairStream.map(in -> in._2());
0862 
0863     JavaTestUtils.attachTestOutputStream(reversed);
0864     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0865 
0866     Assert.assertEquals(expected, result);
0867   }
0868 
0869   @SuppressWarnings("unchecked")
0870   @Test
0871   public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
0872     List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
0873         Arrays.asList(
0874             new Tuple2<>("hi", 1),
0875             new Tuple2<>("ho", 2)),
0876         Arrays.asList(
0877             new Tuple2<>("hi", 1),
0878             new Tuple2<>("ho", 2)));
0879 
0880     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0881         Arrays.asList(
0882             new Tuple2<>(1, "h"),
0883             new Tuple2<>(1, "i"),
0884             new Tuple2<>(2, "h"),
0885             new Tuple2<>(2, "o")),
0886         Arrays.asList(
0887             new Tuple2<>(1, "h"),
0888             new Tuple2<>(1, "i"),
0889             new Tuple2<>(2, "h"),
0890             new Tuple2<>(2, "o")));
0891 
0892     JavaDStream<Tuple2<String, Integer>> stream =
0893         JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0894     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0895     JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
0896         List<Tuple2<Integer, String>> out = new LinkedList<>();
0897         for (Character s : in._1().toCharArray()) {
0898           out.add(new Tuple2<>(in._2(), s.toString()));
0899         }
0900         return out.iterator();
0901       });
0902     JavaTestUtils.attachTestOutputStream(flatMapped);
0903     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0904 
0905     Assert.assertEquals(expected, result);
0906   }
0907 
0908   @SuppressWarnings("unchecked")
0909   @Test
0910   public void testPairGroupByKey() {
0911     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
0912 
0913     List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
0914         Arrays.asList(
0915             new Tuple2<>("california", Arrays.asList("dodgers", "giants")),
0916             new Tuple2<>("new york", Arrays.asList("yankees", "mets"))),
0917         Arrays.asList(
0918             new Tuple2<>("california", Arrays.asList("sharks", "ducks")),
0919             new Tuple2<>("new york", Arrays.asList("rangers", "islanders"))));
0920 
0921     JavaDStream<Tuple2<String, String>> stream =
0922       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0923     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
0924 
0925     JavaPairDStream<String, Iterable<String>> grouped = pairStream.groupByKey();
0926     JavaTestUtils.attachTestOutputStream(grouped);
0927     List<List<Tuple2<String, Iterable<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0928 
0929     Assert.assertEquals(expected.size(), result.size());
0930     Iterator<List<Tuple2<String, Iterable<String>>>> resultItr = result.iterator();
0931     Iterator<List<Tuple2<String, List<String>>>> expectedItr = expected.iterator();
0932     while (resultItr.hasNext() && expectedItr.hasNext()) {
0933       Iterator<Tuple2<String, Iterable<String>>> resultElements = resultItr.next().iterator();
0934       Iterator<Tuple2<String, List<String>>> expectedElements = expectedItr.next().iterator();
0935       while (resultElements.hasNext() && expectedElements.hasNext()) {
0936         Tuple2<String, Iterable<String>> resultElement = resultElements.next();
0937         Tuple2<String, List<String>> expectedElement = expectedElements.next();
0938         Assert.assertEquals(expectedElement._1(), resultElement._1());
0939         equalIterable(expectedElement._2(), resultElement._2());
0940       }
0941       Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext());
0942     }
0943   }
0944 
0945   @SuppressWarnings("unchecked")
0946   @Test
0947   public void testPairReduceByKey() {
0948     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0949 
0950     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0951         Arrays.asList(
0952             new Tuple2<>("california", 4),
0953             new Tuple2<>("new york", 5)),
0954         Arrays.asList(
0955             new Tuple2<>("california", 10),
0956             new Tuple2<>("new york", 4)));
0957 
0958     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
0959         ssc, inputData, 1);
0960     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0961 
0962     JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(new IntegerSum());
0963 
0964     JavaTestUtils.attachTestOutputStream(reduced);
0965     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0966 
0967     Assert.assertEquals(expected, result);
0968   }
0969 
0970   @SuppressWarnings("unchecked")
0971   @Test
0972   public void testCombineByKey() {
0973     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0974 
0975     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0976         Arrays.asList(
0977             new Tuple2<>("california", 4),
0978             new Tuple2<>("new york", 5)),
0979         Arrays.asList(
0980             new Tuple2<>("california", 10),
0981             new Tuple2<>("new york", 4)));
0982 
0983     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
0984         ssc, inputData, 1);
0985     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0986 
0987     JavaPairDStream<String, Integer> combined = pairStream.combineByKey(
0988         i -> i, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
0989 
0990     JavaTestUtils.attachTestOutputStream(combined);
0991     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0992 
0993     Assert.assertEquals(expected, result);
0994   }
0995 
0996   @SuppressWarnings("unchecked")
0997   @Test
0998   public void testCountByValue() {
0999     List<List<String>> inputData = Arrays.asList(
1000       Arrays.asList("hello", "world"),
1001       Arrays.asList("hello", "moon"),
1002       Arrays.asList("hello"));
1003 
1004     List<List<Tuple2<String, Long>>> expected = Arrays.asList(
1005         Arrays.asList(
1006             new Tuple2<>("hello", 1L),
1007             new Tuple2<>("world", 1L)),
1008         Arrays.asList(
1009             new Tuple2<>("hello", 1L),
1010             new Tuple2<>("moon", 1L)),
1011         Arrays.asList(
1012             new Tuple2<>("hello", 1L)));
1013 
1014     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
1015     JavaPairDStream<String, Long> counted = stream.countByValue();
1016     JavaTestUtils.attachTestOutputStream(counted);
1017     List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
1018 
1019     Assert.assertEquals(expected, result);
1020   }
1021 
1022   @SuppressWarnings("unchecked")
1023   @Test
1024   public void testGroupByKeyAndWindow() {
1025     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
1026 
1027     List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList(
1028       Arrays.asList(
1029         new Tuple2<>("california", Arrays.asList(1, 3)),
1030         new Tuple2<>("new york", Arrays.asList(1, 4))
1031       ),
1032       Arrays.asList(
1033         new Tuple2<>("california", Arrays.asList(1, 3, 5, 5)),
1034         new Tuple2<>("new york", Arrays.asList(1, 1, 3, 4))
1035       ),
1036       Arrays.asList(
1037         new Tuple2<>("california", Arrays.asList(5, 5)),
1038         new Tuple2<>("new york", Arrays.asList(1, 3))
1039       )
1040     );
1041 
1042     JavaDStream<Tuple2<String, Integer>> stream =
1043       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
1044     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
1045 
1046     JavaPairDStream<String, Iterable<Integer>> groupWindowed =
1047         pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
1048     JavaTestUtils.attachTestOutputStream(groupWindowed);
1049     List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
1050 
1051     Assert.assertEquals(expected.size(), result.size());
1052     for (int i = 0; i < result.size(); i++) {
1053       Assert.assertEquals(convert(expected.get(i)), convert(result.get(i)));
1054     }
1055   }
1056 
1057   private static Set<Tuple2<String, HashSet<Integer>>>
1058     convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
1059     List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<>();
1060     for (Tuple2<String, List<Integer>> tuple: listOfTuples) {
1061       newListOfTuples.add(convert(tuple));
1062     }
1063     return new HashSet<>(newListOfTuples);
1064   }
1065 
1066   private static Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
1067     return new Tuple2<>(tuple._1(), new HashSet<>(tuple._2()));
1068   }
1069 
1070   @SuppressWarnings("unchecked")
1071   @Test
1072   public void testReduceByKeyAndWindow() {
1073     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
1074 
1075     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
1076         Arrays.asList(new Tuple2<>("california", 4),
1077                       new Tuple2<>("new york", 5)),
1078         Arrays.asList(new Tuple2<>("california", 14),
1079                       new Tuple2<>("new york", 9)),
1080         Arrays.asList(new Tuple2<>("california", 10),
1081                       new Tuple2<>("new york", 4)));
1082 
1083     JavaDStream<Tuple2<String, Integer>> stream =
1084       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
1085     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
1086 
1087     JavaPairDStream<String, Integer> reduceWindowed =
1088         pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000));
1089     JavaTestUtils.attachTestOutputStream(reduceWindowed);
1090     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
1091 
1092     Assert.assertEquals(expected, result);
1093   }
1094 
1095   @SuppressWarnings("unchecked")
1096   @Test
1097   public void testUpdateStateByKey() {
1098     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
1099 
1100     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
1101         Arrays.asList(new Tuple2<>("california", 4),
1102                       new Tuple2<>("new york", 5)),
1103         Arrays.asList(new Tuple2<>("california", 14),
1104                       new Tuple2<>("new york", 9)),
1105         Arrays.asList(new Tuple2<>("california", 14),
1106                       new Tuple2<>("new york", 9)));
1107 
1108     JavaDStream<Tuple2<String, Integer>> stream =
1109       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
1110     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
1111 
1112     JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
1113         int out = 0;
1114         if (state.isPresent()) {
1115           out += state.get();
1116         }
1117         for (Integer v : values) {
1118           out += v;
1119         }
1120         return Optional.of(out);
1121       });
1122     JavaTestUtils.attachTestOutputStream(updated);
1123     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
1124 
1125     Assert.assertEquals(expected, result);
1126   }
1127 
1128   @SuppressWarnings("unchecked")
1129   @Test
1130   public void testUpdateStateByKeyWithInitial() {
1131     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
1132 
1133     List<Tuple2<String, Integer>> initial = Arrays.asList(
1134         new Tuple2<>("california", 1),
1135             new Tuple2<>("new york", 2));
1136 
1137     JavaRDD<Tuple2<String, Integer>> tmpRDD = ssc.sparkContext().parallelize(initial);
1138     JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD(tmpRDD);
1139 
1140     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
1141         Arrays.asList(new Tuple2<>("california", 5),
1142                       new Tuple2<>("new york", 7)),
1143         Arrays.asList(new Tuple2<>("california", 15),
1144                       new Tuple2<>("new york", 11)),
1145         Arrays.asList(new Tuple2<>("california", 15),
1146                       new Tuple2<>("new york", 11)));
1147 
1148     JavaDStream<Tuple2<String, Integer>> stream =
1149       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
1150     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
1151 
1152     JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
1153         int out = 0;
1154         if (state.isPresent()) {
1155           out += state.get();
1156         }
1157         for (Integer v : values) {
1158           out += v;
1159         }
1160         return Optional.of(out);
1161       }, new HashPartitioner(1), initialRDD);
1162     JavaTestUtils.attachTestOutputStream(updated);
1163     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
1164 
1165     assertOrderInvariantEquals(expected, result);
1166   }
1167 
1168   @SuppressWarnings("unchecked")
1169   @Test
1170   public void testReduceByKeyAndWindowWithInverse() {
1171     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
1172 
1173     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
1174         Arrays.asList(new Tuple2<>("california", 4),
1175                       new Tuple2<>("new york", 5)),
1176         Arrays.asList(new Tuple2<>("california", 14),
1177                       new Tuple2<>("new york", 9)),
1178         Arrays.asList(new Tuple2<>("california", 10),
1179                       new Tuple2<>("new york", 4)));
1180 
1181     JavaDStream<Tuple2<String, Integer>> stream =
1182       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
1183     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
1184 
1185     JavaPairDStream<String, Integer> reduceWindowed =
1186         pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
1187                                         new Duration(2000), new Duration(1000));
1188     JavaTestUtils.attachTestOutputStream(reduceWindowed);
1189     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
1190 
1191     Assert.assertEquals(expected, result);
1192   }
1193 
1194   @SuppressWarnings("unchecked")
1195   @Test
1196   public void testCountByValueAndWindow() {
1197     List<List<String>> inputData = Arrays.asList(
1198         Arrays.asList("hello", "world"),
1199         Arrays.asList("hello", "moon"),
1200         Arrays.asList("hello"));
1201 
1202     List<HashSet<Tuple2<String, Long>>> expected = Arrays.asList(
1203         Sets.newHashSet(
1204             new Tuple2<>("hello", 1L),
1205             new Tuple2<>("world", 1L)),
1206         Sets.newHashSet(
1207             new Tuple2<>("hello", 2L),
1208             new Tuple2<>("world", 1L),
1209             new Tuple2<>("moon", 1L)),
1210         Sets.newHashSet(
1211             new Tuple2<>("hello", 2L),
1212             new Tuple2<>("moon", 1L)));
1213 
1214     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(
1215         ssc, inputData, 1);
1216     JavaPairDStream<String, Long> counted =
1217       stream.countByValueAndWindow(new Duration(2000), new Duration(1000));
1218     JavaTestUtils.attachTestOutputStream(counted);
1219     List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
1220     List<Set<Tuple2<String, Long>>> unorderedResult = new ArrayList<>();
1221     for (List<Tuple2<String, Long>> res: result) {
1222       unorderedResult.add(Sets.newHashSet(res));
1223     }
1224 
1225     Assert.assertEquals(expected, unorderedResult);
1226   }
1227 
1228   @SuppressWarnings("unchecked")
1229   @Test
1230   public void testPairTransform() {
1231     List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
1232         Arrays.asList(
1233             new Tuple2<>(3, 5),
1234             new Tuple2<>(1, 5),
1235             new Tuple2<>(4, 5),
1236             new Tuple2<>(2, 5)),
1237         Arrays.asList(
1238             new Tuple2<>(2, 5),
1239             new Tuple2<>(3, 5),
1240             new Tuple2<>(4, 5),
1241             new Tuple2<>(1, 5)));
1242 
1243     List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
1244         Arrays.asList(
1245             new Tuple2<>(1, 5),
1246             new Tuple2<>(2, 5),
1247             new Tuple2<>(3, 5),
1248             new Tuple2<>(4, 5)),
1249         Arrays.asList(
1250             new Tuple2<>(1, 5),
1251             new Tuple2<>(2, 5),
1252             new Tuple2<>(3, 5),
1253             new Tuple2<>(4, 5)));
1254 
1255     JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
1256         ssc, inputData, 1);
1257     JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
1258 
1259     JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey());
1260 
1261     JavaTestUtils.attachTestOutputStream(sorted);
1262     List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
1263 
1264     Assert.assertEquals(expected, result);
1265   }
1266 
1267   @SuppressWarnings("unchecked")
1268   @Test
1269   public void testPairToNormalRDDTransform() {
1270     List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
1271         Arrays.asList(
1272             new Tuple2<>(3, 5),
1273             new Tuple2<>(1, 5),
1274             new Tuple2<>(4, 5),
1275             new Tuple2<>(2, 5)),
1276         Arrays.asList(
1277             new Tuple2<>(2, 5),
1278             new Tuple2<>(3, 5),
1279             new Tuple2<>(4, 5),
1280             new Tuple2<>(1, 5)));
1281 
1282     List<List<Integer>> expected = Arrays.asList(
1283         Arrays.asList(3,1,4,2),
1284         Arrays.asList(2,3,4,1));
1285 
1286     JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
1287         ssc, inputData, 1);
1288     JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
1289 
1290     JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(in2 -> in2._1()));
1291 
1292     JavaTestUtils.attachTestOutputStream(firstParts);
1293     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
1294 
1295     Assert.assertEquals(expected, result);
1296   }
1297 
1298   @SuppressWarnings("unchecked")
1299   @Test
1300   public void testMapValues() {
1301     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
1302 
1303     List<List<Tuple2<String, String>>> expected = Arrays.asList(
1304         Arrays.asList(new Tuple2<>("california", "DODGERS"),
1305                       new Tuple2<>("california", "GIANTS"),
1306                       new Tuple2<>("new york", "YANKEES"),
1307                       new Tuple2<>("new york", "METS")),
1308         Arrays.asList(new Tuple2<>("california", "SHARKS"),
1309                       new Tuple2<>("california", "DUCKS"),
1310                       new Tuple2<>("new york", "RANGERS"),
1311                       new Tuple2<>("new york", "ISLANDERS")));
1312 
1313     JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
1314         ssc, inputData, 1);
1315     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
1316 
1317     JavaPairDStream<String, String> mapped =
1318       pairStream.mapValues(s -> s.toUpperCase(Locale.ROOT));
1319 
1320     JavaTestUtils.attachTestOutputStream(mapped);
1321     List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
1322 
1323     Assert.assertEquals(expected, result);
1324   }
1325 
1326   @SuppressWarnings("unchecked")
1327   @Test
1328   public void testFlatMapValues() {
1329     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
1330 
1331     List<List<Tuple2<String, String>>> expected = Arrays.asList(
1332         Arrays.asList(new Tuple2<>("california", "dodgers1"),
1333                       new Tuple2<>("california", "dodgers2"),
1334                       new Tuple2<>("california", "giants1"),
1335                       new Tuple2<>("california", "giants2"),
1336                       new Tuple2<>("new york", "yankees1"),
1337                       new Tuple2<>("new york", "yankees2"),
1338                       new Tuple2<>("new york", "mets1"),
1339                       new Tuple2<>("new york", "mets2")),
1340         Arrays.asList(new Tuple2<>("california", "sharks1"),
1341                       new Tuple2<>("california", "sharks2"),
1342                       new Tuple2<>("california", "ducks1"),
1343                       new Tuple2<>("california", "ducks2"),
1344                       new Tuple2<>("new york", "rangers1"),
1345                       new Tuple2<>("new york", "rangers2"),
1346                       new Tuple2<>("new york", "islanders1"),
1347                       new Tuple2<>("new york", "islanders2")));
1348 
1349     JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
1350         ssc, inputData, 1);
1351     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
1352 
1353 
1354     JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(in -> {
1355         List<String> out = new ArrayList<>();
1356         out.add(in + "1");
1357         out.add(in + "2");
1358         return out.iterator();
1359       });
1360 
1361     JavaTestUtils.attachTestOutputStream(flatMapped);
1362     List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
1363 
1364     Assert.assertEquals(expected, result);
1365   }
1366 
1367   @SuppressWarnings("unchecked")
1368   @Test
1369   public void testCoGroup() {
1370     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
1371         Arrays.asList(new Tuple2<>("california", "dodgers"),
1372                       new Tuple2<>("new york", "yankees")),
1373         Arrays.asList(new Tuple2<>("california", "sharks"),
1374                       new Tuple2<>("new york", "rangers")));
1375 
1376     List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
1377         Arrays.asList(new Tuple2<>("california", "giants"),
1378                       new Tuple2<>("new york", "mets")),
1379         Arrays.asList(new Tuple2<>("california", "ducks"),
1380                       new Tuple2<>("new york", "islanders")));
1381 
1382 
1383     List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = Arrays.asList(
1384         Arrays.asList(
1385             new Tuple2<>("california",
1386                          new Tuple2<>(Arrays.asList("dodgers"), Arrays.asList("giants"))),
1387             new Tuple2<>("new york",
1388                          new Tuple2<>(Arrays.asList("yankees"), Arrays.asList("mets")))),
1389         Arrays.asList(
1390             new Tuple2<>("california",
1391                          new Tuple2<>(Arrays.asList("sharks"), Arrays.asList("ducks"))),
1392             new Tuple2<>("new york",
1393                          new Tuple2<>(Arrays.asList("rangers"), Arrays.asList("islanders")))));
1394 
1395 
1396     JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
1397         ssc, stringStringKVStream1, 1);
1398     JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
1399 
1400     JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
1401         ssc, stringStringKVStream2, 1);
1402     JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
1403 
1404     JavaPairDStream<String, Tuple2<Iterable<String>, Iterable<String>>> grouped =
1405         pairStream1.cogroup(pairStream2);
1406     JavaTestUtils.attachTestOutputStream(grouped);
1407     List<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> result =
1408         JavaTestUtils.runStreams(ssc, 2, 2);
1409 
1410     Assert.assertEquals(expected.size(), result.size());
1411     Iterator<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> resultItr =
1412         result.iterator();
1413     Iterator<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expectedItr =
1414         expected.iterator();
1415     while (resultItr.hasNext() && expectedItr.hasNext()) {
1416       Iterator<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>> resultElements =
1417           resultItr.next().iterator();
1418       Iterator<Tuple2<String, Tuple2<List<String>, List<String>>>> expectedElements =
1419           expectedItr.next().iterator();
1420       while (resultElements.hasNext() && expectedElements.hasNext()) {
1421         Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> resultElement =
1422             resultElements.next();
1423         Tuple2<String, Tuple2<List<String>, List<String>>> expectedElement =
1424             expectedElements.next();
1425         Assert.assertEquals(expectedElement._1(), resultElement._1());
1426         equalIterable(expectedElement._2()._1(), resultElement._2()._1());
1427         equalIterable(expectedElement._2()._2(), resultElement._2()._2());
1428       }
1429       Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext());
1430     }
1431   }
1432 
1433   @SuppressWarnings("unchecked")
1434   @Test
1435   public void testJoin() {
1436     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
1437         Arrays.asList(new Tuple2<>("california", "dodgers"),
1438                       new Tuple2<>("new york", "yankees")),
1439         Arrays.asList(new Tuple2<>("california", "sharks"),
1440                       new Tuple2<>("new york", "rangers")));
1441 
1442     List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
1443         Arrays.asList(new Tuple2<>("california", "giants"),
1444                       new Tuple2<>("new york", "mets")),
1445         Arrays.asList(new Tuple2<>("california", "ducks"),
1446                       new Tuple2<>("new york", "islanders")));
1447 
1448 
1449     List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
1450         Arrays.asList(
1451             new Tuple2<>("california",
1452                          new Tuple2<>("dodgers", "giants")),
1453             new Tuple2<>("new york",
1454                          new Tuple2<>("yankees", "mets"))),
1455         Arrays.asList(
1456             new Tuple2<>("california",
1457                          new Tuple2<>("sharks", "ducks")),
1458             new Tuple2<>("new york",
1459                          new Tuple2<>("rangers", "islanders"))));
1460 
1461 
1462     JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
1463         ssc, stringStringKVStream1, 1);
1464     JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
1465 
1466     JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
1467         ssc, stringStringKVStream2, 1);
1468     JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
1469 
1470     JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2);
1471     JavaTestUtils.attachTestOutputStream(joined);
1472     List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
1473 
1474     Assert.assertEquals(expected, result);
1475   }
1476 
1477   @SuppressWarnings("unchecked")
1478   @Test
1479   public void testLeftOuterJoin() {
1480     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
1481         Arrays.asList(new Tuple2<>("california", "dodgers"),
1482                       new Tuple2<>("new york", "yankees")),
1483         Arrays.asList(new Tuple2<>("california", "sharks") ));
1484 
1485     List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
1486         Arrays.asList(new Tuple2<>("california", "giants") ),
1487         Arrays.asList(new Tuple2<>("new york", "islanders") )
1488 
1489     );
1490 
1491     List<List<Long>> expected = Arrays.asList(Arrays.asList(2L), Arrays.asList(1L));
1492 
1493     JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
1494         ssc, stringStringKVStream1, 1);
1495     JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
1496 
1497     JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
1498         ssc, stringStringKVStream2, 1);
1499     JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
1500 
1501     JavaPairDStream<String, Tuple2<String, Optional<String>>> joined =
1502         pairStream1.leftOuterJoin(pairStream2);
1503     JavaDStream<Long> counted = joined.count();
1504     JavaTestUtils.attachTestOutputStream(counted);
1505     List<List<Long>> result = JavaTestUtils.runStreams(ssc, 2, 2);
1506 
1507     Assert.assertEquals(expected, result);
1508   }
1509 
1510   @SuppressWarnings("unchecked")
1511   @Test
1512   public void testCheckpointMasterRecovery() throws InterruptedException {
1513     List<List<String>> inputData = Arrays.asList(
1514         Arrays.asList("this", "is"),
1515         Arrays.asList("a", "test"),
1516         Arrays.asList("counting", "letters"));
1517 
1518     List<List<Integer>> expectedInitial = Arrays.asList(
1519         Arrays.asList(4,2));
1520     List<List<Integer>> expectedFinal = Arrays.asList(
1521         Arrays.asList(1,4),
1522         Arrays.asList(8,7));
1523 
1524     File tempDir = Files.createTempDir();
1525     tempDir.deleteOnExit();
1526     ssc.checkpoint(tempDir.getAbsolutePath());
1527 
1528     JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
1529     JavaDStream<Integer> letterCount = stream.map(String::length);
1530     JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
1531     List<List<Integer>> initialResult = JavaTestUtils.runStreams(ssc, 1, 1);
1532 
1533     assertOrderInvariantEquals(expectedInitial, initialResult);
1534     Thread.sleep(1000);
1535     ssc.stop();
1536 
1537     ssc = new JavaStreamingContext(tempDir.getAbsolutePath());
1538     // Tweak to take into consideration that the last batch before failure
1539     // will be re-processed after recovery
1540     List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
1541     assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
1542     ssc.stop();
1543     Utils.deleteRecursively(tempDir);
1544   }
1545 
1546   @SuppressWarnings("unchecked")
1547   @Test
1548   public void testContextGetOrCreate() throws InterruptedException {
1549     ssc.stop();
1550 
1551     SparkConf conf = new SparkConf()
1552         .setMaster("local[2]")
1553         .setAppName("test")
1554         .set("newContext", "true");
1555 
1556     File emptyDir = Files.createTempDir();
1557     emptyDir.deleteOnExit();
1558     StreamingContextSuite contextSuite = new StreamingContextSuite();
1559     String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint();
1560     String checkpointDir = contextSuite.createValidCheckpoint();
1561 
1562     // Function to create JavaStreamingContext without any output operations
1563     // (used to detect the new context)
1564     AtomicBoolean newContextCreated = new AtomicBoolean(false);
1565     Function0<JavaStreamingContext> creatingFunc = () -> {
1566       newContextCreated.set(true);
1567       return new JavaStreamingContext(conf, Seconds.apply(1));
1568     };
1569 
1570     newContextCreated.set(false);
1571     ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc);
1572     Assert.assertTrue("new context not created", newContextCreated.get());
1573     ssc.stop();
1574 
1575     newContextCreated.set(false);
1576     ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc,
1577         new Configuration(), true);
1578     Assert.assertTrue("new context not created", newContextCreated.get());
1579     ssc.stop();
1580 
1581     newContextCreated.set(false);
1582     ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
1583         new Configuration());
1584     Assert.assertTrue("old context not recovered", !newContextCreated.get());
1585     ssc.stop();
1586 
1587     newContextCreated.set(false);
1588     JavaSparkContext sc = new JavaSparkContext(conf);
1589     ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
1590         new Configuration());
1591     Assert.assertTrue("old context not recovered", !newContextCreated.get());
1592     ssc.stop();
1593   }
1594 
1595   /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
1596   @SuppressWarnings("unchecked")
1597   @Test
1598   public void testCheckpointofIndividualStream() throws InterruptedException {
1599     List<List<String>> inputData = Arrays.asList(
1600         Arrays.asList("this", "is"),
1601         Arrays.asList("a", "test"),
1602         Arrays.asList("counting", "letters"));
1603 
1604     List<List<Integer>> expected = Arrays.asList(
1605         Arrays.asList(4,2),
1606         Arrays.asList(1,4),
1607         Arrays.asList(8,7));
1608 
1609     JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
1610     JavaDStream letterCount = stream.map(new Function<String, Integer>() {
1611       @Override
1612       public Integer call(String s) {
1613         return s.length();
1614       }
1615     });
1616     JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
1617 
1618     letterCount.checkpoint(new Duration(1000));
1619 
1620     List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(ssc, 3, 3);
1621     assertOrderInvariantEquals(expected, result1);
1622   }
1623   */
1624 
1625   // Input stream tests. These mostly just test that we can instantiate a given InputStream with
1626   // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
1627   // InputStream functionality is deferred to the existing Scala tests.
1628   @Test
1629   public void testSocketTextStream() {
1630     ssc.socketTextStream("localhost", 12345);
1631   }
1632 
1633   @Test
1634   public void testSocketString() {
1635     ssc.socketStream(
1636       "localhost",
1637       12345,
1638       in -> {
1639         List<String> out = new ArrayList<>();
1640         try (BufferedReader reader = new BufferedReader(
1641             new InputStreamReader(in, StandardCharsets.UTF_8))) {
1642           for (String line; (line = reader.readLine()) != null;) {
1643             out.add(line);
1644           }
1645         }
1646         return out;
1647       },
1648       StorageLevel.MEMORY_ONLY());
1649   }
1650 
1651   @SuppressWarnings("unchecked")
1652   @Test
1653   public void testTextFileStream() throws IOException {
1654     File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
1655     List<List<String>> expected = fileTestPrepare(testDir);
1656 
1657     JavaDStream<String> input = ssc.textFileStream(testDir.toString());
1658     JavaTestUtils.attachTestOutputStream(input);
1659     List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
1660 
1661     assertOrderInvariantEquals(expected, result);
1662   }
1663 
1664   @SuppressWarnings("unchecked")
1665   @Test
1666   public void testFileStream() throws IOException {
1667     File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
1668     List<List<String>> expected = fileTestPrepare(testDir);
1669 
1670     JavaPairInputDStream<LongWritable, Text> inputStream = ssc.fileStream(
1671       testDir.toString(),
1672       LongWritable.class,
1673       Text.class,
1674       TextInputFormat.class,
1675       v1 -> Boolean.TRUE,
1676       true);
1677 
1678     JavaDStream<String> test = inputStream.map(v1 -> v1._2().toString());
1679 
1680     JavaTestUtils.attachTestOutputStream(test);
1681     List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
1682 
1683     assertOrderInvariantEquals(expected, result);
1684   }
1685 
1686   @Test
1687   public void testRawSocketStream() {
1688     ssc.rawSocketStream("localhost", 12345);
1689   }
1690 
1691   private static List<List<String>> fileTestPrepare(File testDir) throws IOException {
1692     File existingFile = new File(testDir, "0");
1693     Files.write("0\n", existingFile, StandardCharsets.UTF_8);
1694     Assert.assertTrue(existingFile.setLastModified(1000));
1695     Assert.assertEquals(1000, existingFile.lastModified());
1696     return Arrays.asList(Arrays.asList("0"));
1697   }
1698 
  /**
   * Compile-only check of the Hadoop save methods on {@link JavaPairDStream}. The calls use
   * both the new-API ({@code mapreduce}) and old-API ({@code mapred})
   * {@code SequenceFileOutputFormat} classes. The method makes no assertions; it exists so
   * that these exact invocation forms must keep type-checking.
   */
  @SuppressWarnings("unchecked")
  // SPARK-5795: no logic assertions, just testing that intended API invocations compile
  private void compileSaveAsJavaAPI(JavaPairDStream<LongWritable,Text> pds) {
    pds.saveAsNewAPIHadoopFiles(
        "", "", LongWritable.class, Text.class,
        org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
    pds.saveAsHadoopFiles(
        "", "", LongWritable.class, Text.class,
        org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
    // Checks that a previous common workaround for this API still compiles
    // (the raw `(Class)` casts below are intentional; they are why the method suppresses
    // unchecked warnings).
    pds.saveAsNewAPIHadoopFiles(
        "", "", LongWritable.class, Text.class,
        (Class) org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
    pds.saveAsHadoopFiles(
        "", "", LongWritable.class, Text.class,
        (Class) org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
  }
1716 
1717 }