diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 71419d5aea0b4..971cbd2aeba1f 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -80,9 +80,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new HashMap[String, Long]
 
-  private val executorTimeoutMs = sc.conf.get(
-    config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
-  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
+  private val executorTimeoutMs = Utils.executorTimeoutMs(sc.conf)
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
 
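Both HeartbeatReceiver (above) and BlockManagerMasterEndpoint (below) now resolve the executor timeout through the shared Utils.executorTimeoutMs helper added at the end of this patch. As a rough standalone sketch of the fallback it implements, assuming the stock 120s spark.network.timeout default and using illustrative raw config keys rather than the real ConfigEntry constants:

    // Minimal sketch, not Spark code: prefer the block manager heartbeat timeout (already in
    // milliseconds) and otherwise fall back to spark.network.timeout, which is given in seconds.
    object ExecutorTimeoutSketch {
      def executorTimeoutMs(conf: Map[String, String]): Long =
        conf.get("spark.storage.blockManagerHeartbeatTimeoutMs")   // illustrative key name
          .map(_.toLong)
          .getOrElse(conf.getOrElse("spark.network.timeout", "120s").stripSuffix("s").toLong * 1000L)

      def main(args: Array[String]): Unit = {
        println(executorTimeoutMs(Map.empty))                                                     // 120000
        println(executorTimeoutMs(Map("spark.network.timeout" -> "200s")))                        // 200000
        println(executorTimeoutMs(Map("spark.storage.blockManagerHeartbeatTimeoutMs" -> "5000"))) // 5000
      }
    }
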
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 61591b020c600..7d068fd695299 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -96,6 +96,15 @@ class BlockManagerMasterEndpoint(
     mapper
   }
 
+  private val executorTimeoutMs = Utils.executorTimeoutMs(conf)
+  private val blockManagerInfoCleaner = {
+    val cleaningDelay = Math.floorDiv(executorTimeoutMs, 2L)
+    val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("blockManagerInfo-cleaner")
+    executor.scheduleWithFixedDelay(() => cleanBlockManagerInfo(), cleaningDelay, cleaningDelay,
+      TimeUnit.MILLISECONDS)
+    executor
+  }
+
   val proactivelyReplicate = conf.get(config.STORAGE_REPLICATION_PROACTIVE)
 
   val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf)
@@ -273,12 +282,12 @@
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        aliveBlockManagerInfo(bmId).foreach { bmInfo =>
           bmInfo.removeBlock(blockId)
         }
       }
     }
-    val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
+    val removeRddFromExecutorsFutures = allAliveBlockManagerInfos.map { bmInfo =>
       bmInfo.storageEndpoint.ask[Int](removeMsg).recover {
         // use 0 as default value means no blocks were removed
         handleBlockRemovalFailure("RDD", rddId.toString, bmInfo.blockManagerId, 0)
@@ -304,7 +313,7 @@
     // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
     Future.sequence(
-      blockManagerInfo.values.map { bm =>
+      allAliveBlockManagerInfos.map { bm =>
         bm.storageEndpoint.ask[Boolean](removeMsg).recover {
           // use false as default value means no shuffle data were removed
           handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
@@ -320,7 +329,7 @@
    */
   private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
     val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
-    val requiredBlockManagers = blockManagerInfo.values.filter { info =>
+    val requiredBlockManagers = allAliveBlockManagerInfos.filter { info =>
       removeFromDriver || !info.blockManagerId.isDriver
     }
     val futures = requiredBlockManagers.map { bm =>
@@ -336,13 +345,24 @@
   private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
     val info = blockManagerInfo(blockManagerId)
 
+    // SPARK-35011: Not removing info from the blockManagerInfo map, but only setting the removal
+    // timestamp of the executor in BlockManagerInfo. This info will be removed from the
+    // blockManagerInfo map by the blockManagerInfoCleaner once
+    // now() - info.executorRemovalTs > executorTimeoutMs.
+    //
+    // We are delaying the removal of BlockManagerInfo to avoid a BlockManager reregistration
+    // while an executor is shutting down. This unwanted reregistration causes inconsistent
+    // bookkeeping of executors in Spark.
+    // Delaying this removal until blockManagerInfoCleaner decides to remove it ensures
+    // BlockManagerMasterHeartbeatEndpoint does not ask the BlockManager on a recently removed
+    // executor to reregister on a BlockManagerHeartbeat message.
+    info.setExecutorRemovalTs()
+
     // Remove the block manager from blockManagerIdByExecutor.
     blockManagerIdByExecutor -= blockManagerId.executorId
     decommissioningBlockManagerSet.remove(blockManagerId)
 
-    // Remove it from blockManagerInfo and remove all the blocks.
-    blockManagerInfo.remove(blockManagerId)
-
+    // remove all the blocks.
     val iterator = info.blocks.keySet.iterator
     while (iterator.hasNext) {
       val blockId = iterator.next
@@ -363,7 +383,7 @@
         val i = (new Random(blockId.hashCode)).nextInt(locations.size)
         val blockLocations = locations.toSeq
         val candidateBMId = blockLocations(i)
-        blockManagerInfo.get(candidateBMId).foreach { bm =>
+        aliveBlockManagerInfo(candidateBMId).foreach { bm =>
           val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
           val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
           bm.storageEndpoint.ask[Boolean](replicateMsg)
@@ -399,16 +419,16 @@
    */
   private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
     try {
-      val info = blockManagerInfo(blockManagerId)
-
-      val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
-      rddBlocks.map { blockId =>
-        val currentBlockLocations = blockLocations.get(blockId)
-        val maxReplicas = currentBlockLocations.size + 1
-        val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
-        val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
-        replicateMsg
-      }.toSeq
+      aliveBlockManagerInfo(blockManagerId).map { info =>
+        val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
+        rddBlocks.map { blockId =>
+          val currentBlockLocations = blockLocations.get(blockId)
+          val maxReplicas = currentBlockLocations.size + 1
+          val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
+          val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
+          replicateMsg
+        }.toSeq
+      }.getOrElse(Seq.empty[ReplicateBlock])
     } catch {
       // If the block manager has already exited, nothing to replicate.
       case e: java.util.NoSuchElementException =>
@@ -422,8 +442,7 @@
     val locations = blockLocations.get(blockId)
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
-        val blockManager = blockManagerInfo.get(blockManagerId)
-        blockManager.foreach { bm =>
+        aliveBlockManagerInfo(blockManagerId).foreach { bm =>
           // Remove the block from the BlockManager.
           // Doesn't actually wait for a confirmation and the message might get lost.
           // If message loss becomes frequent, we should add retry logic here.
@@ -438,14 +457,14 @@
   // Return a map from the block manager id to max memory and remaining memory.
   private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
-    blockManagerInfo.map { case(blockManagerId, info) =>
-      (blockManagerId, (info.maxMem, info.remainingMem))
+    allAliveBlockManagerInfos.map { info =>
+      (info.blockManagerId, (info.maxMem, info.remainingMem))
     }.toMap
   }
 
   private def storageStatus: Array[StorageStatus] = {
-    blockManagerInfo.map { case (blockManagerId, info) =>
-      new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
+    allAliveBlockManagerInfos.map { info =>
+      new StorageStatus(info.blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
         Some(info.maxOffHeapMem), info.blocks.asScala)
     }.toArray
   }
 
@@ -467,7 +486,7 @@
      * Futures to avoid potential deadlocks. This can arise if there exists a block manager
      * that is also waiting for this master endpoint's response to a previous message.
      */
-    blockManagerInfo.values.map { info =>
+    allAliveBlockManagerInfos.map { info =>
       val blockStatusFuture =
         if (askStorageEndpoints) {
           info.storageEndpoint.ask[Option[BlockStatus]](getBlockStatus)
@@ -491,7 +510,7 @@
       askStorageEndpoints: Boolean): Future[Seq[BlockId]] = {
     val getMatchingBlockIds = GetMatchingBlockIds(filter)
     Future.sequence(
-      blockManagerInfo.values.map { info =>
+      allAliveBlockManagerInfos.map { info =>
         val future =
           if (askStorageEndpoints) {
             info.storageEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
@@ -557,9 +576,10 @@
       if (pushBasedShuffleEnabled) {
         addMergerLocation(id)
       }
+
+      listenerBus.post(SparkListenerBlockManagerAdded(time, id,
+        maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     }
-    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
-      Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     id
   }
 
@@ -647,7 +667,7 @@
       if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
         Option(blockStatusByShuffleService(bmId).get(blockId))
       } else {
-        blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId))
+        aliveBlockManagerInfo(bmId).flatMap(_.getStatus(blockId))
       }
     }
 
@@ -658,8 +678,7 @@
         // can be used to access this block even when the original executor is already stopped.
         loc.host == requesterHost &&
           (loc.port == externalShuffleServicePort ||
-            blockManagerInfo
-              .get(loc)
+            aliveBlockManagerInfo(loc)
               .flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk))
               .getOrElse(false))
       }.flatMap { bmId => Option(executorIdToLocalDirs.getIfPresent(bmId.executorId)) }
@@ -676,7 +695,7 @@
 
   /** Get the list of the peers of the given block manager */
   private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
-    val blockManagerIds = blockManagerInfo.keySet
+    val blockManagerIds = allAliveBlockManagerInfos.map(_.blockManagerId).toSet
     if (blockManagerIds.contains(blockManagerId)) {
       blockManagerIds
         .filterNot { _.isDriver }
@@ -728,7 +747,7 @@
   private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
     for (
       blockManagerId <- blockManagerIdByExecutor.get(executorId);
-      info <- blockManagerInfo.get(blockManagerId)
+      info <- aliveBlockManagerInfo(blockManagerId)
     ) yield {
       info.storageEndpoint
     }
@@ -736,7 +755,27 @@
 
   override def onStop(): Unit = {
     askThreadPool.shutdownNow()
+    blockManagerInfoCleaner.shutdownNow()
+  }
+
+  private def cleanBlockManagerInfo(): Unit = {
+    logDebug("Cleaning blockManagerInfo")
+    val now = System.currentTimeMillis()
+    val expiredBmIds = blockManagerInfo.filter { case (_, bmInfo) =>
+      // bmInfo.executorRemovalTs.get cannot be None when BM is not alive
+      !bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > executorTimeoutMs
+    }.keys
+    expiredBmIds.foreach { bmId =>
+      logInfo(s"Cleaning expired $bmId from blockManagerInfo")
+      blockManagerInfo.remove(bmId)
+    }
   }
+
+  @inline private def aliveBlockManagerInfo(bmId: BlockManagerId): Option[BlockManagerInfo] =
+    blockManagerInfo.get(bmId).filter(_.isAlive)
+
+  @inline private def allAliveBlockManagerInfos: Iterable[BlockManagerInfo] =
+    blockManagerInfo.values.filter(_.isAlive)
 }
 
 @DeveloperApi
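Taken together, removeBlockManager now only stamps a removal timestamp, heartbeat handling keeps recognising a stamped-but-not-expired executor, and the periodic cleaner drops the entry once the timeout has elapsed. A condensed, self-contained sketch of that lifecycle (the names loosely mirror the patch; this is not the actual BlockManagerMasterEndpoint):

    import scala.collection.mutable

    object DelayedRemovalSketch {
      final class Info {
        private var removalTs: Option[Long] = None          // stands in for _executorRemovalTs
        def isAlive: Boolean = removalTs.isEmpty
        def markRemoved(now: Long): Unit = if (isAlive) removalTs = Some(now)
        def expired(now: Long, timeoutMs: Long): Boolean =
          removalTs.exists(ts => now - ts > timeoutMs)
      }

      private val infos = mutable.HashMap.empty[String, Info]

      // "removeBlockManager": keep the entry, only record when the executor went away.
      def removeBlockManager(execId: String, now: Long): Unit =
        infos.get(execId).foreach(_.markRemoved(now))

      // Heartbeat handling: a recently removed executor is still recognised, so the master
      // does not ask its BlockManager to re-register while the executor is shutting down.
      def heartbeatKnown(execId: String): Boolean = infos.contains(execId)

      // "cleanBlockManagerInfo": drop entries whose removal timestamp is older than the timeout.
      def clean(now: Long, timeoutMs: Long): Unit = {
        val expired = infos.collect { case (id, info) if info.expired(now, timeoutMs) => id }
        expired.foreach(infos.remove)
      }

      def main(args: Array[String]): Unit = {
        infos("exec-1") = new Info
        removeBlockManager("exec-1", now = 0L)
        println(heartbeatKnown("exec-1"))   // true: no re-registration requested yet
        clean(now = 10000L, timeoutMs = 5000L)
        println(heartbeatKnown("exec-1"))   // false: a later heartbeat triggers re-registration
      }
    }
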
@@ -764,6 +803,7 @@ private[spark] class BlockManagerInfo(
 
   private var _lastSeenMs: Long = timeMs
   private var _remainingMem: Long = maxMem
+  private var _executorRemovalTs: Option[Long] = None
 
   // Mapping from block id to its status.
   private val _blocks = new JHashMap[BlockId, BlockStatus]
@@ -878,4 +918,16 @@ private[spark] class BlockManagerInfo(
   def clear(): Unit = {
     _blocks.clear()
   }
+
+  def executorRemovalTs: Option[Long] = _executorRemovalTs
+
+  def isAlive: Boolean = _executorRemovalTs.isEmpty
+
+  def setExecutorRemovalTs(): Unit = {
+    if (!isAlive) {
+      logWarning(s"executorRemovalTs is already set to ${_executorRemovalTs.get}")
+    } else {
+      _executorRemovalTs = Some(System.currentTimeMillis())
+    }
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1643aa68cdb5a..06241dd2a221b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -3057,6 +3057,13 @@ private[spark] object Utils extends Logging {
       0
     }
   }
+
+  def executorTimeoutMs(conf: SparkConf): Long = {
+    // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs`
+    // uses "milliseconds"
+    conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
+      .getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
+  }
 }
 
 private[util] object CallerContext extends Logging {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index cd319daccc0db..5a8933ebb3c24 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{Future, TimeoutException}
 import scala.concurrent.duration._
-import scala.language.implicitConversions
+import scala.language.{implicitConversions, postfixOps}
 import scala.reflect.ClassTag
 
 import org.apache.commons.lang3.RandomUtils
@@ -101,6 +101,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
       .set(Network.RPC_ASK_TIMEOUT, "5s")
       .set(PUSH_BASED_SHUFFLE_ENABLED, true)
+      .set(STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key, "5s")
   }
 
   private def makeSortShuffleManager(): SortShuffleManager = {
@@ -610,7 +611,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       mc.eq(StorageLevel.NONE), mc.anyInt(), mc.anyInt())
   }
 
-  test("reregistration on heart beat") {
+  test("no reregistration on heart beat until executor timeout") {
     val store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
@@ -621,10 +622,15 @@
 
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
-
     val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
       BlockManagerHeartbeat(store.blockManagerId))
-    assert(reregister)
+    assert(reregister == false, "master told to re-register")
+
+    eventually(timeout(10 seconds), interval(1 seconds)) {
+      val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
+        BlockManagerHeartbeat(store.blockManagerId))
+      assert(reregister, "master did not tell to re-register")
+    }
   }
 
   test("reregistration on block update") {
@@ -638,6 +644,12 @@
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
+    eventually(timeout(10 seconds), interval(1 seconds)) {
+      val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
+        BlockManagerHeartbeat(store.blockManagerId))
+      assert(reregister, "master did not tell to re-register")
+    }
+
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.waitForAsyncReregister()
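One timing detail worth keeping in mind when reading the updated tests: the cleaner is scheduled with a fixed delay of half the executor timeout, so a stamped BlockManagerInfo entry is dropped roughly between one and one and a half timeouts after removeExecutor, ignoring scheduler jitter. A back-of-the-envelope check with the 5s timeout the suite configures (a sketch of the expected bound, not measured behaviour):

    object CleanupWindowSketch extends App {
      val executorTimeoutMs = 5000L                              // STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT = "5s"
      val cleaningDelayMs = Math.floorDiv(executorTimeoutMs, 2L) // cleaner runs every 2500 ms
      val earliest = executorTimeoutMs                           // entry must be older than the timeout
      val latest = executorTimeoutMs + cleaningDelayMs           // plus at most one cleaner period
      println(s"expected cleanup window: $earliest..$latest ms") // 5000..7500 ms, inside the 10s eventually bound
    }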