#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

0018 """
0019 This is an example implementation of PageRank. For more conventional use,
0020 Please refer to PageRank implementation provided by graphx
0021 
0022 Example Usage:
0023 bin/spark-submit examples/src/main/python/pagerank.py data/mllib/pagerank_data.txt 10
0024 """
from __future__ import print_function

import re
import sys
from operator import add

from pyspark.sql import SparkSession


def computeContribs(urls, rank):
    """Calculates URL contributions to the rank of other URLs."""
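    # The rank is split evenly among the out-links: e.g. a URL with rank 0.9
    # and three neighbors contributes 0.9 / 3 = 0.3 to each of them.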
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)


def parseNeighbors(urls):
    """Parses a URL pair string into a pair of URLs."""
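    # e.g. "url_1 url_4" -> ("url_1", "url_4"); any run of whitespace splits
    # the pair, and tokens beyond the first two are ignored.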
    parts = re.split(r'\s+', urls)
    return parts[0], parts[1]

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: pagerank <file> <iterations>", file=sys.stderr)
        sys.exit(-1)
    print("WARN: This is a naive implementation of PageRank and is given as an example!\n" +
          "Please refer to the PageRank implementation provided by GraphX.",
          file=sys.stderr)

    # Initialize a Spark session.
    spark = SparkSession\
        .builder\
        .appName("PythonPageRank")\
        .getOrCreate()

    # Loads the input file. It should be in the format:
    #     URL         neighbor URL
    #     URL         neighbor URL
    #     URL         neighbor URL
    #     ...
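    # spark.read.text yields Row objects, so r[0] extracts each line as a
    # plain string.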
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])

    # Loads all URLs from the input file and initializes their neighbors.
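    # groupByKey() gathers every neighbor of a URL under one key, producing
    # (url, iterable-of-neighbors) pairs; cache() keeps the RDD in memory
    # because it is reused on every iteration of the loop below.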
    links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()

    # Initializes the rank of every URL that has outgoing links to one.
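    # Only the keys of `links` (URLs with at least one out-link) get an initial
    # rank; URLs that appear solely as link targets accumulate rank through the
    # contributions computed in each iteration.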
    ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))

    # Iteratively calculates and updates URL ranks using the PageRank algorithm.
    for iteration in range(int(sys.argv[2])):
        # Calculates URL contributions to the rank of other URLs.
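        # links.join(ranks) yields (url, (neighbors, rank)) pairs, so
        # url_urls_rank[1][0] is the neighbor list and url_urls_rank[1][1]
        # is the URL's current rank.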
        contribs = links.join(ranks).flatMap(
            lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))

        # Re-calculates URL ranks based on neighbor contributions.
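        # This is the PageRank update with damping factor d = 0.85:
        #     new_rank = (1 - d) + d * sum(contributions)
        # Note that, unlike the textbook formulation, the (1 - d) teleport term
        # is not divided by the total number of pages; this is one reason the
        # example warns that it is naive.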
        ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

    # Collects all URL ranks and dumps them to the console.
    for (link, rank) in ranks.collect():
        print("%s has rank: %s." % (link, rank))

    spark.stop()