9 changes: 9 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -121,6 +121,15 @@ abstract class VertexRDD[VD](
*/
def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]

/**
* For each vertex present in both `this` and `other`, `diff` returns only those vertices with
* differing values; for values that are different, keeps the values from `other`. This is
* only guaranteed to work if the VertexRDDs share a common ancestor.
*
* @param other the other RDD[(VertexId, VD)] to diff against.
*/
def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]

/**
* For each vertex present in both `this` and `other`, `diff` returns only those vertices with
* differing values; for values that are different, keeps the values from `other`. This is
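To illustrate what the new overload returns, here is a minimal usage sketch. The vertex IDs and values are hypothetical, and only VertexRDD.apply, diff, and collect are assumed from the existing API:

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

// Hypothetical illustration of the new overload's semantics.
def diffExample(sc: SparkContext): Unit = {
  // Vertices 1..4, each with value 1.
  val verts: VertexRDD[Int] =
    VertexRDD(sc.parallelize(Seq[(VertexId, Int)]((1L, 1), (2L, 1), (3L, 1), (4L, 1))))

  // A plain pair RDD in which vertices 2 and 4 now carry the value 2.
  val updated: RDD[(VertexId, Int)] =
    sc.parallelize(Seq[(VertexId, Int)]((1L, 1), (2L, 2), (3L, 1), (4L, 2)))

  // diff keeps only the vertices whose values changed, with the values from `updated`.
  val changed: VertexRDD[Int] = verts.diff(updated)
  assert(changed.collect().toSet == Set((2L, 2), (4L, 2)))
}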
graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -103,6 +103,10 @@ class VertexRDDImpl[VD] private[graphx] (
override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
this.mapVertexPartitions(_.map(f))

override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
}

override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
val otherPartition = other match {
case other: VertexRDD[_] if this.partitioner == other.partitioner =>
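The implementation above converts the plain pair RDD into a VertexRDD that reuses this RDD's index and then defers to the existing diff(other: VertexRDD[VD]). A rough hand-expanded sketch of that delegation, specialized to Int values (verts and updates are assumed inputs, not names from the patch):

import org.apache.spark.graphx.{VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

// Sketch only: roughly what the new overload does under the hood.
// aggregateUsingIndex shuffles `updates` into the same partitioning and index as
// `verts`; on duplicate vertex IDs the reduce function (a, b) => a keeps the first
// value seen. The existing diff(other: VertexRDD[VD]) then performs the comparison.
def diffByHand(verts: VertexRDD[Int], updates: RDD[(VertexId, Int)]): VertexRDD[Int] = {
  val updatesAsVertexRDD = verts.aggregateUsingIndex(updates, (a: Int, b: Int) => a)
  verts.diff(updatesAsVertexRDD)
}

Keeping the first value on duplicate IDs is an arbitrary tie-break; the important part is that the converted RDD reuses verts' index, so the subsequent diff can compare the two partition by partition.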
13 changes: 13 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
import org.scalatest.FunSuite

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

class VertexRDDSuite extends FunSuite with LocalSparkContext {
@@ -58,6 +59,18 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
}
}

test("diff with RDD[(VertexId, VD)]") {
withSpark { sc =>
val n = 100
val verts = vertices(sc, n).cache()
val flipEvens: RDD[(VertexId, Int)] =
sc.parallelize(0L to 100L)
.map(id => if (id % 2 == 0) (id, -id.toInt) else (id, id.toInt)).cache()
// diff should keep only the changed vertices
assert(verts.diff(flipEvens).map(_._2).collect().toSet === (2 to n by 2).map(-_).toSet)
}
}

test("diff vertices with the non-equal number of partitions") {
withSpark { sc =>
val vertexA = VertexRDD(sc.parallelize(0 until 24, 3).map(i => (i.toLong, 0)))
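The test above relies on a vertices(sc, n) helper defined earlier in the suite and not shown in this excerpt. For the assertion to hold, it must build a VertexRDD with IDs 0..n where each value equals its ID; a hedged reconstruction follows (the partition count of 5 is a guess):

import org.apache.spark.SparkContext
import org.apache.spark.graphx.VertexRDD

// Assumed shape of the suite's helper: vertex IDs 0..n, each carrying its own ID
// as an Int value. Negating the even IDs then changes every even vertex except 0
// (since -0 == 0), which matches the set the test asserts on.
def vertices(sc: SparkContext, n: Int): VertexRDD[Int] =
  VertexRDD(sc.parallelize((0 to n).map(x => (x.toLong, x)), 5))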
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
@@ -181,6 +181,9 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.RealClock"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Clock"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.TestClock")
) ++ Seq(
Member:
Is this still needed -- does it actually remove a version of a diff method that exists?

Author:
Still needed. I double checked with ./dev/mima and received:

[error]  * abstract method diff(org.apache.spark.rdd.RDD)org.apache.spark.graphx.VertexRDD in class org.apache.spark.graphx.VertexRDD does not have a correspondent in old version
[error]    filter with: ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.diff")

Member:
OK, right, this is what @ankurdave was saying was an acceptable API change since this isn't meant to be extended anyway and can't technically be extended because a class the API uses isn't public. I'd still rather let him or @jegonzal push the final button as it's touching the API of GraphX.

// SPARK-5922 Adding a generalized diff(other: RDD[(VertexId, VD)]) to VertexRDD
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.diff")
)

case v if v.startsWith("1.2") =>