From f9fd6fbfb1a55142a9eb8f2129d3729ca25ab501 Mon Sep 17 00:00:00 2001
From: Debasish Das
Date: Sat, 16 May 2015 17:05:52 -0700
Subject: [PATCH 01/16] blocked kernelized row similarity calculation and tests

---
 .../linalg/distributed/IndexedRowMatrix.scala |  85 +++++++++++++++
 .../distributed/IndexedRowMatrixSuite.scala   | 103 ++++++++++++++++++
 2 files changed, 188 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
index 3be530fa07537..1de4e3a82bca9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
@@ -18,11 +18,15 @@ package org.apache.spark.mllib.linalg.distributed

 import breeze.linalg.{DenseMatrix => BDM}

+import org.apache.spark.HashPartitioner
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.linalg.SingularValueDecomposition
+import scala.collection.mutable
+import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
+import KernelType._

 /**
  * :: Experimental ::
@@ -186,4 +190,85 @@
     }
     mat
   }
+
+  def rowSimilarities(
+      kernelType: KernelType = COSINE,
+      topk: Int = nRows.toInt,
+      threshold: Double = 1e-4): CoordinateMatrix = {
+    val rowNorms = IndexedRowMatrix.rowMagnitudes(rows, 2)
+    val kernel = kernelType match {
+      case COSINE => CosineKernel(rowNorms, threshold)
+      case EUCLIDEAN => EuclideanKernel(rowNorms, threshold)
+      case RBF => RBFKernel(rowNorms, threshold)
+    }
+    IndexedRowMatrix.multiply(rows, rows, kernel, topk)
+  }
+}
+
+object IndexedRowMatrix {
+  def rowMagnitudes(rows: RDD[IndexedRow], norm: Int) : Map[Long, Double] = {
+    rows.map { indexedRow =>
+      (indexedRow.index, Vectors.norm(indexedRow.vector, norm))
+    }.collect().toMap
+  }
+
+  def blockify(features: RDD[IndexedRow], blockSize: Int): RDD[(Int, Array[IndexedRow])] = {
+    val featurePartitioner = new HashPartitioner(blockSize)
+    val blockedFeatures = features.map { row =>
+      (featurePartitioner.getPartition(row.index), row)
+    }.groupByKey(blockSize).map {
+      case (index, rows) => (index, rows.toArray)
+    }
+    blockedFeatures.count()
+    blockedFeatures
+  }
+
+  // TODO: Explore LSH and KDTree ideas to further improve runtime
+  def multiply(
+      small: RDD[IndexedRow],
+      big: RDD[IndexedRow],
+      kernel: Kernel,
+      topk: Int): CoordinateMatrix = {
+    val ord = Ordering[(Float, Long)].on[(Long, Double)](x => (x._2.toFloat, x._1))
+    val defaultParallelism = big.sparkContext.defaultParallelism
+
+    val smallBlocks = math.max(small.sparkContext.defaultParallelism, small.partitions.size) / 2
+    val bigBlocks = math.max(defaultParallelism, big.partitions.size) / 2
+
+    val blockedSmall = blockify(small, smallBlocks)
+    val blockedBig = blockify(big, bigBlocks)
+
+    blockedSmall.setName("blockedSmallMatrix")
+    blockedBig.setName("blockedBigMatrix")
+
+    blockedBig.cache()
+
+    val topkSims = blockedBig.cartesian(blockedSmall).flatMap {
+      case ((bigBlockIndex, bigRows), (smallBlockIndex, smallRows)) =>
+        val buf = mutable.ArrayBuilder.make[(Long, (Long, Double))]
+        for (i <- 0 until bigRows.size; j <- 0 until smallRows.size) {
+          val bigIndex = bigRows(i).index
+          val bigRow = bigRows(i).vector
+          val smallIndex = smallRows(j).index
+          val smallRow = smallRows(j).vector
+          val kernelVal = kernel.compute(smallRow, smallIndex, bigRow, bigIndex)
+          if
(kernelVal != 0.0) { + val entry = (bigIndex, (smallIndex, kernelVal)) + buf += entry + } + } + buf.result() + }.topByKey(topk)(ord).flatMap { case (i, value) => + value.map { sim => + MatrixEntry(i, sim._1, sim._2) + } + }.repartition(defaultParallelism) + + blockedBig.unpersist() + + // Materialize the cartesian RDD + topkSims.count() + new CoordinateMatrix(topkSims) + } } + diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala index 2ab53cc13db71..7368deeed241e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala @@ -143,6 +143,109 @@ class IndexedRowMatrixSuite extends FunSuite with MLlibTestSparkContext { } } + test("similar rows with cosine kernel") { + val n = 3 + + val denseData = sc.parallelize(Seq( + IndexedRow(0, Vectors.dense(0.0, 3.0, 6.0, 9.0)), + IndexedRow(1, Vectors.dense(1.0, 4.0, 7.0, 0.0)), + IndexedRow(2, Vectors.dense(2.0, 5.0, 8.0, 1.0)) + ), 2) + + val denseMat = new IndexedRowMatrix(denseData, 3L, 4) + + val similarRows = denseMat.rowSimilarities() + + assert(similarRows.numRows() == n) + assert(similarRows.numCols() == n) + + val similarEntries = similarRows.entries.collect() + + val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)) + + val expected = BDM( + (126.0, 54.0, 72.0), + (54.0, 66.0, 78.0), + (72.0, 78.0, 94.0)) + + for (i <- 0 until n; j <- 0 until n) expected(i, j) /= (colMags(i) * colMags(j)) + + similarEntries.foreach { entry => + val row = entry.i.toInt + val col = entry.j.toInt + assert(math.abs(entry.value - expected(row, col)) < 1e-6) + } + } + + test("similar rows with cosine kernel and topk") { + val n = 3 + + val denseData = sc.parallelize(Seq( + IndexedRow(0, Vectors.dense(0.0, 3.0, 6.0, 9.0)), + IndexedRow(1, Vectors.dense(1.0, 4.0, 7.0, 0.0)), + IndexedRow(2, Vectors.dense(2.0, 5.0, 8.0, 1.0)) + ), 2) + + val denseMat = new IndexedRowMatrix(denseData, 3L, 4) + val topk = 2 + val similarRows = denseMat.rowSimilarities(topk=2) + + assert(similarRows.numRows() == n) + assert(similarRows.entries.count() == n * topk) + + val similarEntries = similarRows.entries.collect() + + val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)) + + val expected = BDM( + (126.0, 54.0, 72.0), + (54.0, 66.0, 78.0), + (72.0, 78.0, 94.0)) + + for (i <- 0 until n; j <- 0 until n) expected(i, j) /= (colMags(i) * colMags(j)) + + similarEntries.foreach { entry => + val row = entry.i.toInt + val col = entry.j.toInt + assert(math.abs(entry.value - expected(row, col)) < 1e-6) + } + } + + test("similar rows with cosine kernel and sparse data") { + val n = 3 + + val sparseData = sc.parallelize(Seq( + IndexedRow(0, Vectors.sparse(4, Array(1, 2, 3), Array(3.0, 6.0, 9.0))), + IndexedRow(1, Vectors.sparse(4, Array(0, 1, 2), Array(1.0, 4.0, 7.0))), + IndexedRow(2, Vectors.sparse(4, Array(0, 1, 2, 3), Array(2.0, 5.0, 8.0, 1.0))) + ), 2) + + val sparseMat = new IndexedRowMatrix(sparseData, 3L, 4) + + + val similarRows = sparseMat.rowSimilarities() + + assert(similarRows.numRows() == n) + assert(similarRows.numCols() == n) + + val similarEntries = similarRows.entries.collect() + + val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)) + + val expected = BDM( + (126.0, 54.0, 72.0), + (54.0, 66.0, 78.0), + (72.0, 78.0, 94.0)) + + for (i <- 0 
until n; j <- 0 until n) expected(i, j) /= (colMags(i) * colMags(j)) + + similarEntries.foreach { entry => + val row = entry.i.toInt + val col = entry.j.toInt + assert(math.abs(entry.value - expected(row, col)) < 1e-6) + } + } + def closeToZero(G: BDM[Double]): Boolean = { G.valuesIterator.map(math.abs).sum < 1e-6 } From 66176f9f346c324b9c77c252be369e24f7fdd991 Mon Sep 17 00:00:00 2001 From: Debasish Das Date: Sat, 16 May 2015 17:06:36 -0700 Subject: [PATCH 02/16] Cosine, Euclidean, RBF and Product Kernel added --- .../apache/spark/mllib/linalg/Kernel.scala | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/Kernel.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Kernel.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Kernel.scala new file mode 100644 index 0000000000000..1eb2902778cfc --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Kernel.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.mllib.util.MLUtils
+
+trait Kernel {
+  def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double
+}
+
+/**
+ * CosineKernel is the default option for similarity calculation
+ * @param rowNorms precomputed row norms used to normalize the dot product
+ * @param threshold similarities at or below this threshold are treated as zero and not shuffled
+ */
+case class CosineKernel(rowNorms: Map[Long, Double], threshold: Double) extends Kernel {
+  override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
+    val similarity = BLAS.dot(vi, vj) / rowNorms(indexi) / rowNorms(indexj)
+    if (similarity <= threshold) return 0.0
+    similarity
+  }
+}
+
+// For distributed matrix multiplication with user defined normalization
+case class ProductKernel() extends Kernel {
+  override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
+    BLAS.dot(vi, vj)
+  }
+}
+
+case class EuclideanKernel(rowNorms: Map[Long, Double], threshold: Double) extends Kernel {
+  override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
+    MLUtils.fastSquaredDistance(vi, rowNorms(indexi), vj, rowNorms(indexj))
+  }
+}
+
+// For PowerIterationClustering flows
+case class RBFKernel(rowNorms: Map[Long, Double], sigma: Double) extends Kernel {
+  val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
+  val expCoeff = -1.0 / (2.0 * math.pow(sigma, 2.0))
+
+  override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
+    val ssquares = MLUtils.fastSquaredDistance(vi, rowNorms(indexi), vj, rowNorms(indexj))
+    coeff * math.exp(expCoeff * ssquares)
+  }
+}
+
+object KernelType extends Enumeration {
+  type KernelType = Value
+  val COSINE, PRODUCT, EUCLIDEAN, RBF = Value
+}
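A user-defined kernel only needs to implement the single compute method of the trait above. As a minimal sketch, a polynomial kernel could look like the following; this class is illustrative and not part of the patch, and it assumes it is compiled inside the org.apache.spark.mllib.linalg package so it can reach the package-private BLAS helper:

package org.apache.spark.mllib.linalg

// Hypothetical kernel, for illustration only: (bias + <vi, vj>)^degree
case class PolynomialKernel(bias: Double, degree: Int) extends Kernel {
  override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
    // the row indices are unused; the value depends only on the dot product
    math.pow(bias + BLAS.dot(vi, vj), degree)
  }
}

Such a kernel could then be passed to IndexedRowMatrix.multiply in place of the built-in ones.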
From 3f96963f80a40f3a4fce6b6dbd97c20605ebaecc Mon Sep 17 00:00:00 2001
From: Debasish Das
Date: Sat, 16 May 2015 17:07:28 -0700
Subject: [PATCH 03/16] row similarity API added to drive MatrixFactorizationModel similarUsers and similarProducts

---
 .../MatrixFactorizationModel.scala | 32 +++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 88c2148403313..13441ce3b1170 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -24,6 +24,8 @@ import scala.collection.mutable
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
 import org.apache.hadoop.fs.Path
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.distributed.{IndexedRowMatrix, IndexedRow, CoordinateMatrix}
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
@@ -135,6 +137,36 @@ class MatrixFactorizationModel(
     MatrixFactorizationModel.recommend(productFeatures.lookup(product).head, userFeatures, num)
       .map(t => Rating(t._1, product, t._2))

+  /**
+   * Find similar products for every product. Cosine similarity is used.
+   *
+   * @param num how many products to return. The number returned may be less than this.
+   * @return CoordinateMatrix containing similar product pairs and the corresponding similarity
+   *         scores. Zero similarities are filtered out.
+   */
+  def similarProducts(num: Int): CoordinateMatrix = {
+    val indexedRows = productFeatures.map {
+      case (index, features) => IndexedRow(index.toLong, Vectors.dense(features))
+    }
+    val indexedMatrix = new IndexedRowMatrix(indexedRows)
+    indexedMatrix.rowSimilarities(topk = num)
+  }
+
+  /**
+   * Find similar users for every user. Cosine similarity is used.
+   *
+   * @param num how many users to return. The number returned may be less than this.
+   * @return CoordinateMatrix containing similar user pairs and the corresponding similarity
+   *         scores. Zero similarities are filtered out.
+   */
+  def similarUsers(num: Int): CoordinateMatrix = {
+    val indexedRows = userFeatures.map {
+      case (index, features) => IndexedRow(index.toLong, Vectors.dense(features))
+    }
+    val indexedMatrix = new IndexedRowMatrix(indexedRows)
+    indexedMatrix.rowSimilarities(topk = num)
+  }
+
   protected override val formatVersion: String = "1.0"

   override def save(sc: SparkContext, path: String): Unit = {

From 6dc9e18d507cfe0d2ee12e768ca6bddb5c3c4b38 Mon Sep 17 00:00:00 2001
From: Debasish Das
Date: Sat, 16 May 2015 17:09:24 -0700
Subject: [PATCH 04/16] MovieLens flow to demonstrate item similarity calculation using raw features and ALS factors

---
 .../examples/mllib/MovieLensSimilarity.scala | 172 ++++++++++++++++++
 1 file changed, 172 insertions(+)
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensSimilarity.scala

diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensSimilarity.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensSimilarity.scala
new file mode 100644
index 0000000000000..125a47d0642c5
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensSimilarity.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * An example app for running item similarity computation on MovieLens data
+ * (http://grouplens.org/datasets/movielens/) through column based similarity
+ * calculation flow compared with ALS + row based similarity calculation flow
+ *
+ * Run with
+ * {{{
+ * bin/run-example org.apache.spark.examples.mllib.MovieLensALS
+ * }}}
+ *
+ * A synthetic dataset in MovieLens format can be found at `data/mllib/sample_movielens_data.txt`.
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */ +package org.apache.spark.examples.mllib + +import org.apache.log4j.{Level, Logger} +import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} +import org.apache.spark.{SparkContext, SparkConf} +import scopt.OptionParser +import org.apache.spark.mllib.recommendation.Rating + +import scala.collection.mutable + +object MovieLensSimilarity { + + case class Params( + input: String = null, + kryo: Boolean = false, + numIterations: Int = 20, + lambda: Double = 1.0, + rank: Int = 10, + numUserBlocks: Int = -1, + numProductBlocks: Int = -1, + implicitPrefs: Boolean = false, + threshold: Double = 1e-4) extends AbstractParams[Params] + + def main(args: Array[String]) { + val defaultParams = Params() + + val parser = new OptionParser[Params]("MovieLensALS") { + head("MovieLensALS: an example app for ALS on MovieLens data.") + opt[Int]("rank") + .text(s"rank, default: ${defaultParams.rank}}") + .action((x, c) => c.copy(rank = x)) + opt[Int]("numIterations") + .text(s"number of iterations, default: ${defaultParams.numIterations}") + .action((x, c) => c.copy(numIterations = x)) + opt[Double]("lambda") + .text(s"lambda (smoothing constant), default: ${defaultParams.lambda}") + .action((x, c) => c.copy(lambda = x)) + opt[Unit]("kryo") + .text("use Kryo serialization") + .action((_, c) => c.copy(kryo = true)) + opt[Int]("numUserBlocks") + .text(s"number of user blocks, default: ${defaultParams.numUserBlocks} (auto)") + .action((x, c) => c.copy(numUserBlocks = x)) + opt[Int]("numProductBlocks") + .text(s"number of product blocks, default: ${defaultParams.numProductBlocks} (auto)") + .action((x, c) => c.copy(numProductBlocks = x)) + opt[Unit]("implicitPrefs") + .text("use implicit preference") + .action((_, c) => c.copy(implicitPrefs = true)) + opt[Double]("threshold") + .text("similarity threshold for dimsum sampling and kernel shuffle optimization") + .action((x, c) => c.copy(threshold = x)) + arg[String]("") + .required() + .text("input paths to a MovieLens dataset of ratings") + .action((x, c) => c.copy(input = x)) + note( + """ + |For example, the following command runs this app on a synthetic dataset: + | + | bin/spark-submit --class org.apache.spark.examples.mllib.MovieLensALS \ + | examples/target/scala-*/spark-examples-*.jar \ + | --rank 5 --numIterations 20 --lambda 1.0 --kryo \ + | data/mllib/sample_movielens_data.txt + """.stripMargin) + } + + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + System.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"MovieLensSimilarity with $params") + if (params.kryo) { + conf.registerKryoClasses(Array(classOf[mutable.BitSet], classOf[Rating])) + .set("spark.kryoserializer.buffer.mb", "8") + } + val sc = new SparkContext(conf) + + Logger.getRootLogger.setLevel(Level.WARN) + + val implicitPrefs = params.implicitPrefs + + val ratings = sc.textFile(params.input).map { line => + val fields = line.split("::") + if (implicitPrefs) { + /* + * MovieLens ratings are on a scale of 1-5: + * 5: Must see + * 4: Will enjoy + * 3: It's okay + * 2: Fairly bad + * 1: Awful + * So we should not recommend a movie if the predicted rating is less than 3. + * To map ratings to confidence scores, we use + * 5 -> 2.5, 4 -> 1.5, 3 -> 0.5, 2 -> -0.5, 1 -> -1.5. This mappings means unobserved + * entries are generally between It's okay and Fairly bad. + * The semantics of 0 in this expanded world of non-positive weights + * are "the same as never having interacted at all". 
+ */
+        Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)
+      } else {
+        Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
+      }
+    }.cache()
+
+    val numRatings = ratings.count()
+    val numUsers = ratings.map(_.user).distinct().count()
+    val numMovies = ratings.map(_.product).distinct().count()
+
+    println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.")
+
+    val productFeatures = ratings.map { entry =>
+      MatrixEntry(entry.product, entry.user, entry.rating)
+    }
+    val productMatrix = new CoordinateMatrix(productFeatures).toIndexedRowMatrix()
+    val rowSimilarities = productMatrix.rowSimilarities(threshold=params.threshold)
+
+    // Compute similar rows through dimsum sampling
+    val userFeatures = ratings.map { entry =>
+      MatrixEntry(entry.user, entry.product, entry.rating)
+    }
+    val featureMatrix = new CoordinateMatrix(userFeatures).toRowMatrix()
+    val colSimilarities = featureMatrix.columnSimilarities(params.threshold)
+
+    val rowEntries = rowSimilarities.entries.map { case MatrixEntry(i, j, u) => ((i, j), u) }
+    val colEntries = colSimilarities.entries.map { case MatrixEntry(i, j, v) => ((i, j), v) }
+    val MAE = colEntries.leftOuterJoin(rowEntries).values.map {
+      case (u, Some(v)) =>
+        math.abs(u - v)
+      case (u, None) =>
+        math.abs(u)
+    }.mean()
+
+    println(s"Average absolute error in estimate is: $MAE")
+
+    sc.stop()
+  }
+}

From 71f24a4629cf54c39af4e9e598d9808d85952532 Mon Sep 17 00:00:00 2001
From: Debasish Das
Date: Sat, 16 May 2015 17:09:45 -0700
Subject: [PATCH 05/16] import cleanup

---
 .../org/apache/spark/examples/mllib/CosineSimilarity.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala
index cb1abbd18fd4d..9fb1659b10069 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.examples.mllib

 import scopt.OptionParser
-
-import org.apache.spark.SparkContext._
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, RowMatrix}
 import org.apache.spark.{SparkConf, SparkContext}

From 4b8a7eb99ed3fea3b6520983685c5f8c1f983c92 Mon Sep 17 00:00:00 2001
From: Debasish Das
Date: Sat, 23 May 2015 11:12:07 -0700
Subject: [PATCH 06/16] Removed EuclideanKernel since the PowerIterationClustering flow uses RBF

---
 .../apache/spark/mllib/linalg/Kernel.scala | 25 ++++++++++++-------
 1 file changed, 16 insertions(+), 9 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Kernel.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Kernel.scala
index 1eb2902778cfc..81e49d919af89 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Kernel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Kernel.scala
@@ -18,9 +18,11 @@ package org.apache.spark.mllib.linalg

 import org.apache.spark.mllib.util.MLUtils
+import scala.collection.Map

 trait Kernel {
   def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double
+  def compute(indexi: Long, indexj: Long, value: Double): Double
 }

 /**
@@ -34,6 +36,9 @@
     if (similarity <= threshold) return 0.0
     similarity
   }
+  override def compute(indexi: Long, indexj: Long, value: Double): Double = {
+    value / rowNorms(indexi) / rowNorms(indexj)
+  }
 }

 // For distributed matrix multiplication with user defined normalization
@@ -41,16 +46,11 @@
 case class ProductKernel() extends Kernel {
   override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
     BLAS.dot(vi, vj)
   }
+  override def compute(indexi: Long, indexj: Long, value: Double): Double = value
 }

-case class EuclideanKernel(rowNorms: Map[Long, Double], threshold: Double) extends Kernel {
-  override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
-    MLUtils.fastSquaredDistance(vi, rowNorms(indexi), vj, rowNorms(indexj))
-  }
-}
-
-// For PowerIterationClustering flows
-case class RBFKernel(rowNorms: Map[Long, Double], sigma: Double) extends Kernel {
+// For PowerIterationClustering flow
+case class RBFKernel(rowNorms: Map[Long, Double], sigma: Double, threshold: Double) extends Kernel {
   val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
   val expCoeff = -1.0 / (2.0 * math.pow(sigma, 2.0))
@@ -58,9 +58,16 @@
     val ssquares = MLUtils.fastSquaredDistance(vi, rowNorms(indexi), vj, rowNorms(indexj))
     coeff * math.exp(expCoeff * ssquares)
   }
+
+  override def compute(indexi: Long, indexj: Long, value: Double): Double = {
+    val norm1 = rowNorms(indexi)
+    val norm2 = rowNorms(indexj)
+    val squaredDistance = norm1 * norm1 + norm2 * norm2 - 2.0 * value
+    coeff * math.exp(expCoeff * squaredDistance)
+  }
 }

 object KernelType extends Enumeration {
   type KernelType = Value
-  val COSINE, PRODUCT, EUCLIDEAN, RBF = Value
+  val COSINE, PRODUCT, RBF = Value
 }
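The value-based compute added here relies on the identity ||vi - vj||^2 = ||vi||^2 + ||vj||^2 - 2 * <vi, vj>, so the vector path and the dot-product path should agree. A small sanity sketch; the vectors, norms, and parameter values are made up for illustration:

import org.apache.spark.mllib.linalg.{RBFKernel, Vectors}

val vi = Vectors.dense(1.0, 2.0)
val vj = Vectors.dense(2.0, 0.0)
// precomputed L2 norms keyed by row index
val norms = Map(0L -> math.sqrt(5.0), 1L -> 2.0)
val rbf = RBFKernel(norms, sigma = 1.0, threshold = 0.0)

val direct = rbf.compute(vi, 0L, vj, 1L)        // uses fastSquaredDistance
val fromDot = rbf.compute(0L, 1L, value = 2.0)  // <vi, vj> = 1*2 + 2*0 = 2
assert(math.abs(direct - fromDot) < 1e-8)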
From e36c677ed5c3670e0233ba79979b246837ee5f45 Mon Sep 17 00:00:00 2001
From: Debasish Das
Date: Sat, 23 May 2015 11:21:29 -0700
Subject: [PATCH 07/16] IndexedRowMatrix refactor: only rowSimilarities is exposed as a public API

---
 .../linalg/distributed/IndexedRowMatrix.scala | 67 +++++++++++--------
 1 file changed, 40 insertions(+), 27 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
index 1de4e3a82bca9..65e004b38fe79 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.linalg.distributed

 import breeze.linalg.{DenseMatrix => BDM}

 import org.apache.spark.HashPartitioner
+import scala.collection.Map
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.linalg.SingularValueDecomposition
@@ -191,28 +192,44 @@ class IndexedRowMatrix(
     mat
   }

+  /**
+   * Computes the norm of each row of the IndexedRowMatrix
+   * @param norm the norm degree (1 for the L1 norm, 2 for the L2 norm)
+   * @return Map from row index to norm
+   */
+  def rowMagnitudes(norm: Int): Map[Long, Double] = {
+    rows.map { indexedRow =>
+      (indexedRow.index, Vectors.norm(indexedRow.vector, norm))
+    }.collectAsMap()
+  }
+
+  /**
+   * Row similarity calculation with a user-defined kernel
+   * @param kernel vector-based kernel computation
+   * @param topk number of most similar rows to keep for each row
+   * @return CoordinateMatrix of the rows most similar to every row
+   */
   def rowSimilarities(
-      kernelType: KernelType = COSINE,
-      topk: Int = nRows.toInt,
-      threshold: Double = 1e-4): CoordinateMatrix = {
-    val rowNorms = IndexedRowMatrix.rowMagnitudes(rows, 2)
-    val kernel = kernelType match {
-      case COSINE => CosineKernel(rowNorms, threshold)
-      case EUCLIDEAN => EuclideanKernel(rowNorms, threshold)
-      case RBF => RBFKernel(rowNorms, threshold)
-    }
-    IndexedRowMatrix.multiply(rows, rows, kernel, topk)
+      kernel: Kernel,
+      topk: Int): CoordinateMatrix = {
+    multiply(rows, kernel, topk)
   }
-}

-object IndexedRowMatrix {
-  def rowMagnitudes(rows: RDD[IndexedRow], norm: Int) : Map[Long, Double] = {
-    rows.map { indexedRow =>
-      (indexedRow.index, Vectors.norm(indexedRow.vector, norm))
-    }.collect().toMap
+  /**
+   * Row similarity calculation with the cosine kernel
+   * @param topk number of most similar rows to keep for each row
+   * @param threshold cosine similarity threshold
+   * @return CoordinateMatrix of the rows most similar to every row
+   */
+  def rowSimilarities(
+      topk: Int = nRows.toInt,
+      threshold: Double = 1e-4): CoordinateMatrix = {
+    val rowNorms = rowMagnitudes(2)
+    val kernel = CosineKernel(rowNorms, threshold)
+    rowSimilarities(kernel, topk)
   }

-  def blockify(features: RDD[IndexedRow], blockSize: Int): RDD[(Int, Array[IndexedRow])] = {
+  private def blockify(features: RDD[IndexedRow], blockSize: Int): RDD[(Int, Array[IndexedRow])] = {
     val featurePartitioner = new HashPartitioner(blockSize)
     val blockedFeatures = features.map { row =>
       (featurePartitioner.getPartition(row.index), row)
@@ -223,20 +240,17 @@
     blockedFeatures
   }

-  // TODO: Explore LSH and KDTree ideas to further improve runtime
-  def multiply(
-      small: RDD[IndexedRow],
-      big: RDD[IndexedRow],
+  private def multiply(query: RDD[IndexedRow],
       kernel: Kernel,
       topk: Int): CoordinateMatrix = {
     val ord = Ordering[(Float, Long)].on[(Long, Double)](x => (x._2.toFloat, x._1))
-    val defaultParallelism = big.sparkContext.defaultParallelism
+    val defaultParallelism = rows.sparkContext.defaultParallelism

-    val smallBlocks = math.max(small.sparkContext.defaultParallelism, small.partitions.size) / 2
-    val bigBlocks = math.max(defaultParallelism, big.partitions.size) / 2
+    val queryBlocks = math.max(query.sparkContext.defaultParallelism, query.partitions.size) / 2
+    val dictionaryBlocks = math.max(defaultParallelism, rows.partitions.size) / 2

-    val blockedSmall = blockify(small, smallBlocks)
-    val blockedBig = blockify(big, bigBlocks)
+    val blockedSmall = blockify(query, queryBlocks)
+    val blockedBig = blockify(rows, dictionaryBlocks)

     blockedSmall.setName("blockedSmallMatrix")
     blockedBig.setName("blockedBigMatrix")
@@ -271,4 +285,3 @@
     new CoordinateMatrix(topkSims)
   }
 }
-
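After this refactor the public entry point is just the two rowSimilarities overloads. A short usage sketch with toy data, assuming sc is an existing SparkContext:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

val rows = sc.parallelize(Seq(
  IndexedRow(0L, Vectors.dense(1.0, 0.0, 2.0)),
  IndexedRow(1L, Vectors.dense(0.0, 3.0, 1.0)),
  IndexedRow(2L, Vectors.dense(1.0, 1.0, 0.0))))
val mat = new IndexedRowMatrix(rows)

// cosine similarity, keeping the 2 most similar rows per row
val sims = mat.rowSimilarities(topk = 2, threshold = 1e-4)
sims.entries.collect().foreach { e =>
  println(s"row ${e.i} ~ row ${e.j}: ${e.value}")
}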
From 2541cd79bac942967c9fa85be68a36111c295b2d Mon Sep 17 00:00:00 2001
From: Debasish Das
Date: Sat, 23 May 2015 11:22:31 -0700
Subject: [PATCH 08/16] recommendAll generalized for user->item (product kernel), user->user and item->item (cosine kernel)

---
 .../MatrixFactorizationModel.scala | 49 ++++++++++++-------
 1 file changed, 31 insertions(+), 18 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 13441ce3b1170..cd075ce16fca9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -24,8 +24,7 @@ import scala.collection.mutable
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
 import org.apache.hadoop.fs.Path
-import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.linalg.distributed.{IndexedRowMatrix, IndexedRow, CoordinateMatrix}
+import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, CoordinateMatrix}
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
@@ -144,12 +143,17 @@
  * @return CoordinateMatrix containing similar product pairs and the corresponding similarity
  *         scores. Zero similarities are filtered out.
  */
-  def similarProducts(num: Int): CoordinateMatrix = {
-    val indexedRows = productFeatures.map {
-      case (index, features) => IndexedRow(index.toLong, Vectors.dense(features))
-    }
-    val indexedMatrix = new IndexedRowMatrix(indexedRows)
-    indexedMatrix.rowSimilarities(topk = num)
+  def similarProducts(num: Int, threshold: Double = 1e-4): CoordinateMatrix = {
+    val productNorms = productFeatures.map {
+      case (index, features) => (index.toLong, blas.dnrm2(rank, features, 1))
+    }.collectAsMap()
+
+    val kernel = CosineKernel(productNorms, threshold)
+
+    val similarProducts = MatrixFactorizationModel.recommendAll(rank, kernel, productFeatures, productFeatures, num).flatMap {
+      case (product, top) => top.map { case (index, value) => MatrixEntry(product, index, value) }
+    }
+    new CoordinateMatrix(similarProducts)
   }

 /**
  * Find similar users for every user. Cosine similarity is used.
  *
  * @param num how many users to return. The number returned may be less than this.
  * @return CoordinateMatrix containing similar user pairs and the corresponding similarity
  *         scores. Zero similarities are filtered out.
  */
-  def similarUsers(num: Int): CoordinateMatrix = {
-    val indexedRows = userFeatures.map {
-      case (index, features) => IndexedRow(index.toLong, Vectors.dense(features))
+  def similarUsers(num: Int, threshold: Double = 1e-4): CoordinateMatrix = {
+    val userNorms = userFeatures.map {
+      case (index, features) => (index.toLong, blas.dnrm2(rank, features, 1))
+    }.collectAsMap()
+
+    val kernel = CosineKernel(userNorms, threshold)
+
+    val similarUsers = MatrixFactorizationModel.recommendAll(rank, kernel, userFeatures, userFeatures, num).flatMap {
+      case (user, top) => top.map { case (index, value) => MatrixEntry(user, index, value) }
     }
-    val indexedMatrix = new IndexedRowMatrix(indexedRows)
-    indexedMatrix.rowSimilarities(topk = num)
+    new CoordinateMatrix(similarUsers)
   }

   protected override val formatVersion: String = "1.0"

   override def save(sc: SparkContext, path: String): Unit = {
@@ -182,14 +191,14 @@
    * rating field. Semantics of score is same as recommendProducts API
    */
   def recommendProductsForUsers(num: Int): RDD[(Int, Array[Rating])] = {
-    MatrixFactorizationModel.recommendForAll(rank, userFeatures, productFeatures, num).map {
+    val kernel = ProductKernel()
+    MatrixFactorizationModel.recommendAll(rank, kernel, userFeatures, productFeatures, num).map {
       case (user, top) =>
         val ratings = top.map { case (product, rating) => Rating(user, product, rating) }
         (user, ratings)
     }
   }
-
   /**
    * Recommends topK users for all products.
    *
    * rating field. Semantics of score is same as recommendUsers API
    */
   def recommendUsersForProducts(num: Int): RDD[(Int, Array[Rating])] = {
-    MatrixFactorizationModel.recommendForAll(rank, productFeatures, userFeatures, num).map {
+    val kernel = ProductKernel()
+    MatrixFactorizationModel.recommendAll(rank, kernel, productFeatures, userFeatures, num).map {
       case (product, top) =>
         val ratings = top.map { case (user, rating) => Rating(user, product, rating) }
         (product, ratings)
@@ -225,16 +235,18 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
   }

   /**
-   * Makes recommendations for all users (or products).
+   * Makes batch user->product, user->user and product->product recommendations
    * @param rank rank
+   * @param kernel user defined kernels for distance calculation where computation is dot decomposable
    * @param srcFeatures src features to receive recommendations
    * @param dstFeatures dst features used to make recommendations
    * @param num number of recommendations for each record
   * @return an RDD of (srcId: Int, recommendations), where recommendations are stored as an array
   * of (dstId, rating) pairs.
   */
-  private def recommendForAll(
+  private def recommendAll(
       rank: Int,
+      kernel: Kernel,
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
@@ -248,7 +260,8 @@
       val output = new Array[(Int, (Int, Double))](m * n)
       var k = 0
       ratings.foreachActive { (i, j, r) =>
-        output(k) = (srcIds(i), (dstIds(j), r))
+        val kernelVal = kernel.compute(srcIds(i), dstIds(j), r)
+        output(k) = (srcIds(i), (dstIds(j), kernelVal))
         k += 1
       }
       output.toSeq
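The generalization works because the expensive step stays a blocked gemm over raw dot products; each kernel only rescales a scalar afterwards through compute(indexi, indexj, value). Condensed to plain Scala for the cosine case, with arrays and names that are purely illustrative:

val src = Array(1.0, 2.0, 0.0)
val dst = Array(0.0, 1.0, 1.0)
// r is what one cell of the gemm block holds: <src, dst>
val r = (src, dst).zipped.map(_ * _).sum
val srcNorm = math.sqrt(src.map(x => x * x).sum)
val dstNorm = math.sqrt(dst.map(x => x * x).sum)
// CosineKernel.compute(i, j, r) performs exactly this rescaling
val cosine = r / srcNorm / dstNorm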
From 00cabd02fc2da63b2cb26f408c88bbad1b9167bf Mon Sep 17 00:00:00 2001
From: Debasish Das
Date: Sat, 23 May 2015 11:22:53 -0700
Subject: [PATCH 09/16] test cases for similarUsers, similarProducts

---
 .../MatrixFactorizationModelSuite.scala | 50 ++++++++++++++++++-
 1 file changed, 49 insertions(+), 1 deletion(-)

diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
index 2c92866f3893d..97469e3f63faa 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
@@ -17,6 +17,7 @@

 package org.apache.spark.mllib.recommendation

+import org.apache.spark.mllib.linalg.{DenseMatrix, Vectors}
 import org.scalatest.FunSuite

 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -92,4 +93,51 @@
     assert(recommendations(2)(1).user == 0)
     assert(recommendations(2)(1).rating ~== 17.0 relTol 1e-14)
   }
-}
+
+  test("batch similar users/products") {
+    val n = 3
+
+    val userFeatures = sc.parallelize(Seq(
+      (0, Array(0.0, 3.0, 6.0, 9.0)),
+      (1, Array(1.0, 4.0, 7.0, 0.0)),
+      (2, Array(2.0, 5.0, 8.0, 1.0))
+    ), 2)
+
+    val model = new MatrixFactorizationModel(4, userFeatures, userFeatures)
+
+    val topk = 2
+
+    val similarUsers = model.similarUsers(topk)
+
+    val similarProducts = model.similarProducts(topk)
+
+    assert(similarUsers.numRows() == n)
+    assert(similarUsers.entries.count() == n * topk)
+
+    assert(similarProducts.numRows() == n)
+    assert(similarProducts.entries.count() == n * topk)
+
+    val similarEntriesUsers = similarUsers.entries.collect()
+    val similarEntriesProducts = similarProducts.entries.collect()
+
+    val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94))
+
+    val expected =
+      new DenseMatrix(3, 3,
+        Array(126.0, 54.0, 72.0, 54.0, 66.0, 78.0, 72.0, 78.0, 94.0))
+
+    for (i <- 0 until n; j <- 0 until n) expected(i, j) /= (colMags(i) * colMags(j))
+
+    similarEntriesUsers.foreach { entry =>
+      val row = entry.i.toInt
+      val col = entry.j.toInt
+      assert(entry.value ~== expected(row, col) relTol 1e-6)
+    }
+
+    similarEntriesProducts.foreach { entry =>
+      val row = entry.i.toInt
+      val col = entry.j.toInt
+      assert(entry.value ~== expected(row, col) relTol
1e-6) + } + } +} \ No newline at end of file From aa219542a85a7c83b5c86e3428eaf863b2a41e17 Mon Sep 17 00:00:00 2001 From: Debasish Das Date: Sat, 23 May 2015 12:00:28 -0700 Subject: [PATCH 10/16] scalastyle cleanup --- .../MatrixFactorizationModel.scala | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index cd075ce16fca9..d2ff929430fc8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -150,9 +150,12 @@ class MatrixFactorizationModel( val kernel = CosineKernel(productNorms, threshold) - val similarProducts = MatrixFactorizationModel.recommendAll(rank, kernel, productFeatures, productFeatures, num).flatMap { - case (product, top) => top.map { case (index, value) => MatrixEntry(product, index, value) } - } + val similarProducts = + MatrixFactorizationModel.recommendAll( + rank, kernel, + productFeatures, productFeatures, num).flatMap { + case (product, top) => top.map { case (index, value) => MatrixEntry(product, index, value) } + } new CoordinateMatrix(similarProducts) } @@ -170,9 +173,10 @@ class MatrixFactorizationModel( val kernel = CosineKernel(userNorms, threshold) - val similarUsers = MatrixFactorizationModel.recommendAll(rank, kernel, userFeatures, userFeatures, num).flatMap { - case (user, top) => top.map { case (index, value) => MatrixEntry(user, index, value) } - } + val similarUsers = + MatrixFactorizationModel.recommendAll(rank, kernel, userFeatures, userFeatures, num).flatMap { + case (user, top) => top.map { case (index, value) => MatrixEntry(user, index, value) } + } new CoordinateMatrix(similarUsers) } @@ -237,7 +241,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { /** * Makes batch user->product, user->user and product->product recommendations * @param rank rank - * @param kernel user defined kernels for distance calculation where computation is dot decomposable + * @param kernel kernels for distance calculation where computation is dot decomposable * @param srcFeatures src features to receive recommendations * @param dstFeatures dst features used to make recommendations * @param num number of recommendations for each record From e0034c862fa016ccee5f8a2a90263d1e7d92225d Mon Sep 17 00:00:00 2001 From: Debasish Das Date: Sat, 23 May 2015 12:08:25 -0700 Subject: [PATCH 11/16] recommendAll changed back to recommendForAll --- .../recommendation/MatrixFactorizationModel.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index d2ff929430fc8..3bab5c30b68d7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -151,7 +151,7 @@ class MatrixFactorizationModel( val kernel = CosineKernel(productNorms, threshold) val similarProducts = - MatrixFactorizationModel.recommendAll( + MatrixFactorizationModel.recommendForAll( rank, kernel, productFeatures, productFeatures, num).flatMap { case (product, top) => top.map { case (index, value) => 
MatrixEntry(product, index, value) } @@ -174,7 +174,7 @@ class MatrixFactorizationModel( val kernel = CosineKernel(userNorms, threshold) val similarUsers = - MatrixFactorizationModel.recommendAll(rank, kernel, userFeatures, userFeatures, num).flatMap { + MatrixFactorizationModel.recommendForAll(rank, kernel, userFeatures, userFeatures, num).flatMap { case (user, top) => top.map { case (index, value) => MatrixEntry(user, index, value) } } new CoordinateMatrix(similarUsers) @@ -196,7 +196,7 @@ class MatrixFactorizationModel( */ def recommendProductsForUsers(num: Int): RDD[(Int, Array[Rating])] = { val kernel = ProductKernel() - MatrixFactorizationModel.recommendAll(rank, kernel, userFeatures, productFeatures, num).map { + MatrixFactorizationModel.recommendForAll(rank, kernel, userFeatures, productFeatures, num).map { case (user, top) => val ratings = top.map { case (product, rating) => Rating(user, product, rating) } (user, ratings) @@ -213,7 +213,7 @@ class MatrixFactorizationModel( */ def recommendUsersForProducts(num: Int): RDD[(Int, Array[Rating])] = { val kernel = ProductKernel() - MatrixFactorizationModel.recommendAll(rank, kernel, productFeatures, userFeatures, num).map { + MatrixFactorizationModel.recommendForAll(rank, kernel, productFeatures, userFeatures, num).map { case (product, top) => val ratings = top.map { case (user, rating) => Rating(user, product, rating) } (product, ratings) @@ -248,7 +248,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { * @return an RDD of (srcId: Int, recommendations), where recommendations are stored as an array * of (dstId, rating) pairs. */ - private def recommendAll( + private def recommendForAll( rank: Int, kernel: Kernel, srcFeatures: RDD[(Int, Array[Double])], From 156ceca7edab14cfcd27074cae20edf32a7202dd Mon Sep 17 00:00:00 2001 From: Debasish Das Date: Sat, 23 May 2015 12:11:09 -0700 Subject: [PATCH 12/16] newline character added --- .../spark/mllib/recommendation/MatrixFactorizationModel.scala | 1 - .../mllib/recommendation/MatrixFactorizationModelSuite.scala | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 3bab5c30b68d7..a09956ba17828 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -360,5 +360,4 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { new Path(dataPath(path), "product").toUri.toString } } - } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala index 97469e3f63faa..ba9a4d96c7e0e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala @@ -140,4 +140,4 @@ class MatrixFactorizationModelSuite extends FunSuite with MLlibTestSparkContext assert(entry.value ~== expected(row, col) relTol 1e-6) } } -} \ No newline at end of file +} From efe68f45e1f108cfb3bc723babad8e1343150fbe Mon Sep 17 00:00:00 2001 From: Debasish Das Date: Sat, 23 May 2015 12:54:59 -0700 Subject: [PATCH 13/16] scalastyle fix --- 
.../spark/mllib/recommendation/MatrixFactorizationModel.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index a09956ba17828..2203bd0dc34f9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -174,7 +174,8 @@ class MatrixFactorizationModel( val kernel = CosineKernel(userNorms, threshold) val similarUsers = - MatrixFactorizationModel.recommendForAll(rank, kernel, userFeatures, userFeatures, num).flatMap { + MatrixFactorizationModel.recommendForAll( + rank, kernel, userFeatures, userFeatures, num).flatMap { case (user, top) => top.map { case (index, value) => MatrixEntry(user, index, value) } } new CoordinateMatrix(similarUsers) From af7058380f1f2d0b21908011be882e812cbce7a2 Mon Sep 17 00:00:00 2001 From: Debasish Das Date: Sat, 23 May 2015 19:09:16 -0700 Subject: [PATCH 14/16] examples updated with row and column based flow and comparison with ALS for low rank item similarity calculation --- .../examples/mllib/MovieLensSimilarity.scala | 155 +++++++++++------- .../apache/spark/mllib/linalg/Kernel.scala | 14 +- 2 files changed, 112 insertions(+), 57 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensSimilarity.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensSimilarity.scala index 125a47d0642c5..d6379d8daf1e9 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensSimilarity.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensSimilarity.scala @@ -16,14 +16,12 @@ */ /** - * An example app for running item similarity computation on MovieLens data - * (http://grouplens.org/datasets/movielens/) through column based similarity - * calculation flow compared with ALS + row based similarity calculation flow + * An example app for running item similarity computation on MovieLens format + * sparse data (http://grouplens.org/datasets/movielens/) through column based + * similarity calculation and compare with row based similarity calculation and + * ALS + row based similarity calculation flow. For running row and column based + * similarity on raw features, we are using implicit matrix factorization. * - * Run with - * {{{ - * bin/run-example org.apache.spark.examples.mllib.MovieLensALS - * }}} * * A synthetic dataset in MovieLens format can be found at `data/mllib/sample_movielens_data.txt`. * If you use it as a template to create your own app, please use `spark-submit` to submit your app. 
@@ -34,52 +32,53 @@ import org.apache.log4j.{Level, Logger} import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} import org.apache.spark.{SparkContext, SparkConf} import scopt.OptionParser -import org.apache.spark.mllib.recommendation.Rating - +import org.apache.spark.mllib.recommendation.{ALS, Rating} +import org.apache.spark.mllib.rdd.MLPairRDDFunctions._ +import org.apache.spark.mllib.evaluation.RankingMetrics import scala.collection.mutable object MovieLensSimilarity { case class Params( input: String = null, - kryo: Boolean = false, numIterations: Int = 20, - lambda: Double = 1.0, - rank: Int = 10, + rank: Int = 50, + alpha: Double = 0.0, numUserBlocks: Int = -1, numProductBlocks: Int = -1, - implicitPrefs: Boolean = false, - threshold: Double = 1e-4) extends AbstractParams[Params] + delim: String = "::", + topk: Int = 50, + threshold: Double = 1e-2) extends AbstractParams[Params] def main(args: Array[String]) { val defaultParams = Params() - val parser = new OptionParser[Params]("MovieLensALS") { - head("MovieLensALS: an example app for ALS on MovieLens data.") + val parser = new OptionParser[Params]("MovieLensSimilarity") { + head("MovieLensSimilarity: an example app for similarity flows on MovieLens data.") opt[Int]("rank") .text(s"rank, default: ${defaultParams.rank}}") .action((x, c) => c.copy(rank = x)) opt[Int]("numIterations") .text(s"number of iterations, default: ${defaultParams.numIterations}") .action((x, c) => c.copy(numIterations = x)) - opt[Double]("lambda") - .text(s"lambda (smoothing constant), default: ${defaultParams.lambda}") - .action((x, c) => c.copy(lambda = x)) - opt[Unit]("kryo") - .text("use Kryo serialization") - .action((_, c) => c.copy(kryo = true)) opt[Int]("numUserBlocks") .text(s"number of user blocks, default: ${defaultParams.numUserBlocks} (auto)") .action((x, c) => c.copy(numUserBlocks = x)) opt[Int]("numProductBlocks") .text(s"number of product blocks, default: ${defaultParams.numProductBlocks} (auto)") .action((x, c) => c.copy(numProductBlocks = x)) - opt[Unit]("implicitPrefs") - .text("use implicit preference") - .action((_, c) => c.copy(implicitPrefs = true)) + opt[Double]("alpha") + .text(s"alpha for implicit feedback") + .action((x, c) => c.copy(alpha = x)) + opt[Int]("topk") + .text("topk for ALS validation") + .action((x, c) => c.copy(topk = x)) opt[Double]("threshold") - .text("similarity threshold for dimsum sampling and kernel shuffle optimization") + .text("threshold for dimsum sampling and kernel sparsity") .action((x, c) => c.copy(threshold = x)) + opt[String]("delim") + .text("use delimiter, default ::") + .action((x, c) => c.copy(delim = x)) arg[String]("") .required() .text("input paths to a MovieLens dataset of ratings") @@ -88,9 +87,8 @@ object MovieLensSimilarity { """ |For example, the following command runs this app on a synthetic dataset: | - | bin/spark-submit --class org.apache.spark.examples.mllib.MovieLensALS \ - | examples/target/scala-*/spark-examples-*.jar \ - | --rank 5 --numIterations 20 --lambda 1.0 --kryo \ + | bin/run-example mllib.MovieLensSimilarity \ + | --rank 25 --numIterations 20 --alpha 0.01 --topk 25\ | data/mllib/sample_movielens_data.txt """.stripMargin) } @@ -103,21 +101,18 @@ object MovieLensSimilarity { } def run(params: Params) { - val conf = new SparkConf().setAppName(s"MovieLensSimilarity with $params") - if (params.kryo) { - conf.registerKryoClasses(Array(classOf[mutable.BitSet], classOf[Rating])) - .set("spark.kryoserializer.buffer.mb", "8") - } - val sc = new 
SparkContext(conf) + val conf = + new SparkConf() + .setAppName(s"MovieLensSimilarity with $params") + .registerKryoClasses(Array(classOf[mutable.BitSet], classOf[Rating])) + val sc = new SparkContext(conf) Logger.getRootLogger.setLevel(Level.WARN) - val implicitPrefs = params.implicitPrefs - + val delim = params.delim val ratings = sc.textFile(params.input).map { line => - val fields = line.split("::") - if (implicitPrefs) { - /* + val fields = line.split(delim) + /* * MovieLens ratings are on a scale of 1-5: * 5: Must see * 4: Will enjoy @@ -131,10 +126,7 @@ object MovieLensSimilarity { * The semantics of 0 in this expanded world of non-positive weights * are "the same as never having interacted at all". */ - Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5) - } else { - Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) - } + Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5) }.cache() val numRatings = ratings.count() @@ -147,25 +139,76 @@ object MovieLensSimilarity { MatrixEntry(entry.product, entry.user, entry.rating) } val productMatrix = new CoordinateMatrix(productFeatures).toIndexedRowMatrix() - val rowSimilarities = productMatrix.rowSimilarities(threshold=params.threshold) - // Compute similar rows through dimsum sampling + // brute force row similarities + println("Running row similarities with threshold 1e-4") + val rowSimilarities = productMatrix.rowSimilarities() + + // Row similarities using user defined threshold + println(s"Running row similarities with threshold ${params.threshold}") + val rowSimilaritiesApprox = productMatrix.rowSimilarities(threshold = params.threshold) + + // Compute similar columns on transpose matrix val userFeatures = ratings.map { entry => MatrixEntry(entry.user, entry.product, entry.rating) - } + }.repartition(sc.defaultParallelism).cache() + val featureMatrix = new CoordinateMatrix(userFeatures).toRowMatrix() + // Compute similar columns with dimsum sampling + println(s"Running column similarity with threshold ${params.threshold}") val colSimilarities = featureMatrix.columnSimilarities(params.threshold) - val rowEntries = rowSimilarities.entries.map { case MatrixEntry(i, j, u) => ((i, j), u) } - val colEntries = colSimilarities.entries.map { case MatrixEntry(i, j, v) => ((i, j), v) } - val MAE = colEntries.leftOuterJoin(rowEntries).values.map { - case (u, Some(v)) => - math.abs(u - v) - case (u, None) => - math.abs(u) - }.mean() + val exactEntries = rowSimilarities.entries.map { case MatrixEntry(i, j, u) => ((i, j), u) } + val rowEntriesApprox = rowSimilaritiesApprox.entries.map { case MatrixEntry(i, j, u) => + ((i, j), u) + } + val colEntriesApprox = colSimilarities.entries.map { case MatrixEntry(i, j, v) => ((i, j), v) } + + val rowMAE = exactEntries.join(rowEntriesApprox).values.map { + case (u, v) => math.abs(u - v) + } + + val colMAE = exactEntries.join(colEntriesApprox).values.map { + case (u, v) => math.abs(u - v) + } + + println(s"Common entries row: ${rowMAE.count()} col: ${colMAE.count()}") + println(s"Average absolute error in estimate row: ${rowMAE.mean()} col: ${colMAE.mean()}") + + val model = new ALS() + .setRank(params.rank) + .setIterations(params.numIterations) + .setLambda(0.0) + .setAlpha(params.alpha) + .setImplicitPrefs(true) + .setUserBlocks(params.numUserBlocks) + .setProductBlocks(params.numProductBlocks) + .run(ratings) + + // Compute similar columns through low rank approximation using ALS + println(s"Running ALS based row similarities") + + val lowRankedSimilarities = 
model.similarProducts(params.topk)
+
+    val labels = rowSimilarities.entries.map { case (entry) =>
+      (entry.i, (entry.j, entry.value))
+    }.topByKey(params.topk)(Ordering.by(_._2)).map { case (item, similarItems) =>
+      (item, similarItems.map(_._1))
+    }
+
+    val predicted = lowRankedSimilarities.entries.map { case (entry) =>
+      (entry.i, (entry.j, entry.value))
+    }.topByKey(params.topk)(Ordering.by(_._2)).map { case (item, similarItems) =>
+      (item, similarItems.map(_._1))
+    }
+
+    val predictionAndLabels =
+      predicted.join(labels).map { case (item, (predicted, labels)) =>
+        (predicted, labels)
+      }

-    println(s"Average absolute error in estimate is: $MAE")
+    val rankingMetrics = new RankingMetrics[Long](predictionAndLabels)
+    println(s"MAP ${rankingMetrics.meanAveragePrecision}")

     sc.stop()
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Kernel.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Kernel.scala
index 81e49d919af89..68e0b3a62de2c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Kernel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Kernel.scala
@@ -20,8 +20,15 @@ package org.apache.spark.mllib.linalg

 import org.apache.spark.mllib.util.MLUtils
 import scala.collection.Map

-trait Kernel {
+/**
+ * Represents a Kernel abstraction for vector-based and gemv/gemm-based computation
+ */
+trait Kernel extends Serializable {
+
+  /** Compute the kernel value directly from the two row vectors. */
   def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double
+
+  /** Update the kernel value after the gemv/gemm computation is done. */
   def compute(indexi: Long, indexj: Long, value: Double): Double
 }

 /**
@@ -31,11 +38,13 @@
  * @param threshold similarities at or below this threshold are treated as zero and not shuffled
  */
 case class CosineKernel(rowNorms: Map[Long, Double], threshold: Double) extends Kernel {
+
   override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
     val similarity = BLAS.dot(vi, vj) / rowNorms(indexi) / rowNorms(indexj)
     if (similarity <= threshold) return 0.0
     similarity
   }
+
   override def compute(indexi: Long, indexj: Long, value: Double): Double = {
     value / rowNorms(indexi) / rowNorms(indexj)
   }
@@ -43,14 +52,17 @@
 // For distributed matrix multiplication with user defined normalization
 case class ProductKernel() extends Kernel {
+
   override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
     BLAS.dot(vi, vj)
   }
+
   override def compute(indexi: Long, indexj: Long, value: Double): Double = value
 }

 // For PowerIterationClustering flow
 case class RBFKernel(rowNorms: Map[Long, Double], sigma: Double, threshold: Double) extends Kernel {
+
   val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
   val expCoeff = -1.0 / (2.0 * math.pow(sigma, 2.0))
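The comparison in the example above feeds per-item top-k lists into RankingMetrics. A standalone sketch of that input shape, with toy data and sc assumed to be an existing SparkContext:

import org.apache.spark.mllib.evaluation.RankingMetrics

// one (predicted ranking, ground-truth items) pair per item
val predictionAndLabels = sc.parallelize(Seq(
  (Array(1L, 2L, 3L), Array(1L, 3L)),
  (Array(4L, 1L, 2L), Array(4L, 2L, 5L))))
val metrics = new RankingMetrics(predictionAndLabels)
println(s"MAP ${metrics.meanAveragePrecision}")
println(s"prec@2 ${metrics.precisionAt(2)}")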
From 3c49d98d801652b14a8d151344ad3088a0062106 Mon Sep 17 00:00:00 2001
From: Debasish Das
Date: Fri, 24 Jul 2015 00:46:30 -0700
Subject: [PATCH 15/16] use coalesce in place of repartition to avoid shuffle

---
 .../spark/mllib/linalg/distributed/IndexedRowMatrix.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
index 65e004b38fe79..7582e9061d1e1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
@@ -276,7 +276,7 @@
       value.map { sim =>
         MatrixEntry(i, sim._1, sim._2)
       }
-    }.repartition(defaultParallelism)
+    }.coalesce(defaultParallelism)

     blockedBig.unpersist()

From 4e95ea6127552d1782e98d5247069d9ed0e3c394 Mon Sep 17 00:00:00 2001
From: Debasish Das
Date: Fri, 24 Jul 2015 02:34:03 -0700
Subject: [PATCH 16/16] cleaned caches for runtime/shuffle benchmark; added topk option in similarity example

---
 .../examples/mllib/MovieLensSimilarity.scala  | 30 +++++++++++++++----
 .../linalg/distributed/IndexedRowMatrix.scala |  4 ---
 2 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensSimilarity.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensSimilarity.scala
index d6379d8daf1e9..b3fb6652bcdeb 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensSimilarity.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensSimilarity.scala
@@ -48,7 +48,7 @@ object MovieLensSimilarity {
       numProductBlocks: Int = -1,
       delim: String = "::",
       topk: Int = 50,
-      threshold: Double = 1e-2) extends AbstractParams[Params]
+      threshold: Double = 1e-4) extends AbstractParams[Params]

   def main(args: Array[String]) {
     val defaultParams = Params()
@@ -125,8 +125,14 @@
        * entries are generally between It's okay and Fairly bad.
        * The semantics of 0 in this expanded world of non-positive weights
        * are "the same as never having interacted at all".
+       *
+       * Options:
+       *
+       * Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)
+       * Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
+       * Rating(fields(0).toInt, fields(1).toInt, 1.0)
        */
-      Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)
+      Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
     }.cache()

     val numRatings = ratings.count()
@@ -151,9 +157,10 @@
     // Compute similar columns on transpose matrix
     val userFeatures = ratings.map { entry =>
       MatrixEntry(entry.user, entry.product, entry.rating)
-    }.repartition(sc.defaultParallelism).cache()
+    }.repartition(sc.defaultParallelism)

     val featureMatrix = new CoordinateMatrix(userFeatures).toRowMatrix()
+
     // Compute similar columns with dimsum sampling
     println(s"Running column similarity with threshold ${params.threshold}")
     val colSimilarities = featureMatrix.columnSimilarities(params.threshold)
@@ -175,6 +182,18 @@
     println(s"Common entries row: ${rowMAE.count()} col: ${colMAE.count()}")
     println(s"Average absolute error in estimate row: ${rowMAE.mean()} col: ${colMAE.mean()}")

+    println(s"Running row similarity with topk ${params.topk}")
+    val rowSimilaritiesTopk = productMatrix.rowSimilarities(topk = params.topk)
+
+    val rowEntriesTopk = rowSimilaritiesTopk.entries.map { case MatrixEntry(i, j, u) =>
+      ((i, j), u)
+    }
+
+    val rowTopkMAE = exactEntries.join(rowEntriesTopk).values.map {
+      case (u, v) => math.abs(u - v)
+    }
+    println(s"Average absolute error in topk row: ${rowTopkMAE.mean()}")
+
     val model = new ALS()
       .setRank(params.rank)
       .setIterations(params.numIterations)
       .setLambda(0.0)
       .setAlpha(params.alpha)
       .setImplicitPrefs(true)
       .setUserBlocks(params.numUserBlocks)
       .setProductBlocks(params.numProductBlocks)
       .run(ratings)

+    val topk = params.topk
     // Compute similar columns through low rank approximation using ALS
     println(s"Running ALS based row similarities")

-    val lowRankedSimilarities = model.similarProducts(params.topk)
+    val lowRankedSimilarities =
model.similarProducts(params.topk) + val lowRankedSimilarities = model.similarProducts(topk) val labels = rowSimilarities.entries.map { case (entry) => (entry.i, (entry.j, entry.value)) @@ -208,7 +228,7 @@ object MovieLensSimilarity { } val rankingMetrics = new RankingMetrics[Long](predictionAndLabels) - println(s"MAP ${rankingMetrics.meanAveragePrecision}") + println(s"prec@$topk ${rankingMetrics.precisionAt(topk)}") sc.stop() } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala index 7582e9061d1e1..beb794476ed43 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala @@ -255,8 +255,6 @@ class IndexedRowMatrix( blockedSmall.setName("blockedSmallMatrix") blockedBig.setName("blockedBigMatrix") - blockedBig.cache() - val topkSims = blockedBig.cartesian(blockedSmall).flatMap { case ((bigBlockIndex, bigRows), (smallBlockIndex, smallRows)) => val buf = mutable.ArrayBuilder.make[(Long, (Long, Double))] @@ -278,8 +276,6 @@ class IndexedRowMatrix( } }.coalesce(defaultParallelism) - blockedBig.unpersist() - // Materialize the cartesian RDD topkSims.count() new CoordinateMatrix(topkSims)