0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package test.org.apache.spark.streaming;
0019
0020 import java.io.Serializable;
0021 import java.util.*;
0022
0023 import org.apache.spark.api.java.function.Function3;
0024 import org.apache.spark.api.java.function.Function4;
0025 import org.apache.spark.streaming.Duration;
0026 import org.apache.spark.streaming.Durations;
0027 import org.apache.spark.streaming.JavaTestUtils;
0028 import org.apache.spark.streaming.LocalJavaStreamingContext;
0029 import org.apache.spark.streaming.State;
0030 import org.apache.spark.streaming.StateSpec;
0031 import org.apache.spark.streaming.Time;
0032 import scala.Tuple2;
0033
0034 import com.google.common.collect.Sets;
0035 import org.junit.Assert;
0036 import org.junit.Test;
0037
0038 import org.apache.spark.HashPartitioner;
0039 import org.apache.spark.api.java.Optional;
0040 import org.apache.spark.api.java.JavaPairRDD;
0041 import org.apache.spark.api.java.JavaRDD;
0042 import org.apache.spark.api.java.function.PairFunction;
0043 import org.apache.spark.streaming.api.java.JavaDStream;
0044 import org.apache.spark.streaming.api.java.JavaPairDStream;
0045 import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
0046
0047
0048
0049
0050
0051 @SuppressWarnings("unchecked")
0052 public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
0053
0054 @Test
0055 public void testMap() {
0056 List<List<String>> inputData = Arrays.asList(
0057 Arrays.asList("hello", "world"),
0058 Arrays.asList("goodnight", "moon"));
0059
0060 List<List<Integer>> expected = Arrays.asList(
0061 Arrays.asList(5, 5),
0062 Arrays.asList(9, 4));
0063
0064 JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0065 JavaDStream<Integer> letterCount = stream.map(String::length);
0066 JavaTestUtils.attachTestOutputStream(letterCount);
0067 List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0068
0069 assertOrderInvariantEquals(expected, result);
0070 }
0071
0072 @Test
0073 public void testFilter() {
0074 List<List<String>> inputData = Arrays.asList(
0075 Arrays.asList("giants", "dodgers"),
0076 Arrays.asList("yankees", "red sox"));
0077
0078 List<List<String>> expected = Arrays.asList(
0079 Arrays.asList("giants"),
0080 Arrays.asList("yankees"));
0081
0082 JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0083 JavaDStream<String> filtered = stream.filter(s -> s.contains("a"));
0084 JavaTestUtils.attachTestOutputStream(filtered);
0085 List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0086
0087 assertOrderInvariantEquals(expected, result);
0088 }
0089
0090 @Test
0091 public void testMapPartitions() {
0092 List<List<String>> inputData = Arrays.asList(
0093 Arrays.asList("giants", "dodgers"),
0094 Arrays.asList("yankees", "red sox"));
0095
0096 List<List<String>> expected = Arrays.asList(
0097 Arrays.asList("GIANTSDODGERS"),
0098 Arrays.asList("YANKEESRED SOX"));
0099
0100 JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0101 JavaDStream<String> mapped = stream.mapPartitions(in -> {
0102 String out = "";
0103 while (in.hasNext()) {
0104 out = out + in.next().toUpperCase(Locale.ROOT);
0105 }
0106 return Arrays.asList(out).iterator();
0107 });
0108 JavaTestUtils.attachTestOutputStream(mapped);
0109 List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0110
0111 Assert.assertEquals(expected, result);
0112 }
0113
0114 @Test
0115 public void testReduce() {
0116 List<List<Integer>> inputData = Arrays.asList(
0117 Arrays.asList(1, 2, 3),
0118 Arrays.asList(4, 5, 6),
0119 Arrays.asList(7, 8, 9));
0120
0121 List<List<Integer>> expected = Arrays.asList(
0122 Arrays.asList(6),
0123 Arrays.asList(15),
0124 Arrays.asList(24));
0125
0126 JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0127 JavaDStream<Integer> reduced = stream.reduce((x, y) -> x + y);
0128 JavaTestUtils.attachTestOutputStream(reduced);
0129 List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0130
0131 Assert.assertEquals(expected, result);
0132 }
0133
0134 @Test
0135 public void testReduceByWindow() {
0136 List<List<Integer>> inputData = Arrays.asList(
0137 Arrays.asList(1, 2, 3),
0138 Arrays.asList(4, 5, 6),
0139 Arrays.asList(7, 8, 9));
0140
0141 List<List<Integer>> expected = Arrays.asList(
0142 Arrays.asList(6),
0143 Arrays.asList(21),
0144 Arrays.asList(39),
0145 Arrays.asList(24));
0146
0147 JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0148 JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(
0149 (x, y) -> x + y, (x, y) -> x - y, new Duration(2000), new Duration(1000));
0150 JavaTestUtils.attachTestOutputStream(reducedWindowed);
0151 List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
0152
0153 Assert.assertEquals(expected, result);
0154 }
0155
0156 @Test
0157 public void testTransform() {
0158 List<List<Integer>> inputData = Arrays.asList(
0159 Arrays.asList(1, 2, 3),
0160 Arrays.asList(4, 5, 6),
0161 Arrays.asList(7, 8, 9));
0162
0163 List<List<Integer>> expected = Arrays.asList(
0164 Arrays.asList(3, 4, 5),
0165 Arrays.asList(6, 7, 8),
0166 Arrays.asList(9, 10, 11));
0167
0168 JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0169 JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2));
0170
0171 JavaTestUtils.attachTestOutputStream(transformed);
0172 List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0173
0174 assertOrderInvariantEquals(expected, result);
0175 }
0176
0177 @Test
0178 public void testVariousTransform() {
0179
0180
0181 List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
0182 JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0183
0184 List<List<Tuple2<String, Integer>>> pairInputData =
0185 Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
0186 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
0187 JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
0188
0189 JavaDStream<Integer> transformed1 = stream.transform(in -> null);
0190 JavaDStream<Integer> transformed2 = stream.transform((x, time) -> null);
0191 JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(x -> null);
0192 JavaPairDStream<String, Integer> transformed4 = stream.transformToPair((x, time) -> null);
0193 JavaDStream<Integer> pairTransformed1 = pairStream.transform(x -> null);
0194 JavaDStream<Integer> pairTransformed2 = pairStream.transform((x, time) -> null);
0195 JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(x -> null);
0196 JavaPairDStream<String, String> pairTransformed4 =
0197 pairStream.transformToPair((x, time) -> null);
0198
0199 }
0200
0201 @Test
0202 public void testTransformWith() {
0203 List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
0204 Arrays.asList(
0205 new Tuple2<>("california", "dodgers"),
0206 new Tuple2<>("new york", "yankees")),
0207 Arrays.asList(
0208 new Tuple2<>("california", "sharks"),
0209 new Tuple2<>("new york", "rangers")));
0210
0211 List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
0212 Arrays.asList(
0213 new Tuple2<>("california", "giants"),
0214 new Tuple2<>("new york", "mets")),
0215 Arrays.asList(
0216 new Tuple2<>("california", "ducks"),
0217 new Tuple2<>("new york", "islanders")));
0218
0219
0220 List<Set<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
0221 Sets.newHashSet(
0222 new Tuple2<>("california",
0223 new Tuple2<>("dodgers", "giants")),
0224 new Tuple2<>("new york",
0225 new Tuple2<>("yankees", "mets"))),
0226 Sets.newHashSet(
0227 new Tuple2<>("california",
0228 new Tuple2<>("sharks", "ducks")),
0229 new Tuple2<>("new york",
0230 new Tuple2<>("rangers", "islanders"))));
0231
0232 JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
0233 ssc, stringStringKVStream1, 1);
0234 JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
0235
0236 JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
0237 ssc, stringStringKVStream2, 1);
0238 JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
0239
0240 JavaPairDStream<String, Tuple2<String, String>> joined =
0241 pairStream1.transformWithToPair(pairStream2,(x, y, z) -> x.join(y));
0242
0243 JavaTestUtils.attachTestOutputStream(joined);
0244 List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0245 List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = new ArrayList<>();
0246 for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
0247 unorderedResult.add(Sets.newHashSet(res));
0248 }
0249
0250 Assert.assertEquals(expected, unorderedResult);
0251 }
0252
0253
0254 @Test
0255 public void testVariousTransformWith() {
0256
0257
0258 List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
0259 List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
0260 JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
0261 JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
0262
0263 List<List<Tuple2<String, Integer>>> pairInputData1 =
0264 Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
0265 List<List<Tuple2<Double, Character>>> pairInputData2 =
0266 Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
0267 JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
0268 JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
0269 JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
0270 JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
0271
0272 JavaDStream<Double> transformed1 = stream1.transformWith(stream2, (x, y, z) -> null);
0273 JavaDStream<Double> transformed2 = stream1.transformWith(pairStream1,(x, y, z) -> null);
0274
0275 JavaPairDStream<Double, Double> transformed3 =
0276 stream1.transformWithToPair(stream2,(x, y, z) -> null);
0277
0278 JavaPairDStream<Double, Double> transformed4 =
0279 stream1.transformWithToPair(pairStream1,(x, y, z) -> null);
0280
0281 JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(stream2,(x, y, z) -> null);
0282
0283 JavaDStream<Double> pairTransformed2_ =
0284 pairStream1.transformWith(pairStream1,(x, y, z) -> null);
0285
0286 JavaPairDStream<Double, Double> pairTransformed3 =
0287 pairStream1.transformWithToPair(stream2,(x, y, z) -> null);
0288
0289 JavaPairDStream<Double, Double> pairTransformed4 =
0290 pairStream1.transformWithToPair(pairStream2,(x, y, z) -> null);
0291 }
0292
0293 @Test
0294 public void testStreamingContextTransform() {
0295 List<List<Integer>> stream1input = Arrays.asList(
0296 Arrays.asList(1),
0297 Arrays.asList(2)
0298 );
0299
0300 List<List<Integer>> stream2input = Arrays.asList(
0301 Arrays.asList(3),
0302 Arrays.asList(4)
0303 );
0304
0305 List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
0306 Arrays.asList(new Tuple2<>(1, "x")),
0307 Arrays.asList(new Tuple2<>(2, "y"))
0308 );
0309
0310 List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
0311 Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
0312 Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
0313 );
0314
0315 JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
0316 JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
0317 JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
0318 JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
0319
0320 List<JavaDStream<?>> listOfDStreams1 = Arrays.asList(stream1, stream2);
0321
0322
0323 JavaDStream<Long> transformed1 = ssc.transform(
0324 listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
0325 Assert.assertEquals(2, listOfRDDs.size());
0326 return null;
0327 });
0328
0329 List<JavaDStream<?>> listOfDStreams2 =
0330 Arrays.asList(stream1, stream2, pairStream1.toJavaDStream());
0331
0332 JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
0333 listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
0334 Assert.assertEquals(3, listOfRDDs.size());
0335 JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0);
0336 JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1);
0337 JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2);
0338 JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
0339 PairFunction<Integer, Integer, Integer> mapToTuple =
0340 (Integer i) -> new Tuple2<>(i, i);
0341 return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
0342 });
0343 JavaTestUtils.attachTestOutputStream(transformed2);
0344 List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
0345 JavaTestUtils.runStreams(ssc, 2, 2);
0346 Assert.assertEquals(expected, result);
0347 }
0348
0349 @Test
0350 public void testFlatMap() {
0351 List<List<String>> inputData = Arrays.asList(
0352 Arrays.asList("go", "giants"),
0353 Arrays.asList("boo", "dodgers"),
0354 Arrays.asList("athletics"));
0355
0356 List<List<String>> expected = Arrays.asList(
0357 Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"),
0358 Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"),
0359 Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s"));
0360
0361 JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0362 JavaDStream<String> flatMapped = stream.flatMap(
0363 s -> Arrays.asList(s.split("(?!^)")).iterator());
0364 JavaTestUtils.attachTestOutputStream(flatMapped);
0365 List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0366
0367 assertOrderInvariantEquals(expected, result);
0368 }
0369
0370 @Test
0371 public void testPairFlatMap() {
0372 List<List<String>> inputData = Arrays.asList(
0373 Arrays.asList("giants"),
0374 Arrays.asList("dodgers"),
0375 Arrays.asList("athletics"));
0376
0377 List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0378 Arrays.asList(
0379 new Tuple2<>(6, "g"),
0380 new Tuple2<>(6, "i"),
0381 new Tuple2<>(6, "a"),
0382 new Tuple2<>(6, "n"),
0383 new Tuple2<>(6, "t"),
0384 new Tuple2<>(6, "s")),
0385 Arrays.asList(
0386 new Tuple2<>(7, "d"),
0387 new Tuple2<>(7, "o"),
0388 new Tuple2<>(7, "d"),
0389 new Tuple2<>(7, "g"),
0390 new Tuple2<>(7, "e"),
0391 new Tuple2<>(7, "r"),
0392 new Tuple2<>(7, "s")),
0393 Arrays.asList(
0394 new Tuple2<>(9, "a"),
0395 new Tuple2<>(9, "t"),
0396 new Tuple2<>(9, "h"),
0397 new Tuple2<>(9, "l"),
0398 new Tuple2<>(9, "e"),
0399 new Tuple2<>(9, "t"),
0400 new Tuple2<>(9, "i"),
0401 new Tuple2<>(9, "c"),
0402 new Tuple2<>(9, "s")));
0403
0404 JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0405 JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
0406 List<Tuple2<Integer, String>> out = new ArrayList<>();
0407 for (String letter : s.split("(?!^)")) {
0408 out.add(new Tuple2<>(s.length(), letter));
0409 }
0410 return out.iterator();
0411 });
0412
0413 JavaTestUtils.attachTestOutputStream(flatMapped);
0414 List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0415
0416 Assert.assertEquals(expected, result);
0417 }
0418
0419
0420
0421
0422
0423 public static <T extends Comparable<T>> void assertOrderInvariantEquals(
0424 List<List<T>> expected, List<List<T>> actual) {
0425 expected.forEach(Collections::sort);
0426 List<List<T>> sortedActual = new ArrayList<>();
0427 actual.forEach(list -> {
0428 List<T> sortedList = new ArrayList<>(list);
0429 Collections.sort(sortedList);
0430 sortedActual.add(sortedList);
0431 });
0432 Assert.assertEquals(expected, sortedActual);
0433 }
0434
0435 @Test
0436 public void testPairFilter() {
0437 List<List<String>> inputData = Arrays.asList(
0438 Arrays.asList("giants", "dodgers"),
0439 Arrays.asList("yankees", "red sox"));
0440
0441 List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0442 Arrays.asList(new Tuple2<>("giants", 6)),
0443 Arrays.asList(new Tuple2<>("yankees", 7)));
0444
0445 JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0446 JavaPairDStream<String, Integer> pairStream =
0447 stream.mapToPair(x -> new Tuple2<>(x, x.length()));
0448 JavaPairDStream<String, Integer> filtered = pairStream.filter(x -> x._1().contains("a"));
0449 JavaTestUtils.attachTestOutputStream(filtered);
0450 List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0451
0452 Assert.assertEquals(expected, result);
0453 }
0454
0455 List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
0456 Arrays.asList(new Tuple2<>("california", "dodgers"),
0457 new Tuple2<>("california", "giants"),
0458 new Tuple2<>("new york", "yankees"),
0459 new Tuple2<>("new york", "mets")),
0460 Arrays.asList(new Tuple2<>("california", "sharks"),
0461 new Tuple2<>("california", "ducks"),
0462 new Tuple2<>("new york", "rangers"),
0463 new Tuple2<>("new york", "islanders")));
0464
0465 List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
0466 Arrays.asList(
0467 new Tuple2<>("california", 1),
0468 new Tuple2<>("california", 3),
0469 new Tuple2<>("new york", 4),
0470 new Tuple2<>("new york", 1)),
0471 Arrays.asList(
0472 new Tuple2<>("california", 5),
0473 new Tuple2<>("california", 5),
0474 new Tuple2<>("new york", 3),
0475 new Tuple2<>("new york", 1)));
0476
0477 @Test
0478 public void testPairMap() {
0479 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0480
0481 List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0482 Arrays.asList(
0483 new Tuple2<>(1, "california"),
0484 new Tuple2<>(3, "california"),
0485 new Tuple2<>(4, "new york"),
0486 new Tuple2<>(1, "new york")),
0487 Arrays.asList(
0488 new Tuple2<>(5, "california"),
0489 new Tuple2<>(5, "california"),
0490 new Tuple2<>(3, "new york"),
0491 new Tuple2<>(1, "new york")));
0492
0493 JavaDStream<Tuple2<String, Integer>> stream =
0494 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0495 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0496 JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(Tuple2::swap);
0497 JavaTestUtils.attachTestOutputStream(reversed);
0498 List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0499
0500 Assert.assertEquals(expected, result);
0501 }
0502
0503 @Test
0504 public void testPairMapPartitions() {
0505 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0506
0507 List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0508 Arrays.asList(
0509 new Tuple2<>(1, "california"),
0510 new Tuple2<>(3, "california"),
0511 new Tuple2<>(4, "new york"),
0512 new Tuple2<>(1, "new york")),
0513 Arrays.asList(
0514 new Tuple2<>(5, "california"),
0515 new Tuple2<>(5, "california"),
0516 new Tuple2<>(3, "new york"),
0517 new Tuple2<>(1, "new york")));
0518
0519 JavaDStream<Tuple2<String, Integer>> stream =
0520 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0521 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0522 JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
0523 LinkedList<Tuple2<Integer, String>> out = new LinkedList<>();
0524 while (in.hasNext()) {
0525 Tuple2<String, Integer> next = in.next();
0526 out.add(next.swap());
0527 }
0528 return out.iterator();
0529 });
0530
0531 JavaTestUtils.attachTestOutputStream(reversed);
0532 List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0533
0534 Assert.assertEquals(expected, result);
0535 }
0536
0537 @Test
0538 public void testPairMap2() {
0539 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0540
0541 List<List<Integer>> expected = Arrays.asList(
0542 Arrays.asList(1, 3, 4, 1),
0543 Arrays.asList(5, 5, 3, 1));
0544
0545 JavaDStream<Tuple2<String, Integer>> stream =
0546 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0547 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0548 JavaDStream<Integer> reversed = pairStream.map(Tuple2::_2);
0549 JavaTestUtils.attachTestOutputStream(reversed);
0550 List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0551
0552 Assert.assertEquals(expected, result);
0553 }
0554
0555 @Test
0556 public void testPairToPairFlatMapWithChangingTypes() {
0557 List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
0558 Arrays.asList(
0559 new Tuple2<>("hi", 1),
0560 new Tuple2<>("ho", 2)),
0561 Arrays.asList(
0562 new Tuple2<>("hi", 1),
0563 new Tuple2<>("ho", 2)));
0564
0565 List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
0566 Arrays.asList(
0567 new Tuple2<>(1, "h"),
0568 new Tuple2<>(1, "i"),
0569 new Tuple2<>(2, "h"),
0570 new Tuple2<>(2, "o")),
0571 Arrays.asList(
0572 new Tuple2<>(1, "h"),
0573 new Tuple2<>(1, "i"),
0574 new Tuple2<>(2, "h"),
0575 new Tuple2<>(2, "o")));
0576
0577 JavaDStream<Tuple2<String, Integer>> stream =
0578 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0579 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0580 JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
0581 List<Tuple2<Integer, String>> out = new LinkedList<>();
0582 for (Character s : in._1().toCharArray()) {
0583 out.add(new Tuple2<>(in._2(), s.toString()));
0584 }
0585 return out.iterator();
0586 });
0587
0588 JavaTestUtils.attachTestOutputStream(flatMapped);
0589 List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0590
0591 Assert.assertEquals(expected, result);
0592 }
0593
0594 @Test
0595 public void testPairReduceByKey() {
0596 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0597
0598 List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0599 Arrays.asList(
0600 new Tuple2<>("california", 4),
0601 new Tuple2<>("new york", 5)),
0602 Arrays.asList(
0603 new Tuple2<>("california", 10),
0604 new Tuple2<>("new york", 4)));
0605
0606 JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
0607 ssc, inputData, 1);
0608 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0609
0610 JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey((x, y) -> x + y);
0611
0612 JavaTestUtils.attachTestOutputStream(reduced);
0613 List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0614
0615 Assert.assertEquals(expected, result);
0616 }
0617
0618 @Test
0619 public void testCombineByKey() {
0620 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0621
0622 List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0623 Arrays.asList(
0624 new Tuple2<>("california", 4),
0625 new Tuple2<>("new york", 5)),
0626 Arrays.asList(
0627 new Tuple2<>("california", 10),
0628 new Tuple2<>("new york", 4)));
0629
0630 JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
0631 ssc, inputData, 1);
0632 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0633
0634 JavaPairDStream<String, Integer> combined = pairStream.combineByKey(i -> i,
0635 (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2));
0636
0637 JavaTestUtils.attachTestOutputStream(combined);
0638 List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0639
0640 Assert.assertEquals(expected, result);
0641 }
0642
0643 @Test
0644 public void testReduceByKeyAndWindow() {
0645 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0646
0647 List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0648 Arrays.asList(new Tuple2<>("california", 4),
0649 new Tuple2<>("new york", 5)),
0650 Arrays.asList(new Tuple2<>("california", 14),
0651 new Tuple2<>("new york", 9)),
0652 Arrays.asList(new Tuple2<>("california", 10),
0653 new Tuple2<>("new york", 4)));
0654
0655 JavaDStream<Tuple2<String, Integer>> stream =
0656 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0657 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0658
0659 JavaPairDStream<String, Integer> reduceWindowed =
0660 pairStream.reduceByKeyAndWindow((x, y) -> x + y, new Duration(2000), new Duration(1000));
0661 JavaTestUtils.attachTestOutputStream(reduceWindowed);
0662 List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0663
0664 Assert.assertEquals(expected, result);
0665 }
0666
0667 @Test
0668 public void testUpdateStateByKey() {
0669 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0670
0671 List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0672 Arrays.asList(new Tuple2<>("california", 4),
0673 new Tuple2<>("new york", 5)),
0674 Arrays.asList(new Tuple2<>("california", 14),
0675 new Tuple2<>("new york", 9)),
0676 Arrays.asList(new Tuple2<>("california", 14),
0677 new Tuple2<>("new york", 9)));
0678
0679 JavaDStream<Tuple2<String, Integer>> stream =
0680 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0681 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0682
0683 JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
0684 int out = 0;
0685 if (state.isPresent()) {
0686 out = out + state.get();
0687 }
0688 for (Integer v : values) {
0689 out = out + v;
0690 }
0691 return Optional.of(out);
0692 });
0693
0694 JavaTestUtils.attachTestOutputStream(updated);
0695 List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0696
0697 Assert.assertEquals(expected, result);
0698 }
0699
0700 @Test
0701 public void testReduceByKeyAndWindowWithInverse() {
0702 List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
0703
0704 List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
0705 Arrays.asList(new Tuple2<>("california", 4),
0706 new Tuple2<>("new york", 5)),
0707 Arrays.asList(new Tuple2<>("california", 14),
0708 new Tuple2<>("new york", 9)),
0709 Arrays.asList(new Tuple2<>("california", 10),
0710 new Tuple2<>("new york", 4)));
0711
0712 JavaDStream<Tuple2<String, Integer>> stream =
0713 JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
0714 JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0715
0716 JavaPairDStream<String, Integer> reduceWindowed =
0717 pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000),
0718 new Duration(1000));
0719 JavaTestUtils.attachTestOutputStream(reduceWindowed);
0720 List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
0721
0722 Assert.assertEquals(expected, result);
0723 }
0724
0725 @Test
0726 public void testPairTransform() {
0727 List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
0728 Arrays.asList(
0729 new Tuple2<>(3, 5),
0730 new Tuple2<>(1, 5),
0731 new Tuple2<>(4, 5),
0732 new Tuple2<>(2, 5)),
0733 Arrays.asList(
0734 new Tuple2<>(2, 5),
0735 new Tuple2<>(3, 5),
0736 new Tuple2<>(4, 5),
0737 new Tuple2<>(1, 5)));
0738
0739 List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
0740 Arrays.asList(
0741 new Tuple2<>(1, 5),
0742 new Tuple2<>(2, 5),
0743 new Tuple2<>(3, 5),
0744 new Tuple2<>(4, 5)),
0745 Arrays.asList(
0746 new Tuple2<>(1, 5),
0747 new Tuple2<>(2, 5),
0748 new Tuple2<>(3, 5),
0749 new Tuple2<>(4, 5)));
0750
0751 JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
0752 ssc, inputData, 1);
0753 JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0754
0755 JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey());
0756
0757 JavaTestUtils.attachTestOutputStream(sorted);
0758 List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0759
0760 Assert.assertEquals(expected, result);
0761 }
0762
0763 @Test
0764 public void testPairToNormalRDDTransform() {
0765 List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
0766 Arrays.asList(
0767 new Tuple2<>(3, 5),
0768 new Tuple2<>(1, 5),
0769 new Tuple2<>(4, 5),
0770 new Tuple2<>(2, 5)),
0771 Arrays.asList(
0772 new Tuple2<>(2, 5),
0773 new Tuple2<>(3, 5),
0774 new Tuple2<>(4, 5),
0775 new Tuple2<>(1, 5)));
0776
0777 List<List<Integer>> expected = Arrays.asList(
0778 Arrays.asList(3, 1, 4, 2),
0779 Arrays.asList(2, 3, 4, 1));
0780
0781 JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
0782 ssc, inputData, 1);
0783 JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
0784 JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(x -> x._1()));
0785 JavaTestUtils.attachTestOutputStream(firstParts);
0786 List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0787
0788 Assert.assertEquals(expected, result);
0789 }
0790
0791 @Test
0792 public void testMapValues() {
0793 List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
0794
0795 List<List<Tuple2<String, String>>> expected = Arrays.asList(
0796 Arrays.asList(new Tuple2<>("california", "DODGERS"),
0797 new Tuple2<>("california", "GIANTS"),
0798 new Tuple2<>("new york", "YANKEES"),
0799 new Tuple2<>("new york", "METS")),
0800 Arrays.asList(new Tuple2<>("california", "SHARKS"),
0801 new Tuple2<>("california", "DUCKS"),
0802 new Tuple2<>("new york", "RANGERS"),
0803 new Tuple2<>("new york", "ISLANDERS")));
0804
0805 JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
0806 ssc, inputData, 1);
0807 JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
0808
0809 JavaPairDStream<String, String> mapped =
0810 pairStream.mapValues(s -> s.toUpperCase(Locale.ROOT));
0811 JavaTestUtils.attachTestOutputStream(mapped);
0812 List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0813
0814 Assert.assertEquals(expected, result);
0815 }
0816
0817 @Test
0818 public void testFlatMapValues() {
0819 List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
0820
0821 List<List<Tuple2<String, String>>> expected = Arrays.asList(
0822 Arrays.asList(new Tuple2<>("california", "dodgers1"),
0823 new Tuple2<>("california", "dodgers2"),
0824 new Tuple2<>("california", "giants1"),
0825 new Tuple2<>("california", "giants2"),
0826 new Tuple2<>("new york", "yankees1"),
0827 new Tuple2<>("new york", "yankees2"),
0828 new Tuple2<>("new york", "mets1"),
0829 new Tuple2<>("new york", "mets2")),
0830 Arrays.asList(new Tuple2<>("california", "sharks1"),
0831 new Tuple2<>("california", "sharks2"),
0832 new Tuple2<>("california", "ducks1"),
0833 new Tuple2<>("california", "ducks2"),
0834 new Tuple2<>("new york", "rangers1"),
0835 new Tuple2<>("new york", "rangers2"),
0836 new Tuple2<>("new york", "islanders1"),
0837 new Tuple2<>("new york", "islanders2")));
0838
0839 JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
0840 ssc, inputData, 1);
0841 JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
0842
0843 JavaPairDStream<String, String> flatMapped =
0844 pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2").iterator());
0845 JavaTestUtils.attachTestOutputStream(flatMapped);
0846 List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
0847 Assert.assertEquals(expected, result);
0848 }
0849
0850
0851
0852
0853 public void testMapWithStateAPI() {
0854 JavaPairRDD<String, Boolean> initialRDD = null;
0855 JavaPairDStream<String, Integer> wordsDstream = null;
0856
0857 Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mapFn =
0858 (time, key, value, state) -> {
0859
0860 state.exists();
0861 state.get();
0862 state.isTimingOut();
0863 state.remove();
0864 state.update(true);
0865 return Optional.of(2.0);
0866 };
0867
0868 JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
0869 wordsDstream.mapWithState(
0870 StateSpec.function(mapFn)
0871 .initialState(initialRDD)
0872 .numPartitions(10)
0873 .partitioner(new HashPartitioner(10))
0874 .timeout(Durations.seconds(10)));
0875
0876 JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots();
0877
0878 Function3<String, Optional<Integer>, State<Boolean>, Double> mapFn2 =
0879 (key, value, state) -> {
0880 state.exists();
0881 state.get();
0882 state.isTimingOut();
0883 state.remove();
0884 state.update(true);
0885 return 2.0;
0886 };
0887
0888 JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
0889 wordsDstream.mapWithState(
0890 StateSpec.function(mapFn2)
0891 .initialState(initialRDD)
0892 .numPartitions(10)
0893 .partitioner(new HashPartitioner(10))
0894 .timeout(Durations.seconds(10)));
0895
0896 JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots();
0897 }
0898 }