Commit b181126

brkyvz authored and zsxwing committed
[STREAMING][FLAKY-TEST] Catch execution context race condition in FileBasedWriteAheadLog.close()
There is a race condition in `FileBasedWriteAheadLog.close()`: if deletes of old log files are still in progress when the write ahead log closes, scheduling a delete can result in a `RejectedExecutionException`. This is okay, and should be handled gracefully.

Example test failures: https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.6-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=spark-test/95/testReport/junit/org.apache.spark.streaming.util/BatchedWriteAheadLogWithCloseFileAfterWriteSuite/BatchedWriteAheadLog___clean_old_logs/

The test fails because `afterEach` calls `writeAheadLog.close` while async deletes may still be in flight.

tdas zsxwing

Author: Burak Yavuz <[email protected]>

Closes #9953 from brkyvz/flaky-ss.

(cherry picked from commit a5d9887)
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 862d788 commit b181126

1 file changed, +11 -5 lines changed

streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala

Lines changed: 11 additions & 5 deletions
@@ -17,7 +17,7 @@
 package org.apache.spark.streaming.util

 import java.nio.ByteBuffer
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor}
 import java.util.{Iterator => JIterator}

 import scala.collection.JavaConverters._
@@ -176,10 +176,16 @@ private[streaming] class FileBasedWriteAheadLog(
     }
     oldLogFiles.foreach { logInfo =>
       if (!executionContext.isShutdown) {
-        val f = Future { deleteFile(logInfo) }(executionContext)
-        if (waitForCompletion) {
-          import scala.concurrent.duration._
-          Await.ready(f, 1 second)
+        try {
+          val f = Future { deleteFile(logInfo) }(executionContext)
+          if (waitForCompletion) {
+            import scala.concurrent.duration._
+            Await.ready(f, 1 second)
+          }
+        } catch {
+          case e: RejectedExecutionException =>
+            logWarning("Execution context shutdown before deleting old WriteAheadLogs. " +
+              "This would not affect recovery correctness.", e)
         }
       }
     }
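
The `isShutdown` guard in the diff above cannot close the race on its own, because the pool may shut down between the check and the submission. The following is a minimal standalone sketch of that window, not Spark code: it assumes a plain `java.util.concurrent` fixed thread pool in place of Spark's execution context, and `ShutdownRaceSketch` and `trySubmit` are hypothetical names introduced only for illustration. The shutdown is done inline here to make the interleaving deterministic.

```scala
import java.util.concurrent.{ExecutorService, Executors, RejectedExecutionException}

object ShutdownRaceSketch {
  // Hypothetical helper: returns true if the pool accepted the task,
  // false if the pool rejected it because it had already been shut down.
  def trySubmit(pool: ExecutorService)(task: => Unit): Boolean = {
    try {
      pool.execute(new Runnable { def run(): Unit = task })
      true
    } catch {
      // Same exception type the patch catches around the Future creation.
      case _: RejectedExecutionException => false
    }
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(1)
    val guardPassed = !pool.isShutdown // the guard sees a live pool
    pool.shutdown()                    // stands in for a concurrent close()
    val accepted = trySubmit(pool) { println("deleting old log file") }
    println(s"guard passed: $guardPassed, task accepted: $accepted")
  }
}
```

Running `main` prints `guard passed: true, task accepted: false`: the check succeeded, yet the submission was still rejected, which is why the patch wraps the submission in a `try`/`catch` rather than relying on the guard alone.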
