
Commit 905173d

ankurdave authored and pwendell committed
Unify GraphImpl RDDs + other graph load optimizations
This PR makes the following changes, primarily in e4fbd32:

1. *Unify RDDs to avoid zipPartitions.* A graph used to be four RDDs: vertices, edges, routing table, and triplet view. This commit merges them down to two: vertices (with routing table), and edges (with replicated vertices).
2. *Avoid duplicate shuffle in graph building.* We used to do two shuffles when building a graph: one to extract routing information from the edges and move it to the vertices, and another to find nonexistent vertices referred to by edges. With this commit, the latter is done as a side effect of the former.
3. *Avoid no-op shuffle when joins are fully eliminated.* This is a side effect of unifying the edges and the triplet view.
4. *Join elimination for mapTriplets.*
5. *Ship only the needed vertex attributes when upgrading the triplet view.* If the triplet view already contains source attributes, and we now need both attributes, only ship destination attributes rather than re-shipping both. This is done in `ReplicatedVertexView#upgrade`.

Author: Ankur Dave <[email protected]>

Closes #497 from ankurdave/unify-rdds and squashes the following commits:

332ab43 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into unify-rdds
4933e2e [Ankur Dave] Exclude RoutingTable from binary compatibility check
5ba8789 [Ankur Dave] Add GraphX upgrade guide from Spark 0.9.1
13ac845 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into unify-rdds
a04765c [Ankur Dave] Remove unnecessary toOps call
57202e8 [Ankur Dave] Replace case with pair parameter
75af062 [Ankur Dave] Add explicit return types
04d3ae5 [Ankur Dave] Convert implicit parameter to context bound
c88b269 [Ankur Dave] Revert upgradeIterator to if-in-a-loop
0d3584c [Ankur Dave] EdgePartition.size should be val
2a928b2 [Ankur Dave] Set locality wait
10b3596 [Ankur Dave] Clean up public API
ae36110 [Ankur Dave] Fix style errors
e4fbd32 [Ankur Dave] Unify GraphImpl RDDs + other graph load optimizations
d6d60e2 [Ankur Dave] In GraphLoader, coalesce to minEdgePartitions
62c7b78 [Ankur Dave] In Analytics, take PageRank numIter
d64e8d4 [Ankur Dave] Log current Pregel iteration
1 parent 6c2691d commit 905173d

28 files changed: +1353 −851 lines
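Item 5 is the least obvious of these changes. The following is a hedged, standalone sketch of the shipping decision it describes, not the actual `ReplicatedVertexView#upgrade` implementation; the `attrsToShip` helper and its flags are illustrative only:

```scala
// Illustrative only: the real logic lives in ReplicatedVertexView#upgrade.
object ShipSketch {
  // Given which attribute sets the triplet view already holds and which the
  // next operation needs, return which sets must actually be shipped.
  def attrsToShip(
      haveSrc: Boolean, haveDst: Boolean,
      needSrc: Boolean, needDst: Boolean): (Boolean, Boolean) =
    (needSrc && !haveSrc, needDst && !haveDst)

  def main(args: Array[String]): Unit = {
    // The view holds source attributes and both are now needed, so only
    // destination attributes are shipped rather than re-shipping both.
    val (shipSrc, shipDst) =
      attrsToShip(haveSrc = true, haveDst = false, needSrc = true, needDst = true)
    assert(!shipSrc && shipDst)
  }
}
```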

docs/graphx-programming-guide.md

Lines changed: 14 additions & 8 deletions
```diff
@@ -86,6 +86,12 @@ support the [Bagel API](api/scala/index.html#org.apache.spark.bagel.package) and
 [Bagel programming guide](bagel-programming-guide.html). However, we encourage Bagel users to
 explore the new GraphX API and comment on issues that may complicate the transition from Bagel.
 
+## Upgrade Guide from Spark 0.9.1
+
+GraphX in Spark {{site.SPARK_VERSION}} contains one user-facing interface change from Spark 0.9.1. [`EdgeRDD`][EdgeRDD] may now store adjacent vertex attributes to construct the triplets, so it has gained a type parameter. The edges of a graph of type `Graph[VD, ED]` are of type `EdgeRDD[ED, VD]` rather than `EdgeRDD[ED]`.
+
+[EdgeRDD]: api/scala/index.html#org.apache.spark.graphx.EdgeRDD
+
 # Getting Started
 
 To get started you first need to import Spark and GraphX into your project, as follows:
@@ -145,12 +151,12 @@ the vertices and edges of the graph:
 {% highlight scala %}
 class Graph[VD, ED] {
   val vertices: VertexRDD[VD]
-  val edges: EdgeRDD[ED]
+  val edges: EdgeRDD[ED, VD]
 }
 {% endhighlight %}
 
-The classes `VertexRDD[VD]` and `EdgeRDD[ED]` extend and are optimized versions of `RDD[(VertexID,
-VD)]` and `RDD[Edge[ED]]` respectively. Both `VertexRDD[VD]` and `EdgeRDD[ED]` provide additional
+The classes `VertexRDD[VD]` and `EdgeRDD[ED, VD]` extend and are optimized versions of `RDD[(VertexID,
+VD)]` and `RDD[Edge[ED]]` respectively. Both `VertexRDD[VD]` and `EdgeRDD[ED, VD]` provide additional
 functionality built around graph computation and leverage internal optimizations. We discuss the
 `VertexRDD` and `EdgeRDD` API in greater detail in the section on [vertex and edge
 RDDs](#vertex_and_edge_rdds) but for now they can be thought of as simply RDDs of the form:
@@ -302,7 +308,7 @@ class Graph[VD, ED] {
   val degrees: VertexRDD[Int]
   // Views of the graph as collections =============================================================
   val vertices: VertexRDD[VD]
-  val edges: EdgeRDD[ED]
+  val edges: EdgeRDD[ED, VD]
   val triplets: RDD[EdgeTriplet[VD, ED]]
   // Functions for caching graphs ==================================================================
   def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
@@ -908,7 +914,7 @@ val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
 
 ## EdgeRDDs
 
-The `EdgeRDD[ED]`, which extends `RDD[Edge[ED]]`, organizes the edges in blocks partitioned using one
+The `EdgeRDD[ED, VD]`, which extends `RDD[Edge[ED]]`, organizes the edges in blocks partitioned using one
 of the various partitioning strategies defined in [`PartitionStrategy`][PartitionStrategy]. Within
 each partition, edge attributes and adjacency structure are stored separately enabling maximum
 reuse when changing attribute values.
@@ -918,11 +924,11 @@ reuse when changing attribute values.
 The three additional functions exposed by the `EdgeRDD` are:
 {% highlight scala %}
 // Transform the edge attributes while preserving the structure
-def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
+def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2, VD]
 // Reverse the edges reusing both attributes and structure
-def reverse: EdgeRDD[ED]
+def reverse: EdgeRDD[ED, VD]
 // Join two `EdgeRDD`s partitioned using the same partitioning strategy.
-def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
+def innerJoin[ED2, ED3](other: EdgeRDD[ED2, VD])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
 {% endhighlight %}
 
 In most applications we have found that operations on the `EdgeRDD` are accomplished through the
```
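To make the upgrade path concrete, here is a minimal sketch of user code affected by this change; the `inspectEdges` helper is hypothetical:

```scala
import org.apache.spark.graphx._

// Spark 0.9.1:
//   def inspectEdges[VD, ED](graph: Graph[VD, ED]): EdgeRDD[ED] = graph.edges
// Spark 1.0 — EdgeRDD now also carries the vertex attribute type VD:
def inspectEdges[VD, ED](graph: Graph[VD, ED]): EdgeRDD[ED, VD] = graph.edges
```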

graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala

Lines changed: 35 additions & 21 deletions
```diff
@@ -20,16 +20,19 @@ package org.apache.spark.graphx
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
-import org.apache.spark.graphx.impl.EdgePartition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
+import org.apache.spark.graphx.impl.EdgePartition
+
 /**
- * `EdgeRDD[ED]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each partition
- * for performance.
+ * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
+ * partition for performance. It may additionally store the vertex attributes associated with each
+ * edge to provide the triplet view. Shipping of the vertex attributes is managed by
+ * `impl.ReplicatedVertexView`.
  */
-class EdgeRDD[@specialized ED: ClassTag](
-    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
+class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
+    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
   extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
 
   partitionsRDD.setName("EdgeRDD")
@@ -45,8 +48,12 @@ class EdgeRDD[@specialized ED: ClassTag](
     partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
-    val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
-    p.next._2.iterator.map(_.copy())
+    val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
+    if (p.hasNext) {
+      p.next._2.iterator.map(_.copy())
+    } else {
+      Iterator.empty
+    }
   }
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -61,11 +68,15 @@
     this
   }
 
-  private[graphx] def mapEdgePartitions[ED2: ClassTag](
-      f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
-    new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
-      val (pid, ep) = iter.next()
-      Iterator(Tuple2(pid, f(pid, ep)))
+  private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
+      f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
+    new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
+      if (iter.hasNext) {
+        val (pid, ep) = iter.next()
+        Iterator(Tuple2(pid, f(pid, ep)))
+      } else {
+        Iterator.empty
+      }
     }, preservesPartitioning = true))
   }
 
@@ -76,15 +87,22 @@
    * @param f the function from an edge to a new edge value
    * @return a new EdgeRDD containing the new edge values
    */
-  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2] =
+  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
     mapEdgePartitions((pid, part) => part.map(f))
 
   /**
    * Reverse all the edges in this RDD.
    *
    * @return a new EdgeRDD containing all the edges reversed
    */
-  def reverse: EdgeRDD[ED] = mapEdgePartitions((pid, part) => part.reverse)
+  def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
+
+  /** Removes all edges but those matching `epred` and where both vertices match `vpred`. */
+  def filter(
+      epred: EdgeTriplet[VD, ED] => Boolean,
+      vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
+    mapEdgePartitions((pid, part) => part.filter(epred, vpred))
+  }
 
   /**
    * Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
@@ -96,19 +114,15 @@
    * with values supplied by `f`
    */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
-      (other: EdgeRDD[ED2])
-      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = {
+      (other: EdgeRDD[ED2, _])
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
     val ed2Tag = classTag[ED2]
     val ed3Tag = classTag[ED3]
-    new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+    new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
       (thisIter, otherIter) =>
         val (pid, thisEPart) = thisIter.next()
         val (_, otherEPart) = otherIter.next()
         Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
     })
   }
-
-  private[graphx] def collectVertexIds(): RDD[VertexId] = {
-    partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
-  }
 }
```
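A brief usage sketch for the newly added `filter`, assuming an already-constructed `Graph[Int, Int]` whose edge partitions carry the replicated vertex attributes; the `positiveSubgraphEdges` helper is illustrative:

```scala
import org.apache.spark.graphx._

// Keep only positively-weighted edges whose endpoints have non-negative
// attributes; both predicates run per edge partition via mapEdgePartitions.
def positiveSubgraphEdges(graph: Graph[Int, Int]): EdgeRDD[Int, Int] =
  graph.edges.filter(
    epred = (t: EdgeTriplet[Int, Int]) => t.attr > 0,
    vpred = (id: VertexId, attr: Int) => attr >= 0)
```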

graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -63,4 +63,6 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
     if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
 
   override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
+
+  def toTuple: ((VertexId, VD), (VertexId, VD), ED) = ((srcId, srcAttr), (dstId, dstAttr), attr)
 }
```
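A small sketch of what `toTuple` enables, assuming a hypothetical `Graph[String, Double]` of named vertices:

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Destructure each triplet into ((srcId, srcAttr), (dstId, dstAttr), attr).
def edgeDescriptions(graph: Graph[String, Double]): RDD[String] =
  graph.triplets.map { t =>
    val ((srcId, srcName), (dstId, dstName), weight) = t.toTuple
    s"$srcName($srcId) -[$weight]-> $dstName($dstId)"
  }
```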

graphx/src/main/scala/org/apache/spark/graphx/Graph.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * along with their vertex data.
    *
    */
-  @transient val edges: EdgeRDD[ED]
+  @transient val edges: EdgeRDD[ED, VD]
 
   /**
    * An RDD containing the edge triplets, which are edges along with the vertex data associated with
```

graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala

Lines changed: 5 additions & 3 deletions
```diff
@@ -19,10 +19,11 @@ package org.apache.spark.graphx
 
 import com.esotericsoftware.kryo.Kryo
 
-import org.apache.spark.graphx.impl._
 import org.apache.spark.serializer.KryoRegistrator
-import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.BoundedPriorityQueue
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx.impl._
 
 /**
  * Registers GraphX classes with Kryo for improved performance.
@@ -33,8 +34,9 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[Edge[Object]])
     kryo.register(classOf[MessageToPartition[Object]])
     kryo.register(classOf[VertexBroadcastMsg[Object]])
+    kryo.register(classOf[RoutingTableMessage])
     kryo.register(classOf[(VertexId, Object)])
-    kryo.register(classOf[EdgePartition[Object]])
+    kryo.register(classOf[EdgePartition[Object, Object]])
     kryo.register(classOf[BitSet])
     kryo.register(classOf[VertexIdToIndexMap])
     kryo.register(classOf[VertexAttributeBlock[Object]])
```
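For context, a sketch of how the registrator (now covering `RoutingTableMessage` and the two-parameter `EdgePartition`) is wired in through standard Spark configuration; the app name is hypothetical:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object KryoExample {
  def makeContext(): SparkContext = {
    val conf = new SparkConf()
      .setAppName("graphx-kryo-example") // hypothetical app name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
    new SparkContext(conf)
  }
}
```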

graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala

Lines changed: 5 additions & 5 deletions
```diff
@@ -47,8 +47,7 @@ object GraphLoader extends Logging {
    * @param path the path to the file (e.g., /home/data/file or hdfs://file)
    * @param canonicalOrientation whether to orient edges in the positive
    *        direction
-   * @param minEdgePartitions the number of partitions for the
-   *        the edge RDD
+   * @param minEdgePartitions the number of partitions for the edge RDD
    */
   def edgeListFile(
       sc: SparkContext,
@@ -60,8 +59,9 @@ object GraphLoader extends Logging {
     val startTime = System.currentTimeMillis
 
     // Parse the edge data table directly into edge partitions
-    val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[Int]
+    val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
+    val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
+      val builder = new EdgePartitionBuilder[Int, Int]
       iter.foreach { line =>
         if (!line.isEmpty && line(0) != '#') {
           val lineArray = line.split("\\s+")
@@ -78,7 +78,7 @@
         }
       }
       Iterator((pid, builder.toEdgePartition))
-    }.cache()
+    }.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
     edges.count()
 
     logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
```
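The added `coalesce` brings the loaded RDD back down to `minEdgePartitions` when `sc.textFile` splits the input more finely. A hedged usage sketch with a hypothetical path:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Graph, GraphLoader}

// Loads a whitespace-delimited "srcId dstId" edge list; edge and vertex
// attributes default to Int, and the cached RDD's name now records the path.
def loadFollowers(sc: SparkContext): Graph[Int, Int] =
  GraphLoader.edgeListFile(sc, "hdfs:///data/followers.txt",
    canonicalOrientation = false, minEdgePartitions = 8)
```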

graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala

Lines changed: 11 additions & 6 deletions
```diff
@@ -18,11 +18,13 @@
 package org.apache.spark.graphx
 
 import scala.reflect.ClassTag
-import org.apache.spark.SparkContext._
+import scala.util.Random
+
 import org.apache.spark.SparkException
-import org.apache.spark.graphx.lib._
+import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
-import scala.util.Random
+
+import org.apache.spark.graphx.lib._
 
 /**
  * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
@@ -43,19 +45,22 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
    * The in-degree of each vertex in the graph.
    * @note Vertices with no in-edges are not returned in the resulting RDD.
    */
-  @transient lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
+  @transient lazy val inDegrees: VertexRDD[Int] =
+    degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")
 
   /**
    * The out-degree of each vertex in the graph.
    * @note Vertices with no out-edges are not returned in the resulting RDD.
    */
-  @transient lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
+  @transient lazy val outDegrees: VertexRDD[Int] =
+    degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")
 
   /**
    * The degree of each vertex in the graph.
    * @note Vertices with no edges are not returned in the resulting RDD.
    */
-  @transient lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
+  @transient lazy val degrees: VertexRDD[Int] =
+    degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")
 
   /**
    * Computes the neighboring vertex degrees.
```
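The `setName` calls only label these RDDs (the names show up in the web UI and debug output); the values are unchanged. A small sketch against an assumed graph:

```scala
import org.apache.spark.graphx._

def degreeSummary[VD, ED](graph: Graph[VD, ED]): Unit = {
  val degrees: VertexRDD[Int] = graph.degrees  // now named "GraphOps.degrees"
  println(degrees.name)                        // prints the label set above
  println(degrees.map(_._2).fold(0)(_ + _))    // total degree over all vertices
}
```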

graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala

Lines changed: 5 additions & 1 deletion
```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.graphx
 
 import scala.reflect.ClassTag
+import org.apache.spark.Logging
 
 
 /**
@@ -52,7 +53,7 @@ import scala.reflect.ClassTag
  * }}}
  *
  */
-object Pregel {
+object Pregel extends Logging {
 
   /**
    * Execute a Pregel-like iterative vertex-parallel abstraction. The
@@ -142,6 +143,9 @@ object Pregel {
       // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
       // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
       activeMessages = messages.count()
+
+      logInfo("Pregel finished iteration " + i)
+
       // Unpersist the RDDs hidden by newly-materialized RDDs
       oldMessages.unpersist(blocking=false)
       newVerts.unpersist(blocking=false)
```
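With `Pregel` extending `Logging`, every superstep now emits an INFO line. Any Pregel job will show it; below is a hedged sketch of the standard single-source shortest-paths formulation from the GraphX guide, with `graph` and `sourceId` as assumed inputs:

```scala
import org.apache.spark.graphx._

def shortestPaths(graph: Graph[Long, Double], sourceId: VertexId): Graph[Double, Double] = {
  // Initialize the source to distance 0 and all other vertices to infinity.
  val init = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity)
  Pregel(init, Double.PositiveInfinity)(
    vprog = (id, dist, newDist) => math.min(dist, newDist),
    sendMsg = triplet =>
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      } else {
        Iterator.empty
      },
    mergeMsg = (a, b) => math.min(a, b))
}
```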
