Skip to content

Commit d749f6d

Browse files
committed
Merge pull request #2 from mengxr/SPARK-12363
use Graph instead of GraphImpl and update tests/example based on PIC paper
2 parents 4c7623f + c7ff1e6 commit d749f6d

File tree

3 files changed

+71
-78
lines changed

3 files changed

+71
-78
lines changed

examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -40,27 +40,23 @@ import org.apache.spark.{SparkConf, SparkContext}
4040
* n: Number of sampled points on innermost circle.. There are proportionally more points
4141
* within the outer/larger circles
4242
* maxIterations: Number of Power Iterations
43-
* outerRadius: radius of the outermost of the concentric circles
4443
* }}}
4544
*
4645
* Here is a sample run and output:
4746
*
48-
* ./bin/run-example mllib.PowerIterationClusteringExample -k 3 --n 30 --maxIterations 15
49-
*
50-
* Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
51-
* 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
47+
* ./bin/run-example mllib.PowerIterationClusteringExample -k 2 --n 10 --maxIterations 15
5248
*
49+
* Cluster assignments: 1 -> [0,1,2,3,4,5,6,7,8,9],
50+
* 0 -> [10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
5351
*
5452
* If you use it as a template to create your own app, please use `spark-submit` to submit your app.
5553
*/
5654
object PowerIterationClusteringExample {
5755

5856
case class Params(
59-
input: String = null,
60-
k: Int = 3,
61-
numPoints: Int = 5,
62-
maxIterations: Int = 10,
63-
outerRadius: Double = 3.0
57+
k: Int = 2,
58+
numPoints: Int = 10,
59+
maxIterations: Int = 15
6460
) extends AbstractParams[Params]
6561

6662
def main(args: Array[String]) {
@@ -69,17 +65,14 @@ object PowerIterationClusteringExample {
6965
val parser = new OptionParser[Params]("PowerIterationClusteringExample") {
7066
head("PowerIterationClusteringExample: an example PIC app using concentric circles.")
7167
opt[Int]('k', "k")
72-
.text(s"number of circles (/clusters), default: ${defaultParams.k}")
68+
.text(s"number of circles (clusters), default: ${defaultParams.k}")
7369
.action((x, c) => c.copy(k = x))
7470
opt[Int]('n', "n")
7571
.text(s"number of points in smallest circle, default: ${defaultParams.numPoints}")
7672
.action((x, c) => c.copy(numPoints = x))
7773
opt[Int]("maxIterations")
7874
.text(s"number of iterations, default: ${defaultParams.maxIterations}")
7975
.action((x, c) => c.copy(maxIterations = x))
80-
opt[Double]('r', "r")
81-
.text(s"radius of outermost circle, default: ${defaultParams.outerRadius}")
82-
.action((x, c) => c.copy(outerRadius = x))
8376
}
8477

8578
parser.parse(args, defaultParams).map { params =>
@@ -97,20 +90,21 @@ object PowerIterationClusteringExample {
9790

9891
Logger.getRootLogger.setLevel(Level.WARN)
9992

100-
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
93+
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
10194
val model = new PowerIterationClustering()
10295
.setK(params.k)
10396
.setMaxIterations(params.maxIterations)
97+
.setInitializationMode("degree")
10498
.run(circlesRdd)
10599

106100
val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
107-
val assignments = clusters.toList.sortBy { case (k, v) => v.length}
101+
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
108102
val assignmentsStr = assignments
109103
.map { case (k, v) =>
110104
s"$k -> ${v.sorted.mkString("[", ",", "]")}"
111-
}.mkString(",")
105+
}.mkString(", ")
112106
val sizesStr = assignments.map {
113-
_._2.size
107+
_._2.length
114108
}.sorted.mkString("(", ",", ")")
115109
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
116110

@@ -124,20 +118,17 @@ object PowerIterationClusteringExample {
124118
}
125119
}
126120

127-
def generateCirclesRdd(sc: SparkContext,
128-
nCircles: Int = 3,
129-
nPoints: Int = 30,
130-
outerRadius: Double): RDD[(Long, Long, Double)] = {
131-
132-
val radii = Array.tabulate(nCircles) { cx => outerRadius / (nCircles - cx)}
133-
val groupSizes = Array.tabulate(nCircles) { cx => (cx + 1) * nPoints}
134-
val points = (0 until nCircles).flatMap { cx =>
135-
generateCircle(radii(cx), groupSizes(cx))
121+
def generateCirclesRdd(
122+
sc: SparkContext,
123+
nCircles: Int,
124+
nPoints: Int): RDD[(Long, Long, Double)] = {
125+
val points = (1 to nCircles).flatMap { i =>
126+
generateCircle(i, i * nPoints)
136127
}.zipWithIndex
137128
val rdd = sc.parallelize(points)
138129
val distancesRdd = rdd.cartesian(rdd).flatMap { case (((x0, y0), i0), ((x1, y1), i1)) =>
139130
if (i0 < i1) {
140-
Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1), 1.0)))
131+
Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1))))
141132
} else {
142133
None
143134
}
@@ -148,11 +139,9 @@ object PowerIterationClusteringExample {
148139
/**
149140
* Gaussian Similarity: http://en.wikipedia.org/wiki/Radial_basis_function_kernel
150141
*/
151-
def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double): Double = {
152-
val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
153-
val expCoeff = -1.0 / 2.0 * math.pow(sigma, 2.0)
142+
def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double)): Double = {
154143
val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
155-
coeff * math.exp(expCoeff * ssquares)
144+
math.exp(-ssquares / 2.0)
156145
}
157146
}
158147
// scalastyle:on println

mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import org.json4s.jackson.JsonMethods._
2424
import org.apache.spark.annotation.Since
2525
import org.apache.spark.api.java.JavaRDD
2626
import org.apache.spark.graphx._
27-
import org.apache.spark.graphx.impl.GraphImpl
2827
import org.apache.spark.mllib.linalg.Vectors
2928
import org.apache.spark.mllib.util.{Loader, MLUtils, Saveable}
3029
import org.apache.spark.rdd.RDD
@@ -262,10 +261,10 @@ object PowerIterationClustering extends Logging {
262261
},
263262
mergeMsg = _ + _,
264263
TripletFields.EdgeOnly)
265-
GraphImpl.fromExistingRDDs(vD, graph.edges)
264+
Graph(vD, graph.edges)
266265
.mapTriplets(
267266
e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
268-
TripletFields.All)
267+
new TripletFields(/* useSrc */ true, /* useDst */ false, /* useEdge */ true))
269268
}
270269

271270
/**
@@ -291,10 +290,10 @@ object PowerIterationClustering extends Logging {
291290
},
292291
mergeMsg = _ + _,
293292
TripletFields.EdgeOnly)
294-
GraphImpl.fromExistingRDDs(vD, gA.edges)
293+
Graph(vD, gA.edges)
295294
.mapTriplets(
296295
e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
297-
TripletFields.Src)
296+
new TripletFields(/* useSrc */ true, /* useDst */ false, /* useEdge */ true))
298297
}
299298

300299
/**
@@ -315,7 +314,7 @@ object PowerIterationClustering extends Logging {
315314
}, preservesPartitioning = true).cache()
316315
val sum = r.values.map(math.abs).sum()
317316
val v0 = r.mapValues(x => x / sum)
318-
GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
317+
Graph(VertexRDD(v0), g.edges)
319318
}
320319

321320
/**
@@ -330,7 +329,7 @@ object PowerIterationClustering extends Logging {
330329
def initDegreeVector(g: Graph[Double, Double]): Graph[Double, Double] = {
331330
val sum = g.vertices.values.sum()
332331
val v0 = g.vertices.mapValues(_ / sum)
333-
GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
332+
Graph(VertexRDD(v0), g.edges)
334333
}
335334

336335
/**
@@ -355,7 +354,7 @@ object PowerIterationClustering extends Logging {
355354
val v = curG.aggregateMessages[Double](
356355
sendMsg = ctx => ctx.sendToSrc(ctx.attr * ctx.dstAttr),
357356
mergeMsg = _ + _,
358-
TripletFields.Dst).cache()
357+
new TripletFields(/* useSrc */ false, /* useDst */ true, /* useEdge */ true)).cache()
359358
// normalize v
360359
val norm = v.values.map(math.abs).sum()
361360
logInfo(s"$msgPrefix: norm(v) = $norm.")
@@ -368,7 +367,7 @@ object PowerIterationClustering extends Logging {
368367
diffDelta = math.abs(delta - prevDelta)
369368
logInfo(s"$msgPrefix: diff(delta) = $diffDelta.")
370369
// update v
371-
curG = GraphImpl.fromExistingRDDs(VertexRDD(v1), g.edges)
370+
curG = Graph(VertexRDD(v1), g.edges)
372371
prevDelta = delta
373372
}
374373
curG.vertices

mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala

Lines changed: 42 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -30,62 +30,65 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon
3030

3131
import org.apache.spark.mllib.clustering.PowerIterationClustering._
3232

33+
/** Generates a circle of points. */
34+
private def genCircle(r: Double, n: Int): Array[(Double, Double)] = {
35+
Array.tabulate(n) { i =>
36+
val theta = 2.0 * math.Pi * i / n
37+
(r * math.cos(theta), r * math.sin(theta))
38+
}
39+
}
40+
41+
/** Computes Gaussian similarity. */
42+
private def sim(x: (Double, Double), y: (Double, Double)): Double = {
43+
val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2)
44+
math.exp(-dist2 / 2.0)
45+
}
46+
3347
test("power iteration clustering") {
34-
/*
35-
We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
36-
edge (3, 4).
37-
38-
15-14 -13 -12
39-
| |
40-
4 . 3 - 2 11
41-
| | x | |
42-
5 0 - 1 10
43-
| |
44-
6 - 7 - 8 - 9
45-
*/
48+
// Generate two circles following the example in the PIC paper.
49+
val r1 = 1.0
50+
val n1 = 10
51+
val r2 = 4.0
52+
val n2 = 40
53+
val n = n1 + n2
54+
val points = genCircle(r1, n1) ++ genCircle(r2, n2)
55+
val similarities = for (i <- 1 until n; j <- 0 until i) yield {
56+
(i.toLong, j.toLong, sim(points(i), points(j)))
57+
}
4658

47-
val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
48-
(1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
49-
(4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
50-
(10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
5159
val model = new PowerIterationClustering()
5260
.setK(2)
61+
.setMaxIterations(40)
5362
.run(sc.parallelize(similarities, 2))
5463
val predictions = Array.fill(2)(mutable.Set.empty[Long])
5564
model.assignments.collect().foreach { a =>
5665
predictions(a.cluster) += a.id
5766
}
58-
assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
67+
assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
5968

6069
val model2 = new PowerIterationClustering()
6170
.setK(2)
71+
.setMaxIterations(10)
6272
.setInitializationMode("degree")
6373
.run(sc.parallelize(similarities, 2))
6474
val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
6575
model2.assignments.collect().foreach { a =>
6676
predictions2(a.cluster) += a.id
6777
}
68-
assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
78+
assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
6979
}
7080

7181
test("power iteration clustering on graph") {
72-
/*
73-
We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
74-
edge (3, 4).
75-
76-
15-14 -13 -12
77-
| |
78-
4 . 3 - 2 11
79-
| | x | |
80-
5 0 - 1 10
81-
| |
82-
6 - 7 - 8 - 9
83-
*/
84-
85-
val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
86-
(1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
87-
(4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
88-
(10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
82+
// Generate two circles following the example in the PIC paper.
83+
val r1 = 1.0
84+
val n1 = 10
85+
val r2 = 4.0
86+
val n2 = 40
87+
val n = n1 + n2
88+
val points = genCircle(r1, n1) ++ genCircle(r2, n2)
89+
val similarities = for (i <- 1 until n; j <- 0 until i) yield {
90+
(i.toLong, j.toLong, sim(points(i), points(j)))
91+
}
8992

9093
val edges = similarities.flatMap { case (i, j, s) =>
9194
if (i != j) {
@@ -98,22 +101,24 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon
98101

99102
val model = new PowerIterationClustering()
100103
.setK(2)
104+
.setMaxIterations(40)
101105
.run(graph)
102106
val predictions = Array.fill(2)(mutable.Set.empty[Long])
103107
model.assignments.collect().foreach { a =>
104108
predictions(a.cluster) += a.id
105109
}
106-
assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
110+
assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
107111

108112
val model2 = new PowerIterationClustering()
109113
.setK(2)
114+
.setMaxIterations(10)
110115
.setInitializationMode("degree")
111116
.run(sc.parallelize(similarities, 2))
112117
val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
113118
model2.assignments.collect().foreach { a =>
114119
predictions2(a.cluster) += a.id
115120
}
116-
assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
121+
assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
117122
}
118123

119124
test("normalize and powerIter") {

0 commit comments

Comments
 (0)