Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 from __future__ import print_function
0019 
0020 import sys
0021 from random import Random
0022 
0023 from pyspark.sql import SparkSession
0024 
0025 numEdges = 200
0026 numVertices = 100
0027 rand = Random(42)
0028 
0029 
0030 def generateGraph():
0031     edges = set()
0032     while len(edges) < numEdges:
0033         src = rand.randrange(0, numVertices)
0034         dst = rand.randrange(0, numVertices)
0035         if src != dst:
0036             edges.add((src, dst))
0037     return edges
0038 
0039 
0040 if __name__ == "__main__":
0041     """
0042     Usage: transitive_closure [partitions]
0043     """
0044     spark = SparkSession\
0045         .builder\
0046         .appName("PythonTransitiveClosure")\
0047         .getOrCreate()
0048 
0049     partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
0050     tc = spark.sparkContext.parallelize(generateGraph(), partitions).cache()
0051 
0052     # Linear transitive closure: each round grows paths by one edge,
0053     # by joining the graph's edges with the already-discovered paths.
0054     # e.g. join the path (y, z) from the TC with the edge (x, y) from
0055     # the graph to obtain the path (x, z).
0056 
0057     # Because join() joins on keys, the edges are stored in reversed order.
0058     edges = tc.map(lambda x_y: (x_y[1], x_y[0]))
0059 
0060     oldCount = 0
0061     nextCount = tc.count()
0062     while True:
0063         oldCount = nextCount
0064         # Perform the join, obtaining an RDD of (y, (z, x)) pairs,
0065         # then project the result to obtain the new (x, z) paths.
0066         new_edges = tc.join(edges).map(lambda __a_b: (__a_b[1][1], __a_b[1][0]))
0067         tc = tc.union(new_edges).distinct().cache()
0068         nextCount = tc.count()
0069         if nextCount == oldCount:
0070             break
0071 
0072     print("TC has %i edges" % tc.count())
0073 
0074     spark.stop()