diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index ca11ede4ccd47..d27da02497489 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.Logging
 import org.apache.spark.annotation.Since
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.linalg.BLAS.{axpy, scal}
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
@@ -45,7 +45,9 @@ class KMeans private (
     private var initializationMode: String,
     private var initializationSteps: Int,
     private var epsilon: Double,
-    private var seed: Long) extends Serializable with Logging {
+    private var seed: Long,
+    private var vectorFactory: VectorFactory = DenseVectorFactory.instance
+  ) extends Serializable with Logging {
 
   /**
    * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
@@ -176,6 +178,13 @@ class KMeans private (
     this
   }
 
+  def getVectorFactory: VectorFactory = vectorFactory
+
+  def setVectorFactory(vectorFactory: VectorFactory): this.type = {
+    this.vectorFactory = vectorFactory
+    this
+  }
+
   // Initial cluster centers can be provided as a KMeansModel object rather than using the
   // random or k-means|| initializationMode
   private var initialModel: Option[KMeansModel] = None
@@ -282,7 +291,8 @@ class KMeans private (
     val k = thisActiveCenters(0).length
     val dims = thisActiveCenters(0)(0).vector.size
 
-    val sums = Array.fill(runs, k)(Vectors.zeros(dims))
+    val sums = Array.fill(runs, k)(vectorFactory.zeros(dims))
     val counts = Array.fill(runs, k)(0L)
 
     points.foreach { point =>
@@ -376,7 +386,8 @@ class KMeans private (
     // Initialize each run's first center to a random point.
     val seed = new XORShiftRandom(this.seed).nextInt()
     val sample = data.takeSample(true, runs, seed).toSeq
-    val newCenters = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDense))
+    val newCenters = Array.tabulate(runs)(r => ArrayBuffer(sample(r).compact(vectorFactory)))
 
     /** Merges new centers to centers. */
     def mergeNewCenters(): Unit = {
@@ -436,7 +447,8 @@ class KMeans private (
       }.collect()
       mergeNewCenters()
       chosen.foreach { case (p, rs) =>
-        rs.foreach(newCenters(_) += p.toDense)
+        rs.foreach(newCenters(_) += p)
       }
       step += 1
     }
@@ -459,7 +471,7 @@ class KMeans private (
     val finalCenters = (0 until runs).par.map { r =>
       val myCenters = centers(r).toArray
       val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray
-      LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30)
+      LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30, vectorFactory)
     }
 
     finalCenters.toArray
@@ -488,6 +500,7 @@ object KMeans {
   * @param runs number of parallel runs, defaults to 1. The best model is returned.
   * @param initializationMode initialization model, either "random" or "k-means||" (default).
   * @param seed random seed value for cluster initialization
+  * @param vectorFactory factory used to create the vectors that represent the cluster centroids
   */
  @Since("1.3.0")
  def train(
@@ -496,12 +509,14 @@ object KMeans {
      data: RDD[Vector],
      k: Int,
      maxIterations: Int,
      runs: Int,
      initializationMode: String,
-      seed: Long): KMeansModel = {
+      seed: Long,
+      vectorFactory: VectorFactory = DenseVectorFactory.instance): KMeansModel = {
    new KMeans().setK(k)
      .setMaxIterations(maxIterations)
      .setRuns(runs)
      .setInitializationMode(initializationMode)
      .setSeed(seed)
+      .setVectorFactory(vectorFactory)
      .run(data)
  }
@@ -617,5 +632,33 @@ class VectorWithNorm(val vector: Vector, val norm: Double) extends Serializable
 
   def this(array: Array[Double]) = this(Vectors.dense(array))
 
-  /** Converts the vector to a dense vector. */
-  def toDense: VectorWithNorm = new VectorWithNorm(Vectors.dense(vector.toArray), norm)
+  /** Compacts the vector using the provided factory. */
+  def compact(fact: VectorFactory): VectorWithNorm = new VectorWithNorm(fact.compact(vector), norm)
+}
+
+/** Strategy for allocating and compacting the vectors that back cluster centroids. */
+trait VectorFactory extends Serializable {
+  def zeros(size: Int): Vector
+
+  def compact(vec: Vector): Vector
+}
+
+/** Always uses dense vectors, matching the previous KMeans behaviour. */
+class DenseVectorFactory private() extends VectorFactory {
+  override def zeros(size: Int): Vector = Vectors.zeros(size)
+
+  override def compact(vec: Vector): Vector = vec.toDense
+}
+
+object DenseVectorFactory {
+  val instance = new DenseVectorFactory
+}
+
+/** Accumulates into sparse vectors and compresses, keeping centroids sparse when that is smaller. */
+class SmartVectorFactory private() extends VectorFactory {
+  override def zeros(size: Int): Vector = new SparseVector(size, Array.empty, Array.empty)
+
+  override def compact(vec: Vector): Vector = vec.compressed
+}
+
+object SmartVectorFactory {
+  val instance = new SmartVectorFactory
 }
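For reviewers who want to try the change, here is a minimal usage sketch (not part of the patch) of how a caller would opt in to sparse centroid handling once this diff is applied. The data, `k`, and iteration count are made up for illustration; leaving out `setVectorFactory` keeps today's dense behaviour via `DenseVectorFactory.instance`.

```scala
// Assumes a SparkContext `sc` is already in scope (e.g. spark-shell) and this patch is applied.
import org.apache.spark.mllib.clustering.{KMeans, SmartVectorFactory}
import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Seq(
  Vectors.sparse(1000, Seq((0, 1.0), (500, 2.0))),
  Vectors.sparse(1000, Seq((1, 3.0), (999, 4.0)))))

val model = new KMeans()
  .setK(2)
  .setMaxIterations(10)
  .setVectorFactory(SmartVectorFactory.instance)  // keep centroids sparse where that is cheaper
  .run(data)

model.clusterCenters.foreach(println)
```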
+ s" Using duplicate point for center k = $i.") - centers(i) = points(0).toDense + centers(i) = points(0).compact(vectorFactory) } else { - centers(i) = points(j - 1).toDense + centers(i) = points(j - 1).compact(vectorFactory) } } @@ -93,7 +94,7 @@ private[mllib] object LocalKMeans extends Logging { while (j < k) { if (counts(j) == 0.0) { // Assign center to a random point - centers(j) = points(rand.nextInt(points.length)).toDense + centers(j) = points(rand.nextInt(points.length)).compact(vectorFactory) } else { scal(1.0 / counts(j), sums(j)) centers(j) = new VectorWithNorm(sums(j)) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala index df9f4ae145b88..a073dd94bdc6c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala @@ -54,6 +54,16 @@ private[spark] object BLAS extends Serializable with Logging { throw new UnsupportedOperationException( s"axpy doesn't support x type ${x.getClass}.") } + case sy: SparseVector => + x match { + case sx: SparseVector => + axpy(a, sx, sy) + case dx: DenseVector => + axpy(a, dx, sy) + case _ => + throw new UnsupportedOperationException( + s"axpy doesn't support x type ${x.getClass}.") + } case _ => throw new IllegalArgumentException( s"axpy only supports adding to a dense vector but got type ${y.getClass}.") @@ -92,6 +102,74 @@ private[spark] object BLAS extends Serializable with Logging { } } + /** + * y += a * x + */ + private def axpy(a: Double, x: DenseVector, y: SparseVector): Unit = { + require(x.size == y.size) + + val xIndices = (0 until x.size).filter(i => x(i) != 0.0).toArray + val xValues = xIndices.map(i => x(i)) + + axpy(a, Vectors.sparse(x.size, xIndices, xValues), y) + } + + /** + * y += a * x + */ + private def axpy(a: Double, x: SparseVector, y: SparseVector): Unit = { + val xSortedIndices = x.indices + val xValues = x.values + + val ySortedIndices = y.indices + val yValues = y.values + + val newIndices = new Array[Int](xSortedIndices.length + ySortedIndices.length) + val newValues = new Array[Double](xValues.length + yValues.length) + + assert(newIndices.length == newValues.length) + + var xj = 0 + var yj = 0 + var j = 0 + var previ = Int.MinValue + + def getAt(indices: Array[Int], j: Int): Int = + if (j < indices.length) indices(j) else Int.MaxValue + + while (xj < xSortedIndices.length || yj < ySortedIndices.length) { + val xi = getAt(xSortedIndices, xj) + val yi = getAt(ySortedIndices, yj) + + val (i, value) = if (xi <= yi) { + val vv = a*xValues(xj) + xj += 1 + (xi, vv) + } + else { + val vv = yValues(yj) + yj += 1 + (yi, vv) + } + + assert(i >= previ) + + if (previ != i) { + newIndices(j) = i + newValues(j) = value + j += 1 + } + else { + assert(newIndices(j - 1) == i) + newValues(j - 1) += value + } + + previ = i + } + + y.reassign(newIndices.slice(0, j), newValues.slice(0, j)) + } + /** Y += a * x */ private[spark] def axpy(a: Double, X: DenseMatrix, Y: DenseMatrix): Unit = { require(X.numRows == Y.numRows && X.numCols == Y.numCols, "Dimension mismatch: " + diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index cecfd067bd874..bacb2dd0fa3b6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -700,21 +700,37 @@ object DenseVector { * A sparse vector represented by an 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index cecfd067bd874..bacb2dd0fa3b6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -700,21 +700,37 @@ object DenseVector {
  * A sparse vector represented by an index array and an value array.
  *
  * @param size size of the vector.
- * @param indices index array, assume to be strictly increasing.
- * @param values value array, must have the same length as the index array.
+ * @param sortedIndices index array, assumed to be strictly increasing.
+ * @param sortedValues value array, must have the same length as the index array.
  */
 @Since("1.0.0")
 @SQLUserDefinedType(udt = classOf[VectorUDT])
 class SparseVector @Since("1.0.0") (
     @Since("1.0.0") override val size: Int,
-    @Since("1.0.0") val indices: Array[Int],
-    @Since("1.0.0") val values: Array[Double]) extends Vector {
+    @Since("1.0.0") private var sortedIndices: Array[Int],
+    @Since("1.0.0") private var sortedValues: Array[Double]) extends Vector {
+
+  require(allRequirements())
+
+  private def allRequirements(): Boolean = {
+    require(indices.length == values.length, "Sparse vectors require that the dimension of the" +
+      s" indices match the dimension of the values. You provided ${indices.length} indices and " +
+      s" ${values.length} values.")
+    require(indices.length <= size, s"You provided ${indices.length} indices and values, " +
+      s"which exceeds the specified vector size ${size}.")
+
+    true
+  }
+
+  def reassign(newSortedIndices: Array[Int], newValues: Array[Double]): Unit = {
+    sortedIndices = newSortedIndices
+    sortedValues = newValues
+    require(allRequirements())
+  }
+
+  def indices: Array[Int] = sortedIndices
 
-  require(indices.length == values.length, "Sparse vectors require that the dimension of the" +
-    s" indices match the dimension of the values. You provided ${indices.length} indices and " +
-    s" ${values.length} values.")
-  require(indices.length <= size, s"You provided ${indices.length} indices and values, " +
-    s"which exceeds the specified vector size ${size}.")
+  def values: Array[Double] = sortedValues
 
   override def toString: String =
     s"($size,${indices.mkString("[", ",", "]")},${values.mkString("[", ",", "]")})"
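Making `SparseVector` mutable through `reassign` is the most invasive part of this change, since sparse vectors can be shared by callers that assume they never change. The hypothetical snippet below (not part of the patch) just illustrates the contract the new method imposes: the index and value arrays must have the same length, must not exceed `size`, and are still assumed, but not checked, to be strictly increasing.

```scala
import org.apache.spark.mllib.linalg.SparseVector

val v = new SparseVector(5, Array(1, 3), Array(10.0, 30.0))

// Fine: same-length arrays, indices sorted and within the vector size.
v.reassign(Array(0, 3, 4), Array(1.0, 3.0, 4.0))

// Would fail the length check in allRequirements(): two indices but only one value.
// v.reassign(Array(0, 1), Array(1.0))
```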
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
index 3003c62d9876c..ec3430d083ae8 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.mllib.clustering
 
+import org.apache.spark.mllib.clustering.KMeans._
+import org.apache.spark.rdd.RDD
+
 import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
@@ -25,11 +28,25 @@ import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkCont
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.util.Utils
 
+object KMeansSuiteHelper {
+  private[clustering]
+  def trainKMeans(data: RDD[Vector],
+      k: Int,
+      maxIterations: Int,
+      runs: Int = 1,
+      initializationMode: String = K_MEANS_PARALLEL,
+      seed: Long = 42,
+      vectorFactory: VectorFactory = DenseVectorFactory.instance): KMeansModel = {
+    KMeans.train(data, k, maxIterations, runs, initializationMode, seed, vectorFactory)
+  }
+}
+
 class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   import org.apache.spark.mllib.clustering.KMeans.{K_MEANS_PARALLEL, RANDOM}
+  import KMeansSuiteHelper.trainKMeans
 
-  test("single cluster") {
+  private def testSingleCluster(vectorFactory: VectorFactory): Unit = {
     val data = sc.parallelize(Array(
       Vectors.dense(1.0, 2.0, 6.0),
       Vectors.dense(1.0, 3.0, 0.0),
@@ -41,30 +58,38 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
     // No matter how many runs or iterations we use, we should get one cluster,
     // centered at the mean of the points
 
-    var model = KMeans.train(data, k = 1, maxIterations = 1)
+    var model = trainKMeans(data, k = 1, maxIterations = 1, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 2)
+    model = trainKMeans(data, k = 1, maxIterations = 2, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 5)
+    model = trainKMeans(data, k = 1, maxIterations = 5, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
+    model = trainKMeans(data, k = 1, maxIterations = 1, runs = 5, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
+    model = trainKMeans(data, k = 1, maxIterations = 1, runs = 5, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM)
+    model = trainKMeans(
+      data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM,
+      vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(
-      data, k = 1, maxIterations = 1, runs = 1, initializationMode = K_MEANS_PARALLEL)
+    model = trainKMeans(
+      data, k = 1, maxIterations = 1, runs = 1, initializationMode = K_MEANS_PARALLEL,
+      vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
   }
 
-  test("no distinct points") {
+  test("single cluster") {
+    testSingleCluster(DenseVectorFactory.instance)
+    testSingleCluster(SmartVectorFactory.instance)
+  }
+
+  private def testNoDistinctPoints(vectorFactory: VectorFactory): Unit = {
     val data = sc.parallelize(
       Array(
         Vectors.dense(1.0, 2.0, 3.0),
@@ -74,11 +99,16 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
     val center = Vectors.dense(1.0, 2.0, 3.0)
 
     // Make sure code runs.
-    var model = KMeans.train(data, k = 2, maxIterations = 1)
+    var model = trainKMeans(data, k = 2, maxIterations = 1, vectorFactory = vectorFactory)
     assert(model.clusterCenters.size === 2)
   }
 
-  test("more clusters than points") {
+  test("no distinct points") {
+    testNoDistinctPoints(DenseVectorFactory.instance)
+    testNoDistinctPoints(SmartVectorFactory.instance)
+  }
+
+  private def testMoreClustersThanPoints(vectorFactory: VectorFactory): Unit = {
     val data = sc.parallelize(
       Array(
         Vectors.dense(1.0, 2.0, 3.0),
@@ -86,23 +116,28 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
       2)
 
     // Make sure code runs.
-    var model = KMeans.train(data, k = 3, maxIterations = 1)
+    var model = trainKMeans(data, k = 3, maxIterations = 1, vectorFactory = vectorFactory)
     assert(model.clusterCenters.size === 3)
   }
 
-  test("deterministic initialization") {
+  test("more clusters than points") {
+    testMoreClustersThanPoints(DenseVectorFactory.instance)
+    testMoreClustersThanPoints(SmartVectorFactory.instance)
+  }
+
+  private def testDeterministicInitialization(vectorFactory: VectorFactory): Unit = {
     // Create a large-ish set of points for clustering
     val points = List.tabulate(1000)(n => Vectors.dense(n, n))
     val rdd = sc.parallelize(points, 3)
 
     for (initMode <- Seq(RANDOM, K_MEANS_PARALLEL)) {
       // Create three deterministic models and compare cluster means
-      val model1 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1,
-        initializationMode = initMode, seed = 42)
+      val model1 = trainKMeans(rdd, k = 10, maxIterations = 2, runs = 1,
+        initializationMode = initMode, seed = 42, vectorFactory = vectorFactory)
       val centers1 = model1.clusterCenters
 
-      val model2 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1,
-        initializationMode = initMode, seed = 42)
+      val model2 = trainKMeans(rdd, k = 10, maxIterations = 2, runs = 1,
+        initializationMode = initMode, seed = 42, vectorFactory = vectorFactory)
       val centers2 = model2.clusterCenters
 
       centers1.zip(centers2).foreach { case (c1, c2) =>
@@ -111,7 +146,12 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
     }
   }
 
-  test("single cluster with big dataset") {
+  test("deterministic initialization") {
+    testDeterministicInitialization(DenseVectorFactory.instance)
+    testDeterministicInitialization(SmartVectorFactory.instance)
+  }
+
+  private def testSingleClusterWithBigDataset(vectorFactory: VectorFactory): Unit = {
     val smallData = Array(
       Vectors.dense(1.0, 2.0, 6.0),
       Vectors.dense(1.0, 3.0, 0.0),
@@ -124,32 +164,37 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
     val center = Vectors.dense(1.0, 3.0, 4.0)
 
-    var model = KMeans.train(data, k = 1, maxIterations = 1)
+    var model = trainKMeans(data, k = 1, maxIterations = 1, vectorFactory = vectorFactory)
     assert(model.clusterCenters.size === 1)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 2)
+    model = trainKMeans(data, k = 1, maxIterations = 2, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 5)
+    model = trainKMeans(data, k = 1, maxIterations = 5, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
+    model = trainKMeans(data, k = 1, maxIterations = 1, runs = 5, vectorFactory = vectorFactory)
    assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
+    model = trainKMeans(data, k = 1, maxIterations = 1, runs = 5, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM)
+    model = trainKMeans(data, k = 1, maxIterations = 1, runs = 1,
+      initializationMode = RANDOM, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1,
-      initializationMode = K_MEANS_PARALLEL)
+    model = trainKMeans(data, k = 1, maxIterations = 1, runs = 1,
+      initializationMode = K_MEANS_PARALLEL, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
   }
 
-  test("single cluster with sparse data") {
+  test("single cluster with big dataset") {
+    testSingleClusterWithBigDataset(DenseVectorFactory.instance)
+    testSingleClusterWithBigDataset(SmartVectorFactory.instance)
+  }
+  private def testSingleClusterWithSparseData(vectorFactory: VectorFactory): Unit = {
     val n = 10000
     val data = sc.parallelize((1 to 100).flatMap { i =>
       val x = i / 1000.0
@@ -170,33 +215,38 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
     val center = Vectors.sparse(n, Seq((0, 1.0), (1, 3.0), (2, 4.0)))
 
-    var model = KMeans.train(data, k = 1, maxIterations = 1)
+    var model = trainKMeans(data, k = 1, maxIterations = 1, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 2)
+    model = trainKMeans(data, k = 1, maxIterations = 2, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 5)
+    model = trainKMeans(data, k = 1, maxIterations = 5, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
+    model = trainKMeans(data, k = 1, maxIterations = 1, runs = 5, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
+    model = trainKMeans(data, k = 1, maxIterations = 1, runs = 5, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM)
+    model = trainKMeans(data, k = 1, maxIterations = 1, runs = 1,
+      initializationMode = RANDOM, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1,
-      initializationMode = K_MEANS_PARALLEL)
+    model = trainKMeans(data, k = 1, maxIterations = 1, runs = 1,
+      initializationMode = K_MEANS_PARALLEL, vectorFactory = vectorFactory)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
     data.unpersist()
   }
 
-  test("k-means|| initialization") {
+  test("single cluster with sparse data") {
+    testSingleClusterWithSparseData(DenseVectorFactory.instance)
+    testSingleClusterWithSparseData(SmartVectorFactory.instance)
+  }
+  private def testKMeansParallelInitialization(vectorFactory: VectorFactory): Unit = {
     case class VectorWithCompare(x: Vector) extends Ordered[VectorWithCompare] {
       override def compare(that: VectorWithCompare): Int = {
         if (this.x.toArray.foldLeft[Double](0.0)((acc, x) => acc + x * x) >
@@ -221,23 +271,28 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
     // it will make at least five passes, and it will give non-zero probability to each
     // unselected point as long as it hasn't yet selected all of them
 
-    var model = KMeans.train(rdd, k = 5, maxIterations = 1)
+    var model = trainKMeans(rdd, k = 5, maxIterations = 1, vectorFactory = vectorFactory)
 
     assert(model.clusterCenters.sortBy(VectorWithCompare(_))
       .zip(points.sortBy(VectorWithCompare(_))).forall(x => x._1 ~== (x._2) absTol 1E-5))
 
     // Iterations of Lloyd's should not change the answer either
-    model = KMeans.train(rdd, k = 5, maxIterations = 10)
+    model = trainKMeans(rdd, k = 5, maxIterations = 10, vectorFactory = vectorFactory)
     assert(model.clusterCenters.sortBy(VectorWithCompare(_))
       .zip(points.sortBy(VectorWithCompare(_))).forall(x => x._1 ~== (x._2) absTol 1E-5))
 
     // Neither should more runs
-    model = KMeans.train(rdd, k = 5, maxIterations = 10, runs = 5)
+    model = trainKMeans(rdd, k = 5, maxIterations = 10, runs = 5, vectorFactory = vectorFactory)
     assert(model.clusterCenters.sortBy(VectorWithCompare(_))
       .zip(points.sortBy(VectorWithCompare(_))).forall(x => x._1 ~== (x._2) absTol 1E-5))
   }
 
-  test("two clusters") {
+  test("k-means|| initialization") {
+    testKMeansParallelInitialization(DenseVectorFactory.instance)
+    testKMeansParallelInitialization(SmartVectorFactory.instance)
+  }
+
+  private def testTwoClusters(vectorFactory: VectorFactory): Unit = {
     val points = Seq(
       Vectors.dense(0.0, 0.0),
       Vectors.dense(0.0, 0.1),
@@ -250,7 +305,7 @@
     for (initMode <- Seq(RANDOM, K_MEANS_PARALLEL)) {
       // Two iterations are sufficient no matter where the initial centers are.
-      val model = KMeans.train(rdd, k = 2, maxIterations = 2, runs = 1, initMode)
+      val model = trainKMeans(rdd, k = 2, maxIterations = 2, runs = 1,
+        initializationMode = initMode, vectorFactory = vectorFactory)
 
       val predicts = model.predict(rdd).collect()
 
@@ -262,6 +317,11 @@
     }
   }
 
+  test("two clusters") {
+    testTwoClusters(DenseVectorFactory.instance)
+    testTwoClusters(SmartVectorFactory.instance)
+  }
+
   test("model save/load") {
     val tempDir = Utils.createTempDir()
     val path = tempDir.toURI.toString
@@ -279,7 +339,7 @@
     }
   }
 
-  test("Initialize using given cluster centers") {
+  private def testInitializeUsingGivenClusterCenters(vectorFactory: VectorFactory): Unit = {
     val points = Seq(
       Vectors.dense(0.0, 0.0),
       Vectors.dense(1.0, 0.0),
@@ -294,12 +354,17 @@
       .setK(2)
       .setMaxIterations(0)
       .setInitialModel(initialModel)
+      .setVectorFactory(vectorFactory)
       .run(rdd)
 
-   // comparing the returned model and the initial model
+    // comparing the returned model and the initial model
    assert(returnModel.clusterCenters(0) === initialModel.clusterCenters(0))
    assert(returnModel.clusterCenters(1) === initialModel.clusterCenters(1))
   }
 
+  test("Initialize using given cluster centers") {
+    testInitializeUsingGivenClusterCenters(DenseVectorFactory.instance)
+    testInitializeUsingGivenClusterCenters(SmartVectorFactory.instance)
+  }
 }
 
 object KMeansSuite extends SparkFunSuite {
@@ -327,8 +392,9 @@
 }
 
 class KMeansClusterSuite extends SparkFunSuite with LocalClusterSparkContext {
+  import KMeansSuiteHelper.trainKMeans
 
-  test("task size should be small in both training and prediction") {
+  private def testTaskSizeShouldBeSmallInBoth(vectorFactory: VectorFactory): Unit = {
     val m = 4
     val n = 200000
     val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
@@ -338,9 +404,14 @@
     for (initMode <- Seq(KMeans.RANDOM, KMeans.K_MEANS_PARALLEL)) {
       // If we serialize data directly in the task closure, the size of the serialized task would be
       // greater than 1MB and hence Spark would throw an error.
-      val model = KMeans.train(points, 2, 2, 1, initMode)
+      val model = trainKMeans(points, 2, 2, 1, initMode, vectorFactory = vectorFactory)
       val predictions = model.predict(points).collect()
       val cost = model.computeCost(points)
     }
   }
+
+  test("task size should be small in both training and prediction") {
+    testTaskSizeShouldBeSmallInBoth(DenseVectorFactory.instance)
+    testTaskSizeShouldBeSmallInBoth(SmartVectorFactory.instance)
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala
index 80da03cc2efeb..efad470cfae7a 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala
@@ -64,6 +64,19 @@ class BLASSuite extends SparkFunSuite {
     assert(dx ~== Vectors.dense(0.1, 0.0, -0.2) absTol 1e-15)
   }
 
+  test("axpy(a: Double, x: SparseVector, y: SparseVector)") {
+    val x = new SparseVector(25, Array(0, 2, 4, 7, 8, 15), Array(0.0, 2.0, 4.0, 7.0, 8.0, 15.0))
+    val y = new SparseVector(25, Array(4, 9, 20), Array(4.0, 9.0, 20.0))
+
+    axpy(0.5, x, y)
+
+    val expected = Vectors.sparse(25,
+      Array(0, 2, 4, 7, 8, 9, 15, 20),
+      Array(0.0, 1.0, 6.0, 3.5, 4.0, 9.0, 7.5, 20.0))
+
+    assert(expected ~== y absTol 1e-15)
+  }
+
   test("axpy") {
     val alpha = 0.1
     val sx = Vectors.sparse(3, Array(0, 2), Array(1.0, -2.0))
@@ -79,14 +92,22 @@ class BLASSuite extends SparkFunSuite {
     axpy(alpha, dx, dy2)
     assert(dy2 ~== expected absTol 1e-15)
 
-    val sy = Vectors.sparse(4, Array(0, 1), Array(2.0, 1.0))
+    val sy1 = Vectors.dense(dy).toSparse
+    axpy(alpha, sx, sy1)
+    assert(sy1 ~== expected absTol 1e-15)
+
+    val sy2 = Vectors.dense(dy).toSparse
+    axpy(alpha, dx, sy2)
+    assert(sy2 ~== expected absTol 1e-15)
+
+    val largerSy = Vectors.sparse(4, Array(0, 1), Array(2.0, 1.0))
 
     intercept[IllegalArgumentException] {
-      axpy(alpha, sx, sy)
+      axpy(alpha, sx, largerSy)
     }
 
     intercept[IllegalArgumentException] {
-      axpy(alpha, dx, sy)
+      axpy(alpha, dx, largerSy)
     }
 
     withClue("vector sizes must match") {