diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
index 2c30a1d9aa947..71c0ccf2c65f1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
@@ -97,13 +97,18 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has
def getNeighborsCol: String = $(neighborsCol)
/**
- * Param for the name of the input column for neighbors in the adjacency list representation.
+ * Param for the name of the input column for non-negative weights (similarities) of edges
+ * between the vertex in `idCol` and each neighbor in `neighborsCol`.
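+   * For example, a row with `idCol` = 1, `neighborsCol` = Array(0, 2) and
+   * `similaritiesCol` = Array(0.9, 0.7) describes edges (1, 0) with weight 0.9 and
+   * (1, 2) with weight 0.7.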
* Default: "similarities"
* @group param
*/
@Since("2.4.0")
val similaritiesCol = new Param[String](this, "similaritiesCol",
- "Name of the input column for neighbors in the adjacency list representation.",
+ "Name of the input column for non-negative weights (similarities) of edges between the " +
+ "vertex in `idCol` and each neighbor in `neighborsCol`.",
(value: String) => value.nonEmpty)
setDefault(similaritiesCol, "similarities")
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index b3d5fb17f6b81..317f24d4be81f 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -19,14 +19,14 @@
from pyspark import since, keyword_only
from pyspark.ml.util import *
-from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
+from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaTransformer, JavaWrapper
from pyspark.ml.param.shared import *
from pyspark.ml.common import inherit_doc
__all__ = ['BisectingKMeans', 'BisectingKMeansModel', 'BisectingKMeansSummary',
'KMeans', 'KMeansModel',
'GaussianMixture', 'GaussianMixtureModel', 'GaussianMixtureSummary',
- 'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel']
+ 'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel', 'PowerIterationClustering']
class ClusteringSummary(JavaWrapper):
@@ -836,7 +836,7 @@ class LDA(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed, HasCheckpointInter
Terminology:
- - "term" = "word": an el
+ - "term" = "word": an element of the vocabulary
- "token": instance of a term appearing in a document
- "topic": multinomial distribution over terms representing some concept
- "document": one piece of text, corresponding to one row in the input data
@@ -938,7 +938,7 @@ def __init__(self, featuresCol="features", maxIter=20, seed=None, checkpointInte
k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
subsamplingRate=0.05, optimizeDocConcentration=True,\
docConcentration=None, topicConcentration=None,\
- topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
+ topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
"""
super(LDA, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.LDA", self.uid)
@@ -967,7 +967,7 @@ def setParams(self, featuresCol="features", maxIter=20, seed=None, checkpointInt
k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
subsamplingRate=0.05, optimizeDocConcentration=True,\
docConcentration=None, topicConcentration=None,\
- topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
+ topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
Sets params for LDA.
"""
@@ -1156,6 +1156,209 @@ def getKeepLastCheckpoint(self):
return self.getOrDefault(self.keepLastCheckpoint)
+@inherit_doc
+class PowerIterationClustering(JavaTransformer, HasMaxIter, HasPredictionCol, JavaMLReadable,
+                               JavaMLWritable):
+ """
+ .. note:: Experimental
+
+ Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
+ Lin and Cohen. From the abstract:
+ PIC finds a very low-dimensional embedding of a dataset using truncated power
+ iteration on a normalized pair-wise similarity matrix of the data.
+
+ PIC takes an affinity matrix between items (or vertices) as input. An affinity matrix
+ is a symmetric matrix whose entries are non-negative similarities between items.
+ PIC takes this matrix (or graph) as an adjacency matrix. Specifically, each input row
+ includes:
+
+ - :py:attr:`idCol`: vertex ID
+ - :py:attr:`neighborsCol`: neighbors of vertex in :py:attr:`idCol`
+ - :py:attr:`similaritiesCol`: non-negative weights (similarities) of edges between the
+ vertex in :py:attr:`idCol` and each neighbor in :py:attr:`neighborsCol`
+
+ PIC returns a cluster assignment for each input vertex. It appends a new column
+    :py:attr:`predictionCol` containing the cluster assignment in [0, k) for
+ each row (vertex).
+
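+    For example, with the default column names, the row (id=1, neighbors=[0, 2],
+    similarities=[0.9, 0.7]) describes edges (1, 0) with weight 0.9 and (1, 2)
+    with weight 0.7.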
+ .. note::
+
+      - :py:class:`PowerIterationClustering` is a transformer with an expensive
+        :py:meth:`transform` operation, which runs the iterative PIC algorithm to
+        cluster the whole input dataset.
+ - Input validation: This validates that similarities are non-negative but does NOT validate
+ that the input matrix is symmetric.
+
+    .. seealso:: `Wikipedia on Spectral clustering \
+        <http://en.wikipedia.org/wiki/Spectral_clustering>`_
+
+ >>> from pyspark.sql.types import ArrayType, DoubleType, LongType, StructField, StructType
+    >>> similarities = [(1, [0], [0.5]), (2, [0, 1], [0.7, 0.5]), \
+                        (3, [0, 1, 2], [0.9, 0.7, 0.5]), \
+                        (4, [0, 1, 2, 3], [1.1, 0.9, 0.7, 0.5]), \
+                        (5, [0, 1, 2, 3, 4], [1.3, 1.1, 0.9, 0.7, 0.5])]
+ >>> rdd = sc.parallelize(similarities, 2)
+ >>> schema = StructType([StructField("id", LongType(), False), \
+ StructField("neighbors", ArrayType(LongType(), False), True), \
+ StructField("similarities", ArrayType(DoubleType(), False), True)])
+ >>> df = spark.createDataFrame(rdd, schema)
+ >>> pic = PowerIterationClustering()
+ >>> result = pic.setK(2).setMaxIter(10).transform(df)
+ >>> predictions = sorted(set([(i[0], i[1]) for i in result.select(result.id, result.prediction)
+ ... .collect()]), key=lambda x: x[0])
+ >>> predictions[0]
+ (1, 1)
+ >>> predictions[1]
+ (2, 1)
+ >>> predictions[2]
+ (3, 0)
+ >>> predictions[3]
+ (4, 0)
+ >>> predictions[4]
+ (5, 0)
+ >>> pic_path = temp_path + "/pic"
+ >>> pic.save(pic_path)
+ >>> pic2 = PowerIterationClustering.load(pic_path)
+ >>> pic2.getK()
+ 2
+ >>> pic2.getMaxIter()
+ 10
+ >>> pic3 = PowerIterationClustering(k=4, initMode="degree")
+ >>> pic3.getIdCol()
+ 'id'
+ >>> pic3.getK()
+ 4
+ >>> pic3.getMaxIter()
+ 20
+ >>> pic3.getInitMode()
+ 'degree'
+
+ .. versionadded:: 2.4.0
+ """
+
+ k = Param(Params._dummy(), "k",
+ "The number of clusters to create. Must be > 1.",
+ typeConverter=TypeConverters.toInt)
+ initMode = Param(Params._dummy(), "initMode",
+ "The initialization algorithm. This can be either " +
+ "'random' to use a random vector as vertex properties, or 'degree' to use " +
+ "a normalized sum of similarities with other vertices. Supported options: " +
+ "'random' and 'degree'.",
+ typeConverter=TypeConverters.toString)
+ idCol = Param(Params._dummy(), "idCol",
+ "Name of the input column for vertex IDs.",
+ typeConverter=TypeConverters.toString)
+ neighborsCol = Param(Params._dummy(), "neighborsCol",
+ "Name of the input column for neighbors in the adjacency list " +
+ "representation.",
+ typeConverter=TypeConverters.toString)
+    similaritiesCol = Param(Params._dummy(), "similaritiesCol",
+                            "Name of the input column for non-negative weights (similarities) " +
+                            "of edges between the vertex in `idCol` and each neighbor in " +
+                            "`neighborsCol`.",
+                            typeConverter=TypeConverters.toString)
+
+ @keyword_only
+ def __init__(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",
+ idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
+ """
+ __init__(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",\
+ idCol="id", neighborsCol="neighbors", similaritiesCol="similarities")
+ """
+ super(PowerIterationClustering, self).__init__()
+ self._java_obj = self._new_java_obj(
+ "org.apache.spark.ml.clustering.PowerIterationClustering", self.uid)
+ self._setDefault(k=2, maxIter=20, initMode="random", idCol="id", neighborsCol="neighbors",
+ similaritiesCol="similarities")
+ kwargs = self._input_kwargs
+ self.setParams(**kwargs)
+
+ @keyword_only
+ @since("2.4.0")
+ def setParams(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",
+ idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
+ """
+ setParams(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",\
+ idCol="id", neighborsCol="neighbors", similaritiesCol="similarities")
+ Sets params for PowerIterationClustering.
+ """
+ kwargs = self._input_kwargs
+ return self._set(**kwargs)
+
+ @since("2.4.0")
+ def setK(self, value):
+ """
+ Sets the value of :py:attr:`k`.
+ """
+ return self._set(k=value)
+
+ @since("2.4.0")
+ def getK(self):
+ """
+ Gets the value of :py:attr:`k`.
+ """
+ return self.getOrDefault(self.k)
+
+ @since("2.4.0")
+ def setInitMode(self, value):
+ """
+ Sets the value of :py:attr:`initMode`.
+ """
+ return self._set(initMode=value)
+
+ @since("2.4.0")
+ def getInitMode(self):
+ """
+        Gets the value of :py:attr:`initMode`.
+ """
+ return self.getOrDefault(self.initMode)
+
+ @since("2.4.0")
+ def setIdCol(self, value):
+ """
+ Sets the value of :py:attr:`idCol`.
+ """
+ return self._set(idCol=value)
+
+ @since("2.4.0")
+ def getIdCol(self):
+ """
+ Gets the value of :py:attr:`idCol`.
+ """
+ return self.getOrDefault(self.idCol)
+
+ @since("2.4.0")
+ def setNeighborsCol(self, value):
+ """
+ Sets the value of :py:attr:`neighborsCol`.
+ """
+ return self._set(neighborsCol=value)
+
+ @since("2.4.0")
+ def getNeighborsCol(self):
+ """
+ Gets the value of :py:attr:`neighborsCol`.
+ """
+ return self.getOrDefault(self.neighborsCol)
+
+ @since("2.4.0")
+ def setSimilaritiesCol(self, value):
+ """
+ Sets the value of :py:attr:`similaritiesCol`.
+ """
+ return self._set(similaritiesCol=value)
+
+ @since("2.4.0")
+ def getSimilaritiesCol(self):
+ """
+ Gets the value of :py:attr:`similaritiesCol`.
+ """
+ return self.getOrDefault(self.similaritiesCol)
+
+
if __name__ == "__main__":
import doctest
import pyspark.ml.clustering
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 2ec0be60e9fa9..0a5a7e2592ac1 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -1873,6 +1873,59 @@ def test_kmeans_cosine_distance(self):
self.assertTrue(result[4].prediction == result[5].prediction)
+class PowerIterationClusteringTests(SparkSessionTestCase):
+
+ def test_power_iteration_clustering(self):
+ from pyspark.sql.types import ArrayType, DoubleType, LongType, StructField, StructType
+ from pyspark.ml.clustering import PowerIterationClustering
+ import math
+
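+        # Two concentric circles of points: a small inner circle (10 points) and a
+        # larger outer one (40 points). PIC should separate the two circles.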
+ def genCircle(r, n):
+ points = []
+ for i in range(0, n):
+ theta = 2.0 * math.pi * i / n
+ points.append((r * math.cos(theta), r * math.sin(theta)))
+ return points
+
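+        # Gaussian similarity of two points based on their squared Euclidean distance.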
+ def sim(x, y):
+ dist = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
+ return math.exp(-dist / 2.0)
+
+ r1 = 1.0
+ n1 = 10
+ r2 = 4.0
+ n2 = 40
+ n = n1 + n2
+ points = genCircle(r1, n1) + genCircle(r2, n2)
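+        # Build the affinity graph in adjacency-list form: vertex i lists each
+        # earlier vertex j < i as a neighbor together with the Gaussian similarity
+        # of their points; each undirected edge is listed once.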
+ similarities = []
+ for i in range(1, n):
+ neighbor = []
+ weight = []
+ for j in range(i):
+                neighbor.append(j)
+                weight.append(sim(points[i], points[j]))
+            similarities.append([i, neighbor, weight])
+ rdd = self.sc.parallelize(similarities, 2)
+ schema = StructType([StructField("id", LongType(), False),
+ StructField("neighbors", ArrayType(LongType(), False), True),
+ StructField("similarities", ArrayType(DoubleType(), False), True)])
+ df = self.spark.createDataFrame(rdd, schema)
+ pic = PowerIterationClustering()
+ result = pic.setK(2).setMaxIter(40).transform(df)
+ predictions = sorted(set([(i[0], i[1]) for i in result.select(result.id,
+ result.prediction).collect()]), key=lambda x: x[0])
+        for i in range(0, 9):
+ self.assertEqual(predictions[i], (i+1, 1))
+        for i in range(9, 49):
+ self.assertEqual(predictions[i], (i+1, 0))
+
+
class OneVsRestTests(SparkSessionTestCase):
def test_copy(self):