Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 086b0c8

Browse files
committed
[SPARK-18617][SPARK-18560][TESTS] Fix flaky test: StreamingContextSuite. Receiver data should be deserialized properly
## What changes were proposed in this pull request? Avoid to create multiple threads to stop StreamingContext. Otherwise, the latch added in apache#16091 can be passed too early. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes apache#16105 from zsxwing/SPARK-18617-2.
1 parent 78bb7f8 commit 086b0c8

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -814,10 +814,12 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
814814
ssc = new StreamingContext(conf, Milliseconds(100))
815815
val input = ssc.receiverStream(new TestReceiver)
816816
val latch = new CountDownLatch(1)
817+
@volatile var stopping = false
817818
input.count().foreachRDD { rdd =>
818819
// Make sure we can read from BlockRDD
819-
if (rdd.collect().headOption.getOrElse(0L) > 0) {
820+
if (rdd.collect().headOption.getOrElse(0L) > 0 && !stopping) {
820821
// Stop StreamingContext to unblock "awaitTerminationOrTimeout"
822+
stopping = true
821823
new Thread() {
822824
setDaemon(true)
823825
override def run(): Unit = {

0 commit comments

Comments
 (0)