
Commit 3467cff

Merge remote-tracking branch 'upstream/master' into dimsumv2

Conflicts:
    mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala

2 parents: aea0247 + 0dc868e

File tree: 12 files changed (+121, -40 lines)

.gitignore

Lines changed: 1 addition & 0 deletions

@@ -23,6 +23,7 @@ conf/*.cmd
 conf/*.properties
 conf/*.conf
 conf/*.xml
+conf/slaves
 docs/_site
 docs/api
 target/

.rat-excludes

Lines changed: 1 addition & 0 deletions

@@ -19,6 +19,7 @@ log4j.properties
 log4j.properties.template
 metrics.properties.template
 slaves
+slaves.template
 spark-env.sh
 spark-env.cmd
 spark-env.sh.template

File renamed without changes.

docs/mllib-clustering.md

Lines changed: 2 additions & 1 deletion

@@ -52,7 +52,7 @@ import org.apache.spark.mllib.linalg.Vectors
 
 // Load and parse the data
 val data = sc.textFile("data/mllib/kmeans_data.txt")
-val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
+val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
 
 // Cluster the data into two classes using KMeans
 val numClusters = 2
@@ -100,6 +100,7 @@ public class KMeansExample {
     }
   }
 );
+parsedData.cache();
 
 // Cluster the data into two classes using KMeans
 int numClusters = 2;
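
This is the usual fix for an iterative algorithm: k-means makes a full pass over the input on every iteration, so an uncached `parsedData` would be re-read and re-parsed from the text file each time. A minimal sketch of the resulting Scala example (assuming a live SparkContext `sc`; the cluster and iteration counts are the guide's illustrative values):

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data, then cache it: KMeans rescans this RDD once per
// iteration, and caching avoids re-reading and re-parsing the file each time.
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

// Cluster the data into two classes using KMeans.
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
```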

docs/mllib-linear-methods.md

Lines changed: 5 additions & 4 deletions

@@ -396,7 +396,7 @@ val data = sc.textFile("data/mllib/ridge-data/lpsa.data")
 val parsedData = data.map { line =>
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
-}
+}.cache()
 
 // Building the model
 val numIterations = 100
@@ -455,6 +455,7 @@ public class LinearRegression {
     }
   }
 );
+parsedData.cache();
 
 // Building the model
 int numIterations = 100;
@@ -470,7 +471,7 @@ public class LinearRegression {
     }
   }
 );
-JavaRDD<Object> MSE = new JavaDoubleRDD(valuesAndPreds.map(
+double MSE = new JavaDoubleRDD(valuesAndPreds.map(
   new Function<Tuple2<Double, Double>, Object>() {
     public Object call(Tuple2<Double, Double> pair) {
       return Math.pow(pair._1() - pair._2(), 2.0);
@@ -553,8 +554,8 @@ but in practice you will likely want to use unlabeled vectors for test data.
 
 {% highlight scala %}
 
-val trainingData = ssc.textFileStream('/training/data/dir').map(LabeledPoint.parse)
-val testData = ssc.textFileStream('/testing/data/dir').map(LabeledPoint.parse)
+val trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse).cache()
+val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
 
 {% endhighlight %}
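
These hunks make two distinct fixes: the training RDDs get cached (SGD samples a mini-batch from the input on every step), and the Java snippet's mean squared error becomes a plain `double` instead of a never-materialized `JavaRDD<Object>`; the streaming snippet additionally swaps invalid single-quoted literals for proper Scala strings. A hedged Scala sketch of the corrected train-then-evaluate pattern (assuming a SparkContext `sc`; the iteration count is the guide's value):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

// Cache the training data: SGD touches the input on every gradient step.
val data = sc.textFile("data/mllib/ridge-data/lpsa.data")
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()

// Build the model, then score the training set.
val model = LinearRegressionWithSGD.train(parsedData, 100)
val valuesAndPreds = parsedData.map { p => (p.label, model.predict(p.features)) }

// Mean squared error as a single Double, mirroring the Java fix above.
val MSE = valuesAndPreds.map { case (v, p) => math.pow(v - p, 2.0) }.mean()
```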

docs/mllib-optimization.md

Lines changed: 1 addition & 0 deletions

@@ -217,6 +217,7 @@ import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.mllib.classification.LogisticRegressionModel
+import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
 
 val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
 val numFeatures = data.take(1)(0).features.size
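
The added import matters because the surrounding example in that guide drives the low-level optimizer directly rather than going through a classifier wrapper. A hedged sketch of the call these three symbols support (parameter values are illustrative; `training` and `initialWeightsWithIntercept` are assumed to be prepared as in the guide):

```scala
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}

// Run L-BFGS with a logistic-loss gradient and an L2 regularization updater.
// Assumes training: RDD[(Double, Vector)] and initialWeightsWithIntercept: Vector.
val (weights, lossHistory) = LBFGS.runLBFGS(
  training,
  new LogisticGradient(),
  new SquaredL2Updater(),
  10,   // numCorrections
  1e-4, // convergenceTol
  20,   // maxNumIterations
  0.1,  // regParam
  initialWeightsWithIntercept)
```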

docs/spark-standalone.md

Lines changed: 6 additions & 1 deletion

@@ -62,7 +62,12 @@ Finally, the following configuration options can be passed to the master and worker:
 
 # Cluster Launch Scripts
 
-To launch a Spark standalone cluster with the launch scripts, you need to create a file called `conf/slaves` in your Spark directory, which should contain the hostnames of all the machines where you would like to start Spark workers, one per line. The master machine must be able to access each of the slave machines via password-less `ssh` (using a private key). For testing, you can just put `localhost` in this file.
+To launch a Spark standalone cluster with the launch scripts, you should create a file called conf/slaves in your Spark directory,
+which must contain the hostnames of all the machines where you intend to start Spark workers, one per line.
+If conf/slaves does not exist, the launch scripts default to a single machine (localhost), which is useful for testing.
+Note that the master machine accesses each of the worker machines via ssh. By default, ssh is run in parallel and requires password-less (using a private key) access to be set up.
+If you do not have a password-less setup, you can set the environment variable SPARK_SSH_FOREGROUND and serially provide a password for each worker.
+
 
 Once you've set up this file, you can launch or stop your cluster with the following shell scripts, based on Hadoop's deploy scripts, and available in `SPARK_HOME/bin`:
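
For reference, the file the launch scripts read is plain text with one worker hostname per line. A hypothetical conf/slaves (the hostnames are placeholders):

```
worker1.example.com
worker2.example.com
worker3.example.com
```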

mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

Lines changed: 29 additions & 25 deletions

@@ -67,11 +67,13 @@ class PythonMLLibAPI extends Serializable {
     MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions)
 
   private def trainRegressionModel(
-      trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
+      learner: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel],
       data: JavaRDD[LabeledPoint],
       initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
     val initialWeights = SerDe.loads(initialWeightsBA).asInstanceOf[Vector]
-    val model = trainFunc(data.rdd, initialWeights)
+    // Disable the uncached input warning because 'data' is a deliberately uncached MappedRDD.
+    learner.disableUncachedWarning()
+    val model = learner.run(data.rdd, initialWeights)
     val ret = new java.util.LinkedList[java.lang.Object]()
     ret.add(SerDe.dumps(model.weights))
     ret.add(model.intercept: java.lang.Double)
@@ -106,8 +108,7 @@ class PythonMLLibAPI extends Serializable {
         + " Can only be initialized using the following string values: [l1, l2, none].")
     }
     trainRegressionModel(
-      (data, initialWeights) =>
-        lrAlg.run(data, initialWeights),
+      lrAlg,
       data,
       initialWeightsBA)
   }
@@ -122,15 +123,14 @@ class PythonMLLibAPI extends Serializable {
       regParam: Double,
       miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    val lassoAlg = new LassoWithSGD()
+    lassoAlg.optimizer
+      .setNumIterations(numIterations)
+      .setRegParam(regParam)
+      .setStepSize(stepSize)
+      .setMiniBatchFraction(miniBatchFraction)
     trainRegressionModel(
-      (data, initialWeights) =>
-        LassoWithSGD.train(
-          data,
-          numIterations,
-          stepSize,
-          regParam,
-          miniBatchFraction,
-          initialWeights),
+      lassoAlg,
       data,
       initialWeightsBA)
   }
@@ -145,15 +145,14 @@ class PythonMLLibAPI extends Serializable {
       regParam: Double,
       miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    val ridgeAlg = new RidgeRegressionWithSGD()
+    ridgeAlg.optimizer
+      .setNumIterations(numIterations)
+      .setRegParam(regParam)
+      .setStepSize(stepSize)
+      .setMiniBatchFraction(miniBatchFraction)
     trainRegressionModel(
-      (data, initialWeights) =>
-        RidgeRegressionWithSGD.train(
-          data,
-          numIterations,
-          stepSize,
-          regParam,
-          miniBatchFraction,
-          initialWeights),
+      ridgeAlg,
       data,
       initialWeightsBA)
   }
@@ -186,8 +185,7 @@ class PythonMLLibAPI extends Serializable {
         + " Can only be initialized using the following string values: [l1, l2, none].")
     }
     trainRegressionModel(
-      (data, initialWeights) =>
-        SVMAlg.run(data, initialWeights),
+      SVMAlg,
       data,
       initialWeightsBA)
   }
@@ -220,8 +218,7 @@ class PythonMLLibAPI extends Serializable {
         + " Can only be initialized using the following string values: [l1, l2, none].")
     }
     trainRegressionModel(
-      (data, initialWeights) =>
-        LogRegAlg.run(data, initialWeights),
+      LogRegAlg,
       data,
       initialWeightsBA)
   }
@@ -249,7 +246,14 @@ class PythonMLLibAPI extends Serializable {
       maxIterations: Int,
       runs: Int,
       initializationMode: String): KMeansModel = {
-    KMeans.train(data.rdd, k, maxIterations, runs, initializationMode)
+    val kMeansAlg = new KMeans()
+      .setK(k)
+      .setMaxIterations(maxIterations)
+      .setRuns(runs)
+      .setInitializationMode(initializationMode)
+      // Disable the uncached input warning because 'data' is a deliberately uncached MappedRDD.
+      .disableUncachedWarning()
+    return kMeansAlg.run(data.rdd)
   }
 
   /**
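
The Lasso and ridge hunks replace the static `train` helpers with explicitly configured learner objects; the two forms are equivalent, and the object form is what lets the shared helper call the new `private[spark]` `disableUncachedWarning()` hook before running. A sketch of the equivalence, using names from the diff:

```scala
// The static convenience call
//   LassoWithSGD.train(data, numIterations, stepSize, regParam, miniBatchFraction, initialWeights)
// does the same work as configuring the learner's optimizer explicitly:
val lassoAlg = new LassoWithSGD()
lassoAlg.optimizer
  .setNumIterations(numIterations)
  .setRegParam(regParam)
  .setStepSize(stepSize)
  .setMiniBatchFraction(miniBatchFraction)
val model = lassoAlg.run(data, initialWeights)
```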

mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala

Lines changed: 22 additions & 0 deletions

@@ -27,6 +27,7 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.random.XORShiftRandom
 
 /**
@@ -112,11 +113,26 @@ class KMeans private (
     this
   }
 
+  /** Whether a warning should be logged if the input RDD is uncached. */
+  private var warnOnUncachedInput = true
+
+  /** Disable warnings about uncached input. */
+  private[spark] def disableUncachedWarning(): this.type = {
+    warnOnUncachedInput = false
+    this
+  }
+
   /**
    * Train a K-means model on the given set of points; `data` should be cached for high
    * performance, because this is an iterative algorithm.
    */
   def run(data: RDD[Vector]): KMeansModel = {
+
+    if (warnOnUncachedInput && data.getStorageLevel == StorageLevel.NONE) {
+      logWarning("The input data is not directly cached, which may hurt performance if its"
+        + " parent RDDs are also uncached.")
+    }
+
     // Compute squared norms and cache them.
     val norms = data.map(v => breezeNorm(v.toBreeze, 2.0))
     norms.persist()
@@ -125,6 +141,12 @@ class KMeans private (
     }
     val model = runBreeze(breezeData)
     norms.unpersist()
+
+    // Warn at the end of the run as well, for increased visibility.
+    if (warnOnUncachedInput && data.getStorageLevel == StorageLevel.NONE) {
+      logWarning("The input data was not directly cached, which may hurt performance if its"
+        + " parent RDDs are also uncached.")
+    }
     model
   }
 
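
From the caller's side, the new check only inspects the storage level of the RDD passed to `run()`, so persisting the input before training silences it. A small sketch of both paths, assuming a live SparkContext `sc`:

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.storage.StorageLevel

val points = sc.textFile("data/mllib/kmeans_data.txt")
  .map(s => Vectors.dense(s.split(' ').map(_.toDouble)))

// points.getStorageLevel == StorageLevel.NONE here, so run() logs the
// uncached-input warning at the start and again at the end of training.
val warned = new KMeans().setK(2).setMaxIterations(20).run(points)

// Persisting first passes the check and avoids re-parsing every iteration.
points.persist(StorageLevel.MEMORY_ONLY)
val quiet = new KMeans().setK(2).setMaxIterations(20).run(points)
```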

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala

Lines changed: 11 additions & 0 deletions

@@ -34,6 +34,7 @@ import org.apache.spark.mllib.rdd.RDDFunctions._
 import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.random.XORShiftRandom
+import org.apache.spark.storage.StorageLevel
 
 /**
  * :: Experimental ::
@@ -235,6 +236,10 @@ class RowMatrix(
         val brzSvd.SVD(uFull: BDM[Double], sigmaSquaresFull: BDV[Double], _) = brzSvd(G)
         (sigmaSquaresFull, uFull)
       case SVDMode.DistARPACK =>
+        if (rows.getStorageLevel == StorageLevel.NONE) {
+          logWarning("The input data is not directly cached, which may hurt performance if its"
+            + " parent RDDs are also uncached.")
+        }
         require(k < n, s"k must be smaller than n in dist-eigs mode but got k=$k and n=$n.")
         EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIter)
     }
@@ -260,6 +265,12 @@ class RowMatrix(
       logWarning(s"Requested $k singular values but only found $sk nonzeros.")
     }
 
+    // Warn at the end of the run as well, for increased visibility.
+    if (computeMode == SVDMode.DistARPACK && rows.getStorageLevel == StorageLevel.NONE) {
+      logWarning("The input data was not directly cached, which may hurt performance if its"
+        + " parent RDDs are also uncached.")
+    }
+
     val s = Vectors.dense(Arrays.copyOfRange(sigmas.data, 0, sk))
     val V = Matrices.dense(n, sk, Arrays.copyOfRange(u.data, 0, n * sk))
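
The same begin-and-end warning pattern lands in `computeSVD`, but only for the DistARPACK path, where the Gramian multiply (`multiplyGramianMatrixBy`) traverses `rows` once per ARPACK iteration; the local modes read the data a bounded number of times. A sketch of the path that can trigger it (`rows` is an assumed, uncached RDD[Vector] large enough that the distributed mode is selected):

```scala
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD

// With an uncached input in dist-eigs mode, each implicit Gramian
// multiplication recomputes 'rows', so the new warning fires twice.
def svdOnUncachedRows(rows: RDD[Vector]): Unit = {
  val mat = new RowMatrix(rows)
  val svd = mat.computeSVD(20, computeU = true)
  println(svd.s) // singular values
}
```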
265276
