56 changes: 49 additions & 7 deletions docs/graphx-programming-guide.md
@@ -92,6 +92,32 @@ GraphX in Spark {{site.SPARK_VERSION}} contains one user-facing interface change

[EdgeRDD]: api/scala/index.html#org.apache.spark.graphx.EdgeRDD

## Workaround for `Graph.partitionBy` in Spark 1.0.0
<a name="partitionBy_workaround"></a>

The [`Graph.partitionBy`][Graph.partitionBy] operator allows users to choose the graph partitioning strategy, but due to [SPARK-1931](https://issues.apache.org/jira/browse/SPARK-1931), this method is broken in Spark 1.0.0. We encourage users to build the latest version of Spark from [`branch-1.0`](https://github.com/apache/spark/tree/branch-1.0), which contains a fix. Alternatively, a workaround is to partition the edges before constructing the graph, as follows:

{% highlight scala %}
// Define our own version of partitionBy to work around SPARK-1931
import org.apache.spark.HashPartitioner
// Also bring the GraphX and RDD types used below into scope
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
val numPartitions = edges.partitions.size
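// Key each edge by its target partition, shuffle on those keys, then drop the
// keys while keeping the resulting colocation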
edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
.partitionBy(new HashPartitioner(numPartitions))
.mapPartitions(_.map(_._2), preservesPartitioning = true)
}

val vertices = ...
val edges = ...

// Instead of:
val g = Graph(vertices, edges).partitionBy(PartitionStrategy.EdgePartition2D) // broken in Spark 1.0.0

// Use:
val g = Graph(vertices, partitionBy(edges, PartitionStrategy.EdgePartition2D))
{% endhighlight %}
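
This works because `partitionStrategy.getPartition` computes a target partition number for each edge, and a `HashPartitioner` sends an integer key `k` in `[0, numPartitions)` to exactly partition `k`. Dropping the keys with `preservesPartitioning = true` then retains that colocation without triggering another shuffle.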


# Getting Started

To get started you first need to import Spark and GraphX into your project, as follows:
@@ -315,6 +341,8 @@ class Graph[VD, ED] {
def cache(): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
// Change the partitioning heuristic ============================================================
// - WARNING: partitionBy is broken in Spark 1.0.0 due to SPARK-1931.
// See the section "Workaround for Graph.partitionBy in Spark 1.0.0" above.
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
// Transform vertex and edge attributes ==========================================================
def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
@@ -796,7 +824,7 @@ println(sssp.vertices.collect.mkString("\n"))
# Graph Builders
<a name="graph_builders"></a>

GraphX provides several ways of building a graph from a collection of vertices and edges in an RDD or on disk. None of the graph builders repartitions the graph's edges by default; instead, edges are left in their default partitions (such as their original blocks in HDFS). [`Graph.groupEdges`][Graph.groupEdges] requires the graph to be repartitioned because it assumes identical edges will be colocated on the same partition, so you must call [`Graph.partitionBy`][Graph.partitionBy] before calling `groupEdges`. However, note that `Graph.partitionBy` is broken in Spark 1.0.0 due to [SPARK-1931](https://issues.apache.org/jira/browse/SPARK-1931); see the [suggested workarounds](#partitionBy_workaround) above.
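
For instance, here is a minimal sketch of preparing a graph for `groupEdges` using the workaround `partitionBy` defined [above](#partitionBy_workaround) (`vertices` and `edges` are placeholder RDDs, and integer edge attributes are assumed):

{% highlight scala %}
// Colocate identical edges, then merge duplicates by summing their attributes
val g = Graph(vertices, partitionBy(edges, PartitionStrategy.RandomVertexCut))
val merged = g.groupEdges((a, b) => a + b)
{% endhighlight %}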

{% highlight scala %}
object GraphLoader {
@@ -953,10 +981,12 @@ Rather than splitting graphs along edges, GraphX partitions the graph along vertices
reduce both the communication and storage overhead. Logically, this corresponds to assigning edges
to machines and allowing vertices to span multiple machines. The exact method of assigning edges
depends on the [`PartitionStrategy`][PartitionStrategy] and there are several tradeoffs to the
various heuristics. The default partitioning strategy is to use the initial partitioning of the
edges as provided on graph construction. However, users can easily switch to 2D-partitioning or
other heuristics included in GraphX.

Users can choose between different strategies by repartitioning the graph with
the [`Graph.partitionBy`][Graph.partitionBy] operator. However, note that `Graph.partitionBy` is broken in Spark 1.0.0 due to [SPARK-1931](https://issues.apache.org/jira/browse/SPARK-1931); see the [suggested workarounds](#partitionBy_workaround) above.
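
As a sketch, switching to 2D partitioning with the workaround `partitionBy` from [above](#partitionBy_workaround) (`vertices` and `edges` are placeholder RDDs):

{% highlight scala %}
// Heuristics included in GraphX:
//   EdgePartition1D          - hash each edge by its source vertex
//   EdgePartition2D          - 2D sparse-matrix (grid) partitioning
//   RandomVertexCut          - hash each edge by its (srcId, dstId) pair
//   CanonicalRandomVertexCut - hash by the canonically ordered vertex pair,
//                              colocating edges regardless of direction
val g = Graph(vertices, partitionBy(edges, PartitionStrategy.EdgePartition2D))
{% endhighlight %}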

[Graph.partitionBy]: api/scala/index.html#org.apache.spark.graphx.Graph$@partitionBy(partitionStrategy:org.apache.spark.graphx.PartitionStrategy):org.apache.spark.graphx.Graph[VD,ED]

@@ -1033,14 +1063,26 @@ println(ccByUsername.collect().mkString("\n"))

## Triangle Counting

A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount] that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the [PageRank section](#pagerank). *Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`) and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].* Also note that `Graph.partitionBy` is broken in Spark 1.0.0 due to [SPARK-1931](https://issues.apache.org/jira/browse/SPARK-1931); see the [suggested workarounds](#partitionBy_workaround) above.

[TriangleCount]: api/scala/index.html#org.apache.spark.graphx.lib.TriangleCount$
[Graph.partitionBy]: api/scala/index.html#org.apache.spark.graphx.Graph@partitionBy(PartitionStrategy):Graph[VD,ED]

{% highlight scala %}
// Define our own version of partitionBy to work around SPARK-1931
import org.apache.spark.HashPartitioner
def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
val numPartitions = edges.partitions.size
edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
.partitionBy(new HashPartitioner(numPartitions))
.mapPartitions(_.map(_._2), preservesPartitioning = true)
}

// Load the edges in canonical order and partition the graph for triangle count
val unpartitionedGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true)
val graph = Graph(
  unpartitionedGraph.vertices,
  partitionBy(unpartitionedGraph.edges, PartitionStrategy.RandomVertexCut))
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
28 changes: 27 additions & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -104,7 +104,33 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable {
def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]

/**
* Repartitions the edges in the graph according to `partitionStrategy` (WARNING: broken in
* Spark 1.0.0).
*
* To use this function in Spark 1.0.0, either build the latest version of Spark from
* [[https://github.com/apache/spark/tree/branch-1.0 branch-1.0]], or apply the following
* workaround:
* {{{
* // Define our own version of partitionBy to work around SPARK-1931
* import org.apache.spark.HashPartitioner
* def partitionBy[ED](
* edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
* val numPartitions = edges.partitions.size
* edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
* .partitionBy(new HashPartitioner(numPartitions))
* .mapPartitions(_.map(_._2), preservesPartitioning = true)
* }
*
* val vertices = ...
* val edges = ...
*
* // Instead of:
* val g = Graph(vertices, edges)
* .partitionBy(PartitionStrategy.EdgePartition2D) // broken in Spark 1.0.0
*
* // Use:
* val g = Graph(vertices, partitionBy(edges, PartitionStrategy.EdgePartition2D))
* }}}
*/
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
