From fa220a6b21dd36591a44dfb7d32494fee7c60b08 Mon Sep 17 00:00:00 2001
From: Sandeep Singh
Date: Thu, 1 Dec 2016 20:29:36 +0530
Subject: [PATCH 1/7] add transform

---
 .../org/apache/spark/ml/feature/IDF.scala | 29 ++++++++++++++++---
 1 file changed, 25 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index 46a0730f5ddb8..746bc1e787ec0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -18,10 +18,9 @@
 package org.apache.spark.ml.feature
 
 import org.apache.hadoop.fs.Path
-
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml._
-import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, VectorUDT, Vectors}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -131,8 +130,7 @@ class IDFModel private[ml] (
 
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-    // TODO: Make the idfModel.transform natively in ml framework to avoid extra conversion.
-    val idf = udf { vec: Vector => idfModel.transform(OldVectors.fromML(vec)).asML }
+    val idf = udf { vec: Vector => IDFModel.transform(idfModel.idf.asML, vec) }
     dataset.withColumn($(outputCol), idf(col($(inputCol))))
   }
@@ -192,4 +190,27 @@ object IDFModel extends MLReadable[IDFModel] {
 
   @Since("1.6.0")
   override def load(path: String): IDFModel = super.load(path)
+
+  private def transform(idf: Vector, v: Vector): Vector = {
+    val newSize = v.size
+    v match {
+      case SparseVector(_, indices, values) =>
+        val nnz = indices.length
+        val newValues = new Array[Double](nnz)
+        var k = 0
+        while (k < nnz) {
+          newValues(k) = values(k) * idf(indices(k))
+          k += 1
+        }
+        Vectors.sparse(newSize, indices, newValues)
+      case DenseVector(values) =>
+        val newValues = new Array[Double](newSize)
+        var j = 0
+        while (j < newSize) {
+          newValues(j) = values(j) * idf(j)
+          j += 1
+        }
+        Vectors.dense(newValues)
+    }
+  }
 }
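Patch 1 above inlines the TF-IDF scaling that previously went through the old mllib IDFModel, as a private helper on the IDFModel companion object: each stored term frequency is multiplied by the matching component of the IDF vector. A minimal sketch of the arithmetic the helper performs, with illustrative weights and names that are not from the patch:

    import org.apache.spark.ml.linalg.Vectors

    val idfWeights = Vectors.dense(0.0, 0.5, 1.5)             // per-term IDF weights
    val tf = Vectors.sparse(3, Array(1, 2), Array(2.0, 3.0))  // term frequencies
    // Sparse inputs keep their indices; only the stored values are scaled:
    // tfidf(i) = tf(i) * idfWeights(i)
    val tfidf = Vectors.sparse(3, Array(1, 2),
      Array(2.0 * idfWeights(1), 3.0 * idfWeights(2)))        // (3,[1,2],[1.0,4.5])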
From cb869eb71392f47b9e63af3ba6aeaa031523baaf Mon Sep 17 00:00:00 2001
From: Sandeep Singh
Date: Thu, 1 Dec 2016 20:32:30 +0530
Subject: [PATCH 2/7] make IDFModel work

---
 .../scala/org/apache/spark/ml/feature/IDF.scala | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index 746bc1e787ec0..1fc16b212de8b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.ml.feature
 
 import org.apache.hadoop.fs.Path
+
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml._
-import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, VectorUDT, Vectors}
+import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -114,7 +115,7 @@ object IDF extends DefaultParamsReadable[IDF] {
 @Since("1.4.0")
 class IDFModel private[ml] (
     @Since("1.4.0") override val uid: String,
-    idfModel: feature.IDFModel)
+    idf: Vector)
   extends Model[IDFModel] with IDFBase with MLWritable {
 
   import IDFModel._
@@ -130,7 +131,7 @@ class IDFModel private[ml] (
 
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-    val idf = udf { vec: Vector => IDFModel.transform(idfModel.idf.asML, vec) }
+    val idf = udf { vec: Vector => IDFModel.transform(idf, vec) }
     dataset.withColumn($(outputCol), idf(col($(inputCol))))
   }
@@ -141,14 +142,10 @@ class IDFModel private[ml] (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): IDFModel = {
-    val copied = new IDFModel(uid, idfModel)
+    val copied = new IDFModel(uid, idf)
     copyValues(copied, extra).setParent(parent)
   }
 
-  /** Returns the IDF vector. */
-  @Since("2.0.0")
-  def idf: Vector = idfModel.idf.asML
-
   @Since("1.6.0")
   override def write: MLWriter = new IDFModelWriter(this)
 }

From d1bb36d3c93e99214aeaec34bffdd63c82724f89 Mon Sep 17 00:00:00 2001
From: Sandeep Singh
Date: Thu, 1 Dec 2016 20:32:59 +0530
Subject: [PATCH 3/7] since tag

---
 mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index 1fc16b212de8b..b0bc0dbf4c621 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -115,7 +115,7 @@ object IDF extends DefaultParamsReadable[IDF] {
 @Since("1.4.0")
 class IDFModel private[ml] (
     @Since("1.4.0") override val uid: String,
-    idf: Vector)
+    @Since("2.2.0") idf: Vector)
   extends Model[IDFModel] with IDFBase with MLWritable {
 
   import IDFModel._

From 89546ec4e5248d71db39b519cf6a6d072b767bd1 Mon Sep 17 00:00:00 2001
From: Sandeep Singh
Date: Thu, 1 Dec 2016 20:45:37 +0530
Subject: [PATCH 4/7] works

---
 .../main/scala/org/apache/spark/ml/feature/IDF.scala | 12 +++++++-----
 .../scala/org/apache/spark/ml/feature/IDFSuite.scala |  2 +-
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index b0bc0dbf4c621..5c52abda67652 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -89,7 +89,7 @@ final class IDF @Since("1.4.0") (@Since("1.4.0") override val uid: String)
     val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
       case Row(v: Vector) => OldVectors.fromML(v)
     }
-    val idf = new feature.IDF($(minDocFreq)).fit(input)
+    val idf = new feature.IDF($(minDocFreq)).fit(input).idf.asML
     copyValues(new IDFModel(uid, idf).setParent(this))
   }
@@ -131,8 +131,8 @@ class IDFModel private[ml] (
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-    val idf = udf { vec: Vector => IDFModel.transform(idf, vec) }
-    dataset.withColumn($(outputCol), idf(col($(inputCol))))
+    val idfUDF = udf { vec: Vector => IDFModel.transform(idf, vec) }
+    dataset.withColumn($(outputCol), idfUDF(col($(inputCol))))
   }
 
   @Since("1.4.0")
@@ -146,6 +146,8 @@ class IDFModel private[ml] (
     copyValues(copied, extra).setParent(parent)
   }
 
+  def idfVector: Vector = idf
+
   @Since("1.6.0")
   override def write: MLWriter = new IDFModelWriter(this)
 }
@@ -159,7 +161,7 @@ object IDFModel extends MLReadable[IDFModel] {
 
     override protected def saveImpl(path: String): Unit = {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
-      val data = Data(instance.idf)
+      val data = Data(instance.idfVector)
       val dataPath = new Path(path, "data").toString
       sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
     }
@@ -176,7 +178,7 @@ object IDFModel extends MLReadable[IDFModel] {
       val Row(idf: Vector) = MLUtils.convertVectorColumnsToML(data, "idf")
         .select("idf")
         .head()
-      val model = new IDFModel(metadata.uid, new feature.IDFModel(OldVectors.fromML(idf)))
+      val model = new IDFModel(metadata.uid, idf)
       DefaultParamsReader.getAndSetParams(model, metadata)
       model
     }

diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
index 5325d95526a50..48b984f4defb0 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -46,7 +46,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
 
   test("params") {
     ParamsSuite.checkParams(new IDF)
-    val model = new IDFModel("idf", new OldIDFModel(Vectors.dense(1.0)))
+    val model = new IDFModel("idf", Vectors.dense(1.0))
     ParamsSuite.checkParams(model)
   }
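At this point in the series fit() extracts the ml Vector from the old mllib model, and the writer/reader round-trip just that vector. A hedged sketch of the persistence path (the path, column names, and the tfData DataFrame are illustrative, not from the patch):

    // tfData: a DataFrame with a Vector column "tf" of term frequencies.
    val model = new IDF()
      .setInputCol("tf")
      .setOutputCol("tfidf")
      .setMinDocFreq(1)
      .fit(tfData)
    model.write.overwrite().save("/tmp/idf-model")
    val restored = IDFModel.load("/tmp/idf-model")
    restored.transform(tfData).show()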
From 72f8c7d59da2224bd71b0d56e1f2c388e277f9df Mon Sep 17 00:00:00 2001
From: Sandeep Singh
Date: Thu, 1 Dec 2016 20:52:27 +0530
Subject: [PATCH 5/7] works

---
 mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala | 6 +++---
 .../test/scala/org/apache/spark/ml/feature/IDFSuite.scala  | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index 5c52abda67652..3982e705ca380 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -115,7 +115,7 @@ object IDF extends DefaultParamsReadable[IDF] {
 @Since("1.4.0")
 class IDFModel private[ml] (
     @Since("1.4.0") override val uid: String,
-    @Since("2.2.0") idf: Vector)
+    @Since("2.2.0") idfVector: Vector)
   extends Model[IDFModel] with IDFBase with MLWritable {
 
   import IDFModel._
@@ -146,7 +146,7 @@ class IDFModel private[ml] (
     copyValues(copied, extra).setParent(parent)
   }
 
-  def idfVector: Vector = idf
+  def idf: Vector = idfVector
 
   @Since("1.6.0")
   override def write: MLWriter = new IDFModelWriter(this)
@@ -161,7 +161,7 @@ object IDFModel extends MLReadable[IDFModel] {
 
     override protected def saveImpl(path: String): Unit = {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
-      val data = Data(instance.idfVector)
+      val data = Data(instance.idf)
       val dataPath = new Path(path, "data").toString
       sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
     }

diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
index 48b984f4defb0..604125972b3ba 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -112,7 +112,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
 
   test("IDFModel read/write") {
-    val instance = new IDFModel("myIDFModel", new OldIDFModel(Vectors.dense(1.0, 2.0)))
+    val instance = new IDFModel("myIDFModel", Vectors.dense(1.0, 2.0))
       .setInputCol("myInputCol")
       .setOutputCol("myOutputCol")
     val newInstance = testDefaultReadWrite(instance)
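Patch 6 below moves the document-frequency aggregation itself into spark.ml, so fit() no longer calls into the old mllib package at all. For a corpus of m documents the aggregator computes

    idf(t) = log((m + 1) / (df(t) + 1))

and leaves idf(t) at 0 whenever df(t) < minDocFreq. Worked example: with m = 4 documents and a term present in df(t) = 1 of them, idf(t) = log(5 / 2) ≈ 0.916.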
From 5cb2c3e4df4807941647e72cec1f41ce4f02018b Mon Sep 17 00:00:00 2001
From: Sandeep Singh
Date: Thu, 1 Dec 2016 21:02:00 +0530
Subject: [PATCH 6/7] migrate everything to ml

---
 .../org/apache/spark/ml/feature/IDF.scala | 97 ++++++++++++++++++-
 1 file changed, 94 insertions(+), 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index 3982e705ca380..a424d420974cf 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ml.feature
 
+import breeze.linalg.{DenseVector => BDV}
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.Since
@@ -86,10 +87,14 @@ final class IDF @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): IDFModel = {
     transformSchema(dataset.schema, logging = true)
-    val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
-      case Row(v: Vector) => OldVectors.fromML(v)
+    val input: RDD[Vector] = dataset.select($(inputCol)).rdd.map {
+      case Row(v: Vector) => v
     }
-    val idf = new feature.IDF($(minDocFreq)).fit(input).idf.asML
+    val idf = input.treeAggregate(
+      new IDF.DocumentFrequencyAggregator(minDocFreq = $(minDocFreq)))(
+      seqOp = (df, v) => df.add(v),
+      combOp = (df1, df2) => df1.merge(df2)
+    ).idf()
     copyValues(new IDFModel(uid, idf).setParent(this))
   }
@@ -107,6 +112,92 @@ object IDF extends DefaultParamsReadable[IDF] {
 
   @Since("1.6.0")
   override def load(path: String): IDF = super.load(path)
+
+  /** Document frequency aggregator. */
+  class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable {
+
+    /** number of documents */
+    private var m = 0L
+    /** document frequency vector */
+    private var df: BDV[Long] = _
+
+    def this() = this(0)
+
+    /** Adds a new document. */
+    def add(doc: Vector): this.type = {
+      if (isEmpty) {
+        df = BDV.zeros(doc.size)
+      }
+      doc match {
+        case SparseVector(size, indices, values) =>
+          val nnz = indices.length
+          var k = 0
+          while (k < nnz) {
+            if (values(k) > 0) {
+              df(indices(k)) += 1L
+            }
+            k += 1
+          }
+        case DenseVector(values) =>
+          val n = values.length
+          var j = 0
+          while (j < n) {
+            if (values(j) > 0.0) {
+              df(j) += 1L
+            }
+            j += 1
+          }
+        case other =>
+          throw new UnsupportedOperationException(
+            s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+      }
+      m += 1L
+      this
+    }
+
+    /** Merges another. */
+    def merge(other: DocumentFrequencyAggregator): this.type = {
+      if (!other.isEmpty) {
+        m += other.m
+        if (df == null) {
+          df = other.df.copy
+        } else {
+          df += other.df
+        }
+      }
+      this
+    }
+
+    private def isEmpty: Boolean = m == 0L
+
+    /** Returns the current IDF vector. */
+    def idf(): Vector = {
+      if (isEmpty) {
+        throw new IllegalStateException("Haven't seen any document yet.")
+      }
+      val n = df.length
+      val inv = new Array[Double](n)
+      var j = 0
+      while (j < n) {
+        /*
+         * If the term is not present in the minimum
+         * number of documents, set IDF to 0. This
+         * will cause multiplication in IDFModel to
+         * set TF-IDF to 0.
+         *
+         * Since arrays are initialized to 0 by default,
+         * we just omit changing those entries.
+         */
+        if (df(j) >= minDocFreq) {
+          inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
+        }
+        j += 1
+      }
+      Vectors.dense(inv)
+    }
+  }
+
 }
 
 /**
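Since the DocumentFrequencyAggregator added above is an ordinary serializable class on the IDF companion object, it can be exercised locally to sanity-check the formula. A minimal sketch (the data is illustrative, not from the patch):

    import org.apache.spark.ml.linalg.Vectors

    val agg = new IDF.DocumentFrequencyAggregator(minDocFreq = 1)
    agg.add(Vectors.dense(1.0, 0.0, 2.0))  // doc 1: terms 0 and 2 present
    agg.add(Vectors.dense(1.0, 1.0, 0.0))  // doc 2: terms 0 and 1 present
    // m = 2 documents, df = [2, 1, 1]
    // idf = [log(3/3), log(3/2), log(3/2)] ~= [0.0, 0.405, 0.405]
    println(agg.idf())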
From 18f220b16329f9dea06ce2f9e6f1cfdb7e927ded Mon Sep 17 00:00:00 2001
From: Sandeep Singh
Date: Thu, 1 Dec 2016 21:05:45 +0530
Subject: [PATCH 7/7] remove unused imports

---
 mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala      | 2 --
 mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala | 2 --
 2 files changed, 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index a424d420974cf..13dab0c5e6630 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -26,8 +26,6 @@ import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
-import org.apache.spark.mllib.feature
-import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._

diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
index 604125972b3ba..ce36974f58ff2 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -22,8 +22,6 @@ import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.DefaultReadWriteTest
 import org.apache.spark.ml.util.TestingUtils._
-import org.apache.spark.mllib.feature.{IDFModel => OldIDFModel}
-import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.Row
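With the series applied, the public API is used the same way as before the migration; a short end-to-end sketch against the final state (the SparkSession `spark`, data, and column names are illustrative):

    import org.apache.spark.ml.feature.IDF
    import org.apache.spark.ml.linalg.Vectors

    val df = spark.createDataFrame(Seq(
      (0, Vectors.dense(1.0, 0.0, 2.0)),
      (1, Vectors.dense(0.0, 1.0, 3.0))
    )).toDF("id", "tf")

    val model = new IDF().setInputCol("tf").setOutputCol("tfidf").fit(df)
    model.transform(df).select("tfidf").show(truncate = false)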