From 53d7763b58d05a6baf9fcf1cef2ae327a5d42e04 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Fri, 20 Apr 2018 21:15:37 -0700
Subject: [PATCH 1/7] [SPARK-19826][ML][PYTHON]add spark.ml Python API for PIC

---
 python/pyspark/ml/clustering.py | 192 +++++++++++++++++++++++++++++++-
 1 file changed, 191 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index b3d5fb17f6b81..8cf5ed4333804 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -19,7 +19,7 @@
 
 from pyspark import since, keyword_only
 from pyspark.ml.util import *
-from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
+from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, JavaTransformer, JavaWrapper
 from pyspark.ml.param.shared import *
 from pyspark.ml.common import inherit_doc
 
@@ -1156,6 +1156,196 @@ def getKeepLastCheckpoint(self):
         return self.getOrDefault(self.keepLastCheckpoint)
 
 
+class _PowerIterationClusteringParams(JavaParams, HasMaxIter, HasPredictionCol):
+    """
+    Params for :py:attr:`PowerIterationClustering`.
+    .. versionadded:: 2.4.0
+    """
+
+    k = Param(Params._dummy(), "k",
+              "The number of clusters to create. Must be > 1.",
+              typeConverter=TypeConverters.toInt)
+    initMode = Param(Params._dummy(), "initMode",
+                     "The initialization algorithm. This can be either " +
+                     "'random' to use a random vector as vertex properties, or 'degree' to use " +
+                     "a normalized sum of similarities with other vertices. Supported options: " +
+                     "'random' and 'degree'.",
+                     typeConverter=TypeConverters.toString)
+    idCol = Param(Params._dummy(), "idCol",
+                  "Name of the input column for vertex IDs.",
+                  typeConverter=TypeConverters.toString)
+    neighborsCol = Param(Params._dummy(), "neighborsCol",
+                         "Name of the input column for neighbors in the adjacency list " +
+                         "representation.",
+                         typeConverter=TypeConverters.toString)
+    similaritiesCol = Param(Params._dummy(), "similaritiesCol",
+                            "non-negative weights (similarities) of edges between the vertex in " +
+                            "`idCol` and each neighbor in `neighborsCol`",
+                            typeConverter=TypeConverters.toString)
+
+    @since("2.4.0")
+    def getK(self):
+        """
+        Gets the value of `k`
+        """
+        return self.getOrDefault(self.k)
+
+    @since("2.4.0")
+    def getInitMode(self):
+        """
+        Gets the value of `initMode`
+        """
+        return self.getOrDefault(self.initMode)
+
+    @since("2.4.0")
+    def getIdCol(self):
+        """
+        Gets the value of `idCol`
+        """
+        return self.getOrDefault(self.idCol)
+
+    @since("2.4.0")
+    def getNeighborsCol(self):
+        """
+        Gets the value of `neighborsCol`
+        """
+        return self.getOrDefault(self.neighborsCol)
+
+    @since("2.4.0")
+    def getSimilaritiesCol(self):
+        """
+        Gets the value of `similaritiesCol`
+        """
+        return self.getOrDefault(self.binary)
+
+
+@inherit_doc
+class PowerIterationClustering(JavaTransformer, _PowerIterationClusteringParams, JavaMLReadable,
+                               JavaMLWritable):
+    """
+    Model produced by [[PowerIterationClustering]].
+    >>> from pyspark.sql.types import ArrayType, DoubleType, LongType, StructField, StructType
+    >>> import math
+    >>> def genCircle(r, n):
+    ...     points = []
+    ...     for i in range(0, n):
+    ...         theta = 2.0 * math.pi * i / n
+    ...         points.append((r * math.cos(theta), r * math.sin(theta)))
+    ...     return points
+    >>> def sim(x, y):
+    ...     dist = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
+    ...     return math.exp(-dist / 2.0)
+    >>> r1 = 1.0
+    >>> n1 = 10
+    >>> r2 = 4.0
+    >>> n2 = 40
+    >>> n = n1 + n2
+    >>> points = genCircle(r1, n1) + genCircle(r2, n2)
+    >>> similarities = []
+    >>> for i in range (1, n):
+    ...     neighbor = []
+    ...     weight = []
+    ...     for j in range (i):
+    ...         neighbor.append((long)(j))
+    ...         weight.append(sim(points[i], points[j]))
+    ...     similarities.append([(long)(i), neighbor, weight])
+    >>> rdd = sc.parallelize(similarities, 2)
+    >>> schema = StructType([StructField("id", LongType(), False), \
+             StructField("neighbors", ArrayType(LongType(), False), True), \
+             StructField("similarities", ArrayType(DoubleType(), False), True)])
+    >>> pic = PowerIterationClustering()
+    >>> df = spark.createDataFrame(rdd, schema)
+    >>> result = pic.setK(2).setMaxIter(40).transform(df)
+    >>> predictions = sorted(set([(i[0], i[1]) for i in result.select(result.id, result.prediction)
+    ...     .collect()]), key=lambda x: x[0])
+    >>> predictions[0]
+    (1, 1)
+    >>> predictions[8]
+    (9, 1)
+    >>> predictions[9]
+    (10, 0)
+    >>> predictions[20]
+    (21, 0)
+    >>> predictions[48]
+    (49, 0)
+    >>> pic_path = temp_path + "/pic"
+    >>> pic.save(pic_path)
+    >>> pic2 = PowerIterationClustering.load(pic_path)
+    >>> pic2.getK()
+    2
+    >>> pic2.getMaxIter()
+    40
+    >>> pic3 = PowerIterationClustering(k=4, initMode="degree")
+    >>> pic3.getK()
+    4
+    >>> pic3.getMaxIter()
+    20
+    >>> pic3.getInitMode()
+    'degree'
+    .. versionadded:: 2.4.0
+    """
+    @keyword_only
+    def __init__(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",
+                 idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
+        """
+        __init__(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",\
+                 idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
+        """
+        super(PowerIterationClustering, self).__init__()
+        self._java_obj = self._new_java_obj(
+            "org.apache.spark.ml.clustering.PowerIterationClustering", self.uid)
+        self._setDefault(k=2, maxIter=20, initMode="random")
+        kwargs = self._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    @since("2.4.0")
+    def setParams(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",
+                  idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
+        """
+        setParams(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",\
+                  idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
+        Sets params for PowerIterationClustering.
+        """
+        kwargs = self._input_kwargs
+        return self._set(**kwargs)
+
+    @since("2.4.0")
+    def setK(self, value):
+        """
+        Sets the value of :py:attr:`k`.
+        """
+        return self._set(k=value)
+
+    @since("2.4.0")
+    def setInitMode(self, value):
+        """
+        Sets the value of :py:attr:`initMode`.
+        """
+        return self._set(initMode=value)
+
+    @since("2.4.0")
+    def setIdCol(self, value):
+        """
+        Sets the value of :py:attr:`idCol`.
+        """
+        return self._set(idCol=value)
+
+    @since("2.4.0")
+    def setNeighborsCol(self, value):
+        """
+        Sets the value of :py:attr:`neighborsCol.
+        """
+        return self._set(neighborsCol=value)
+
+    @since("2.4.0")
+    def setSimilaritiesCol(self, value):
+        """
+        Sets the value of :py:attr:`similaritiesCol`.
+        """
+        return self._set(similaritiesCol=value)
+
+
 if __name__ == "__main__":
     import doctest
     import pyspark.ml.clustering
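
A minimal end-to-end sketch of the API this first patch introduces, using the five-vertex affinity graph that later patches in this series adopt for the doctests. It is not part of the patch series; it assumes a live SparkSession bound to `spark` (as the doctests do) and Python 3, where plain ints replace the doctests' `(long)(...)` casts:

    from pyspark.ml.clustering import PowerIterationClustering

    # Adjacency-list input: one row per vertex, listing its neighbors and the
    # non-negative edge weights (similarities) to those neighbors.
    data = [(1, [0], [0.5]),
            (2, [0, 1], [0.7, 0.5]),
            (3, [0, 1, 2], [0.9, 0.7, 0.5]),
            (4, [0, 1, 2, 3], [1.1, 0.9, 0.7, 0.5]),
            (5, [0, 1, 2, 3, 4], [1.3, 1.1, 0.9, 0.7, 0.5])]
    df = spark.createDataFrame(data, ["id", "neighbors", "similarities"])

    # transform() runs the full iterative PIC algorithm and appends a
    # "prediction" column holding a cluster assignment in [0, k).
    pic = PowerIterationClustering(k=2, maxIter=10)
    pic.transform(df).show()
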
+ """ + return self._set(similaritiesCol=value) + + if __name__ == "__main__": import doctest import pyspark.ml.clustering From 2d0e3943440718c7f603cdaba410ba35ab81279e Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Mon, 23 Apr 2018 13:21:53 -0700 Subject: [PATCH 2/7] fix test problem --- .../ml/clustering/PowerIterationClustering.scala | 6 ++++-- python/pyspark/ml/clustering.py | 14 ++++++++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index 2c30a1d9aa947..71c0ccf2c65f1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -97,13 +97,15 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has def getNeighborsCol: String = $(neighborsCol) /** - * Param for the name of the input column for neighbors in the adjacency list representation. + * Param for the name of the input column for non-negative weights (similarities) of edges + * between the vertex in `idCol` and each neighbor in `neighborsCol`. * Default: "similarities" * @group param */ @Since("2.4.0") val similaritiesCol = new Param[String](this, "similaritiesCol", - "Name of the input column for neighbors in the adjacency list representation.", + "Name of the input column for non-negative weights (similarities) of edges between the " + + "vertex in `idCol` and each neighbor in `neighborsCol`.", (value: String) => value.nonEmpty) setDefault(similaritiesCol, "similarities") diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py index 8cf5ed4333804..8a5ad812c50cb 100644 --- a/python/pyspark/ml/clustering.py +++ b/python/pyspark/ml/clustering.py @@ -1179,8 +1179,9 @@ class _PowerIterationClusteringParams(JavaParams, HasMaxIter, HasPredictionCol): "representation.", typeConverter=TypeConverters.toString) similaritiesCol = Param(Params._dummy(), "similaritiesCol", - "non-negative weights (similarities) of edges between the vertex in " + - "`idCol` and each neighbor in `neighborsCol`", + "Name of the input column for non-negative weights (similarities) " + + "of edges between the vertex in `idCol` and each neighbor in " + + "`neighborsCol`", typeConverter=TypeConverters.toString) @since("2.4.0") @@ -1253,8 +1254,8 @@ class PowerIterationClustering(JavaTransformer, _PowerIterationClusteringParams, >>> schema = StructType([StructField("id", LongType(), False), \ StructField("neighbors", ArrayType(LongType(), False), True), \ StructField("similarities", ArrayType(DoubleType(), False), True)]) - >>> pic = PowerIterationClustering() >>> df = spark.createDataFrame(rdd, schema) + >>> pic = PowerIterationClustering() >>> result = pic.setK(2).setMaxIter(40).transform(df) >>> predictions = sorted(set([(i[0], i[1]) for i in result.select(result.id, result.prediction) ... .collect()]), key=lambda x: x[0]) @@ -1276,12 +1277,16 @@ class PowerIterationClustering(JavaTransformer, _PowerIterationClusteringParams, >>> pic2.getMaxIter() 40 >>> pic3 = PowerIterationClustering(k=4, initMode="degree") + >>> pic3.getIdCol() + 'id' >>> pic3.getK() 4 >>> pic3.getMaxIter() 20 >>> pic3.getInitMode() 'degree' + + .. 
versionadded:: 2.4.0 """ @keyword_only @@ -1294,7 +1299,8 @@ def __init__(self, predictionCol="prediction", k=2, maxIter=20, initMode="random super(PowerIterationClustering, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.clustering.PowerIterationClustering", self.uid) - self._setDefault(k=2, maxIter=20, initMode="random") + self._setDefault(k=2, maxIter=20, initMode="random", idCol="id", neighborsCol="neighbors", + similaritiesCol="similarities") kwargs = self._input_kwargs self.setParams(**kwargs) From 387d6ffa8234f365e0de250082707187f67ecf61 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Mon, 23 Apr 2018 13:27:14 -0700 Subject: [PATCH 3/7] remove extra space --- python/pyspark/ml/clustering.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py index 8a5ad812c50cb..0471666fd624d 100644 --- a/python/pyspark/ml/clustering.py +++ b/python/pyspark/ml/clustering.py @@ -1286,7 +1286,6 @@ class PowerIterationClustering(JavaTransformer, _PowerIterationClusteringParams, >>> pic3.getInitMode() 'degree' - .. versionadded:: 2.4.0 """ @keyword_only From 6d00f343f5c78fbe290793fe85cbc3deed53cf3e Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Fri, 27 Apr 2018 14:32:22 -0700 Subject: [PATCH 4/7] address comments --- python/pyspark/ml/clustering.py | 218 ++++++++++++++++---------------- python/pyspark/ml/tests.py | 47 +++++++ 2 files changed, 156 insertions(+), 109 deletions(-) diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py index 0471666fd624d..4b9f5cb299de4 100644 --- a/python/pyspark/ml/clustering.py +++ b/python/pyspark/ml/clustering.py @@ -26,7 +26,7 @@ __all__ = ['BisectingKMeans', 'BisectingKMeansModel', 'BisectingKMeansSummary', 'KMeans', 'KMeansModel', 'GaussianMixture', 'GaussianMixtureModel', 'GaussianMixtureSummary', - 'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel'] + 'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel', 'PowerIterationClustering'] class ClusteringSummary(JavaWrapper): @@ -836,7 +836,7 @@ class LDA(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed, HasCheckpointInter Terminology: - - "term" = "word": an el + - "term" = "word": an element of the vocabulary - "token": instance of a term appearing in a document - "topic": multinomial distribution over terms representing some concept - "document": one piece of text, corresponding to one row in the input data @@ -938,7 +938,7 @@ def __init__(self, featuresCol="features", maxIter=20, seed=None, checkpointInte k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\ subsamplingRate=0.05, optimizeDocConcentration=True,\ docConcentration=None, topicConcentration=None,\ - topicDistributionCol="topicDistribution", keepLastCheckpoint=True): + topicDistributionCol="topicDistribution", keepLastCheckpoint=True) """ super(LDA, self).__init__() self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.LDA", self.uid) @@ -967,7 +967,7 @@ def setParams(self, featuresCol="features", maxIter=20, seed=None, checkpointInt k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\ subsamplingRate=0.05, optimizeDocConcentration=True,\ docConcentration=None, topicConcentration=None,\ - topicDistributionCol="topicDistribution", keepLastCheckpoint=True): + topicDistributionCol="topicDistribution", keepLastCheckpoint=True) Sets params for LDA. 
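
Patch 2 above pins down what `similaritiesCol` means: non-negative weights of the edges between the vertex in `idCol` and each neighbor in `neighborsCol`. Because PIC validates non-negativity but, per the docstring, does not validate that the input matrix is symmetric, keeping the affinity matrix symmetric is the caller's job when encoding it. A small illustrative helper (hypothetical; pure Python, not part of the patch series) that turns a dense symmetric affinity matrix into the expected (id, neighbors, similarities) rows:

    # Hypothetical helper: encode a dense symmetric affinity matrix as
    # adjacency-list rows (id, neighbors, similarities) for PIC.
    def affinity_to_rows(matrix):
        rows = []
        for i, row in enumerate(matrix):
            neighbors = [j for j, w in enumerate(row) if j != i and w > 0]
            weights = [float(row[j]) for j in neighbors]
            rows.append((i, neighbors, weights))
        return rows

    # PIC checks that weights are non-negative but not that the matrix is
    # symmetric, so the input here must already be symmetric.
    affinity = [[0.0, 0.5, 0.1],
                [0.5, 0.0, 0.8],
                [0.1, 0.8, 0.0]]
    print(affinity_to_rows(affinity))
    # [(0, [1, 2], [0.5, 0.1]), (1, [0, 2], [0.5, 0.8]), (2, [0, 1], [0.1, 0.8])]
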
""" @@ -1156,126 +1156,68 @@ def getKeepLastCheckpoint(self): return self.getOrDefault(self.keepLastCheckpoint) -class _PowerIterationClusteringParams(JavaParams, HasMaxIter, HasPredictionCol): - """ - Params for :py:attr:`PowerIterationClustering`. - .. versionadded:: 2.4.0 - """ - - k = Param(Params._dummy(), "k", - "The number of clusters to create. Must be > 1.", - typeConverter=TypeConverters.toInt) - initMode = Param(Params._dummy(), "initMode", - "The initialization algorithm. This can be either " + - "'random' to use a random vector as vertex properties, or 'degree' to use " + - "a normalized sum of similarities with other vertices. Supported options: " + - "'random' and 'degree'.", - typeConverter=TypeConverters.toString) - idCol = Param(Params._dummy(), "idCol", - "Name of the input column for vertex IDs.", - typeConverter=TypeConverters.toString) - neighborsCol = Param(Params._dummy(), "neighborsCol", - "Name of the input column for neighbors in the adjacency list " + - "representation.", - typeConverter=TypeConverters.toString) - similaritiesCol = Param(Params._dummy(), "similaritiesCol", - "Name of the input column for non-negative weights (similarities) " + - "of edges between the vertex in `idCol` and each neighbor in " + - "`neighborsCol`", - typeConverter=TypeConverters.toString) - - @since("2.4.0") - def getK(self): - """ - Gets the value of `k` - """ - return self.getOrDefault(self.k) - - @since("2.4.0") - def getInitMode(self): - """ - Gets the value of `initMode` - """ - return self.getOrDefault(self.initMode) - - @since("2.4.0") - def getIdCol(self): - """ - Gets the value of `idCol` - """ - return self.getOrDefault(self.idCol) - - @since("2.4.0") - def getNeighborsCol(self): - """ - Gets the value of `neighborsCol` - """ - return self.getOrDefault(self.neighborsCol) - - @since("2.4.0") - def getSimilaritiesCol(self): - """ - Gets the value of `similaritiesCol` - """ - return self.getOrDefault(self.binary) - - @inherit_doc -class PowerIterationClustering(JavaTransformer, _PowerIterationClusteringParams, JavaMLReadable, - JavaMLWritable): +class PowerIterationClustering(HasMaxIter, HasPredictionCol, JavaTransformer, JavaParams, + JavaMLReadable, JavaMLWritable): """ - Model produced by [[PowerIterationClustering]]. + .. note:: Experimental + Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by + Lin and Cohen. From the abstract: + PIC finds a very low-dimensional embedding of a dataset using truncated power + iteration on a normalized pair-wise similarity matrix of the data. + + PIC takes an affinity matrix between items (or vertices) as input. An affinity matrix + is a symmetric matrix whose entries are non-negative similarities between items. + PIC takes this matrix (or graph) as an adjacency matrix. Specifically, each input row + includes: + - :py:class:`idCol`: vertex ID + - :py:class:`neighborsCol`: neighbors of vertex in :py:class:`idCol` + - :py:class:`similaritiesCol`: non-negative weights (similarities) of edges between the + vertex in :py:class:`idCol` and each neighbor in :py:class:`neighborsCol` + PIC returns a cluster assignment for each input vertex. It appends a new column + :py:class:`predictionCol` containing the cluster assignment in :py:class:`[0,k)` for + each row (vertex). + + Notes: + - [[PowerIterationClustering]] is a transformer with an expensive [[transform]] operation. + Transform runs the iterative PIC algorithm to cluster the whole input dataset. 
+    - Input validation: This validates that similarities are non-negative but does NOT validate
+      that the input matrix is symmetric.
+
+    @see
+    Spectral clustering (Wikipedia)
+
     >>> from pyspark.sql.types import ArrayType, DoubleType, LongType, StructField, StructType
-    >>> import math
-    >>> def genCircle(r, n):
-    ...     points = []
-    ...     for i in range(0, n):
-    ...         theta = 2.0 * math.pi * i / n
-    ...         points.append((r * math.cos(theta), r * math.sin(theta)))
-    ...     return points
-    >>> def sim(x, y):
-    ...     dist = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
-    ...     return math.exp(-dist / 2.0)
-    >>> r1 = 1.0
-    >>> n1 = 10
-    >>> r2 = 4.0
-    >>> n2 = 40
-    >>> n = n1 + n2
-    >>> points = genCircle(r1, n1) + genCircle(r2, n2)
-    >>> similarities = []
-    >>> for i in range (1, n):
-    ...     neighbor = []
-    ...     weight = []
-    ...     for j in range (i):
-    ...         neighbor.append((long)(j))
-    ...         weight.append(sim(points[i], points[j]))
-    ...     similarities.append([(long)(i), neighbor, weight])
+    >>> similarities = [((long)(1), [0], [0.5]), ((long)(2), [0, 1], [0.7,0.5]), \
+        ((long)(3), [0, 1, 2], [0.9, 0.7, 0.5]), \
+        ((long)(4), [0, 1, 2, 3], [1.1, 0.9, 0.7,0.5]), \
+        ((long)(5), [0, 1, 2, 3, 4], [1.3, 1.1, 0.9, 0.7,0.5])]
     >>> rdd = sc.parallelize(similarities, 2)
     >>> schema = StructType([StructField("id", LongType(), False), \
-             StructField("neighbors", ArrayType(LongType(), False), True), \
-             StructField("similarities", ArrayType(DoubleType(), False), True)])
+        StructField("neighbors", ArrayType(LongType(), False), True), \
+        StructField("similarities", ArrayType(DoubleType(), False), True)])
     >>> df = spark.createDataFrame(rdd, schema)
     >>> pic = PowerIterationClustering()
-    >>> result = pic.setK(2).setMaxIter(40).transform(df)
+    >>> result = pic.setK(2).setMaxIter(10).transform(df)
     >>> predictions = sorted(set([(i[0], i[1]) for i in result.select(result.id, result.prediction)
     ...     .collect()]), key=lambda x: x[0])
     >>> predictions[0]
     (1, 1)
-    >>> predictions[8]
-    (9, 1)
-    >>> predictions[9]
-    (10, 0)
-    >>> predictions[20]
-    (21, 0)
-    >>> predictions[48]
-    (49, 0)
+    >>> predictions[1]
+    (2, 1)
+    >>> predictions[2]
+    (3, 0)
+    >>> predictions[3]
+    (4, 0)
+    >>> predictions[4]
+    (5, 0)
     >>> pic_path = temp_path + "/pic"
     >>> pic.save(pic_path)
     >>> pic2 = PowerIterationClustering.load(pic_path)
     >>> pic2.getK()
     2
     >>> pic2.getMaxIter()
-    40
+    10
     >>> pic3 = PowerIterationClustering(k=4, initMode="degree")
     >>> pic3.getIdCol()
     'id'
     >>> pic3.getK()
     4
     >>> pic3.getMaxIter()
     20
     >>> pic3.getInitMode()
     'degree'
 
     .. versionadded:: 2.4.0
     """
+
+    k = Param(Params._dummy(), "k",
+              "The number of clusters to create. Must be > 1.",
+              typeConverter=TypeConverters.toInt)
+    initMode = Param(Params._dummy(), "initMode",
+                     "The initialization algorithm. This can be either " +
+                     "'random' to use a random vector as vertex properties, or 'degree' to use " +
+                     "a normalized sum of similarities with other vertices. Supported options: " +
+                     "'random' and 'degree'.",
+                     typeConverter=TypeConverters.toString)
+    idCol = Param(Params._dummy(), "idCol",
+                  "Name of the input column for vertex IDs.",
+                  typeConverter=TypeConverters.toString)
+    neighborsCol = Param(Params._dummy(), "neighborsCol",
+                         "Name of the input column for neighbors in the adjacency list " +
+                         "representation.",
+                         typeConverter=TypeConverters.toString)
+    similaritiesCol = Param(Params._dummy(), "similaritiesCol",
+                            "Name of the input column for non-negative weights (similarities) " +
+                            "of edges between the vertex in `idCol` and each neighbor in " +
+                            "`neighborsCol`",
+                            typeConverter=TypeConverters.toString)
+
     @keyword_only
     def __init__(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",
                  idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
         """
         __init__(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",\
-                 idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
+                 idCol="id", neighborsCol="neighbors", similaritiesCol="similarities")
         """
         super(PowerIterationClustering, self).__init__()
         self._java_obj = self._new_java_obj(
             "org.apache.spark.ml.clustering.PowerIterationClustering", self.uid)
         self._setDefault(k=2, maxIter=20, initMode="random", idCol="id", neighborsCol="neighbors",
                          similaritiesCol="similarities")
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
 
     @keyword_only
     @since("2.4.0")
     def setParams(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",
                   idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
         """
         setParams(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",\
-                  idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
+                  idCol="id", neighborsCol="neighbors", similaritiesCol="similarities")
         Sets params for PowerIterationClustering.
         """
         kwargs = self._input_kwargs
         return self._set(**kwargs)
 
     @since("2.4.0")
     def setK(self, value):
         """
         Sets the value of :py:attr:`k`.
         """
         return self._set(k=value)
 
+    @since("2.4.0")
+    def getK(self):
+        """
+        Gets the value of :py:attr:`k`.
+        """
+        return self.getOrDefault(self.k)
+
     @since("2.4.0")
     def setInitMode(self, value):
         """
         Sets the value of :py:attr:`initMode`.
         """
         return self._set(initMode=value)
 
+    @since("2.4.0")
+    def getInitMode(self):
+        """
+        Gets the value of `initMode`
+        """
+        return self.getOrDefault(self.initMode)
+
     @since("2.4.0")
     def setIdCol(self, value):
         """
         Sets the value of :py:attr:`idCol`.
         """
         return self._set(idCol=value)
 
+    @since("2.4.0")
+    def getIdCol(self):
+        """
+        Gets the value of :py:attr:`idCol`.
+        """
+        return self.getOrDefault(self.idCol)
+
     @since("2.4.0")
     def setNeighborsCol(self, value):
         """
         Sets the value of :py:attr:`neighborsCol.
         """
         return self._set(neighborsCol=value)
 
+    @since("2.4.0")
+    def getNeighborsCol(self):
+        """
+        Gets the value of :py:attr:`neighborsCol`.
+        """
+        return self.getOrDefault(self.neighborsCol)
+
     @since("2.4.0")
     def setSimilaritiesCol(self, value):
         """
         Sets the value of :py:attr:`similaritiesCol`.
         """
         return self._set(similaritiesCol=value)
 
+    @since("2.4.0")
+    def getSimilaritiesCol(self):
+        """
+        Gets the value of :py:attr:`similaritiesCol`.
+ """ + return self.getOrDefault(self.binary) + if __name__ == "__main__": import doctest diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 2ec0be60e9fa9..0a5a7e2592ac1 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1873,6 +1873,53 @@ def test_kmeans_cosine_distance(self): self.assertTrue(result[4].prediction == result[5].prediction) +class PowerIterationClustering(SparkSessionTestCase): + + def test_power_iteration_clustering(self): + from pyspark.sql.types import ArrayType, DoubleType, LongType, StructField, StructType + from pyspark.ml.clustering import PowerIterationClustering + import math + + def genCircle(r, n): + points = [] + for i in range(0, n): + theta = 2.0 * math.pi * i / n + points.append((r * math.cos(theta), r * math.sin(theta))) + return points + + def sim(x, y): + dist = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1]) + return math.exp(-dist / 2.0) + + r1 = 1.0 + n1 = 10 + r2 = 4.0 + n2 = 40 + n = n1 + n2 + points = genCircle(r1, n1) + genCircle(r2, n2) + similarities = [] + for i in range(1, n): + neighbor = [] + weight = [] + for j in range(i): + neighbor.append((long)(j)) + weight.append(sim(points[i], points[j])) + similarities.append([(long)(i), neighbor, weight]) + rdd = self.sc.parallelize(similarities, 2) + schema = StructType([StructField("id", LongType(), False), + StructField("neighbors", ArrayType(LongType(), False), True), + StructField("similarities", ArrayType(DoubleType(), False), True)]) + df = self.spark.createDataFrame(rdd, schema) + pic = PowerIterationClustering() + result = pic.setK(2).setMaxIter(40).transform(df) + predictions = sorted(set([(i[0], i[1]) for i in result.select(result.id, + result.prediction).collect()]), key=lambda x: x[0]) + for i in range(0, 8): + self.assertEqual(predictions[i], (i+1, 1)) + for i in range(9, 48): + self.assertEqual(predictions[i], (i+1, 0)) + + class OneVsRestTests(SparkSessionTestCase): def test_copy(self): From a6b18222b65e878e22ddf8f2d340aa3127c99e0c Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Fri, 27 Apr 2018 18:08:46 -0700 Subject: [PATCH 5/7] fix doc format problem --- python/pyspark/ml/clustering.py | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py index 4b9f5cb299de4..f8fec7f0d3905 100644 --- a/python/pyspark/ml/clustering.py +++ b/python/pyspark/ml/clustering.py @@ -1170,22 +1170,25 @@ class PowerIterationClustering(HasMaxIter, HasPredictionCol, JavaTransformer, Ja is a symmetric matrix whose entries are non-negative similarities between items. PIC takes this matrix (or graph) as an adjacency matrix. Specifically, each input row includes: - - :py:class:`idCol`: vertex ID - - :py:class:`neighborsCol`: neighbors of vertex in :py:class:`idCol` - - :py:class:`similaritiesCol`: non-negative weights (similarities) of edges between the - vertex in :py:class:`idCol` and each neighbor in :py:class:`neighborsCol` - PIC returns a cluster assignment for each input vertex. It appends a new column - :py:class:`predictionCol` containing the cluster assignment in :py:class:`[0,k)` for - each row (vertex). - - Notes: - - [[PowerIterationClustering]] is a transformer with an expensive [[transform]] operation. 
+ + - :py:class:`idCol`: vertex ID + - :py:class:`neighborsCol`: neighbors of vertex in :py:class:`idCol` + - :py:class:`similaritiesCol`: non-negative weights (similarities) of edges between the + vertex in :py:class:`idCol` and each neighbor in :py:class:`neighborsCol` + + PIC returns a cluster assignment for each input vertex. It appends a new column + :py:class:`predictionCol` containing the cluster assignment in :py:class:`[0,k)` for + each row (vertex). + + Notes: + + - [[PowerIterationClustering]] is a transformer with an expensive [[transform]] operation. Transform runs the iterative PIC algorithm to cluster the whole input dataset. - - Input validation: This validates that similarities are non-negative but does NOT validate + - Input validation: This validates that similarities are non-negative but does NOT validate that the input matrix is symmetric. - @see - Spectral clustering (Wikipedia) + @see + Spectral clustering (Wikipedia) >>> from pyspark.sql.types import ArrayType, DoubleType, LongType, StructField, StructType >>> similarities = [((long)(1), [0], [0.5]), ((long)(2), [0, 1], [0.7,0.5]), \ From c25d3dcb11eff13bfe1092e1dc64c035335b852b Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sat, 28 Apr 2018 23:42:10 -0700 Subject: [PATCH 6/7] address comments (2) --- python/pyspark/ml/clustering.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py index f8fec7f0d3905..2f0500691c0f3 100644 --- a/python/pyspark/ml/clustering.py +++ b/python/pyspark/ml/clustering.py @@ -1161,6 +1161,7 @@ class PowerIterationClustering(HasMaxIter, HasPredictionCol, JavaTransformer, Ja JavaMLReadable, JavaMLWritable): """ .. note:: Experimental + Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by Lin and Cohen. From the abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power @@ -1171,23 +1172,23 @@ class PowerIterationClustering(HasMaxIter, HasPredictionCol, JavaTransformer, Ja PIC takes this matrix (or graph) as an adjacency matrix. Specifically, each input row includes: - - :py:class:`idCol`: vertex ID - - :py:class:`neighborsCol`: neighbors of vertex in :py:class:`idCol` - - :py:class:`similaritiesCol`: non-negative weights (similarities) of edges between the - vertex in :py:class:`idCol` and each neighbor in :py:class:`neighborsCol` + - :py:attr:`idCol`: vertex ID + - :py:attr:`neighborsCol`: neighbors of vertex in :py:attr:`idCol` + - :py:attr:`similaritiesCol`: non-negative weights (similarities) of edges between the + vertex in :py:attr:`idCol` and each neighbor in :py:attr:`neighborsCol` PIC returns a cluster assignment for each input vertex. It appends a new column - :py:class:`predictionCol` containing the cluster assignment in :py:class:`[0,k)` for + :py:attr:`predictionCol` containing the cluster assignment in :py:attr:`[0,k)` for each row (vertex). - Notes: + .. note:: - [[PowerIterationClustering]] is a transformer with an expensive [[transform]] operation. Transform runs the iterative PIC algorithm to cluster the whole input dataset. - Input validation: This validates that similarities are non-negative but does NOT validate that the input matrix is symmetric. - @see + .. 
seealso:: Spectral clustering (Wikipedia) >>> from pyspark.sql.types import ArrayType, DoubleType, LongType, StructField, StructType @@ -1328,7 +1329,7 @@ def getIdCol(self): @since("2.4.0") def setNeighborsCol(self, value): """ - Sets the value of :py:attr:`neighborsCol. + Sets the value of :py:attr:`neighborsCol`. """ return self._set(neighborsCol=value) @@ -1351,7 +1352,7 @@ def getSimilaritiesCol(self): """ Gets the value of :py:attr:`similaritiesCol`. """ - return self.getOrDefault(self.binary) + return self.getOrDefault(self.similaritiesCol) if __name__ == "__main__": From ae9f953d4a06228b6bf7b6867f031a1bfc84d1e2 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sun, 29 Apr 2018 09:48:10 -0700 Subject: [PATCH 7/7] fix python doc error --- python/pyspark/ml/clustering.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py index 2f0500691c0f3..317f24d4be81f 100644 --- a/python/pyspark/ml/clustering.py +++ b/python/pyspark/ml/clustering.py @@ -1188,8 +1188,8 @@ class PowerIterationClustering(HasMaxIter, HasPredictionCol, JavaTransformer, Ja - Input validation: This validates that similarities are non-negative but does NOT validate that the input matrix is symmetric. - .. seealso:: - Spectral clustering (Wikipedia) + .. seealso:: `Wikipedia on Spectral clustering \ + `_ >>> from pyspark.sql.types import ArrayType, DoubleType, LongType, StructField, StructType >>> similarities = [((long)(1), [0], [0.5]), ((long)(2), [0, 1], [0.7,0.5]), \