Commit dcddf9f

Revert "[SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight"
This reverts commit b9e53f8.
1 parent 67421d8 commit dcddf9f

4 files changed (+40 -109 lines)


core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 3 additions & 1 deletion
@@ -80,7 +80,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new HashMap[String, Long]
 
-  private val executorTimeoutMs = Utils.executorTimeoutMs(sc.conf)
+  private val executorTimeoutMs = sc.conf.get(
+    config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
+  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)

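With the Utils.executorTimeoutMs helper gone (see the Utils.scala hunk below), HeartbeatReceiver resolves the executor timeout inline again. The fallback mixes units: the block-manager heartbeat timeout is already in milliseconds, while spark.network.timeout is interpreted as seconds, which is why the code appends an "s" before converting. Below is a minimal sketch of that resolution, assuming a plain Map in place of SparkConf and a bare number of seconds for spark.network.timeout (the real code accepts suffixed values via Utils.timeStringAsMs); the 120 stands in for spark.network.timeout's 120s default.

object ExecutorTimeoutSketch {
  // Illustrative only -- not Spark code. Key names follow the comment in the
  // removed Utils.executorTimeoutMs helper.
  def executorTimeoutMs(conf: Map[String, String]): Long =
    conf.get("spark.storage.blockManagerSlaveTimeoutMs")            // already milliseconds
      .map(_.toLong)
      .getOrElse(conf.getOrElse("spark.network.timeout", "120").toLong * 1000L)  // seconds -> ms

  def main(args: Array[String]): Unit = {
    println(executorTimeoutMs(Map.empty))                              // 120000
    println(executorTimeoutMs(Map("spark.network.timeout" -> "300")))  // 300000
  }
}
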
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 33 additions & 85 deletions
@@ -96,15 +96,6 @@ class BlockManagerMasterEndpoint(
     mapper
   }
 
-  private val executorTimeoutMs = Utils.executorTimeoutMs(conf)
-  private val blockManagerInfoCleaner = {
-    val cleaningDelay = Math.floorDiv(executorTimeoutMs, 2L)
-    val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("blockManagerInfo-cleaner")
-    executor.scheduleWithFixedDelay(() => cleanBlockManagerInfo(), cleaningDelay, cleaningDelay,
-      TimeUnit.MILLISECONDS)
-    executor
-  }
-
   val proactivelyReplicate = conf.get(config.STORAGE_REPLICATION_PROACTIVE)
 
   val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf)
@@ -282,12 +273,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        aliveBlockManagerInfo(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).foreach { bmInfo =>
           bmInfo.removeBlock(blockId)
         }
       }
     }
-    val removeRddFromExecutorsFutures = allAliveBlockManagerInfos.map { bmInfo =>
+    val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
       bmInfo.storageEndpoint.ask[Int](removeMsg).recover {
         // use 0 as default value means no blocks were removed
         handleBlockRemovalFailure("RDD", rddId.toString, bmInfo.blockManagerId, 0)
@@ -314,7 +305,7 @@ class BlockManagerMasterEndpoint(
     // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
     Future.sequence(
-      allAliveBlockManagerInfos.map { bm =>
+      blockManagerInfo.values.map { bm =>
         bm.storageEndpoint.ask[Boolean](removeMsg).recover {
           // use false as default value means no shuffle data were removed
           handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
@@ -330,7 +321,7 @@ class BlockManagerMasterEndpoint(
    */
   private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
     val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
-    val requiredBlockManagers = allAliveBlockManagerInfos.filter { info =>
+    val requiredBlockManagers = blockManagerInfo.values.filter { info =>
       removeFromDriver || !info.blockManagerId.isDriver
     }
     val futures = requiredBlockManagers.map { bm =>
@@ -346,24 +337,13 @@ class BlockManagerMasterEndpoint(
   private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
     val info = blockManagerInfo(blockManagerId)
 
-    // SPARK-35011: Not removing info from the blockManagerInfo map, but only setting the removal
-    // timestamp of the executor in BlockManagerInfo. This info will be removed from
-    // blockManagerInfo map by the blockManagerInfoCleaner once
-    // now() - info.executorRemovalTs > executorTimeoutMs.
-    //
-    // We are delaying the removal of BlockManagerInfo to avoid a BlockManager reregistration
-    // while a executor is shutting. This unwanted reregistration causes inconsistent bookkeeping
-    // of executors in Spark.
-    // Delaying this removal until blockManagerInfoCleaner decides to remove it ensures
-    // BlockManagerMasterHeartbeatEndpoint does not ask the BlockManager on a recently removed
-    // executor to reregister on BlockManagerHeartbeat message.
-    info.setExecutorRemovalTs()
-
     // Remove the block manager from blockManagerIdByExecutor.
     blockManagerIdByExecutor -= blockManagerId.executorId
     decommissioningBlockManagerSet.remove(blockManagerId)
 
-    // remove all the blocks.
+    // Remove it from blockManagerInfo and remove all the blocks.
+    blockManagerInfo.remove(blockManagerId)
+
     val iterator = info.blocks.keySet.iterator
     while (iterator.hasNext) {
       val blockId = iterator.next
@@ -384,7 +364,7 @@ class BlockManagerMasterEndpoint(
         val i = (new Random(blockId.hashCode)).nextInt(locations.size)
         val blockLocations = locations.toSeq
         val candidateBMId = blockLocations(i)
-        aliveBlockManagerInfo(candidateBMId).foreach { bm =>
+        blockManagerInfo.get(candidateBMId).foreach { bm =>
           val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
           val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
           bm.storageEndpoint.ask[Boolean](replicateMsg)
@@ -420,16 +400,16 @@ class BlockManagerMasterEndpoint(
    */
   private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
     try {
-      aliveBlockManagerInfo(blockManagerId).map { info =>
-        val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
-        rddBlocks.map { blockId =>
-          val currentBlockLocations = blockLocations.get(blockId)
-          val maxReplicas = currentBlockLocations.size + 1
-          val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
-          val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
-          replicateMsg
-        }.toSeq
-      }.getOrElse(Seq.empty[ReplicateBlock])
+      val info = blockManagerInfo(blockManagerId)
+
+      val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
+      rddBlocks.map { blockId =>
+        val currentBlockLocations = blockLocations.get(blockId)
+        val maxReplicas = currentBlockLocations.size + 1
+        val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
+        val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
+        replicateMsg
+      }.toSeq
     } catch {
       // If the block manager has already exited, nothing to replicate.
       case e: java.util.NoSuchElementException =>
@@ -443,7 +423,8 @@ class BlockManagerMasterEndpoint(
     val locations = blockLocations.get(blockId)
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
-        aliveBlockManagerInfo(blockManagerId).foreach { bm =>
+        val blockManager = blockManagerInfo.get(blockManagerId)
+        blockManager.foreach { bm =>
           // Remove the block from the BlockManager.
           // Doesn't actually wait for a confirmation and the message might get lost.
           // If message loss becomes frequent, we should add retry logic here.
@@ -458,14 +439,14 @@ class BlockManagerMasterEndpoint(
 
   // Return a map from the block manager id to max memory and remaining memory.
   private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
-    allAliveBlockManagerInfos.map { info =>
-      (info.blockManagerId, (info.maxMem, info.remainingMem))
+    blockManagerInfo.map { case(blockManagerId, info) =>
+      (blockManagerId, (info.maxMem, info.remainingMem))
     }.toMap
   }
 
   private def storageStatus: Array[StorageStatus] = {
-    allAliveBlockManagerInfos.map { info =>
-      new StorageStatus(info.blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
+    blockManagerInfo.map { case (blockManagerId, info) =>
+      new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
         Some(info.maxOffHeapMem), info.blocks.asScala)
     }.toArray
   }
@@ -487,7 +468,7 @@ class BlockManagerMasterEndpoint(
      * Futures to avoid potential deadlocks. This can arise if there exists a block manager
     * that is also waiting for this master endpoint's response to a previous message.
      */
-    allAliveBlockManagerInfos.map { info =>
+    blockManagerInfo.values.map { info =>
       val blockStatusFuture =
         if (askStorageEndpoints) {
           info.storageEndpoint.ask[Option[BlockStatus]](getBlockStatus)
@@ -511,7 +492,7 @@ class BlockManagerMasterEndpoint(
       askStorageEndpoints: Boolean): Future[Seq[BlockId]] = {
     val getMatchingBlockIds = GetMatchingBlockIds(filter)
     Future.sequence(
-      allAliveBlockManagerInfos.map { info =>
+      blockManagerInfo.values.map { info =>
         val future =
           if (askStorageEndpoints) {
             info.storageEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
@@ -581,10 +562,9 @@ class BlockManagerMasterEndpoint(
       if (pushBasedShuffleEnabled) {
         addMergerLocation(id)
       }
-
-      listenerBus.post(SparkListenerBlockManagerAdded(time, id,
-        maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     }
+    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
+      Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     id
   }
 
@@ -672,7 +652,7 @@ class BlockManagerMasterEndpoint(
     if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
       blockStatusByShuffleService.get(bmId).flatMap(m => m.get(blockId))
     } else {
-      aliveBlockManagerInfo(bmId).flatMap(_.getStatus(blockId))
+      blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId))
     }
   }
 
@@ -683,7 +663,8 @@ class BlockManagerMasterEndpoint(
         // can be used to access this block even when the original executor is already stopped.
         loc.host == requesterHost &&
           (loc.port == externalShuffleServicePort ||
-            aliveBlockManagerInfo(loc)
+            blockManagerInfo
+              .get(loc)
              .flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk))
              .getOrElse(false))
       }.flatMap { bmId => Option(executorIdToLocalDirs.getIfPresent(bmId.executorId)) }
@@ -700,7 +681,7 @@ class BlockManagerMasterEndpoint(
 
   /** Get the list of the peers of the given block manager */
   private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
-    val blockManagerIds = allAliveBlockManagerInfos.map(_.blockManagerId).toSet
+    val blockManagerIds = blockManagerInfo.keySet
     if (blockManagerIds.contains(blockManagerId)) {
       blockManagerIds
         .filterNot { _.isDriver }
@@ -753,35 +734,15 @@ class BlockManagerMasterEndpoint(
   private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
     for (
       blockManagerId <- blockManagerIdByExecutor.get(executorId);
-      info <- aliveBlockManagerInfo(blockManagerId)
+      info <- blockManagerInfo.get(blockManagerId)
     ) yield {
       info.storageEndpoint
     }
   }
 
   override def onStop(): Unit = {
     askThreadPool.shutdownNow()
-    blockManagerInfoCleaner.shutdownNow()
-  }
-
-  private def cleanBlockManagerInfo(): Unit = {
-    logDebug("Cleaning blockManagerInfo")
-    val now = System.currentTimeMillis()
-    val expiredBmIds = blockManagerInfo.filter { case (_, bmInfo) =>
-      // bmInfo.executorRemovalTs.get cannot be None when BM is not alive
-      !bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > executorTimeoutMs
-    }.keys
-    expiredBmIds.foreach { bmId =>
-      logInfo(s"Cleaning expired $bmId from blockManagerInfo")
-      blockManagerInfo.remove(bmId)
-    }
   }
-
-  @inline private def aliveBlockManagerInfo(bmId: BlockManagerId): Option[BlockManagerInfo] =
-    blockManagerInfo.get(bmId).filter(_.isAlive)
-
-  @inline private def allAliveBlockManagerInfos: Iterable[BlockManagerInfo] =
-    blockManagerInfo.values.filter(_.isAlive)
 }
 
 @DeveloperApi
@@ -834,7 +795,6 @@ private[spark] class BlockManagerInfo(
 
   private var _lastSeenMs: Long = timeMs
   private var _remainingMem: Long = maxMem
-  private var _executorRemovalTs: Option[Long] = None
 
   // Mapping from block id to its status.
   private val _blocks = new JHashMap[BlockId, BlockStatus]
@@ -949,16 +909,4 @@
   def clear(): Unit = {
     _blocks.clear()
   }
-
-  def executorRemovalTs: Option[Long] = _executorRemovalTs
-
-  def isAlive: Boolean = _executorRemovalTs.isEmpty
-
-  def setExecutorRemovalTs(): Unit = {
-    if (!isAlive) {
-      logWarning(s"executorRemovalTs is already set to ${_executorRemovalTs.get}")
-    } else {
-      _executorRemovalTs = Some(System.currentTimeMillis())
-    }
-  }
 }

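The heart of the revert is removeBlockManager above. SPARK-35011 kept the BlockManagerInfo entry in the map and only stamped it with a removal timestamp, so that a BlockManagerHeartbeat arriving while the executor was still shutting down would not prompt re-registration; a scheduled cleaner dropped the entry once the executor timeout elapsed. This commit restores the original behavior of removing the entry immediately. Below is a toy sketch of the two policies, using a bare HashMap and invented names rather than Spark's endpoint code.

import scala.collection.mutable

object RemovalPolicySketch {
  // Illustrative only -- not Spark code.
  final case class BmInfo(var removalTs: Option[Long] = None) {
    def isAlive: Boolean = removalTs.isEmpty
  }

  val blockManagerInfo = mutable.HashMap[String, BmInfo]()

  // After this revert: removal drops the entry right away...
  def removeAfterRevert(id: String): Unit = blockManagerInfo.remove(id)

  // ...so a later heartbeat no longer finds it and the master asks for re-registration.
  def heartbeatStillTracked(id: String): Boolean = blockManagerInfo.contains(id)

  // With SPARK-35011 (now reverted): the entry lingered, marked dead, until a
  // periodic cleaner expired it after the executor timeout.
  def removeWithSpark35011(id: String): Unit =
    blockManagerInfo.get(id).foreach(_.removalTs = Some(System.currentTimeMillis()))

  def main(args: Array[String]): Unit = {
    blockManagerInfo("exec-1") = BmInfo()
    removeAfterRevert("exec-1")
    println(heartbeatStillTracked("exec-1")) // false -> re-register on next heartbeat
  }
}
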
core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 0 additions & 7 deletions
@@ -3096,13 +3096,6 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  def executorTimeoutMs(conf: SparkConf): Long = {
-    // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
-    // "milliseconds"
-    conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
-      .getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
-  }
-
   /** Returns a string message about delegation token generation failure */
   def createFailedToGetTokenMessage(serviceName: String, e: scala.Throwable): String = {
     val message = "Failed to get token from service %s due to %s. " +

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 4 additions & 16 deletions
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{Future, TimeoutException}
 import scala.concurrent.duration._
-import scala.language.{implicitConversions, postfixOps}
+import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import org.apache.commons.lang3.RandomUtils
@@ -102,7 +102,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
       .set(Network.RPC_ASK_TIMEOUT, "5s")
       .set(PUSH_BASED_SHUFFLE_ENABLED, true)
-      .set(STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key, "5s")
   }
 
   private def makeSortShuffleManager(conf: Option[SparkConf] = None): SortShuffleManager = {
@@ -645,7 +644,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       mc.eq(StorageLevel.NONE), mc.anyInt(), mc.anyInt())
   }
 
-  test("no reregistration on heart beat until executor timeout") {
+  test("reregistration on heart beat") {
     val store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
@@ -656,15 +655,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
+
     val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
       BlockManagerHeartbeat(store.blockManagerId))
-    assert(reregister == false, "master told to re-register")
-
-    eventually(timeout(10 seconds), interval(1 seconds)) {
-      val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
-        BlockManagerHeartbeat(store.blockManagerId))
-      assert(reregister, "master did not tell to re-register")
-    }
+    assert(reregister)
   }
 
   test("reregistration on block update") {
@@ -678,12 +672,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    eventually(timeout(10 seconds), interval(1 seconds)) {
-      val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
-        BlockManagerHeartbeat(store.blockManagerId))
-      assert(reregister, "master did not tell to re-register")
-    }
-
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.waitForAsyncReregister()
