Skip to content

Commit 4c176d9

Browse files
rnowlingjkbradley
authored andcommitted
[SPARK-12450][MLLIB] Un-persist broadcasted variables in KMeans
SPARK-12450 . Un-persist broadcasted variables in KMeans. Author: RJ Nowling <[email protected]> Closes #10415 from rnowling/spark-12450. (cherry picked from commit 78015a8) Signed-off-by: Joseph K. Bradley <[email protected]>
1 parent ebf350f commit 4c176d9

File tree

1 file changed

+8
-0
lines changed
  • mllib/src/main/scala/org/apache/spark/mllib/clustering

1 file changed

+8
-0
lines changed

mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,8 @@ class KMeans private (
301301
contribs.iterator
302302
}.reduceByKey(mergeContribs).collectAsMap()
303303

304+
bcActiveCenters.unpersist(blocking = false)
305+
304306
// Update the cluster centers and costs for each active run
305307
for ((run, i) <- activeRuns.zipWithIndex) {
306308
var changed = false
@@ -419,7 +421,10 @@ class KMeans private (
419421
s0
420422
}
421423
)
424+
425+
bcNewCenters.unpersist(blocking = false)
422426
preCosts.unpersist(blocking = false)
427+
423428
val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) =>
424429
val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
425430
pointsWithCosts.flatMap { case (p, c) =>
@@ -448,6 +453,9 @@ class KMeans private (
448453
((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0)
449454
}
450455
}.reduceByKey(_ + _).collectAsMap()
456+
457+
bcCenters.unpersist(blocking = false)
458+
451459
val finalCenters = (0 until runs).par.map { r =>
452460
val myCenters = centers(r).toArray
453461
val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray

0 commit comments

Comments
 (0)