From 9ca3d58615b1fad739d358816c167e1748bd2a3a Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Wed, 28 May 2014 17:31:41 -0700
Subject: [PATCH 1/3] Suggest workarounds for partitionBy in Spark 1.0.0 due to
 SPARK-1931

We encourage users to build the latest version of Spark from the master
branch, which contains a fix. Alternatively, a workaround is to partition
the edges before constructing the graph.
---
 docs/graphx-programming-guide.md              | 56 ++++++++++++++++---
 .../scala/org/apache/spark/graphx/Graph.scala | 28 +++++++++-
 2 files changed, 76 insertions(+), 8 deletions(-)

diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 42ab27bf55ccf..a446250fe85a3 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -92,6 +92,32 @@ GraphX in Spark {{site.SPARK_VERSION}} contains one user-facing interface change
 [EdgeRDD]: api/scala/index.html#org.apache.spark.graphx.EdgeRDD
 
+<a name="partitionBy_workaround"></a>
+## Workaround for `Graph.partitionBy` in Spark 1.0.0
+
+The [`Graph.partitionBy`][Graph.partitionBy] operator allows users to choose the graph partitioning strategy, but due to [SPARK-1931](https://issues.apache.org/jira/browse/SPARK-1931), this method is broken in Spark 1.0.0. We encourage users to build the latest version of Spark from the master branch, which contains a fix. Alternatively, a workaround is to partition the edges before constructing the graph, as follows:
+
+{% highlight scala %}
+// Define our own version of partitionBy to work around SPARK-1931
+import org.apache.spark.HashPartitioner
+def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
+  val numPartitions = edges.partitions.size
+  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
+    .partitionBy(new HashPartitioner(numPartitions))
+    .mapPartitions(_.map(_._2), preservesPartitioning = true)
+}
+
+val vertices = ...
+val edges = ...
+
+// Instead of:
+val g = Graph(vertices, edges).partitionBy(PartitionStrategy.EdgePartition2D) // broken in Spark 1.0.0
+
+// Use:
+val g = Graph(vertices, partitionBy(edges, PartitionStrategy.EdgePartition2D))
+{% endhighlight %}
+
 # Getting Started
 
 To get started you first need to import Spark and GraphX into your project, as follows:
@@ -315,6 +341,8 @@ class Graph[VD, ED] {
   def cache(): Graph[VD, ED]
   def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
   // Change the partitioning heuristic ============================================================
+  // - WARNING: partitionBy is broken in Spark 1.0.0 due to SPARK-1931.
+  //   See the section "Workaround for Graph.partitionBy in Spark 1.0.0" above.
   def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
   // Transform vertex and edge attributes ==========================================================
   def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
@@ -796,7 +824,7 @@ println(sssp.vertices.collect.mkString("\n"))
 
 # Graph Builders
 
-GraphX provides several ways of building a graph from a collection of vertices and edges in an RDD or on disk. None of the graph builders repartitions the graph's edges by default; instead, edges are left in their default partitions (such as their original blocks in HDFS). [`Graph.groupEdges`][Graph.groupEdges] requires the graph to be repartitioned because it assumes identical edges will be colocated on the same partition, so you must call [`Graph.partitionBy`][Graph.partitionBy] before calling `groupEdges`.
+GraphX provides several ways of building a graph from a collection of vertices and edges in an RDD or on disk. None of the graph builders repartitions the graph's edges by default; instead, edges are left in their default partitions (such as their original blocks in HDFS). [`Graph.groupEdges`][Graph.groupEdges] requires the graph to be repartitioned because it assumes identical edges will be colocated on the same partition, so you must call [`Graph.partitionBy`][Graph.partitionBy] before calling `groupEdges`. However, note that `Graph.partitionBy` is broken in Spark 1.0.0 due to [SPARK-1931](https://issues.apache.org/jira/browse/SPARK-1931); see the [suggested workarounds](#partitionBy_workaround) above.
 
 {% highlight scala %}
 object GraphLoader {
@@ -953,10 +981,12 @@ Rather than splitting graphs along edges, GraphX partitions the graph along vert
 reduce both the communication and storage overhead. Logically, this corresponds to assigning edges
 to machines and allowing vertices to span multiple machines. The exact method of assigning edges
 depends on the [`PartitionStrategy`][PartitionStrategy] and there are several tradeoffs to the
-various heuristics. Users can choose between different strategies by repartitioning the graph with
-the [`Graph.partitionBy`][Graph.partitionBy] operator. The default partitioning strategy is to use
-the initial partitioning of the edges as provided on graph construction. However, users can easily
-switch to 2D-partitioning or other heuristics included in GraphX.
+various heuristics. The default partitioning strategy is to use the initial partitioning of the
+edges as provided on graph construction. However, users can easily switch to 2D-partitioning or
+other heuristics included in GraphX.
+
+Users can choose between different strategies by repartitioning the graph with
+the [`Graph.partitionBy`][Graph.partitionBy] operator. However, note that `Graph.partitionBy` is broken in Spark 1.0.0 due to [SPARK-1931](https://issues.apache.org/jira/browse/SPARK-1931); see the [suggested workarounds](#partitionBy_workaround) above.
 
 [Graph.partitionBy]: api/scala/index.html#org.apache.spark.graphx.Graph$@partitionBy(partitionStrategy:org.apache.spark.graphx.PartitionStrategy):org.apache.spark.graphx.Graph[VD,ED]
 
@@ -1033,14 +1063,26 @@ println(ccByUsername.collect().mkString("\n"))
 
 ## Triangle Counting
 
-A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount] that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the [PageRank section](#pagerank). *Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`) and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].*
+A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount] that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the [PageRank section](#pagerank). *Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`) and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].* Also note that `Graph.partitionBy` is broken in Spark 1.0.0 due to [SPARK-1931](https://issues.apache.org/jira/browse/SPARK-1931); see the [suggested workarounds](#partitionBy_workaround) above.
 
 [TriangleCount]: api/scala/index.html#org.apache.spark.graphx.lib.TriangleCount$
 [Graph.partitionBy]: api/scala/index.html#org.apache.spark.graphx.Graph@partitionBy(PartitionStrategy):Graph[VD,ED]
 
 {% highlight scala %}
+// Define our own version of partitionBy to work around SPARK-1931
+import org.apache.spark.HashPartitioner
+def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
+  val numPartitions = edges.partitions.size
+  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
+    .partitionBy(new HashPartitioner(numPartitions))
+    .mapPartitions(_.map(_._2), preservesPartitioning = true)
+}
+
 // Load the edges in canonical order and partition the graph for triangle count
-val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
+val unpartitionedGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true)
+val graph = Graph(
+  unpartitionedGraph.vertices,
+  partitionBy(unpartitionedGraph.edges, PartitionStrategy.RandomVertexCut))
 // Find the triangle count for each vertex
 val triCounts = graph.triangleCount().vertices
 // Join the triangle counts with the usernames
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index dc5dac4fdad57..57feb4876ca03 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -103,8 +103,34 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    */
   def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
 
+  /** @define dot */
   /**
-   * Repartitions the edges in the graph according to `partitionStrategy`.
+   * Repartitions the edges in the graph according to `partitionStrategy` (WARNING: broken in
+   * Spark 1.0.0).
+   *
+   * To use this function in Spark 1.0.0, either build the latest version of Spark from the master
+   * branch, or apply the following workaround:
+   * {{{
+   * // Define our own version of partitionBy to work around SPARK-1931
+   * import org.apache.spark.HashPartitioner
+   * def partitionBy[ED](
+   *     edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
+   *   val numPartitions = edges.partitions.size
+   *   edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
+   *     .partitionBy(new HashPartitioner(numPartitions))
+   *     .mapPartitions(_.map(_._2), preservesPartitioning = true)
+   * }
+   *
+   * val vertices = ...
+   * val edges = ...
+   *
+   * // Instead of:
+   * val g = Graph(vertices, edges)
+   *   .partitionBy(PartitionStrategy.EdgePartition2D) // broken in Spark 1.0.0
+   *
+   * // Use:
+   * val g = Graph(vertices, partitionBy(edges, PartitionStrategy.EdgePartition2D))
+   * }}}
    */
   def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

From 5809279dc58e144c4069d5d0d18eebe1e3a7f95c Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Wed, 28 May 2014 18:09:41 -0700
Subject: [PATCH 2/3] Remove unnecessary comment

---
 graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 57feb4876ca03..1c9a1fd24eb91 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -103,7 +103,6 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    */
   def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
 
-  /** @define dot */
   /**
    * Repartitions the edges in the graph according to `partitionStrategy` (WARNING: broken in
    * Spark 1.0.0).

From fbd7a12776fc9dbe9bb0c0425e586dd084c64255 Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Wed, 28 May 2014 21:54:37 -0700
Subject: [PATCH 3/3] Point to branch-1.0 instead of master for fix

---
 docs/graphx-programming-guide.md                          | 2 +-
 graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index a446250fe85a3..f8bcda5f84455 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -95,7 +95,7 @@ GraphX in Spark {{site.SPARK_VERSION}} contains one user-facing interface change
 ## Workaround for `Graph.partitionBy` in Spark 1.0.0
 
-The [`Graph.partitionBy`][Graph.partitionBy] operator allows users to choose the graph partitioning strategy, but due to [SPARK-1931](https://issues.apache.org/jira/browse/SPARK-1931), this method is broken in Spark 1.0.0. We encourage users to build the latest version of Spark from the master branch, which contains a fix. Alternatively, a workaround is to partition the edges before constructing the graph, as follows:
+The [`Graph.partitionBy`][Graph.partitionBy] operator allows users to choose the graph partitioning strategy, but due to [SPARK-1931](https://issues.apache.org/jira/browse/SPARK-1931), this method is broken in Spark 1.0.0. We encourage users to build the latest version of Spark from [`branch-1.0`](https://github.com/apache/spark/tree/branch-1.0), which contains a fix. Alternatively, a workaround is to partition the edges before constructing the graph, as follows:
 
 {% highlight scala %}
 // Define our own version of partitionBy to work around SPARK-1931
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 1c9a1fd24eb91..44cf77b9cacde 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -107,8 +107,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * Repartitions the edges in the graph according to `partitionStrategy` (WARNING: broken in
    * Spark 1.0.0).
    *
-   * To use this function in Spark 1.0.0, either build the latest version of Spark from the master
-   * branch, or apply the following workaround:
+   * To use this function in Spark 1.0.0, either build the latest version of Spark from
+   * [[https://github.com/apache/spark/tree/branch-1.0 branch-1.0]], or apply the following
+   * workaround:
    * {{{
    * // Define our own version of partitionBy to work around SPARK-1931
    * import org.apache.spark.HashPartitioner
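
For readers who want to try the workaround end to end in `spark-shell`, the following is a minimal, self-contained sketch. The sample `vertices` and `edges` values are hypothetical illustration data, not part of the patch; everything else uses only the GraphX 1.0.0 API discussed above:

{% highlight scala %}
// Assumes a spark-shell session, where `sc` is the SparkContext
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

// The helper the patch suggests: key each edge by the partition the strategy
// assigns it, shuffle with a HashPartitioner, then unwrap the edges again.
def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
  val numPartitions = edges.partitions.size
  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
    .partitionBy(new HashPartitioner(numPartitions))
    .mapPartitions(_.map(_._2), preservesPartitioning = true)
}

// Hypothetical sample data, for illustration only
val vertices = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1)))

// Partition the edges before constructing the graph, instead of calling
// Graph.partitionBy afterwards (broken in Spark 1.0.0 due to SPARK-1931)
val g = Graph(vertices, partitionBy(edges, PartitionStrategy.EdgePartition2D))
{% endhighlight %}

Note that the pair-RDD `partitionBy` used inside the helper comes from the implicit conversions in `org.apache.spark.SparkContext._`, which `spark-shell` imports automatically; a standalone program built against Spark 1.0.0 would need that import as well.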