/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package test.org.apache.spark;

import java.io.*;
import java.nio.channels.FileChannel;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import scala.Tuple2;
import scala.Tuple3;
import scala.Tuple4;
import scala.collection.JavaConverters;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.base.Throwables;
import com.google.common.io.Files;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.junit.After;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.input.PortableDataStream;
import org.apache.spark.partial.BoundedDouble;
import org.apache.spark.partial.PartialResult;
import org.apache.spark.rdd.RDD;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.StatCounter;

// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite implements Serializable {
  private transient JavaSparkContext sc;
  private transient File tempDir;

  @Before
  public void setUp() {
    sc = new JavaSparkContext("local", "JavaAPISuite");
    tempDir = Files.createTempDir();
    tempDir.deleteOnExit();
  }

  @After
  public void tearDown() {
    sc.stop();
    sc = null;
  }

  @SuppressWarnings("unchecked")
  @Test
  public void sparkContextUnion() {
    // Union of non-specialized JavaRDDs
    List<String> strings = Arrays.asList("Hello", "World");
    JavaRDD<String> s1 = sc.parallelize(strings);
    JavaRDD<String> s2 = sc.parallelize(strings);
    // Varargs
    JavaRDD<String> sUnion = sc.union(s1, s2);
    assertEquals(4, sUnion.count());

    // Union of JavaDoubleRDDs
    List<Double> doubles = Arrays.asList(1.0, 2.0);
    JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles);
    JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles);
    JavaDoubleRDD dUnion = sc.union(d1, d2);
    assertEquals(4, dUnion.count());

    // Union of JavaPairRDDs
    List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
    pairs.add(new Tuple2<>(1, 2));
    pairs.add(new Tuple2<>(3, 4));
    JavaPairRDD<Integer, Integer> p1 = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> p2 = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> pUnion = sc.union(p1, p2);
    assertEquals(4, pUnion.count());
  }

  @SuppressWarnings("unchecked")
  @Test
  public void intersection() {
    List<Integer> ints1 = Arrays.asList(1, 10, 2, 3, 4, 5);
    List<Integer> ints2 = Arrays.asList(1, 6, 2, 3, 7, 8);
    JavaRDD<Integer> s1 = sc.parallelize(ints1);
    JavaRDD<Integer> s2 = sc.parallelize(ints2);

    JavaRDD<Integer> intersections = s1.intersection(s2);
    assertEquals(3, intersections.count());

    JavaRDD<Integer> empty = sc.emptyRDD();
    JavaRDD<Integer> emptyIntersection = empty.intersection(s2);
    assertEquals(0, emptyIntersection.count());

    List<Double> doubles = Arrays.asList(1.0, 2.0);
    JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles);
    JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles);
    JavaDoubleRDD dIntersection = d1.intersection(d2);
    assertEquals(2, dIntersection.count());

    List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
    pairs.add(new Tuple2<>(1, 2));
    pairs.add(new Tuple2<>(3, 4));
    JavaPairRDD<Integer, Integer> p1 = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> p2 = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> pIntersection = p1.intersection(p2);
    assertEquals(2, pIntersection.count());
  }

  @Test
  public void sample() {
    List<Integer> ints = IntStream.iterate(1, x -> x + 1)
      .limit(20)
      .boxed()
      .collect(Collectors.toList());
    JavaRDD<Integer> rdd = sc.parallelize(ints);
    // the seeds here are "magic" to make this work out nicely
    JavaRDD<Integer> sample20 = rdd.sample(true, 0.2, 8);
    assertEquals(2, sample20.count());
    JavaRDD<Integer> sample20WithoutReplacement = rdd.sample(false, 0.2, 2);
    assertEquals(4, sample20WithoutReplacement.count());
  }

  @Test
  public void randomSplit() {
    List<Integer> ints = new ArrayList<>(1000);
    for (int i = 0; i < 1000; i++) {
      ints.add(i);
    }
    JavaRDD<Integer> rdd = sc.parallelize(ints);
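    // randomSplit normalizes weights that don't sum to 1, so {0.4, 0.6, 1.0}
    // becomes a 20% / 30% / 50% split of the data.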
    JavaRDD<Integer>[] splits = rdd.randomSplit(new double[] { 0.4, 0.6, 1.0 }, 31);
    // the splits aren't perfect -- not enough data for them to be -- just check they're about right
    assertEquals(3, splits.length);
    long s0 = splits[0].count();
    long s1 = splits[1].count();
    long s2 = splits[2].count();
    assertTrue(s0 + " not within expected range", s0 > 150 && s0 < 250);
    assertTrue(s1 + " not within expected range", s1 > 250 && s1 < 350);
    assertTrue(s2 + " not within expected range", s2 > 430 && s2 < 570);
  }

  @Test
  public void sortByKey() {
    List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
    pairs.add(new Tuple2<>(0, 4));
    pairs.add(new Tuple2<>(3, 2));
    pairs.add(new Tuple2<>(-1, 1));

    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);

    // Default comparator
    JavaPairRDD<Integer, Integer> sortedRDD = rdd.sortByKey();
    assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
    List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect();
    assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1));
    assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));

    // Custom comparator
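    // (reverseOrder() plus ascending=false flips the order twice, so the
    // result is still ascending and matches the default sort above)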
    sortedRDD = rdd.sortByKey(Collections.reverseOrder(), false);
    assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
    sortedPairs = sortedRDD.collect();
    assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1));
    assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));
  }

  @SuppressWarnings("unchecked")
  @Test
  public void repartitionAndSortWithinPartitions() {
    List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
    pairs.add(new Tuple2<>(0, 5));
    pairs.add(new Tuple2<>(3, 8));
    pairs.add(new Tuple2<>(2, 6));
    pairs.add(new Tuple2<>(0, 8));
    pairs.add(new Tuple2<>(3, 8));
    pairs.add(new Tuple2<>(1, 3));

    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);

    Partitioner partitioner = new Partitioner() {
      @Override
      public int numPartitions() {
        return 2;
      }
      @Override
      public int getPartition(Object key) {
        return (Integer) key % 2;
      }
    };

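    // The partitioner routes even keys to partition 0 and odd keys to partition 1,
    // and each partition is sorted by key as part of the shuffle.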
    JavaPairRDD<Integer, Integer> repartitioned =
        rdd.repartitionAndSortWithinPartitions(partitioner);
    assertTrue(repartitioned.partitioner().isPresent());
    assertEquals(repartitioned.partitioner().get(), partitioner);
    List<List<Tuple2<Integer, Integer>>> partitions = repartitioned.glom().collect();
    assertEquals(partitions.get(0),
        Arrays.asList(new Tuple2<>(0, 5), new Tuple2<>(0, 8), new Tuple2<>(2, 6)));
    assertEquals(partitions.get(1),
        Arrays.asList(new Tuple2<>(1, 3), new Tuple2<>(3, 8), new Tuple2<>(3, 8)));
  }

  @Test
  public void emptyRDD() {
    JavaRDD<String> rdd = sc.emptyRDD();
    assertEquals("Empty RDD shouldn't have any values", 0, rdd.count());
  }

  @Test
  public void sortBy() {
    List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
    pairs.add(new Tuple2<>(0, 4));
    pairs.add(new Tuple2<>(3, 2));
    pairs.add(new Tuple2<>(-1, 1));

    JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(pairs);

    // compare on first value
    JavaRDD<Tuple2<Integer, Integer>> sortedRDD = rdd.sortBy(Tuple2::_1, true, 2);

    assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
    List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect();
    assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1));
    assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));

    // compare on second value
    sortedRDD = rdd.sortBy(Tuple2::_2, true, 2);
    assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
    sortedPairs = sortedRDD.collect();
    assertEquals(new Tuple2<>(3, 2), sortedPairs.get(1));
    assertEquals(new Tuple2<>(0, 4), sortedPairs.get(2));
  }

  @Test
  public void foreach() {
    LongAccumulator accum = sc.sc().longAccumulator();
    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
    rdd.foreach(s -> accum.add(1));
    assertEquals(2, accum.value().intValue());
  }

  @Test
  public void foreachPartition() {
    LongAccumulator accum = sc.sc().longAccumulator();
    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
    rdd.foreachPartition(iter -> {
      while (iter.hasNext()) {
        iter.next();
        accum.add(1);
      }
    });
    assertEquals(2, accum.value().intValue());
  }

  @Test
  public void toLocalIterator() {
    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
    JavaRDD<Integer> rdd = sc.parallelize(correct);
    List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
    assertEquals(correct, result);
  }

  @Test
  public void zipWithUniqueId() {
    List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
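    // zipWithUniqueId assigns ids of the form k, n+k, 2n+k, ... within each
    // partition (n = number of partitions), so the ids are distinct but not
    // necessarily 0..3; hence we only check distinctness.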
    JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithUniqueId();
    JavaRDD<Long> indexes = zip.values();
    assertEquals(4, new HashSet<>(indexes.collect()).size());
  }

  @Test
  public void zipWithIndex() {
    List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
    JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithIndex();
    JavaRDD<Long> indexes = zip.values();
    List<Long> correctIndexes = Arrays.asList(0L, 1L, 2L, 3L);
    assertEquals(correctIndexes, indexes.collect());
  }

  @SuppressWarnings("unchecked")
  @Test
  public void lookup() {
    JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>("Apples", "Fruit"),
      new Tuple2<>("Oranges", "Fruit"),
      new Tuple2<>("Oranges", "Citrus")
    ));
    assertEquals(2, categories.lookup("Oranges").size());
    assertEquals(2, Iterables.size(categories.groupByKey().lookup("Oranges").get(0)));
  }

  @Test
  public void groupBy() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
    Function<Integer, Boolean> isEven = x -> x % 2 == 0;
    JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isEven);
    assertEquals(2, oddsAndEvens.count());
    assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
    assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds

    oddsAndEvens = rdd.groupBy(isEven, 1);
    assertEquals(2, oddsAndEvens.count());
    assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
    assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
  }

  @Test
  public void groupByOnPairRDD() {
    // Regression test for SPARK-4459
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
    Function<Tuple2<Integer, Integer>, Boolean> areEven =
      x -> (x._1() % 2 == 0) && (x._2() % 2 == 0);
    JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
    JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens = pairRDD.groupBy(areEven);
    assertEquals(2, oddsAndEvens.count());
    assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
    assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds

    oddsAndEvens = pairRDD.groupBy(areEven, 1);
    assertEquals(2, oddsAndEvens.count());
    assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
    assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
  }

  @SuppressWarnings("unchecked")
  @Test
  public void keyByOnPairRDD() {
    // Regression test for SPARK-4459
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
    Function<Tuple2<Integer, Integer>, String> sumToString = x -> String.valueOf(x._1() + x._2());
    JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
    JavaPairRDD<String, Tuple2<Integer, Integer>> keyed = pairRDD.keyBy(sumToString);
    assertEquals(7, keyed.count());
    assertEquals(1, (long) keyed.lookup("2").get(0)._1());
  }

  @SuppressWarnings("unchecked")
  @Test
  public void cogroup() {
    JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>("Apples", "Fruit"),
      new Tuple2<>("Oranges", "Fruit"),
      new Tuple2<>("Oranges", "Citrus")
      ));
    JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>("Oranges", 2),
      new Tuple2<>("Apples", 3)
    ));
    JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped =
        categories.cogroup(prices);
    assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
    assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));

    cogrouped.collect();
  }

  @SuppressWarnings("unchecked")
  @Test
  public void cogroup3() {
    JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>("Apples", "Fruit"),
      new Tuple2<>("Oranges", "Fruit"),
      new Tuple2<>("Oranges", "Citrus")
      ));
    JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>("Oranges", 2),
      new Tuple2<>("Apples", 3)
    ));
    JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>("Oranges", 21),
      new Tuple2<>("Apples", 42)
    ));

    JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<Integer>, Iterable<Integer>>> cogrouped =
        categories.cogroup(prices, quantities);
    assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
    assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
    assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3()));

    cogrouped.collect();
  }

  @SuppressWarnings("unchecked")
  @Test
  public void cogroup4() {
    JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>("Apples", "Fruit"),
      new Tuple2<>("Oranges", "Fruit"),
      new Tuple2<>("Oranges", "Citrus")
      ));
    JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>("Oranges", 2),
      new Tuple2<>("Apples", 3)
    ));
    JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>("Oranges", 21),
      new Tuple2<>("Apples", 42)
    ));
    JavaPairRDD<String, String> countries = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>("Oranges", "BR"),
      new Tuple2<>("Apples", "US")
    ));

    JavaPairRDD<String, Tuple4<Iterable<String>, Iterable<Integer>, Iterable<Integer>,
        Iterable<String>>> cogrouped = categories.cogroup(prices, quantities, countries);
    assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
    assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
    assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3()));
    assertEquals("[BR]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._4()));

    cogrouped.collect();
  }

  @SuppressWarnings("unchecked")
  @Test
  public void leftOuterJoin() {
    JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>(1, 1),
      new Tuple2<>(1, 2),
      new Tuple2<>(2, 1),
      new Tuple2<>(3, 1)
      ));
    JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>(1, 'x'),
      new Tuple2<>(2, 'y'),
      new Tuple2<>(2, 'z'),
      new Tuple2<>(4, 'w')
    ));
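    // Key 3 exists only in rdd1, so it joins with an absent Optional; key 4
    // exists only in rdd2 and is dropped entirely by the left outer join.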
    List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
      rdd1.leftOuterJoin(rdd2).collect();
    assertEquals(5, joined.size());
    Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
      rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
    assertEquals(3, firstUnmatched._1().intValue());
  }

  @Test
  public void foldReduce() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
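
    // fold requires a zero element (0 is the identity for addition); fold and
    // reduce then both return the sum 1+1+2+3+5+8+13 = 33.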
    int sum = rdd.fold(0, add);
    assertEquals(33, sum);

    sum = rdd.reduce(add);
    assertEquals(33, sum);
  }

  @Test
  public void treeReduce() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
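    // depth is only a hint for how many levels the aggregation tree uses;
    // the result is the same at every depth: -5-4-3-2-1+1+2+3+4 = -5.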
    for (int depth = 1; depth <= 10; depth++) {
      int sum = rdd.treeReduce(add, depth);
      assertEquals(-5, sum);
    }
  }

  @Test
  public void treeAggregate() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
    for (int depth = 1; depth <= 10; depth++) {
      int sum = rdd.treeAggregate(0, add, add, depth);
      assertEquals(-5, sum);
    }
  }

  @SuppressWarnings("unchecked")
  @Test
  public void aggregateByKey() {
    JavaPairRDD<Integer, Integer> pairs = sc.parallelizePairs(
      Arrays.asList(
        new Tuple2<>(1, 1),
        new Tuple2<>(1, 1),
        new Tuple2<>(3, 2),
        new Tuple2<>(5, 1),
        new Tuple2<>(5, 3)), 2);

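    // The zero value is an empty set per key; the first function adds a value
    // to a partition-local set, the second merges sets across partitions.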
    Map<Integer, HashSet<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(),
       (a, b) -> {
         a.add(b);
         return a;
       },
       (a, b) -> {
         a.addAll(b);
         return a;
       }).collectAsMap();
    assertEquals(3, sets.size());
    assertEquals(new HashSet<>(Arrays.asList(1)), sets.get(1));
    assertEquals(new HashSet<>(Arrays.asList(2)), sets.get(3));
    assertEquals(new HashSet<>(Arrays.asList(1, 3)), sets.get(5));
  }

  @SuppressWarnings("unchecked")
  @Test
  public void foldByKey() {
    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
      new Tuple2<>(2, 1),
      new Tuple2<>(2, 1),
      new Tuple2<>(1, 1),
      new Tuple2<>(3, 2),
      new Tuple2<>(3, 1)
    );
    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
    assertEquals(1, sums.lookup(1).get(0).intValue());
    assertEquals(2, sums.lookup(2).get(0).intValue());
    assertEquals(3, sums.lookup(3).get(0).intValue());
  }

  @SuppressWarnings("unchecked")
  @Test
  public void reduceByKey() {
    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
      new Tuple2<>(2, 1),
      new Tuple2<>(2, 1),
      new Tuple2<>(1, 1),
      new Tuple2<>(3, 2),
      new Tuple2<>(3, 1)
    );
    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
    assertEquals(1, counts.lookup(1).get(0).intValue());
    assertEquals(2, counts.lookup(2).get(0).intValue());
    assertEquals(3, counts.lookup(3).get(0).intValue());

    Map<Integer, Integer> localCounts = counts.collectAsMap();
    assertEquals(1, localCounts.get(1).intValue());
    assertEquals(2, localCounts.get(2).intValue());
    assertEquals(3, localCounts.get(3).intValue());

    localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
    assertEquals(1, localCounts.get(1).intValue());
    assertEquals(2, localCounts.get(2).intValue());
    assertEquals(3, localCounts.get(3).intValue());
  }

  @Test
  public void approximateResults() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
    Map<Integer, Long> countsByValue = rdd.countByValue();
    assertEquals(2, countsByValue.get(1).longValue());
    assertEquals(1, countsByValue.get(13).longValue());

    PartialResult<Map<Integer, BoundedDouble>> approx = rdd.countByValueApprox(1);
    Map<Integer, BoundedDouble> finalValue = approx.getFinalValue();
    assertEquals(2.0, finalValue.get(1).mean(), 0.01);
    assertEquals(1.0, finalValue.get(13).mean(), 0.01);
  }

  @Test
  public void take() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
    assertEquals(1, rdd.first().intValue());
    rdd.take(2);
    rdd.takeSample(false, 2, 42);
  }

  @Test
  public void isEmpty() {
    assertTrue(sc.emptyRDD().isEmpty());
    assertTrue(sc.parallelize(new ArrayList<Integer>()).isEmpty());
    assertFalse(sc.parallelize(Arrays.asList(1)).isEmpty());
    assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(i -> i < 0).isEmpty());
    assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(i -> i > 1).isEmpty());
  }

  @Test
  public void cartesian() {
    JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
    JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World"));
    JavaPairRDD<String, Double> cartesian = stringRDD.cartesian(doubleRDD);
    assertEquals(new Tuple2<>("Hello", 1.0), cartesian.first());
  }

  @Test
  public void javaDoubleRDD() {
    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
    JavaDoubleRDD distinct = rdd.distinct();
    assertEquals(5, distinct.count());
    JavaDoubleRDD filter = rdd.filter(x -> x > 2.0);
    assertEquals(3, filter.count());
    JavaDoubleRDD union = rdd.union(rdd);
    assertEquals(12, union.count());
    union = union.cache();
    assertEquals(12, union.count());

    assertEquals(20, rdd.sum(), 0.01);
    StatCounter stats = rdd.stats();
    assertEquals(20, stats.sum(), 0.01);
    assertEquals(20/6.0, rdd.mean(), 0.01);
    assertEquals(6.22222, rdd.variance(), 0.01);
    assertEquals(rdd.variance(), rdd.popVariance(), 1e-14);
    assertEquals(7.46667, rdd.sampleVariance(), 0.01);
    assertEquals(2.49444, rdd.stdev(), 0.01);
    assertEquals(rdd.stdev(), rdd.popStdev(), 1e-14);
    assertEquals(2.73252, rdd.sampleStdev(), 0.01);

    rdd.first();
    rdd.take(5);
  }

  @Test
  public void javaDoubleRDDHistoGram() {
    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
    // Test using generated buckets
    Tuple2<double[], long[]> results = rdd.histogram(2);
    double[] expected_buckets = {1.0, 2.5, 4.0};
    long[] expected_counts = {2, 2};
    assertArrayEquals(expected_buckets, results._1(), 0.1);
    assertArrayEquals(expected_counts, results._2());
    // Test with provided buckets
    long[] histogram = rdd.histogram(expected_buckets);
    assertArrayEquals(expected_counts, histogram);
    // SPARK-5744
    assertArrayEquals(
      new long[] {0},
      sc.parallelizeDoubles(new ArrayList<>(0), 1).histogram(new double[]{0.0, 1.0}));
  }

  private static class DoubleComparator implements Comparator<Double>, Serializable {
    @Override
    public int compare(Double o1, Double o2) {
      return o1.compareTo(o2);
    }
  }

  @Test
  public void max() {
    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
    double max = rdd.max(new DoubleComparator());
    assertEquals(4.0, max, 0.001);
  }

  @Test
  public void min() {
    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
    double min = rdd.min(new DoubleComparator());
    assertEquals(1.0, min, 0.001);
  }

  @Test
  public void naturalMax() {
    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
    double max = rdd.max();
    assertEquals(4.0, max, 0.0);
  }

  @Test
  public void naturalMin() {
    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
    double min = rdd.min();
    assertEquals(1.0, min, 0.0);
  }

  @Test
  public void takeOrdered() {
    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
    assertEquals(Arrays.asList(1.0, 2.0), rdd.takeOrdered(2, new DoubleComparator()));
    assertEquals(Arrays.asList(1.0, 2.0), rdd.takeOrdered(2));
  }

  @Test
  public void top() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    List<Integer> top2 = rdd.top(2);
    assertEquals(Arrays.asList(4, 3), top2);
  }

  private static class AddInts implements Function2<Integer, Integer, Integer> {
    @Override
    public Integer call(Integer a, Integer b) {
      return a + b;
    }
  }

  @Test
  public void reduce() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    int sum = rdd.reduce(new AddInts());
    assertEquals(10, sum);
  }

  @Test
  public void reduceOnJavaDoubleRDD() {
    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
    double sum = rdd.reduce((v1, v2) -> v1 + v2);
    assertEquals(10.0, sum, 0.001);
  }

  @Test
  public void fold() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    int sum = rdd.fold(0, new AddInts());
    assertEquals(10, sum);
  }

  @Test
  public void aggregate() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    int sum = rdd.aggregate(0, new AddInts(), new AddInts());
    assertEquals(10, sum);
  }

  @Test
  public void map() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    JavaDoubleRDD doubles = rdd.mapToDouble(Integer::doubleValue).cache();
    doubles.collect();
    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)).cache();
    pairs.collect();
    JavaRDD<String> strings = rdd.map(Object::toString).cache();
    strings.collect();
  }

  @Test
  public void flatMap() {
    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
      "The quick brown fox jumps over the lazy dog."));
    JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
    assertEquals("Hello", words.first());
    assertEquals(11, words.count());

    JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(s -> {
        List<Tuple2<String, String>> pairs = new LinkedList<>();
        for (String word : s.split(" ")) {
          pairs.add(new Tuple2<>(word, word));
        }
        return pairs.iterator();
      }
    );
    assertEquals(new Tuple2<>("Hello", "Hello"), pairsRDD.first());
    assertEquals(11, pairsRDD.count());

    JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
      List<Double> lengths = new LinkedList<>();
      for (String word : s.split(" ")) {
        lengths.add((double) word.length());
      }
      return lengths.iterator();
    });
    assertEquals(5.0, doubles.first(), 0.01);
    assertEquals(11, doubles.count());
  }

  @SuppressWarnings("unchecked")
  @Test
  public void mapsFromPairsToPairs() {
    List<Tuple2<Integer, String>> pairs = Arrays.asList(
      new Tuple2<>(1, "a"),
      new Tuple2<>(2, "aa"),
      new Tuple2<>(3, "aaa")
    );
    JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);

    // Regression test for SPARK-668:
    JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
      item -> Collections.singletonList(item.swap()).iterator());
    swapped.collect();

    // There was never a bug here, but it's worth testing:
    pairRDD.mapToPair(Tuple2::swap).collect();
  }

  @Test
  public void mapPartitions() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
    JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
        int sum = 0;
        while (iter.hasNext()) {
          sum += iter.next();
        }
        return Collections.singletonList(sum).iterator();
      });
    assertEquals("[3, 7]", partitionSums.collect().toString());
  }

  @Test
  public void mapPartitionsWithIndex() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
    JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex((index, iter) -> {
        int sum = 0;
        while (iter.hasNext()) {
          sum += iter.next();
        }
        return Collections.singletonList(sum).iterator();
      }, false);
    assertEquals("[3, 7]", partitionSums.collect().toString());
  }

  @Test
  public void getNumPartitions() {
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
    JavaDoubleRDD rdd2 = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0), 2);
    JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs(
      Arrays.asList(
        new Tuple2<>("a", 1),
        new Tuple2<>("aa", 2),
        new Tuple2<>("aaa", 3)
      ),
      2);
    assertEquals(3, rdd1.getNumPartitions());
    assertEquals(2, rdd2.getNumPartitions());
    assertEquals(2, rdd3.getNumPartitions());
  }

  @Test
  public void repartition() {
    // Growing number of partitions
    JavaRDD<Integer> in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2);
    JavaRDD<Integer> repartitioned1 = in1.repartition(4);
    List<List<Integer>> result1 = repartitioned1.glom().collect();
    assertEquals(4, result1.size());
    for (List<Integer> l : result1) {
      assertFalse(l.isEmpty());
    }

    // Shrinking number of partitions
    JavaRDD<Integer> in2 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4);
    JavaRDD<Integer> repartitioned2 = in2.repartition(2);
    List<List<Integer>> result2 = repartitioned2.glom().collect();
    assertEquals(2, result2.size());
    for (List<Integer> l : result2) {
      assertFalse(l.isEmpty());
    }
  }

  @SuppressWarnings("unchecked")
  @Test
  public void persist() {
    JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
    doubleRDD = doubleRDD.persist(StorageLevel.DISK_ONLY());
    assertEquals(20, doubleRDD.sum(), 0.1);

    List<Tuple2<Integer, String>> pairs = Arrays.asList(
      new Tuple2<>(1, "a"),
      new Tuple2<>(2, "aa"),
      new Tuple2<>(3, "aaa")
    );
    JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
    pairRDD = pairRDD.persist(StorageLevel.DISK_ONLY());
    assertEquals("a", pairRDD.first()._2());

    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    rdd = rdd.persist(StorageLevel.DISK_ONLY());
    assertEquals(1, rdd.first().intValue());
  }

  @Test
  public void iterator() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
    TaskContext context = TaskContext$.MODULE$.empty();
    assertEquals(1, rdd.iterator(rdd.partitions().get(0), context).next().intValue());
  }

  @Test
  public void glom() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
    assertEquals("[1, 2]", rdd.glom().first().toString());
  }

  // File input / output tests are largely adapted from FileSuite:

  @Test
  public void textFiles() throws IOException {
    String outputDir = new File(tempDir, "output").getAbsolutePath();
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    rdd.saveAsTextFile(outputDir);
    // Read the plain text file and check it's OK
    File outputFile = new File(outputDir, "part-00000");
    String content = Files.toString(outputFile, StandardCharsets.UTF_8);
    assertEquals("1\n2\n3\n4\n", content);
    // Also try reading it in as a text file RDD
    List<String> expected = Arrays.asList("1", "2", "3", "4");
    JavaRDD<String> readRDD = sc.textFile(outputDir);
    assertEquals(expected, readRDD.collect());
  }

  @Test
  public void wholeTextFiles() throws Exception {
    byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
    byte[] content2 = "spark is also easy to use.\n".getBytes(StandardCharsets.UTF_8);

    String tempDirName = tempDir.getAbsolutePath();
    String path1 = new Path(tempDirName, "part-00000").toUri().getPath();
    String path2 = new Path(tempDirName, "part-00001").toUri().getPath();

    Files.write(content1, new File(path1));
    Files.write(content2, new File(path2));

    Map<String, String> container = new HashMap<>();
    container.put(path1, new Text(content1).toString());
    container.put(path2, new Text(content2).toString());

    JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName, 3);
    List<Tuple2<String, String>> result = readRDD.collect();

    for (Tuple2<String, String> res : result) {
      // Note that the paths from `wholeTextFiles` are in URI format on Windows,
      // for example, file:/C:/a/b/c.
      assertEquals(res._2(), container.get(new Path(res._1()).toUri().getPath()));
    }
  }

  @Test
  public void textFilesCompressed() {
    String outputDir = new File(tempDir, "output").getAbsolutePath();
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    rdd.saveAsTextFile(outputDir, DefaultCodec.class);

    // Try reading it in as a text file RDD
    List<String> expected = Arrays.asList("1", "2", "3", "4");
    JavaRDD<String> readRDD = sc.textFile(outputDir);
    assertEquals(expected, readRDD.collect());
  }

  @SuppressWarnings("unchecked")
  @Test
  public void sequenceFile() {
    String outputDir = new File(tempDir, "output").getAbsolutePath();
    List<Tuple2<Integer, String>> pairs = Arrays.asList(
      new Tuple2<>(1, "a"),
      new Tuple2<>(2, "aa"),
      new Tuple2<>(3, "aaa")
    );
    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);

    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);

    // Try reading the output back as an object file
    JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
      Text.class).mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString()));
    assertEquals(pairs, readRDD.collect());
  }

  @Test
  public void binaryFiles() throws Exception {
    // Reusing the wholeText files example
    byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);

    String tempDirName = tempDir.getAbsolutePath();
    File file1 = new File(tempDirName + "/part-00000");

    FileOutputStream fos1 = new FileOutputStream(file1);

    try (FileChannel channel1 = fos1.getChannel()) {
      ByteBuffer bbuf = ByteBuffer.wrap(content1);
      channel1.write(bbuf);
    }
    JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName, 3);
    List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
    for (Tuple2<String, PortableDataStream> res : result) {
      assertArrayEquals(content1, res._2().toArray());
    }
  }

  @Test
  public void binaryFilesCaching() throws Exception {
    // Reusing the wholeText files example
    byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);

    String tempDirName = tempDir.getAbsolutePath();
    File file1 = new File(tempDirName + "/part-00000");

    FileOutputStream fos1 = new FileOutputStream(file1);

    try (FileChannel channel1 = fos1.getChannel()) {
      ByteBuffer bbuf = ByteBuffer.wrap(content1);
      channel1.write(bbuf);
    }

    JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache();
    readRDD.foreach(pair -> pair._2().toArray()); // force the file to read

    List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
    for (Tuple2<String, PortableDataStream> res : result) {
      assertArrayEquals(content1, res._2().toArray());
    }
  }

  @Test
  public void binaryRecords() throws Exception {
    // Reusing the wholeText files example
    byte[] content1 = "spark isn't always easy to use.\n".getBytes(StandardCharsets.UTF_8);
    int numOfCopies = 10;
    String tempDirName = tempDir.getAbsolutePath();
    File file1 = new File(tempDirName + "/part-00000");

    FileOutputStream fos1 = new FileOutputStream(file1);

    try (FileChannel channel1 = fos1.getChannel()) {
      for (int i = 0; i < numOfCopies; i++) {
        ByteBuffer bbuf = ByteBuffer.wrap(content1);
        channel1.write(bbuf);
      }
    }

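    // binaryRecords splits the file into fixed-length records of
    // content1.length bytes each, so exactly numOfCopies records come back.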
    JavaRDD<byte[]> readRDD = sc.binaryRecords(tempDirName, content1.length);
    assertEquals(numOfCopies, readRDD.count());
    List<byte[]> result = readRDD.collect();
    for (byte[] res : result) {
      assertArrayEquals(content1, res);
    }
  }

  @SuppressWarnings("unchecked")
  @Test
  public void writeWithNewAPIHadoopFile() {
    String outputDir = new File(tempDir, "output").getAbsolutePath();
    List<Tuple2<Integer, String>> pairs = Arrays.asList(
      new Tuple2<>(1, "a"),
      new Tuple2<>(2, "aa"),
      new Tuple2<>(3, "aaa")
    );
    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);

    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
      .saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
        org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);

    JavaPairRDD<IntWritable, Text> output =
      sc.sequenceFile(outputDir, IntWritable.class, Text.class);
    assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
  }

  @SuppressWarnings("unchecked")
  @Test
  public void readWithNewAPIHadoopFile() throws IOException {
    String outputDir = new File(tempDir, "output").getAbsolutePath();
    List<Tuple2<Integer, String>> pairs = Arrays.asList(
      new Tuple2<>(1, "a"),
      new Tuple2<>(2, "aa"),
      new Tuple2<>(3, "aaa")
    );
    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);

    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);

    JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
      IntWritable.class, Text.class, Job.getInstance().getConfiguration());
    assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
  }

  @Test
  public void objectFilesOfInts() {
    String outputDir = new File(tempDir, "output").getAbsolutePath();
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    rdd.saveAsObjectFile(outputDir);
    // Try reading the output back as an object file
    List<Integer> expected = Arrays.asList(1, 2, 3, 4);
    JavaRDD<Integer> readRDD = sc.objectFile(outputDir);
    assertEquals(expected, readRDD.collect());
  }

  @SuppressWarnings("unchecked")
  @Test
  public void objectFilesOfComplexTypes() {
    String outputDir = new File(tempDir, "output").getAbsolutePath();
    List<Tuple2<Integer, String>> pairs = Arrays.asList(
      new Tuple2<>(1, "a"),
      new Tuple2<>(2, "aa"),
      new Tuple2<>(3, "aaa")
    );
    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
    rdd.saveAsObjectFile(outputDir);
    // Try reading the output back as an object file
    JavaRDD<Tuple2<Integer, String>> readRDD = sc.objectFile(outputDir);
    assertEquals(pairs, readRDD.collect());
  }

  @SuppressWarnings("unchecked")
  @Test
  public void hadoopFile() {
    String outputDir = new File(tempDir, "output").getAbsolutePath();
    List<Tuple2<Integer, String>> pairs = Arrays.asList(
      new Tuple2<>(1, "a"),
      new Tuple2<>(2, "aa"),
      new Tuple2<>(3, "aaa")
    );
    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);

    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);

    JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
      SequenceFileInputFormat.class, IntWritable.class, Text.class);
    assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
  }

  @SuppressWarnings("unchecked")
  @Test
  public void hadoopFileCompressed() {
    String outputDir = new File(tempDir, "output_compressed").getAbsolutePath();
    List<Tuple2<Integer, String>> pairs = Arrays.asList(
      new Tuple2<>(1, "a"),
      new Tuple2<>(2, "aa"),
      new Tuple2<>(3, "aaa")
    );
    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);

    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class,
        SequenceFileOutputFormat.class, DefaultCodec.class);

    JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
      SequenceFileInputFormat.class, IntWritable.class, Text.class);

    assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
  }

  @Test
  public void zip() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    JavaDoubleRDD doubles = rdd.mapToDouble(Integer::doubleValue);
    JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
    zipped.count();
  }

  @Test
  public void zipPartitions() {
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
    JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
    FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
        (i, s) -> Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
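
    // Each of rdd1's two partitions holds 3 ints and each of rdd2's holds
    // 2 strings, which is why the collected sizes are [3, 2, 3, 2].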
    JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
    assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
  }

  @Test
  public void keyBy() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
    List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect();
    assertEquals(new Tuple2<>("1", 1), s.get(0));
    assertEquals(new Tuple2<>("2", 2), s.get(1));
  }

  @Test
  public void checkpointAndComputation() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    sc.setCheckpointDir(tempDir.getAbsolutePath());
    assertFalse(rdd.isCheckpointed());
    rdd.checkpoint();
    rdd.count(); // Forces the DAG to cause a checkpoint
    assertTrue(rdd.isCheckpointed());
    assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect());
  }

  @Test
  public void checkpointAndRestore() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    sc.setCheckpointDir(tempDir.getAbsolutePath());
    assertFalse(rdd.isCheckpointed());
    rdd.checkpoint();
    rdd.count(); // Forces the DAG to cause a checkpoint
    assertTrue(rdd.isCheckpointed());

    assertTrue(rdd.getCheckpointFile().isPresent());
    JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
    assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
  }

  @Test
  public void combineByKey() {
    JavaRDD<Integer> originalRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
    Function<Integer, Integer> keyFunction = v1 -> v1 % 3;
    Function<Integer, Integer> createCombinerFunction = v1 -> v1;

    Function2<Integer, Integer, Integer> mergeValueFunction = (v1, v2) -> v1 + v2;

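    // keyBy(v % 3) buckets 1..6 into keys 1 -> {1, 4}, 2 -> {2, 5}, 0 -> {3, 6},
    // so summing the combined values gives {0=9, 1=5, 2=7}.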
    JavaPairRDD<Integer, Integer> combinedRDD = originalRDD.keyBy(keyFunction)
      .combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction);
    Map<Integer, Integer> results = combinedRDD.collectAsMap();
    ImmutableMap<Integer, Integer> expected = ImmutableMap.of(0, 9, 1, 5, 2, 7);
    assertEquals(expected, results);

    Partitioner defaultPartitioner = Partitioner.defaultPartitioner(
      combinedRDD.rdd(),
      JavaConverters.collectionAsScalaIterableConverter(
        Collections.<RDD<?>>emptyList()).asScala().toSeq());
    combinedRDD = originalRDD.keyBy(keyFunction)
      .combineByKey(
        createCombinerFunction,
        mergeValueFunction,
        mergeValueFunction,
        defaultPartitioner,
        false,
        new KryoSerializer(new SparkConf()));
    results = combinedRDD.collectAsMap();
    assertEquals(expected, results);
  }

  @SuppressWarnings("unchecked")
  @Test
  public void mapOnPairRDD() {
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
    JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
    assertEquals(Arrays.asList(
      new Tuple2<>(1, 1),
      new Tuple2<>(0, 2),
      new Tuple2<>(1, 3),
      new Tuple2<>(0, 4)), rdd3.collect());
  }

  @SuppressWarnings("unchecked")
  @Test
  public void collectPartitions() {
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);

    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));

    List<Integer>[] parts = rdd1.collectPartitions(new int[] {0});
    assertEquals(Arrays.asList(1, 2), parts[0]);

    parts = rdd1.collectPartitions(new int[] {1, 2});
    assertEquals(Arrays.asList(3, 4), parts[0]);
    assertEquals(Arrays.asList(5, 6, 7), parts[1]);

    assertEquals(
      Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
      rdd2.collectPartitions(new int[] {0})[0]);

    List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[] {1, 2});
    assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]);
    assertEquals(
      Arrays.asList(
        new Tuple2<>(5, 1),
        new Tuple2<>(6, 0),
        new Tuple2<>(7, 1)),
      parts2[1]);
  }

  @Test
  public void countApproxDistinct() {
    List<Integer> arrayData = new ArrayList<>();
    int size = 100;
    for (int i = 0; i < 100000; i++) {
      arrayData.add(i % size);
    }
    JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
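    // countApproxDistinct is HyperLogLog-based; 0.05 is the target relative
    // accuracy, so we only check the estimate is within 10% of the true count.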
    assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.1);
  }

  @Test
  public void countApproxDistinctByKey() {
    List<Tuple2<Integer, Integer>> arrayData = new ArrayList<>();
    for (int i = 10; i < 100; i++) {
      for (int j = 0; j < i; j++) {
        arrayData.add(new Tuple2<>(i, j));
      }
    }
    double relativeSD = 0.001;
    JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
    List<Tuple2<Integer, Long>> res = pairRdd.countApproxDistinctByKey(relativeSD, 8).collect();
    for (Tuple2<Integer, Long> resItem : res) {
      double count = resItem._1();
      long resCount = resItem._2();
      double error = Math.abs((resCount - count) / count);
      assertTrue(error < 0.1);
    }
  }

  @Test
  public void collectAsMapWithIntArrayValues() {
    // Regression test for SPARK-1040
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
    JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
    pairRDD.collect();  // Works fine
    pairRDD.collectAsMap();  // Used to crash with ClassCastException
  }

  @SuppressWarnings("unchecked")
  @Test
  public void collectAsMapAndSerialize() throws Exception {
    JavaPairRDD<String, Integer> rdd =
        sc.parallelizePairs(Arrays.asList(new Tuple2<>("foo", 1)));
    Map<String, Integer> map = rdd.collectAsMap();
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    new ObjectOutputStream(bytes).writeObject(map);
    Map<String, Integer> deserializedMap = (Map<String, Integer>)
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())).readObject();
    assertEquals(1, deserializedMap.get("foo").intValue());
  }

  @Test
  @SuppressWarnings("unchecked")
  public void sampleByKey() {
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i % 2, 1));
    Map<Integer, Double> fractions = new HashMap<>();
    fractions.put(0, 0.5);
    fractions.put(1, 1.0);
    JavaPairRDD<Integer, Integer> wr = rdd2.sampleByKey(true, fractions, 1L);
    Map<Integer, Long> wrCounts = wr.countByKey();
    assertEquals(2, wrCounts.size());
    assertTrue(wrCounts.get(0) > 0);
    assertTrue(wrCounts.get(1) > 0);
    JavaPairRDD<Integer, Integer> wor = rdd2.sampleByKey(false, fractions, 1L);
    Map<Integer, Long> worCounts = wor.countByKey();
    assertEquals(2, worCounts.size());
    assertTrue(worCounts.get(0) > 0);
    assertTrue(worCounts.get(1) > 0);
  }

  @Test
  @SuppressWarnings("unchecked")
  public void sampleByKeyExact() {
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i % 2, 1));
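    // Unlike sampleByKey, sampleByKeyExact guarantees ceil(numItems * fraction)
    // samples per key: 4 elements per key here, so expect 2 for key 0 and 4 for key 1.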
    Map<Integer, Double> fractions = new HashMap<>();
    fractions.put(0, 0.5);
    fractions.put(1, 1.0);
    JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKeyExact(true, fractions, 1L);
    Map<Integer, Long> wrExactCounts = wrExact.countByKey();
    assertEquals(2, wrExactCounts.size());
    assertEquals(2, (long) wrExactCounts.get(0));
    assertEquals(4, (long) wrExactCounts.get(1));
    JavaPairRDD<Integer, Integer> worExact = rdd2.sampleByKeyExact(false, fractions, 1L);
    Map<Integer, Long> worExactCounts = worExact.countByKey();
    assertEquals(2, worExactCounts.size());
    assertEquals(2, (long) worExactCounts.get(0));
    assertEquals(4, (long) worExactCounts.get(1));
  }

  private static class SomeCustomClass implements Serializable {
    SomeCustomClass() {
      // Intentionally left blank
    }
  }

  @Test
  public void collectUnderlyingScalaRDD() {
    List<SomeCustomClass> data = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      data.add(new SomeCustomClass());
    }
    JavaRDD<SomeCustomClass> rdd = sc.parallelize(data);
    SomeCustomClass[] collected =
      (SomeCustomClass[]) rdd.rdd().retag(SomeCustomClass.class).collect();
    assertEquals(data.size(), collected.length);
  }

  private static final class BuggyMapFunction<T> implements Function<T, T> {

    @Override
    public T call(T x) {
      throw new IllegalStateException("Custom exception!");
    }
  }

  @Test
  public void collectAsync() throws Exception {
    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
    JavaFutureAction<List<Integer>> future = rdd.collectAsync();
    List<Integer> result = future.get();
    assertEquals(data, result);
    assertFalse(future.isCancelled());
    assertTrue(future.isDone());
    assertEquals(1, future.jobIds().size());
  }

  @Test
  public void takeAsync() throws Exception {
    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
    JavaFutureAction<List<Integer>> future = rdd.takeAsync(1);
    List<Integer> result = future.get();
    assertEquals(1, result.size());
    assertEquals((Integer) 1, result.get(0));
    assertFalse(future.isCancelled());
    assertTrue(future.isDone());
    assertEquals(1, future.jobIds().size());
  }

  @Test
  public void foreachAsync() throws Exception {
    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
    JavaFutureAction<Void> future = rdd.foreachAsync(integer -> {});
    future.get();
    assertFalse(future.isCancelled());
    assertTrue(future.isDone());
    assertEquals(1, future.jobIds().size());
  }

  @Test
  public void countAsync() throws Exception {
    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
    JavaFutureAction<Long> future = rdd.countAsync();
    long count = future.get();
    assertEquals(data.size(), count);
    assertFalse(future.isCancelled());
    assertTrue(future.isDone());
    assertEquals(1, future.jobIds().size());
  }

  @Test
  public void testAsyncActionCancellation() throws Exception {
    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
    JavaFutureAction<Void> future = rdd.foreachAsync(integer -> {
      Thread.sleep(10000);  // To ensure that the job won't finish before it's cancelled.
    });
    future.cancel(true);
    assertTrue(future.isCancelled());
    assertTrue(future.isDone());
    try {
      future.get(2000, TimeUnit.MILLISECONDS);
      fail("Expected future.get() for cancelled job to throw CancellationException");
    } catch (CancellationException ignored) {
      // pass
    }
  }

  @Test
  public void testAsyncActionErrorWrapping() throws Exception {
    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
    JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<>()).countAsync();
    try {
      future.get(2, TimeUnit.SECONDS);
      fail("Expected future.get() for failed job to throw ExecutionException");
    } catch (ExecutionException ee) {
      assertTrue(Throwables.getStackTraceAsString(ee).contains("Custom exception!"));
    }
    assertTrue(future.isDone());
  }

  static class Class1 {}
  static class Class2 {}

  @Test
  public void testRegisterKryoClasses() {
    SparkConf conf = new SparkConf();
    conf.registerKryoClasses(new Class<?>[]{ Class1.class, Class2.class });
    assertEquals(
      Class1.class.getName() + "," + Class2.class.getName(),
      conf.get("spark.kryo.classesToRegister"));
  }

  @Test
  public void testGetPersistentRDDs() {
    java.util.Map<Integer, JavaRDD<?>> cachedRddsMap = sc.getPersistentRDDs();
    assertTrue(cachedRddsMap.isEmpty());
    JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b")).setName("RDD1").cache();
    JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("c", "d")).setName("RDD2").cache();
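    // getPersistentRDDs is keyed by RDD id; ids are assigned in creation order,
    // so on this fresh context rdd1 and rdd2 get ids 0 and 1.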
    cachedRddsMap = sc.getPersistentRDDs();
    assertEquals(2, cachedRddsMap.size());
    assertEquals("RDD1", cachedRddsMap.get(0).name());
    assertEquals("RDD2", cachedRddsMap.get(1).name());
  }

}
1510 }