Skip to content

Commit cbf14b2

Browse files
committed
A separated thread to clean up the workDir and the finished application
1 parent a353262 commit cbf14b2

File tree

1 file changed

+13
-18
lines changed
  • core/src/main/scala/org/apache/spark/deploy/worker

1 file changed

+13
-18
lines changed

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,11 @@ private[deploy] class Worker(
6262
private val forwordMessageScheduler =
6363
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
6464

65-
// A separated thread to clean up the workDir. Used to provide the implicit parameter of `Future`
66-
// methods.
65+
// A separated thread to clean up the workDir and the finished application.
66+
// Used to provide the implicit parameter of `Future` methods.
6767
private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
6868
ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))
6969

70-
// A separated thread to clean up for the finished application. Because cleaning up
71-
// may cost much time, and it may block other rpc messages.
72-
private val cleanupApplicationThreadExecutor = ExecutionContext.fromExecutorService(
73-
ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-application-thread"))
74-
7570
// For worker and executor IDs
7671
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
7772
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
@@ -582,17 +577,18 @@ private[deploy] class Worker(
582577
val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
583578
if (shouldCleanup) {
584579
finishedApps -= id
585-
cleanupApplicationThreadExecutor.submit(new Runnable {
586-
override def run(): Unit = {
587-
appDirectories.remove(id).foreach { dirList =>
588-
logInfo(s"Cleaning up local directories for application $id")
589-
dirList.foreach { dir =>
590-
Utils.deleteRecursively(new File(dir))
591-
}
580+
appDirectories.remove(id).foreach { dirList =>
581+
concurrent.Future {
582+
logInfo(s"Cleaning up local directories for application $id")
583+
dirList.foreach { dir =>
584+
Utils.deleteRecursively(new File(dir))
592585
}
593-
shuffleService.applicationRemoved(id)
594-
}
595-
})
586+
}(cleanupThreadExecutor).onFailure {
587+
case e: Throwable =>
588+
logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
589+
}(cleanupThreadExecutor)
590+
}
591+
shuffleService.applicationRemoved(id)
596592
}
597593
}
598594

@@ -615,7 +611,6 @@ private[deploy] class Worker(
615611

616612
override def onStop() {
617613
cleanupThreadExecutor.shutdownNow()
618-
cleanupApplicationThreadExecutor.shutdown()
619614
metricsSystem.report()
620615
cancelLastRegistrationRetry()
621616
forwordMessageScheduler.shutdownNow()

0 commit comments

Comments
 (0)