Skip to content

Commit d77c337

Browse files
committed
Merge pull request alteryx#232 from markhamstra/FiniteWait
jobWaiter.synchronized before jobWaiter.wait ...else ``IllegalMonitorStateException`` in ``SimpleFutureAction#ready``. (cherry picked from commit 0780498) Signed-off-by: Reynold Xin <[email protected]>
1 parent 17ca8a1 commit d77c337

File tree

3 files changed

+28
-1
lines changed

3 files changed

+28
-1
lines changed

core/src/main/scala/org/apache/spark/FutureAction.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
9999
override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
100100
if (!atMost.isFinite()) {
101101
awaitResult()
102-
} else {
102+
} else jobWaiter.synchronized {
103103
val finishTime = System.currentTimeMillis() + atMost.toMillis
104104
while (!isCompleted) {
105105
val time = System.currentTimeMillis()

core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ private[spark] class JobWaiter[T](
3131
private var finishedTasks = 0
3232

3333
// Is the job as a whole finished (succeeded or failed)?
34+
@volatile
3435
private var _jobFinished = totalTasks == 0
3536

3637
def jobFinished = _jobFinished

core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package org.apache.spark.rdd
1919

2020
import java.util.concurrent.Semaphore
2121

22+
import scala.concurrent.{Await, TimeoutException}
23+
import scala.concurrent.duration.Duration
2224
import scala.concurrent.ExecutionContext.Implicits.global
2325

2426
import org.scalatest.{BeforeAndAfterAll, FunSuite}
@@ -173,4 +175,28 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll with Timeouts
173175
sem.acquire(2)
174176
}
175177
}
178+
179+
/**
180+
* Awaiting FutureAction results
181+
*/
182+
test("FutureAction result, infinite wait") {
183+
val f = sc.parallelize(1 to 100, 4)
184+
.countAsync()
185+
assert(Await.result(f, Duration.Inf) === 100)
186+
}
187+
188+
test("FutureAction result, finite wait") {
189+
val f = sc.parallelize(1 to 100, 4)
190+
.countAsync()
191+
assert(Await.result(f, Duration(30, "seconds")) === 100)
192+
}
193+
194+
test("FutureAction result, timeout") {
195+
val f = sc.parallelize(1 to 100, 4)
196+
.mapPartitions(itr => { Thread.sleep(20); itr })
197+
.countAsync()
198+
intercept[TimeoutException] {
199+
Await.result(f, Duration(20, "milliseconds"))
200+
}
201+
}
176202
}

0 commit comments

Comments
 (0)