Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.examples;
0019 
0020 import java.util.ArrayList;
0021 import java.util.HashSet;
0022 import java.util.List;
0023 import java.util.Random;
0024 import java.util.Set;
0025 
0026 import scala.Tuple2;
0027 
0028 import org.apache.spark.api.java.JavaPairRDD;
0029 import org.apache.spark.api.java.JavaSparkContext;
0030 import org.apache.spark.api.java.function.PairFunction;
0031 import org.apache.spark.sql.SparkSession;
0032 
0033 /**
0034  * Transitive closure on a graph, implemented in Java.
0035  * Usage: JavaTC [partitions]
0036  */
0037 public final class JavaTC {
0038 
0039   private static final int numEdges = 200;
0040   private static final int numVertices = 100;
0041   private static final Random rand = new Random(42);
0042 
0043   static List<Tuple2<Integer, Integer>> generateGraph() {
0044     Set<Tuple2<Integer, Integer>> edges = new HashSet<>(numEdges);
0045     while (edges.size() < numEdges) {
0046       int from = rand.nextInt(numVertices);
0047       int to = rand.nextInt(numVertices);
0048       Tuple2<Integer, Integer> e = new Tuple2<>(from, to);
0049       if (from != to) {
0050         edges.add(e);
0051       }
0052     }
0053     return new ArrayList<>(edges);
0054   }
0055 
0056   static class ProjectFn implements PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
0057       Integer, Integer> {
0058     static final ProjectFn INSTANCE = new ProjectFn();
0059 
0060     @Override
0061     public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> triple) {
0062       return new Tuple2<>(triple._2()._2(), triple._2()._1());
0063     }
0064   }
0065 
0066   public static void main(String[] args) {
0067     SparkSession spark = SparkSession
0068       .builder()
0069       .appName("JavaTC")
0070       .getOrCreate();
0071 
0072     JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
0073 
0074     int slices = (args.length > 0) ? Integer.parseInt(args[0]): 2;
0075     JavaPairRDD<Integer, Integer> tc = jsc.parallelizePairs(generateGraph(), slices).cache();
0076 
0077     // Linear transitive closure: each round grows paths by one edge,
0078     // by joining the graph's edges with the already-discovered paths.
0079     // e.g. join the path (y, z) from the TC with the edge (x, y) from
0080     // the graph to obtain the path (x, z).
0081 
0082     // Because join() joins on keys, the edges are stored in reversed order.
0083     JavaPairRDD<Integer, Integer> edges = tc.mapToPair(e -> new Tuple2<>(e._2(), e._1()));
0084 
0085     long oldCount;
0086     long nextCount = tc.count();
0087     do {
0088       oldCount = nextCount;
0089       // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
0090       // then project the result to obtain the new (x, z) paths.
0091       tc = tc.union(tc.join(edges).mapToPair(ProjectFn.INSTANCE)).distinct().cache();
0092       nextCount = tc.count();
0093     } while (nextCount != oldCount);
0094 
0095     System.out.println("TC has " + tc.count() + " edges.");
0096     spark.stop();
0097   }
0098 }