Skip to content

Commit db420f7

Browse files
holdenkdongjoon-hyun
authored andcommitted
[SPARK-33049][CORE] Decommission shuffle block test is flaky
### What changes were proposed in this pull request? Increase the listener bus event length, syncrhonize the addition of blocks modified to the array list. ### Why are the changes needed? This test appears flaky in Jenkins (can not repro locally). Given that the index file made it through and the index file is only transferred after the data file, the only two reasons I could come up with an interminentent failure here are with the listenerbus dropping a message or the two block change messages being received at the same time. ### Does this PR introduce _any_ user-facing change? No (test only). ### How was this patch tested? The tests still pass on my machine but they did before. We'll need to run it through jenkins a few times first. Closes #29929 from holdenk/fix-.BlockManagerDecommissionIntegrationSuite. Authored-by: Holden Karau <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 37c806a commit db420f7

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
6969
.set(config.STORAGE_DECOMMISSION_ENABLED, true)
7070
.set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist)
7171
.set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
72+
// Since we use the bus for testing we don't want to drop any messages
73+
.set(config.LISTENER_BUS_EVENT_QUEUE_CAPACITY, 1000000)
7274
// Just replicate blocks quickly during testing, there isn't another
7375
// workload we need to worry about.
7476
.set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)
@@ -137,7 +139,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
137139
taskEndEvents.add(taskEnd)
138140
}
139141

140-
override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
142+
override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = synchronized {
141143
blocksUpdated.append(blockUpdated)
142144
}
143145

0 commit comments

Comments
 (0)