
Commit 79ea014

sumeetgajjar authored and dongjoon-hyun committed
[SPARK-35011][CORE][3.1] Avoid Block Manager registrations when StopExecutor msg is in-flight
This PR backports #32114 to 3.1.

### What changes were proposed in this pull request?

This patch proposes a fix to prevent triggering BlockManager re-registration while a `StopExecutor` message is in flight. On receiving `StopExecutor`, we do not remove the corresponding `BlockManagerInfo` from the `blockManagerInfo` map; instead we mark it as dead by setting its `executorRemovalTs`. A separate cleanup thread periodically removes stale `BlockManagerInfo` entries from the `blockManagerInfo` map. If a recently removed `BlockManager` tries to register, the driver simply ignores it, since the `blockManagerInfo` map already contains an entry for it. The same applies to `BlockManagerHeartbeat`: if the BlockManager belongs to a recently removed executor, the `blockManagerInfo` map still contains an entry, so we do not ask that `BlockManager` to re-register.

### Why are the changes needed?

These changes are needed because BlockManager re-registration while an executor is shutting down causes inconsistent bookkeeping of executors in Spark. Consider the following scenario:

- `CoarseGrainedSchedulerBackend` issues an async `StopExecutor` on the executor endpoint.
- `CoarseGrainedSchedulerBackend` removes that executor from the driver's internal data structures and publishes `SparkListenerExecutorRemoved` on the `listenerBus`.
- The executor has still not processed `StopExecutor` from the driver.
- The driver receives a heartbeat from the executor; since it cannot find the `executorId` in its data structures, it responds with `HeartbeatResponse(reregisterBlockManager = true)`.
- The `BlockManager` on the executor re-registers with the `BlockManagerMaster`, and `SparkListenerBlockManagerAdded` is published on the `listenerBus`.
- The executor starts processing `StopExecutor` and exits.
- `AppStatusListener` picks up the `SparkListenerBlockManagerAdded` event and updates the `AppStatusStore`.
- `statusTracker.getExecutorInfos` consults the `AppStatusStore` for the list of executors, which returns the dead executor as alive.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- Modified the existing unit tests.
- Ran a simple test application on minikube that asserts the number of executors is zero once the executor idle timeout is reached.

Closes #33771 from sumeetgajjar/SPARK-35011-br-3.1.

Authored-by: Sumeet Gajjar <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 31d771d commit 79ea014

File tree: 4 files changed (+109, −40 lines)

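Before the per-file diffs, here is a minimal, self-contained Scala sketch of the approach described in the PR message above. It is an illustration only, not Spark's actual `BlockManagerMasterEndpoint`/`BlockManagerInfo` code; the registry name, its methods, and the hard-coded timeout are hypothetical.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

// Sketch of the idea: on "StopExecutor" the entry is only marked dead with a removal timestamp;
// re-registration and heartbeats from a recently removed executor are recognised because the
// entry is still present; a periodic cleaner drops entries once they have been dead longer than
// the executor timeout.
class DelayedRemovalRegistry(executorTimeoutMs: Long) {

  private case class Entry(var removalTs: Option[Long] = None) {
    def isAlive: Boolean = removalTs.isEmpty
  }

  private val entries = mutable.Map.empty[String, Entry]

  /** Returns false (registration ignored) if an entry -- alive or recently removed -- exists. */
  def register(executorId: String): Boolean = entries.synchronized {
    if (entries.contains(executorId)) {
      false
    } else {
      entries(executorId) = Entry()
      true
    }
  }

  /** Called when StopExecutor is issued: mark the entry dead instead of removing it. */
  def markRemoved(executorId: String): Unit = entries.synchronized {
    entries.get(executorId).foreach(e => e.removalTs = Some(System.currentTimeMillis()))
  }

  /** Heartbeat handling: only ask for re-registration when no entry exists at all. */
  def shouldReregister(executorId: String): Boolean = entries.synchronized {
    !entries.contains(executorId)
  }

  // The cleaner runs at half the timeout, mirroring the Math.floorDiv(executorTimeoutMs, 2) delay
  // used in the diff below.
  private val cleaner = Executors.newSingleThreadScheduledExecutor()
  cleaner.scheduleWithFixedDelay(new Runnable {
    override def run(): Unit = entries.synchronized {
      val now = System.currentTimeMillis()
      val expired = entries.collect {
        case (id, e) if !e.isAlive && (now - e.removalTs.get) > executorTimeoutMs => id
      }.toList
      expired.foreach(entries.remove)
    }
  }, executorTimeoutMs / 2, executorTimeoutMs / 2, TimeUnit.MILLISECONDS)

  def shutdown(): Unit = cleaner.shutdownNow()
}

// Example flow mirroring the race described above (assuming a 5000 ms timeout):
//   registry.register("exec-1")          -> true  (initial registration)
//   registry.markRemoved("exec-1")       -> entry marked dead, not removed
//   registry.shouldReregister("exec-1")  -> false (entry still present, no spurious re-registration)
//   ... after > 5000 ms the cleaner drops the entry and shouldReregister returns true again
```

The point mirrored from the diffs below is that a heartbeat or registration from a recently stopped executor still finds an entry in the map, so no spurious re-registration is triggered until the cleaner has dropped it.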

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 1 addition & 3 deletions
```diff
@@ -80,9 +80,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new HashMap[String, Long]
 
-  private val executorTimeoutMs = sc.conf.get(
-    config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
-  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
+  private val executorTimeoutMs = Utils.executorTimeoutMs(sc.conf)
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
```

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 85 additions & 33 deletions
```diff
@@ -96,6 +96,15 @@ class BlockManagerMasterEndpoint(
     mapper
   }
 
+  private val executorTimeoutMs = Utils.executorTimeoutMs(conf)
+  private val blockManagerInfoCleaner = {
+    val cleaningDelay = Math.floorDiv(executorTimeoutMs, 2L)
+    val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("blockManagerInfo-cleaner")
+    executor.scheduleWithFixedDelay(() => cleanBlockManagerInfo(), cleaningDelay, cleaningDelay,
+      TimeUnit.MILLISECONDS)
+    executor
+  }
+
   val proactivelyReplicate = conf.get(config.STORAGE_REPLICATION_PROACTIVE)
 
   val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf)
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        aliveBlockManagerInfo(bmId).foreach { bmInfo =>
           bmInfo.removeBlock(blockId)
         }
       }
     }
-    val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
+    val removeRddFromExecutorsFutures = allAliveBlockManagerInfos.map { bmInfo =>
       bmInfo.storageEndpoint.ask[Int](removeMsg).recover {
         // use 0 as default value means no blocks were removed
         handleBlockRemovalFailure("RDD", rddId.toString, bmInfo.blockManagerId, 0)
@@ -304,7 +313,7 @@ class BlockManagerMasterEndpoint(
     // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
     Future.sequence(
-      blockManagerInfo.values.map { bm =>
+      allAliveBlockManagerInfos.map { bm =>
         bm.storageEndpoint.ask[Boolean](removeMsg).recover {
           // use false as default value means no shuffle data were removed
           handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
@@ -320,7 +329,7 @@ class BlockManagerMasterEndpoint(
    */
   private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
     val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
-    val requiredBlockManagers = blockManagerInfo.values.filter { info =>
+    val requiredBlockManagers = allAliveBlockManagerInfos.filter { info =>
       removeFromDriver || !info.blockManagerId.isDriver
     }
     val futures = requiredBlockManagers.map { bm =>
@@ -336,13 +345,24 @@ class BlockManagerMasterEndpoint(
   private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
     val info = blockManagerInfo(blockManagerId)
 
+    // SPARK-35011: Not removing info from the blockManagerInfo map, but only setting the removal
+    // timestamp of the executor in BlockManagerInfo. This info will be removed from
+    // blockManagerInfo map by the blockManagerInfoCleaner once
+    // now() - info.executorRemovalTs > executorTimeoutMs.
+    //
+    // We are delaying the removal of BlockManagerInfo to avoid a BlockManager reregistration
+    // while a executor is shutting. This unwanted reregistration causes inconsistent bookkeeping
+    // of executors in Spark.
+    // Delaying this removal until blockManagerInfoCleaner decides to remove it ensures
+    // BlockManagerMasterHeartbeatEndpoint does not ask the BlockManager on a recently removed
+    // executor to reregister on BlockManagerHeartbeat message.
+    info.setExecutorRemovalTs()
+
     // Remove the block manager from blockManagerIdByExecutor.
     blockManagerIdByExecutor -= blockManagerId.executorId
     decommissioningBlockManagerSet.remove(blockManagerId)
 
-    // Remove it from blockManagerInfo and remove all the blocks.
-    blockManagerInfo.remove(blockManagerId)
-
+    // remove all the blocks.
     val iterator = info.blocks.keySet.iterator
     while (iterator.hasNext) {
       val blockId = iterator.next
@@ -363,7 +383,7 @@ class BlockManagerMasterEndpoint(
       val i = (new Random(blockId.hashCode)).nextInt(locations.size)
       val blockLocations = locations.toSeq
       val candidateBMId = blockLocations(i)
-      blockManagerInfo.get(candidateBMId).foreach { bm =>
+      aliveBlockManagerInfo(candidateBMId).foreach { bm =>
        val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
        val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
        bm.storageEndpoint.ask[Boolean](replicateMsg)
@@ -399,16 +419,16 @@ class BlockManagerMasterEndpoint(
    */
   private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
     try {
-      val info = blockManagerInfo(blockManagerId)
-
-      val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
-      rddBlocks.map { blockId =>
-        val currentBlockLocations = blockLocations.get(blockId)
-        val maxReplicas = currentBlockLocations.size + 1
-        val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
-        val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
-        replicateMsg
-      }.toSeq
+      aliveBlockManagerInfo(blockManagerId).map { info =>
+        val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
+        rddBlocks.map { blockId =>
+          val currentBlockLocations = blockLocations.get(blockId)
+          val maxReplicas = currentBlockLocations.size + 1
+          val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
+          val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
+          replicateMsg
+        }.toSeq
+      }.getOrElse(Seq.empty[ReplicateBlock])
     } catch {
       // If the block manager has already exited, nothing to replicate.
       case e: java.util.NoSuchElementException =>
@@ -422,8 +442,7 @@ class BlockManagerMasterEndpoint(
     val locations = blockLocations.get(blockId)
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
-        val blockManager = blockManagerInfo.get(blockManagerId)
-        blockManager.foreach { bm =>
+        aliveBlockManagerInfo(blockManagerId).foreach { bm =>
           // Remove the block from the BlockManager.
           // Doesn't actually wait for a confirmation and the message might get lost.
           // If message loss becomes frequent, we should add retry logic here.
@@ -438,14 +457,14 @@ class BlockManagerMasterEndpoint(
 
   // Return a map from the block manager id to max memory and remaining memory.
   private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
-    blockManagerInfo.map { case(blockManagerId, info) =>
-      (blockManagerId, (info.maxMem, info.remainingMem))
+    allAliveBlockManagerInfos.map { info =>
+      (info.blockManagerId, (info.maxMem, info.remainingMem))
     }.toMap
   }
 
   private def storageStatus: Array[StorageStatus] = {
-    blockManagerInfo.map { case (blockManagerId, info) =>
-      new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
+    allAliveBlockManagerInfos.map { info =>
+      new StorageStatus(info.blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
         Some(info.maxOffHeapMem), info.blocks.asScala)
     }.toArray
   }
@@ -467,7 +486,7 @@ class BlockManagerMasterEndpoint(
      * Futures to avoid potential deadlocks. This can arise if there exists a block manager
      * that is also waiting for this master endpoint's response to a previous message.
      */
-    blockManagerInfo.values.map { info =>
+    allAliveBlockManagerInfos.map { info =>
       val blockStatusFuture =
         if (askStorageEndpoints) {
           info.storageEndpoint.ask[Option[BlockStatus]](getBlockStatus)
@@ -491,7 +510,7 @@ class BlockManagerMasterEndpoint(
       askStorageEndpoints: Boolean): Future[Seq[BlockId]] = {
     val getMatchingBlockIds = GetMatchingBlockIds(filter)
     Future.sequence(
-      blockManagerInfo.values.map { info =>
+      allAliveBlockManagerInfos.map { info =>
         val future =
           if (askStorageEndpoints) {
             info.storageEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
@@ -557,9 +576,10 @@ class BlockManagerMasterEndpoint(
       if (pushBasedShuffleEnabled) {
         addMergerLocation(id)
       }
+
+      listenerBus.post(SparkListenerBlockManagerAdded(time, id,
+        maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     }
-    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
-      Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     id
   }
 
@@ -647,7 +667,7 @@ class BlockManagerMasterEndpoint(
     if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
       Option(blockStatusByShuffleService(bmId).get(blockId))
     } else {
-      blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId))
+      aliveBlockManagerInfo(bmId).flatMap(_.getStatus(blockId))
     }
   }
 
@@ -658,8 +678,7 @@ class BlockManagerMasterEndpoint(
       // can be used to access this block even when the original executor is already stopped.
       loc.host == requesterHost &&
         (loc.port == externalShuffleServicePort ||
-          blockManagerInfo
-            .get(loc)
+          aliveBlockManagerInfo(loc)
             .flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk))
             .getOrElse(false))
     }.flatMap { bmId => Option(executorIdToLocalDirs.getIfPresent(bmId.executorId)) }
@@ -676,7 +695,7 @@ class BlockManagerMasterEndpoint(
 
   /** Get the list of the peers of the given block manager */
   private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
-    val blockManagerIds = blockManagerInfo.keySet
+    val blockManagerIds = allAliveBlockManagerInfos.map(_.blockManagerId).toSet
     if (blockManagerIds.contains(blockManagerId)) {
       blockManagerIds
         .filterNot { _.isDriver }
@@ -728,15 +747,35 @@ class BlockManagerMasterEndpoint(
   private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
     for (
       blockManagerId <- blockManagerIdByExecutor.get(executorId);
-      info <- blockManagerInfo.get(blockManagerId)
+      info <- aliveBlockManagerInfo(blockManagerId)
     ) yield {
       info.storageEndpoint
     }
   }
 
   override def onStop(): Unit = {
     askThreadPool.shutdownNow()
+    blockManagerInfoCleaner.shutdownNow()
+  }
+
+  private def cleanBlockManagerInfo(): Unit = {
+    logDebug("Cleaning blockManagerInfo")
+    val now = System.currentTimeMillis()
+    val expiredBmIds = blockManagerInfo.filter { case (_, bmInfo) =>
+      // bmInfo.executorRemovalTs.get cannot be None when BM is not alive
+      !bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > executorTimeoutMs
+    }.keys
+    expiredBmIds.foreach { bmId =>
+      logInfo(s"Cleaning expired $bmId from blockManagerInfo")
+      blockManagerInfo.remove(bmId)
+    }
   }
+
+  @inline private def aliveBlockManagerInfo(bmId: BlockManagerId): Option[BlockManagerInfo] =
+    blockManagerInfo.get(bmId).filter(_.isAlive)
+
+  @inline private def allAliveBlockManagerInfos: Iterable[BlockManagerInfo] =
+    blockManagerInfo.values.filter(_.isAlive)
 }
 
 @DeveloperApi
@@ -764,6 +803,7 @@ private[spark] class BlockManagerInfo(
 
   private var _lastSeenMs: Long = timeMs
   private var _remainingMem: Long = maxMem
+  private var _executorRemovalTs: Option[Long] = None
 
   // Mapping from block id to its status.
   private val _blocks = new JHashMap[BlockId, BlockStatus]
@@ -878,4 +918,16 @@ private[spark] class BlockManagerInfo(
   def clear(): Unit = {
     _blocks.clear()
   }
+
+  def executorRemovalTs: Option[Long] = _executorRemovalTs
+
+  def isAlive: Boolean = _executorRemovalTs.isEmpty
+
+  def setExecutorRemovalTs(): Unit = {
+    if (!isAlive) {
+      logWarning(s"executorRemovalTs is already set to ${_executorRemovalTs.get}")
+    } else {
+      _executorRemovalTs = Some(System.currentTimeMillis())
+    }
+  }
 }
```

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 7 additions & 0 deletions
```diff
@@ -3057,6 +3057,13 @@ private[spark] object Utils extends Logging {
       0
     }
   }
+
+  def executorTimeoutMs(conf: SparkConf): Long = {
+    // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
+    // "milliseconds"
+    conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
+      .getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
+  }
 }
 
 private[util] object CallerContext extends Logging {
```
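For context on the new `Utils.executorTimeoutMs` helper, here is a hedged sketch of the fallback and unit handling it performs. The simplified `Map[String, String]` interface, the plain-number parsing, and the 120-second default are assumptions for illustration; the config keys are the ones named in the code comment above, not an exact reproduction of Spark's `SparkConf` API.

```scala
// Simplified model (not Spark's SparkConf) of the timeout resolution added above:
// prefer the millisecond-based block manager timeout when set, otherwise fall back to the
// seconds-based network timeout and convert it to milliseconds.
def executorTimeoutMs(conf: Map[String, String]): Long =
  conf.get("spark.storage.blockManagerSlaveTimeoutMs") // value already in milliseconds
    .map(_.toLong)
    .getOrElse {
      // "spark.network.timeout" is seconds-based; assume a plain number and the usual 120 s default
      // (the real SparkConf value may carry a unit suffix, which Utils.timeStringAsMs handles).
      conf.getOrElse("spark.network.timeout", "120").toLong * 1000L
    }

// executorTimeoutMs(Map.empty)                                                  == 120000L
// executorTimeoutMs(Map("spark.storage.blockManagerSlaveTimeoutMs" -> "5000"))  == 5000L
```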

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 16 additions & 4 deletions
```diff
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{Future, TimeoutException}
 import scala.concurrent.duration._
-import scala.language.implicitConversions
+import scala.language.{implicitConversions, postfixOps}
 import scala.reflect.ClassTag
 
 import org.apache.commons.lang3.RandomUtils
@@ -101,6 +101,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
       .set(Network.RPC_ASK_TIMEOUT, "5s")
       .set(PUSH_BASED_SHUFFLE_ENABLED, true)
+      .set(STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key, "5s")
   }
 
   private def makeSortShuffleManager(): SortShuffleManager = {
@@ -610,7 +611,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       mc.eq(StorageLevel.NONE), mc.anyInt(), mc.anyInt())
   }
 
-  test("reregistration on heart beat") {
+  test("no reregistration on heart beat until executor timeout") {
     val store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
@@ -621,10 +622,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
-
     val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
       BlockManagerHeartbeat(store.blockManagerId))
-    assert(reregister)
+    assert(reregister == false, "master told to re-register")
+
+    eventually(timeout(10 seconds), interval(1 seconds)) {
+      val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
+        BlockManagerHeartbeat(store.blockManagerId))
+      assert(reregister, "master did not tell to re-register")
+    }
   }
 
   test("reregistration on block update") {
@@ -638,6 +644,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
+    eventually(timeout(10 seconds), interval(1 seconds)) {
+      val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
+        BlockManagerHeartbeat(store.blockManagerId))
+      assert(reregister, "master did not tell to re-register")
+    }
+
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.waitForAsyncReregister()
 
```
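A note on the timing these tests rely on, derived from the values in the diffs above (the worst-case bound below is a back-of-the-envelope estimate, not something the suite asserts):

```scala
// With STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT set to "5s" in the suite config:
val heartbeatTimeoutMs = 5000L
val cleanerDelayMs = Math.floorDiv(heartbeatTimeoutMs, 2L)   // cleaner fires every 2500 ms
// A dead BlockManagerInfo becomes removable once it has been dead for more than 5000 ms and is
// dropped at most ~2500 ms later, i.e. roughly 7500 ms in the worst case -- which is why the new
// assertions poll with eventually(timeout(10 seconds), interval(1 seconds)) instead of expecting
// an immediate re-registration request after removeExecutor.
```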