[SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight #32114
Conversation
It turns out "on-disk storage" ( However, the same encryption test passes on Github. JVM exits while dynamically loading
Lesson learnt: always perform a final sanity test run using
I had
You could check if the
I just realized I can now re-run the checks in my personal fork, instead of pushing empty commits.
cc @Ngone51 FYI
I fixed a typo in the title and description: registerations => registrations.
Any particular reason for such a high cache size?
Also, expire after some time?
The high cache size is to ensure the fix works for a large enough job with 30000 executors.
Sure, does an expiry of 10 min (or larger) sound good?
This should give the executor long enough to process the in-flight StopExecutor message and complete the shutdown.
The timeout should be modeled on the maximum expected delay for a heartbeat to come in from an executor.
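If the cache approach is kept, the size and expiry knobs could look roughly like the sketch below, using a Guava cache. The name `recentlyRemovedExecutors`, the 10-minute expiry, and the 30000 bound are illustrative assumptions taken from this discussion, not the PR's final values:

```scala
import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder

// Hypothetical shape of the cache discussed above: keyed by executor id,
// bounded for very large jobs, and expiring entries some time after the
// maximum expected heartbeat delay.
val recentlyRemovedExecutors = CacheBuilder.newBuilder()
  .maximumSize(30000)                      // large enough for a 30000-executor job
  .expireAfterWrite(10, TimeUnit.MINUTES)  // assumed expiry; should track the heartbeat timeout
  .build[String, java.lang.Long]()

// On StopExecutor:
//   recentlyRemovedExecutors.put(executorId, System.currentTimeMillis())
// On a late heartbeat:
//   if (recentlyRemovedExecutors.getIfPresent(executorId) != null) skip the re-registration
```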
I think another possible solution is to extend the BlockManagerInfo with the timestamp of the removal. By modelling the removal as a new state we could avoid using this separate cache completely, and all the BM-related data would be in the same place.
Of course, in this case you would have to implement the cleanup.
For example, it could be just a simple Long var which is 0 by default, meaning the BlockManager is alive/active (this special value can be hidden behind a method of BlockManagerInfo like isAlive(currentTs)). The cleanup would be triggered after the delay plus some extra time, to avoid iterating over the blockManagerInfo collection too frequently.
WDYT?
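A minimal sketch of the extra state this would add, assuming an `executorRemovalTs`-style field. The standalone class and method names are illustrative; the real change would live inside `BlockManagerInfo`:

```scala
// Illustrative only: models "alive" vs "removal in progress" with a single Long.
class RemovableBlockManagerInfo {
  // 0L is the special value meaning the BlockManager is alive/active.
  private var executorRemovalTs: Long = 0L

  def markRemoved(now: Long): Unit = { executorRemovalTs = now }

  def isAlive: Boolean = executorRemovalTs == 0L

  // A periodic cleanup pass (running no more often than the delay itself)
  // can drop entries whose removal timestamp is older than `delayMs`.
  def canBeCleaned(now: Long, delayMs: Long): Boolean =
    !isAlive && (now - executorRemovalTs) > delayMs
}
```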
I thought about this a bit more but haven't checked the code yet: is it possible to separate driver-commanded, intentionally removed executors from unintentional executor loss?
Intentionally removed executors shouldn't be re-registered.
cc @Ngone51
When CoarseGrainedSchedulerBackend receives RemoveExecutor, it has the ExecutorLossReason. However, after processing the message, when it publishes SparkListenerExecutorRemoved, the reason is passed in the form of a string.
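For reference, the listener event carries the reason only as plain text; its shape is roughly as follows (reproduced from memory, so treat the exact signature as approximate):

```scala
// The structured ExecutorLossReason is flattened to a String before
// listeners ever see it, so the "intentional vs. unintentional" distinction
// is lost at this point.
case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String)
  extends SparkListenerEvent
```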
Currently, on RemoveExecutor, we remove the corresponding BlockManagerInfo from the blockManagerInfo map...
As per #32114 (comment), I don't think there would be a BlockManagerMessages.RemoveExecutor raised in this PR's case.
Could you point out on which code path the BlockManagerMessages.RemoveExecutor is raised?
If no code path raises BlockManagerMessages.RemoveExecutor in this PR's case, then @attilapiros' suggestion definitely works. But I'd also suggest another idea in #32114 (comment).
Is it possible to separate driver-commanded, intentionally removed executors from unintentional executor loss?
@attilapiros It's possible to know whether an executor was removed intentionally by the driver or not. The problem is that, currently, the executor info is stored in many different places. So you would have to update many methods or messages to add the isIntentional field (for example) to let all the components know and make decisions on it, which could get messy.
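As a rough illustration of the distinction itself (not of the plumbing, which is the hard part): the decision could in principle be made from the loss reason. The reason types below are Spark-internal (`private[spark]`) and vary across versions, so this is only a hedged sketch:

```scala
import org.apache.spark.scheduler.{ExecutorKilled, ExecutorLossReason}

// Hypothetical helper: treat a driver-requested kill as "intentional".
// Other driver-initiated reasons (e.g. decommissioning) would need to be
// added; anything else (heartbeat loss, process exit) counts as unintentional.
def isIntentionalRemoval(reason: ExecutorLossReason): Boolean = reason match {
  case ExecutorKilled => true
  case _              => false
}
```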
I found a code path which will break @attilapiros's proposed solution, since the following holds true.
However, we will have to abstract `blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo]`.
Currently, ...
Let's consider the following set of events:
- A `CoarseGrainedClusterMessage.RemoveExecutor` is issued. `CoarseGrainedSchedulerBackend` issues an async `StopExecutor` on `executorEndpoint` and then invokes `executorLost` on `TaskSchedulerImpl` (spark/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala, line 430 in e609395: `scheduler.executorLost(executorId, lossReason)`).
- `TaskSchedulerImpl`, in its `executorLost`, invokes `dagScheduler.executorLost` (spark/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala, lines 998 to 1001 in e609395: `if (failedExecutor.isDefined) { dagScheduler.executorLost(failedExecutor.get, reason); backend.reviveOffers() }`). `DAGScheduler`, while handling executorLost, invokes `removeExecutorAndUnregisterOutputs`, which internally invokes `blockManagerMaster.removeExecutor(execId)` (as you pointed out in your comment below), which in turn clears the `blockManagerId` from `blockManagerInfo` in `BlockManagerMasterEndpoint` (`removeExecutorAndUnregisterOutputs(`).
- The Executor has not yet processed `StopExecutor`.
- The Executor reports its Heartbeat.
- `HeartbeatReceiver` invokes `scheduler.executorHeartbeatReceived` to check whether the BlockManager on the executor requires re-registration (`val unknownExecutor = !scheduler.executorHeartbeatReceived(`). `TaskSchedulerImpl` delegates this to `DAGScheduler` (`dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId,`). `DAGScheduler` asks `BlockManagerMasterHeartbeatEndpoint` whether it knows the BlockManager (`blockManagerMaster.driverHeartbeatEndPoint.askSync[Boolean](`). `BlockManagerMasterHeartbeatEndpoint` returns false since it cannot find the `blockManagerId` in `blockManagerInfo`, indicating the BlockManager should re-register (spark/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterHeartbeatEndpoint.scala, line 51 in e609395: `if (!blockManagerInfo.contains(blockManagerId)) {`); a condensed sketch of this check follows after this list.
- The BlockManager re-registers, which publishes `SparkListenerBlockManagerAdded`, causing the inconsistent book-keeping in `AppStatusStore`.
- The Executor processes `StopExecutor` and exits.
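Condensed into a single predicate, the heartbeat-side decision above boils down to a membership test on `blockManagerInfo`. This is a simplified sketch with the map type reduced so it stands alone; the real map is keyed by `BlockManagerId` and holds `BlockManagerInfo`:

```scala
// Simplified view of the check quoted from BlockManagerMasterHeartbeatEndpoint:
// an unknown block manager is told to re-register, even if it is only unknown
// because StopExecutor is still in flight.
def needsReregistration(
    blockManagerInfo: collection.Map[String, AnyRef],
    blockManagerId: String): Boolean = {
  !blockManagerInfo.contains(blockManagerId)
}
```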
Move the cache into BlockManagerMasterEndpoint and hide the impl detail?
We can have a cleaner interface here... BlockManagerMasterHeartbeatEndpoint simply needs a way to validate whether an executor was recently removed - it does not need to know whether that is backed by a Cache/Set/etc.
Sure.
The same comment applies to the already present blockManagerInfo map as well.
I could refactor that too. I was thinking of creating a BlockManagerEndpointSharedState class which contains blockManagerInfo and recentlyRemovedExecutors. The class would expose corresponding methods for lookups and updates.
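A hedged sketch of what such a shared-state holder could look like. The class and method names are the working names from this thread plus my own assumptions, not an existing Spark API:

```scala
import scala.collection.mutable

// Illustrative only: one place that owns both the live block-manager map and
// the "recently removed" bookkeeping, exposing intent-revealing methods so
// callers never see the underlying Map/Cache/Set.
class BlockManagerEndpointSharedState[Id, Info] {
  private val blockManagerInfo = mutable.Map.empty[Id, Info]
  private val recentlyRemovedExecutors = mutable.Map.empty[String, Long]

  def register(id: Id, info: Info): Unit = blockManagerInfo(id) = info

  def get(id: Id): Option[Info] = blockManagerInfo.get(id)

  def remove(id: Id, executorId: String, now: Long): Unit = {
    blockManagerInfo.remove(id)
    recentlyRemovedExecutors(executorId) = now
  }

  def wasRecentlyRemoved(executorId: String): Boolean =
    recentlyRemovedExecutors.contains(executorId)
}
```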
Nit: for just checking whether an object is null you do not need to build an Option instance (Option is good when you pass the object to another method to emphasize that it can be null).
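In other words, a trivial illustration of the nit (not the PR's actual code):

```scala
// Allocation-free null check, preferred for a simple guard:
def hasInfo(info: AnyRef): Boolean = info != null

// Wrapping in Option just to test for null allocates an object for no benefit:
def hasInfoViaOption(info: AnyRef): Boolean = Option(info).isDefined
```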
Nit: same here (Option is not needed)
Hey guys, I'd like to propose a simpler (but maybe a little bit tricky) fix, if I understand the issue correctly. The idea is:
Instead of removing the executor directly, we set `executorLastSeen(executorId) = -1L` when we receive `ExecutorRemoved` in `HeartbeatReceiver`. And then,
1. if `Heartbeat` comes first, before `ExpireDeadHosts`, we remove the executor from `executorLastSeen` by checking the value "< 0" and avoid the re-register.
2. if `ExpireDeadHosts` comes first, before `Heartbeat`, we set `executorLastSeen(executorId) = -2L`. We can't remove it this time in `ExpireDeadHosts` because if `Heartbeat` comes later we'd have the same issue again.
2.1 if `Heartbeat` comes later, we remove the executor from `executorLastSeen` by checking the value "< 0" too, and also avoid the re-register.
2.2 if `Heartbeat` doesn't come (that means the executor stopped before sending the heartbeat), we remove the executor from `executorLastSeen` by checking the value == -2L in the next `ExpireDeadHosts`.
In this way, we can avoid the extra cache and all changes should be limited to `HeartbeatReceiver`.
Any thoughts?
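A hedged sketch of that state machine, using a plain map in place of `HeartbeatReceiver`'s internal state. The -1L/-2L sentinels are exactly the ones proposed above; everything else (names, return values) is illustrative:

```scala
import scala.collection.mutable

// executorId -> last-seen timestamp, or a negative sentinel for a removed executor.
val executorLastSeen = mutable.Map.empty[String, Long]

// On ExecutorRemoved: mark the executor instead of removing it.
def onExecutorRemoved(executorId: String): Unit =
  executorLastSeen(executorId) = -1L

// On Heartbeat: returns whether the executor should be asked to re-register.
def onHeartbeat(executorId: String): Boolean = {
  executorLastSeen.get(executorId) match {
    case Some(ts) if ts < 0L =>
      executorLastSeen.remove(executorId); false   // cases 1 and 2.1: drop, no re-register
    case Some(_) =>
      executorLastSeen(executorId) = System.currentTimeMillis(); false  // normal heartbeat
    case None =>
      true                                         // truly unknown executor
  }
}

// On ExpireDeadHosts: first pass demotes -1L to -2L (case 2),
// the next pass removes the -2L entries (case 2.2).
def onExpireDeadHosts(): Unit = {
  for ((id, ts) <- executorLastSeen.toSeq) {       // snapshot so mutation is safe
    if (ts == -2L) executorLastSeen.remove(id)
    else if (ts == -1L) executorLastSeen(id) = -2L
  }
}
```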
In essence, if I understood correctly, we are adding a
Did I miss anything? I am fine with this approach.
Thanks for the comment, @Ngone51.
Thanks for the comment, @mridulm. I believe @attilapiros' suggestion would take care of both cases where re-registration is triggered, without introducing another cache of
I am getting a little confused between the PR description and the subsequent discussion. An expiration of an executor from the heartbeat master not only sends a
My understanding was that there is a race here between the cluster manager notifying the application (after killing an executor) and the executor heartbeat/blockmanager re-registration, which ends up causing a dead executor to be marked live indefinitely. Is this the only case we are addressing? Or are there other paths that are impacted? (@Ngone51 Not sure if standalone has nuances that I am missing here.)
Standalone should be the same. @mridulm
You're right. I followed the PR description only so I thought I checked the code and surprisingly found that we don't remove
If that's the case (it seems not correct but has existed for a long time already), I think posting the spark/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala Lines 531 to 562 in ee7d838
@Ngone51 I believe moving
Thanks!
I was wondering what the policy is for merging the latest upstream changes into a dev branch?
I tried this and the issue still exists. When we apply the sequence of events mentioned in #32114, the issue surfaces again.
Rebase is recommended whenever it's possible. @sumeetgajjar
I see. I think I missed the code path of
Thanks for the experiment.
So I think the solution now would be: For the heartbeat, using #32114 (review). For the
In
WDYT?
@Ngone51 Thank you for this suggestion. I understand the solution; however, I believe this might be slightly complex for keeping track of things, since the cleanup/removal is triggered from a different component, i.e.
I spoke to @attilapiros offline regarding his solution and it seems he forgot to mention one thing: that
spark/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala Line 344 in 1b609c7
Instead, the cleanup thread would take care of removal. This will keep the whole logic in the same component, i.e.
I will proceed with @attilapiros' proposal where we model BlockManager removal as a new state, run some tests, and update more.
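A minimal sketch of such an in-component cleanup thread. The scheduling interval, timeout, map shape, and names are assumptions for illustration; the real change wires this into `BlockManagerMasterEndpoint`:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

// Maps a block-manager key to the time its executor's removal started
// (0L meaning "still alive").
val removalTs = mutable.Map.empty[String, Long]
val staleTimeoutMs = 10 * 60 * 1000L   // assumed; should track the heartbeat timeout

val cleaner = Executors.newSingleThreadScheduledExecutor()
cleaner.scheduleWithFixedDelay(new Runnable {
  override def run(): Unit = removalTs.synchronized {
    val now = System.currentTimeMillis()
    // Drop only entries whose removal is old enough to be safely forgotten.
    val stale = removalTs.collect { case (k, ts) if ts != 0L && now - ts > staleTimeoutMs => k }
    stale.foreach(removalTs.remove)
  }
}, staleTimeoutMs, staleTimeoutMs, TimeUnit.MILLISECONDS)
```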
…oveRdd
### What changes were proposed in this pull request?
In `BlockManagerMasterEndpoint`, for the disk-persisted RDDs (when `spark.shuffle.service.fetch.rdd.enable` is enabled) we keep track of the block status entries by external shuffle service instances (so on YARN we are basically keeping them by nodes). This is the `blockStatusByShuffleService` member val. And when all the RDD blocks are removed for one external shuffle service instance, the key and the empty map can be removed from `blockStatusByShuffleService`.
### Why are the changes needed?
It is a small leak and I was asked to take care of it in apache/spark#32114 (comment).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually, by adding a temporary log line to check the `blockStatusByShuffleService` value before and after the `removeRdd`, and running the `SPARK-25888: using external shuffle service fetching disk persisted blocks` test in `ExternalShuffleServiceSuite`.
Closes #32790 from attilapiros/SPARK-35543.
Authored-by: attilapiros <[email protected]>
Signed-off-by: attilapiros <[email protected]>
I just realized this bug does cause a real problem when working in conjunction with #24533. Basically, the re-registration issue leads the driver to think an executor is alive while it's actually dead, which in turn causes the client to retry the block on the dead executor, while it shouldn't. Could you @sumeetgajjar backport this fix to 3.1/3.0 as well?
@Ngone51, sure, I will backport it to 3.1 and 3.0 as well.
SPARK-34949 should also be backported to close any gaps. P.S. It is already in 3.1; we just need to backport it to 3.0.
…or msg is in-flight
### What changes were proposed in this pull request?
This patch proposes a fix to prevent triggering BlockManager reregistration while a `StopExecutor` msg is in-flight.
Here, on receiving the `StopExecutor` msg, we do not remove the corresponding `BlockManagerInfo` from the `blockManagerInfo` map; instead we mark it as dead by updating the corresponding `executorRemovalTs`. There's a separate cleanup thread running to periodically remove the stale `BlockManagerInfo` entries from the `blockManagerInfo` map.
Now if a recently removed `BlockManager` tries to register, the driver simply ignores it since the `blockManagerInfo` map already contains an entry for it. The same applies to `BlockManagerHeartbeat`: if the BlockManager belongs to a recently removed executor, the `blockManagerInfo` map would contain an entry and we shall not ask the corresponding `BlockManager` to re-register.
### Why are the changes needed?
These changes are needed since BlockManager reregistration while the executor is shutting down causes inconsistent bookkeeping of executors in Spark. Consider the following scenario:
- `CoarseGrainedSchedulerBackend` issues an async `StopExecutor` on the executorEndpoint
- `CoarseGrainedSchedulerBackend` removes that executor from the Driver's internal data structures and publishes `SparkListenerExecutorRemoved` on the `listenerBus`
- The Executor has still not processed `StopExecutor` from the Driver
- The Driver receives a heartbeat from the Executor; since it cannot find the `executorId` in its data structures, it responds with `HeartbeatResponse(reregisterBlockManager = true)`
- The `BlockManager` on the Executor reregisters with the `BlockManagerMaster` and `SparkListenerBlockManagerAdded` is published on the `listenerBus`
- The Executor starts processing the `StopExecutor` and exits
- `AppStatusListener` picks up the `SparkListenerBlockManagerAdded` event and updates `AppStatusStore`
- `statusTracker.getExecutorInfos` refers to `AppStatusStore` to get the list of executors, which returns the dead executor as alive
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Modified the existing unittests.
- Ran a simple test application on minikube that asserts that the number of executors is zero once the executor idle timeout is reached.
Closes apache#32114 from sumeetgajjar/SPARK-35011.
Authored-by: Sumeet Gajjar <[email protected]>
Signed-off-by: yi.wu <[email protected]>
…xecutor msg is in-flight
This PR backports #32114 to 3.1
<hr>
### What changes were proposed in this pull request?
This patch proposes a fix to prevent triggering BlockManager reregistration while a `StopExecutor` msg is in-flight.
Here, on receiving the `StopExecutor` msg, we do not remove the corresponding `BlockManagerInfo` from the `blockManagerInfo` map; instead we mark it as dead by updating the corresponding `executorRemovalTs`. There's a separate cleanup thread running to periodically remove the stale `BlockManagerInfo` entries from the `blockManagerInfo` map.
Now if a recently removed `BlockManager` tries to register, the driver simply ignores it since the `blockManagerInfo` map already contains an entry for it. The same applies to `BlockManagerHeartbeat`: if the BlockManager belongs to a recently removed executor, the `blockManagerInfo` map would contain an entry and we shall not ask the corresponding `BlockManager` to re-register.
### Why are the changes needed?
These changes are needed since BlockManager reregistration while the executor is shutting down causes inconsistent bookkeeping of executors in Spark. Consider the following scenario:
- `CoarseGrainedSchedulerBackend` issues an async `StopExecutor` on the executorEndpoint
- `CoarseGrainedSchedulerBackend` removes that executor from the Driver's internal data structures and publishes `SparkListenerExecutorRemoved` on the `listenerBus`
- The Executor has still not processed `StopExecutor` from the Driver
- The Driver receives a heartbeat from the Executor; since it cannot find the `executorId` in its data structures, it responds with `HeartbeatResponse(reregisterBlockManager = true)`
- The `BlockManager` on the Executor reregisters with the `BlockManagerMaster` and `SparkListenerBlockManagerAdded` is published on the `listenerBus`
- The Executor starts processing the `StopExecutor` and exits
- `AppStatusListener` picks up the `SparkListenerBlockManagerAdded` event and updates `AppStatusStore`
- `statusTracker.getExecutorInfos` refers to `AppStatusStore` to get the list of executors, which returns the dead executor as alive
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Modified the existing unittests.
- Ran a simple test application on minikube that asserts that the number of executors is zero once the executor idle timeout is reached.
Closes #33771 from sumeetgajjar/SPARK-35011-br-3.1.
Authored-by: Sumeet Gajjar <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…xecutor msg is in-flight
This PR backports #32114 to 3.0
<hr>
### What changes were proposed in this pull request?
This patch proposes a fix to prevent triggering BlockManager reregistration while a `StopExecutor` msg is in-flight.
Here, on receiving the `StopExecutor` msg, we do not remove the corresponding `BlockManagerInfo` from the `blockManagerInfo` map; instead we mark it as dead by updating the corresponding `executorRemovalTs`. There's a separate cleanup thread running to periodically remove the stale `BlockManagerInfo` entries from the `blockManagerInfo` map.
Now if a recently removed `BlockManager` tries to register, the driver simply ignores it since the `blockManagerInfo` map already contains an entry for it. The same applies to `BlockManagerHeartbeat`: if the BlockManager belongs to a recently removed executor, the `blockManagerInfo` map would contain an entry and we shall not ask the corresponding `BlockManager` to re-register.
### Why are the changes needed?
These changes are needed since BlockManager reregistration while the executor is shutting down causes inconsistent bookkeeping of executors in Spark. Consider the following scenario:
- `CoarseGrainedSchedulerBackend` issues an async `StopExecutor` on the executorEndpoint
- `CoarseGrainedSchedulerBackend` removes that executor from the Driver's internal data structures and publishes `SparkListenerExecutorRemoved` on the `listenerBus`
- The Executor has still not processed `StopExecutor` from the Driver
- The Driver receives a heartbeat from the Executor; since it cannot find the `executorId` in its data structures, it responds with `HeartbeatResponse(reregisterBlockManager = true)`
- The `BlockManager` on the Executor reregisters with the `BlockManagerMaster` and `SparkListenerBlockManagerAdded` is published on the `listenerBus`
- The Executor starts processing the `StopExecutor` and exits
- `AppStatusListener` picks up the `SparkListenerBlockManagerAdded` event and updates `AppStatusStore`
- `statusTracker.getExecutorInfos` refers to `AppStatusStore` to get the list of executors, which returns the dead executor as alive
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Modified the existing unittests.
- Ran a simple test application on minikube that asserts that the number of executors is zero once the executor idle timeout is reached.
Closes #33782 from sumeetgajjar/SPARK-35011-br-3.0.
Authored-by: Sumeet Gajjar <[email protected]>
Signed-off-by: yi.wu <[email protected]>
…opExecutor msg is in-flight" This reverts commit b9e53f8. ### What changes were proposed in this pull request? Revert #32114 ### Why are the changes needed? It breaks the expected `BlockManager` re-registration (e.g., heartbeat loss of an active executor) due to deferred removal of `BlockManager`, see the check: https://github.com/apache/spark/blob/9cefde8db373a3433b7e3ce328e4a2ce83b1aca2/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L551 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass existing tests. Closes #33942 from Ngone51/SPARK-36700. Authored-by: yi.wu <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…opExecutor msg is in-flight" This reverts commit b9e53f89379c34bde36b9c37471e21f037092749. ### What changes were proposed in this pull request? Revert apache/spark#32114 ### Why are the changes needed? It breaks the expected `BlockManager` re-registration (e.g., heartbeat loss of an active executor) due to deferred removal of `BlockManager`, see the check: https://github.com/apache/spark/blob/9cefde8db373a3433b7e3ce328e4a2ce83b1aca2/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L551 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass existing tests. Closes #33942 from Ngone51/SPARK-36700. Authored-by: yi.wu <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…opExecutor msg is in-flight" This reverts commit b9e53f8. ### What changes were proposed in this pull request? Revert #32114 ### Why are the changes needed? It breaks the expected `BlockManager` re-registration (e.g., heartbeat loss of an active executor) due to deferred removal of `BlockManager`, see the check: https://github.com/apache/spark/blob/9cefde8db373a3433b7e3ce328e4a2ce83b1aca2/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L551 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass existing tests. Closes #33959 from Ngone51/revert-35011-3.2. Authored-by: yi.wu <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…ockManager reregistration
### What changes were proposed in this pull request?
Also post the `SparkListenerExecutorRemoved` event when removing an executor that is known by `BlockManagerMaster` but unknown to `SchedulerBackend`.
### Why are the changes needed?
#32114 reports an issue where `BlockManagerMaster` could register a `BlockManager` from a dead executor due to the reregistration mechanism. The side effect is that the executor is shown on the UI as an active one, though it's actually already dead. In #32114, we tried to avoid such reregistration for a to-be-dead executor. However, I just realized that we can actually leave such reregistration alone, since `HeartbeatReceiver.expireDeadHosts` should clean up those `BlockManager`s in the end. The problem is that the corresponding executors in the UI can't be cleaned up along with the `BlockManager` cleanup, because executors in the UI can only be cleaned by `SparkListenerExecutorRemoved`, while the `BlockManager` cleanup only posts `SparkListenerBlockManagerRemoved` (which is ignored by `AppStatusListener`).
### Does this PR introduce _any_ user-facing change?
Yes, users would see the falsely active executor be removed in the end.
### How was this patch tested?
Pass existing tests.
Closes #34536 from Ngone51/SPARK-35011.
Lead-authored-by: wuyi <[email protected]>
Co-authored-by: yi.wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…r has been lost
### What changes were proposed in this pull request?
This PR mainly proposes to reject the block manager re-registration if the executor has already been considered lost/dead by the scheduler backend.
Along with the main proposal, this PR also includes a few other changes:
* Only post the `SparkListenerBlockManagerAdded` event when the registration succeeds
* Return an "invalid" executor id when the re-registration fails
* Do not report all blocks when the re-registration fails
### Why are the changes needed?
BlockManager re-registration from a lost executor (terminated/terminating executor or orphan executor) has led to some known issues, e.g., a falsely active executor showing up in the UI (SPARK-35011) and [block fetching to the dead executor](#32114 (comment)). And since there's no re-registration from the lost executor itself, it's meaningless to allow BlockManager re-registration when the executor is already lost.
Regarding the corner case where the re-registration event comes in before the lost executor is actually removed from the scheduler backend, I think it is not possible. Re-registration will only be required when the block manager is not found in `blockManagerInfo`, and the block manager will only be removed from `blockManagerInfo` either when the executor is already known to be lost or when it is removed by the driver proactively. So the executor should always be removed from the scheduler backend first, before the re-registration event comes.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test
Closes #38876 from Ngone51/fix-blockmanager-reregister.
Authored-by: Yi Wu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
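A condensed sketch of the rejection logic this later change describes. The alive-check predicate is passed in here only to keep the snippet self-contained; in Spark it would come from the scheduler backend's view of live executors, and the function/parameter names are assumptions:

```scala
// Returns Some(executorId) on success, None when registration is rejected
// because the scheduler backend no longer considers the executor alive.
def maybeRegisterBlockManager(
    executorId: String,
    isExecutorAlive: String => Boolean,
    register: String => Unit): Option[String] = {
  if (!isExecutorAlive(executorId)) {
    None                  // reject: no SparkListenerBlockManagerAdded, no block reports
  } else {
    register(executorId)  // normal path: register and post the listener event
    Some(executorId)
  }
}
```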
What changes were proposed in this pull request?
This patch proposes a fix to prevent triggering BlockManager reregistration while a `StopExecutor` msg is in-flight.
Here, on receiving the `StopExecutor` msg, we do not remove the corresponding `BlockManagerInfo` from the `blockManagerInfo` map; instead we mark it as dead by updating the corresponding `executorRemovalTs`. There's a separate cleanup thread running to periodically remove the stale `BlockManagerInfo` entries from the `blockManagerInfo` map.
Now if a recently removed `BlockManager` tries to register, the driver simply ignores it since the `blockManagerInfo` map already contains an entry for it. The same applies to `BlockManagerHeartbeat`: if the BlockManager belongs to a recently removed executor, the `blockManagerInfo` map would contain an entry and we shall not ask the corresponding `BlockManager` to re-register.
Why are the changes needed?
These changes are needed since BlockManager reregistration while the executor is shutting down causes inconsistent bookkeeping of executors in Spark.
Consider the following scenario:
- `CoarseGrainedSchedulerBackend` issues an async `StopExecutor` on the executorEndpoint
- `CoarseGrainedSchedulerBackend` removes that executor from the Driver's internal data structures and publishes `SparkListenerExecutorRemoved` on the `listenerBus`
- The Executor has still not processed `StopExecutor` from the Driver
- The Driver receives a heartbeat from the Executor; since it cannot find the `executorId` in its data structures, it responds with `HeartbeatResponse(reregisterBlockManager = true)`
- The `BlockManager` on the Executor reregisters with the `BlockManagerMaster` and `SparkListenerBlockManagerAdded` is published on the `listenerBus`
- The Executor starts processing the `StopExecutor` and exits
- `AppStatusListener` picks up the `SparkListenerBlockManagerAdded` event and updates `AppStatusStore`
- `statusTracker.getExecutorInfos` refers to `AppStatusStore` to get the list of executors, which returns the dead executor as alive
Does this PR introduce any user-facing change?
No
How was this patch tested?
- Modified the existing unittests.
- Ran a simple test application on minikube that asserts that the number of executors is zero once the executor idle timeout is reached.