diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala
index cb1abbd18fd4d..9fb1659b10069 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala
@@ -18,8 +18,6 @@ package org.apache.spark.examples.mllib
 
 import scopt.OptionParser
-
-import org.apache.spark.SparkContext._
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, RowMatrix}
 import org.apache.spark.{SparkConf, SparkContext}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensSimilarity.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensSimilarity.scala
new file mode 100644
index 0000000000000..b3fb6652bcdeb
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensSimilarity.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * An example app for computing item similarity on MovieLens-format sparse data
+ * (http://grouplens.org/datasets/movielens/). It runs column-based similarity
+ * calculation and compares it with row-based similarity calculation and with an
+ * ALS + row-based similarity flow. Row- and column-based similarity run on the
+ * raw features, while the ALS flow uses implicit matrix factorization to produce
+ * low-rank factors whose row similarities approximate the raw-feature results.
+ *
+ * A synthetic dataset in MovieLens format can be found at `data/mllib/sample_movielens_data.txt`.
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
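For reference, the cosine similarity that all three flows ultimately estimate — a minimal plain-Scala sketch for intuition only (the patch computes this distributively; the function name here is illustrative, not part of the PR):

```scala
// Cosine similarity between two feature vectors: dot(a, b) / (||a|| * ||b||).
def cosine(a: Array[Double], b: Array[Double]): Double = {
  require(a.length == b.length, "vectors must have the same dimension")
  val dot = (a, b).zipped.map(_ * _).sum
  val norms = math.sqrt(a.map(x => x * x).sum) * math.sqrt(b.map(x => x * x).sum)
  if (norms == 0.0) 0.0 else dot / norms
}
```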
+package org.apache.spark.examples.mllib
+
+import scala.collection.mutable
+
+import org.apache.log4j.{Level, Logger}
+import scopt.OptionParser
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.mllib.evaluation.RankingMetrics
+import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
+import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
+import org.apache.spark.mllib.recommendation.{ALS, Rating}
+
+object MovieLensSimilarity {
+
+  case class Params(
+      input: String = null,
+      numIterations: Int = 20,
+      rank: Int = 50,
+      alpha: Double = 0.0,
+      numUserBlocks: Int = -1,
+      numProductBlocks: Int = -1,
+      delim: String = "::",
+      topk: Int = 50,
+      threshold: Double = 1e-4) extends AbstractParams[Params]
+
+  def main(args: Array[String]) {
+    val defaultParams = Params()
+
+    val parser = new OptionParser[Params]("MovieLensSimilarity") {
+      head("MovieLensSimilarity: an example app for similarity flows on MovieLens data.")
+      opt[Int]("rank")
+        .text(s"rank, default: ${defaultParams.rank}")
+        .action((x, c) => c.copy(rank = x))
+      opt[Int]("numIterations")
+        .text(s"number of iterations, default: ${defaultParams.numIterations}")
+        .action((x, c) => c.copy(numIterations = x))
+      opt[Int]("numUserBlocks")
+        .text(s"number of user blocks, default: ${defaultParams.numUserBlocks} (auto)")
+        .action((x, c) => c.copy(numUserBlocks = x))
+      opt[Int]("numProductBlocks")
+        .text(s"number of product blocks, default: ${defaultParams.numProductBlocks} (auto)")
+        .action((x, c) => c.copy(numProductBlocks = x))
+      opt[Double]("alpha")
+        .text("alpha for implicit feedback")
+        .action((x, c) => c.copy(alpha = x))
+      opt[Int]("topk")
+        .text("topk for ALS validation")
+        .action((x, c) => c.copy(topk = x))
+      opt[Double]("threshold")
+        .text("threshold for dimsum sampling and kernel sparsity")
+        .action((x, c) => c.copy(threshold = x))
+      opt[String]("delim")
+        .text("input delimiter, default: ::")
+        .action((x, c) => c.copy(delim = x))
+      arg[String]("<input>")
+        .required()
+        .text("input paths to a MovieLens dataset of ratings")
+        .action((x, c) => c.copy(input = x))
+      note(
+        """
+          |For example, the following command runs this app on a synthetic dataset:
+          |
+          | bin/run-example mllib.MovieLensSimilarity \
+          |  --rank 25 --numIterations 20 --alpha 0.01 --topk 25 \
+          |  data/mllib/sample_movielens_data.txt
+        """.stripMargin)
+    }
+
+    parser.parse(args, defaultParams).map { params =>
+      run(params)
+    } getOrElse {
+      System.exit(1)
+    }
+  }
+ * + * Options: + * + * Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5) + * Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) + * Rating(fields(0).toInt, fields(1).toInt, 1.0) + */ + Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) + }.cache() + + val numRatings = ratings.count() + val numUsers = ratings.map(_.user).distinct().count() + val numMovies = ratings.map(_.product).distinct().count() + + println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.") + + val productFeatures = ratings.map { entry => + MatrixEntry(entry.product, entry.user, entry.rating) + } + val productMatrix = new CoordinateMatrix(productFeatures).toIndexedRowMatrix() + + // brute force row similarities + println("Running row similarities with threshold 1e-4") + val rowSimilarities = productMatrix.rowSimilarities() + + // Row similarities using user defined threshold + println(s"Running row similarities with threshold ${params.threshold}") + val rowSimilaritiesApprox = productMatrix.rowSimilarities(threshold = params.threshold) + + // Compute similar columns on transpose matrix + val userFeatures = ratings.map { entry => + MatrixEntry(entry.user, entry.product, entry.rating) + }.repartition(sc.defaultParallelism) + + val featureMatrix = new CoordinateMatrix(userFeatures).toRowMatrix() + + // Compute similar columns with dimsum sampling + println(s"Running column similarity with threshold ${params.threshold}") + val colSimilarities = featureMatrix.columnSimilarities(params.threshold) + + val exactEntries = rowSimilarities.entries.map { case MatrixEntry(i, j, u) => ((i, j), u) } + val rowEntriesApprox = rowSimilaritiesApprox.entries.map { case MatrixEntry(i, j, u) => + ((i, j), u) + } + val colEntriesApprox = colSimilarities.entries.map { case MatrixEntry(i, j, v) => ((i, j), v) } + + val rowMAE = exactEntries.join(rowEntriesApprox).values.map { + case (u, v) => math.abs(u - v) + } + + val colMAE = exactEntries.join(colEntriesApprox).values.map { + case (u, v) => math.abs(u - v) + } + + println(s"Common entries row: ${rowMAE.count()} col: ${colMAE.count()}") + println(s"Average absolute error in estimate row: ${rowMAE.mean()} col: ${colMAE.mean()}") + + println(s"Running row similarity with topk ${params.topk}") + val rowSimilaritiesTopk = productMatrix.rowSimilarities(topk=params.topk) + + val rowEntriesTopk = rowSimilaritiesTopk.entries.map { case MatrixEntry(i, j, u) => + ((i, j), u) + } + + val rowTopkMAE = exactEntries.join(rowEntriesTopk).values.map { + case (u, v) => math.abs(u - v) + } + println(s"Average absolute error in topk row: ${rowTopkMAE.mean()}") + + val model = new ALS() + .setRank(params.rank) + .setIterations(params.numIterations) + .setLambda(0.0) + .setAlpha(params.alpha) + .setImplicitPrefs(true) + .setUserBlocks(params.numUserBlocks) + .setProductBlocks(params.numProductBlocks) + .run(ratings) + + val topk = params.topk + // Compute similar columns through low rank approximation using ALS + println(s"Running ALS based row similarities") + + val lowRankedSimilarities = model.similarProducts(topk) + + val labels = rowSimilarities.entries.map { case (entry) => + (entry.i, (entry.j, entry.value)) + }.topByKey(params.topk)(Ordering.by(_._2)).map { case (item, similarItems) => + (item, similarItems.map(_._1)) + } + + val predicted = lowRankedSimilarities.entries.map { case (entry) => + (entry.i, (entry.j, entry.value)) + }.topByKey(params.topk)(Ordering.by(_._2)).map { case (item, similarItems) => + (item, similarItems.map(_._1)) + } + + 
val predictionAndLabels = + predicted.join(labels).map { case (item, (predicted, labels)) => + (predicted, labels) + } + + val rankingMetrics = new RankingMetrics[Long](predictionAndLabels) + println(s"prec@$topk ${rankingMetrics.precisionAt(topk)}") + + sc.stop() + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Kernel.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Kernel.scala new file mode 100644 index 0000000000000..68e0b3a62de2c --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Kernel.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import org.apache.spark.mllib.util.MLUtils +import scala.collection.Map + +/** + * Represents a Kernel abstraction for vector and gemv/gemm based computation + */ +trait Kernel extends Serializable { + + /** compute the kernel value using vector. */ + def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double + + /** update the kernel value after gemv/gemm computation is done. 
*/ + def compute(indexi: Long, indexj: Long, value: Double): Double +} + +/** + * CosineKernel is the default option for similarity calculation + * @param rowNorms denominator needs to be normalized by rowNorm + * @param threshold don't shuffle if similarity is less than the threshold specified by user + */ +case class CosineKernel(rowNorms: Map[Long, Double], threshold: Double) extends Kernel { + + override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = { + val similarity = BLAS.dot(vi, vj) / rowNorms(indexi) / rowNorms(indexj) + if (similarity <= threshold) return 0.0 + similarity + } + + override def compute(indexi: Long, indexj: Long, value: Double): Double = { + value / rowNorms(indexi) / rowNorms(indexj) + } +} + +// For distributed matrix multiplication with user defined normalization +case class ProductKernel() extends Kernel { + + override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = { + BLAS.dot(vi, vj) + } + + override def compute(indexi: Long, indexj: Long, value: Double): Double = value +} + +// For PowerIterationClustering flow +case class RBFKernel(rowNorms: Map[Long, Double], sigma: Double, threshold: Double) extends Kernel { + + val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma) + val expCoeff = -1.0 / 2.0 * math.pow(sigma, 2.0) + + override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = { + val ssquares = MLUtils.fastSquaredDistance(vi, rowNorms(indexi), vj, rowNorms(indexj)) + coeff * math.exp(expCoeff * ssquares) + } + + override def compute(indexi: Long, indexj: Long, value: Double): Double = { + val norm1 = rowNorms(indexi) + val norm2 = rowNorms(indexj) + val sumSquaredNorm = norm1 * norm1 + norm2 * norm2 - 2.0 * value + coeff * math.exp(expCoeff * sumSquaredNorm) + } +} + +object KernelType extends Enumeration { + type KernelType = Value + val COSINE, PRODUCT, RBF = Value +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala index 3be530fa07537..beb794476ed43 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala @@ -18,11 +18,16 @@ package org.apache.spark.mllib.linalg.distributed import breeze.linalg.{DenseMatrix => BDM} +import org.apache.spark.HashPartitioner +import scala.collection.Map import org.apache.spark.annotation.Experimental import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.linalg.SingularValueDecomposition +import scala.collection.mutable +import org.apache.spark.mllib.rdd.MLPairRDDFunctions._ +import KernelType._ /** * :: Experimental :: @@ -186,4 +191,93 @@ class IndexedRowMatrix( } mat } + + /** + * Computes the row norm of each entry from IndexedRowMatrix + * @param norm L1/L2 norm of each row + * @return Map of row index and norm + */ + def rowMagnitudes(norm: Int): Map[Long, Double] = { + rows.map { indexedRow => + (indexedRow.index, Vectors.norm(indexedRow.vector, norm)) + }.collectAsMap() + } + + /** + * row similarity calculation with user defined kernel + * @param kernel vector based kernel computation + * @param topk topk similar rows to each row + * @return CoordinateMatrix of rows similar to every row + */ + def rowSimilarities( + kernel: Kernel, + topk: Int): CoordinateMatrix = { + multiply(rows, kernel, topk) + } + + /** + * row 
similarity calculation with cosine kernel + * @param topk topk similar rows to each row + * @param threshold cosine similarity threshold + * @return CoordinateMatrix of rows similar to every row + */ + def rowSimilarities( + topk: Int = nRows.toInt, + threshold: Double = 1e-4): CoordinateMatrix = { + val rowNorms = rowMagnitudes(2) + val kernel = CosineKernel(rowNorms, threshold) + rowSimilarities(kernel, topk) + } + + private def blockify(features: RDD[IndexedRow], blockSize: Int): RDD[(Int, Array[IndexedRow])] = { + val featurePartitioner = new HashPartitioner(blockSize) + val blockedFeatures = features.map { row => + (featurePartitioner.getPartition(row.index), row) + }.groupByKey(blockSize).map { + case (index, rows) => (index, rows.toArray) + } + blockedFeatures.count() + blockedFeatures + } + + private def multiply(query: RDD[IndexedRow], + kernel: Kernel, + topk: Int): CoordinateMatrix = { + val ord = Ordering[(Float, Long)].on[(Long, Double)](x => (x._2.toFloat, x._1)) + val defaultParallelism = rows.sparkContext.defaultParallelism + + val queryBlocks = math.max(query.sparkContext.defaultParallelism, query.partitions.size) / 2 + val dictionaryBlocks = math.max(defaultParallelism, rows.partitions.size) / 2 + + val blockedSmall = blockify(query, queryBlocks) + val blockedBig = blockify(rows, dictionaryBlocks) + + blockedSmall.setName("blockedSmallMatrix") + blockedBig.setName("blockedBigMatrix") + + val topkSims = blockedBig.cartesian(blockedSmall).flatMap { + case ((bigBlockIndex, bigRows), (smallBlockIndex, smallRows)) => + val buf = mutable.ArrayBuilder.make[(Long, (Long, Double))] + for (i <- 0 until bigRows.size; j <- 0 until smallRows.size) { + val bigIndex = bigRows(i).index + val bigRow = bigRows(i).vector + val smallIndex = smallRows(j).index + val smallRow = smallRows(j).vector + val kernelVal = kernel.compute(smallRow, smallIndex, bigRow, bigIndex) + if (kernelVal != 0.0) { + val entry = (bigIndex, (smallIndex, kernelVal)) + buf += entry + } + } + buf.result() + }.topByKey(topk)(ord).flatMap { case (i, value) => + value.map { sim => + MatrixEntry(i, sim._1, sim._2) + } + }.coalesce(defaultParallelism) + + // Materialize the cartesian RDD + topkSims.count() + new CoordinateMatrix(topkSims) + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 88c2148403313..2203bd0dc34f9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -24,6 +24,7 @@ import scala.collection.mutable import com.github.fommil.netlib.BLAS.{getInstance => blas} import org.apache.hadoop.fs.Path +import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, CoordinateMatrix} import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ @@ -135,6 +136,51 @@ class MatrixFactorizationModel( MatrixFactorizationModel.recommend(productFeatures.lookup(product).head, userFeatures, num) .map(t => Rating(t._1, product, t._2)) + /** + * Find similar products to every product. Cosine similarity is used + * + * @param num how many products to return. The number returned may be less than this. + * @return CoordinateMatrix containing similar product pair and the corresponding similarity + * scores. 
zero similarities are filtered out + */ + def similarProducts(num: Int, threshold: Double = 1e-4): CoordinateMatrix = { + val productNorms = productFeatures.map { + case (index, features) => (index.toLong, blas.dnrm2(rank, features, 1)) + }.collectAsMap() + + val kernel = CosineKernel(productNorms, threshold) + + val similarProducts = + MatrixFactorizationModel.recommendForAll( + rank, kernel, + productFeatures, productFeatures, num).flatMap { + case (product, top) => top.map { case (index, value) => MatrixEntry(product, index, value) } + } + new CoordinateMatrix(similarProducts) + } + + /** + * Find similar users to every user. Cosine similarity is used + * + * @param num how many users to return. The number returned may be less than this. + * @return CoordinateMatrix containing similar user pair and the corresponding similarity + * scores. zero similarities are filtered out + */ + def similarUsers(num: Int, threshold: Double=1e-4): CoordinateMatrix = { + val userNorms = userFeatures.map { + case (index, features) => (index.toLong, blas.dnrm2(rank, features, 1)) + }.collectAsMap() + + val kernel = CosineKernel(userNorms, threshold) + + val similarUsers = + MatrixFactorizationModel.recommendForAll( + rank, kernel, userFeatures, userFeatures, num).flatMap { + case (user, top) => top.map { case (index, value) => MatrixEntry(user, index, value) } + } + new CoordinateMatrix(similarUsers) + } + protected override val formatVersion: String = "1.0" override def save(sc: SparkContext, path: String): Unit = { @@ -150,14 +196,14 @@ class MatrixFactorizationModel( * rating field. Semantics of score is same as recommendProducts API */ def recommendProductsForUsers(num: Int): RDD[(Int, Array[Rating])] = { - MatrixFactorizationModel.recommendForAll(rank, userFeatures, productFeatures, num).map { + val kernel = ProductKernel() + MatrixFactorizationModel.recommendForAll(rank, kernel, userFeatures, productFeatures, num).map { case (user, top) => val ratings = top.map { case (product, rating) => Rating(user, product, rating) } (user, ratings) } } - /** * Recommends topK users for all products. * @@ -167,7 +213,8 @@ class MatrixFactorizationModel( * rating field. Semantics of score is same as recommendUsers API */ def recommendUsersForProducts(num: Int): RDD[(Int, Array[Rating])] = { - MatrixFactorizationModel.recommendForAll(rank, productFeatures, userFeatures, num).map { + val kernel = ProductKernel() + MatrixFactorizationModel.recommendForAll(rank, kernel, productFeatures, userFeatures, num).map { case (product, top) => val ratings = top.map { case (user, rating) => Rating(user, product, rating) } (product, ratings) @@ -193,8 +240,9 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { } /** - * Makes recommendations for all users (or products). 
+ * Makes batch user->product, user->user and product->product recommendations * @param rank rank + * @param kernel kernels for distance calculation where computation is dot decomposable * @param srcFeatures src features to receive recommendations * @param dstFeatures dst features used to make recommendations * @param num number of recommendations for each record @@ -203,6 +251,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { */ private def recommendForAll( rank: Int, + kernel: Kernel, srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { @@ -216,7 +265,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { val output = new Array[(Int, (Int, Double))](m * n) var k = 0 ratings.foreachActive { (i, j, r) => - output(k) = (srcIds(i), (dstIds(j), r)) + val kernelVal = kernel.compute(srcIds(i), dstIds(j), r) + output(k) = (srcIds(i), (dstIds(j), kernelVal)) k += 1 } output.toSeq @@ -311,5 +361,4 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { new Path(dataPath(path), "product").toUri.toString } } - } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala index 2ab53cc13db71..7368deeed241e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala @@ -143,6 +143,109 @@ class IndexedRowMatrixSuite extends FunSuite with MLlibTestSparkContext { } } + test("similar rows with cosine kernel") { + val n = 3 + + val denseData = sc.parallelize(Seq( + IndexedRow(0, Vectors.dense(0.0, 3.0, 6.0, 9.0)), + IndexedRow(1, Vectors.dense(1.0, 4.0, 7.0, 0.0)), + IndexedRow(2, Vectors.dense(2.0, 5.0, 8.0, 1.0)) + ), 2) + + val denseMat = new IndexedRowMatrix(denseData, 3L, 4) + + val similarRows = denseMat.rowSimilarities() + + assert(similarRows.numRows() == n) + assert(similarRows.numCols() == n) + + val similarEntries = similarRows.entries.collect() + + val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)) + + val expected = BDM( + (126.0, 54.0, 72.0), + (54.0, 66.0, 78.0), + (72.0, 78.0, 94.0)) + + for (i <- 0 until n; j <- 0 until n) expected(i, j) /= (colMags(i) * colMags(j)) + + similarEntries.foreach { entry => + val row = entry.i.toInt + val col = entry.j.toInt + assert(math.abs(entry.value - expected(row, col)) < 1e-6) + } + } + + test("similar rows with cosine kernel and topk") { + val n = 3 + + val denseData = sc.parallelize(Seq( + IndexedRow(0, Vectors.dense(0.0, 3.0, 6.0, 9.0)), + IndexedRow(1, Vectors.dense(1.0, 4.0, 7.0, 0.0)), + IndexedRow(2, Vectors.dense(2.0, 5.0, 8.0, 1.0)) + ), 2) + + val denseMat = new IndexedRowMatrix(denseData, 3L, 4) + val topk = 2 + val similarRows = denseMat.rowSimilarities(topk=2) + + assert(similarRows.numRows() == n) + assert(similarRows.entries.count() == n * topk) + + val similarEntries = similarRows.entries.collect() + + val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)) + + val expected = BDM( + (126.0, 54.0, 72.0), + (54.0, 66.0, 78.0), + (72.0, 78.0, 94.0)) + + for (i <- 0 until n; j <- 0 until n) expected(i, j) /= (colMags(i) * colMags(j)) + + similarEntries.foreach { entry => + val row = entry.i.toInt + val col = entry.j.toInt + assert(math.abs(entry.value - expected(row, col)) < 
1e-6) + } + } + + test("similar rows with cosine kernel and sparse data") { + val n = 3 + + val sparseData = sc.parallelize(Seq( + IndexedRow(0, Vectors.sparse(4, Array(1, 2, 3), Array(3.0, 6.0, 9.0))), + IndexedRow(1, Vectors.sparse(4, Array(0, 1, 2), Array(1.0, 4.0, 7.0))), + IndexedRow(2, Vectors.sparse(4, Array(0, 1, 2, 3), Array(2.0, 5.0, 8.0, 1.0))) + ), 2) + + val sparseMat = new IndexedRowMatrix(sparseData, 3L, 4) + + + val similarRows = sparseMat.rowSimilarities() + + assert(similarRows.numRows() == n) + assert(similarRows.numCols() == n) + + val similarEntries = similarRows.entries.collect() + + val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)) + + val expected = BDM( + (126.0, 54.0, 72.0), + (54.0, 66.0, 78.0), + (72.0, 78.0, 94.0)) + + for (i <- 0 until n; j <- 0 until n) expected(i, j) /= (colMags(i) * colMags(j)) + + similarEntries.foreach { entry => + val row = entry.i.toInt + val col = entry.j.toInt + assert(math.abs(entry.value - expected(row, col)) < 1e-6) + } + } + def closeToZero(G: BDM[Double]): Boolean = { G.valuesIterator.map(math.abs).sum < 1e-6 } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala index 2c92866f3893d..ba9a4d96c7e0e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.mllib.recommendation +import org.apache.spark.mllib.linalg.{DenseMatrix, Vectors} import org.scalatest.FunSuite import org.apache.spark.mllib.util.MLlibTestSparkContext @@ -92,4 +93,51 @@ class MatrixFactorizationModelSuite extends FunSuite with MLlibTestSparkContext assert(recommendations(2)(1).user == 0) assert(recommendations(2)(1).rating ~== 17.0 relTol 1e-14) } + + test("batch similar users/products") { + val n = 3 + + val userFeatures = sc.parallelize(Seq( + (0, Array(0.0, 3.0, 6.0, 9.0)), + (1, Array(1.0, 4.0, 7.0, 0.0)), + (2, Array(2.0, 5.0, 8.0, 1.0)) + ), 2) + + val model = new MatrixFactorizationModel(4, userFeatures, userFeatures) + + val topk = 2 + + val similarUsers = model.similarUsers(topk) + + val similarProducts = model.similarProducts(topk) + + assert(similarUsers.numRows() == n) + assert(similarUsers.entries.count() == n * topk) + + assert(similarProducts.numRows() == n) + assert(similarProducts.entries.count() == n * topk) + + val similarEntriesUsers = similarUsers.entries.collect() + val similarEntriesProducts = similarProducts.entries.collect() + + val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)) + + val expected = + new DenseMatrix(3, 3, + Array(126.0, 54.0, 72.0, 54.0, 66.0, 78.0, 72.0, 78.0, 94.0)) + + for (i <- 0 until n; j <- 0 until n) expected(i, j) /= (colMags(i) * colMags(j)) + + similarEntriesUsers.foreach { entry => + val row = entry.i.toInt + val col = entry.j.toInt + assert(entry.value ~== expected(row, col) relTol 1e-6) + } + + similarEntriesProducts.foreach { entry => + val row = entry.i.toInt + val col = entry.j.toInt + assert(entry.value ~== expected(row, col) relTol 1e-6) + } + } }