0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package test.org.apache.spark.streaming;
0019
0020 import java.io.*;
0021 import java.nio.charset.StandardCharsets;
0022 import java.util.*;
0023 import java.util.concurrent.atomic.AtomicBoolean;
0024
0025 import org.apache.spark.streaming.Duration;
0026 import org.apache.spark.streaming.JavaCheckpointTestUtils;
0027 import org.apache.spark.streaming.JavaTestUtils;
0028 import org.apache.spark.streaming.LocalJavaStreamingContext;
0029 import org.apache.spark.streaming.Seconds;
0030 import org.apache.spark.streaming.StreamingContextState;
0031 import org.apache.spark.streaming.StreamingContextSuite;
0032 import scala.Tuple2;
0033
0034 import org.apache.hadoop.conf.Configuration;
0035 import org.apache.hadoop.io.LongWritable;
0036 import org.apache.hadoop.io.Text;
0037 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
0038
0039 import org.junit.Assert;
0040 import org.junit.Test;
0041
0042 import com.google.common.io.Files;
0043 import com.google.common.collect.Sets;
0044
0045 import org.apache.spark.HashPartitioner;
0046 import org.apache.spark.SparkConf;
0047 import org.apache.spark.api.java.JavaPairRDD;
0048 import org.apache.spark.api.java.JavaRDD;
0049 import org.apache.spark.api.java.JavaSparkContext;
0050 import org.apache.spark.api.java.Optional;
0051 import org.apache.spark.api.java.function.*;
0052 import org.apache.spark.storage.StorageLevel;
0053 import org.apache.spark.streaming.api.java.*;
0054 import org.apache.spark.util.LongAccumulator;
0055 import org.apache.spark.util.Utils;
0056
0057
0058
0059
0060 public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
0061
0062 public static void equalIterator(Iterator<?> a, Iterator<?> b) {
0063 while (a.hasNext() && b.hasNext()) {
0064 Assert.assertEquals(a.next(), b.next());
0065 }
0066 Assert.assertEquals(a.hasNext(), b.hasNext());
0067 }
0068
0069 public static void equalIterable(Iterable<?> a, Iterable<?> b) {
0070 equalIterator(a.iterator(), b.iterator());
0071 }
0072
0073 @Test
0074 public void testInitialization() {
0075 Assert.assertNotNull(ssc.sparkContext());
0076 }
0077
0078 @SuppressWarnings("unchecked")
0079 @Test
0080 public void testContextState() {
0081 List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
0082 Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
0083 JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0084 JavaTestUtils.attachTestOutputStream(stream);
0085 Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
0086 ssc.start();
0087 Assert.assertEquals(StreamingContextState.ACTIVE, ssc.getState());
0088 ssc.stop();
0089 Assert.assertEquals(StreamingContextState.STOPPED, ssc.getState());
0090 }
0091
0092 @SuppressWarnings("unchecked")
0093 @Test
0094 public void testCount() {
0095 List<List<Integer>> inputData = Arrays.asList(
0096 Arrays.asList(1,2,3,4),
0097 Arrays.asList(3,4,5),
0098 Arrays.asList(3));
0099
0100 List<List<Long>> expected = Arrays.asList(
0101 Arrays.asList(4L),
0102 Arrays.asList(3L),
0103 Arrays.asList(1L));
0104
0105 JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0106 JavaDStream<Long> count = stream.count();
0107 JavaTestUtils.attachTestOutputStream(count);
0108 List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0109 assertOrderInvariantEquals(expected, result);
0110 }
0111
0112 @SuppressWarnings("unchecked")
0113 @Test
0114 public void testMap() {
0115 List<List<String>> inputData = Arrays.asList(
0116 Arrays.asList("hello", "world"),
0117 Arrays.asList("goodnight", "moon"));
0118
0119 List<List<Integer>> expected = Arrays.asList(
0120 Arrays.asList(5,5),
0121 Arrays.asList(9,4));
0122
0123 JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0124 JavaDStream<Integer> letterCount = stream.map(String::length);
0125 JavaTestUtils.attachTestOutputStream(letterCount);
0126 List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0127
0128 assertOrderInvariantEquals(expected, result);
0129 }
0130
0131 @SuppressWarnings("unchecked")
0132 @Test
0133 public void testWindow() {
0134 List<List<Integer>> inputData = Arrays.asList(
0135 Arrays.asList(1,2,3),
0136 Arrays.asList(4,5,6),
0137 Arrays.asList(7,8,9));
0138
0139 List<List<Integer>> expected = Arrays.asList(
0140 Arrays.asList(1,2,3),
0141 Arrays.asList(4,5,6,1,2,3),
0142 Arrays.asList(7,8,9,4,5,6),
0143 Arrays.asList(7,8,9));
0144
0145 JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0146 JavaDStream<Integer> windowed = stream.window(new Duration(2000));
0147 JavaTestUtils.attachTestOutputStream(windowed);
0148 List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
0149
0150 assertOrderInvariantEquals(expected, result);
0151 }
0152
0153 @SuppressWarnings("unchecked")
0154 @Test
0155 public void testWindowWithSlideDuration() {
0156 List<List<Integer>> inputData = Arrays.asList(
0157 Arrays.asList(1,2,3),
0158 Arrays.asList(4,5,6),
0159 Arrays.asList(7,8,9),
0160 Arrays.asList(10,11,12),
0161 Arrays.asList(13,14,15),
0162 Arrays.asList(16,17,18));
0163
0164 List<List<Integer>> expected = Arrays.asList(
0165 Arrays.asList(1,2,3,4,5,6),
0166 Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12),
0167 Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18),
0168 Arrays.asList(13,14,15,16,17,18));
0169
0170 JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0171 JavaDStream<Integer> windowed = stream.window(new Duration(4000), new Duration(2000));
0172 JavaTestUtils.attachTestOutputStream(windowed);
0173 List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4);
0174
0175 assertOrderInvariantEquals(expected, result);
0176 }
0177
0178 @SuppressWarnings("unchecked")
0179 @Test
0180 public void testFilter() {
0181 List<List<String>> inputData = Arrays.asList(
0182 Arrays.asList("giants", "dodgers"),
0183 Arrays.asList("yankees", "red sox"));
0184
0185 List<List<String>> expected = Arrays.asList(
0186 Arrays.asList("giants"),
0187 Arrays.asList("yankees"));
0188
0189 JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0190 JavaDStream<String> filtered = stream.filter(s -> s.contains("a"));
0191 JavaTestUtils.attachTestOutputStream(filtered);
0192 List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0193
0194 assertOrderInvariantEquals(expected, result);
0195 }
0196
0197 @SuppressWarnings("unchecked")
0198 @Test
0199 public void testRepartitionMorePartitions() {
0200 List<List<Integer>> inputData = Arrays.asList(
0201 Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
0202 Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
0203 JavaDStream<Integer> stream =
0204 JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
0205 JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
0206 stream.repartition(4);
0207 JavaTestUtils.attachTestOutputStream(repartitioned);
0208 List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
0209 Assert.assertEquals(2, result.size());
0210 for (List<List<Integer>> rdd : result) {
0211 Assert.assertEquals(4, rdd.size());
0212 Assert.assertEquals(
0213 10, rdd.get(0).size() + rdd.get(1).size() + rdd.get(2).size() + rdd.get(3).size());
0214 }
0215 }
0216
0217 @SuppressWarnings("unchecked")
0218 @Test
0219 public void testRepartitionFewerPartitions() {
0220 List<List<Integer>> inputData = Arrays.asList(
0221 Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
0222 Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
0223 JavaDStream<Integer> stream =
0224 JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
0225 JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
0226 stream.repartition(2);
0227 JavaTestUtils.attachTestOutputStream(repartitioned);
0228 List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
0229 Assert.assertEquals(2, result.size());
0230 for (List<List<Integer>> rdd : result) {
0231 Assert.assertEquals(2, rdd.size());
0232 Assert.assertEquals(10, rdd.get(0).size() + rdd.get(1).size());
0233 }
0234 }
0235
0236 @SuppressWarnings("unchecked")
0237 @Test
0238 public void testGlom() {
0239 List<List<String>> inputData = Arrays.asList(
0240 Arrays.asList("giants", "dodgers"),
0241 Arrays.asList("yankees", "red sox"));
0242
0243 List<List<List<String>>> expected = Arrays.asList(
0244 Arrays.asList(Arrays.asList("giants", "dodgers")),
0245 Arrays.asList(Arrays.asList("yankees", "red sox")));
0246
0247 JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0248 JavaDStream<List<String>> glommed = stream.glom();
0249 JavaTestUtils.attachTestOutputStream(glommed);
0250 List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0251
0252 Assert.assertEquals(expected, result);
0253 }
0254
0255 @SuppressWarnings("unchecked")
0256 @Test
0257 public void testMapPartitions() {
0258 List<List<String>> inputData = Arrays.asList(
0259 Arrays.asList("giants", "dodgers"),
0260 Arrays.asList("yankees", "red sox"));
0261
0262 List<List<String>> expected = Arrays.asList(
0263 Arrays.asList("GIANTSDODGERS"),
0264 Arrays.asList("YANKEESRED SOX"));
0265
0266 JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0267 JavaDStream<String> mapped = stream.mapPartitions(in -> {
0268 StringBuilder out = new StringBuilder();
0269 while (in.hasNext()) {
0270 out.append(in.next().toUpperCase(Locale.ROOT));
0271 }
0272 return Arrays.asList(out.toString()).iterator();
0273 });
0274 JavaTestUtils.attachTestOutputStream(mapped);
0275 List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0276
0277 Assert.assertEquals(expected, result);
0278 }
0279
0280 private static class IntegerSum implements Function2<Integer, Integer, Integer> {
0281 @Override
0282 public Integer call(Integer i1, Integer i2) {
0283 return i1 + i2;
0284 }
0285 }
0286
0287 private static class IntegerDifference implements Function2<Integer, Integer, Integer> {
0288 @Override
0289 public Integer call(Integer i1, Integer i2) {
0290 return i1 - i2;
0291 }
0292 }
0293
0294 @SuppressWarnings("unchecked")
0295 @Test
0296 public void testReduce() {
0297 List<List<Integer>> inputData = Arrays.asList(
0298 Arrays.asList(1,2,3),
0299 Arrays.asList(4,5,6),
0300 Arrays.asList(7,8,9));
0301
0302 List<List<Integer>> expected = Arrays.asList(
0303 Arrays.asList(6),
0304 Arrays.asList(15),
0305 Arrays.asList(24));
0306
0307 JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0308 JavaDStream<Integer> reduced = stream.reduce(new IntegerSum());
0309 JavaTestUtils.attachTestOutputStream(reduced);
0310 List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0311
0312 Assert.assertEquals(expected, result);
0313 }
0314
0315 @SuppressWarnings("unchecked")
0316 @Test
0317 public void testReduceByWindowWithInverse() {
0318 testReduceByWindow(true);
0319 }
0320
0321 @SuppressWarnings("unchecked")
0322 @Test
0323 public void testReduceByWindowWithoutInverse() {
0324 testReduceByWindow(false);
0325 }
0326
0327 @SuppressWarnings("unchecked")
0328 private void testReduceByWindow(boolean withInverse) {
0329 List<List<Integer>> inputData = Arrays.asList(
0330 Arrays.asList(1,2,3),
0331 Arrays.asList(4,5,6),
0332 Arrays.asList(7,8,9));
0333
0334 List<List<Integer>> expected = Arrays.asList(
0335 Arrays.asList(6),
0336 Arrays.asList(21),
0337 Arrays.asList(39),
0338 Arrays.asList(24));
0339
0340 JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0341 JavaDStream<Integer> reducedWindowed;
0342 if (withInverse) {
0343 reducedWindowed = stream.reduceByWindow(new IntegerSum(),
0344 new IntegerDifference(),
0345 new Duration(2000),
0346 new Duration(1000));
0347 } else {
0348 reducedWindowed = stream.reduceByWindow(new IntegerSum(),
0349 new Duration(2000), new Duration(1000));
0350 }
0351 JavaTestUtils.attachTestOutputStream(reducedWindowed);
0352 List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
0353
0354 Assert.assertEquals(expected, result);
0355 }
0356
0357 @SuppressWarnings("unchecked")
0358 @Test
0359 public void testQueueStream() {
0360 ssc.stop();
0361
0362 SparkConf conf = new SparkConf()
0363 .setMaster("local[2]")
0364 .setAppName("test")
0365 .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
0366 ssc = new JavaStreamingContext(conf, new Duration(1000));
0367
0368 List<List<Integer>> expected = Arrays.asList(
0369 Arrays.asList(1,2,3),
0370 Arrays.asList(4,5,6),
0371 Arrays.asList(7,8,9));
0372
0373 JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc());
0374 JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3));
0375 JavaRDD<Integer> rdd2 = jsc.parallelize(Arrays.asList(4, 5, 6));
0376 JavaRDD<Integer> rdd3 = jsc.parallelize(Arrays.asList(7,8,9));
0377
0378 Queue<JavaRDD<Integer>> rdds = new LinkedList<>();
0379 rdds.add(rdd1);
0380 rdds.add(rdd2);
0381 rdds.add(rdd3);
0382
0383 JavaDStream<Integer> stream = ssc.queueStream(rdds);
0384 JavaTestUtils.attachTestOutputStream(stream);
0385 List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0386 Assert.assertEquals(expected, result);
0387 }
0388
0389 @SuppressWarnings("unchecked")
0390 @Test
0391 public void testTransform() {
0392 List<List<Integer>> inputData = Arrays.asList(
0393 Arrays.asList(1,2,3),
0394 Arrays.asList(4,5,6),
0395 Arrays.asList(7,8,9));
0396
0397 List<List<Integer>> expected = Arrays.asList(
0398 Arrays.asList(3,4,5),
0399 Arrays.asList(6,7,8),
0400 Arrays.asList(9,10,11));
0401
0402 JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0403 JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2));
0404
0405 JavaTestUtils.attachTestOutputStream(transformed);
0406 List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0407
0408 assertOrderInvariantEquals(expected, result);
0409 }
0410
0411 @SuppressWarnings("unchecked")
0412 @Test
0413 public void testVariousTransform() {
0414
0415
0416 List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
0417 JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0418
0419 List<List<Tuple2<String, Integer>>> pairInputData =
0420 Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
0421 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
0422 JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
0423
0424 stream.transform(in -> null);
0425
0426 stream.transform((in, time) -> null);
0427
0428 stream.transformToPair(in -> null);
0429
0430 stream.transformToPair((in, time) -> null);
0431
0432 pairStream.transform(in -> null);
0433
0434 pairStream.transform((in, time) -> null);
0435
0436 pairStream.transformToPair(in -> null);
0437
0438 pairStream.transformToPair((in, time) -> null);
0439
0440 }
0441
0442 @SuppressWarnings("unchecked")
0443 @Test
0444 public void testTransformWith() {
0445 List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
0446 Arrays.asList(
0447 new Tuple2<>("california", "dodgers"),
0448 new Tuple2<>("new york", "yankees")),
0449 Arrays.asList(
0450 new Tuple2<>("california", "sharks"),
0451 new Tuple2<>("new york", "rangers")));
0452
0453 List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
0454 Arrays.asList(
0455 new Tuple2<>("california", "giants"),
0456 new Tuple2<>("new york", "mets")),
0457 Arrays.asList(
0458 new Tuple2<>("california", "ducks"),
0459 new Tuple2<>("new york", "islanders")));
0460
0461
0462 List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
0463 Sets.newHashSet(
0464 new Tuple2<>("california",
0465 new Tuple2<>("dodgers", "giants")),
0466 new Tuple2<>("new york",
0467 new Tuple2<>("yankees", "mets"))),
0468 Sets.newHashSet(
0469 new Tuple2<>("california",
0470 new Tuple2<>("sharks", "ducks")),
0471 new Tuple2<>("new york",
0472 new Tuple2<>("rangers", "islanders"))));
0473
0474 JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
0475 ssc, stringStringKVStream1, 1);
0476 JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
0477
0478 JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
0479 ssc, stringStringKVStream2, 1);
0480 JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
0481
0482 JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWithToPair(
0483 pairStream2,
0484 (rdd1, rdd2, time) -> rdd1.join(rdd2)
0485 );
0486
0487 JavaTestUtils.attachTestOutputStream(joined);
0488 List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0489 List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = new ArrayList<>();
0490 for (List<Tuple2<String, Tuple2<String, String>>> res: result) {
0491 unorderedResult.add(Sets.newHashSet(res));
0492 }
0493
0494 Assert.assertEquals(expected, unorderedResult);
0495 }
0496
0497
0498 @SuppressWarnings("unchecked")
0499 @Test
0500 public void testVariousTransformWith() {
0501
0502
0503 List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
0504 List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
0505 JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
0506 JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
0507
0508 List<List<Tuple2<String, Integer>>> pairInputData1 =
0509 Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
0510 List<List<Tuple2<Double, Character>>> pairInputData2 =
0511 Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
0512 JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
0513 JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
0514 JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
0515 JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
0516
0517 stream1.transformWith(stream2, (rdd1, rdd2, time) -> null);
0518
0519 stream1.transformWith(pairStream1, (rdd1, rdd2, time) -> null);
0520
0521 stream1.transformWithToPair(stream2, (rdd1, rdd2, time) -> null);
0522
0523 stream1.transformWithToPair(pairStream1, (rdd1, rdd2, time) -> null);
0524
0525 pairStream1.transformWith(stream2, (rdd1, rdd2, time) -> null);
0526
0527 pairStream1.transformWith(pairStream1, (rdd1, rdd2, time) -> null);
0528
0529 pairStream1.transformWithToPair(stream2, (rdd1, rdd2, time) -> null);
0530
0531 pairStream1.transformWithToPair(pairStream2, (rdd1, rdd2, time) -> null);
0532 }
0533
0534 @SuppressWarnings("unchecked")
0535 @Test
0536 public void testStreamingContextTransform(){
0537 List<List<Integer>> stream1input = Arrays.asList(
0538 Arrays.asList(1),
0539 Arrays.asList(2)
0540 );
0541
0542 List<List<Integer>> stream2input = Arrays.asList(
0543 Arrays.asList(3),
0544 Arrays.asList(4)
0545 );
0546
0547 List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
0548 Arrays.asList(new Tuple2<>(1, "x")),
0549 Arrays.asList(new Tuple2<>(2, "y"))
0550 );
0551
0552 List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
0553 Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
0554 Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
0555 );
0556
0557 JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
0558 JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
0559 JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
0560 JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
0561
0562 List<JavaDStream<?>> listOfDStreams1 = Arrays.asList(stream1, stream2);
0563
0564
0565 ssc.transform(
0566 listOfDStreams1,
0567 (listOfRDDs, time) -> {
0568 Assert.assertEquals(2, listOfRDDs.size());
0569 return null;
0570 }
0571 );
0572
0573 List<JavaDStream<?>> listOfDStreams2 =
0574 Arrays.asList(stream1, stream2, pairStream1.toJavaDStream());
0575
0576 JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
0577 listOfDStreams2,
0578 (listOfRDDs, time) -> {
0579 Assert.assertEquals(3, listOfRDDs.size());
0580 JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
0581 JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
0582 JavaRDD<Tuple2<Integer, String>> rdd3 =
0583 (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
0584 JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
0585 PairFunction<Integer, Integer, Integer> mapToTuple =
0586 (PairFunction<Integer, Integer, Integer>) i -> new Tuple2<>(i, i);
0587 return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
0588 }
0589 );
0590 JavaTestUtils.attachTestOutputStream(transformed2);
0591 List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
0592 JavaTestUtils.runStreams(ssc, 2, 2);
0593 Assert.assertEquals(expected, result);
0594 }
0595
0596 @SuppressWarnings("unchecked")
0597 @Test
0598 public void testFlatMap() {
0599 List<List<String>> inputData = Arrays.asList(
0600 Arrays.asList("go", "giants"),
0601 Arrays.asList("boo", "dodgers"),
0602 Arrays.asList("athletics"));
0603
0604 List<List<String>> expected = Arrays.asList(
0605 Arrays.asList("g","o","g","i","a","n","t","s"),
0606 Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"),
0607 Arrays.asList("a","t","h","l","e","t","i","c","s"));
0608
0609 JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0610 JavaDStream<String> flatMapped =
0611 stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator());
0612 JavaTestUtils.attachTestOutputStream(flatMapped);
0613 List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0614
0615 assertOrderInvariantEquals(expected, result);
0616 }
0617
0618 @SuppressWarnings("unchecked")
0619 @Test
0620 public void testForeachRDD() {
0621 final LongAccumulator accumRdd = ssc.sparkContext().sc().longAccumulator();
0622 final LongAccumulator accumEle = ssc.sparkContext().sc().longAccumulator();
0623 List<List<Integer>> inputData = Arrays.asList(
0624 Arrays.asList(1,1,1),
0625 Arrays.asList(1,1,1));
0626
0627 JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0628 JavaTestUtils.attachTestOutputStream(stream.count());
0629
0630 stream.foreachRDD(rdd -> {
0631 accumRdd.add(1);
0632 rdd.foreach(i -> accumEle.add(1));
0633 });
0634
0635
0636 stream.foreachRDD((rdd, time) -> {});
0637
0638 JavaTestUtils.runStreams(ssc, 2, 2);
0639
0640 Assert.assertEquals(2, accumRdd.value().intValue());
0641 Assert.assertEquals(6, accumEle.value().intValue());
0642 }
0643
0644 @SuppressWarnings("unchecked")
0645 @Test
0646 public void testPairFlatMap() {
0647 List<List<String>> inputData = Arrays.asList(
0648 Arrays.asList("giants"),
0649 Arrays.asList("dodgers"),
0650 Arrays.asList("athletics"));
0651
0652 List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0653 Arrays.asList(
0654 new Tuple2<>(6, "g"),
0655 new Tuple2<>(6, "i"),
0656 new Tuple2<>(6, "a"),
0657 new Tuple2<>(6, "n"),
0658 new Tuple2<>(6, "t"),
0659 new Tuple2<>(6, "s")),
0660 Arrays.asList(
0661 new Tuple2<>(7, "d"),
0662 new Tuple2<>(7, "o"),
0663 new Tuple2<>(7, "d"),
0664 new Tuple2<>(7, "g"),
0665 new Tuple2<>(7, "e"),
0666 new Tuple2<>(7, "r"),
0667 new Tuple2<>(7, "s")),
0668 Arrays.asList(
0669 new Tuple2<>(9, "a"),
0670 new Tuple2<>(9, "t"),
0671 new Tuple2<>(9, "h"),
0672 new Tuple2<>(9, "l"),
0673 new Tuple2<>(9, "e"),
0674 new Tuple2<>(9, "t"),
0675 new Tuple2<>(9, "i"),
0676 new Tuple2<>(9, "c"),
0677 new Tuple2<>(9, "s")));
0678
0679 JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0680 JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(in -> {
0681 List<Tuple2<Integer, String>> out = new ArrayList<>();
0682 for (String letter : in.split("(?!^)")) {
0683 out.add(new Tuple2<>(in.length(), letter));
0684 }
0685 return out.iterator();
0686 });
0687 JavaTestUtils.attachTestOutputStream(flatMapped);
0688 List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0689
0690 Assert.assertEquals(expected, result);
0691 }
0692
0693 @SuppressWarnings("unchecked")
0694 @Test
0695 public void testUnion() {
0696 List<List<Integer>> inputData1 = Arrays.asList(
0697 Arrays.asList(1,1),
0698 Arrays.asList(2,2),
0699 Arrays.asList(3,3));
0700
0701 List<List<Integer>> inputData2 = Arrays.asList(
0702 Arrays.asList(4,4),
0703 Arrays.asList(5,5),
0704 Arrays.asList(6,6));
0705
0706 List<List<Integer>> expected = Arrays.asList(
0707 Arrays.asList(1,1,4,4),
0708 Arrays.asList(2,2,5,5),
0709 Arrays.asList(3,3,6,6));
0710
0711 JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2);
0712 JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2);
0713
0714 JavaDStream<Integer> unioned = stream1.union(stream2);
0715 JavaTestUtils.attachTestOutputStream(unioned);
0716 List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0717
0718 assertOrderInvariantEquals(expected, result);
0719 }
0720
0721
0722
0723
0724
0725 public static <T> void assertOrderInvariantEquals(
0726 List<List<T>> expected, List<List<T>> actual) {
0727 List<Set<T>> expectedSets = new ArrayList<>();
0728 for (List<T> list: expected) {
0729 expectedSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
0730 }
0731 List<Set<T>> actualSets = new ArrayList<>();
0732 for (List<T> list: actual) {
0733 actualSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
0734 }
0735 Assert.assertEquals(expectedSets, actualSets);
0736 }
0737
0738
0739
0740 @SuppressWarnings("unchecked")
0741 @Test
0742 public void testPairFilter() {
0743 List<List<String>> inputData = Arrays.asList(
0744 Arrays.asList("giants", "dodgers"),
0745 Arrays.asList("yankees", "red sox"));
0746
0747 List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0748 Arrays.asList(new Tuple2<>("giants", 6)),
0749 Arrays.asList(new Tuple2<>("yankees", 7)));
0750
0751 JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0752 JavaPairDStream<String, Integer> pairStream =
0753 stream.mapToPair(in -> new Tuple2<>(in, in.length()));
0754
0755 JavaPairDStream<String, Integer> filtered = pairStream.filter(in -> in._1().contains("a"));
0756 JavaTestUtils.attachTestOutputStream(filtered);
0757 List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0758
0759 Assert.assertEquals(expected, result);
0760 }
0761
0762 @SuppressWarnings("unchecked")
0763 private final List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
0764 Arrays.asList(new Tuple2<>("california", "dodgers"),
0765 new Tuple2<>("california", "giants"),
0766 new Tuple2<>("new york", "yankees"),
0767 new Tuple2<>("new york", "mets")),
0768 Arrays.asList(new Tuple2<>("california", "sharks"),
0769 new Tuple2<>("california", "ducks"),
0770 new Tuple2<>("new york", "rangers"),
0771 new Tuple2<>("new york", "islanders")));
0772
0773 @SuppressWarnings("unchecked")
0774 private final List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
0775 Arrays.asList(
0776 new Tuple2<>("california", 1),
0777 new Tuple2<>("california", 3),
0778 new Tuple2<>("new york", 4),
0779 new Tuple2<>("new york", 1)),
0780 Arrays.asList(
0781 new Tuple2<>("california", 5),
0782 new Tuple2<>("california", 5),
0783 new Tuple2<>("new york", 3),
0784 new Tuple2<>("new york", 1)));
0785
0786 @SuppressWarnings("unchecked")
0787 @Test
0788 public void testPairMap() {
0789 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0790
0791 List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0792 Arrays.asList(
0793 new Tuple2<>(1, "california"),
0794 new Tuple2<>(3, "california"),
0795 new Tuple2<>(4, "new york"),
0796 new Tuple2<>(1, "new york")),
0797 Arrays.asList(
0798 new Tuple2<>(5, "california"),
0799 new Tuple2<>(5, "california"),
0800 new Tuple2<>(3, "new york"),
0801 new Tuple2<>(1, "new york")));
0802
0803 JavaDStream<Tuple2<String, Integer>> stream =
0804 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0805 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0806 JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(Tuple2::swap);
0807
0808 JavaTestUtils.attachTestOutputStream(reversed);
0809 List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0810
0811 Assert.assertEquals(expected, result);
0812 }
0813
0814 @SuppressWarnings("unchecked")
0815 @Test
0816 public void testPairMapPartitions() {
0817 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0818
0819 List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0820 Arrays.asList(
0821 new Tuple2<>(1, "california"),
0822 new Tuple2<>(3, "california"),
0823 new Tuple2<>(4, "new york"),
0824 new Tuple2<>(1, "new york")),
0825 Arrays.asList(
0826 new Tuple2<>(5, "california"),
0827 new Tuple2<>(5, "california"),
0828 new Tuple2<>(3, "new york"),
0829 new Tuple2<>(1, "new york")));
0830
0831 JavaDStream<Tuple2<String, Integer>> stream =
0832 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0833 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0834 JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
0835 List<Tuple2<Integer, String>> out = new LinkedList<>();
0836 while (in.hasNext()) {
0837 Tuple2<String, Integer> next = in.next();
0838 out.add(next.swap());
0839 }
0840 return out.iterator();
0841 });
0842
0843 JavaTestUtils.attachTestOutputStream(reversed);
0844 List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0845
0846 Assert.assertEquals(expected, result);
0847 }
0848
0849 @SuppressWarnings("unchecked")
0850 @Test
0851 public void testPairMap2() {
0852 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0853
0854 List<List<Integer>> expected = Arrays.asList(
0855 Arrays.asList(1, 3, 4, 1),
0856 Arrays.asList(5, 5, 3, 1));
0857
0858 JavaDStream<Tuple2<String, Integer>> stream =
0859 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0860 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0861 JavaDStream<Integer> reversed = pairStream.map(in -> in._2());
0862
0863 JavaTestUtils.attachTestOutputStream(reversed);
0864 List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0865
0866 Assert.assertEquals(expected, result);
0867 }
0868
0869 @SuppressWarnings("unchecked")
0870 @Test
0871 public void testPairToPairFlatMapWithChangingTypes() {
0872 List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
0873 Arrays.asList(
0874 new Tuple2<>("hi", 1),
0875 new Tuple2<>("ho", 2)),
0876 Arrays.asList(
0877 new Tuple2<>("hi", 1),
0878 new Tuple2<>("ho", 2)));
0879
0880 List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0881 Arrays.asList(
0882 new Tuple2<>(1, "h"),
0883 new Tuple2<>(1, "i"),
0884 new Tuple2<>(2, "h"),
0885 new Tuple2<>(2, "o")),
0886 Arrays.asList(
0887 new Tuple2<>(1, "h"),
0888 new Tuple2<>(1, "i"),
0889 new Tuple2<>(2, "h"),
0890 new Tuple2<>(2, "o")));
0891
0892 JavaDStream<Tuple2<String, Integer>> stream =
0893 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0894 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0895 JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
0896 List<Tuple2<Integer, String>> out = new LinkedList<>();
0897 for (Character s : in._1().toCharArray()) {
0898 out.add(new Tuple2<>(in._2(), s.toString()));
0899 }
0900 return out.iterator();
0901 });
0902 JavaTestUtils.attachTestOutputStream(flatMapped);
0903 List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0904
0905 Assert.assertEquals(expected, result);
0906 }
0907
0908 @SuppressWarnings("unchecked")
0909 @Test
0910 public void testPairGroupByKey() {
0911 List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
0912
0913 List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
0914 Arrays.asList(
0915 new Tuple2<>("california", Arrays.asList("dodgers", "giants")),
0916 new Tuple2<>("new york", Arrays.asList("yankees", "mets"))),
0917 Arrays.asList(
0918 new Tuple2<>("california", Arrays.asList("sharks", "ducks")),
0919 new Tuple2<>("new york", Arrays.asList("rangers", "islanders"))));
0920
0921 JavaDStream<Tuple2<String, String>> stream =
0922 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0923 JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
0924
0925 JavaPairDStream<String, Iterable<String>> grouped = pairStream.groupByKey();
0926 JavaTestUtils.attachTestOutputStream(grouped);
0927 List<List<Tuple2<String, Iterable<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0928
0929 Assert.assertEquals(expected.size(), result.size());
0930 Iterator<List<Tuple2<String, Iterable<String>>>> resultItr = result.iterator();
0931 Iterator<List<Tuple2<String, List<String>>>> expectedItr = expected.iterator();
0932 while (resultItr.hasNext() && expectedItr.hasNext()) {
0933 Iterator<Tuple2<String, Iterable<String>>> resultElements = resultItr.next().iterator();
0934 Iterator<Tuple2<String, List<String>>> expectedElements = expectedItr.next().iterator();
0935 while (resultElements.hasNext() && expectedElements.hasNext()) {
0936 Tuple2<String, Iterable<String>> resultElement = resultElements.next();
0937 Tuple2<String, List<String>> expectedElement = expectedElements.next();
0938 Assert.assertEquals(expectedElement._1(), resultElement._1());
0939 equalIterable(expectedElement._2(), resultElement._2());
0940 }
0941 Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext());
0942 }
0943 }
0944
0945 @SuppressWarnings("unchecked")
0946 @Test
0947 public void testPairReduceByKey() {
0948 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0949
0950 List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0951 Arrays.asList(
0952 new Tuple2<>("california", 4),
0953 new Tuple2<>("new york", 5)),
0954 Arrays.asList(
0955 new Tuple2<>("california", 10),
0956 new Tuple2<>("new york", 4)));
0957
0958 JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
0959 ssc, inputData, 1);
0960 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0961
0962 JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(new IntegerSum());
0963
0964 JavaTestUtils.attachTestOutputStream(reduced);
0965 List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0966
0967 Assert.assertEquals(expected, result);
0968 }
0969
0970 @SuppressWarnings("unchecked")
0971 @Test
0972 public void testCombineByKey() {
0973 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0974
0975 List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0976 Arrays.asList(
0977 new Tuple2<>("california", 4),
0978 new Tuple2<>("new york", 5)),
0979 Arrays.asList(
0980 new Tuple2<>("california", 10),
0981 new Tuple2<>("new york", 4)));
0982
0983 JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
0984 ssc, inputData, 1);
0985 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0986
0987 JavaPairDStream<String, Integer> combined = pairStream.combineByKey(
0988 i -> i, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
0989
0990 JavaTestUtils.attachTestOutputStream(combined);
0991 List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0992
0993 Assert.assertEquals(expected, result);
0994 }
0995
0996 @SuppressWarnings("unchecked")
0997 @Test
0998 public void testCountByValue() {
0999 List<List<String>> inputData = Arrays.asList(
1000 Arrays.asList("hello", "world"),
1001 Arrays.asList("hello", "moon"),
1002 Arrays.asList("hello"));
1003
1004 List<List<Tuple2<String, Long>>> expected = Arrays.asList(
1005 Arrays.asList(
1006 new Tuple2<>("hello", 1L),
1007 new Tuple2<>("world", 1L)),
1008 Arrays.asList(
1009 new Tuple2<>("hello", 1L),
1010 new Tuple2<>("moon", 1L)),
1011 Arrays.asList(
1012 new Tuple2<>("hello", 1L)));
1013
1014 JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
1015 JavaPairDStream<String, Long> counted = stream.countByValue();
1016 JavaTestUtils.attachTestOutputStream(counted);
1017 List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
1018
1019 Assert.assertEquals(expected, result);
1020 }
1021
1022 @SuppressWarnings("unchecked")
1023 @Test
1024 public void testGroupByKeyAndWindow() {
1025 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
1026
1027 List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList(
1028 Arrays.asList(
1029 new Tuple2<>("california", Arrays.asList(1, 3)),
1030 new Tuple2<>("new york", Arrays.asList(1, 4))
1031 ),
1032 Arrays.asList(
1033 new Tuple2<>("california", Arrays.asList(1, 3, 5, 5)),
1034 new Tuple2<>("new york", Arrays.asList(1, 1, 3, 4))
1035 ),
1036 Arrays.asList(
1037 new Tuple2<>("california", Arrays.asList(5, 5)),
1038 new Tuple2<>("new york", Arrays.asList(1, 3))
1039 )
1040 );
1041
1042 JavaDStream<Tuple2<String, Integer>> stream =
1043 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
1044 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
1045
1046 JavaPairDStream<String, Iterable<Integer>> groupWindowed =
1047 pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
1048 JavaTestUtils.attachTestOutputStream(groupWindowed);
1049 List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
1050
1051 Assert.assertEquals(expected.size(), result.size());
1052 for (int i = 0; i < result.size(); i++) {
1053 Assert.assertEquals(convert(expected.get(i)), convert(result.get(i)));
1054 }
1055 }
1056
1057 private static Set<Tuple2<String, HashSet<Integer>>>
1058 convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
1059 List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<>();
1060 for (Tuple2<String, List<Integer>> tuple: listOfTuples) {
1061 newListOfTuples.add(convert(tuple));
1062 }
1063 return new HashSet<>(newListOfTuples);
1064 }
1065
1066 private static Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
1067 return new Tuple2<>(tuple._1(), new HashSet<>(tuple._2()));
1068 }
1069
1070 @SuppressWarnings("unchecked")
1071 @Test
1072 public void testReduceByKeyAndWindow() {
1073 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
1074
1075 List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
1076 Arrays.asList(new Tuple2<>("california", 4),
1077 new Tuple2<>("new york", 5)),
1078 Arrays.asList(new Tuple2<>("california", 14),
1079 new Tuple2<>("new york", 9)),
1080 Arrays.asList(new Tuple2<>("california", 10),
1081 new Tuple2<>("new york", 4)));
1082
1083 JavaDStream<Tuple2<String, Integer>> stream =
1084 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
1085 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
1086
1087 JavaPairDStream<String, Integer> reduceWindowed =
1088 pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000));
1089 JavaTestUtils.attachTestOutputStream(reduceWindowed);
1090 List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
1091
1092 Assert.assertEquals(expected, result);
1093 }
1094
1095 @SuppressWarnings("unchecked")
1096 @Test
1097 public void testUpdateStateByKey() {
1098 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
1099
1100 List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
1101 Arrays.asList(new Tuple2<>("california", 4),
1102 new Tuple2<>("new york", 5)),
1103 Arrays.asList(new Tuple2<>("california", 14),
1104 new Tuple2<>("new york", 9)),
1105 Arrays.asList(new Tuple2<>("california", 14),
1106 new Tuple2<>("new york", 9)));
1107
1108 JavaDStream<Tuple2<String, Integer>> stream =
1109 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
1110 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
1111
1112 JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
1113 int out = 0;
1114 if (state.isPresent()) {
1115 out += state.get();
1116 }
1117 for (Integer v : values) {
1118 out += v;
1119 }
1120 return Optional.of(out);
1121 });
1122 JavaTestUtils.attachTestOutputStream(updated);
1123 List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
1124
1125 Assert.assertEquals(expected, result);
1126 }
1127
1128 @SuppressWarnings("unchecked")
1129 @Test
1130 public void testUpdateStateByKeyWithInitial() {
1131 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
1132
1133 List<Tuple2<String, Integer>> initial = Arrays.asList(
1134 new Tuple2<>("california", 1),
1135 new Tuple2<>("new york", 2));
1136
1137 JavaRDD<Tuple2<String, Integer>> tmpRDD = ssc.sparkContext().parallelize(initial);
1138 JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD(tmpRDD);
1139
1140 List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
1141 Arrays.asList(new Tuple2<>("california", 5),
1142 new Tuple2<>("new york", 7)),
1143 Arrays.asList(new Tuple2<>("california", 15),
1144 new Tuple2<>("new york", 11)),
1145 Arrays.asList(new Tuple2<>("california", 15),
1146 new Tuple2<>("new york", 11)));
1147
1148 JavaDStream<Tuple2<String, Integer>> stream =
1149 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
1150 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
1151
1152 JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
1153 int out = 0;
1154 if (state.isPresent()) {
1155 out += state.get();
1156 }
1157 for (Integer v : values) {
1158 out += v;
1159 }
1160 return Optional.of(out);
1161 }, new HashPartitioner(1), initialRDD);
1162 JavaTestUtils.attachTestOutputStream(updated);
1163 List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
1164
1165 assertOrderInvariantEquals(expected, result);
1166 }
1167
1168 @SuppressWarnings("unchecked")
1169 @Test
1170 public void testReduceByKeyAndWindowWithInverse() {
1171 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
1172
1173 List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
1174 Arrays.asList(new Tuple2<>("california", 4),
1175 new Tuple2<>("new york", 5)),
1176 Arrays.asList(new Tuple2<>("california", 14),
1177 new Tuple2<>("new york", 9)),
1178 Arrays.asList(new Tuple2<>("california", 10),
1179 new Tuple2<>("new york", 4)));
1180
1181 JavaDStream<Tuple2<String, Integer>> stream =
1182 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
1183 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
1184
1185 JavaPairDStream<String, Integer> reduceWindowed =
1186 pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
1187 new Duration(2000), new Duration(1000));
1188 JavaTestUtils.attachTestOutputStream(reduceWindowed);
1189 List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
1190
1191 Assert.assertEquals(expected, result);
1192 }
1193
1194 @SuppressWarnings("unchecked")
1195 @Test
1196 public void testCountByValueAndWindow() {
1197 List<List<String>> inputData = Arrays.asList(
1198 Arrays.asList("hello", "world"),
1199 Arrays.asList("hello", "moon"),
1200 Arrays.asList("hello"));
1201
1202 List<HashSet<Tuple2<String, Long>>> expected = Arrays.asList(
1203 Sets.newHashSet(
1204 new Tuple2<>("hello", 1L),
1205 new Tuple2<>("world", 1L)),
1206 Sets.newHashSet(
1207 new Tuple2<>("hello", 2L),
1208 new Tuple2<>("world", 1L),
1209 new Tuple2<>("moon", 1L)),
1210 Sets.newHashSet(
1211 new Tuple2<>("hello", 2L),
1212 new Tuple2<>("moon", 1L)));
1213
1214 JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(
1215 ssc, inputData, 1);
1216 JavaPairDStream<String, Long> counted =
1217 stream.countByValueAndWindow(new Duration(2000), new Duration(1000));
1218 JavaTestUtils.attachTestOutputStream(counted);
1219 List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
1220 List<Set<Tuple2<String, Long>>> unorderedResult = new ArrayList<>();
1221 for (List<Tuple2<String, Long>> res: result) {
1222 unorderedResult.add(Sets.newHashSet(res));
1223 }
1224
1225 Assert.assertEquals(expected, unorderedResult);
1226 }
1227
1228 @SuppressWarnings("unchecked")
1229 @Test
1230 public void testPairTransform() {
1231 List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
1232 Arrays.asList(
1233 new Tuple2<>(3, 5),
1234 new Tuple2<>(1, 5),
1235 new Tuple2<>(4, 5),
1236 new Tuple2<>(2, 5)),
1237 Arrays.asList(
1238 new Tuple2<>(2, 5),
1239 new Tuple2<>(3, 5),
1240 new Tuple2<>(4, 5),
1241 new Tuple2<>(1, 5)));
1242
1243 List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
1244 Arrays.asList(
1245 new Tuple2<>(1, 5),
1246 new Tuple2<>(2, 5),
1247 new Tuple2<>(3, 5),
1248 new Tuple2<>(4, 5)),
1249 Arrays.asList(
1250 new Tuple2<>(1, 5),
1251 new Tuple2<>(2, 5),
1252 new Tuple2<>(3, 5),
1253 new Tuple2<>(4, 5)));
1254
1255 JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
1256 ssc, inputData, 1);
1257 JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
1258
1259 JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey());
1260
1261 JavaTestUtils.attachTestOutputStream(sorted);
1262 List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
1263
1264 Assert.assertEquals(expected, result);
1265 }
1266
1267 @SuppressWarnings("unchecked")
1268 @Test
1269 public void testPairToNormalRDDTransform() {
1270 List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
1271 Arrays.asList(
1272 new Tuple2<>(3, 5),
1273 new Tuple2<>(1, 5),
1274 new Tuple2<>(4, 5),
1275 new Tuple2<>(2, 5)),
1276 Arrays.asList(
1277 new Tuple2<>(2, 5),
1278 new Tuple2<>(3, 5),
1279 new Tuple2<>(4, 5),
1280 new Tuple2<>(1, 5)));
1281
1282 List<List<Integer>> expected = Arrays.asList(
1283 Arrays.asList(3,1,4,2),
1284 Arrays.asList(2,3,4,1));
1285
1286 JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
1287 ssc, inputData, 1);
1288 JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
1289
1290 JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(in2 -> in2._1()));
1291
1292 JavaTestUtils.attachTestOutputStream(firstParts);
1293 List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
1294
1295 Assert.assertEquals(expected, result);
1296 }
1297
1298 @SuppressWarnings("unchecked")
1299 @Test
1300 public void testMapValues() {
1301 List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
1302
1303 List<List<Tuple2<String, String>>> expected = Arrays.asList(
1304 Arrays.asList(new Tuple2<>("california", "DODGERS"),
1305 new Tuple2<>("california", "GIANTS"),
1306 new Tuple2<>("new york", "YANKEES"),
1307 new Tuple2<>("new york", "METS")),
1308 Arrays.asList(new Tuple2<>("california", "SHARKS"),
1309 new Tuple2<>("california", "DUCKS"),
1310 new Tuple2<>("new york", "RANGERS"),
1311 new Tuple2<>("new york", "ISLANDERS")));
1312
1313 JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
1314 ssc, inputData, 1);
1315 JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
1316
1317 JavaPairDStream<String, String> mapped =
1318 pairStream.mapValues(s -> s.toUpperCase(Locale.ROOT));
1319
1320 JavaTestUtils.attachTestOutputStream(mapped);
1321 List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
1322
1323 Assert.assertEquals(expected, result);
1324 }
1325
1326 @SuppressWarnings("unchecked")
1327 @Test
1328 public void testFlatMapValues() {
1329 List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
1330
1331 List<List<Tuple2<String, String>>> expected = Arrays.asList(
1332 Arrays.asList(new Tuple2<>("california", "dodgers1"),
1333 new Tuple2<>("california", "dodgers2"),
1334 new Tuple2<>("california", "giants1"),
1335 new Tuple2<>("california", "giants2"),
1336 new Tuple2<>("new york", "yankees1"),
1337 new Tuple2<>("new york", "yankees2"),
1338 new Tuple2<>("new york", "mets1"),
1339 new Tuple2<>("new york", "mets2")),
1340 Arrays.asList(new Tuple2<>("california", "sharks1"),
1341 new Tuple2<>("california", "sharks2"),
1342 new Tuple2<>("california", "ducks1"),
1343 new Tuple2<>("california", "ducks2"),
1344 new Tuple2<>("new york", "rangers1"),
1345 new Tuple2<>("new york", "rangers2"),
1346 new Tuple2<>("new york", "islanders1"),
1347 new Tuple2<>("new york", "islanders2")));
1348
1349 JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
1350 ssc, inputData, 1);
1351 JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
1352
1353
1354 JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(in -> {
1355 List<String> out = new ArrayList<>();
1356 out.add(in + "1");
1357 out.add(in + "2");
1358 return out.iterator();
1359 });
1360
1361 JavaTestUtils.attachTestOutputStream(flatMapped);
1362 List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
1363
1364 Assert.assertEquals(expected, result);
1365 }
1366
1367 @SuppressWarnings("unchecked")
1368 @Test
1369 public void testCoGroup() {
1370 List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
1371 Arrays.asList(new Tuple2<>("california", "dodgers"),
1372 new Tuple2<>("new york", "yankees")),
1373 Arrays.asList(new Tuple2<>("california", "sharks"),
1374 new Tuple2<>("new york", "rangers")));
1375
1376 List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
1377 Arrays.asList(new Tuple2<>("california", "giants"),
1378 new Tuple2<>("new york", "mets")),
1379 Arrays.asList(new Tuple2<>("california", "ducks"),
1380 new Tuple2<>("new york", "islanders")));
1381
1382
1383 List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = Arrays.asList(
1384 Arrays.asList(
1385 new Tuple2<>("california",
1386 new Tuple2<>(Arrays.asList("dodgers"), Arrays.asList("giants"))),
1387 new Tuple2<>("new york",
1388 new Tuple2<>(Arrays.asList("yankees"), Arrays.asList("mets")))),
1389 Arrays.asList(
1390 new Tuple2<>("california",
1391 new Tuple2<>(Arrays.asList("sharks"), Arrays.asList("ducks"))),
1392 new Tuple2<>("new york",
1393 new Tuple2<>(Arrays.asList("rangers"), Arrays.asList("islanders")))));
1394
1395
1396 JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
1397 ssc, stringStringKVStream1, 1);
1398 JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
1399
1400 JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
1401 ssc, stringStringKVStream2, 1);
1402 JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
1403
1404 JavaPairDStream<String, Tuple2<Iterable<String>, Iterable<String>>> grouped =
1405 pairStream1.cogroup(pairStream2);
1406 JavaTestUtils.attachTestOutputStream(grouped);
1407 List<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> result =
1408 JavaTestUtils.runStreams(ssc, 2, 2);
1409
1410 Assert.assertEquals(expected.size(), result.size());
1411 Iterator<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> resultItr =
1412 result.iterator();
1413 Iterator<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expectedItr =
1414 expected.iterator();
1415 while (resultItr.hasNext() && expectedItr.hasNext()) {
1416 Iterator<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>> resultElements =
1417 resultItr.next().iterator();
1418 Iterator<Tuple2<String, Tuple2<List<String>, List<String>>>> expectedElements =
1419 expectedItr.next().iterator();
1420 while (resultElements.hasNext() && expectedElements.hasNext()) {
1421 Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> resultElement =
1422 resultElements.next();
1423 Tuple2<String, Tuple2<List<String>, List<String>>> expectedElement =
1424 expectedElements.next();
1425 Assert.assertEquals(expectedElement._1(), resultElement._1());
1426 equalIterable(expectedElement._2()._1(), resultElement._2()._1());
1427 equalIterable(expectedElement._2()._2(), resultElement._2()._2());
1428 }
1429 Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext());
1430 }
1431 }
1432
1433 @SuppressWarnings("unchecked")
1434 @Test
1435 public void testJoin() {
1436 List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
1437 Arrays.asList(new Tuple2<>("california", "dodgers"),
1438 new Tuple2<>("new york", "yankees")),
1439 Arrays.asList(new Tuple2<>("california", "sharks"),
1440 new Tuple2<>("new york", "rangers")));
1441
1442 List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
1443 Arrays.asList(new Tuple2<>("california", "giants"),
1444 new Tuple2<>("new york", "mets")),
1445 Arrays.asList(new Tuple2<>("california", "ducks"),
1446 new Tuple2<>("new york", "islanders")));
1447
1448
1449 List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
1450 Arrays.asList(
1451 new Tuple2<>("california",
1452 new Tuple2<>("dodgers", "giants")),
1453 new Tuple2<>("new york",
1454 new Tuple2<>("yankees", "mets"))),
1455 Arrays.asList(
1456 new Tuple2<>("california",
1457 new Tuple2<>("sharks", "ducks")),
1458 new Tuple2<>("new york",
1459 new Tuple2<>("rangers", "islanders"))));
1460
1461
1462 JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
1463 ssc, stringStringKVStream1, 1);
1464 JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
1465
1466 JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
1467 ssc, stringStringKVStream2, 1);
1468 JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
1469
1470 JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2);
1471 JavaTestUtils.attachTestOutputStream(joined);
1472 List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
1473
1474 Assert.assertEquals(expected, result);
1475 }
1476
1477 @SuppressWarnings("unchecked")
1478 @Test
1479 public void testLeftOuterJoin() {
1480 List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
1481 Arrays.asList(new Tuple2<>("california", "dodgers"),
1482 new Tuple2<>("new york", "yankees")),
1483 Arrays.asList(new Tuple2<>("california", "sharks") ));
1484
1485 List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
1486 Arrays.asList(new Tuple2<>("california", "giants") ),
1487 Arrays.asList(new Tuple2<>("new york", "islanders") )
1488
1489 );
1490
1491 List<List<Long>> expected = Arrays.asList(Arrays.asList(2L), Arrays.asList(1L));
1492
1493 JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
1494 ssc, stringStringKVStream1, 1);
1495 JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
1496
1497 JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
1498 ssc, stringStringKVStream2, 1);
1499 JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
1500
1501 JavaPairDStream<String, Tuple2<String, Optional<String>>> joined =
1502 pairStream1.leftOuterJoin(pairStream2);
1503 JavaDStream<Long> counted = joined.count();
1504 JavaTestUtils.attachTestOutputStream(counted);
1505 List<List<Long>> result = JavaTestUtils.runStreams(ssc, 2, 2);
1506
1507 Assert.assertEquals(expected, result);
1508 }
1509
1510 @SuppressWarnings("unchecked")
1511 @Test
1512 public void testCheckpointMasterRecovery() throws InterruptedException {
1513 List<List<String>> inputData = Arrays.asList(
1514 Arrays.asList("this", "is"),
1515 Arrays.asList("a", "test"),
1516 Arrays.asList("counting", "letters"));
1517
1518 List<List<Integer>> expectedInitial = Arrays.asList(
1519 Arrays.asList(4,2));
1520 List<List<Integer>> expectedFinal = Arrays.asList(
1521 Arrays.asList(1,4),
1522 Arrays.asList(8,7));
1523
1524 File tempDir = Files.createTempDir();
1525 tempDir.deleteOnExit();
1526 ssc.checkpoint(tempDir.getAbsolutePath());
1527
1528 JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
1529 JavaDStream<Integer> letterCount = stream.map(String::length);
1530 JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
1531 List<List<Integer>> initialResult = JavaTestUtils.runStreams(ssc, 1, 1);
1532
1533 assertOrderInvariantEquals(expectedInitial, initialResult);
1534 Thread.sleep(1000);
1535 ssc.stop();
1536
1537 ssc = new JavaStreamingContext(tempDir.getAbsolutePath());
1538
1539
1540 List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
1541 assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
1542 ssc.stop();
1543 Utils.deleteRecursively(tempDir);
1544 }
1545
1546 @SuppressWarnings("unchecked")
1547 @Test
1548 public void testContextGetOrCreate() throws InterruptedException {
1549 ssc.stop();
1550
1551 SparkConf conf = new SparkConf()
1552 .setMaster("local[2]")
1553 .setAppName("test")
1554 .set("newContext", "true");
1555
1556 File emptyDir = Files.createTempDir();
1557 emptyDir.deleteOnExit();
1558 StreamingContextSuite contextSuite = new StreamingContextSuite();
1559 String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint();
1560 String checkpointDir = contextSuite.createValidCheckpoint();
1561
1562
1563
1564 AtomicBoolean newContextCreated = new AtomicBoolean(false);
1565 Function0<JavaStreamingContext> creatingFunc = () -> {
1566 newContextCreated.set(true);
1567 return new JavaStreamingContext(conf, Seconds.apply(1));
1568 };
1569
1570 newContextCreated.set(false);
1571 ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc);
1572 Assert.assertTrue("new context not created", newContextCreated.get());
1573 ssc.stop();
1574
1575 newContextCreated.set(false);
1576 ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc,
1577 new Configuration(), true);
1578 Assert.assertTrue("new context not created", newContextCreated.get());
1579 ssc.stop();
1580
1581 newContextCreated.set(false);
1582 ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
1583 new Configuration());
1584 Assert.assertTrue("old context not recovered", !newContextCreated.get());
1585 ssc.stop();
1586
1587 newContextCreated.set(false);
1588 JavaSparkContext sc = new JavaSparkContext(conf);
1589 ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
1590 new Configuration());
1591 Assert.assertTrue("old context not recovered", !newContextCreated.get());
1592 ssc.stop();
1593 }
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628 @Test
1629 public void testSocketTextStream() {
1630 ssc.socketTextStream("localhost", 12345);
1631 }
1632
1633 @Test
1634 public void testSocketString() {
1635 ssc.socketStream(
1636 "localhost",
1637 12345,
1638 in -> {
1639 List<String> out = new ArrayList<>();
1640 try (BufferedReader reader = new BufferedReader(
1641 new InputStreamReader(in, StandardCharsets.UTF_8))) {
1642 for (String line; (line = reader.readLine()) != null;) {
1643 out.add(line);
1644 }
1645 }
1646 return out;
1647 },
1648 StorageLevel.MEMORY_ONLY());
1649 }
1650
1651 @SuppressWarnings("unchecked")
1652 @Test
1653 public void testTextFileStream() throws IOException {
1654 File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
1655 List<List<String>> expected = fileTestPrepare(testDir);
1656
1657 JavaDStream<String> input = ssc.textFileStream(testDir.toString());
1658 JavaTestUtils.attachTestOutputStream(input);
1659 List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
1660
1661 assertOrderInvariantEquals(expected, result);
1662 }
1663
1664 @SuppressWarnings("unchecked")
1665 @Test
1666 public void testFileStream() throws IOException {
1667 File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
1668 List<List<String>> expected = fileTestPrepare(testDir);
1669
1670 JavaPairInputDStream<LongWritable, Text> inputStream = ssc.fileStream(
1671 testDir.toString(),
1672 LongWritable.class,
1673 Text.class,
1674 TextInputFormat.class,
1675 v1 -> Boolean.TRUE,
1676 true);
1677
1678 JavaDStream<String> test = inputStream.map(v1 -> v1._2().toString());
1679
1680 JavaTestUtils.attachTestOutputStream(test);
1681 List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
1682
1683 assertOrderInvariantEquals(expected, result);
1684 }
1685
1686 @Test
1687 public void testRawSocketStream() {
1688 ssc.rawSocketStream("localhost", 12345);
1689 }
1690
1691 private static List<List<String>> fileTestPrepare(File testDir) throws IOException {
1692 File existingFile = new File(testDir, "0");
1693 Files.write("0\n", existingFile, StandardCharsets.UTF_8);
1694 Assert.assertTrue(existingFile.setLastModified(1000));
1695 Assert.assertEquals(1000, existingFile.lastModified());
1696 return Arrays.asList(Arrays.asList("0"));
1697 }
1698
/**
 * Compile-time check of the Hadoop "save as files" APIs on {@link JavaPairDStream}. It is not
 * called anywhere in this part of the file, so it only has to compile; the stream's contents
 * are never touched.
 *
 * <p>The first two calls pass the concrete output-format classes. The last two pass the same
 * classes through a raw {@code (Class)} cast, which is what the "unchecked" suppression
 * covers. NOTE(review): presumably this guards against generic-signature regressions in these
 * overloads — confirm intent.
 *
 * @param pds any pair stream of Hadoop writables
 */
@SuppressWarnings("unchecked")

private void compileSaveAsJavaAPI(JavaPairDStream<LongWritable,Text> pds) {
  // New (org.apache.hadoop.mapreduce) output-format API, typed class literal.
  pds.saveAsNewAPIHadoopFiles(
    "", "", LongWritable.class, Text.class,
    org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
  // Old (org.apache.hadoop.mapred) output-format API, typed class literal.
  pds.saveAsHadoopFiles(
    "", "", LongWritable.class, Text.class,
    org.apache.hadoop.mapred.SequenceFileOutputFormat.class);

  // Same two calls again, with the output-format class passed as a raw Class.
  pds.saveAsNewAPIHadoopFiles(
    "", "", LongWritable.class, Text.class,
    (Class) org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
  pds.saveAsHadoopFiles(
    "", "", LongWritable.class, Text.class,
    (Class) org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
}
1716
1717 }