
Commit 7010ccb

zsxwing authored and jeanlyn committed
[SPARK-7384][Core][Tests] Fix flaky tests for distributed mode in BroadcastSuite
Fixed the following failure: https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.3-Maven-pre-YARN/hadoop.version=1.0.4,label=centos/452/testReport/junit/org.apache.spark.broadcast/BroadcastSuite/Unpersisting_HttpBroadcast_on_executors_and_driver_in_distributed_mode/

The tests should wait until all slaves are up. Otherwise, only some of the `BlockManager`s may be registered when the assertions run, and the tests fail.

Author: zsxwing <[email protected]>

Closes apache#5925 from zsxwing/SPARK-7384 and squashes the following commits:

783cb7b [zsxwing] Add comments for _jobProgressListener and remove postfixOps
1009ef1 [zsxwing] [SPARK-7384][Core][Tests] Fix flaky tests for distributed mode in BroadcastSuite
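For context, here is a minimal sketch (not code from this commit) of the race the message describes: with a `local-cluster` master, the `SparkContext` constructor can return before every executor has registered its `BlockManager`, so counting them immediately may yield fewer than `numSlaves + 1`. The sketch assumes it is compiled inside Spark's own source tree under `org.apache.spark`, since `jobProgressListener` is package-private to Spark, and the object name is invented for illustration.

```scala
package org.apache.spark.broadcast

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical illustration only: shows why asserting a fixed BlockManager count right
// after construction is flaky in distributed (local-cluster) mode.
object FlakyCountSketch {
  def main(args: Array[String]): Unit = {
    val numSlaves = 2
    val sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", new SparkConf())
    try {
      // Executors register their BlockManagers asynchronously; right after the constructor
      // returns, this may still be 1 (driver only) instead of numSlaves + 1.
      val registered = sc.jobProgressListener.blockManagerIds.size
      println(s"BlockManagers registered so far: $registered of ${numSlaves + 1}")
    } finally {
      sc.stop()
    }
  }
}
```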
1 parent 6f6d68b commit 7010ccb

2 files changed: +18 −4 lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 3 deletions
@@ -407,15 +407,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
 
+    // "_jobProgressListener" should be set up before creating SparkEnv because when creating
+    // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
+    _jobProgressListener = new JobProgressListener(_conf)
+    listenerBus.addListener(jobProgressListener)
+
     // Create the Spark execution environment (cache, map output tracker, etc)
     _env = createSparkEnv(_conf, isLocal, listenerBus)
     SparkEnv.set(_env)
 
     _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
 
-    _jobProgressListener = new JobProgressListener(_conf)
-    listenerBus.addListener(jobProgressListener)
-
     _statusTracker = new SparkStatusTracker(this)
 
     _progressBar =
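The ordering constraint called out in the new comment can be shown with a toy event bus. This is only an illustrative sketch, not Spark's `LiveListenerBus`, and `ToyBus` is an invented name, but it captures why `_jobProgressListener` has to be attached to `listenerBus` before `createSparkEnv` runs: events posted before a listener is registered are never delivered to it.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy stand-in for an event bus: events are delivered only to listeners that are
// already registered at the moment the event is posted (no replay for late subscribers).
class ToyBus {
  private val listeners = ArrayBuffer.empty[String => Unit]
  def addListener(listener: String => Unit): Unit = listeners += listener
  def post(event: String): Unit = listeners.foreach(_(event))
}

object ListenerOrderingSketch {
  def main(args: Array[String]): Unit = {
    val bus = new ToyBus
    bus.post("blockManagerAdded")                 // posted while the environment is being built
    bus.addListener(e => println(s"saw: $e"))     // registered too late: the event above is lost
    bus.post("executorAdded")                     // only this event is observed
  }
}
```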

core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala

Lines changed: 13 additions & 1 deletion
@@ -17,9 +17,11 @@
 
 package org.apache.spark.broadcast
 
+import scala.concurrent.duration._
 import scala.util.Random
 
 import org.scalatest.{Assertions, FunSuite}
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkEnv}
 import org.apache.spark.io.SnappyCompressionCodec
@@ -307,7 +309,17 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
       removeFromDriver: Boolean) {
 
     sc = if (distributed) {
-      new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
+      val _sc =
+        new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
+      // Wait until all slaves are up
+      eventually(timeout(10.seconds), interval(10.milliseconds)) {
+        _sc.jobProgressListener.synchronized {
+          val numBlockManagers = _sc.jobProgressListener.blockManagerIds.size
+          assert(numBlockManagers == numSlaves + 1,
+            s"Expect ${numSlaves + 1} block managers, but was ${numBlockManagers}")
+        }
+      }
+      _sc
     } else {
       new SparkContext("local", "test", broadcastConf)
     }
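The same wait could be factored into a small helper for other suites that spin up a `local-cluster`. The sketch below is not part of the commit, `waitUntilBlockManagersUp` and `ClusterTestUtils` are hypothetical names, and it simply reuses the ScalaTest `Eventually` pattern and the `jobProgressListener.blockManagerIds` counter from the diff above. Like the suite itself, it has to live under `org.apache.spark` because `jobProgressListener` is package-private to Spark.

```scala
package org.apache.spark.broadcast

import scala.concurrent.duration._

import org.scalatest.concurrent.Eventually._

import org.apache.spark.SparkContext

// Hypothetical helper extracting the wait-for-executors pattern introduced above.
object ClusterTestUtils {
  def waitUntilBlockManagersUp(_sc: SparkContext, numSlaves: Int): Unit = {
    eventually(timeout(10.seconds), interval(10.milliseconds)) {
      _sc.jobProgressListener.synchronized {
        val numBlockManagers = _sc.jobProgressListener.blockManagerIds.size
        // One BlockManager per executor plus one for the driver.
        assert(numBlockManagers == numSlaves + 1,
          s"Expected ${numSlaves + 1} block managers, but saw $numBlockManagers")
      }
    }
  }
}
```

A suite would then call `waitUntilBlockManagersUp(_sc, numSlaves)` right after constructing the `local-cluster` context, before exercising broadcast variables.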
