0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package test.org.apache.spark;
0019
0020 import java.io.File;
0021 import java.io.Serializable;
0022 import java.util.*;
0023
0024 import scala.Tuple2;
0025
0026 import com.google.common.collect.Iterables;
0027 import com.google.common.io.Files;
0028 import org.apache.hadoop.io.IntWritable;
0029 import org.apache.hadoop.io.Text;
0030 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
0031 import org.junit.After;
0032 import org.junit.Assert;
0033 import org.junit.Before;
0034 import org.junit.Test;
0035
0036 import org.apache.spark.api.java.JavaDoubleRDD;
0037 import org.apache.spark.api.java.JavaPairRDD;
0038 import org.apache.spark.api.java.JavaRDD;
0039 import org.apache.spark.api.java.JavaSparkContext;
0040 import org.apache.spark.api.java.Optional;
0041 import org.apache.spark.api.java.function.*;
0042 import org.apache.spark.util.Utils;
0043
0044
0045
0046
0047
0048 public class Java8RDDAPISuite implements Serializable {
0049 private static int foreachCalls = 0;
0050 private transient JavaSparkContext sc;
0051
0052 @Before
0053 public void setUp() {
0054 sc = new JavaSparkContext("local", "JavaAPISuite");
0055 }
0056
0057 @After
0058 public void tearDown() {
0059 sc.stop();
0060 sc = null;
0061 }
0062
0063 @Test
0064 public void foreachWithAnonymousClass() {
0065 foreachCalls = 0;
0066 JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
0067 rdd.foreach(s -> foreachCalls++);
0068 Assert.assertEquals(2, foreachCalls);
0069 }
0070
0071 @Test
0072 public void foreach() {
0073 foreachCalls = 0;
0074 JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
0075 rdd.foreach(x -> foreachCalls++);
0076 Assert.assertEquals(2, foreachCalls);
0077 }
0078
0079 @Test
0080 public void groupBy() {
0081 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
0082 Function<Integer, Boolean> isOdd = x -> x % 2 == 0;
0083 JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
0084 Assert.assertEquals(2, oddsAndEvens.count());
0085 Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));
0086 Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0)));
0087
0088 oddsAndEvens = rdd.groupBy(isOdd, 1);
0089 Assert.assertEquals(2, oddsAndEvens.count());
0090 Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));
0091 Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0)));
0092 }
0093
0094 @Test
0095 public void leftOuterJoin() {
0096 JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
0097 new Tuple2<>(1, 1),
0098 new Tuple2<>(1, 2),
0099 new Tuple2<>(2, 1),
0100 new Tuple2<>(3, 1)
0101 ));
0102 JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
0103 new Tuple2<>(1, 'x'),
0104 new Tuple2<>(2, 'y'),
0105 new Tuple2<>(2, 'z'),
0106 new Tuple2<>(4, 'w')
0107 ));
0108 List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
0109 rdd1.leftOuterJoin(rdd2).collect();
0110 Assert.assertEquals(5, joined.size());
0111 Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
0112 rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
0113 Assert.assertEquals(3, firstUnmatched._1().intValue());
0114 }
0115
0116 @Test
0117 public void foldReduce() {
0118 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
0119 Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
0120
0121 int sum = rdd.fold(0, add);
0122 Assert.assertEquals(33, sum);
0123
0124 sum = rdd.reduce(add);
0125 Assert.assertEquals(33, sum);
0126 }
0127
0128 @Test
0129 public void foldByKey() {
0130 List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
0131 new Tuple2<>(2, 1),
0132 new Tuple2<>(2, 1),
0133 new Tuple2<>(1, 1),
0134 new Tuple2<>(3, 2),
0135 new Tuple2<>(3, 1)
0136 );
0137 JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
0138 JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
0139 Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
0140 Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
0141 Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
0142 }
0143
0144 @Test
0145 public void reduceByKey() {
0146 List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
0147 new Tuple2<>(2, 1),
0148 new Tuple2<>(2, 1),
0149 new Tuple2<>(1, 1),
0150 new Tuple2<>(3, 2),
0151 new Tuple2<>(3, 1)
0152 );
0153 JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
0154 JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
0155 Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
0156 Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
0157 Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
0158
0159 Map<Integer, Integer> localCounts = counts.collectAsMap();
0160 Assert.assertEquals(1, localCounts.get(1).intValue());
0161 Assert.assertEquals(2, localCounts.get(2).intValue());
0162 Assert.assertEquals(3, localCounts.get(3).intValue());
0163
0164 localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
0165 Assert.assertEquals(1, localCounts.get(1).intValue());
0166 Assert.assertEquals(2, localCounts.get(2).intValue());
0167 Assert.assertEquals(3, localCounts.get(3).intValue());
0168 }
0169
0170 @Test
0171 public void map() {
0172 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
0173 JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache();
0174 doubles.collect();
0175 JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x))
0176 .cache();
0177 pairs.collect();
0178 JavaRDD<String> strings = rdd.map(Object::toString).cache();
0179 strings.collect();
0180 }
0181
0182 @Test
0183 public void flatMap() {
0184 JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
0185 "The quick brown fox jumps over the lazy dog."));
0186 JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
0187
0188 Assert.assertEquals("Hello", words.first());
0189 Assert.assertEquals(11, words.count());
0190
0191 JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
0192 List<Tuple2<String, String>> pairs2 = new LinkedList<>();
0193 for (String word : s.split(" ")) {
0194 pairs2.add(new Tuple2<>(word, word));
0195 }
0196 return pairs2.iterator();
0197 });
0198
0199 Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first());
0200 Assert.assertEquals(11, pairs.count());
0201
0202 JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
0203 List<Double> lengths = new LinkedList<>();
0204 for (String word : s.split(" ")) {
0205 lengths.add((double) word.length());
0206 }
0207 return lengths.iterator();
0208 });
0209
0210 Assert.assertEquals(5.0, doubles.first(), 0.01);
0211 Assert.assertEquals(11, pairs.count());
0212 }
0213
0214 @Test
0215 public void mapsFromPairsToPairs() {
0216 List<Tuple2<Integer, String>> pairs = Arrays.asList(
0217 new Tuple2<>(1, "a"),
0218 new Tuple2<>(2, "aa"),
0219 new Tuple2<>(3, "aaa")
0220 );
0221 JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
0222
0223
0224 JavaPairRDD<String, Integer> swapped =
0225 pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()).iterator());
0226 swapped.collect();
0227
0228
0229 pairRDD.map(Tuple2::swap).collect();
0230 }
0231
0232 @Test
0233 public void mapPartitions() {
0234 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
0235 JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
0236 int sum = 0;
0237 while (iter.hasNext()) {
0238 sum += iter.next();
0239 }
0240 return Collections.singletonList(sum).iterator();
0241 });
0242
0243 Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
0244 }
0245
0246 @Test
0247 public void sequenceFile() {
0248 File tempDir = Files.createTempDir();
0249 tempDir.deleteOnExit();
0250 String outputDir = new File(tempDir, "output").getAbsolutePath();
0251 List<Tuple2<Integer, String>> pairs = Arrays.asList(
0252 new Tuple2<>(1, "a"),
0253 new Tuple2<>(2, "aa"),
0254 new Tuple2<>(3, "aaa")
0255 );
0256 JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
0257
0258 rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
0259 .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
0260
0261
0262 JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
0263 .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString()));
0264 Assert.assertEquals(pairs, readRDD.collect());
0265 Utils.deleteRecursively(tempDir);
0266 }
0267
0268 @Test
0269 public void zip() {
0270 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
0271 JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x);
0272 JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
0273 zipped.count();
0274 }
0275
0276 @Test
0277 public void zipPartitions() {
0278 JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
0279 JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
0280 FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
0281 (Iterator<Integer> i, Iterator<String> s) -> {
0282 int sizeI = 0;
0283 while (i.hasNext()) {
0284 sizeI += 1;
0285 i.next();
0286 }
0287 int sizeS = 0;
0288 while (s.hasNext()) {
0289 sizeS += 1;
0290 s.next();
0291 }
0292 return Arrays.asList(sizeI, sizeS).iterator();
0293 };
0294 JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
0295 Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
0296 }
0297
0298 @Test
0299 public void keyBy() {
0300 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
0301 List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect();
0302 Assert.assertEquals(new Tuple2<>("1", 1), s.get(0));
0303 Assert.assertEquals(new Tuple2<>("2", 2), s.get(1));
0304 }
0305
0306 @Test
0307 public void mapOnPairRDD() {
0308 JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
0309 JavaPairRDD<Integer, Integer> rdd2 =
0310 rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
0311 JavaPairRDD<Integer, Integer> rdd3 =
0312 rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
0313 Assert.assertEquals(Arrays.asList(
0314 new Tuple2<>(1, 1),
0315 new Tuple2<>(0, 2),
0316 new Tuple2<>(1, 3),
0317 new Tuple2<>(0, 4)), rdd3.collect());
0318 }
0319
0320 @Test
0321 public void collectPartitions() {
0322 JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
0323
0324 JavaPairRDD<Integer, Integer> rdd2 =
0325 rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
0326 List<Integer>[] parts = rdd1.collectPartitions(new int[]{0});
0327 Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
0328
0329 parts = rdd1.collectPartitions(new int[]{1, 2});
0330 Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
0331 Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
0332
0333 Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
0334 rdd2.collectPartitions(new int[]{0})[0]);
0335
0336 List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[]{1, 2});
0337 Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]);
0338 Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)),
0339 parts2[1]);
0340 }
0341
0342 @Test
0343 public void collectAsMapWithIntArrayValues() {
0344
0345 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
0346 JavaPairRDD<Integer, int[]> pairRDD =
0347 rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
0348 pairRDD.collect();
0349 pairRDD.collectAsMap();
0350 }
0351 }