 package org.apache.spark.examples.mllib
 
 import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.clustering.StreamingKMeans
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
@@ -27,9 +28,13 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
  * on another stream, where the data streams arrive as text files
  * into two different directories.
  *
- * The rows of the text files must be vector data in the form
+ * The rows of the training text files must be vector data in the form
  * `[x1,x2,x3,...,xn]`
- * Where n is the number of dimensions. n must be the same for train and test.
+ * Where n is the number of dimensions.
+ *
+ * The rows of the test text files must be labeled data in the form
+ * `(y,[x1,x2,x3,...,xn])`
+ * Where y is some identifier. n must be the same for train and test.
  *
  * Usage: StreamingKmeans <trainingDir> <testDir> <batchDuration> <numClusters> <numDimensions>
  *
@@ -57,15 +62,15 @@ object StreamingKMeans {
     val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
 
     val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
-    val testData = ssc.textFileStream(args(1)).map(Vectors.parse)
+    val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)
 
     val model = new StreamingKMeans()
       .setK(args(3).toInt)
       .setDecayFactor(1.0)
       .setRandomCenters(args(4).toInt)
 
     model.trainOn(trainingData)
-    model.predictOn(testData).print()
+    model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
 
     ssc.start()
     ssc.awaitTermination()
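
For reference, a minimal sketch of what the two row formats described in the updated doc comment parse to. The object name `ParseDemo` and the sample values are made up for illustration; `Vectors.parse` and `LabeledPoint.parse` are the same MLlib entry points the example maps over its file streams:

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Hypothetical demo object; no SparkContext is needed, since both
// parse methods are pure string-to-object conversions.
object ParseDemo {
  def main(args: Array[String]): Unit = {
    // Training rows are bare dense vectors: [x1,x2,...,xn]
    val trainRow = Vectors.parse("[1.0,2.0,3.0]")
    println(trainRow)          // [1.0,2.0,3.0]

    // Test rows pair an identifier with a vector: (y,[x1,x2,...,xn])
    val testRow = LabeledPoint.parse("(5.0,[1.0,2.0,3.0])")
    println(testRow.label)     // 5.0
    println(testRow.features)  // [1.0,2.0,3.0]
  }
}
```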
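On the `predictOn` → `predictOnValues` switch: `predictOn` consumes a plain `DStream[Vector]` and emits bare cluster indices, while `predictOnValues` consumes a keyed `DStream[(K, Vector)]` and emits `(K, Int)` pairs. Mapping each `LabeledPoint` to `(lp.label, lp.features)` therefore keeps the `y` identifier attached through prediction, so the printed output shows which test point received which cluster assignment.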