-
Couldn't load subscription status.
- Fork 28.9k
[SPARK-18553][CORE][BRANCH-1.6] Fix leak of TaskSetManager following executor loss #16070
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-18553][CORE][BRANCH-1.6] Fix leak of TaskSetManager following executor loss #16070
Conversation
…executor loss This patch fixes a critical resource leak in the TaskScheduler which could cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor with running tasks is permanently lost and the associated stage fails. This problem was originally identified by analyzing the heap dump of a driver belonging to a cluster that had run out of shuffle space. This dump contained several `ShuffleDependency` instances that were retained by `TaskSetManager`s inside the scheduler but were not otherwise referenced. Each of these `TaskSetManager`s was considered a "zombie" but had no running tasks and therefore should have been cleaned up. However, these zombie task sets were still referenced by the `TaskSchedulerImpl.taskIdToTaskSetManager` map. Entries are added to the `taskIdToTaskSetManager` map when tasks are launched and are removed inside of `TaskScheduler.statusUpdate()`, which is invoked by the scheduler backend while processing `StatusUpdate` messages from executors. The problem with this design is that a completely dead executor will never send a `StatusUpdate`. There is [some code](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L338) in `statusUpdate` which handles tasks that exit with the `TaskState.LOST` state (which is supposed to correspond to a task failure triggered by total executor loss), but this state only seems to be used in Mesos fine-grained mode. There doesn't seem to be any code which performs per-task state cleanup for tasks that were running on an executor that completely disappears without sending any sort of final death message. The `executorLost` and [`removeExecutor`](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L527) methods don't appear to perform any cleanup of the `taskId -> *` mappings, causing the leaks observed here. This patch's fix is to maintain a `executorId -> running task id` mapping so that these `taskId -> *` maps can be properly cleaned up following an executor loss. There are some potential corner-case interactions that I'm concerned about here, especially some details in [the comment](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L523) in `removeExecutor`, so I'd appreciate a very careful review of these changes. This PR is opened against branch-2.0, where I first observed this problem, but will also need to be fixed in master, branch-2.1, and branch-1.6 (which I'll do in followup PRs after this fix is reviewed and merged). I added a new unit test to `TaskSchedulerImplSuite`. You can check out this PR as of 25e455e to see the failing test. cc kayousterhout, markhamstra, rxin for review. Author: Josh Rosen <[email protected]> Closes apache#15986 from JoshRosen/fix-leak-following-total-executor-loss.
| case Some(taskSet) => | ||
| if (state == TaskState.LOST) { | ||
| // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode, | ||
| // TaskState.LOST is only used by the Mesos fine-grained scheduling mode, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice catch on the comment here!
|
LGTM |
|
Test build #69366 has finished for PR 16070 at commit
|
|
Jenkins, retest this please |
|
Test build #69414 has finished for PR 16070 at commit
|
|
I'm going to merge this to branch-1.6 now. Thanks for reviewing! |
…executor loss ## What changes were proposed in this pull request? _This is the master branch-1.6 version of #15986; the original description follows:_ This patch fixes a critical resource leak in the TaskScheduler which could cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor with running tasks is permanently lost and the associated stage fails. This problem was originally identified by analyzing the heap dump of a driver belonging to a cluster that had run out of shuffle space. This dump contained several `ShuffleDependency` instances that were retained by `TaskSetManager`s inside the scheduler but were not otherwise referenced. Each of these `TaskSetManager`s was considered a "zombie" but had no running tasks and therefore should have been cleaned up. However, these zombie task sets were still referenced by the `TaskSchedulerImpl.taskIdToTaskSetManager` map. Entries are added to the `taskIdToTaskSetManager` map when tasks are launched and are removed inside of `TaskScheduler.statusUpdate()`, which is invoked by the scheduler backend while processing `StatusUpdate` messages from executors. The problem with this design is that a completely dead executor will never send a `StatusUpdate`. There is [some code](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L338) in `statusUpdate` which handles tasks that exit with the `TaskState.LOST` state (which is supposed to correspond to a task failure triggered by total executor loss), but this state only seems to be used in Mesos fine-grained mode. There doesn't seem to be any code which performs per-task state cleanup for tasks that were running on an executor that completely disappears without sending any sort of final death message. The `executorLost` and [`removeExecutor`](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L527) methods don't appear to perform any cleanup of the `taskId -> *` mappings, causing the leaks observed here. This patch's fix is to maintain a `executorId -> running task id` mapping so that these `taskId -> *` maps can be properly cleaned up following an executor loss. There are some potential corner-case interactions that I'm concerned about here, especially some details in [the comment](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L523) in `removeExecutor`, so I'd appreciate a very careful review of these changes. ## How was this patch tested? I added a new unit test to `TaskSchedulerImplSuite`. /cc kayousterhout and markhamstra, who reviewed #15986. Author: Josh Rosen <[email protected]> Closes #16070 from JoshRosen/fix-leak-following-total-executor-loss-1.6.
…executor loss ## What changes were proposed in this pull request? _This is the master branch-1.6 version of apache#15986; the original description follows:_ This patch fixes a critical resource leak in the TaskScheduler which could cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor with running tasks is permanently lost and the associated stage fails. This problem was originally identified by analyzing the heap dump of a driver belonging to a cluster that had run out of shuffle space. This dump contained several `ShuffleDependency` instances that were retained by `TaskSetManager`s inside the scheduler but were not otherwise referenced. Each of these `TaskSetManager`s was considered a "zombie" but had no running tasks and therefore should have been cleaned up. However, these zombie task sets were still referenced by the `TaskSchedulerImpl.taskIdToTaskSetManager` map. Entries are added to the `taskIdToTaskSetManager` map when tasks are launched and are removed inside of `TaskScheduler.statusUpdate()`, which is invoked by the scheduler backend while processing `StatusUpdate` messages from executors. The problem with this design is that a completely dead executor will never send a `StatusUpdate`. There is [some code](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L338) in `statusUpdate` which handles tasks that exit with the `TaskState.LOST` state (which is supposed to correspond to a task failure triggered by total executor loss), but this state only seems to be used in Mesos fine-grained mode. There doesn't seem to be any code which performs per-task state cleanup for tasks that were running on an executor that completely disappears without sending any sort of final death message. The `executorLost` and [`removeExecutor`](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L527) methods don't appear to perform any cleanup of the `taskId -> *` mappings, causing the leaks observed here. This patch's fix is to maintain a `executorId -> running task id` mapping so that these `taskId -> *` maps can be properly cleaned up following an executor loss. There are some potential corner-case interactions that I'm concerned about here, especially some details in [the comment](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L523) in `removeExecutor`, so I'd appreciate a very careful review of these changes. ## How was this patch tested? I added a new unit test to `TaskSchedulerImplSuite`. /cc kayousterhout and markhamstra, who reviewed apache#15986. Author: Josh Rosen <[email protected]> Closes apache#16070 from JoshRosen/fix-leak-following-total-executor-loss-1.6.
…executor loss ## What changes were proposed in this pull request? _This is the master branch-1.6 version of apache#15986; the original description follows:_ This patch fixes a critical resource leak in the TaskScheduler which could cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor with running tasks is permanently lost and the associated stage fails. This problem was originally identified by analyzing the heap dump of a driver belonging to a cluster that had run out of shuffle space. This dump contained several `ShuffleDependency` instances that were retained by `TaskSetManager`s inside the scheduler but were not otherwise referenced. Each of these `TaskSetManager`s was considered a "zombie" but had no running tasks and therefore should have been cleaned up. However, these zombie task sets were still referenced by the `TaskSchedulerImpl.taskIdToTaskSetManager` map. Entries are added to the `taskIdToTaskSetManager` map when tasks are launched and are removed inside of `TaskScheduler.statusUpdate()`, which is invoked by the scheduler backend while processing `StatusUpdate` messages from executors. The problem with this design is that a completely dead executor will never send a `StatusUpdate`. There is [some code](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L338) in `statusUpdate` which handles tasks that exit with the `TaskState.LOST` state (which is supposed to correspond to a task failure triggered by total executor loss), but this state only seems to be used in Mesos fine-grained mode. There doesn't seem to be any code which performs per-task state cleanup for tasks that were running on an executor that completely disappears without sending any sort of final death message. The `executorLost` and [`removeExecutor`](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L527) methods don't appear to perform any cleanup of the `taskId -> *` mappings, causing the leaks observed here. This patch's fix is to maintain a `executorId -> running task id` mapping so that these `taskId -> *` maps can be properly cleaned up following an executor loss. There are some potential corner-case interactions that I'm concerned about here, especially some details in [the comment](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L523) in `removeExecutor`, so I'd appreciate a very careful review of these changes. ## How was this patch tested? I added a new unit test to `TaskSchedulerImplSuite`. /cc kayousterhout and markhamstra, who reviewed apache#15986. Author: Josh Rosen <[email protected]> Closes apache#16070 from JoshRosen/fix-leak-following-total-executor-loss-1.6.
What changes were proposed in this pull request?
This is the master branch-1.6 version of #15986; the original description follows:
This patch fixes a critical resource leak in the TaskScheduler which could cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor with running tasks is permanently lost and the associated stage fails.
This problem was originally identified by analyzing the heap dump of a driver belonging to a cluster that had run out of shuffle space. This dump contained several
ShuffleDependencyinstances that were retained byTaskSetManagers inside the scheduler but were not otherwise referenced. Each of theseTaskSetManagers was considered a "zombie" but had no running tasks and therefore should have been cleaned up. However, these zombie task sets were still referenced by theTaskSchedulerImpl.taskIdToTaskSetManagermap.Entries are added to the
taskIdToTaskSetManagermap when tasks are launched and are removed inside ofTaskScheduler.statusUpdate(), which is invoked by the scheduler backend while processingStatusUpdatemessages from executors. The problem with this design is that a completely dead executor will never send aStatusUpdate. There is some code instatusUpdatewhich handles tasks that exit with theTaskState.LOSTstate (which is supposed to correspond to a task failure triggered by total executor loss), but this state only seems to be used in Mesos fine-grained mode. There doesn't seem to be any code which performs per-task state cleanup for tasks that were running on an executor that completely disappears without sending any sort of final death message. TheexecutorLostandremoveExecutormethods don't appear to perform any cleanup of thetaskId -> *mappings, causing the leaks observed here.This patch's fix is to maintain a
executorId -> running task idmapping so that thesetaskId -> *maps can be properly cleaned up following an executor loss.There are some potential corner-case interactions that I'm concerned about here, especially some details in the comment in
removeExecutor, so I'd appreciate a very careful review of these changes.How was this patch tested?
I added a new unit test to
TaskSchedulerImplSuite./cc @kayousterhout and @markhamstra, who reviewed #15986.