/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package test.org.apache.spark;

import java.io.File;
import java.io.Serializable;
import java.util.*;

import scala.Tuple2;

import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.util.Utils;

/**
 * Most of these tests replicate org.apache.spark.JavaAPISuite using Java 8
 * lambda syntax.
 */
public class Java8RDDAPISuite implements Serializable {
  private static int foreachCalls = 0;
  private transient JavaSparkContext sc;

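  // Each test runs against a fresh local-mode SparkContext created in setUp()
  // and stopped again in tearDown().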
  @Before
  public void setUp() {
    sc = new JavaSparkContext("local", "JavaAPISuite");
  }

  @After
  public void tearDown() {
    sc.stop();
    sc = null;
  }

  @Test
  public void foreachWithAnonymousClass() {
    foreachCalls = 0;
    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
    rdd.foreach(s -> foreachCalls++);
    Assert.assertEquals(2, foreachCalls);
  }

  @Test
  public void foreach() {
    foreachCalls = 0;
    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
    rdd.foreach(x -> foreachCalls++);
    Assert.assertEquals(2, foreachCalls);
  }

  @Test
  public void groupBy() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
    Function<Integer, Boolean> isEven = x -> x % 2 == 0;
    JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isEven);
    Assert.assertEquals(2, oddsAndEvens.count());
    Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
    Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds

    oddsAndEvens = rdd.groupBy(isEven, 1);
    Assert.assertEquals(2, oddsAndEvens.count());
    Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
    Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
  }

  @Test
  public void leftOuterJoin() {
    JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>(1, 1),
      new Tuple2<>(1, 2),
      new Tuple2<>(2, 1),
      new Tuple2<>(3, 1)
    ));
    JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>(1, 'x'),
      new Tuple2<>(2, 'y'),
      new Tuple2<>(2, 'z'),
      new Tuple2<>(4, 'w')
    ));
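    // Key 3 appears only in rdd1 and key 4 only in rdd2, so the left outer join
    // yields five rows and exactly one row (key 3) whose Optional value is absent.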
    List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
      rdd1.leftOuterJoin(rdd2).collect();
    Assert.assertEquals(5, joined.size());
    Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
      rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
    Assert.assertEquals(3, firstUnmatched._1().intValue());
  }

  @Test
  public void foldReduce() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;

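    // fold with a zero element and reduce agree for an associative, commutative
    // function: 1 + 1 + 2 + 3 + 5 + 8 + 13 = 33.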
    int sum = rdd.fold(0, add);
    Assert.assertEquals(33, sum);

    sum = rdd.reduce(add);
    Assert.assertEquals(33, sum);
  }

  @Test
  public void foldByKey() {
    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
      new Tuple2<>(2, 1),
      new Tuple2<>(2, 1),
      new Tuple2<>(1, 1),
      new Tuple2<>(3, 2),
      new Tuple2<>(3, 1)
    );
    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
    Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
    Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
    Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
  }

  @Test
  public void reduceByKey() {
    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
      new Tuple2<>(2, 1),
      new Tuple2<>(2, 1),
      new Tuple2<>(1, 1),
      new Tuple2<>(3, 2),
      new Tuple2<>(3, 1)
    );
    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
    Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
    Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
    Assert.assertEquals(3, counts.lookup(3).get(0).intValue());

    Map<Integer, Integer> localCounts = counts.collectAsMap();
    Assert.assertEquals(1, localCounts.get(1).intValue());
    Assert.assertEquals(2, localCounts.get(2).intValue());
    Assert.assertEquals(3, localCounts.get(3).intValue());

    localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
    Assert.assertEquals(1, localCounts.get(1).intValue());
    Assert.assertEquals(2, localCounts.get(2).intValue());
    Assert.assertEquals(3, localCounts.get(3).intValue());
  }

  @Test
  public void map() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache();
    doubles.collect();
    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x))
      .cache();
    pairs.collect();
    JavaRDD<String> strings = rdd.map(Object::toString).cache();
    strings.collect();
  }

  @Test
  public void flatMap() {
    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
      "The quick brown fox jumps over the lazy dog."));
    JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator());

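    // The two input sentences split into 2 + 9 = 11 words in total.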
    Assert.assertEquals("Hello", words.first());
    Assert.assertEquals(11, words.count());

    JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
      List<Tuple2<String, String>> pairs2 = new LinkedList<>();
      for (String word : s.split(" ")) {
        pairs2.add(new Tuple2<>(word, word));
      }
      return pairs2.iterator();
    });

    Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first());
    Assert.assertEquals(11, pairs.count());

    JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
      List<Double> lengths = new LinkedList<>();
      for (String word : s.split(" ")) {
        lengths.add((double) word.length());
      }
      return lengths.iterator();
    });

    Assert.assertEquals(5.0, doubles.first(), 0.01);
    Assert.assertEquals(11, doubles.count());
  }

  @Test
  public void mapsFromPairsToPairs() {
    List<Tuple2<Integer, String>> pairs = Arrays.asList(
      new Tuple2<>(1, "a"),
      new Tuple2<>(2, "aa"),
      new Tuple2<>(3, "aaa")
    );
    JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);

    // Regression test for SPARK-668:
    JavaPairRDD<String, Integer> swapped =
      pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()).iterator());
    swapped.collect();

    // There was never a bug here, but it's worth testing:
    pairRDD.map(Tuple2::swap).collect();
  }

  @Test
  public void mapPartitions() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
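    // With two partitions, the elements split as [1, 2] and [3, 4],
    // so the per-partition sums below are 3 and 7.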
    JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
      int sum = 0;
      while (iter.hasNext()) {
        sum += iter.next();
      }
      return Collections.singletonList(sum).iterator();
    });

    Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
  }

  @Test
  public void sequenceFile() {
    File tempDir = Files.createTempDir();
    tempDir.deleteOnExit();
    String outputDir = new File(tempDir, "output").getAbsolutePath();
    List<Tuple2<Integer, String>> pairs = Arrays.asList(
      new Tuple2<>(1, "a"),
      new Tuple2<>(2, "aa"),
      new Tuple2<>(3, "aaa")
    );
    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);

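    // Wrap the keys and values in Hadoop Writable types so they can be written
    // out with saveAsHadoopFile as a SequenceFile.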
    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);

    // Read the output back as a SequenceFile and unwrap the Writables
    JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
      .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString()));
    Assert.assertEquals(pairs, readRDD.collect());
    Utils.deleteRecursively(tempDir);
  }

  @Test
  public void zip() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x);
    JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
    zipped.count();
  }

  @Test
  public void zipPartitions() {
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
    JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
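    // Each of rdd1's two partitions holds 3 elements and each of rdd2's holds 2,
    // so zipping them partition by partition yields the sizes [3, 2, 3, 2].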
    FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
      (Iterator<Integer> i, Iterator<String> s) -> {
        int sizeI = 0;
        while (i.hasNext()) {
          sizeI += 1;
          i.next();
        }
        int sizeS = 0;
        while (s.hasNext()) {
          sizeS += 1;
          s.next();
        }
        return Arrays.asList(sizeI, sizeS).iterator();
      };
    JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
    Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
  }

  @Test
  public void keyBy() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
    List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect();
    Assert.assertEquals(new Tuple2<>("1", 1), s.get(0));
    Assert.assertEquals(new Tuple2<>("2", 2), s.get(1));
  }

  @Test
  public void mapOnPairRDD() {
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    JavaPairRDD<Integer, Integer> rdd2 =
      rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
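    // rdd3 swaps each (value, parity) pair into (parity, value).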
    JavaPairRDD<Integer, Integer> rdd3 =
      rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
    Assert.assertEquals(Arrays.asList(
      new Tuple2<>(1, 1),
      new Tuple2<>(0, 2),
      new Tuple2<>(1, 3),
      new Tuple2<>(0, 4)), rdd3.collect());
  }

  @Test
  public void collectPartitions() {
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
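
    // With three partitions, the seven elements split as [1, 2], [3, 4] and [5, 6, 7].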
    JavaPairRDD<Integer, Integer> rdd2 =
      rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
    List<Integer>[] parts = rdd1.collectPartitions(new int[]{0});
    Assert.assertEquals(Arrays.asList(1, 2), parts[0]);

    parts = rdd1.collectPartitions(new int[]{1, 2});
    Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
    Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);

    Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
      rdd2.collectPartitions(new int[]{0})[0]);

    List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[]{1, 2});
    Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]);
    Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)),
      parts2[1]);
  }

  @Test
  public void collectAsMapWithIntArrayValues() {
    // Regression test for SPARK-1040
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
    JavaPairRDD<Integer, int[]> pairRDD =
      rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
    pairRDD.collect();  // Works fine
    pairRDD.collectAsMap();  // Used to crash with ClassCastException
  }
}