139 changes: 124 additions & 15 deletions mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -17,16 +17,15 @@

package org.apache.spark.ml.feature

import breeze.linalg.{DenseVector => BDV}
import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.Since
import org.apache.spark.ml._
import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.mllib.feature
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
@@ -86,10 +85,14 @@ final class IDF @Since("1.4.0") (@Since("1.4.0") override val uid: String)
@Since("2.0.0")
override def fit(dataset: Dataset[_]): IDFModel = {
transformSchema(dataset.schema, logging = true)
val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
case Row(v: Vector) => OldVectors.fromML(v)
val input: RDD[Vector] = dataset.select($(inputCol)).rdd.map {
case Row(v: Vector) => v
}
val idf = new feature.IDF($(minDocFreq)).fit(input)
val idf = input.treeAggregate(
new IDF.DocumentFrequencyAggregator(minDocFreq = $(minDocFreq)))(
seqOp = (df, v) => df.add(v),
combOp = (df1, df2) => df1.merge(df2)
).idf()
copyValues(new IDFModel(uid, idf).setParent(this))
}

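Note on the new fit(): instead of delegating to mllib's feature.IDF, document frequencies are now accumulated directly with RDD.treeAggregate, where seqOp folds one input vector into a partition-local aggregator and combOp merges aggregators across partitions. A minimal, self-contained sketch of that seqOp/combOp contract, using a hypothetical word-level document-frequency count (the object and names below are illustrative, not part of this patch):

import org.apache.spark.sql.SparkSession

object TreeAggregateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("df-sketch").getOrCreate()
    // Three toy "documents", each a set of terms.
    val docs = spark.sparkContext.parallelize(Seq(
      Seq("spark", "idf"),
      Seq("spark"),
      Seq("idf", "tf")))
    // zeroValue is an empty count map; seqOp adds one document to a partition's
    // partial counts; combOp merges partial counts from two partitions.
    val df = docs.treeAggregate(Map.empty[String, Long])(
      seqOp = (acc, doc) => doc.distinct.foldLeft(acc) { (m, w) =>
        m.updated(w, m.getOrElse(w, 0L) + 1L)
      },
      combOp = (m1, m2) => m2.foldLeft(m1) { case (m, (w, c)) =>
        m.updated(w, m.getOrElse(w, 0L) + c)
      })
    println(df) // Map(spark -> 2, idf -> 2, tf -> 1)
    spark.stop()
  }
}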
@@ -107,6 +110,92 @@ object IDF extends DefaultParamsReadable[IDF] {

@Since("1.6.0")
override def load(path: String): IDF = super.load(path)

/** Document frequency aggregator. */
class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable {

/** number of documents */
private var m = 0L
/** document frequency vector */
private var df: BDV[Long] = _


def this() = this(0)

/** Adds a new document. */
def add(doc: Vector): this.type = {
if (isEmpty) {
df = BDV.zeros(doc.size)
}
doc match {
case SparseVector(size, indices, values) =>
val nnz = indices.length
var k = 0
while (k < nnz) {
if (values(k) > 0) {
df(indices(k)) += 1L
}
k += 1
}
case DenseVector(values) =>
val n = values.length
var j = 0
while (j < n) {
if (values(j) > 0.0) {
df(j) += 1L
}
j += 1
}
case other =>
throw new UnsupportedOperationException(
s"Only sparse and dense vectors are supported but got ${other.getClass}.")
}
m += 1L
this
}

/** Merges another. */
def merge(other: DocumentFrequencyAggregator): this.type = {
if (!other.isEmpty) {
m += other.m
if (df == null) {
df = other.df.copy
} else {
df += other.df
}
}
this
}

private def isEmpty: Boolean = m == 0L

/** Returns the current IDF vector. */
def idf(): Vector = {
if (isEmpty) {
throw new IllegalStateException("Haven't seen any document yet.")
}
val n = df.length
val inv = new Array[Double](n)
var j = 0
while (j < n) {
/*
* If the term is not present in the minimum
* number of documents, set IDF to 0. This
* will cause multiplication in IDFModel to
* set TF-IDF to 0.
*
* Since arrays are initialized to 0 by default,
* we just omit changing those entries.
*/
if (df(j) >= minDocFreq) {
inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
}
j += 1
}
Vectors.dense(inv)
}
}

}
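For reference, idf() above computes log((m + 1) / (df(t) + 1)) per term and leaves terms seen in fewer than minDocFreq documents at 0. A small worked check against the aggregator added in this patch (assuming it stays reachable as IDF.DocumentFrequencyAggregator; the numbers are only an illustration):

import org.apache.spark.ml.feature.IDF
import org.apache.spark.ml.linalg.Vectors

val agg = new IDF.DocumentFrequencyAggregator(minDocFreq = 0)
agg.add(Vectors.dense(1.0, 0.0, 2.0))            // doc 1: terms 0 and 2
agg.add(Vectors.dense(0.0, 1.0, 3.0))            // doc 2: terms 1 and 2
agg.add(Vectors.sparse(3, Array(2), Array(5.0))) // doc 3: term 2 only
// m = 3 documents; df = [1, 1, 3]
// term 0: log((3 + 1) / (1 + 1)) ≈ 0.693
// term 2: log((3 + 1) / (3 + 1)) = 0.0
println(agg.idf()) // ≈ [0.693, 0.693, 0.0]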

/**
@@ -115,7 +204,7 @@ object IDF extends DefaultParamsReadable[IDF] {
@Since("1.4.0")
class IDFModel private[ml] (
@Since("1.4.0") override val uid: String,
idfModel: feature.IDFModel)
@Since("2.2.0") idfVector: Vector)
extends Model[IDFModel] with IDFBase with MLWritable {

import IDFModel._
@@ -131,9 +220,8 @@ class IDFModel private[ml] (
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
// TODO: Make the idfModel.transform natively in ml framework to avoid extra conversion.
val idf = udf { vec: Vector => idfModel.transform(OldVectors.fromML(vec)).asML }
dataset.withColumn($(outputCol), idf(col($(inputCol))))
val idfUDF = udf { vec: Vector => IDFModel.transform(idf, vec) }
dataset.withColumn($(outputCol), idfUDF(col($(inputCol))))
}

@Since("1.4.0")
@@ -143,13 +231,11 @@ class IDFModel private[ml] (

@Since("1.4.1")
override def copy(extra: ParamMap): IDFModel = {
val copied = new IDFModel(uid, idfModel)
val copied = new IDFModel(uid, idf)
copyValues(copied, extra).setParent(parent)
}

/** Returns the IDF vector. */
@Since("2.0.0")
def idf: Vector = idfModel.idf.asML
def idf: Vector = idfVector

@Since("1.6.0")
override def write: MLWriter = new IDFModelWriter(this)
@@ -181,7 +267,7 @@ object IDFModel extends MLReadable[IDFModel] {
val Row(idf: Vector) = MLUtils.convertVectorColumnsToML(data, "idf")
.select("idf")
.head()
val model = new IDFModel(metadata.uid, new feature.IDFModel(OldVectors.fromML(idf)))
val model = new IDFModel(metadata.uid, idf)
DefaultParamsReader.getAndSetParams(model, metadata)
model
}
@@ -192,4 +278,27 @@ object IDFModel extends MLReadable[IDFModel] {

@Since("1.6.0")
override def load(path: String): IDFModel = super.load(path)

private def transform(idf: Vector, v: Vector): Vector = {
val newSize = v.size
v match {
case SparseVector(_, indices, values) =>
val nnz = indices.length
val newValues = new Array[Double](nnz)
var k = 0
while (k < nnz) {
newValues(k) = values(k) * idf(indices(k))
k += 1
}
Vectors.sparse(newSize, indices, newValues)
case DenseVector(values) =>
val newValues = new Array[Double](newSize)
var j = 0
while (j < newSize) {
newValues(j) = values(j) * idf(j)
j += 1
}
Vectors.dense(newValues)
}
}
}
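End to end, the user-facing ml API is unchanged: fitting and transforming still go through IDF and IDFModel on a DataFrame, with model.idf still exposing the fitted vector. A hedged usage sketch (the column names and SparkSession setup are illustrative):

import org.apache.spark.ml.feature.IDF
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("idf-usage").getOrCreate()
import spark.implicits._

// Term-frequency vectors, e.g. as produced upstream by HashingTF or CountVectorizer.
val data = Seq(
  Vectors.dense(1.0, 0.0, 2.0),
  Vectors.dense(0.0, 1.0, 3.0),
  Vectors.sparse(3, Array(2), Array(5.0))
).map(Tuple1.apply).toDF("features")

val idf = new IDF()
  .setInputCol("features")
  .setOutputCol("tfidf")
  .setMinDocFreq(1)

val model = idf.fit(data)         // aggregates document frequencies as above
println(model.idf)                // the fitted IDF vector
model.transform(data).show(false) // element-wise TF * IDF per row

spark.stop()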
mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -22,8 +22,6 @@ import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.DefaultReadWriteTest
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.feature.{IDFModel => OldIDFModel}
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.Row

@@ -46,7 +44,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead

test("params") {
ParamsSuite.checkParams(new IDF)
val model = new IDFModel("idf", new OldIDFModel(Vectors.dense(1.0)))
val model = new IDFModel("idf", Vectors.dense(1.0))
ParamsSuite.checkParams(model)
}

@@ -112,7 +110,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
}

test("IDFModel read/write") {
val instance = new IDFModel("myIDFModel", new OldIDFModel(Vectors.dense(1.0, 2.0)))
val instance = new IDFModel("myIDFModel", Vectors.dense(1.0, 2.0))
.setInputCol("myInputCol")
.setOutputCol("myOutputCol")
val newInstance = testDefaultReadWrite(instance)