diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 42ab27bf55ccf..f8bcda5f84455 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -92,6 +92,32 @@ GraphX in Spark {{site.SPARK_VERSION}} contains one user-facing interface change
 
 [EdgeRDD]: api/scala/index.html#org.apache.spark.graphx.EdgeRDD
 
+## Workaround for `Graph.partitionBy` in Spark 1.0.0
+
+<a name="partitionBy_workaround"></a>
+The [`Graph.partitionBy`][Graph.partitionBy] operator allows users to choose the graph
+partitioning strategy, but due to
+[SPARK-1931](https://issues.apache.org/jira/browse/SPARK-1931), this method is broken in Spark
+1.0.0. We encourage users to build the latest version of Spark from
+[`branch-1.0`](https://github.com/apache/spark/tree/branch-1.0), which contains a fix.
+Alternatively, a workaround is to partition the edges before constructing the graph, as follows:
+
+{% highlight scala %}
+// Define our own version of partitionBy to work around SPARK-1931
+import org.apache.spark.HashPartitioner
+import org.apache.spark.graphx._
+import org.apache.spark.rdd.RDD
+def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
+  val numPartitions = edges.partitions.size
+  // Key each edge by its target partition, shuffle, then drop the keys
+  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
+    .partitionBy(new HashPartitioner(numPartitions))
+    .mapPartitions(_.map(_._2), preservesPartitioning = true)
+}
+
+val vertices = ...
+val edges = ...
+
+// Instead of:
+val g = Graph(vertices, edges).partitionBy(PartitionStrategy.EdgePartition2D) // broken in Spark 1.0.0
+
+// Use:
+val g = Graph(vertices, partitionBy(edges, PartitionStrategy.EdgePartition2D))
+{% endhighlight %}
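+
+For illustration, here is the same drop-in replacement applied end to end on a small synthetic
+graph (the vertex and edge data below are hypothetical; any `RDD[(VertexId, VD)]` and
+`RDD[Edge[ED]]` work the same way):
+
+{% highlight scala %}
+// A toy graph, for illustration only: three users and two follow relationships
+val vertices = sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
+val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
+
+// Partition the edges up front, then build the graph
+val g = Graph(vertices, partitionBy(edges, PartitionStrategy.EdgePartition2D))
+
+g.edges.partitions.size // expected to equal edges.partitions.size
+{% endhighlight %}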
+
 # Getting Started
 
 To get started you first need to import Spark and GraphX into your project, as follows:
@@ -315,6 +341,8 @@ class Graph[VD, ED] {
   def cache(): Graph[VD, ED]
   def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
   // Change the partitioning heuristic ============================================================
+  // - WARNING: partitionBy is broken in Spark 1.0.0 due to SPARK-1931.
+  //   See the section "Workaround for Graph.partitionBy in Spark 1.0.0" above.
   def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
   // Transform vertex and edge attributes ==========================================================
   def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
@@ -796,7 +824,7 @@ println(sssp.vertices.collect.mkString("\n"))
 
 # Graph Builders
 
-GraphX provides several ways of building a graph from a collection of vertices and edges in an RDD or on disk. None of the graph builders repartitions the graph's edges by default; instead, edges are left in their default partitions (such as their original blocks in HDFS). [`Graph.groupEdges`][Graph.groupEdges] requires the graph to be repartitioned because it assumes identical edges will be colocated on the same partition, so you must call [`Graph.partitionBy`][Graph.partitionBy] before calling `groupEdges`.
+GraphX provides several ways of building a graph from a collection of vertices and edges in an RDD or on disk. None of the graph builders repartitions the graph's edges by default; instead, edges are left in their default partitions (such as their original blocks in HDFS). [`Graph.groupEdges`][Graph.groupEdges] requires the graph to be repartitioned because it assumes identical edges will be colocated on the same partition, so you must call [`Graph.partitionBy`][Graph.partitionBy] before calling `groupEdges`. However, note that `Graph.partitionBy` is broken in Spark 1.0.0 due to [SPARK-1931](https://issues.apache.org/jira/browse/SPARK-1931); see the [suggested workarounds](#partitionBy_workaround) above.
 
 {% highlight scala %}
 object GraphLoader {
@@ -953,10 +981,12 @@ Rather than splitting graphs along edges, GraphX partitions the graph along vert
 reduce both the communication and storage overhead. Logically, this corresponds to assigning edges
 to machines and allowing vertices to span multiple machines. The exact method of assigning edges
 depends on the [`PartitionStrategy`][PartitionStrategy] and there are several tradeoffs to the
-various heuristics. Users can choose between different strategies by repartitioning the graph with
-the [`Graph.partitionBy`][Graph.partitionBy] operator. The default partitioning strategy is to use
-the initial partitioning of the edges as provided on graph construction. However, users can easily
-switch to 2D-partitioning or other heuristics included in GraphX.
+various heuristics. The default partitioning strategy is to use the initial partitioning of the
+edges as provided on graph construction. However, users can easily switch to 2D-partitioning or
+other heuristics included in GraphX.
+
+Users can choose between different strategies by repartitioning the graph with
+the [`Graph.partitionBy`][Graph.partitionBy] operator. However, note that `Graph.partitionBy` is
+broken in Spark 1.0.0 due to [SPARK-1931](https://issues.apache.org/jira/browse/SPARK-1931); see
+the [suggested workarounds](#partitionBy_workaround) above.
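+
+For example, a graph that was built with the default partitioning can be moved onto 2D
+partitioning by rebuilding it with the `partitionBy` helper from the
+[workaround](#partitionBy_workaround) above (a sketch; `graph` stands for any existing
+`Graph[VD, ED]`):
+
+{% highlight scala %}
+// Repartition the edge RDD with EdgePartition2D, then rebuild the graph
+// (`graph` is any existing Graph[VD, ED])
+val graph2D = Graph(graph.vertices, partitionBy(graph.edges, PartitionStrategy.EdgePartition2D))
+{% endhighlight %}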
 
 [Graph.partitionBy]: api/scala/index.html#org.apache.spark.graphx.Graph$@partitionBy(partitionStrategy:org.apache.spark.graphx.PartitionStrategy):org.apache.spark.graphx.Graph[VD,ED]
 
@@ -1033,14 +1063,26 @@ println(ccByUsername.collect().mkString("\n"))
 
 ## Triangle Counting
 
-A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount] that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the [PageRank section](#pagerank). *Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`) and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].*
+A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount] that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the [PageRank section](#pagerank). *Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`) and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].* Also note that `Graph.partitionBy` is broken in Spark 1.0.0 due to [SPARK-1931](https://issues.apache.org/jira/browse/SPARK-1931); see the [suggested workarounds](#partitionBy_workaround) above.
 
 [TriangleCount]: api/scala/index.html#org.apache.spark.graphx.lib.TriangleCount$
 [Graph.partitionBy]: api/scala/index.html#org.apache.spark.graphx.Graph@partitionBy(PartitionStrategy):Graph[VD,ED]
 
 {% highlight scala %}
+// Define our own version of partitionBy to work around SPARK-1931
+import org.apache.spark.HashPartitioner
+def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
+  val numPartitions = edges.partitions.size
+  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
+    .partitionBy(new HashPartitioner(numPartitions))
+    .mapPartitions(_.map(_._2), preservesPartitioning = true)
+}
+
 // Load the edges in canonical order and partition the graph for triangle count
-val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
+val unpartitionedGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true)
+val graph = Graph(
+  unpartitionedGraph.vertices,
+  partitionBy(unpartitionedGraph.edges, PartitionStrategy.RandomVertexCut))
 // Find the triangle count for each vertex
 val triCounts = graph.triangleCount().vertices
 // Join the triangle counts with the usernames
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index dc5dac4fdad57..44cf77b9cacde 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -104,7 +104,33 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
 
   /**
-   * Repartitions the edges in the graph according to `partitionStrategy`.
+   * Repartitions the edges in the graph according to `partitionStrategy` (WARNING: broken in
+   * Spark 1.0.0).
+   *
+   * To use this function in Spark 1.0.0, either build the latest version of Spark from
+   * [[https://github.com/apache/spark/tree/branch-1.0 branch-1.0]], or apply the following
+   * workaround:
+   * {{{
+   * // Define our own version of partitionBy to work around SPARK-1931
+   * import org.apache.spark.HashPartitioner
+   * def partitionBy[ED](
+   *     edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
+   *   val numPartitions = edges.partitions.size
+   *   edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
+   *     .partitionBy(new HashPartitioner(numPartitions))
+   *     .mapPartitions(_.map(_._2), preservesPartitioning = true)
+   * }
+   *
+   * val vertices = ...
+   * val edges = ...
+   *
+   * // Instead of:
+   * val g = Graph(vertices, edges)
+   *   .partitionBy(PartitionStrategy.EdgePartition2D) // broken in Spark 1.0.0
+   *
+   * // Use:
+   * val g = Graph(vertices, partitionBy(edges, PartitionStrategy.EdgePartition2D))
+   * }}}
    */
   def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]