0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.examples;
0019
0020 import java.util.ArrayList;
0021 import java.util.List;
0022 import java.util.regex.Pattern;
0023
0024 import scala.Tuple2;
0025
0026 import com.google.common.collect.Iterables;
0027
0028 import org.apache.spark.api.java.JavaPairRDD;
0029 import org.apache.spark.api.java.JavaRDD;
0030 import org.apache.spark.api.java.function.Function2;
0031 import org.apache.spark.sql.SparkSession;
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050 public final class JavaPageRank {
0051 private static final Pattern SPACES = Pattern.compile("\\s+");
0052
0053 static void showWarning() {
0054 String warning = "WARN: This is a naive implementation of PageRank " +
0055 "and is given as an example! \n" +
0056 "Please use the PageRank implementation found in " +
0057 "org.apache.spark.graphx.lib.PageRank for more conventional use.";
0058 System.err.println(warning);
0059 }
0060
0061 private static class Sum implements Function2<Double, Double, Double> {
0062 @Override
0063 public Double call(Double a, Double b) {
0064 return a + b;
0065 }
0066 }
0067
0068 public static void main(String[] args) throws Exception {
0069 if (args.length < 2) {
0070 System.err.println("Usage: JavaPageRank <file> <number_of_iterations>");
0071 System.exit(1);
0072 }
0073
0074 showWarning();
0075
0076 SparkSession spark = SparkSession
0077 .builder()
0078 .appName("JavaPageRank")
0079 .getOrCreate();
0080
0081
0082
0083
0084
0085
0086 JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
0087
0088
0089 JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(s -> {
0090 String[] parts = SPACES.split(s);
0091 return new Tuple2<>(parts[0], parts[1]);
0092 }).distinct().groupByKey().cache();
0093
0094
0095 JavaPairRDD<String, Double> ranks = links.mapValues(rs -> 1.0);
0096
0097
0098 for (int current = 0; current < Integer.parseInt(args[1]); current++) {
0099
0100 JavaPairRDD<String, Double> contribs = links.join(ranks).values()
0101 .flatMapToPair(s -> {
0102 int urlCount = Iterables.size(s._1());
0103 List<Tuple2<String, Double>> results = new ArrayList<>();
0104 for (String n : s._1) {
0105 results.add(new Tuple2<>(n, s._2() / urlCount));
0106 }
0107 return results.iterator();
0108 });
0109
0110
0111 ranks = contribs.reduceByKey(new Sum()).mapValues(sum -> 0.15 + sum * 0.85);
0112 }
0113
0114
0115 List<Tuple2<String, Double>> output = ranks.collect();
0116 for (Tuple2<?,?> tuple : output) {
0117 System.out.println(tuple._1() + " has rank: " + tuple._2() + ".");
0118 }
0119
0120 spark.stop();
0121 }
0122 }