Conversation

@sumeetgajjar
Contributor

@sumeetgajjar sumeetgajjar commented Apr 9, 2021

What changes were proposed in this pull request?

This patch proposes a fix to prevent triggering BlockManager reregistration while StopExecutor msg is in-flight.
Here, on receiving the StopExecutor msg, we do not remove the corresponding BlockManagerInfo from the blockManagerInfo map; instead we mark it as dead by updating the corresponding executorRemovalTs. A separate cleanup thread runs periodically to remove the stale BlockManagerInfo entries from the blockManagerInfo map.

Now if a recently removed BlockManager tries to register, the driver simply ignores it, since the blockManagerInfo map already contains an entry for it. The same applies to BlockManagerHeartbeat: if the BlockManager belongs to a recently removed executor, the blockManagerInfo map will still contain an entry, and we shall not ask the corresponding BlockManager to re-register.
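
For illustration, here is a minimal, hedged sketch of this bookkeeping (the class and method names other than executorRemovalTs and blockManagerInfo are illustrative, not the actual patch):

```scala
import scala.collection.mutable

// Simplified stand-in for BlockManagerInfo: 0 means the block manager is still alive.
final class BmInfoSketch(var executorRemovalTs: Long = 0L) {
  def isAlive: Boolean = executorRemovalTs == 0L
}

final class MasterStateSketch(staleTimeoutMs: Long) {
  val blockManagerInfo = mutable.Map[String, BmInfoSketch]()

  // On StopExecutor: keep the entry, just record when the executor was removed.
  def markExecutorRemoved(bmId: String, now: Long): Unit =
    blockManagerInfo.get(bmId).foreach(_.executorRemovalTs = now)

  // A registration attempt from a recently removed block manager is ignored,
  // because an entry (alive or recently dead) is still present.
  def shouldIgnoreRegistration(bmId: String): Boolean = blockManagerInfo.contains(bmId)

  // Heartbeat handling: only an entirely unknown block manager is asked to re-register.
  def needsReregister(bmId: String): Boolean = !blockManagerInfo.contains(bmId)

  // Run periodically by the cleanup thread to drop stale, dead entries.
  def cleanupStale(now: Long): Unit = {
    val stale = blockManagerInfo
      .filter { case (_, info) => !info.isAlive && now - info.executorRemovalTs >= staleTimeoutMs }
      .keys.toSeq
    stale.foreach(blockManagerInfo.remove)
  }
}
```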

Why are the changes needed?

These changes are needed because BlockManager reregistration while an executor is shutting down causes inconsistent bookkeeping of executors in Spark.
Consider the following scenario:

  • CoarseGrainedSchedulerBackend issues async StopExecutor on executorEndpoint
  • CoarseGrainedSchedulerBackend removes that executor from Driver's internal data structures and publishes SparkListenerExecutorRemoved on the listenerBus.
  • Executor has still not processed StopExecutor from the Driver
  • Driver receives a heartbeat from the Executor; since it cannot find the executorId in its data structures, it responds with HeartbeatResponse(reregisterBlockManager = true) (see the sketch after this list)
  • BlockManager on the Executor reregisters with the BlockManagerMaster and SparkListenerBlockManagerAdded is published on the listenerBus
  • Executor starts processing the StopExecutor and exits
  • AppStatusListener picks the SparkListenerBlockManagerAdded event and updates AppStatusStore
  • statusTracker.getExecutorInfos refers to AppStatusStore to get the list of executors, which returns the dead executor as alive.
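
To make the race concrete, here is a hedged, simplified sketch (not the actual HeartbeatReceiver code) of the heartbeat handling that triggers the spurious re-registration:

```scala
import scala.collection.mutable

// Simplified stand-ins for the real driver/executor messages.
case class Heartbeat(executorId: String)
case class HeartbeatResponse(reregisterBlockManager: Boolean)

// executorLastSeen mirrors the driver-side bookkeeping that the StopExecutor handling
// has already cleaned up by the time the late heartbeat arrives.
def handleHeartbeat(executorLastSeen: mutable.Map[String, Long],
                    hb: Heartbeat, now: Long): HeartbeatResponse = {
  if (executorLastSeen.contains(hb.executorId)) {
    executorLastSeen(hb.executorId) = now
    HeartbeatResponse(reregisterBlockManager = false)
  } else {
    // The executor was already removed from the driver's data structures, so it is asked
    // to re-register its BlockManager; this is the step that resurrects the dead executor
    // in AppStatusStore.
    HeartbeatResponse(reregisterBlockManager = true)
  }
}
```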

Does this PR introduce any user-facing change?

No

How was this patch tested?

  • Modified the existing unit tests.
  • Ran a simple test application on minikube that asserts that the number of executors is zero once the executor idle timeout is reached (sketched below).
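
A hedged sketch of the kind of minikube assertion described above (not the actual test application; the app name and sleep duration are placeholders):

```scala
import org.apache.spark.sql.SparkSession

object ExecutorCountCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("executor-count-check").getOrCreate()
    val sc = spark.sparkContext
    sc.parallelize(1 to 1000, 10).count()   // run some work so executors are allocated
    Thread.sleep(120 * 1000L)               // wait past the dynamic-allocation executor idle timeout
    // getExecutorInfos typically also reports the driver, so once every executor has been
    // released only that single entry should remain.
    assert(sc.statusTracker.getExecutorInfos.length == 1)
    spark.stop()
  }
}
```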

@sumeetgajjar
Contributor Author

sumeetgajjar commented Apr 9, 2021

It turns out "on-disk storage" (encryptionTest) under BlockManagerSuite fails when "encryption = on" on my mac. But Intellij does not report it as a failure since the jvm simply exited. It also does not run the next set of tests under BlockManagerSuite given such scenarios and hence I didn't notice NPE for two of those unrunned tests.

However, the same encryption test passes on GitHub. On my machine, the JVM exits while dynamically loading org.apache.commons.crypto.random.OpenSslCryptoRandom via commons-crypto.

Lesson learned: always perform a final sanity test run using sbt/mvn before concluding the tests pass 😋

@sumeetgajjar
Contributor Author

It turns out "on-disk storage" (encryptionTest) under BlockManagerSuite fails when "encryption = on" on my mac. But Intellij does not report it as a failure since the jvm simply exited. It also does not run the next set of tests under BlockManagerSuite given such scenarios and hence I didn't notice NPE for two of those unrunned tests.

However, the same encryption test passes on GitHub. On my machine, the JVM exits while dynamically loading org.apache.commons.crypto.random.OpenSslCryptoRandom via commons-crypto.

Lesson learned: always perform a final sanity test run using sbt/mvn before concluding the tests pass 😋

I had openssl@1.1 installed on my Mac (Catalina 10.15.7); however, the corresponding shared libs libcrypto.1.1.dylib and libssl.1.1.dylib were missing from my /usr/local/lib dir.
Running the following commands solved the issue and I was able to run the encryptionTest.

cd /usr/local/Cellar/openssl@1.1/1.1.1k/lib
cp libssl.1.1.dylib libcrypto.1.1.dylib /usr/local/lib
cd /usr/local/lib
ln -s libcrypto.1.1.dylib libcrypto.dylib
ln -s libssl.1.1.dylib libssl.dylib 

You can check whether commons-crypto is using the correct OpenSSL shared libs by running the following command:

java -jar Desktop/commons-crypto-1.1.0/commons-crypto-1.1.0.jar Crypto
Apache Commons Crypto 1.1.0
Native code loaded OK: 1.1.0
Native name: Apache Commons Crypto
Native built: Aug 28 2020
OpenSSL library loaded OK, version: 0x101010bf
OpenSSL library info: OpenSSL 1.1.1k  25 Mar 2021
Random instance created OK: org.apache.commons.crypto.random.OpenSslCryptoRandom@2a84aee7
Cipher AES/CTR/NoPadding instance created OK: org.apache.commons.crypto.cipher.OpenSslCipher@1fb3ebeb
Additional OpenSSL_version(n) details:
1: compiler: clang -fPIC -arch x86_64 -O3 -Wall -DL_ENDIAN -DOPENSSL_PIC -DOPENSSL_CPUID_OBJ -DOPENSSL_IA32_SSE2 -DOPENSSL_BN_ASM_MONT -DOPENSSL_BN_ASM_MONT5 -DOPENSSL_BN_ASM_GF2m -DSHA1_ASM -DSHA256_ASM -DSHA512_ASM -DKECCAK1600_ASM -DRC4_ASM -DMD5_ASM -DAESNI_ASM -DVPAES_ASM -DGHASH_ASM -DECP_NISTZ256_ASM -DX25519_ASM -DPOLY1305_ASM -D_REENTRANT -DNDEBUG
2: built on: Thu Mar 25 21:01:02 2021 UTC
3: platform: darwin64-x86_64-cc
4: OPENSSLDIR: "/usr/local/etc/openssl@1.1"
5: ENGINESDIR: "/usr/local/Cellar/openssl@1.1/1.1.1k/lib/engines-1.1"

@sumeetgajjar
Contributor Author

I just realized I can now re-run the checks in my personal fork, instead of pushing empty commits.

@sumeetgajjar sumeetgajjar force-pushed the SPARK-35011 branch 2 times, most recently from a04d352 to 01b8ae4 on April 11, 2021 at 00:01
@HyukjinKwon
Member

cc @Ngone51 FYI

@HyukjinKwon HyukjinKwon changed the title [Spark 35011][CORE] Avoid Block Manager registerations when StopExecutor msg is in-flight [SPARK-35011][CORE] Avoid Block Manager registerations when StopExecutor msg is in-flight Apr 11, 2021
@attilapiros attilapiros changed the title [SPARK-35011][CORE] Avoid Block Manager registerations when StopExecutor msg is in-flight [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight Apr 11, 2021
@attilapiros
Contributor

I fixed a typo in the title and description: registerations => registrations.
But I will only review this properly next week.

Contributor

@mridulm mridulm Apr 12, 2021

Any particular reason for such a high cache size ?
Also, expire after some time ?

Contributor Author

The high cache size is to ensure the fix works for a large enough job with 30000 executors.

Sure, does an expiry of 10 min (or larger) sound good?
This should give the executor long enough to process the in-flight StopExecutor message and complete its shutdown.

Contributor

The timeout should be modeled on the maximum expected delay for a heartbeat to come in from an executor.

Contributor

I think another possible solution is to extend BlockManagerInfo with the timestamp of the removal, modelling the removal as a new state. This way we could avoid using this separate cache completely, and all the BlockManager-related data would stay in the same place.

Of course in this case you should implement the cleanup.

For example, it could be just a simple Long var which is 0 by default, meaning the BlockManager is alive/active (this special value can be hidden behind a method of BlockManagerInfo like isAlive(currentTs)). The cleanup would be triggered after the delay plus some extra time, to avoid iterating over the blockManagerInfo collection too frequently.

WDYT?

Contributor

I thought about this a bit more but haven't checked the code yet: is it possible to separate driver-commanded, intentionally removed executors from unintentional executor loss?

Intentionally removed executors shouldn't be re-registered.

cc @Ngone51

Contributor Author

When CoarseGrainedSchedulerBackend receives RemoveExecutor, it has the ExecutorLossReason. However, after processing the message, when it publishes SparkListenerExecutorRemoved, the reason is passed in the form of a string.

Member

Currently, on RemoveExecutor, we remove the corresponding BlockManagerInfo from blockManagerInfo map...

As per #32114 (comment), I don't think a BlockManagerMessages.RemoveExecutor would be raised in this PR's case.

Could you point out which code path raises BlockManagerMessages.RemoveExecutor?

If no other code path raises BlockManagerMessages.RemoveExecutor in this PR's case, then @attilapiros's suggestion definitely works. But I'd also suggest another idea in #32114 (comment).

Member

Is it possible to separate driver-commanded, intentionally removed executors from unintentional executor loss?

@attilapiros It's possible to know whether an executor was removed intentionally by the driver or not. The problem is that the executor info is currently stored in many different places, so you would have to update many methods or messages to add an isIntentional field (for example) to let all the components know and make decisions based on it, which could get messy.

Contributor Author

@sumeetgajjar sumeetgajjar Apr 21, 2021

Yes @Ngone51, as pointed out in #32114, BlockManagerMessages.RemoveExecutor is raised only in limited cases. I could not find any other code path apart from the one you pointed out.

Redacting this due to new findings.

Contributor Author

I found a code path which will break @attilapiros's proposed solution since the following holds true.

However, we will have to abstract blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo].
Currently, ...

Let's consider the following set of events:

Contributor

@mridulm mridulm Apr 12, 2021

Move the cache into BlockManagerMasterEndpoint and hide the impl detail ?
We can have a cleaner interface here ... BlockManagerMasterHeartbeatEndpoint simply needs a way to validate if an executor was recently removed - does not need to know if it was a Cache/Set/etc.

Contributor Author

Sure.
The same comment applies to the already-present blockManagerInfo map as well; I could refactor that too. I was thinking of creating a BlockManagerEndpointSharedState class which contains blockManagerInfo and recentlyRemovedExecutors. The class would expose corresponding methods for lookups and updates.
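
For illustration, a hypothetical sketch of such a shared-state class (all names and method shapes are my own, not the actual change); the point is that BlockManagerMasterHeartbeatEndpoint only sees predicates and never learns whether the "recently removed" tracking is a cache, set, or map:

```scala
import scala.collection.mutable

class BlockManagerEndpointSharedState(removalTtlMs: Long) {
  private val blockManagerInfo = mutable.Map[String, Long]()          // bmId -> registration ts
  private val recentlyRemovedExecutors = mutable.Map[String, Long]()  // executorId -> removal ts

  def registerBlockManager(bmId: String, now: Long): Unit = blockManagerInfo(bmId) = now
  def containsBlockManager(bmId: String): Boolean = blockManagerInfo.contains(bmId)

  def markExecutorRemoved(executorId: String, now: Long): Unit =
    recentlyRemovedExecutors(executorId) = now

  // The heartbeat endpoint only ever needs this predicate, not the underlying structure.
  def wasRecentlyRemoved(executorId: String, now: Long): Boolean =
    recentlyRemovedExecutors.get(executorId).exists(now - _ < removalTtlMs)
}
```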

Contributor

@attilapiros attilapiros Apr 13, 2021

Nit: for just checking whether an object is null you do not need to build an Option instance (Option is good when you pass the object to another method, to emphasize that it can be null).

Contributor

Nit: same here (Option is not needed)

Member

@Ngone51 Ngone51 left a comment

Hey guys, I'd like to propose a simpler (but maybe a little bit tricky) fix, if I understand the issue correctly. The idea is:

Instead of removing the executor directly, we set executorLastSeen(executorId) = -1L when we receive ExecutorRemoved in HeartbeatReceiver. And then,

  1. if Heartbeat comes first before ExpireDeadHosts, we remove the executor from executorLastSeen by checking the value "< 0" and avoid the re-register.

  2. if ExpireDeadHosts comes first before Heartbeat, we set executorLastSeen(executorId) = -2L. We can't remove it this time in ExpireDeadHosts because if Heartbeat comes later we'd have the same issue again.

2.1 if Heartbeat comes later, we remove the executor from executorLastSeen by checking the value "< 0" too and also avoid the re-register.

2.2 if Heartbeat doesn't come (that means the executor stopped before sending the heartbeat), we remove the executor from executorLastSeen by checking the value = -2L in next ExpireDeadHosts.

In this way, we can avoid the extra cache and all changes should be limited to HeartbeatReceiver.

Any thoughts?
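
A hedged sketch of this sentinel-value idea (simplified; only executorLastSeen and the message names come from the comment above, the rest is illustrative):

```scala
import scala.collection.mutable

val executorLastSeen = mutable.Map[String, Long]()

// On ExecutorRemoved: keep the entry but mark it with a sentinel instead of removing it.
def onExecutorRemoved(executorId: String): Unit =
  executorLastSeen(executorId) = -1L

// On Heartbeat: a negative value means "already removed", so drop the entry and do not
// ask the executor to re-register its BlockManager. Returns reregisterBlockManager.
def onHeartbeat(executorId: String, now: Long): Boolean =
  executorLastSeen.get(executorId) match {
    case Some(ts) if ts < 0 => executorLastSeen.remove(executorId); false
    case Some(_)            => executorLastSeen(executorId) = now; false
    case None               => true // unknown executor: the old behavior requests re-registration
  }

// On ExpireDeadHosts: -1L entries get one more round (-2L); -2L entries are dropped,
// covering the case where the final heartbeat never arrives.
def onExpireDeadHosts(): Unit = {
  val marked = executorLastSeen.filter(_._2 < 0).keys.toSeq
  marked.foreach { id =>
    if (executorLastSeen(id) == -2L) executorLastSeen.remove(id)
    else executorLastSeen(id) = -2L
  }
}
```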

@mridulm
Contributor

mridulm commented Apr 13, 2021

In essence, if I understood correctly, we are adding a lostExecutorCandidates:Map[String, ExpirationState] ?

  • If we detect a request to expire an executor comes in - then expire based on (some) policy : timeout since initial expiry/number of expirations/other reasons : else add/update expiration state of candidate.
  • If heartbeat comes in, then remove from candidate set.
  • If explicit remove, then remove from both executorLastSeen and lostExecutorCandidates:Set.

Did I miss anything ? I am fine with this approach.
(I explicitly pulled the magic values out for explanation clarity)

@sumeetgajjar
Contributor Author

Hey guys, I'd like to propose a simpler (but maybe a little bit tricky) fix, if I understand the issue correctly. The idea is:

Instead of removing the executor directly, we set executorLastSeen(executorId) = -1L when we receive ExecutorRemoved in HeartbeatReceiver. And then,

1. if `Heartbeat` comes first before `ExpireDeadHosts`, we remove the executor from `executorLastSeen` by checking the value "< 0" and avoid the re-register.

2. if `ExpireDeadHosts` comes first before `Heartbeat`,  we set `executorLastSeen(executorId) = -2L`. We can't remove it this time in `ExpireDeadHosts` because if `Heartbeat` comes later we'd have the same issue again.

2.1 if Heartbeat comes later, we remove the executor from executorLastSeen by checking the value "< 0" too and also avoid the re-register.

2.2 if Heartbeat doesn't come (that means the executor stopped before sending the heartbeat), we remove the executor from executorLastSeen by checking the value = -2L in next ExpireDeadHosts.

In this way, we can avoid the extra cache and all changes should be limited to HeartbeatReceiver.

Any thoughts?

Thanks for the comment @Ngone51.
There are two places from which the re-registration can be triggered:

  • From HeartbeatReceiver - by responding HeartbeatResponse(reregisterBlockManager = true).
    Your solution will take care of this.
  • From BlockManager - e.g. reportBlockStatus.
    Just modifying HeartbeatReceiver won't solve the re-registration issue here. We would also have to implement a similar kind of tracking inside BlockManagerMasterEndpoint, and since the two trackers would be independent of each other, this might introduce a race condition (please correct me if I am wrong).

@sumeetgajjar
Contributor Author

In essence, if I understood correctly, we are adding a lostExecutorCandidates:Map[String, ExpirationState] ?

* If we detect a request to expire an executor comes in - then expire based on (some) policy : timeout since initial expiry/number of expirations/other reasons : else add/update expiration state of candidate.

* If heartbeat comes in, then remove from candidate set.

* If explicit remove, then remove from both `executorLastSeen` and `lostExecutorCandidates:Set`.

Did I miss anything ? I am fine with this approach.
(I explicitly pulled the magic values out for explanation clarity)

Thanks for the comment @mridulm.
I believe this comment applies here as well.

I believe @attilapiros's suggestion would take care of both cases where re-registration is triggered, without introducing another cache of recentlyRemovedExecutors.

@mridulm
Contributor

mridulm commented Apr 13, 2021

I am getting a little confused between the PR description and the subsequent discussion.
What exactly is the behavior we are trying to converge towards/address?

An expiration of an executor from the heartbeat master not only sends a StopExecutor to voluntarily get the executor to exit, but also gets the cluster manager to force termination (in case of a MIA/hung executor). So in steady state, once transitory/overlapping updates are done, the executor should be gone according to the driver.

My understanding was that there is a race here between the cluster manager notifying the application (after killing the executor) and the executor heartbeat/BlockManager re-registration, which ends up causing a dead executor to be marked live indefinitely.

Is this the only case we are addressing ? Or are there any other paths that are impacted ?

(@Ngone51 Not sure if standalone has nuances that I am missing here).

@Ngone51
Member

Ngone51 commented Apr 14, 2021

Standalone should be the same. @mridulm

@Ngone51
Member

Ngone51 commented Apr 14, 2021

From BlockManager - e.g. reportBlockStatus.
Just modifying HeartbeatReceiver won't solve the re-registration issue here. We would also have to implement a similar kind of tracking inside BlockManagerMasterEndpoint, and since the two trackers would be independent of each other, this might introduce a race condition (please correct me if I am wrong).

You're right. I had followed the PR description only, so I thought HeartbeatReceiver was the only problematic place.

I checked the code and was surprised to find that we don't remove the BlockManager when we remove an executor. Removing a BlockManager happens in only a few cases.

If that's the case (it seems incorrect, but it has existed for a long time already), I think posting the SparkListenerBlockManagerAdded inside the if (!blockManagerInfo.contains(id)) block would be enough for the whole fix?

```scala
if (!blockManagerInfo.contains(id)) {
  blockManagerIdByExecutor.get(id.executorId) match {
    case Some(oldId) =>
      // A block manager of the same executor already exists, so remove it (assumed dead)
      logError("Got two different block manager registrations on same executor - "
        + s" will replace old one $oldId with new one $id")
      removeExecutor(id.executorId)
    case None =>
  }
  logInfo("Registering block manager %s with %s RAM, %s".format(
    id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id))
  blockManagerIdByExecutor(id.executorId) = id
  val externalShuffleServiceBlockStatus =
    if (externalShuffleServiceRddFetchEnabled) {
      val externalShuffleServiceBlocks = blockStatusByShuffleService
        .getOrElseUpdate(externalShuffleServiceIdOnHost(id), new JHashMap[BlockId, BlockStatus])
      Some(externalShuffleServiceBlocks)
    } else {
      None
    }
  blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
    maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint, externalShuffleServiceBlockStatus)
  if (pushBasedShuffleEnabled) {
    addMergerLocation(id)
  }
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
  Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
```
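
A hedged sketch of the suggested change, shown as a modified tail of the block quoted above (the registration bookkeeping is elided): the event is posted only when a new entry is actually added, so an ignored registration cannot resurrect the executor in AppStatusStore.

```scala
if (!blockManagerInfo.contains(id)) {
  // ... existing registration bookkeeping from the block above ...
  listenerBus.post(SparkListenerBlockManagerAdded(time, id,
    maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
}
```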

@sumeetgajjar
Contributor Author

If that's the case (it seems incorrect, but it has existed for a long time already), I think posting the SparkListenerBlockManagerAdded inside the if (!blockManagerInfo.contains(id)) block would be enough for the whole fix?

@Ngone51 I believe moving SparkListenerBlockManagerAdded inside the if block should be enough.
I will give it a try and check if any other cases surface.

Thanks!

@sumeetgajjar
Contributor Author

I was wondering what the policy is for merging the latest changes from upstream into a dev branch.
For example, I have a branch SPARK-35011 which contains my changes; it is behind upstream/master.

  • Should I be rebasing my commits on the new changes from upstream/master ?

    • current branch is SPARK-35011 -- git pull upstream master --rebase
    • In this case, when I push my changes to the PR, I will require a --force push since the commit ordering has changed.
  • Or should I merge the upstream/master into SPARK-35011?

    • current branch is SPARK-35011 -- git merge upstream/master
    • No force push required; however, my commits end up too far behind in the history.

@sumeetgajjar
Contributor Author

If that's the case (it seems incorrect, but it has existed for a long time already), I think posting the SparkListenerBlockManagerAdded inside the if (!blockManagerInfo.contains(id)) block would be enough for the whole fix?

@Ngone51 I believe moving SparkListenerBlockManagerAdded inside the if block should be enough.
I will give it a try and check if any other cases surface.

Thanks!

I tried this and the issue still exists. When we apply the sequence of events mentioned in #32114, the issue surfaces again.

@Ngone51
Member

Ngone51 commented Apr 23, 2021

Should I be rebasing my commits on the new changes from upstream/master ?

Rebase is recommended whenever it's possible. @sumeetgajjar

@Ngone51
Member

Ngone51 commented Apr 23, 2021

I tried this and the issue still exists. When we apply the sequence of events mentioned in #32114, the issue surfaces again.

I see. I think I missed the code path of scheduler.executorLost(executorId, lossReason).

Thanks for the experiment.

@Ngone51
Member

Ngone51 commented Apr 24, 2021

So I think the solution now would be:

For the heartbeat, using #32114 (review).

For the BlockManager, adding a BlockManagerEndpointSharedState (as mentioned by @sumeetgajjar in #32114 (comment)) for both BlockManagerMasterEndpoint and BlockManagerMasterHeartbeatEndpoint. It's true that if we only add a removal state to the blockManagerInfo, we have to filter out the removed block managers before traversing it (e.g., we don't expect getPeers to return a removed block manager).

In BlockManagerEndpointSharedState, we'd have both activeBlockManagerInfo and removedBlockManagerInfo. We don't have to set up a new cleaner to clear removedBlockManagerInfo. Instead, we can reuse the HeartbeatReceiver fix: whenever there's a definite removal in HeartbeatReceiver, we can also send a removal request to BlockManagerEndpointSharedState by following the code path of !scheduler.executorHeartbeatReceived (e.g., we could have a scheduler.clearBlockManagerInfo similarly).

WDYT?

@sumeetgajjar
Contributor Author

So I think the solution now would be:

For the heartbeat, using #32114 (review).

For the BlockManager, adding a BlockManagerEndpointSharedState (as mentioned by @sumeetgajjar in #32114 (comment)) for both BlockManagerMasterEndpoint and BlockManagerMasterHeartbeatEndpoint. It's true that if we only add a removal state to the blockManagerInfo, we have to filter out the removed block managers before traversing it (e.g., we don't expect getPeers to return a removed block manager).

In BlockManagerEndpointSharedState, we'd have both activeBlockManagerInfo and removedBlockManagerInfo. We don't have to set up a new cleaner to clear removedBlockManagerInfo. Instead, we can reuse the HeartbeatReceiver fix: whenever there's a definite removal in HeartbeatReceiver, we can also send a removal request to BlockManagerEndpointSharedState by following the code path of !scheduler.executorHeartbeatReceived (e.g., we could have a scheduler.clearBlockManagerInfo similarly).

WDYT?

@Ngone51 Thank you for this suggestion. I understand the solution; however, I believe it might be slightly complex to keep track of things, since the cleanup/removal is triggered from a different component, i.e. HeartbeatReceiver.

I spoke to @attilapiros offline regarding his solution, and it seems he missed mentioning one thing: the BlockManagerInfo will not be removed here.

Instead, the cleanup thread would take care of the removal. This keeps the whole logic in the same component, i.e. BlockManagerMasterEndpoint, and makes it easy to follow from a code-understanding point of view.

I will proceed with @attilapiros's proposal, where we model BlockManager removal as a new state, run some tests, and post an update.

a0x8o added a commit to a0x8o/spark that referenced this pull request Jun 7, 2021
…oveRdd

### What changes were proposed in this pull request?

In `BlockManagerMasterEndpoint`, for disk-persisted RDDs (when `spark.shuffle.service.fetch.rdd.enable` is enabled) we keep track of the block status entries by external shuffle service instance (so on YARN we are basically keeping them by node). This is the `blockStatusByShuffleService` member val. And when all the RDD blocks are removed for one external shuffle service instance, the key and the empty map can be removed from `blockStatusByShuffleService`.
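
A hedged sketch of that cleanup (types and names are simplified stand-ins, not the actual Spark code):

```scala
import scala.collection.mutable

// shuffle-service id -> (block id -> block size); a simplified stand-in for blockStatusByShuffleService.
val blockStatusByService = mutable.Map[String, mutable.Map[String, Long]]()

def removeRddBlocks(serviceId: String, blockIds: Seq[String]): Unit = {
  blockStatusByService.get(serviceId).foreach { blocks =>
    blockIds.foreach(blocks.remove)
    // Once the last block tracked for this external shuffle service is gone, drop the
    // now-empty inner map as well so the outer key does not leak.
    if (blocks.isEmpty) blockStatusByService.remove(serviceId)
  }
}
```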

### Why are the changes needed?

It is a small leak and I was asked to take care of it in apache/spark#32114 (comment).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually by adding a temporary log line to check `blockStatusByShuffleService` value before and after the `removeRdd` and run the `SPARK-25888: using external shuffle service fetching disk persisted blocks` test in `ExternalShuffleServiceSuite`.

Closes #32790 from attilapiros/SPARK-35543.

Authored-by: attilapiros <[email protected]>
Signed-off-by: attilapiros <[email protected]>
@Ngone51
Member

Ngone51 commented Aug 17, 2021

I just realized this bug does cause a real problem when working in conjunction with #24533. Basically, the re-registration issue leads the driver to think an executor is alive while it's actually dead, which in turn causes the client to retry a block on the dead executor when it shouldn't. Could you @sumeetgajjar backport this fix to 3.1/3.0 as well?
cc @mridulm @attilapiros

@sumeetgajjar
Contributor Author

I just realized this bug does cause a real problem when working in conjunction with #24533. Basically, the re-registration issue leads the driver to think an executor is alive while it's actually dead, which in turn causes the client to retry a block on the dead executor when it shouldn't. Could you @sumeetgajjar backport this fix to 3.1/3.0 as well?
cc @mridulm @attilapiros

@Ngone51, sure, I will backport it to 3.1 and 3.0 as well.

@sumeetgajjar
Contributor Author

SPARK-34949 should also be backported to close any gaps.

P.S. It is already in 3.1; we just need to backport it to 3.0.

sumeetgajjar added a commit to sumeetgajjar/spark that referenced this pull request Aug 17, 2021
…or msg is in-flight

### What changes were proposed in this pull request?

This patch proposes a fix to prevent triggering BlockManager reregistration while `StopExecutor` msg is in-flight.
Here, on receiving the `StopExecutor` msg, we do not remove the corresponding `BlockManagerInfo` from the `blockManagerInfo` map; instead we mark it as dead by updating the corresponding `executorRemovalTs`. A separate cleanup thread runs periodically to remove the stale `BlockManagerInfo` entries from the `blockManagerInfo` map.

Now if a recently removed `BlockManager` tries to register, the driver simply ignores it since the `blockManagerInfo` map already contains an entry for it. The same applies to `BlockManagerHeartbeat`, if the BlockManager belongs to a recently removed executor, the `blockManagerInfo` map would contain an entry and we shall not ask the corresponding `BlockManager` to re-register.

### Why are the changes needed?

These changes are needed because BlockManager reregistration while an executor is shutting down causes inconsistent bookkeeping of executors in Spark.
Consider the following scenario:
- `CoarseGrainedSchedulerBackend` issues async `StopExecutor` on executorEndpoint
- `CoarseGrainedSchedulerBackend` removes that executor from Driver's internal data structures and publishes `SparkListenerExecutorRemoved` on the `listenerBus`.
- Executor has still not processed `StopExecutor` from the Driver
- Driver receives heartbeat from the Executor, since it cannot find the `executorId` in its data structures, it responds with `HeartbeatResponse(reregisterBlockManager = true)`
- `BlockManager` on the Executor reregisters with the `BlockManagerMaster` and `SparkListenerBlockManagerAdded` is published on the `listenerBus`
- Executor starts processing the `StopExecutor` and exits
- `AppStatusListener` picks the `SparkListenerBlockManagerAdded` event and updates `AppStatusStore`
- `statusTracker.getExecutorInfos` refers to `AppStatusStore` to get the list of executors, which returns the dead executor as alive.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Modified the existing unit tests.
- Ran a simple test application on minikube that asserts that the number of executors is zero once the executor idle timeout is reached.

Closes apache#32114 from sumeetgajjar/SPARK-35011.

Authored-by: Sumeet Gajjar <[email protected]>
Signed-off-by: yi.wu <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Aug 18, 2021
…xecutor msg is in-flight

This PR backports #32114 to 3.1
<hr>

### What changes were proposed in this pull request?

This patch proposes a fix to prevent triggering BlockManager reregistration while `StopExecutor` msg is in-flight.
Here, on receiving the `StopExecutor` msg, we do not remove the corresponding `BlockManagerInfo` from the `blockManagerInfo` map; instead we mark it as dead by updating the corresponding `executorRemovalTs`. A separate cleanup thread runs periodically to remove the stale `BlockManagerInfo` entries from the `blockManagerInfo` map.

Now if a recently removed `BlockManager` tries to register, the driver simply ignores it since the `blockManagerInfo` map already contains an entry for it. The same applies to `BlockManagerHeartbeat`, if the BlockManager belongs to a recently removed executor, the `blockManagerInfo` map would contain an entry and we shall not ask the corresponding `BlockManager` to re-register.

### Why are the changes needed?

These changes are needed because BlockManager reregistration while an executor is shutting down causes inconsistent bookkeeping of executors in Spark.
Consider the following scenario:
- `CoarseGrainedSchedulerBackend` issues async `StopExecutor` on executorEndpoint
- `CoarseGrainedSchedulerBackend` removes that executor from Driver's internal data structures and publishes `SparkListenerExecutorRemoved` on the `listenerBus`.
- Executor has still not processed `StopExecutor` from the Driver
- Driver receives heartbeat from the Executor, since it cannot find the `executorId` in its data structures, it responds with `HeartbeatResponse(reregisterBlockManager = true)`
- `BlockManager` on the Executor reregisters with the `BlockManagerMaster` and `SparkListenerBlockManagerAdded` is published on the `listenerBus`
- Executor starts processing the `StopExecutor` and exits
- `AppStatusListener` picks the `SparkListenerBlockManagerAdded` event and updates `AppStatusStore`
- `statusTracker.getExecutorInfos` refers to `AppStatusStore` to get the list of executors, which returns the dead executor as alive.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Modified the existing unit tests.
- Ran a simple test application on minikube that asserts that the number of executors is zero once the executor idle timeout is reached.

Closes #33771 from sumeetgajjar/SPARK-35011-br-3.1.

Authored-by: Sumeet Gajjar <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
sumeetgajjar added a commit to sumeetgajjar/spark that referenced this pull request Aug 19, 2021
…or msg is in-flight

This patch proposes a fix to prevent triggering BlockManager reregistration while `StopExecutor` msg is in-flight.
Here, on receiving the `StopExecutor` msg, we do not remove the corresponding `BlockManagerInfo` from the `blockManagerInfo` map; instead we mark it as dead by updating the corresponding `executorRemovalTs`. A separate cleanup thread runs periodically to remove the stale `BlockManagerInfo` entries from the `blockManagerInfo` map.

Now if a recently removed `BlockManager` tries to register, the driver simply ignores it since the `blockManagerInfo` map already contains an entry for it. The same applies to `BlockManagerHeartbeat`, if the BlockManager belongs to a recently removed executor, the `blockManagerInfo` map would contain an entry and we shall not ask the corresponding `BlockManager` to re-register.

These changes are needed because BlockManager reregistration while an executor is shutting down causes inconsistent bookkeeping of executors in Spark.
Consider the following scenario:
- `CoarseGrainedSchedulerBackend` issues async `StopExecutor` on executorEndpoint
- `CoarseGrainedSchedulerBackend` removes that executor from Driver's internal data structures and publishes `SparkListenerExecutorRemoved` on the `listenerBus`.
- Executor has still not processed `StopExecutor` from the Driver
- Driver receives heartbeat from the Executor, since it cannot find the `executorId` in its data structures, it responds with `HeartbeatResponse(reregisterBlockManager = true)`
- `BlockManager` on the Executor reregisters with the `BlockManagerMaster` and `SparkListenerBlockManagerAdded` is published on the `listenerBus`
- Executor starts processing the `StopExecutor` and exits
- `AppStatusListener` picks the `SparkListenerBlockManagerAdded` event and updates `AppStatusStore`
- `statusTracker.getExecutorInfos` refers to `AppStatusStore` to get the list of executors, which returns the dead executor as alive.

No

- Modified the existing unit tests.
- Ran a simple test application on minikube that asserts that the number of executors is zero once the executor idle timeout is reached.

Closes apache#32114 from sumeetgajjar/SPARK-35011.

Authored-by: Sumeet Gajjar <[email protected]>
Signed-off-by: yi.wu <[email protected]>
Ngone51 pushed a commit that referenced this pull request Aug 20, 2021
…xecutor msg is in-flight

This PR backports #32114 to 3.0
<hr>

### What changes were proposed in this pull request?
This patch proposes a fix to prevent triggering BlockManager reregistration while `StopExecutor` msg is in-flight.
Here, on receiving the `StopExecutor` msg, we do not remove the corresponding `BlockManagerInfo` from the `blockManagerInfo` map; instead we mark it as dead by updating the corresponding `executorRemovalTs`. A separate cleanup thread runs periodically to remove the stale `BlockManagerInfo` entries from the `blockManagerInfo` map.

Now if a recently removed `BlockManager` tries to register, the driver simply ignores it since the `blockManagerInfo` map already contains an entry for it. The same applies to `BlockManagerHeartbeat`, if the BlockManager belongs to a recently removed executor, the `blockManagerInfo` map would contain an entry and we shall not ask the corresponding `BlockManager` to re-register.

### Why are the changes needed?
These changes are needed because BlockManager reregistration while an executor is shutting down causes inconsistent bookkeeping of executors in Spark.
Consider the following scenario:
- `CoarseGrainedSchedulerBackend` issues async `StopExecutor` on executorEndpoint
- `CoarseGrainedSchedulerBackend` removes that executor from Driver's internal data structures and publishes `SparkListenerExecutorRemoved` on the `listenerBus`.
- Executor has still not processed `StopExecutor` from the Driver
- Driver receives heartbeat from the Executor, since it cannot find the `executorId` in its data structures, it responds with `HeartbeatResponse(reregisterBlockManager = true)`
- `BlockManager` on the Executor reregisters with the `BlockManagerMaster` and `SparkListenerBlockManagerAdded` is published on the `listenerBus`
- Executor starts processing the `StopExecutor` and exits
- `AppStatusListener` picks the `SparkListenerBlockManagerAdded` event and updates `AppStatusStore`
- `statusTracker.getExecutorInfos` refers to `AppStatusStore` to get the list of executors, which returns the dead executor as alive.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Modified the existing unit tests.
- Ran a simple test application on minikube that asserts that the number of executors is zero once the executor idle timeout is reached.

Closes #33782 from sumeetgajjar/SPARK-35011-br-3.0.

Authored-by: Sumeet Gajjar <[email protected]>
Signed-off-by: yi.wu <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Sep 10, 2021
…opExecutor msg is in-flight"

This reverts commit b9e53f8.

### What changes were proposed in this pull request?

Revert #32114

### Why are the changes needed?

It breaks the expected `BlockManager` re-registration (e.g., heartbeat loss of an active executor) due to deferred removal of `BlockManager`, see the check:
https://github.com/apache/spark/blob/9cefde8db373a3433b7e3ce328e4a2ce83b1aca2/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L551

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass existing tests.

Closes #33942 from Ngone51/SPARK-36700.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
a0x8o added a commit to a0x8o/spark that referenced this pull request Sep 10, 2021
…opExecutor msg is in-flight"

This reverts commit b9e53f89379c34bde36b9c37471e21f037092749.

### What changes were proposed in this pull request?

Revert apache/spark#32114

### Why are the changes needed?

It breaks the expected `BlockManager` re-registration (e.g., heartbeat loss of an active executor) due to deferred removal of `BlockManager`, see the check:
https://github.com/apache/spark/blob/9cefde8db373a3433b7e3ce328e4a2ce83b1aca2/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L551

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass existing tests.

Closes #33942 from Ngone51/SPARK-36700.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Sep 10, 2021
…opExecutor msg is in-flight"

This reverts commit b9e53f8.

### What changes were proposed in this pull request?

Revert #32114

### Why are the changes needed?

It breaks the expected `BlockManager` re-registration (e.g., heartbeat loss of an active executor) due to deferred removal of `BlockManager`, see the check:
https://github.com/apache/spark/blob/9cefde8db373a3433b7e3ce328e4a2ce83b1aca2/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L551

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass existing tests.

Closes #33959 from Ngone51/revert-35011-3.2.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
flyrain pushed a commit to flyrain/spark that referenced this pull request Sep 21, 2021
…xecutor msg is in-flight

This PR backports apache#32114 to 3.1
<hr>

This patch proposes a fix to prevent triggering BlockManager reregistration while `StopExecutor` msg is in-flight.
Here, on receiving the `StopExecutor` msg, we do not remove the corresponding `BlockManagerInfo` from the `blockManagerInfo` map; instead we mark it as dead by updating the corresponding `executorRemovalTs`. A separate cleanup thread runs periodically to remove the stale `BlockManagerInfo` entries from the `blockManagerInfo` map.

Now if a recently removed `BlockManager` tries to register, the driver simply ignores it since the `blockManagerInfo` map already contains an entry for it. The same applies to `BlockManagerHeartbeat`, if the BlockManager belongs to a recently removed executor, the `blockManagerInfo` map would contain an entry and we shall not ask the corresponding `BlockManager` to re-register.

These changes are needed because BlockManager reregistration while an executor is shutting down causes inconsistent bookkeeping of executors in Spark.
Consider the following scenario:
- `CoarseGrainedSchedulerBackend` issues async `StopExecutor` on executorEndpoint
- `CoarseGrainedSchedulerBackend` removes that executor from Driver's internal data structures and publishes `SparkListenerExecutorRemoved` on the `listenerBus`.
- Executor has still not processed `StopExecutor` from the Driver
- Driver receives heartbeat from the Executor, since it cannot find the `executorId` in its data structures, it responds with `HeartbeatResponse(reregisterBlockManager = true)`
- `BlockManager` on the Executor reregisters with the `BlockManagerMaster` and `SparkListenerBlockManagerAdded` is published on the `listenerBus`
- Executor starts processing the `StopExecutor` and exits
- `AppStatusListener` picks the `SparkListenerBlockManagerAdded` event and updates `AppStatusStore`
- `statusTracker.getExecutorInfos` refers to `AppStatusStore` to get the list of executors, which returns the dead executor as alive.

No

- Modified the existing unit tests.
- Ran a simple test application on minikube that asserts that the number of executors is zero once the executor idle timeout is reached.

Closes apache#33771 from sumeetgajjar/SPARK-35011-br-3.1.

Authored-by: Sumeet Gajjar <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Nov 12, 2021
…ockManager reregistration

### What changes were proposed in this pull request?

Also post the `SparkListenerExecutorRemoved` event when removing an executor that is known to `BlockManagerMaster` but unknown to `SchedulerBackend`.

### Why are the changes needed?

In #32114, it was reported that `BlockManagerMaster` could register a `BlockManager` from a dead executor due to the re-registration mechanism. The side effect is that the executor is shown on the UI as an active one, though it is actually already dead.

In #32114, we tried to avoid such re-registration for a to-be-dead executor. However, I just realized that we can actually leave such re-registration alone, since `HeartbeatReceiver.expireDeadHosts` should clean up those `BlockManager`s in the end. The problem is that the corresponding executors in the UI can't be cleaned up along with the `BlockManager` cleanup, because executors in the UI can only be cleaned up by `SparkListenerExecutorRemoved`, while the `BlockManager` cleanup only posts `SparkListenerBlockManagerRemoved` (which is ignored by `AppStatusListener`).
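
A hedged sketch of the idea (simplified; the helper below is purely illustrative, only the listener event itself is real Spark API):

```scala
import org.apache.spark.scheduler.{SparkListenerEvent, SparkListenerExecutorRemoved}

// AppStatusListener only drops an executor from the UI on SparkListenerExecutorRemoved,
// so post that event as well for executors the scheduler backend no longer knows about.
def postExecutorRemovedIfUnknown(executorId: String,
                                 knownToSchedulerBackend: Boolean,
                                 post: SparkListenerEvent => Unit): Unit = {
  if (!knownToSchedulerBackend) {
    post(SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId,
      "Executor known only to BlockManagerMaster"))
  }
}
```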

### Does this PR introduce _any_ user-facing change?

Yes, users will see the falsely-active executor removed in the end.

### How was this patch tested?

Pass existing tests.

Closes #34536 from Ngone51/SPARK-35011.

Lead-authored-by: wuyi <[email protected]>
Co-authored-by: yi.wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
mridulm pushed a commit that referenced this pull request Dec 12, 2022
…r has been lost

### What changes were proposed in this pull request?

This PR mainly proposes to reject block manager re-registration if the executor has already been considered lost/dead by the scheduler backend.

Along with the major proposal, this PR also includes a few other changes:
* Only post `SparkListenerBlockManagerAdded` event when the registration succeeds
* Return an "invalid" executor id when the re-registration fails
* Do not report all blocks when the re-registration fails

### Why are the changes needed?

BlockManager re-registration from a lost executor (a terminated/terminating executor or an orphan executor) has led to some known issues, e.g., a false-active executor showing up in the UI (SPARK-35011) and [block fetching to the dead executor](#32114 (comment)). And since there's no re-registration from the lost executor itself, it's meaningless to have BlockManager re-registration when the executor is already lost.

Regarding the corner case where the re-registration event comes in before the lost executor is actually removed from the scheduler backend, I think it is not possible. Re-registration will only be required when the driver doesn't see the block manager in `blockManagerInfo`, and the block manager will only be removed from `blockManagerInfo` either when the executor is already known to be lost or when it is removed by the driver proactively. So the executor should always be removed from the scheduler backend first, before the re-registration event comes.
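
A hedged sketch of the rejection described above (illustrative; the real code works with BlockManagerId values and a Spark-internal "invalid" marker, elided here):

```scala
def handleReRegistration(executorId: String,
                         isExecutorAlive: String => Boolean,
                         register: String => Unit): Option[String] = {
  if (isExecutorAlive(executorId)) {
    register(executorId)   // normal path: bookkeeping, then post SparkListenerBlockManagerAdded
    Some(executorId)
  } else {
    None                   // lost executor: reject, and the caller skips reporting all blocks
  }
}
```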

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

Closes #38876 from Ngone51/fix-blockmanager-reregister.

Authored-by: Yi Wu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
mridulm pushed a commit that referenced this pull request Dec 12, 2022
…r has been lost

### What changes were proposed in this pull request?

This PR mainly proposes to reject block manager re-registration if the executor has already been considered lost/dead by the scheduler backend.

Along with the major proposal, this PR also includes a few other changes:
* Only post `SparkListenerBlockManagerAdded` event when the registration succeeds
* Return an "invalid" executor id when the re-registration fails
* Do not report all blocks when the re-registration fails

### Why are the changes needed?

BlockManager re-registration from a lost executor (a terminated/terminating executor or an orphan executor) has led to some known issues, e.g., a false-active executor showing up in the UI (SPARK-35011) and [block fetching to the dead executor](#32114 (comment)). And since there's no re-registration from the lost executor itself, it's meaningless to have BlockManager re-registration when the executor is already lost.

Regarding the corner case where the re-registration event comes in before the lost executor is actually removed from the scheduler backend, I think it is not possible. Re-registration will only be required when the driver doesn't see the block manager in `blockManagerInfo`, and the block manager will only be removed from `blockManagerInfo` either when the executor is already known to be lost or when it is removed by the driver proactively. So the executor should always be removed from the scheduler backend first, before the re-registration event comes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

Closes #38876 from Ngone51/fix-blockmanager-reregister.

Authored-by: Yi Wu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit c3f46d5)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
mridulm pushed a commit that referenced this pull request Dec 12, 2022
…r has been lost

### What changes were proposed in this pull request?

This PR mainly proposes to reject block manager re-registration if the executor has already been considered lost/dead by the scheduler backend.

Along with the major proposal, this PR also includes a few other changes:
* Only post `SparkListenerBlockManagerAdded` event when the registration succeeds
* Return an "invalid" executor id when the re-registration fails
* Do not report all blocks when the re-registration fails

### Why are the changes needed?

BlockManager re-registration from a lost executor (a terminated/terminating executor or an orphan executor) has led to some known issues, e.g., a false-active executor showing up in the UI (SPARK-35011) and [block fetching to the dead executor](#32114 (comment)). And since there's no re-registration from the lost executor itself, it's meaningless to have BlockManager re-registration when the executor is already lost.

Regarding the corner case where the re-registration event comes in before the lost executor is actually removed from the scheduler backend, I think it is not possible. Re-registration will only be required when the driver doesn't see the block manager in `blockManagerInfo`, and the block manager will only be removed from `blockManagerInfo` either when the executor is already known to be lost or when it is removed by the driver proactively. So the executor should always be removed from the scheduler backend first, before the re-registration event comes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

Closes #38876 from Ngone51/fix-blockmanager-reregister.

Authored-by: Yi Wu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit c3f46d5)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022
…r has been lost

### What changes were proposed in this pull request?

This PR mainly proposes to reject block manager re-registration if the executor has already been considered lost/dead by the scheduler backend.

Along with the major proposal, this PR also includes a few other changes:
* Only post `SparkListenerBlockManagerAdded` event when the registration succeeds
* Return an "invalid" executor id when the re-registration fails
* Do not report all blocks when the re-registration fails

### Why are the changes needed?

BlockManager re-registration from a lost executor (a terminated/terminating executor or an orphan executor) has led to some known issues, e.g., a false-active executor showing up in the UI (SPARK-35011) and [block fetching to the dead executor](apache#32114 (comment)). And since there's no re-registration from the lost executor itself, it's meaningless to have BlockManager re-registration when the executor is already lost.

Regarding the corner case where the re-registration event comes in before the lost executor is actually removed from the scheduler backend, I think it is not possible. Re-registration will only be required when the driver doesn't see the block manager in `blockManagerInfo`, and the block manager will only be removed from `blockManagerInfo` either when the executor is already known to be lost or when it is removed by the driver proactively. So the executor should always be removed from the scheduler backend first, before the re-registration event comes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

Closes apache#38876 from Ngone51/fix-blockmanager-reregister.

Authored-by: Yi Wu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
…r has been lost

### What changes were proposed in this pull request?

This PR mainly proposes to reject block manager re-registration if the executor has already been considered lost/dead by the scheduler backend.

Along with the major proposal, this PR also includes a few other changes:
* Only post `SparkListenerBlockManagerAdded` event when the registration succeeds
* Return an "invalid" executor id when the re-registration fails
* Do not report all blocks when the re-registration fails

### Why are the changes needed?

BlockManager re-registration from a lost executor (a terminated/terminating executor or an orphan executor) has led to some known issues, e.g., a false-active executor showing up in the UI (SPARK-35011) and [block fetching to the dead executor](apache#32114 (comment)). And since there's no re-registration from the lost executor itself, it's meaningless to have BlockManager re-registration when the executor is already lost.

Regarding the corner case where the re-registration event comes in before the lost executor is actually removed from the scheduler backend, I think it is not possible. Re-registration will only be required when the driver doesn't see the block manager in `blockManagerInfo`, and the block manager will only be removed from `blockManagerInfo` either when the executor is already known to be lost or when it is removed by the driver proactively. So the executor should always be removed from the scheduler backend first, before the re-registration event comes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

Closes apache#38876 from Ngone51/fix-blockmanager-reregister.

Authored-by: Yi Wu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit c3f46d5)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>