
Commit 6e2cfea

Author: Andrew Or (committed)

Remove all return statements in withScope

The closure cleaner doesn't like these statements, for a good reason.

1 parent: d19c4da
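For context: a `return` inside a closure or by-name block in Scala is a non-local return. The compiler implements it by throwing `scala.runtime.NonLocalReturnControl` to unwind back to the enclosing method, which is exactly the kind of hidden control flow a closure cleaner cannot handle safely. The following is a minimal sketch with hypothetical names (not Spark code) of why such a `return` is fragile once the block is handed to another piece of code:

// Minimal illustration (hypothetical names, not Spark code): a `return` inside
// a function value is a non-local return, compiled as a throw of
// scala.runtime.NonLocalReturnControl that unwinds to the enclosing method.
object NonLocalReturnDemo {
  // Stands in for code that evaluates a block it was handed, the way
  // withScope evaluates its by-name body.
  def runBlock[T](body: () => T): T = body()

  def firstPositive(xs: Seq[Int]): Int = {
    val block = () => {
      xs.foreach { x =>
        if (x > 0) return x // non-local return out of firstPositive, not out of the lambda
      }
      -1
    }
    // Works here only because the block runs while firstPositive is still on
    // the stack; if it ran later (or were serialized and shipped elsewhere),
    // the NonLocalReturnControl exception would escape or break closure analysis.
    runBlock(block)
  }

  def main(args: Array[String]): Unit =
    println(firstPositive(Seq(-3, 0, 7))) // prints 7
}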

File tree

3 files changed (+36, -28 lines)


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 0 deletions

@@ -643,6 +643,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /**
    * Execute a block of code in a scope.
    * All new RDDs created in this body will be part of the same scope.
+   *
+   * Note: Return statements are NOT allowed in the given body.
    */
   private def withRDDScope[U](body: => U): U = RDDScope.withScope[U](this)(body)

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 30 additions & 28 deletions

@@ -280,6 +280,8 @@ abstract class RDD[T: ClassTag](
   /**
    * Execute a block of code in a scope.
    * All new RDDs created in this body will be part of the same scope.
+   *
+   * Note: Return statements are NOT allowed in the given body.
    */
   private[spark] def withScope[U](body: => U): U = RDDScope.withScope[U](sc)(body)

@@ -1199,38 +1201,38 @@ abstract class RDD[T: ClassTag](
    */
   def take(num: Int): Array[T] = withScope {
     if (num == 0) {
-      return new Array[T](0)
-    }
-
-    val buf = new ArrayBuffer[T]
-    val totalParts = this.partitions.length
-    var partsScanned = 0
-    while (buf.size < num && partsScanned < totalParts) {
-      // The number of partitions to try in this iteration. It is ok for this number to be
-      // greater than totalParts because we actually cap it at totalParts in runJob.
-      var numPartsToTry = 1
-      if (partsScanned > 0) {
-        // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
-        // interpolate the number of partitions we need to try, but overestimate it by 50%.
-        // We also cap the estimation in the end.
-        if (buf.size == 0) {
-          numPartsToTry = partsScanned * 4
-        } else {
-          // the left side of max is >=1 whenever partsScanned >= 2
-          numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
-          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
+      new Array[T](0)
+    } else {
+      val buf = new ArrayBuffer[T]
+      val totalParts = this.partitions.length
+      var partsScanned = 0
+      while (buf.size < num && partsScanned < totalParts) {
+        // The number of partitions to try in this iteration. It is ok for this number to be
+        // greater than totalParts because we actually cap it at totalParts in runJob.
+        var numPartsToTry = 1
+        if (partsScanned > 0) {
+          // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
+          // interpolate the number of partitions we need to try, but overestimate it by 50%.
+          // We also cap the estimation in the end.
+          if (buf.size == 0) {
+            numPartsToTry = partsScanned * 4
+          } else {
+            // the left side of max is >=1 whenever partsScanned >= 2
+            numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
+            numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
+          }
         }
-      }
 
-      val left = num - buf.size
-      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
-      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
+        val left = num - buf.size
+        val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
+        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
 
-      res.foreach(buf ++= _.take(num - buf.size))
-      partsScanned += numPartsToTry
-    }
+        res.foreach(buf ++= _.take(num - buf.size))
+        partsScanned += numPartsToTry
+      }
 
-    buf.toArray
+      buf.toArray
+    }
   }
 
   /**
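The take() change above is the general refactor this commit applies: because every Scala block is an expression, an early `return` inside a by-name body can be replaced by an if/else whose branches produce the result. A small sketch of the same pattern, using a hypothetical method rather than Spark code:

// Sketch of the refactor pattern (hypothetical method, not Spark code).
object EarlyReturnRefactor {
  // Before (rejected inside a by-name body): an early return.
  //   if (xs.isEmpty) {
  //     return 0
  //   }
  //   xs.head
  //
  // After: the if/else is itself the value of the block, so no return is needed.
  def headOrZero(xs: Seq[Int]): Int = {
    if (xs.isEmpty) {
      0
    } else {
      xs.head
    }
  }

  def main(args: Array[String]): Unit = {
    println(headOrZero(Seq.empty))  // 0
    println(headOrZero(Seq(5, 6)))  // 5
  }
}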

core/src/main/scala/org/apache/spark/rdd/RDDScope.scala

Lines changed: 4 additions & 0 deletions

@@ -54,6 +54,8 @@ private[spark] object RDDScope {
   /**
    * Execute the given body such that all RDDs created in this body will have the same scope.
    * The name of the scope will be the name of the method that immediately encloses this one.
+   *
+   * Note: Return statements are NOT allowed in body.
    */
   private[spark] def withScope[T](
       sc: SparkContext,

@@ -68,6 +70,8 @@ private[spark] object RDDScope {
    * If nesting is allowed, this concatenates the previous scope with the new one in a way that
    * signifies the hierarchy. Otherwise, if nesting is not allowed, then any children calls to
    * this method executed in the body will have no effect.
+   *
+   * Note: Return statements are NOT allowed in body.
    */
   private[spark] def withScope[T](
       sc: SparkContext,
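RDDScope.withScope takes its body by name and evaluates it inside the helper, so the block must end in a value rather than `return` out of the caller. The snippet below is a simplified stand-in for a wrapper of that shape, not the real RDDScope implementation, and all names in it are assumptions for illustration:

// Simplified stand-in for a withScope-style helper: set up some context,
// evaluate the by-name body, and restore the previous context afterwards.
object ScopeSketch {
  private val currentScope = new ThreadLocal[String]

  def withScope[T](name: String)(body: => T): T = {
    val previous = currentScope.get()
    currentScope.set(if (previous == null) name else s"$previous/$name")
    try body                         // the body's last expression becomes the result
    finally currentScope.set(previous)
  }

  def scope: Option[String] = Option(currentScope.get())

  def main(args: Array[String]): Unit = {
    val result = withScope("take") {
      println(s"running in scope: $scope") // running in scope: Some(take)
      1 + 2                               // no return statement; the block's value is 3
    }
    println(result)                        // 3
  }
}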
