@@ -30,11 +30,12 @@ import org.apache.spark.rdd.RDD
3030 * @param boundaries Array of boundaries for which predictions are known.
3131 * Boundaries must be sorted in increasing order.
3232 * @param predictions Array of predictions associated with the boundaries at the same index.
33- * Result of isotonic regression and therefore is monotone.
33+ * These are the results of isotonic regression and are therefore monotone.
3434 */
3535class IsotonicRegressionModel (
3636 boundaries : Array [Double ],
37- val predictions : Array [Double ])
37+ val predictions : Array [Double ],
38+ isotonic : Boolean )
3839 extends Serializable {
3940
4041 private def isSorted (xs : Array [Double ]): Boolean = {
@@ -46,6 +47,12 @@ class IsotonicRegressionModel (
4647 true
4748 }
4849
50+ if (isotonic) {
51+ assert(isSorted(predictions))
52+ } else {
53+ assert(isSorted(predictions.map(- _)))
54+ }
55+
4956 assert(isSorted(boundaries))
5057 assert(boundaries.length == predictions.length)
5158
@@ -77,11 +84,15 @@ class IsotonicRegressionModel (
7784 *
7885 * @param testData Feature to be labeled.
7986 * @return Predicted label.
80- * If testData exactly matches a boundary then associated prediction is directly returned.
81- * If testData is lower or higher than all boundaries.
82- * then first or last prediction is returned respectively.
83- * If testData falls between two values in boundary array then predictions is treated
84- * as piecewise linear function and interpolated value is returned.
87+ * 1) If testData exactly matches a boundary then the associated prediction is returned.
88+ * In case there are multiple predictions with the same boundary then one of them
89+ * is returned. Which one is undefined (same as java.util.Arrays.binarySearch).
90+ * 2) If testData is lower or higher than all boundaries then the first or last prediction
91+ * is returned respectively. In case there are multiple predictions with the same
92+ * boundary then the lowest or highest is returned respectively.
93+ * 3) If testData falls between two values in the boundary array then the predictions are
94+ * treated as a piecewise linear function and the interpolated value is returned. In case
95+ * there are multiple values with the same boundary then the same rules as in 2) are used.
8596 */
8697 def predict (testData : Double ): Double = {
8798
@@ -131,12 +142,14 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
131142
132143 /**
133144 * Constructs IsotonicRegression instance with default parameter isotonic = true.
145+ *
134146 * @return New instance of IsotonicRegression.
135147 */
136148 def this () = this (true )
137149
138150 /**
139151 * Sets the isotonic parameter.
152+ *
140153 * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
141154 * @return This instance of IsotonicRegression.
142155 */
@@ -151,10 +164,23 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
151164 * @param input RDD of tuples (label, feature, weight) where label is the dependent variable
152165 * for which we calculate isotonic regression, feature is the independent variable
153166 * and weight represents the number of measures with default 1.
167+ * If multiple labels share the same feature value then they are ordered before
168+ * the algorithm is executed.
154169 * @return Isotonic regression model.
155170 */
156171 def run (input : RDD [(Double , Double , Double )]): IsotonicRegressionModel = {
157- createModel(parallelPoolAdjacentViolators(input, isotonic), isotonic)
172+ val preprocessedInput = if (isotonic) {
173+ input
174+ } else {
175+ input.map(x => (- x._1, x._2, x._3))
176+ }
177+
178+ val isotonicRegression = parallelPoolAdjacentViolators(preprocessedInput)
179+
180+ val predictions = if (isotonic) isotonicRegression.map(_._1) else isotonicRegression.map(- _._1)
181+ val boundaries = isotonicRegression.map(_._2)
182+
183+ new IsotonicRegressionModel (boundaries, predictions, isotonic)
158184 }
159185
160186 /**
@@ -163,42 +189,26 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
163189 * @param input JavaRDD of tuples (label, feature, weight) where label is the dependent variable
164190 * for which we calculate isotonic regression, feature is the independent variable
165191 * and weight represents the number of measures with default 1.
166- *
192+ * If multiple labels share the same feature value then they are ordered before
193+ * the algorithm is executed.
167194 * @return Isotonic regression model.
168195 */
169- def run (
170- input : JavaRDD [(JDouble , JDouble , JDouble )]): IsotonicRegressionModel = {
196+ def run (input : JavaRDD [(JDouble , JDouble , JDouble )]): IsotonicRegressionModel = {
171197 run(input.rdd.asInstanceOf [RDD [(Double , Double , Double )]])
172198 }
173199
174- /**
175- * Creates isotonic regression model with given parameters.
176- *
177- * @param predictions Predictions calculated using pool adjacent violators algorithm.
178- * Used for predictions on new data points.
179- * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
180- * @return Isotonic regression model.
181- */
182- protected def createModel (
183- predictions : Array [(Double , Double , Double )],
184- isotonic : Boolean ): IsotonicRegressionModel = {
185- new IsotonicRegressionModel (predictions.map(_._2), predictions.map(_._1))
186- }
187-
188200 /**
189201 * Performs the pool adjacent violators algorithm (PAV).
190202 * Uses a single pass over the data in which violators created by pooling
191203 * in previously processed data are fixed immediately.
192204 * Uses the optimization of discovering monotonicity-violating sequences (blocks).
193205 *
194206 * @param input Input data of tuples (label, feature, weight).
195- * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
196207 * @return Result tuples (label, feature, weight) where labels were updated
197208 * to form a monotone sequence as per isotonic regression definition.
198209 */
199210 private def poolAdjacentViolators (
200- input : Array [(Double , Double , Double )],
201- isotonic : Boolean ): Array [(Double , Double , Double )] = {
211+ input : Array [(Double , Double , Double )]): Array [(Double , Double , Double )] = {
202212
203213 // Pools sub array within given bounds assigning weighted average value to all elements.
204214 def pool (input : Array [(Double , Double , Double )], start : Int , end : Int ): Unit = {
@@ -214,15 +224,12 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
214224 }
215225 }
216226
217- val monotonicityConstraintHolds : (Double , Double ) => Boolean =
218- (x, y) => if (isotonic) x <= y else x >= y
219-
220227 var i = 0
221228 while (i < input.length) {
222229 var j = i
223230
224231 // Find monotonicity violating sequence, if any.
225- while (j < input.length - 1 && ! monotonicityConstraintHolds( input(j)._1, input(j + 1 )._1) ) {
232+ while (j < input.length - 1 && input(j)._1 > input(j + 1 )._1) {
226233 j = j + 1
227234 }
228235
@@ -232,7 +239,7 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
232239 } else {
233240 // Otherwise pool the violating sequence
234241 // and check if pooling caused monotonicity violation in previously processed points.
235- while (i >= 0 && ! monotonicityConstraintHolds( input(i)._1, input(i + 1 )._1) ) {
242+ while (i >= 0 && input(i)._1 > input(i + 1 )._1) {
236243 pool(input, i, j)
237244 i = i - 1
238245 }
@@ -248,19 +255,17 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
248255 * Performs the parallel pool adjacent violators algorithm.
249256 * Runs the pool adjacent violators algorithm on each partition and then again on the collected result.
250257 *
251- * @param testData Input data of tuples (label, feature, weight).
252- * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
258+ * @param input Input data of tuples (label, feature, weight).
253259 * @return Result tuples (label, feature, weight) where labels were updated
254260 * to form a monotone sequence as per isotonic regression definition.
255261 */
256262 private def parallelPoolAdjacentViolators (
257- testData : RDD [(Double , Double , Double )],
258- isotonic : Boolean ): Array [(Double , Double , Double )] = {
263+ input : RDD [(Double , Double , Double )]): Array [(Double , Double , Double )] = {
259264
260- val parallelStepResult = testData
261- .sortBy(_ ._2)
262- .mapPartitions(it => poolAdjacentViolators(it.toArray, isotonic ).toIterator)
265+ val parallelStepResult = input
266+ .sortBy(x => (x ._2, x._1) )
267+ .mapPartitions(it => poolAdjacentViolators(it.toArray).toIterator)
263268
264- poolAdjacentViolators(parallelStepResult.collect(), isotonic )
269+ poolAdjacentViolators(parallelStepResult.collect())
265270 }
266271}
0 commit comments