@@ -19,6 +19,7 @@ package org.apache.spark.mllib.api.python

import java.nio.{ByteBuffer, ByteOrder}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
@@ -28,8 +29,11 @@ import org.apache.spark.mllib.regression._
import org.apache.spark.rdd.RDD

/**
* :: DeveloperApi ::
*
* The Java stubs necessary for the Python mllib bindings.
*/
@DeveloperApi
class PythonMLLibAPI extends Serializable {
private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
val packetLength = bytes.length
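For orientation, a minimal sketch of the kind of ByteBuffer decoding these stubs perform. The actual packet layout (header fields, magic bytes) is not shown in this diff, so the length-prefixed format and the helper name readDoubles below are assumptions for illustration only, not the real private method.

    // Illustration only: decode a packet assumed to carry a long element count
    // followed by that many doubles, using the java.nio imports shown above.
    import java.nio.{ByteBuffer, ByteOrder}

    def readDoubles(bytes: Array[Byte]): Array[Double] = {
      val buf = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder())
      val n = buf.getLong().toInt          // assumed length prefix
      val out = new Array[Double](n)
      var i = 0
      while (i < n) {
        out(i) = buf.getDouble()
        i += 1
      }
      out
    }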
@@ -55,7 +55,7 @@ class LogisticRegressionModel(
this
}

override def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
override protected def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
intercept: Double) = {
val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
val score = 1.0 / (1.0 + math.exp(-margin))
@@ -71,27 +71,27 @@
* NOTE: Labels used in Logistic Regression should be {0, 1}
*/
class LogisticRegressionWithSGD private (
var stepSize: Double,
var numIterations: Int,
var regParam: Double,
var miniBatchFraction: Double)
private var stepSize: Double,
private var numIterations: Int,
private var regParam: Double,
private var miniBatchFraction: Double)
extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {

val gradient = new LogisticGradient()
val updater = new SimpleUpdater()
private val gradient = new LogisticGradient()
private val updater = new SimpleUpdater()
override val optimizer = new GradientDescent(gradient, updater)
.setStepSize(stepSize)
.setNumIterations(numIterations)
.setRegParam(regParam)
.setMiniBatchFraction(miniBatchFraction)
override val validators = List(DataValidators.classificationLabels)
override protected val validators = List(DataValidators.binaryLabelValidator)

/**
* Construct a LogisticRegression object with default parameters
*/
def this() = this(1.0, 100, 0.0, 1.0)

def createModel(weights: Vector, intercept: Double) = {
override protected def createModel(weights: Vector, intercept: Double) = {
new LogisticRegressionModel(weights, intercept)
}
}
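A minimal usage sketch of this class through the companion object's train helper (the companion object is not part of this hunk); `training` is assumed to be an RDD[LabeledPoint] with labels in {0, 1}.

    import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
    import org.apache.spark.mllib.linalg.Vectors

    // Train with 100 SGD iterations and library defaults for the other parameters.
    val lrModel = LogisticRegressionWithSGD.train(training, 100)
    val predicted = lrModel.predict(Vectors.dense(0.5, -1.2))   // 0.0 or 1.0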
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.classification

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}

import org.apache.spark.annotation.Experimental
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vector
@@ -27,11 +28,16 @@ import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

/**
* :: Experimental ::
*
* Model for Naive Bayes Classifiers.
*
* @param pi Log of class priors, whose dimension is C.
* @param theta Log of class conditional probabilities, whose dimension is CxD.
* @param labels list of class labels
* @param pi log of class priors, whose dimension is C, the number of labels
* @param theta log of class conditional probabilities, whose dimension is C-by-D,
*              where D is the number of features
*/
@Experimental
class NaiveBayesModel(
val labels: Array[Double],
val pi: Array[Double],
@@ -40,14 +46,17 @@ class NaiveBayesModel(
private val brzPi = new BDV[Double](pi)
private val brzTheta = new BDM[Double](theta.length, theta(0).length)

var i = 0
while (i < theta.length) {
var j = 0
while (j < theta(i).length) {
brzTheta(i, j) = theta(i)(j)
j += 1
{
// Need to put an extra pair of braces to prevent Scala from treating `i` as a member.
var i = 0
while (i < theta.length) {
var j = 0
while (j < theta(i).length) {
brzTheta(i, j) = theta(i)(j)
j += 1
}
i += 1
}
i += 1
}

override def predict(testData: RDD[Vector]): RDD[Double] = testData.map(predict)
@@ -65,7 +74,7 @@ class NaiveBayesModel(
* document classification. By making every vector a 0-1 vector, it can also be used as
* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
*/
class NaiveBayes private (var lambda: Double) extends Serializable with Logging {
class NaiveBayes private (private var lambda: Double) extends Serializable with Logging {

def this() = this(1.0)

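A minimal usage sketch, assuming `training` is an RDD[LabeledPoint] whose features are non-negative term counts (the multinomial model this class implements); NaiveBayes.train lives on the companion object, outside this hunk.

    import org.apache.spark.mllib.classification.NaiveBayes
    import org.apache.spark.mllib.linalg.Vectors

    // lambda is the additive smoothing parameter (1.0 = Laplace smoothing).
    val nbModel = NaiveBayes.train(training, lambda = 1.0)
    val label = nbModel.predict(Vectors.dense(1.0, 0.0, 3.0))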
@@ -55,7 +55,7 @@ class SVMModel(
this
}

override def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
override protected def predictPoint(
dataMatrix: Vector,
weightMatrix: Vector,
intercept: Double) = {
val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
threshold match {
@@ -70,28 +72,27 @@
* NOTE: Labels used in SVM should be {0, 1}.
*/
class SVMWithSGD private (
var stepSize: Double,
var numIterations: Int,
var regParam: Double,
var miniBatchFraction: Double)
private var stepSize: Double,
private var numIterations: Int,
private var regParam: Double,
private var miniBatchFraction: Double)
extends GeneralizedLinearAlgorithm[SVMModel] with Serializable {

val gradient = new HingeGradient()
val updater = new SquaredL2Updater()
private val gradient = new HingeGradient()
private val updater = new SquaredL2Updater()
override val optimizer = new GradientDescent(gradient, updater)
.setStepSize(stepSize)
.setNumIterations(numIterations)
.setRegParam(regParam)
.setMiniBatchFraction(miniBatchFraction)

override val validators = List(DataValidators.classificationLabels)
override protected val validators = List(DataValidators.binaryLabelValidator)

/**
* Construct an SVM object with default parameters
*/
def this() = this(1.0, 100, 1.0, 1.0)

def createModel(weights: Vector, intercept: Double) = {
override protected def createModel(weights: Vector, intercept: Double) = {
new SVMModel(weights, intercept)
}
}
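A minimal usage sketch, assuming `training` is an RDD[LabeledPoint] with labels in {0, 1}; the companion object's train helper and the model's clearThreshold() belong to the existing API and are not part of this hunk.

    import org.apache.spark.mllib.classification.SVMWithSGD
    import org.apache.spark.mllib.linalg.Vectors

    val svmModel = SVMWithSGD.train(training, 100)   // 100 SGD iterations
    svmModel.clearThreshold()                        // predict now returns raw margins
    val margin = svmModel.predict(Vectors.dense(0.5, -1.2))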
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer

import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm}

import org.apache.spark.annotation.Experimental
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.{Vector, Vectors}
@@ -37,12 +38,17 @@ import org.apache.spark.util.random.XORShiftRandom
* to it should be cached by the user.
*/
class KMeans private (
var k: Int,
var maxIterations: Int,
var runs: Int,
var initializationMode: String,
var initializationSteps: Int,
var epsilon: Double) extends Serializable with Logging {
private var k: Int,
private var maxIterations: Int,
private var runs: Int,
private var initializationMode: String,
private var initializationSteps: Int,
private var epsilon: Double) extends Serializable with Logging {

/**
* Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
* initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4}.
*/
def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4)
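For illustration, this default constructor is typically paired with the setters that follow; a hedged sketch, assuming `data` is a cached RDD[Vector]:

    val clusters = new KMeans()
      .setK(5)
      .setMaxIterations(50)
      .setRuns(3)
      .setInitializationMode(KMeans.K_MEANS_PARALLEL)
      .run(data)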

/** Set the number of clusters to create (k). Default: 2. */
@@ -71,6 +77,8 @@ class KMeans private (
}

/**
* :: Experimental ::
*
* Set the number of runs of the algorithm to execute in parallel. We initialize the algorithm
* this many times with random starting conditions (configured by the initialization mode), then
* return the best clustering found over any run. Default: 1.
@@ -316,15 +324,36 @@ object KMeans {
data: RDD[Vector],
k: Int,
maxIterations: Int,
runs: Int = 1,
initializationMode: String = K_MEANS_PARALLEL): KMeansModel = {
runs: Int,
initializationMode: String): KMeansModel = {
new KMeans().setK(k)
.setMaxIterations(maxIterations)
.setRuns(runs)
.setInitializationMode(initializationMode)
.run(data)
}

/**
* Trains a k-means model using the given parameters and default values for those left unspecified.
*/
def train(
data: RDD[Vector],
k: Int,
maxIterations: Int): KMeansModel = {
train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
}

/**
* Trains a k-means model using the given parameters and default values for those left unspecified.
*/
def train(
data: RDD[Vector],
k: Int,
maxIterations: Int,
runs: Int): KMeansModel = {
train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
}
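Illustrative call sites for the train overloads above, assuming `data` is a cached RDD[Vector]:

    val m1 = KMeans.train(data, 3, 20)                              // 1 run, k-means|| init
    val m2 = KMeans.train(data, 3, 20, 5)                           // 5 runs
    val m3 = KMeans.train(data, 3, 20, 5, KMeans.K_MEANS_PARALLEL)  // all parameters explicit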

/**
* Returns the index of the closest center to the given point, as well as the squared distance.
*/
@@ -369,6 +398,10 @@ object KMeans {
MLUtils.fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm)
}

/**
* :: Experimental ::
*/
@Experimental
def main(args: Array[String]) {
if (args.length < 4) {
println("Usage: KMeans <master> <input_file> <k> <max_iterations> [<runs>]")
19 changes: 11 additions & 8 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -64,11 +64,13 @@ trait Vector extends Serializable {

/**
* Factory methods for [[org.apache.spark.mllib.linalg.Vector]].
* We don't use the name `Vector` because Scala imports
* [[scala.collection.immutable.Vector]] by default.
*/
object Vectors {

/**
* Creates a dense vector.
* Creates a dense vector from its values.
*/
@varargs
def dense(firstValue: Double, otherValues: Double*): Vector =
@@ -158,20 +160,21 @@ class DenseVector(val values: Array[Double]) extends Vector {
/**
* A sparse vector represented by an index array and a value array.
*
* @param n size of the vector.
* @param size size of the vector.
* @param indices index array, assumed to be strictly increasing.
* @param values value array, must have the same length as the index array.
*/
class SparseVector(val n: Int, val indices: Array[Int], val values: Array[Double]) extends Vector {

override def size: Int = n
class SparseVector(
override val size: Int,
val indices: Array[Int],
val values: Array[Double]) extends Vector {

override def toString: String = {
"(" + n + "," + indices.zip(values).mkString("[", "," ,"]") + ")"
"(" + size + "," + indices.zip(values).mkString("[", "," ,"]") + ")"
}

override def toArray: Array[Double] = {
val data = new Array[Double](n)
val data = new Array[Double](size)
var i = 0
val nnz = indices.length
while (i < nnz) {
@@ -181,5 +184,5 @@ class SparseVector(val n: Int, val indices: Array[Int], val values: Array[Double
data
}

private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, n)
private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, size)
}
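A small sketch of the factory methods and the sparse layout described above; the dense and sparse forms below encode the same three-element vector.

    import org.apache.spark.mllib.linalg.{Vector, Vectors}

    val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
    // size 3, non-zero values 1.0 and 3.0 at indices 0 and 2
    val sv: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
    assert(dv.toArray.sameElements(sv.toArray))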
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.linalg.distributed

import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vectors
@@ -32,6 +33,8 @@ import org.apache.spark.mllib.linalg.Vectors
case class MatrixEntry(i: Long, j: Long, value: Double)

/**
* :: Experimental ::
*
* Represents a matrix in coordinate format.
*
* @param entries matrix entries
@@ -40,6 +43,7 @@ case class MatrixEntry(i: Long, j: Long, value: Double)
* @param nCols number of columns. A non-positive value means unknown, and then the number of
* columns will be determined by the max column index plus one.
*/
@Experimental
class CoordinateMatrix(
val entries: RDD[MatrixEntry],
private var nRows: Long,
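A hedged usage sketch, assuming `sc` is a SparkContext and that the class keeps a single-argument convenience constructor that infers the dimensions from the entries:

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

    val entries = sc.parallelize(Seq(
      MatrixEntry(0L, 0L, 1.0),
      MatrixEntry(1L, 2L, -3.0)))
    val coordMat = new CoordinateMatrix(entries)   // nRows/nCols inferred from the max indices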
@@ -19,8 +19,6 @@ package org.apache.spark.mllib.linalg.distributed

import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark.mllib.linalg.Matrix

/**
* Represents a distributively stored matrix backed by one or more RDDs.
*/
@@ -19,14 +19,22 @@ package org.apache.spark.mllib.linalg.distributed

import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.SingularValueDecomposition

/** Represents a row of [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]]. */
/**
* :: Experimental ::
*
* Represents a row of [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]].
*/
@Experimental
case class IndexedRow(index: Long, vector: Vector)

/**
* :: Experimental ::
*
* Represents a row-oriented [[org.apache.spark.mllib.linalg.distributed.DistributedMatrix]] with
* indexed rows.
*
@@ -36,6 +44,7 @@ case class IndexedRow(index: Long, vector: Vector)
* @param nCols number of columns. A non-positive value means unknown, and then the number of
* columns will be determined by the size of the first row.
*/
@Experimental
class IndexedRowMatrix(
val rows: RDD[IndexedRow],
private var nRows: Long,
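A hedged usage sketch, assuming `sc` is a SparkContext and that a rows-only convenience constructor (dimensions determined from the data) is available:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

    val rows = sc.parallelize(Seq(
      IndexedRow(0L, Vectors.dense(1.0, 2.0)),
      IndexedRow(3L, Vectors.dense(4.0, 5.0))))
    val idxMat = new IndexedRowMatrix(rows)   // nRows/nCols determined from the rows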