Back to home page

OSCL-LXR

 
 

    


0001 ---
0002 layout: global
0003 displayTitle: GraphX Programming Guide
0004 title: GraphX
0005 description: GraphX graph processing library guide for Spark SPARK_VERSION_SHORT
0006 license: |
0007   Licensed to the Apache Software Foundation (ASF) under one or more
0008   contributor license agreements.  See the NOTICE file distributed with
0009   this work for additional information regarding copyright ownership.
0010   The ASF licenses this file to You under the Apache License, Version 2.0
0011   (the "License"); you may not use this file except in compliance with
0012   the License.  You may obtain a copy of the License at
0013  
0014      http://www.apache.org/licenses/LICENSE-2.0
0015  
0016   Unless required by applicable law or agreed to in writing, software
0017   distributed under the License is distributed on an "AS IS" BASIS,
0018   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0019   See the License for the specific language governing permissions and
0020   limitations under the License.
0021 ---
0022 
0023 * This will become a table of contents (this text will be scraped).
0024 {:toc}
0025 
0026 <!-- All the documentation links  -->
0027 
0028 [EdgeRDD]: api/scala/org/apache/spark/graphx/EdgeRDD.html
0029 [VertexRDD]: api/scala/org/apache/spark/graphx/VertexRDD.html
0030 [Edge]: api/scala/org/apache/spark/graphx/Edge.html
0031 [EdgeTriplet]: api/scala/org/apache/spark/graphx/EdgeTriplet.html
0032 [Graph]: api/scala/org/apache/spark/graphx/Graph$.html
0033 [GraphOps]: api/scala/org/apache/spark/graphx/GraphOps.html
0034 [Graph.mapVertices]: api/scala/org/apache/spark/graphx/Graph.html#mapVertices[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]
0035 [Graph.reverse]: api/scala/org/apache/spark/graphx/Graph.html#reverse:Graph[VD,ED]
0036 [Graph.subgraph]: api/scala/org/apache/spark/graphx/Graph.html#subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexId,VD)⇒Boolean):Graph[VD,ED]
0037 [Graph.mask]: api/scala/org/apache/spark/graphx/Graph.html#mask[VD2,ED2](Graph[VD2,ED2])(ClassTag[VD2],ClassTag[ED2]):Graph[VD,ED]
0038 [Graph.groupEdges]: api/scala/org/apache/spark/graphx/Graph.html#groupEdges((ED,ED)⇒ED):Graph[VD,ED]
0039 [GraphOps.joinVertices]: api/scala/org/apache/spark/graphx/GraphOps.html#joinVertices[U](RDD[(VertexId,U)])((VertexId,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED]
0040 [Graph.outerJoinVertices]: api/scala/org/apache/spark/graphx/Graph.html#outerJoinVertices[U,VD2](RDD[(VertexId,U)])((VertexId,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED]
0041 [Graph.aggregateMessages]: api/scala/org/apache/spark/graphx/Graph.html#aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]
0042 [EdgeContext]: api/scala/org/apache/spark/graphx/EdgeContext.html
0043 [GraphOps.collectNeighborIds]: api/scala/org/apache/spark/graphx/GraphOps.html#collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexId]]
0044 [GraphOps.collectNeighbors]: api/scala/org/apache/spark/graphx/GraphOps.html#collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexId,VD)]]
0045 [RDD Persistence]: rdd-programming-guide.html#rdd-persistence
0046 [Graph.cache]: api/scala/org/apache/spark/graphx/Graph.html#cache():Graph[VD,ED]
0047 [GraphOps.pregel]: api/scala/org/apache/spark/graphx/GraphOps.html#pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]
0048 [PartitionStrategy]: api/scala/org/apache/spark/graphx/PartitionStrategy$.html
0049 [GraphLoader.edgeListFile]: api/scala/org/apache/spark/graphx/GraphLoader$.html#edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int]
0050 [Graph.apply]: api/scala/org/apache/spark/graphx/Graph$.html#apply[VD,ED](RDD[(VertexId,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
0051 [Graph.fromEdgeTuples]: api/scala/org/apache/spark/graphx/Graph$.html#fromEdgeTuples[VD](RDD[(VertexId,VertexId)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int]
0052 [Graph.fromEdges]: api/scala/org/apache/spark/graphx/Graph$.html#fromEdges[VD,ED](RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
0053 [PartitionStrategy]: api/scala/org/apache/spark/graphx/PartitionStrategy$.html
0054 [PageRank]: api/scala/org/apache/spark/graphx/lib/PageRank$.html
0055 [ConnectedComponents]: api/scala/org/apache/spark/graphx/lib/ConnectedComponents$.html
0056 [TriangleCount]: api/scala/org/apache/spark/graphx/lib/TriangleCount$.html
0057 [Graph.partitionBy]: api/scala/org/apache/spark/graphx/Graph.html#partitionBy(PartitionStrategy):Graph[VD,ED]
0058 [EdgeContext.sendToSrc]: api/scala/org/apache/spark/graphx/EdgeContext.html#sendToSrc(msg:A):Unit
0059 [EdgeContext.sendToDst]: api/scala/org/apache/spark/graphx/EdgeContext.html#sendToDst(msg:A):Unit
0060 [TripletFields]: api/java/org/apache/spark/graphx/TripletFields.html
0061 [TripletFields.All]: api/java/org/apache/spark/graphx/TripletFields.html#All
0062 [TripletFields.None]: api/java/org/apache/spark/graphx/TripletFields.html#None
0063 [TripletFields.Src]: api/java/org/apache/spark/graphx/TripletFields.html#Src
0064 [TripletFields.Dst]: api/java/org/apache/spark/graphx/TripletFields.html#Dst
0065 
0066 <p style="text-align: center;">
0067   <img src="img/graphx_logo.png"
0068        title="GraphX Logo"
0069        alt="GraphX"
0070        width="60%" />
0071   <!-- Images are downsized intentionally to improve quality on retina displays -->
0072 </p>
0073 
0074 # Overview
0075 
0076 GraphX is a new component in Spark for graphs and graph-parallel computation. At a high level,
0077 GraphX extends the Spark [RDD](api/scala/org/apache/spark/rdd/RDD.html) by introducing a
0078 new [Graph](#property_graph) abstraction: a directed multigraph with properties
0079 attached to each vertex and edge.  To support graph computation, GraphX exposes a set of fundamental
0080 operators (e.g., [subgraph](#structural_operators), [joinVertices](#join_operators), and
0081 [aggregateMessages](#aggregateMessages)) as well as an optimized variant of the [Pregel](#pregel) API. In addition, GraphX includes a growing collection of graph [algorithms](#graph_algorithms) and
0082 [builders](#graph_builders) to simplify graph analytics tasks.
0083 
0084 # Getting Started
0085 
0086 To get started you first need to import Spark and GraphX into your project, as follows:
0087 
0088 {% highlight scala %}
0089 import org.apache.spark._
0090 import org.apache.spark.graphx._
0091 // To make some of the examples work we will also need RDD
0092 import org.apache.spark.rdd.RDD
0093 {% endhighlight %}
0094 
0095 If you are not using the Spark shell you will also need a `SparkContext`.  To learn more about
0096 getting started with Spark refer to the [Spark Quick Start Guide](quick-start.html).
0097 
0098 <a name="property_graph"></a>
0099 
0100 # The Property Graph
0101 
0102 The [property graph](api/scala/org/apache/spark/graphx/Graph.html) is a directed multigraph
0103 with user defined objects attached to each vertex and edge.  A directed multigraph is a directed
0104 graph with potentially multiple parallel edges sharing the same source and destination vertex.  The
0105 ability to support parallel edges simplifies modeling scenarios where there can be multiple
0106 relationships (e.g., co-worker and friend) between the same vertices.  Each vertex is keyed by a
0107 *unique* 64-bit long identifier (`VertexId`).  GraphX does not impose any ordering constraints on
0108 the vertex identifiers.  Similarly, edges have corresponding source and destination vertex
0109 identifiers.
0110 
0111 The property graph is parameterized over the vertex (`VD`) and edge (`ED`) types.  These
0112 are the types of the objects associated with each vertex and edge respectively.
0113 
0114 > GraphX optimizes the representation of vertex and edge types when they are primitive data types
0115 > (e.g., int, double, etc...) reducing the in memory footprint by storing them in specialized
0116 > arrays.
0117 
0118 In some cases it may be desirable to have vertices with different property types in the same graph.
0119 This can be accomplished through inheritance.  For example to model users and products as a
0120 bipartite graph we might do the following:
0121 
0122 {% highlight scala %}
0123 class VertexProperty()
0124 case class UserProperty(val name: String) extends VertexProperty
0125 case class ProductProperty(val name: String, val price: Double) extends VertexProperty
0126 // The graph might then have the type:
0127 var graph: Graph[VertexProperty, String] = null
0128 {% endhighlight %}
0129 
0130 Like RDDs, property graphs are immutable, distributed, and fault-tolerant.  Changes to the values or
0131 structure of the graph are accomplished by producing a new graph with the desired changes.  Note
0132 that substantial parts of the original graph (i.e., unaffected structure, attributes, and indices)
0133 are reused in the new graph reducing the cost of this inherently functional data structure.  The
0134 graph is partitioned across the executors using a range of vertex partitioning heuristics.  As with
0135 RDDs, each partition of the graph can be recreated on a different machine in the event of a failure.
0136 
0137 Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the
0138 properties for each vertex and edge.  As a consequence, the graph class contains members to access
0139 the vertices and edges of the graph:
0140 
0141 {% highlight scala %}
0142 class Graph[VD, ED] {
0143   val vertices: VertexRDD[VD]
0144   val edges: EdgeRDD[ED]
0145 }
0146 {% endhighlight %}
0147 
0148 The classes `VertexRDD[VD]` and `EdgeRDD[ED]` extend and are optimized versions of `RDD[(VertexId,
0149 VD)]` and `RDD[Edge[ED]]` respectively.  Both `VertexRDD[VD]` and `EdgeRDD[ED]` provide  additional
0150 functionality built around graph computation and leverage internal optimizations.  We discuss the
0151 `VertexRDD`[VertexRDD] and `EdgeRDD`[EdgeRDD] API in greater detail in the section on [vertex and edge
0152 RDDs](#vertex_and_edge_rdds) but for now they can be thought of as simply RDDs of the form:
0153 `RDD[(VertexId, VD)]` and `RDD[Edge[ED]]`.
0154 
0155 ### Example Property Graph
0156 
0157 Suppose we want to construct a property graph consisting of the various collaborators on the GraphX
0158 project. The vertex property might contain the username and occupation.  We could annotate edges
0159 with a string describing the relationships between collaborators:
0160 
0161 <p style="text-align: center;">
0162   <img src="img/property_graph.png"
0163        title="The Property Graph"
0164        alt="The Property Graph"
0165        width="50%" />
0166   <!-- Images are downsized intentionally to improve quality on retina displays -->
0167 </p>
0168 
0169 The resulting graph would have the type signature:
0170 
0171 {% highlight scala %}
0172 val userGraph: Graph[(String, String), String]
0173 {% endhighlight %}
0174 
0175 There are numerous ways to construct a property graph from raw files, RDDs, and even synthetic
0176 generators and these are discussed in more detail in the section on
0177 [graph builders](#graph_builders).  Probably the most general method is to use the
0178 [Graph object](api/scala/org/apache/spark/graphx/Graph$.html).  For example the following
0179 code constructs a graph from a collection of RDDs:
0180 
0181 {% highlight scala %}
0182 // Assume the SparkContext has already been constructed
0183 val sc: SparkContext
0184 // Create an RDD for the vertices
0185 val users: RDD[(VertexId, (String, String))] =
0186   sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
0187                        (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
0188 // Create an RDD for edges
0189 val relationships: RDD[Edge[String]] =
0190   sc.parallelize(Seq(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
0191                        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
0192 // Define a default user in case there are relationship with missing user
0193 val defaultUser = ("John Doe", "Missing")
0194 // Build the initial Graph
0195 val graph = Graph(users, relationships, defaultUser)
0196 {% endhighlight %}
0197 
0198 In the above example we make use of the [`Edge`][Edge] case class. Edges have a `srcId` and a
0199 `dstId` corresponding to the source and destination vertex identifiers. In addition, the `Edge`
0200 class has an `attr` member which stores the edge property.
0201 
0202 
0203 We can deconstruct a graph into the respective vertex and edge views by using the `graph.vertices`
0204 and `graph.edges` members respectively.
0205 
0206 {% highlight scala %}
0207 val graph: Graph[(String, String), String] // Constructed from above
0208 // Count all users which are postdocs
0209 graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
0210 // Count all the edges where src > dst
0211 graph.edges.filter(e => e.srcId > e.dstId).count
0212 {% endhighlight %}
0213 
0214 > Note that `graph.vertices` returns an `VertexRDD[(String, String)]` which extends
0215 > `RDD[(VertexId, (String, String))]` and so we use the scala `case` expression to deconstruct the
0216 > tuple.  On the other hand, `graph.edges` returns an `EdgeRDD` containing `Edge[String]` objects.
0217 > We could have also used the case class type constructor as in the following:
0218 > {% highlight scala %}
0219 graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
0220 {% endhighlight %}
0221 
0222 In addition to the vertex and edge views of the property graph, GraphX also exposes a triplet view.
0223 The triplet view logically joins the vertex and edge properties yielding an
0224 `RDD[EdgeTriplet[VD, ED]]` containing instances of the [`EdgeTriplet`][EdgeTriplet] class. This
0225 *join* can be expressed in the following SQL expression:
0226 
0227 
0228 {% highlight sql %}
0229 SELECT src.id, dst.id, src.attr, e.attr, dst.attr
0230 FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
0231 ON e.srcId = src.Id AND e.dstId = dst.Id
0232 {% endhighlight %}
0233 
0234 or graphically as:
0235 
0236 <p style="text-align: center;">
0237   <img src="img/triplet.png"
0238        title="Edge Triplet"
0239        alt="Edge Triplet"
0240        width="50%" />
0241   <!-- Images are downsized intentionally to improve quality on retina displays -->
0242 </p>
0243 
0244 The [`EdgeTriplet`][EdgeTriplet] class extends the [`Edge`][Edge] class by adding the `srcAttr` and
0245 `dstAttr` members which contain the source and destination properties respectively. We can use the
0246 triplet view of a graph to render a collection of strings describing relationships between users.
0247 
0248 {% highlight scala %}
0249 val graph: Graph[(String, String), String] // Constructed from above
0250 // Use the triplets view to create an RDD of facts.
0251 val facts: RDD[String] =
0252   graph.triplets.map(triplet =>
0253     triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
0254 facts.collect.foreach(println(_))
0255 {% endhighlight %}
0256 
0257 # Graph Operators
0258 
0259 Just as RDDs have basic operations like `map`, `filter`, and `reduceByKey`, property graphs also
0260 have a collection of basic operators that take user defined functions and produce new graphs with
0261 transformed properties and structure.  The core operators that have optimized implementations are
0262 defined in [`Graph`][Graph] and convenient operators that are expressed as a compositions of the
0263 core operators are defined in [`GraphOps`][GraphOps].  However, thanks to Scala implicits the
0264 operators in `GraphOps` are automatically available as members of `Graph`.  For example, we can
0265 compute the in-degree of each vertex (defined in `GraphOps`) by the following:
0266 
0267 {% highlight scala %}
0268 val graph: Graph[(String, String), String]
0269 // Use the implicit GraphOps.inDegrees operator
0270 val inDegrees: VertexRDD[Int] = graph.inDegrees
0271 {% endhighlight %}
0272 
0273 The reason for differentiating between core graph operations and [`GraphOps`][GraphOps] is to be
0274 able to support different graph representations in the future.  Each graph representation must
0275 provide implementations of the core operations and reuse many of the useful operations defined in
0276 [`GraphOps`][GraphOps].
0277 
0278 ### Summary List of Operators
0279 The following is a quick summary of the functionality defined in both [`Graph`][Graph] and
0280 [`GraphOps`][GraphOps] but presented as members of Graph for simplicity. Note that some function
0281 signatures have been simplified (e.g., default arguments and type constraints removed) and some more
0282 advanced functionality has been removed so please consult the API docs for the official list of
0283 operations.
0284 
0285 {% highlight scala %}
0286 /** Summary of the functionality in the property graph */
0287 class Graph[VD, ED] {
0288   // Information about the Graph ===================================================================
0289   val numEdges: Long
0290   val numVertices: Long
0291   val inDegrees: VertexRDD[Int]
0292   val outDegrees: VertexRDD[Int]
0293   val degrees: VertexRDD[Int]
0294   // Views of the graph as collections =============================================================
0295   val vertices: VertexRDD[VD]
0296   val edges: EdgeRDD[ED]
0297   val triplets: RDD[EdgeTriplet[VD, ED]]
0298   // Functions for caching graphs ==================================================================
0299   def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
0300   def cache(): Graph[VD, ED]
0301   def unpersistVertices(blocking: Boolean = false): Graph[VD, ED]
0302   // Change the partitioning heuristic  ============================================================
0303   def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
0304   // Transform vertex and edge attributes ==========================================================
0305   def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
0306   def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
0307   def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
0308   def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
0309   def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
0310     : Graph[VD, ED2]
0311   // Modify the graph structure ====================================================================
0312   def reverse: Graph[VD, ED]
0313   def subgraph(
0314       epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
0315       vpred: (VertexId, VD) => Boolean = ((v, d) => true))
0316     : Graph[VD, ED]
0317   def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
0318   def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
0319   // Join RDDs with the graph ======================================================================
0320   def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
0321   def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
0322       (mapFunc: (VertexId, VD, Option[U]) => VD2)
0323     : Graph[VD2, ED]
0324   // Aggregate information about adjacent triplets =================================================
0325   def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
0326   def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
0327   def aggregateMessages[Msg: ClassTag](
0328       sendMsg: EdgeContext[VD, ED, Msg] => Unit,
0329       mergeMsg: (Msg, Msg) => Msg,
0330       tripletFields: TripletFields = TripletFields.All)
0331     : VertexRDD[A]
0332   // Iterative graph-parallel computation ==========================================================
0333   def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
0334       vprog: (VertexId, VD, A) => VD,
0335       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
0336       mergeMsg: (A, A) => A)
0337     : Graph[VD, ED]
0338   // Basic graph algorithms ========================================================================
0339   def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
0340   def connectedComponents(): Graph[VertexId, ED]
0341   def triangleCount(): Graph[Int, ED]
0342   def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
0343 }
0344 {% endhighlight %}
0345 
0346 
0347 ## Property Operators
0348 
0349 Like the RDD `map` operator, the property graph contains the following:
0350 
0351 {% highlight scala %}
0352 class Graph[VD, ED] {
0353   def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
0354   def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
0355   def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
0356 }
0357 {% endhighlight %}
0358 
0359 Each of these operators yields a new graph with the vertex or edge properties modified by the user
0360 defined `map` function.
0361 
0362 > Note that in each case the graph structure is unaffected. This is a key feature of these operators
0363 > which allows the resulting graph to reuse the structural indices of the original graph. The
0364 > following snippets are logically equivalent, but the first one does not preserve the structural
0365 > indices and would not benefit from the GraphX system optimizations:
0366 > {% highlight scala %}
0367 val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
0368 val newGraph = Graph(newVertices, graph.edges)
0369 {% endhighlight %}
0370 > Instead, use [`mapVertices`][Graph.mapVertices] to preserve the indices:
0371 > {% highlight scala %}
0372 val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
0373 {% endhighlight %}
0374 
0375 
0376 These operators are often used to initialize the graph for a particular computation or project away
0377 unnecessary properties.  For example, given a graph with the out degrees as the vertex properties
0378 (we describe how to construct such a graph later), we initialize it for PageRank:
0379 
0380 {% highlight scala %}
0381 // Given a graph where the vertex property is the out degree
0382 val inputGraph: Graph[Int, String] =
0383   graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
0384 // Construct a graph where each edge contains the weight
0385 // and each vertex is the initial PageRank
0386 val outputGraph: Graph[Double, Double] =
0387   inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
0388 {% endhighlight %}
0389 
0390 <a name="structural_operators"></a>
0391 
0392 ## Structural Operators
0393 
0394 Currently GraphX supports only a simple set of commonly used structural operators and we expect to
0395 add more in the future.  The following is a list of the basic structural operators.
0396 
0397 {% highlight scala %}
0398 class Graph[VD, ED] {
0399   def reverse: Graph[VD, ED]
0400   def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
0401                vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
0402   def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
0403   def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
0404 }
0405 {% endhighlight %}
0406 
0407 The [`reverse`][Graph.reverse] operator returns a new graph with all the edge directions reversed.
0408 This can be useful when, for example, trying to compute the inverse PageRank.  Because the reverse
0409 operation does not modify vertex or edge properties or change the number of edges, it can be
0410 implemented efficiently without data movement or duplication.
0411 
0412 
0413 The [`subgraph`][Graph.subgraph] operator takes vertex and edge predicates and returns the graph
0414 containing only the vertices that satisfy the vertex predicate (evaluate to true) and edges that
0415 satisfy the edge predicate *and connect vertices that satisfy the vertex predicate*.  The `subgraph`
0416 operator can be used in number of situations to restrict the graph to the vertices and edges of
0417 interest or eliminate broken links. For example in the following code we remove broken links:
0418 
0419 
0420 {% highlight scala %}
0421 // Create an RDD for the vertices
0422 val users: RDD[(VertexId, (String, String))] =
0423   sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
0424                        (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
0425                        (4L, ("peter", "student"))))
0426 // Create an RDD for edges
0427 val relationships: RDD[Edge[String]] =
0428   sc.parallelize(Seq(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
0429                        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
0430                        Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
0431 // Define a default user in case there are relationship with missing user
0432 val defaultUser = ("John Doe", "Missing")
0433 // Build the initial Graph
0434 val graph = Graph(users, relationships, defaultUser)
0435 // Notice that there is a user 0 (for which we have no information) connected to users
0436 // 4 (peter) and 5 (franklin).
0437 graph.triplets.map(
0438   triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
0439 ).collect.foreach(println(_))
0440 // Remove missing vertices as well as the edges to connected to them
0441 val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
0442 // The valid subgraph will disconnect users 4 and 5 by removing user 0
0443 validGraph.vertices.collect.foreach(println(_))
0444 validGraph.triplets.map(
0445   triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
0446 ).collect.foreach(println(_))
0447 {% endhighlight %}
0448 
0449 > Note in the above example only the vertex predicate is provided.  The `subgraph` operator defaults
0450 > to `true` if the vertex or edge predicates are not provided.
0451 
0452 The [`mask`][Graph.mask] operator constructs a subgraph by returning a graph that contains the
0453 vertices and edges that are also found in the input graph.  This can be used in conjunction with the
0454 `subgraph` operator to restrict a graph based on the properties in another related graph.  For
0455 example, we might run connected components using the graph with missing vertices and then restrict
0456 the answer to the valid subgraph.
0457 
0458 
0459 {% highlight scala %}
0460 // Run Connected Components
0461 val ccGraph = graph.connectedComponents() // No longer contains missing field
0462 // Remove missing vertices as well as the edges to connected to them
0463 val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
0464 // Restrict the answer to the valid subgraph
0465 val validCCGraph = ccGraph.mask(validGraph)
0466 {% endhighlight %}
0467 
0468 The [`groupEdges`][Graph.groupEdges] operator merges parallel edges (i.e., duplicate edges between
0469 pairs of vertices) in the multigraph.  In many numerical applications, parallel edges can be *added*
0470 (their weights combined) into a single edge thereby reducing the size of the graph.
0471 
0472 <a name="join_operators"></a>
0473 
0474 ## Join Operators
0475 
0476 In many cases it is necessary to join data from external collections (RDDs) with graphs.  For
0477 example, we might have extra user properties that we want to merge with an existing graph or we
0478 might want to pull vertex properties from one graph into another.  These tasks can be accomplished
0479 using the *join* operators. Below we list the key join operators:
0480 
0481 {% highlight scala %}
0482 class Graph[VD, ED] {
0483   def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
0484     : Graph[VD, ED]
0485   def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
0486     : Graph[VD2, ED]
0487 }
0488 {% endhighlight %}
0489 
0490 The [`joinVertices`][GraphOps.joinVertices] operator joins the vertices with the input RDD and
0491 returns a new graph with the vertex properties obtained by applying the user defined `map` function
0492 to the result of the joined vertices.  Vertices without a matching value in the RDD retain their
0493 original value.
0494 
0495 > Note that if the RDD contains more than one value for a given vertex only one will be used.  It
0496 > is therefore recommended that the input RDD be made unique using the following which will
0497 > also *pre-index* the resulting values to substantially accelerate the subsequent join.
0498 > {% highlight scala %}
0499 val nonUniqueCosts: RDD[(VertexId, Double)]
0500 val uniqueCosts: VertexRDD[Double] =
0501   graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
0502 val joinedGraph = graph.joinVertices(uniqueCosts)(
0503   (id, oldCost, extraCost) => oldCost + extraCost)
0504 {% endhighlight %}
0505 
0506 The more general [`outerJoinVertices`][Graph.outerJoinVertices] behaves similarly to `joinVertices`
0507 except that the user defined `map` function is applied to all vertices and can change the vertex
0508 property type.  Because not all vertices may have a matching value in the input RDD the `map`
0509 function takes an `Option` type.  For example, we can set up a graph for PageRank by initializing
0510 vertex properties with their `outDegree`.
0511 
0512 
0513 {% highlight scala %}
0514 val outDegrees: VertexRDD[Int] = graph.outDegrees
0515 val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
0516   outDegOpt match {
0517     case Some(outDeg) => outDeg
0518     case None => 0 // No outDegree means zero outDegree
0519   }
0520 }
0521 {% endhighlight %}
0522 
0523 > You may have noticed the multiple parameter lists (e.g., `f(a)(b)`) curried function pattern used
0524 > in the above examples.  While we could have equally written `f(a)(b)` as `f(a,b)` this would mean
0525 > that type inference on `b` would not depend on `a`.  As a consequence, the user would need to
0526 > provide type annotation for the user defined function:
0527 > {% highlight scala %}
0528 val joinedGraph = graph.joinVertices(uniqueCosts,
0529   (id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)
0530 {% endhighlight %}
0531 
0532 >
0533 
0534 <a name="neighborhood-aggregation">
0535 
0536 ## Neighborhood Aggregation
0537 
0538 A key step in many graph analytics tasks is aggregating information about the neighborhood of each
0539 vertex.
0540 For example, we might want to know the number of followers each user has or the average age of
0541 the followers of each user.  Many iterative graph algorithms (e.g., PageRank, Shortest Path, and
0542 connected components) repeatedly aggregate properties of neighboring vertices (e.g., current
0543 PageRank Value, shortest path to the source, and smallest reachable vertex id).
0544 
0545 > To improve performance the primary aggregation operator changed from
0546 `graph.mapReduceTriplets` to the new `graph.AggregateMessages`.  While the changes in the API are
0547 relatively small, we provide a transition guide below.
0548 
0549 <a name="aggregateMessages"></a>
0550 
0551 ### Aggregate Messages (aggregateMessages)
0552 
0553 The core aggregation operation in GraphX is [`aggregateMessages`][Graph.aggregateMessages].
0554 This operator applies a user defined `sendMsg` function to each <i>edge triplet</i> in the graph
0555 and then uses the `mergeMsg` function to aggregate those messages at their destination vertex.
0556 
0557 {% highlight scala %}
0558 class Graph[VD, ED] {
0559   def aggregateMessages[Msg: ClassTag](
0560       sendMsg: EdgeContext[VD, ED, Msg] => Unit,
0561       mergeMsg: (Msg, Msg) => Msg,
0562       tripletFields: TripletFields = TripletFields.All)
0563     : VertexRDD[Msg]
0564 }
0565 {% endhighlight %}
0566 
0567 The user defined `sendMsg` function takes an [`EdgeContext`][EdgeContext], which exposes the
0568 source and destination attributes along with the edge attribute and functions
0569 ([`sendToSrc`][EdgeContext.sendToSrc], and [`sendToDst`][EdgeContext.sendToDst]) to send
0570 messages to the source and destination attributes.  Think of `sendMsg` as the <i>map</i>
0571 function in map-reduce.
0572 The user defined `mergeMsg` function takes two messages destined to the same vertex and
0573 yields a single message.  Think of `mergeMsg` as the <i>reduce</i> function in map-reduce.
0574 The  [`aggregateMessages`][Graph.aggregateMessages] operator returns a `VertexRDD[Msg]`
0575 containing the aggregate message (of type `Msg`) destined to each vertex.  Vertices that did not
0576 receive a message are not included in the returned `VertexRDD`[VertexRDD].
0577 
0578 <!--
0579 > An [`EdgeContext`][EdgeContext] is provided in place of a [`EdgeTriplet`][EdgeTriplet] to
0580 expose the additional ([`sendToSrc`][EdgeContext.sendToSrc],
0581 and [`sendToDst`][EdgeContext.sendToDst]) which GraphX uses to optimize message routing.
0582  -->
0583 
0584 In addition, [`aggregateMessages`][Graph.aggregateMessages] takes an optional
0585 `tripletsFields` which indicates what data is accessed in the [`EdgeContext`][EdgeContext]
0586 (i.e., the source vertex attribute but not the destination vertex attribute).
0587 The possible options for the `tripletsFields` are defined in [`TripletFields`][TripletFields] and
0588 the default value is [`TripletFields.All`][TripletFields.All] which indicates that the user
0589 defined `sendMsg` function may access any of the fields in the [`EdgeContext`][EdgeContext].
0590 The `tripletFields` argument can be used to notify GraphX that only part of the
0591 [`EdgeContext`][EdgeContext] will be needed allowing GraphX to select an optimized join strategy.
0592 For example if we are computing the average age of the followers of each user we would only require
0593 the source field and so we would use [`TripletFields.Src`][TripletFields.Src] to indicate that we
0594 only require the source field
0595 
0596 > In earlier versions of GraphX we used byte code inspection to infer the
0597 [`TripletFields`][TripletFields] however we have found that bytecode inspection to be
0598 slightly unreliable and instead opted for more explicit user control.
0599 
0600 In the following example we use the [`aggregateMessages`][Graph.aggregateMessages] operator to
0601 compute the average age of the more senior followers of each user.
0602 
0603 {% include_example scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala %}
0604 
0605 > The `aggregateMessages` operation performs optimally when the messages (and the sums of
0606 > messages) are constant sized (e.g., floats and addition instead of lists and concatenation).
0607 
0608 <a name="mrTripletsTransition"></a>
0609 
0610 ### Map Reduce Triplets Transition Guide (Legacy)
0611 
0612 In earlier versions of GraphX neighborhood aggregation was accomplished using the
0613 `mapReduceTriplets` operator:
0614 
0615 {% highlight scala %}
0616 class Graph[VD, ED] {
0617   def mapReduceTriplets[Msg](
0618       map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
0619       reduce: (Msg, Msg) => Msg)
0620     : VertexRDD[Msg]
0621 }
0622 {% endhighlight %}
0623 
0624 The `mapReduceTriplets` operator takes a user defined map function which
0625 is applied to each triplet and can yield *messages* which are aggregated using the user defined
0626 `reduce` function.
0627 However, we found the user of the returned iterator to be expensive and it inhibited our ability to
0628 apply additional optimizations (e.g., local vertex renumbering).
0629 In [`aggregateMessages`][Graph.aggregateMessages] we introduced the EdgeContext which exposes the
0630 triplet fields and also functions to explicitly send messages to the source and destination vertex.
0631 Furthermore we removed bytecode inspection and instead require the user to indicate what fields
0632 in the triplet are actually required.
0633 
0634 The following code block using `mapReduceTriplets`:
0635 
0636 {% highlight scala %}
0637 val graph: Graph[Int, Float] = ...
0638 def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
0639   Iterator((triplet.dstId, "Hi"))
0640 }
0641 def reduceFun(a: String, b: String): String = a + " " + b
0642 val result = graph.mapReduceTriplets[String](msgFun, reduceFun)
0643 {% endhighlight %}
0644 
0645 can be rewritten using `aggregateMessages` as:
0646 
0647 {% highlight scala %}
0648 val graph: Graph[Int, Float] = ...
0649 def msgFun(triplet: EdgeContext[Int, Float, String]) {
0650   triplet.sendToDst("Hi")
0651 }
0652 def reduceFun(a: String, b: String): String = a + " " + b
0653 val result = graph.aggregateMessages[String](msgFun, reduceFun)
0654 {% endhighlight %}
0655 
0656 
0657 ### Computing Degree Information
0658 
0659 A common aggregation task is computing the degree of each vertex: the number of edges adjacent to
0660 each vertex.  In the context of directed graphs it is often necessary to know the in-degree, 
0661 out-degree, and the total degree of each vertex.  The  [`GraphOps`][GraphOps] class contains a
0662 collection of operators to compute the degrees of each vertex.  For example in the following we
0663 compute the max in, out, and total degrees:
0664 
0665 {% highlight scala %}
0666 // Define a reduce operation to compute the highest degree vertex
0667 def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
0668   if (a._2 > b._2) a else b
0669 }
0670 // Compute the max degrees
0671 val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
0672 val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
0673 val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)
0674 {% endhighlight %}
0675 
0676 ### Collecting Neighbors
0677 
0678 In some cases it may be easier to express computation by collecting neighboring vertices and their
0679 attributes at each vertex. This can be easily accomplished using the
0680 [`collectNeighborIds`][GraphOps.collectNeighborIds] and the
0681 [`collectNeighbors`][GraphOps.collectNeighbors] operators.
0682 
0683 {% highlight scala %}
0684 class GraphOps[VD, ED] {
0685   def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
0686   def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
0687 }
0688 {% endhighlight %}
0689 
0690 > These operators can be quite costly as they duplicate information and require
0691 > substantial communication.  If possible try expressing the same computation using the
0692 > [`aggregateMessages`][Graph.aggregateMessages]  operator directly.
0693 
0694 ## Caching and Uncaching
0695 
0696 In Spark, RDDs are not persisted in memory by default. To avoid recomputation, they must be explicitly cached when using them multiple times (see the [Spark Programming Guide][RDD Persistence]). Graphs in GraphX behave the same way. **When using a graph multiple times, make sure to call [`Graph.cache()`][Graph.cache] on it first.**
0697 
0698 
0699 In iterative computations, *uncaching* may also be necessary for best performance. By default, cached RDDs and graphs will remain in memory until memory pressure forces them to be evicted in LRU order. For iterative computation, intermediate results from previous iterations will fill up the cache. Though they will eventually be evicted, the unnecessary data stored in memory will slow down garbage collection. It would be more efficient to uncache intermediate results as soon as they are no longer necessary. This involves materializing (caching and forcing) a graph or RDD every iteration, uncaching all other datasets, and only using the materialized dataset in future iterations. However, because graphs are composed of multiple RDDs, it can be difficult to unpersist them correctly. **For iterative computation we recommend using the Pregel API, which correctly unpersists intermediate results.**
0700 
0701 <a name="pregel"></a>
0702 
0703 # Pregel API
0704 
0705 Graphs are inherently recursive data structures as properties of vertices depend on properties of
0706 their neighbors which in turn depend on properties of *their* neighbors.  As a
0707 consequence many important graph algorithms iteratively recompute the properties of each vertex
0708 until a fixed-point condition is reached.  A range of graph-parallel abstractions have been proposed
0709 to express these iterative algorithms.  GraphX exposes a variant of the Pregel API.
0710 
0711 At a high level the Pregel operator in GraphX is a bulk-synchronous parallel messaging abstraction
0712 *constrained to the topology of the graph*.  The Pregel operator executes in a series of super steps
0713 in which vertices receive the *sum* of their inbound messages from the previous super step, compute
0714 a new value for the vertex property, and then send messages to neighboring vertices in the next
0715 super step.  Unlike Pregel, messages are computed in parallel as a
0716 function of the edge triplet and the message computation has access to both the source and
0717 destination vertex attributes.  Vertices that do not receive a message are skipped within a super
0718 step.  The Pregel operator terminates iteration and returns the final graph when there are no
0719 messages remaining.
0720 
0721 > Note, unlike more standard Pregel implementations, vertices in GraphX can only send messages to
0722 > neighboring vertices and the message construction is done in parallel using a user defined
0723 > messaging function.  These constraints allow additional optimization within GraphX.
0724 
0725 The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch*
0726 of its implementation (note: to avoid stackOverflowError due to long lineage chains, pregel support periodically
0727 checkpoint graph and messages by setting "spark.graphx.pregel.checkpointInterval" to a positive number,
0728 say 10. And set checkpoint directory as well using SparkContext.setCheckpointDir(directory: String)):
0729 
0730 {% highlight scala %}
0731 class GraphOps[VD, ED] {
0732   def pregel[A]
0733       (initialMsg: A,
0734        maxIter: Int = Int.MaxValue,
0735        activeDir: EdgeDirection = EdgeDirection.Out)
0736       (vprog: (VertexId, VD, A) => VD,
0737        sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
0738        mergeMsg: (A, A) => A)
0739     : Graph[VD, ED] = {
0740     // Receive the initial message at each vertex
0741     var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
0742 
0743     // compute the messages
0744     var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
0745     var activeMessages = messages.count()
0746     // Loop until no messages remain or maxIterations is achieved
0747     var i = 0
0748     while (activeMessages > 0 && i < maxIterations) {
0749       // Receive the messages and update the vertices.
0750       g = g.joinVertices(messages)(vprog).cache()
0751       val oldMessages = messages
0752       // Send new messages, skipping edges where neither side received a message. We must cache
0753       // messages so it can be materialized on the next line, allowing us to uncache the previous
0754       // iteration.
0755       messages = GraphXUtils.mapReduceTriplets(
0756         g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
0757       activeMessages = messages.count()
0758       i += 1
0759     }
0760     g
0761   }
0762 }
0763 {% endhighlight %}
0764 
0765 Notice that Pregel takes two argument lists (i.e., `graph.pregel(list1)(list2)`).  The first
0766 argument list contains configuration parameters including the initial message, the maximum number of
0767 iterations, and the edge direction in which to send messages (by default along out edges).  The
0768 second argument list contains the user defined functions for receiving messages (the vertex program
0769 `vprog`), computing messages (`sendMsg`), and combining messages `mergeMsg`.
0770 
0771 We can use the Pregel operator to express computation such as single source
0772 shortest path in the following example.
0773 
0774 {% include_example scala/org/apache/spark/examples/graphx/SSSPExample.scala %}
0775 
0776 <a name="graph_builders"></a>
0777 
0778 # Graph Builders
0779 
0780 GraphX provides several ways of building a graph from a collection of vertices and edges in an RDD or on disk. None of the graph builders repartitions the graph's edges by default; instead, edges are left in their default partitions (such as their original blocks in HDFS). [`Graph.groupEdges`][Graph.groupEdges] requires the graph to be repartitioned because it assumes identical edges will be colocated on the same partition, so you must call [`Graph.partitionBy`][Graph.partitionBy] before calling `groupEdges`.
0781 
0782 {% highlight scala %}
0783 object GraphLoader {
0784   def edgeListFile(
0785       sc: SparkContext,
0786       path: String,
0787       canonicalOrientation: Boolean = false,
0788       minEdgePartitions: Int = 1)
0789     : Graph[Int, Int]
0790 }
0791 {% endhighlight %}
0792 
0793 [`GraphLoader.edgeListFile`][GraphLoader.edgeListFile] provides a way to load a graph from a list of edges on disk. It parses an adjacency list of (source vertex ID, destination vertex ID) pairs of the following form, skipping comment lines that begin with `#`:
0794 
0795 ~~~
0796 # This is a comment
0797 2 1
0798 4 1
0799 1 2
0800 ~~~
0801 
0802 It creates a `Graph` from the specified edges, automatically creating any vertices mentioned by edges. All vertex and edge attributes default to 1. The `canonicalOrientation` argument allows reorienting edges in the positive direction (`srcId < dstId`), which is required by the [connected components][ConnectedComponents] algorithm. The `minEdgePartitions` argument specifies the minimum number of edge partitions to generate; there may be more edge partitions than specified if, for example, the HDFS file has more blocks.
0803 
0804 {% highlight scala %}
0805 object Graph {
0806   def apply[VD, ED](
0807       vertices: RDD[(VertexId, VD)],
0808       edges: RDD[Edge[ED]],
0809       defaultVertexAttr: VD = null)
0810     : Graph[VD, ED]
0811 
0812   def fromEdges[VD, ED](
0813       edges: RDD[Edge[ED]],
0814       defaultValue: VD): Graph[VD, ED]
0815 
0816   def fromEdgeTuples[VD](
0817       rawEdges: RDD[(VertexId, VertexId)],
0818       defaultValue: VD,
0819       uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
0820 
0821 }
0822 {% endhighlight %}
0823 
0824 [`Graph.apply`][Graph.apply] allows creating a graph from RDDs of vertices and edges. Duplicate vertices are picked arbitrarily and vertices found in the edge RDD but not the vertex RDD are assigned the default attribute.
0825 
0826 [`Graph.fromEdges`][Graph.fromEdges] allows creating a graph from only an RDD of edges, automatically creating any vertices mentioned by edges and assigning them the default value.
0827 
0828 [`Graph.fromEdgeTuples`][Graph.fromEdgeTuples] allows creating a graph from only an RDD of edge tuples, assigning the edges the value 1, and automatically creating any vertices mentioned by edges and assigning them the default value. It also supports deduplicating the edges; to deduplicate, pass `Some` of a [`PartitionStrategy`][PartitionStrategy] as the `uniqueEdges` parameter (for example, `uniqueEdges = Some(PartitionStrategy.RandomVertexCut)`). A partition strategy is necessary to colocate identical edges on the same partition so they can be deduplicated.
0829 
0830 <a name="vertex_and_edge_rdds"></a>
0831 
0832 # Vertex and Edge RDDs
0833 
0834 GraphX exposes `RDD` views of the vertices and edges stored within the graph.  However, because
0835 GraphX maintains the vertices and edges in optimized data structures and these data structures
0836 provide additional functionality, the vertices and edges are returned as `VertexRDD`[VertexRDD] and `EdgeRDD`[EdgeRDD]
0837 respectively.  In this section we review some of the additional useful functionality in these types.
0838 Note that this is just an incomplete list, please refer to the API docs for the official list of operations. 
0839 
0840 ## VertexRDDs
0841 
0842 The `VertexRDD[A]` extends `RDD[(VertexId, A)]` and adds the additional constraint that each
0843 `VertexId` occurs only *once*.  Moreover, `VertexRDD[A]` represents a *set* of vertices each with an
0844 attribute of type `A`.  Internally, this is achieved by storing the vertex attributes in a reusable
0845 hash-map data-structure.  As a consequence if two `VertexRDD`s are derived from the same base
0846 `VertexRDD`[VertexRDD] (e.g., by `filter` or `mapValues`) they can be joined in constant time without hash
0847 evaluations. To leverage this indexed data structure, the `VertexRDD`[VertexRDD] exposes the following
0848 additional functionality:
0849 
0850 {% highlight scala %}
0851 class VertexRDD[VD] extends RDD[(VertexId, VD)] {
0852   // Filter the vertex set but preserves the internal index
0853   def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
0854   // Transform the values without changing the ids (preserves the internal index)
0855   def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
0856   def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
0857   // Show only vertices unique to this set based on their VertexId's
0858   def minus(other: RDD[(VertexId, VD)])
0859   // Remove vertices from this set that appear in the other set
0860   def diff(other: VertexRDD[VD]): VertexRDD[VD]
0861   // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
0862   def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
0863   def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
0864   // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
0865   def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
0866 }
0867 {% endhighlight %}
0868 
0869 Notice, for example,  how the `filter` operator returns an `VertexRDD`[VertexRDD].  Filter is actually
0870 implemented using a `BitSet` thereby reusing the index and preserving the ability to do fast joins
0871 with other `VertexRDD`s.  Likewise, the `mapValues` operators do not allow the `map` function to
0872 change the `VertexId` thereby enabling the same `HashMap` data structures to be reused.  Both the
0873 `leftJoin` and `innerJoin` are able to identify when joining two `VertexRDD`s derived from the same
0874 `HashMap` and implement the join by linear scan rather than costly point lookups.
0875 
0876 The `aggregateUsingIndex` operator is useful for efficient construction of a new `VertexRDD`[VertexRDD] from an
0877 `RDD[(VertexId, A)]`.  Conceptually, if I have constructed a `VertexRDD[B]` over a set of vertices,
0878 *which is a super-set* of the vertices in some `RDD[(VertexId, A)]` then I can reuse the index to
0879 both aggregate and then subsequently index the `RDD[(VertexId, A)]`.  For example:
0880 
0881 {% highlight scala %}
0882 val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
0883 val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
0884 // There should be 200 entries in rddB
0885 rddB.count
0886 val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
0887 // There should be 100 entries in setB
0888 setB.count
0889 // Joining A and B should now be fast!
0890 val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
0891 {% endhighlight %}
0892 
0893 ## EdgeRDDs
0894 
0895 The `EdgeRDD[ED]`, which extends `RDD[Edge[ED]]` organizes the edges in blocks partitioned using one
0896 of the various partitioning strategies defined in [`PartitionStrategy`][PartitionStrategy].  Within
0897 each partition, edge attributes and adjacency structure, are stored separately enabling maximum
0898 reuse when changing attribute values.
0899 
0900 The three additional functions exposed by the `EdgeRDD`[EdgeRDD] are:
0901 {% highlight scala %}
0902 // Transform the edge attributes while preserving the structure
0903 def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
0904 // Reverse the edges reusing both attributes and structure
0905 def reverse: EdgeRDD[ED]
0906 // Join two `EdgeRDD`s partitioned using the same partitioning strategy.
0907 def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
0908 {% endhighlight %}
0909 
0910 In most applications we have found that operations on the `EdgeRDD`[EdgeRDD] are accomplished through the
0911 graph operators or rely on operations defined in the base `RDD` class.
0912 
0913 # Optimized Representation
0914 
0915 While a detailed description of the optimizations used in the GraphX representation of distributed
0916 graphs is beyond the scope of this guide, some high-level understanding may aid in the design of
0917 scalable algorithms as well as optimal use of the API.  GraphX adopts a vertex-cut approach to
0918 distributed graph partitioning:
0919 
0920 <p style="text-align: center;">
0921   <img src="img/edge_cut_vs_vertex_cut.png"
0922        title="Edge Cut vs. Vertex Cut"
0923        alt="Edge Cut vs. Vertex Cut"
0924        width="50%" />
0925   <!-- Images are downsized intentionally to improve quality on retina displays -->
0926 </p>
0927 
0928 Rather than splitting graphs along edges, GraphX partitions the graph along vertices which can
0929 reduce both the communication and storage overhead.  Logically, this corresponds to assigning edges
0930 to machines and allowing vertices to span multiple machines.  The exact method of assigning edges
0931 depends on the [`PartitionStrategy`][PartitionStrategy] and there are several tradeoffs to the
0932 various heuristics.  Users can choose between different strategies by repartitioning the graph with
0933 the [`Graph.partitionBy`][Graph.partitionBy] operator.  The default partitioning strategy is to use
0934 the initial partitioning of the edges as provided on graph construction.  However, users can easily
0935 switch to 2D-partitioning or other heuristics included in GraphX.
0936 
0937 
0938 <p style="text-align: center;">
0939   <img src="img/vertex_routing_edge_tables.png"
0940        title="RDD Graph Representation"
0941        alt="RDD Graph Representation"
0942        width="50%" />
0943   <!-- Images are downsized intentionally to improve quality on retina displays -->
0944 </p>
0945 
0946 Once the edges have been partitioned the key challenge to efficient graph-parallel computation is
0947 efficiently joining vertex attributes with the edges.  Because real-world graphs typically have more
0948 edges than vertices, we move vertex attributes to the edges.  Because not all partitions will
0949 contain edges adjacent to all vertices we internally maintain a routing table which identifies where
0950 to broadcast vertices when implementing the join required for operations like `triplets` and
0951 `aggregateMessages`.
0952 
0953 <a name="graph_algorithms"></a>
0954 
0955 # Graph Algorithms
0956 
0957 GraphX includes a set of graph algorithms to simplify analytics tasks. The algorithms are contained in the `org.apache.spark.graphx.lib` package and can be accessed directly as methods on `Graph` via [`GraphOps`][GraphOps]. This section describes the algorithms and how they are used.
0958 
0959 <a name="pagerank"></a>
0960 
0961 ## PageRank
0962 
0963 PageRank measures the importance of each vertex in a graph, assuming an edge from *u* to *v* represents an endorsement of *v*'s importance by *u*. For example, if a Twitter user is followed by many others, the user will be ranked highly.
0964 
0965 GraphX comes with static and dynamic implementations of PageRank as methods on the [`PageRank` object][PageRank]. Static PageRank runs for a fixed number of iterations, while dynamic PageRank runs until the ranks converge (i.e., stop changing by more than a specified tolerance). [`GraphOps`][GraphOps] allows calling these algorithms directly as methods on `Graph`.
0966 
0967 GraphX also includes an example social network dataset that we can run PageRank on. A set of users is given in `data/graphx/users.txt`, and a set of relationships between users is given in `data/graphx/followers.txt`. We compute the PageRank of each user as follows:
0968 
0969 {% include_example scala/org/apache/spark/examples/graphx/PageRankExample.scala %}
0970 
0971 ## Connected Components
0972 
0973 The connected components algorithm labels each connected component of the graph with the ID of its lowest-numbered vertex. For example, in a social network, connected components can approximate clusters. GraphX contains an implementation of the algorithm in the [`ConnectedComponents` object][ConnectedComponents], and we compute the connected components of the example social network dataset from the [PageRank section](#pagerank) as follows:
0974 
0975 {% include_example scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala %}
0976 
0977 ## Triangle Counting
0978 
0979 A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount] that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the [PageRank section](#pagerank). *Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`) and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].*
0980 
0981 {% include_example scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala %}
0982 
0983 
0984 # Examples
0985 
0986 Suppose I want to build a graph from some text files, restrict the graph
0987 to important relationships and users, run page-rank on the subgraph, and
0988 then finally return attributes associated with the top users.  I can do
0989 all of this in just a few lines with GraphX:
0990 
0991 {% include_example scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala %}