@@ -120,21 +120,22 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: String

  /**
   * For testing only. Wait until there are no more events in the queue, or until the specified
-  * time has elapsed. Return true if the queue has emptied and false is the specified time
-  * elapsed before the queue emptied.
+  * time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue
+  * emptied.
   */
  @VisibleForTesting
-  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+  @throws(classOf[TimeoutException])
+  def waitUntilEmpty(timeoutMillis: Long): Unit = {
    val finishTime = System.currentTimeMillis + timeoutMillis
    while (!queueIsEmpty) {
      if (System.currentTimeMillis > finishTime) {
-        return false
+        throw new TimeoutException(
+          s"The event queue is not empty after $timeoutMillis milliseconds")
      }
      /* Sleep rather than using wait/notify, because this is used only for testing and
       * wait/notify add overhead in the general case. */
      Thread.sleep(10)
    }
-    true
  }

/**
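The signature change above drives every call-site edit in the test suites that follow: `waitUntilEmpty` now returns `Unit` and reports a timeout by throwing, so wrapping the call in `assert(...)` would no longer even type-check. A minimal before/after sketch of the migration, where `bus` and `WAIT_TIMEOUT_MILLIS` stand in for each suite's own fixtures and the thrown type is assumed to be `java.util.concurrent.TimeoutException`:

// Before: a timeout surfaced only as a bare "assertion failed",
// with no hint that the listener bus was the culprit.
assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))

// After: returns normally once the queue drains, or aborts the test with
// "The event queue is not empty after <timeoutMillis> milliseconds".
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

The retained `Thread.sleep(10)` polling loop is deliberate: wait/notify signalling would add overhead to every post on the bus's hot path, while a 10 ms poll costs at most a small delay in test-only code.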
@@ -41,7 +41,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
    // Trigger a job so that executors get added
    sc.parallelize(1 to 100, 4).map(_.toString).count()

-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    listener.addedExecutorInfos.values.foreach { info =>
      assert(info.logUrlMap.nonEmpty)
      // Browse to each URL to check that it's valid
@@ -71,7 +71,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
    // Trigger a job so that executors get added
    sc.parallelize(1 to 100, 4).map(_.toString).count()

-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
    assert(listeners.size === 1)
    val listener = listeners(0)
@@ -254,7 +254,7 @@ class DAGSchedulerSuite
test("[SPARK-3353] parent stage should have lower stage id") {
sparkListener.stageByOrderOfExecution.clear()
sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.stageByOrderOfExecution.length === 2)
assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
}
@@ -389,7 +389,7 @@ class DAGSchedulerSuite
    submit(unserializableRdd, Array(0))
    assert(failure.getMessage.startsWith(
      "Job aborted due to stage failure: Task not serializable:"))
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.failedStages.contains(0))
    assert(sparkListener.failedStages.size === 1)
    assertDataStructuresEmpty()
@@ -399,7 +399,7 @@ class DAGSchedulerSuite
    submit(new MyRDD(sc, 1, Nil), Array(0))
    failed(taskSets(0), "some failure")
    assert(failure.getMessage === "Job aborted due to stage failure: some failure")
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.failedStages.contains(0))
    assert(sparkListener.failedStages.size === 1)
    assertDataStructuresEmpty()
@@ -410,7 +410,7 @@ class DAGSchedulerSuite
    val jobId = submit(rdd, Array(0))
    cancel(jobId)
    assert(failure.getMessage === s"Job $jobId cancelled ")
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.failedStages.contains(0))
    assert(sparkListener.failedStages.size === 1)
    assertDataStructuresEmpty()
@@ -462,7 +462,7 @@ class DAGSchedulerSuite
    assert(results === Map(0 -> 42))
    assertDataStructuresEmpty()

-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.failedStages.isEmpty)
    assert(sparkListener.successfulStages.contains(0))
  }
@@ -531,7 +531,7 @@ class DAGSchedulerSuite
      Map[Long, Any](),
      createFakeTaskInfo(),
      null))
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.failedStages.contains(1))

    // The second ResultTask fails, with a fetch failure for the output from the second mapper.
@@ -543,7 +543,7 @@ class DAGSchedulerSuite
      createFakeTaskInfo(),
      null))
    // The SparkListener should not receive redundant failure events.
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.failedStages.size == 1)
  }

@@ -592,7 +592,7 @@ class DAGSchedulerSuite

    // Listener bus should get told about the map stage failing, but not the reduce stage
    // (since the reduce stage hasn't been started yet).
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.failedStages.toSet === Set(0))

    assertDataStructuresEmpty()
@@ -643,7 +643,7 @@ class DAGSchedulerSuite
    assert(cancelledStages.toSet === Set(0, 2))

    // Make sure the listeners got told about both failed stages.
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.successfulStages.isEmpty)
    assert(sparkListener.failedStages.toSet === Set(0, 2))

@@ -47,7 +47,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers

    // Starting listener bus should flush all buffered events
    bus.start(sc)
-    assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(counter.count === 5)

    // After listener bus has stopped, posting events should not increment counter
@@ -131,7 +131,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
rdd2.setName("Target RDD")
rdd2.count()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

listener.stageInfos.size should be {1}
val (stageInfo, taskInfoMetrics) = listener.stageInfos.head
@@ -156,7 +156,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
rdd3.setName("Trois")

rdd1.count()
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
listener.stageInfos.size should be {1}
val stageInfo1 = listener.stageInfos.keys.find(_.stageId == 0).get
stageInfo1.rddInfos.size should be {1} // ParallelCollectionRDD
@@ -165,7 +165,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
    listener.stageInfos.clear()

    rdd2.count()
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    listener.stageInfos.size should be {1}
    val stageInfo2 = listener.stageInfos.keys.find(_.stageId == 1).get
    stageInfo2.rddInfos.size should be {3} // ParallelCollectionRDD, FilteredRDD, MappedRDD
@@ -174,7 +174,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
    listener.stageInfos.clear()

    rdd3.count()
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    listener.stageInfos.size should be {2} // Shuffle map stage + result stage
    val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 3).get
    stageInfo3.rddInfos.size should be {1} // ShuffledRDD
@@ -190,7 +190,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
    val rdd2 = rdd1.map(_.toString)
    sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1), true)

-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

    listener.stageInfos.size should be {1}
    val (stageInfo, _) = listener.stageInfos.head
@@ -214,7 +214,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers

    val d = sc.parallelize(0 to 1e4.toInt, 64).map(w)
    d.count()
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    listener.stageInfos.size should be (1)

    val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
@@ -225,7 +225,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
d4.setName("A Cogroup")
d4.collectAsMap()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
listener.stageInfos.size should be (4)
listener.stageInfos.foreach { case (stageInfo, taskInfoMetrics) =>
/**
@@ -281,7 +281,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
      .reduce { case (x, y) => x }
    assert(result === 1.to(akkaFrameSize).toArray)

-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    val TASK_INDEX = 0
    assert(listener.startedTasks.contains(TASK_INDEX))
    assert(listener.startedGettingResultTasks.contains(TASK_INDEX))
@@ -297,7 +297,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
    val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
    assert(result === 2)

-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    val TASK_INDEX = 0
    assert(listener.startedTasks.contains(TASK_INDEX))
    assert(listener.startedGettingResultTasks.isEmpty)
@@ -352,7 +352,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers

    // Post events to all listeners, and wait until the queue is drained
    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
-    assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

    // The exception should be caught, and the event should be propagated to other listeners
    assert(bus.listenerThreadIsAlive)
@@ -46,7 +46,7 @@ class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
rdd2.setName("Target RDD")
rdd2.count()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(listener.addedExecutorInfo.size == 2)
assert(listener.addedExecutorInfo("0").totalCores == 1)
assert(listener.addedExecutorInfo("1").totalCores == 1)
@@ -326,7 +326,7 @@ private object YarnClusterDriver extends Logging with Matchers {
var result = "failure"
try {
val data = sc.parallelize(1 to 4, 4).collect().toSet
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
data should be (Set(1, 2, 3, 4))
result = "success"
} finally {
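One consequence of the new contract, sketched below under stated assumptions: a caller that still wants the old Boolean-style answer can wrap the call and translate the exception back into a flag. This helper is purely hypothetical and not part of the patch; it assumes it is compiled somewhere under org.apache.spark (where the private[spark] bus is visible) and that the thrown type is java.util.concurrent.TimeoutException.

import java.util.concurrent.TimeoutException

// Hypothetical adapter, not in the patch: true if the bus drained in time,
// false if waitUntilEmpty threw TimeoutException.
def emptiedWithin(bus: AsynchronousListenerBus[_, _], timeoutMillis: Long): Boolean = {
  try {
    bus.waitUntilEmpty(timeoutMillis)
    true
  } catch {
    case _: TimeoutException => false
  }
}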