0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
"""
This is an example implementation of PageRank. For more conventional use,
please refer to the PageRank implementation provided by graphx.

Example Usage:
bin/spark-submit examples/src/main/python/pagerank.py data/mllib/pagerank_data.txt 10
"""
0025 from __future__ import print_function
0026
0027 import re
0028 import sys
0029 from operator import add
0030
0031 from pyspark.sql import SparkSession
0032
0033
def computeContribs(urls, rank):
    """Spread *rank* evenly across the outbound neighbor *urls*.

    Yields one ``(url, contribution)`` pair per neighbor, where each
    contribution is ``rank`` divided by the number of neighbors.
    """
    out_degree = len(urls)
    for target in urls:
        # Division stays inside the loop so an empty neighbor list simply
        # yields nothing instead of raising ZeroDivisionError.
        yield (target, rank / out_degree)
0039
0040
def parseNeighbors(urls):
    """Split a whitespace-separated "source destination" line into a pair.

    Tokens beyond the first two are ignored, matching the input format of
    one edge per line.
    """
    tokens = re.split(r'\s+', urls)
    return tokens[0], tokens[1]
0045
0046
if __name__ == "__main__":
    # Require exactly: <input edge file> <number of iterations>.
    if len(sys.argv) != 3:
        print("Usage: pagerank <file> <iterations>", file=sys.stderr)
        sys.exit(-1)

    print("WARN: This is a naive implementation of PageRank and is given as an example!\n" +
          "Please refer to PageRank implementation provided by graphx",
          file=sys.stderr)

    # A single Spark session drives the whole job.
    spark = SparkSession.builder.appName("PythonPageRank").getOrCreate()

    # Each input line holds one "source destination" edge as plain text.
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda row: row[0])

    # Adjacency list: url -> iterable of outbound neighbors. Deduplicated,
    # and cached because it is re-joined on every iteration below.
    links = lines.map(parseNeighbors).distinct().groupByKey().cache()

    # Every url starts with an initial rank of 1.0.
    ranks = links.map(lambda pair: (pair[0], 1.0))

    # Power iteration: redistribute ranks along edges, then apply the
    # 0.85 damping factor.
    num_iterations = int(sys.argv[2])
    for _ in range(num_iterations):
        contribs = links.join(ranks).flatMap(
            lambda kv: computeContribs(kv[1][0], kv[1][1]))
        ranks = contribs.reduceByKey(add).mapValues(lambda r: r * 0.85 + 0.15)

    # Materialize the final ranks on the driver and report them.
    for (url, rank) in ranks.collect():
        print("%s has rank: %s." % (url, rank))

    spark.stop()