0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package test.org.apache.spark;
0019
0020 import java.io.*;
0021 import java.nio.channels.FileChannel;
0022 import java.nio.ByteBuffer;
0023 import java.nio.charset.StandardCharsets;
0024 import java.util.ArrayList;
0025 import java.util.Arrays;
0026 import java.util.Collections;
0027 import java.util.Comparator;
0028 import java.util.HashMap;
0029 import java.util.HashSet;
0030 import java.util.Iterator;
0031 import java.util.LinkedList;
0032 import java.util.List;
0033 import java.util.Map;
0034 import java.util.concurrent.*;
0035 import java.util.stream.Collectors;
0036 import java.util.stream.IntStream;
0037
0038 import org.apache.spark.Partitioner;
0039 import org.apache.spark.SparkConf;
0040 import org.apache.spark.TaskContext;
0041 import org.apache.spark.TaskContext$;
0042 import scala.Tuple2;
0043 import scala.Tuple3;
0044 import scala.Tuple4;
0045 import scala.collection.JavaConverters;
0046
0047 import com.google.common.collect.ImmutableMap;
0048 import com.google.common.collect.Iterables;
0049 import com.google.common.collect.Iterators;
0050 import com.google.common.collect.Lists;
0051 import com.google.common.base.Throwables;
0052 import com.google.common.io.Files;
0053 import org.apache.hadoop.fs.Path;
0054 import org.apache.hadoop.io.IntWritable;
0055 import org.apache.hadoop.io.Text;
0056 import org.apache.hadoop.io.compress.DefaultCodec;
0057 import org.apache.hadoop.mapred.SequenceFileInputFormat;
0058 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
0059 import org.apache.hadoop.mapreduce.Job;
0060 import org.junit.After;
0061 import static org.junit.Assert.*;
0062 import org.junit.Before;
0063 import org.junit.Test;
0064
0065 import org.apache.spark.api.java.JavaDoubleRDD;
0066 import org.apache.spark.api.java.JavaFutureAction;
0067 import org.apache.spark.api.java.JavaPairRDD;
0068 import org.apache.spark.api.java.JavaRDD;
0069 import org.apache.spark.api.java.JavaSparkContext;
0070 import org.apache.spark.api.java.Optional;
0071 import org.apache.spark.api.java.function.*;
0072 import org.apache.spark.input.PortableDataStream;
0073 import org.apache.spark.partial.BoundedDouble;
0074 import org.apache.spark.partial.PartialResult;
0075 import org.apache.spark.rdd.RDD;
0076 import org.apache.spark.serializer.KryoSerializer;
0077 import org.apache.spark.storage.StorageLevel;
0078 import org.apache.spark.util.LongAccumulator;
0079 import org.apache.spark.util.StatCounter;
0080
0081
0082
0083
0084 public class JavaAPISuite implements Serializable {
// Spark context used by every test; created in setUp() and stopped in tearDown().
// Declared transient because the suite itself implements Serializable; presumably so that
// closures/anonymous classes capturing the suite can be shipped without the context (verify).
private transient JavaSparkContext sc;
// Per-test scratch directory for file-based tests; created fresh in setUp().
private transient File tempDir;
0087
0088 @Before
0089 public void setUp() {
0090 sc = new JavaSparkContext("local", "JavaAPISuite");
0091 tempDir = Files.createTempDir();
0092 tempDir.deleteOnExit();
0093 }
0094
0095 @After
0096 public void tearDown() {
0097 sc.stop();
0098 sc = null;
0099 }
0100
0101 @SuppressWarnings("unchecked")
0102 @Test
0103 public void sparkContextUnion() {
0104
0105 List<String> strings = Arrays.asList("Hello", "World");
0106 JavaRDD<String> s1 = sc.parallelize(strings);
0107 JavaRDD<String> s2 = sc.parallelize(strings);
0108
0109 JavaRDD<String> sUnion = sc.union(s1, s2);
0110 assertEquals(4, sUnion.count());
0111
0112
0113 List<Double> doubles = Arrays.asList(1.0, 2.0);
0114 JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles);
0115 JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles);
0116 JavaDoubleRDD dUnion = sc.union(d1, d2);
0117 assertEquals(4, dUnion.count());
0118
0119
0120 List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
0121 pairs.add(new Tuple2<>(1, 2));
0122 pairs.add(new Tuple2<>(3, 4));
0123 JavaPairRDD<Integer, Integer> p1 = sc.parallelizePairs(pairs);
0124 JavaPairRDD<Integer, Integer> p2 = sc.parallelizePairs(pairs);
0125 JavaPairRDD<Integer, Integer> pUnion = sc.union(p1, p2);
0126 assertEquals(4, pUnion.count());
0127 }
0128
0129 @SuppressWarnings("unchecked")
0130 @Test
0131 public void intersection() {
0132 List<Integer> ints1 = Arrays.asList(1, 10, 2, 3, 4, 5);
0133 List<Integer> ints2 = Arrays.asList(1, 6, 2, 3, 7, 8);
0134 JavaRDD<Integer> s1 = sc.parallelize(ints1);
0135 JavaRDD<Integer> s2 = sc.parallelize(ints2);
0136
0137 JavaRDD<Integer> intersections = s1.intersection(s2);
0138 assertEquals(3, intersections.count());
0139
0140 JavaRDD<Integer> empty = sc.emptyRDD();
0141 JavaRDD<Integer> emptyIntersection = empty.intersection(s2);
0142 assertEquals(0, emptyIntersection.count());
0143
0144 List<Double> doubles = Arrays.asList(1.0, 2.0);
0145 JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles);
0146 JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles);
0147 JavaDoubleRDD dIntersection = d1.intersection(d2);
0148 assertEquals(2, dIntersection.count());
0149
0150 List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
0151 pairs.add(new Tuple2<>(1, 2));
0152 pairs.add(new Tuple2<>(3, 4));
0153 JavaPairRDD<Integer, Integer> p1 = sc.parallelizePairs(pairs);
0154 JavaPairRDD<Integer, Integer> p2 = sc.parallelizePairs(pairs);
0155 JavaPairRDD<Integer, Integer> pIntersection = p1.intersection(p2);
0156 assertEquals(2, pIntersection.count());
0157 }
0158
0159 @Test
0160 public void sample() {
0161 List<Integer> ints = IntStream.iterate(1, x -> x + 1)
0162 .limit(20)
0163 .boxed()
0164 .collect(Collectors.toList());
0165 JavaRDD<Integer> rdd = sc.parallelize(ints);
0166
0167 JavaRDD<Integer> sample20 = rdd.sample(true, 0.2, 8);
0168 assertEquals(2, sample20.count());
0169 JavaRDD<Integer> sample20WithoutReplacement = rdd.sample(false, 0.2, 2);
0170 assertEquals(4, sample20WithoutReplacement.count());
0171 }
0172
0173 @Test
0174 public void randomSplit() {
0175 List<Integer> ints = new ArrayList<>(1000);
0176 for (int i = 0; i < 1000; i++) {
0177 ints.add(i);
0178 }
0179 JavaRDD<Integer> rdd = sc.parallelize(ints);
0180 JavaRDD<Integer>[] splits = rdd.randomSplit(new double[] { 0.4, 0.6, 1.0 }, 31);
0181
0182 assertEquals(3, splits.length);
0183 long s0 = splits[0].count();
0184 long s1 = splits[1].count();
0185 long s2 = splits[2].count();
0186 assertTrue(s0 + " not within expected range", s0 > 150 && s0 < 250);
0187 assertTrue(s1 + " not within expected range", s1 > 250 && s1 < 350);
0188 assertTrue(s2 + " not within expected range", s2 > 430 && s2 < 570);
0189 }
0190
0191 @Test
0192 public void sortByKey() {
0193 List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
0194 pairs.add(new Tuple2<>(0, 4));
0195 pairs.add(new Tuple2<>(3, 2));
0196 pairs.add(new Tuple2<>(-1, 1));
0197
0198 JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
0199
0200
0201 JavaPairRDD<Integer, Integer> sortedRDD = rdd.sortByKey();
0202 assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
0203 List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect();
0204 assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1));
0205 assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));
0206
0207
0208 sortedRDD = rdd.sortByKey(Collections.reverseOrder(), false);
0209 assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
0210 sortedPairs = sortedRDD.collect();
0211 assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1));
0212 assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));
0213 }
0214
0215 @SuppressWarnings("unchecked")
0216 @Test
0217 public void repartitionAndSortWithinPartitions() {
0218 List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
0219 pairs.add(new Tuple2<>(0, 5));
0220 pairs.add(new Tuple2<>(3, 8));
0221 pairs.add(new Tuple2<>(2, 6));
0222 pairs.add(new Tuple2<>(0, 8));
0223 pairs.add(new Tuple2<>(3, 8));
0224 pairs.add(new Tuple2<>(1, 3));
0225
0226 JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
0227
0228 Partitioner partitioner = new Partitioner() {
0229 @Override
0230 public int numPartitions() {
0231 return 2;
0232 }
0233 @Override
0234 public int getPartition(Object key) {
0235 return (Integer) key % 2;
0236 }
0237 };
0238
0239 JavaPairRDD<Integer, Integer> repartitioned =
0240 rdd.repartitionAndSortWithinPartitions(partitioner);
0241 assertTrue(repartitioned.partitioner().isPresent());
0242 assertEquals(repartitioned.partitioner().get(), partitioner);
0243 List<List<Tuple2<Integer, Integer>>> partitions = repartitioned.glom().collect();
0244 assertEquals(partitions.get(0),
0245 Arrays.asList(new Tuple2<>(0, 5), new Tuple2<>(0, 8), new Tuple2<>(2, 6)));
0246 assertEquals(partitions.get(1),
0247 Arrays.asList(new Tuple2<>(1, 3), new Tuple2<>(3, 8), new Tuple2<>(3, 8)));
0248 }
0249
0250 @Test
0251 public void emptyRDD() {
0252 JavaRDD<String> rdd = sc.emptyRDD();
0253 assertEquals("Empty RDD shouldn't have any values", 0, rdd.count());
0254 }
0255
0256 @Test
0257 public void sortBy() {
0258 List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
0259 pairs.add(new Tuple2<>(0, 4));
0260 pairs.add(new Tuple2<>(3, 2));
0261 pairs.add(new Tuple2<>(-1, 1));
0262
0263 JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(pairs);
0264
0265
0266 JavaRDD<Tuple2<Integer, Integer>> sortedRDD = rdd.sortBy(Tuple2::_1, true, 2);
0267
0268 assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
0269 List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect();
0270 assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1));
0271 assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));
0272
0273
0274 sortedRDD = rdd.sortBy(Tuple2::_2, true, 2);
0275 assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
0276 sortedPairs = sortedRDD.collect();
0277 assertEquals(new Tuple2<>(3, 2), sortedPairs.get(1));
0278 assertEquals(new Tuple2<>(0, 4), sortedPairs.get(2));
0279 }
0280
0281 @Test
0282 public void foreach() {
0283 LongAccumulator accum = sc.sc().longAccumulator();
0284 JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
0285 rdd.foreach(s -> accum.add(1));
0286 assertEquals(2, accum.value().intValue());
0287 }
0288
0289 @Test
0290 public void foreachPartition() {
0291 LongAccumulator accum = sc.sc().longAccumulator();
0292 JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
0293 rdd.foreachPartition(iter -> {
0294 while (iter.hasNext()) {
0295 iter.next();
0296 accum.add(1);
0297 }
0298 });
0299 assertEquals(2, accum.value().intValue());
0300 }
0301
0302 @Test
0303 public void toLocalIterator() {
0304 List<Integer> correct = Arrays.asList(1, 2, 3, 4);
0305 JavaRDD<Integer> rdd = sc.parallelize(correct);
0306 List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
0307 assertEquals(correct, result);
0308 }
0309
0310 @Test
0311 public void zipWithUniqueId() {
0312 List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
0313 JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithUniqueId();
0314 JavaRDD<Long> indexes = zip.values();
0315 assertEquals(4, new HashSet<>(indexes.collect()).size());
0316 }
0317
0318 @Test
0319 public void zipWithIndex() {
0320 List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
0321 JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithIndex();
0322 JavaRDD<Long> indexes = zip.values();
0323 List<Long> correctIndexes = Arrays.asList(0L, 1L, 2L, 3L);
0324 assertEquals(correctIndexes, indexes.collect());
0325 }
0326
0327 @SuppressWarnings("unchecked")
0328 @Test
0329 public void lookup() {
0330 JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
0331 new Tuple2<>("Apples", "Fruit"),
0332 new Tuple2<>("Oranges", "Fruit"),
0333 new Tuple2<>("Oranges", "Citrus")
0334 ));
0335 assertEquals(2, categories.lookup("Oranges").size());
0336 assertEquals(2, Iterables.size(categories.groupByKey().lookup("Oranges").get(0)));
0337 }
0338
0339 @Test
0340 public void groupBy() {
0341 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
0342 Function<Integer, Boolean> isOdd = x -> x % 2 == 0;
0343 JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
0344 assertEquals(2, oddsAndEvens.count());
0345 assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));
0346 assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0)));
0347
0348 oddsAndEvens = rdd.groupBy(isOdd, 1);
0349 assertEquals(2, oddsAndEvens.count());
0350 assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));
0351 assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0)));
0352 }
0353
0354 @Test
0355 public void groupByOnPairRDD() {
0356
0357 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
0358 Function<Tuple2<Integer, Integer>, Boolean> areOdd =
0359 x -> (x._1() % 2 == 0) && (x._2() % 2 == 0);
0360 JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
0361 JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens = pairRDD.groupBy(areOdd);
0362 assertEquals(2, oddsAndEvens.count());
0363 assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));
0364 assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0)));
0365
0366 oddsAndEvens = pairRDD.groupBy(areOdd, 1);
0367 assertEquals(2, oddsAndEvens.count());
0368 assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));
0369 assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0)));
0370 }
0371
0372 @SuppressWarnings("unchecked")
0373 @Test
0374 public void keyByOnPairRDD() {
0375
0376 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
0377 Function<Tuple2<Integer, Integer>, String> sumToString = x -> String.valueOf(x._1() + x._2());
0378 JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
0379 JavaPairRDD<String, Tuple2<Integer, Integer>> keyed = pairRDD.keyBy(sumToString);
0380 assertEquals(7, keyed.count());
0381 assertEquals(1, (long) keyed.lookup("2").get(0)._1());
0382 }
0383
0384 @SuppressWarnings("unchecked")
0385 @Test
0386 public void cogroup() {
0387 JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
0388 new Tuple2<>("Apples", "Fruit"),
0389 new Tuple2<>("Oranges", "Fruit"),
0390 new Tuple2<>("Oranges", "Citrus")
0391 ));
0392 JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
0393 new Tuple2<>("Oranges", 2),
0394 new Tuple2<>("Apples", 3)
0395 ));
0396 JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped =
0397 categories.cogroup(prices);
0398 assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
0399 assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
0400
0401 cogrouped.collect();
0402 }
0403
0404 @SuppressWarnings("unchecked")
0405 @Test
0406 public void cogroup3() {
0407 JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
0408 new Tuple2<>("Apples", "Fruit"),
0409 new Tuple2<>("Oranges", "Fruit"),
0410 new Tuple2<>("Oranges", "Citrus")
0411 ));
0412 JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
0413 new Tuple2<>("Oranges", 2),
0414 new Tuple2<>("Apples", 3)
0415 ));
0416 JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList(
0417 new Tuple2<>("Oranges", 21),
0418 new Tuple2<>("Apples", 42)
0419 ));
0420
0421 JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<Integer>, Iterable<Integer>>> cogrouped =
0422 categories.cogroup(prices, quantities);
0423 assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
0424 assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
0425 assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3()));
0426
0427
0428 cogrouped.collect();
0429 }
0430
0431 @SuppressWarnings("unchecked")
0432 @Test
0433 public void cogroup4() {
0434 JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
0435 new Tuple2<>("Apples", "Fruit"),
0436 new Tuple2<>("Oranges", "Fruit"),
0437 new Tuple2<>("Oranges", "Citrus")
0438 ));
0439 JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
0440 new Tuple2<>("Oranges", 2),
0441 new Tuple2<>("Apples", 3)
0442 ));
0443 JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList(
0444 new Tuple2<>("Oranges", 21),
0445 new Tuple2<>("Apples", 42)
0446 ));
0447 JavaPairRDD<String, String> countries = sc.parallelizePairs(Arrays.asList(
0448 new Tuple2<>("Oranges", "BR"),
0449 new Tuple2<>("Apples", "US")
0450 ));
0451
0452 JavaPairRDD<String, Tuple4<Iterable<String>, Iterable<Integer>, Iterable<Integer>,
0453 Iterable<String>>> cogrouped = categories.cogroup(prices, quantities, countries);
0454 assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
0455 assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
0456 assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3()));
0457 assertEquals("[BR]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._4()));
0458
0459 cogrouped.collect();
0460 }
0461
0462 @SuppressWarnings("unchecked")
0463 @Test
0464 public void leftOuterJoin() {
0465 JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
0466 new Tuple2<>(1, 1),
0467 new Tuple2<>(1, 2),
0468 new Tuple2<>(2, 1),
0469 new Tuple2<>(3, 1)
0470 ));
0471 JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
0472 new Tuple2<>(1, 'x'),
0473 new Tuple2<>(2, 'y'),
0474 new Tuple2<>(2, 'z'),
0475 new Tuple2<>(4, 'w')
0476 ));
0477 List<Tuple2<Integer,Tuple2<Integer,Optional<Character>>>> joined =
0478 rdd1.leftOuterJoin(rdd2).collect();
0479 assertEquals(5, joined.size());
0480 Tuple2<Integer,Tuple2<Integer,Optional<Character>>> firstUnmatched =
0481 rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
0482 assertEquals(3, firstUnmatched._1().intValue());
0483 }
0484
0485 @Test
0486 public void foldReduce() {
0487 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
0488 Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
0489
0490 int sum = rdd.fold(0, add);
0491 assertEquals(33, sum);
0492
0493 sum = rdd.reduce(add);
0494 assertEquals(33, sum);
0495 }
0496
0497 @Test
0498 public void treeReduce() {
0499 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
0500 Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
0501 for (int depth = 1; depth <= 10; depth++) {
0502 int sum = rdd.treeReduce(add, depth);
0503 assertEquals(-5, sum);
0504 }
0505 }
0506
0507 @Test
0508 public void treeAggregate() {
0509 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
0510 Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
0511 for (int depth = 1; depth <= 10; depth++) {
0512 int sum = rdd.treeAggregate(0, add, add, depth);
0513 assertEquals(-5, sum);
0514 }
0515 }
0516
0517 @SuppressWarnings("unchecked")
0518 @Test
0519 public void aggregateByKey() {
0520 JavaPairRDD<Integer, Integer> pairs = sc.parallelizePairs(
0521 Arrays.asList(
0522 new Tuple2<>(1, 1),
0523 new Tuple2<>(1, 1),
0524 new Tuple2<>(3, 2),
0525 new Tuple2<>(5, 1),
0526 new Tuple2<>(5, 3)), 2);
0527
0528 Map<Integer, HashSet<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(),
0529 (a, b) -> {
0530 a.add(b);
0531 return a;
0532 },
0533 (a, b) -> {
0534 a.addAll(b);
0535 return a;
0536 }).collectAsMap();
0537 assertEquals(3, sets.size());
0538 assertEquals(new HashSet<>(Arrays.asList(1)), sets.get(1));
0539 assertEquals(new HashSet<>(Arrays.asList(2)), sets.get(3));
0540 assertEquals(new HashSet<>(Arrays.asList(1, 3)), sets.get(5));
0541 }
0542
0543 @SuppressWarnings("unchecked")
0544 @Test
0545 public void foldByKey() {
0546 List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
0547 new Tuple2<>(2, 1),
0548 new Tuple2<>(2, 1),
0549 new Tuple2<>(1, 1),
0550 new Tuple2<>(3, 2),
0551 new Tuple2<>(3, 1)
0552 );
0553 JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
0554 JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
0555 assertEquals(1, sums.lookup(1).get(0).intValue());
0556 assertEquals(2, sums.lookup(2).get(0).intValue());
0557 assertEquals(3, sums.lookup(3).get(0).intValue());
0558 }
0559
0560 @SuppressWarnings("unchecked")
0561 @Test
0562 public void reduceByKey() {
0563 List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
0564 new Tuple2<>(2, 1),
0565 new Tuple2<>(2, 1),
0566 new Tuple2<>(1, 1),
0567 new Tuple2<>(3, 2),
0568 new Tuple2<>(3, 1)
0569 );
0570 JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
0571 JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
0572 assertEquals(1, counts.lookup(1).get(0).intValue());
0573 assertEquals(2, counts.lookup(2).get(0).intValue());
0574 assertEquals(3, counts.lookup(3).get(0).intValue());
0575
0576 Map<Integer, Integer> localCounts = counts.collectAsMap();
0577 assertEquals(1, localCounts.get(1).intValue());
0578 assertEquals(2, localCounts.get(2).intValue());
0579 assertEquals(3, localCounts.get(3).intValue());
0580
0581 localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
0582 assertEquals(1, localCounts.get(1).intValue());
0583 assertEquals(2, localCounts.get(2).intValue());
0584 assertEquals(3, localCounts.get(3).intValue());
0585 }
0586
0587 @Test
0588 public void approximateResults() {
0589 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
0590 Map<Integer, Long> countsByValue = rdd.countByValue();
0591 assertEquals(2, countsByValue.get(1).longValue());
0592 assertEquals(1, countsByValue.get(13).longValue());
0593
0594 PartialResult<Map<Integer, BoundedDouble>> approx = rdd.countByValueApprox(1);
0595 Map<Integer, BoundedDouble> finalValue = approx.getFinalValue();
0596 assertEquals(2.0, finalValue.get(1).mean(), 0.01);
0597 assertEquals(1.0, finalValue.get(13).mean(), 0.01);
0598 }
0599
0600 @Test
0601 public void take() {
0602 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
0603 assertEquals(1, rdd.first().intValue());
0604 rdd.take(2);
0605 rdd.takeSample(false, 2, 42);
0606 }
0607
0608 @Test
0609 public void isEmpty() {
0610 assertTrue(sc.emptyRDD().isEmpty());
0611 assertTrue(sc.parallelize(new ArrayList<Integer>()).isEmpty());
0612 assertFalse(sc.parallelize(Arrays.asList(1)).isEmpty());
0613 assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(i -> i < 0).isEmpty());
0614 assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(i -> i > 1).isEmpty());
0615 }
0616
0617 @Test
0618 public void cartesian() {
0619 JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
0620 JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World"));
0621 JavaPairRDD<String, Double> cartesian = stringRDD.cartesian(doubleRDD);
0622 assertEquals(new Tuple2<>("Hello", 1.0), cartesian.first());
0623 }
0624
0625 @Test
0626 public void javaDoubleRDD() {
0627 JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
0628 JavaDoubleRDD distinct = rdd.distinct();
0629 assertEquals(5, distinct.count());
0630 JavaDoubleRDD filter = rdd.filter(x -> x > 2.0);
0631 assertEquals(3, filter.count());
0632 JavaDoubleRDD union = rdd.union(rdd);
0633 assertEquals(12, union.count());
0634 union = union.cache();
0635 assertEquals(12, union.count());
0636
0637 assertEquals(20, rdd.sum(), 0.01);
0638 StatCounter stats = rdd.stats();
0639 assertEquals(20, stats.sum(), 0.01);
0640 assertEquals(20/6.0, rdd.mean(), 0.01);
0641 assertEquals(20/6.0, rdd.mean(), 0.01);
0642 assertEquals(6.22222, rdd.variance(), 0.01);
0643 assertEquals(rdd.variance(), rdd.popVariance(), 1e-14);
0644 assertEquals(7.46667, rdd.sampleVariance(), 0.01);
0645 assertEquals(2.49444, rdd.stdev(), 0.01);
0646 assertEquals(rdd.stdev(), rdd.popStdev(), 1e-14);
0647 assertEquals(2.73252, rdd.sampleStdev(), 0.01);
0648
0649 rdd.first();
0650 rdd.take(5);
0651 }
0652
0653 @Test
0654 public void javaDoubleRDDHistoGram() {
0655 JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
0656
0657 Tuple2<double[], long[]> results = rdd.histogram(2);
0658 double[] expected_buckets = {1.0, 2.5, 4.0};
0659 long[] expected_counts = {2, 2};
0660 assertArrayEquals(expected_buckets, results._1(), 0.1);
0661 assertArrayEquals(expected_counts, results._2());
0662
0663 long[] histogram = rdd.histogram(expected_buckets);
0664 assertArrayEquals(expected_counts, histogram);
0665
0666 assertArrayEquals(
0667 new long[] {0},
0668 sc.parallelizeDoubles(new ArrayList<>(0), 1).histogram(new double[]{0.0, 1.0}));
0669 }
0670
0671 private static class DoubleComparator implements Comparator<Double>, Serializable {
0672 @Override
0673 public int compare(Double o1, Double o2) {
0674 return o1.compareTo(o2);
0675 }
0676 }
0677
0678 @Test
0679 public void max() {
0680 JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
0681 double max = rdd.max(new DoubleComparator());
0682 assertEquals(4.0, max, 0.001);
0683 }
0684
0685 @Test
0686 public void min() {
0687 JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
0688 double max = rdd.min(new DoubleComparator());
0689 assertEquals(1.0, max, 0.001);
0690 }
0691
0692 @Test
0693 public void naturalMax() {
0694 JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
0695 double max = rdd.max();
0696 assertEquals(4.0, max, 0.0);
0697 }
0698
0699 @Test
0700 public void naturalMin() {
0701 JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
0702 double max = rdd.min();
0703 assertEquals(1.0, max, 0.0);
0704 }
0705
0706 @Test
0707 public void takeOrdered() {
0708 JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
0709 assertEquals(Arrays.asList(1.0, 2.0), rdd.takeOrdered(2, new DoubleComparator()));
0710 assertEquals(Arrays.asList(1.0, 2.0), rdd.takeOrdered(2));
0711 }
0712
0713 @Test
0714 public void top() {
0715 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
0716 List<Integer> top2 = rdd.top(2);
0717 assertEquals(Arrays.asList(4, 3), top2);
0718 }
0719
0720 private static class AddInts implements Function2<Integer, Integer, Integer> {
0721 @Override
0722 public Integer call(Integer a, Integer b) {
0723 return a + b;
0724 }
0725 }
0726
0727 @Test
0728 public void reduce() {
0729 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
0730 int sum = rdd.reduce(new AddInts());
0731 assertEquals(10, sum);
0732 }
0733
0734 @Test
0735 public void reduceOnJavaDoubleRDD() {
0736 JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
0737 double sum = rdd.reduce((v1, v2) -> v1 + v2);
0738 assertEquals(10.0, sum, 0.001);
0739 }
0740
0741 @Test
0742 public void fold() {
0743 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
0744 int sum = rdd.fold(0, new AddInts());
0745 assertEquals(10, sum);
0746 }
0747
0748 @Test
0749 public void aggregate() {
0750 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
0751 int sum = rdd.aggregate(0, new AddInts(), new AddInts());
0752 assertEquals(10, sum);
0753 }
0754
0755 @Test
0756 public void map() {
0757 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
0758 JavaDoubleRDD doubles = rdd.mapToDouble(Integer::doubleValue).cache();
0759 doubles.collect();
0760 JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)).cache();
0761 pairs.collect();
0762 JavaRDD<String> strings = rdd.map(Object::toString).cache();
0763 strings.collect();
0764 }
0765
0766 @Test
0767 public void flatMap() {
0768 JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
0769 "The quick brown fox jumps over the lazy dog."));
0770 JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
0771 assertEquals("Hello", words.first());
0772 assertEquals(11, words.count());
0773
0774 JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(s -> {
0775 List<Tuple2<String, String>> pairs = new LinkedList<>();
0776 for (String word : s.split(" ")) {
0777 pairs.add(new Tuple2<>(word, word));
0778 }
0779 return pairs.iterator();
0780 }
0781 );
0782 assertEquals(new Tuple2<>("Hello", "Hello"), pairsRDD.first());
0783 assertEquals(11, pairsRDD.count());
0784
0785 JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
0786 List<Double> lengths = new LinkedList<>();
0787 for (String word : s.split(" ")) {
0788 lengths.add((double) word.length());
0789 }
0790 return lengths.iterator();
0791 });
0792 assertEquals(5.0, doubles.first(), 0.01);
0793 assertEquals(11, pairsRDD.count());
0794 }
0795
0796 @SuppressWarnings("unchecked")
0797 @Test
0798 public void mapsFromPairsToPairs() {
0799 List<Tuple2<Integer, String>> pairs = Arrays.asList(
0800 new Tuple2<>(1, "a"),
0801 new Tuple2<>(2, "aa"),
0802 new Tuple2<>(3, "aaa")
0803 );
0804 JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
0805
0806
0807 JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
0808 item -> Collections.singletonList(item.swap()).iterator());
0809 swapped.collect();
0810
0811
0812 pairRDD.mapToPair(Tuple2::swap).collect();
0813 }
0814
0815 @Test
0816 public void mapPartitions() {
0817 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
0818 JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
0819 int sum = 0;
0820 while (iter.hasNext()) {
0821 sum += iter.next();
0822 }
0823 return Collections.singletonList(sum).iterator();
0824 });
0825 assertEquals("[3, 7]", partitionSums.collect().toString());
0826 }
0827
0828
0829 @Test
0830 public void mapPartitionsWithIndex() {
0831 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
0832 JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex((index, iter) -> {
0833 int sum = 0;
0834 while (iter.hasNext()) {
0835 sum += iter.next();
0836 }
0837 return Collections.singletonList(sum).iterator();
0838 }, false);
0839 assertEquals("[3, 7]", partitionSums.collect().toString());
0840 }
0841
0842 @Test
0843 public void getNumPartitions(){
0844 JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
0845 JavaDoubleRDD rdd2 = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0), 2);
0846 JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs(
0847 Arrays.asList(
0848 new Tuple2<>("a", 1),
0849 new Tuple2<>("aa", 2),
0850 new Tuple2<>("aaa", 3)
0851 ),
0852 2);
0853 assertEquals(3, rdd1.getNumPartitions());
0854 assertEquals(2, rdd2.getNumPartitions());
0855 assertEquals(2, rdd3.getNumPartitions());
0856 }
0857
0858 @Test
0859 public void repartition() {
0860
0861 JavaRDD<Integer> in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2);
0862 JavaRDD<Integer> repartitioned1 = in1.repartition(4);
0863 List<List<Integer>> result1 = repartitioned1.glom().collect();
0864 assertEquals(4, result1.size());
0865 for (List<Integer> l : result1) {
0866 assertFalse(l.isEmpty());
0867 }
0868
0869
0870 JavaRDD<Integer> in2 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4);
0871 JavaRDD<Integer> repartitioned2 = in2.repartition(2);
0872 List<List<Integer>> result2 = repartitioned2.glom().collect();
0873 assertEquals(2, result2.size());
0874 for (List<Integer> l: result2) {
0875 assertFalse(l.isEmpty());
0876 }
0877 }
0878
0879 @SuppressWarnings("unchecked")
0880 @Test
0881 public void persist() {
0882 JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
0883 doubleRDD = doubleRDD.persist(StorageLevel.DISK_ONLY());
0884 assertEquals(20, doubleRDD.sum(), 0.1);
0885
0886 List<Tuple2<Integer, String>> pairs = Arrays.asList(
0887 new Tuple2<>(1, "a"),
0888 new Tuple2<>(2, "aa"),
0889 new Tuple2<>(3, "aaa")
0890 );
0891 JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
0892 pairRDD = pairRDD.persist(StorageLevel.DISK_ONLY());
0893 assertEquals("a", pairRDD.first()._2());
0894
0895 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
0896 rdd = rdd.persist(StorageLevel.DISK_ONLY());
0897 assertEquals(1, rdd.first().intValue());
0898 }
0899
0900 @Test
0901 public void iterator() {
0902 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
0903 TaskContext context = TaskContext$.MODULE$.empty();
0904 assertEquals(1, rdd.iterator(rdd.partitions().get(0), context).next().intValue());
0905 }
0906
0907 @Test
0908 public void glom() {
0909 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
0910 assertEquals("[1, 2]", rdd.glom().first().toString());
0911 }
0912
0913
0914
0915 @Test
0916 public void textFiles() throws IOException {
0917 String outputDir = new File(tempDir, "output").getAbsolutePath();
0918 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
0919 rdd.saveAsTextFile(outputDir);
0920
0921 File outputFile = new File(outputDir, "part-00000");
0922 String content = Files.toString(outputFile, StandardCharsets.UTF_8);
0923 assertEquals("1\n2\n3\n4\n", content);
0924
0925 List<String> expected = Arrays.asList("1", "2", "3", "4");
0926 JavaRDD<String> readRDD = sc.textFile(outputDir);
0927 assertEquals(expected, readRDD.collect());
0928 }
0929
0930 @Test
0931 public void wholeTextFiles() throws Exception {
0932 byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
0933 byte[] content2 = "spark is also easy to use.\n".getBytes(StandardCharsets.UTF_8);
0934
0935 String tempDirName = tempDir.getAbsolutePath();
0936 String path1 = new Path(tempDirName, "part-00000").toUri().getPath();
0937 String path2 = new Path(tempDirName, "part-00001").toUri().getPath();
0938
0939 Files.write(content1, new File(path1));
0940 Files.write(content2, new File(path2));
0941
0942 Map<String, String> container = new HashMap<>();
0943 container.put(path1, new Text(content1).toString());
0944 container.put(path2, new Text(content2).toString());
0945
0946 JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName, 3);
0947 List<Tuple2<String, String>> result = readRDD.collect();
0948
0949 for (Tuple2<String, String> res : result) {
0950
0951
0952 assertEquals(res._2(), container.get(new Path(res._1()).toUri().getPath()));
0953 }
0954 }
0955
0956 @Test
0957 public void textFilesCompressed() {
0958 String outputDir = new File(tempDir, "output").getAbsolutePath();
0959 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
0960 rdd.saveAsTextFile(outputDir, DefaultCodec.class);
0961
0962
0963 List<String> expected = Arrays.asList("1", "2", "3", "4");
0964 JavaRDD<String> readRDD = sc.textFile(outputDir);
0965 assertEquals(expected, readRDD.collect());
0966 }
0967
0968 @SuppressWarnings("unchecked")
0969 @Test
0970 public void sequenceFile() {
0971 String outputDir = new File(tempDir, "output").getAbsolutePath();
0972 List<Tuple2<Integer, String>> pairs = Arrays.asList(
0973 new Tuple2<>(1, "a"),
0974 new Tuple2<>(2, "aa"),
0975 new Tuple2<>(3, "aaa")
0976 );
0977 JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
0978
0979 rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
0980 .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
0981
0982
0983 JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
0984 Text.class).mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString()));
0985 assertEquals(pairs, readRDD.collect());
0986 }
0987
0988 @Test
0989 public void binaryFiles() throws Exception {
0990
0991 byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
0992
0993 String tempDirName = tempDir.getAbsolutePath();
0994 File file1 = new File(tempDirName + "/part-00000");
0995
0996 FileOutputStream fos1 = new FileOutputStream(file1);
0997
0998 try (FileChannel channel1 = fos1.getChannel()) {
0999 ByteBuffer bbuf = ByteBuffer.wrap(content1);
1000 channel1.write(bbuf);
1001 }
1002 JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName, 3);
1003 List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
1004 for (Tuple2<String, PortableDataStream> res : result) {
1005 assertArrayEquals(content1, res._2().toArray());
1006 }
1007 }
1008
1009 @Test
1010 public void binaryFilesCaching() throws Exception {
1011
1012 byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
1013
1014 String tempDirName = tempDir.getAbsolutePath();
1015 File file1 = new File(tempDirName + "/part-00000");
1016
1017 FileOutputStream fos1 = new FileOutputStream(file1);
1018
1019 try (FileChannel channel1 = fos1.getChannel()) {
1020 ByteBuffer bbuf = ByteBuffer.wrap(content1);
1021 channel1.write(bbuf);
1022 }
1023
1024 JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache();
1025 readRDD.foreach(pair -> pair._2().toArray());
1026
1027 List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
1028 for (Tuple2<String, PortableDataStream> res : result) {
1029 assertArrayEquals(content1, res._2().toArray());
1030 }
1031 }
1032
1033 @Test
1034 public void binaryRecords() throws Exception {
1035
1036 byte[] content1 = "spark isn't always easy to use.\n".getBytes(StandardCharsets.UTF_8);
1037 int numOfCopies = 10;
1038 String tempDirName = tempDir.getAbsolutePath();
1039 File file1 = new File(tempDirName + "/part-00000");
1040
1041 FileOutputStream fos1 = new FileOutputStream(file1);
1042
1043 try (FileChannel channel1 = fos1.getChannel()) {
1044 for (int i = 0; i < numOfCopies; i++) {
1045 ByteBuffer bbuf = ByteBuffer.wrap(content1);
1046 channel1.write(bbuf);
1047 }
1048 }
1049
1050 JavaRDD<byte[]> readRDD = sc.binaryRecords(tempDirName, content1.length);
1051 assertEquals(numOfCopies,readRDD.count());
1052 List<byte[]> result = readRDD.collect();
1053 for (byte[] res : result) {
1054 assertArrayEquals(content1, res);
1055 }
1056 }
1057
1058 @SuppressWarnings("unchecked")
1059 @Test
1060 public void writeWithNewAPIHadoopFile() {
1061 String outputDir = new File(tempDir, "output").getAbsolutePath();
1062 List<Tuple2<Integer, String>> pairs = Arrays.asList(
1063 new Tuple2<>(1, "a"),
1064 new Tuple2<>(2, "aa"),
1065 new Tuple2<>(3, "aaa")
1066 );
1067 JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
1068
1069 rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
1070 .saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
1071 org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
1072
1073 JavaPairRDD<IntWritable, Text> output =
1074 sc.sequenceFile(outputDir, IntWritable.class, Text.class);
1075 assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
1076 }
1077
1078 @SuppressWarnings("unchecked")
1079 @Test
1080 public void readWithNewAPIHadoopFile() throws IOException {
1081 String outputDir = new File(tempDir, "output").getAbsolutePath();
1082 List<Tuple2<Integer, String>> pairs = Arrays.asList(
1083 new Tuple2<>(1, "a"),
1084 new Tuple2<>(2, "aa"),
1085 new Tuple2<>(3, "aaa")
1086 );
1087 JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
1088
1089 rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
1090 .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
1091
1092 JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
1093 org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
1094 IntWritable.class, Text.class, Job.getInstance().getConfiguration());
1095 assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
1096 }
1097
1098 @Test
1099 public void objectFilesOfInts() {
1100 String outputDir = new File(tempDir, "output").getAbsolutePath();
1101 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
1102 rdd.saveAsObjectFile(outputDir);
1103
1104 List<Integer> expected = Arrays.asList(1, 2, 3, 4);
1105 JavaRDD<Integer> readRDD = sc.objectFile(outputDir);
1106 assertEquals(expected, readRDD.collect());
1107 }
1108
1109 @SuppressWarnings("unchecked")
1110 @Test
1111 public void objectFilesOfComplexTypes() {
1112 String outputDir = new File(tempDir, "output").getAbsolutePath();
1113 List<Tuple2<Integer, String>> pairs = Arrays.asList(
1114 new Tuple2<>(1, "a"),
1115 new Tuple2<>(2, "aa"),
1116 new Tuple2<>(3, "aaa")
1117 );
1118 JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
1119 rdd.saveAsObjectFile(outputDir);
1120
1121 JavaRDD<Tuple2<Integer, String>> readRDD = sc.objectFile(outputDir);
1122 assertEquals(pairs, readRDD.collect());
1123 }
1124
1125 @SuppressWarnings("unchecked")
1126 @Test
1127 public void hadoopFile() {
1128 String outputDir = new File(tempDir, "output").getAbsolutePath();
1129 List<Tuple2<Integer, String>> pairs = Arrays.asList(
1130 new Tuple2<>(1, "a"),
1131 new Tuple2<>(2, "aa"),
1132 new Tuple2<>(3, "aaa")
1133 );
1134 JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
1135
1136 rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
1137 .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
1138
1139 JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
1140 SequenceFileInputFormat.class, IntWritable.class, Text.class);
1141 assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
1142 }
1143
1144 @SuppressWarnings("unchecked")
1145 @Test
1146 public void hadoopFileCompressed() {
1147 String outputDir = new File(tempDir, "output_compressed").getAbsolutePath();
1148 List<Tuple2<Integer, String>> pairs = Arrays.asList(
1149 new Tuple2<>(1, "a"),
1150 new Tuple2<>(2, "aa"),
1151 new Tuple2<>(3, "aaa")
1152 );
1153 JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
1154
1155 rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
1156 .saveAsHadoopFile(outputDir, IntWritable.class, Text.class,
1157 SequenceFileOutputFormat.class, DefaultCodec.class);
1158
1159 JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
1160 SequenceFileInputFormat.class, IntWritable.class, Text.class);
1161
1162 assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
1163 }
1164
1165 @Test
1166 public void zip() {
1167 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
1168 JavaDoubleRDD doubles = rdd.mapToDouble(Integer::doubleValue);
1169 JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
1170 zipped.count();
1171 }
1172
1173 @Test
1174 public void zipPartitions() {
1175 JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
1176 JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
1177 FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
1178 (i, s) -> Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
1179
1180 JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
1181 assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
1182 }
1183
1184 @Test
1185 public void keyBy() {
1186 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
1187 List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect();
1188 assertEquals(new Tuple2<>("1", 1), s.get(0));
1189 assertEquals(new Tuple2<>("2", 2), s.get(1));
1190 }
1191
1192 @Test
1193 public void checkpointAndComputation() {
1194 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
1195 sc.setCheckpointDir(tempDir.getAbsolutePath());
1196 assertFalse(rdd.isCheckpointed());
1197 rdd.checkpoint();
1198 rdd.count();
1199 assertTrue(rdd.isCheckpointed());
1200 assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect());
1201 }
1202
1203 @Test
1204 public void checkpointAndRestore() {
1205 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
1206 sc.setCheckpointDir(tempDir.getAbsolutePath());
1207 assertFalse(rdd.isCheckpointed());
1208 rdd.checkpoint();
1209 rdd.count();
1210 assertTrue(rdd.isCheckpointed());
1211
1212 assertTrue(rdd.getCheckpointFile().isPresent());
1213 JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
1214 assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
1215 }
1216
1217 @Test
1218 public void combineByKey() {
1219 JavaRDD<Integer> originalRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
1220 Function<Integer, Integer> keyFunction = v1 -> v1 % 3;
1221 Function<Integer, Integer> createCombinerFunction = v1 -> v1;
1222
1223 Function2<Integer, Integer, Integer> mergeValueFunction = (v1, v2) -> v1 + v2;
1224
1225 JavaPairRDD<Integer, Integer> combinedRDD = originalRDD.keyBy(keyFunction)
1226 .combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction);
1227 Map<Integer, Integer> results = combinedRDD.collectAsMap();
1228 ImmutableMap<Integer, Integer> expected = ImmutableMap.of(0, 9, 1, 5, 2, 7);
1229 assertEquals(expected, results);
1230
1231 Partitioner defaultPartitioner = Partitioner.defaultPartitioner(
1232 combinedRDD.rdd(),
1233 JavaConverters.collectionAsScalaIterableConverter(
1234 Collections.<RDD<?>>emptyList()).asScala().toSeq());
1235 combinedRDD = originalRDD.keyBy(keyFunction)
1236 .combineByKey(
1237 createCombinerFunction,
1238 mergeValueFunction,
1239 mergeValueFunction,
1240 defaultPartitioner,
1241 false,
1242 new KryoSerializer(new SparkConf()));
1243 results = combinedRDD.collectAsMap();
1244 assertEquals(expected, results);
1245 }
1246
1247 @SuppressWarnings("unchecked")
1248 @Test
1249 public void mapOnPairRDD() {
1250 JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
1251 JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
1252 JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
1253 assertEquals(Arrays.asList(
1254 new Tuple2<>(1, 1),
1255 new Tuple2<>(0, 2),
1256 new Tuple2<>(1, 3),
1257 new Tuple2<>(0, 4)), rdd3.collect());
1258 }
1259
1260 @SuppressWarnings("unchecked")
1261 @Test
1262 public void collectPartitions() {
1263 JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
1264
1265 JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
1266
1267 List<Integer>[] parts = rdd1.collectPartitions(new int[] {0});
1268 assertEquals(Arrays.asList(1, 2), parts[0]);
1269
1270 parts = rdd1.collectPartitions(new int[] {1, 2});
1271 assertEquals(Arrays.asList(3, 4), parts[0]);
1272 assertEquals(Arrays.asList(5, 6, 7), parts[1]);
1273
1274 assertEquals(
1275 Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
1276 rdd2.collectPartitions(new int[] {0})[0]);
1277
1278 List<Tuple2<Integer,Integer>>[] parts2 = rdd2.collectPartitions(new int[] {1, 2});
1279 assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]);
1280 assertEquals(
1281 Arrays.asList(
1282 new Tuple2<>(5, 1),
1283 new Tuple2<>(6, 0),
1284 new Tuple2<>(7, 1)),
1285 parts2[1]);
1286 }
1287
1288 @Test
1289 public void countApproxDistinct() {
1290 List<Integer> arrayData = new ArrayList<>();
1291 int size = 100;
1292 for (int i = 0; i < 100000; i++) {
1293 arrayData.add(i % size);
1294 }
1295 JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
1296 assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.1);
1297 }
1298
1299 @Test
1300 public void countApproxDistinctByKey() {
1301 List<Tuple2<Integer, Integer>> arrayData = new ArrayList<>();
1302 for (int i = 10; i < 100; i++) {
1303 for (int j = 0; j < i; j++) {
1304 arrayData.add(new Tuple2<>(i, j));
1305 }
1306 }
1307 double relativeSD = 0.001;
1308 JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
1309 List<Tuple2<Integer, Long>> res = pairRdd.countApproxDistinctByKey(relativeSD, 8).collect();
1310 for (Tuple2<Integer, Long> resItem : res) {
1311 double count = resItem._1();
1312 long resCount = resItem._2();
1313 double error = Math.abs((resCount - count) / count);
1314 assertTrue(error < 0.1);
1315 }
1316 }
1317
1318 @Test
1319 public void collectAsMapWithIntArrayValues() {
1320
1321 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
1322 JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
1323 pairRDD.collect();
1324 pairRDD.collectAsMap();
1325 }
1326
1327 @SuppressWarnings("unchecked")
1328 @Test
1329 public void collectAsMapAndSerialize() throws Exception {
1330 JavaPairRDD<String,Integer> rdd =
1331 sc.parallelizePairs(Arrays.asList(new Tuple2<>("foo", 1)));
1332 Map<String,Integer> map = rdd.collectAsMap();
1333 ByteArrayOutputStream bytes = new ByteArrayOutputStream();
1334 new ObjectOutputStream(bytes).writeObject(map);
1335 Map<String,Integer> deserializedMap = (Map<String,Integer>)
1336 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())).readObject();
1337 assertEquals(1, deserializedMap.get("foo").intValue());
1338 }
1339
1340 @Test
1341 @SuppressWarnings("unchecked")
1342 public void sampleByKey() {
1343 JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
1344 JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i % 2, 1));
1345 Map<Integer, Double> fractions = new HashMap<>();
1346 fractions.put(0, 0.5);
1347 fractions.put(1, 1.0);
1348 JavaPairRDD<Integer, Integer> wr = rdd2.sampleByKey(true, fractions, 1L);
1349 Map<Integer, Long> wrCounts = wr.countByKey();
1350 assertEquals(2, wrCounts.size());
1351 assertTrue(wrCounts.get(0) > 0);
1352 assertTrue(wrCounts.get(1) > 0);
1353 JavaPairRDD<Integer, Integer> wor = rdd2.sampleByKey(false, fractions, 1L);
1354 Map<Integer, Long> worCounts = wor.countByKey();
1355 assertEquals(2, worCounts.size());
1356 assertTrue(worCounts.get(0) > 0);
1357 assertTrue(worCounts.get(1) > 0);
1358 }
1359
1360 @Test
1361 @SuppressWarnings("unchecked")
1362 public void sampleByKeyExact() {
1363 JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
1364 JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i % 2, 1));
1365 Map<Integer, Double> fractions = new HashMap<>();
1366 fractions.put(0, 0.5);
1367 fractions.put(1, 1.0);
1368 JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKeyExact(true, fractions, 1L);
1369 Map<Integer, Long> wrExactCounts = wrExact.countByKey();
1370 assertEquals(2, wrExactCounts.size());
1371 assertEquals(2, (long) wrExactCounts.get(0));
1372 assertEquals(4, (long) wrExactCounts.get(1));
1373 JavaPairRDD<Integer, Integer> worExact = rdd2.sampleByKeyExact(false, fractions, 1L);
1374 Map<Integer, Long> worExactCounts = worExact.countByKey();
1375 assertEquals(2, worExactCounts.size());
1376 assertEquals(2, (long) worExactCounts.get(0));
1377 assertEquals(4, (long) worExactCounts.get(1));
1378 }
1379
1380 private static class SomeCustomClass implements Serializable {
1381 SomeCustomClass() {
1382
1383 }
1384 }
1385
1386 @Test
1387 public void collectUnderlyingScalaRDD() {
1388 List<SomeCustomClass> data = new ArrayList<>();
1389 for (int i = 0; i < 100; i++) {
1390 data.add(new SomeCustomClass());
1391 }
1392 JavaRDD<SomeCustomClass> rdd = sc.parallelize(data);
1393 SomeCustomClass[] collected =
1394 (SomeCustomClass[]) rdd.rdd().retag(SomeCustomClass.class).collect();
1395 assertEquals(data.size(), collected.length);
1396 }
1397
1398 private static final class BuggyMapFunction<T> implements Function<T, T> {
1399
1400 @Override
1401 public T call(T x) {
1402 throw new IllegalStateException("Custom exception!");
1403 }
1404 }
1405
1406 @Test
1407 public void collectAsync() throws Exception {
1408 List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
1409 JavaRDD<Integer> rdd = sc.parallelize(data, 1);
1410 JavaFutureAction<List<Integer>> future = rdd.collectAsync();
1411 List<Integer> result = future.get();
1412 assertEquals(data, result);
1413 assertFalse(future.isCancelled());
1414 assertTrue(future.isDone());
1415 assertEquals(1, future.jobIds().size());
1416 }
1417
1418 @Test
1419 public void takeAsync() throws Exception {
1420 List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
1421 JavaRDD<Integer> rdd = sc.parallelize(data, 1);
1422 JavaFutureAction<List<Integer>> future = rdd.takeAsync(1);
1423 List<Integer> result = future.get();
1424 assertEquals(1, result.size());
1425 assertEquals((Integer) 1, result.get(0));
1426 assertFalse(future.isCancelled());
1427 assertTrue(future.isDone());
1428 assertEquals(1, future.jobIds().size());
1429 }
1430
1431 @Test
1432 public void foreachAsync() throws Exception {
1433 List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
1434 JavaRDD<Integer> rdd = sc.parallelize(data, 1);
1435 JavaFutureAction<Void> future = rdd.foreachAsync(integer -> {});
1436 future.get();
1437 assertFalse(future.isCancelled());
1438 assertTrue(future.isDone());
1439 assertEquals(1, future.jobIds().size());
1440 }
1441
1442 @Test
1443 public void countAsync() throws Exception {
1444 List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
1445 JavaRDD<Integer> rdd = sc.parallelize(data, 1);
1446 JavaFutureAction<Long> future = rdd.countAsync();
1447 long count = future.get();
1448 assertEquals(data.size(), count);
1449 assertFalse(future.isCancelled());
1450 assertTrue(future.isDone());
1451 assertEquals(1, future.jobIds().size());
1452 }
1453
1454 @Test
1455 public void testAsyncActionCancellation() throws Exception {
1456 List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
1457 JavaRDD<Integer> rdd = sc.parallelize(data, 1);
1458 JavaFutureAction<Void> future = rdd.foreachAsync(integer -> {
1459 Thread.sleep(10000);
1460 });
1461 future.cancel(true);
1462 assertTrue(future.isCancelled());
1463 assertTrue(future.isDone());
1464 try {
1465 future.get(2000, TimeUnit.MILLISECONDS);
1466 fail("Expected future.get() for cancelled job to throw CancellationException");
1467 } catch (CancellationException ignored) {
1468
1469 }
1470 }
1471
1472 @Test
1473 public void testAsyncActionErrorWrapping() throws Exception {
1474 List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
1475 JavaRDD<Integer> rdd = sc.parallelize(data, 1);
1476 JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<>()).countAsync();
1477 try {
1478 future.get(2, TimeUnit.SECONDS);
1479 fail("Expected future.get() for failed job to throw ExcecutionException");
1480 } catch (ExecutionException ee) {
1481 assertTrue(Throwables.getStackTraceAsString(ee).contains("Custom exception!"));
1482 }
1483 assertTrue(future.isDone());
1484 }
1485
1486 static class Class1 {}
1487 static class Class2 {}
1488
1489 @Test
1490 public void testRegisterKryoClasses() {
1491 SparkConf conf = new SparkConf();
1492 conf.registerKryoClasses(new Class<?>[]{ Class1.class, Class2.class });
1493 assertEquals(
1494 Class1.class.getName() + "," + Class2.class.getName(),
1495 conf.get("spark.kryo.classesToRegister"));
1496 }
1497
1498 @Test
1499 public void testGetPersistentRDDs() {
1500 java.util.Map<Integer, JavaRDD<?>> cachedRddsMap = sc.getPersistentRDDs();
1501 assertTrue(cachedRddsMap.isEmpty());
1502 JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b")).setName("RDD1").cache();
1503 JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("c", "d")).setName("RDD2").cache();
1504 cachedRddsMap = sc.getPersistentRDDs();
1505 assertEquals(2, cachedRddsMap.size());
1506 assertEquals("RDD1", cachedRddsMap.get(0).name());
1507 assertEquals("RDD2", cachedRddsMap.get(1).name());
1508 }
1509
1510 }