Skip to content

Commit a353262

Browse files
committed
[SPARK-19831][CORE] Use a separate thread to clean up the finished application to avoid the block
1 parent 932196d commit a353262

File tree

1 file changed

+16
-6
lines changed
  • core/src/main/scala/org/apache/spark/deploy/worker

1 file changed

+16
-6
lines changed

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ private[deploy] class Worker(
6767
private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
6868
ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))
6969

70+
// A separated thread to clean up for the finished application. Because cleaning up
71+
// may cost much time, and it may block other rpc messages.
72+
private val cleanupApplicationThreadExecutor = ExecutionContext.fromExecutorService(
73+
ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-application-thread"))
74+
7075
// For worker and executor IDs
7176
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
7277
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
@@ -577,13 +582,17 @@ private[deploy] class Worker(
577582
val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
578583
if (shouldCleanup) {
579584
finishedApps -= id
580-
appDirectories.remove(id).foreach { dirList =>
581-
logInfo(s"Cleaning up local directories for application $id")
582-
dirList.foreach { dir =>
583-
Utils.deleteRecursively(new File(dir))
585+
cleanupApplicationThreadExecutor.submit(new Runnable {
586+
override def run(): Unit = {
587+
appDirectories.remove(id).foreach { dirList =>
588+
logInfo(s"Cleaning up local directories for application $id")
589+
dirList.foreach { dir =>
590+
Utils.deleteRecursively(new File(dir))
591+
}
592+
}
593+
shuffleService.applicationRemoved(id)
584594
}
585-
}
586-
shuffleService.applicationRemoved(id)
595+
})
587596
}
588597
}
589598

@@ -606,6 +615,7 @@ private[deploy] class Worker(
606615

607616
override def onStop() {
608617
cleanupThreadExecutor.shutdownNow()
618+
cleanupApplicationThreadExecutor.shutdown()
609619
metricsSystem.report()
610620
cancelLastRegistrationRetry()
611621
forwordMessageScheduler.shutdownNow()

0 commit comments

Comments
 (0)