1818package org .apache .spark .mllib .regression
1919
2020import java .io .Serializable
21+ import java .lang .{Double => JDouble }
2122import java .util .Arrays .binarySearch
2223
2324import org .apache .spark .api .java .{JavaDoubleRDD , JavaRDD }
@@ -53,8 +54,9 @@ class IsotonicRegressionModel (
5354 * @param testData Features to be labeled.
5455 * @return Predicted labels.
5556 */
56- def predict (testData : RDD [Double ]): RDD [Double ] =
57+ def predict (testData : RDD [Double ]): RDD [Double ] = {
5758 testData.map(predict)
59+ }
5860
5961 /**
6062 * Predict labels for provided features.
@@ -63,8 +65,9 @@ class IsotonicRegressionModel (
6365 * @param testData Features to be labeled.
6466 * @return Predicted labels.
6567 */
66- def predict (testData : JavaDoubleRDD ): JavaDoubleRDD =
68+ def predict (testData : JavaDoubleRDD ): JavaDoubleRDD = {
6769 JavaDoubleRDD .fromRDD(predict(testData.rdd.asInstanceOf [RDD [Double ]]))
70+ }
6871
6972 /**
7073 * Predict a single label.
@@ -75,8 +78,8 @@ class IsotonicRegressionModel (
7578 * If testData exactly matches a boundary then associated prediction is directly returned
7679 * If testData is lower or higher than all boundaries
7780 * then first or last prediction is returned respectively
78- * If testData falls between two values in boundary then predictions is treated as piecewise
79- * linear function and interpolated value is returned
81+ * If testData falls between two values in boundary then predictions is treated
82+ * as piecewise linear function and interpolated value is returned
8083 */
8184 def predict (testData : Double ): Double = {
8285
@@ -88,8 +91,8 @@ class IsotonicRegressionModel (
8891
8992 val normalisedInsertIndex = - insertIndex - 1
9093
91- // Find if the index was lower than all values,
92- // higher than all values, inbetween two values or exact match.
94+ // Find if the index was lower than all values,
95+ // higher than all values, inbetween two values or exact match.
9396 if (insertIndex == - 1 ) {
9497 predictions.head
9598 } else if (normalisedInsertIndex == boundaries.length){
@@ -121,37 +124,50 @@ class IsotonicRegressionModel (
121124 * "An approach to parallelizing isotonic regression."
122125 * Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
123126 */
124- class IsotonicRegression extends Serializable {
127+ class IsotonicRegression private ( private var isotonic : Boolean ) extends Serializable {
125128
126129 /**
127- * Run pool adjacent violators algorithm to obtain isotonic regression model.
130+ * Constructs IsotonicRegression instance with default parameter isotonic = true
131+ * @return New instance of IsotonicRegression
132+ */
133+ def this () = this (true )
134+
135+ /**
136+ * Sets the isotonic parameter
137+ * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
138+ * @return The instance of IsotonicRegression
139+ */
140+ def setIsotonic (isotonic : Boolean ): this .type = {
141+ this .isotonic = isotonic
142+ this
143+ }
144+
145+ /**
146+ * Run IsotonicRegression algorithm to obtain isotonic regression model.
128147 *
129148 * @param input RDD of tuples (label, feature, weight) where label is dependent variable
130149 * for which we calculate isotonic regression, feature is independent variable
131150 * and weight represents number of measures with default 1.
132151 *
133- * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
134152 * @return Isotonic regression model.
135153 */
136- def run (
137- input : RDD [(Double , Double , Double )],
138- isotonic : Boolean ): IsotonicRegressionModel =
154+ def run (input : RDD [(Double , Double , Double )]): IsotonicRegressionModel = {
139155 createModel(parallelPoolAdjacentViolators(input, isotonic), isotonic)
156+ }
140157
141- /**
158+ /**
142159 * Run pool adjacent violators algorithm to obtain isotonic regression model.
143160 *
144161 * @param input JavaRDD of tuples (label, feature, weight) where label is dependent variable
145162 * for which we calculate isotonic regression, feature is independent variable
146163 * and weight represents number of measures with default 1.
147164 *
148- * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
149165 * @return Isotonic regression model.
150166 */
151167 def run (
152- input : JavaRDD [(java.lang. Double , java.lang. Double , java.lang. Double )],
153- isotonic : Boolean ) : IsotonicRegressionModel =
154- run(input.rdd. asInstanceOf [ RDD [( Double , Double , Double )]], isotonic)
168+ input : JavaRDD [(JDouble , JDouble , JDouble )]) : IsotonicRegressionModel = {
169+ run(input.rdd. asInstanceOf [ RDD [( Double , Double , Double )]])
170+ }
155171
156172 /**
157173 * Creates isotonic regression model with given parameters.
@@ -164,11 +180,7 @@ class IsotonicRegression extends Serializable {
164180 protected def createModel (
165181 predictions : Array [(Double , Double , Double )],
166182 isotonic : Boolean ): IsotonicRegressionModel = {
167-
168- val labels = predictions.map(_._1)
169- val features = predictions.map(_._2)
170-
171- new IsotonicRegressionModel (features, labels)
183+ new IsotonicRegressionModel (predictions.map(_._2), predictions.map(_._1))
172184 }
173185
174186 /**
@@ -249,4 +261,4 @@ class IsotonicRegression extends Serializable {
249261
250262 poolAdjacentViolators(parallelStepResult.collect(), isotonic)
251263 }
252- }
264+ }
0 commit comments