#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function

import sys
from random import Random

from pyspark.sql import SparkSession

numEdges = 200
numVertices = 100
rand = Random(42)


def generateGraph():
    edges = set()
    while len(edges) < numEdges:
        src = rand.randrange(0, numVertices)
        dst = rand.randrange(0, numVertices)
        if src != dst:
            edges.add((src, dst))
    return edges


if __name__ == "__main__":
    """
    Usage: transitive_closure [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonTransitiveClosure")\
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    tc = spark.sparkContext.parallelize(generateGraph(), partitions).cache()

    # Linear transitive closure: each round grows paths by one edge,
    # by joining the graph's edges with the already-discovered paths.
    # e.g. join the path (y, z) from the TC with the edge (x, y) from
    # the graph to obtain the path (x, z).

    # Because join() joins on keys, the edges are stored in reversed order.
    edges = tc.map(lambda x_y: (x_y[1], x_y[0]))

    oldCount = 0
    nextCount = tc.count()
    while True:
        oldCount = nextCount
        # Perform the join, obtaining an RDD of (y, (z, x)) pairs,
        # then project the result to obtain the new (x, z) paths.
        new_edges = tc.join(edges).map(lambda __a_b: (__a_b[1][1], __a_b[1][0]))
        tc = tc.union(new_edges).distinct().cache()
        nextCount = tc.count()
        if nextCount == oldCount:
            break

    print("TC has %i edges" % tc.count())

    spark.stop()
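
# A minimal usage sketch: per the docstring above, the script takes one
# optional argument setting the number of RDD partitions (default 2).
# The invocation below is an assumption; the script path depends on where
# this file lives in your Spark installation.
#
#   spark-submit transitive_closure.py 4
#
# Each iteration of the loop joins the reversed edge set with the paths
# found so far and unions in the projected results, so the RDD grows until
# a fixed point is reached: when distinct() adds no new pairs, the count
# stops changing and the loop exits with the full transitive closure.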