0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.examples;
0019
0020 import java.util.ArrayList;
0021 import java.util.HashSet;
0022 import java.util.List;
0023 import java.util.Random;
0024 import java.util.Set;
0025
0026 import scala.Tuple2;
0027
0028 import org.apache.spark.api.java.JavaPairRDD;
0029 import org.apache.spark.api.java.JavaSparkContext;
0030 import org.apache.spark.api.java.function.PairFunction;
0031 import org.apache.spark.sql.SparkSession;
0032
0033
0034
0035
0036
0037 public final class JavaTC {
0038
0039 private static final int numEdges = 200;
0040 private static final int numVertices = 100;
0041 private static final Random rand = new Random(42);
0042
0043 static List<Tuple2<Integer, Integer>> generateGraph() {
0044 Set<Tuple2<Integer, Integer>> edges = new HashSet<>(numEdges);
0045 while (edges.size() < numEdges) {
0046 int from = rand.nextInt(numVertices);
0047 int to = rand.nextInt(numVertices);
0048 Tuple2<Integer, Integer> e = new Tuple2<>(from, to);
0049 if (from != to) {
0050 edges.add(e);
0051 }
0052 }
0053 return new ArrayList<>(edges);
0054 }
0055
0056 static class ProjectFn implements PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
0057 Integer, Integer> {
0058 static final ProjectFn INSTANCE = new ProjectFn();
0059
0060 @Override
0061 public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> triple) {
0062 return new Tuple2<>(triple._2()._2(), triple._2()._1());
0063 }
0064 }
0065
0066 public static void main(String[] args) {
0067 SparkSession spark = SparkSession
0068 .builder()
0069 .appName("JavaTC")
0070 .getOrCreate();
0071
0072 JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
0073
0074 int slices = (args.length > 0) ? Integer.parseInt(args[0]): 2;
0075 JavaPairRDD<Integer, Integer> tc = jsc.parallelizePairs(generateGraph(), slices).cache();
0076
0077
0078
0079
0080
0081
0082
0083 JavaPairRDD<Integer, Integer> edges = tc.mapToPair(e -> new Tuple2<>(e._2(), e._1()));
0084
0085 long oldCount;
0086 long nextCount = tc.count();
0087 do {
0088 oldCount = nextCount;
0089
0090
0091 tc = tc.union(tc.join(edges).mapToPair(ProjectFn.INSTANCE)).distinct().cache();
0092 nextCount = tc.count();
0093 } while (nextCount != oldCount);
0094
0095 System.out.println("TC has " + tc.count() + " edges.");
0096 spark.stop();
0097 }
0098 }