From ab0e38cacf4a2021d95f2dcf2aec8dffd1c3a1fe Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Tue, 10 Mar 2020 16:55:11 +0530 Subject: [PATCH 01/17] [SPARK-20732][CORE] Decommission cache blocks to other executors when an executor is decommissioned --- .../scala/org/apache/spark/SparkContext.scala | 19 ++- .../spark/internal/config/package.scala | 8 ++ .../CoarseGrainedSchedulerBackend.scala | 17 ++- .../apache/spark/storage/BlockManager.scala | 118 ++++++++++++++++-- .../spark/storage/BlockManagerMaster.scala | 13 ++ .../storage/BlockManagerMasterEndpoint.scala | 55 +++++++- .../spark/storage/BlockManagerMessages.scala | 8 ++ .../storage/BlockManagerSlaveEndpoint.scala | 3 + ...nDecommissionedBlockManagerException.scala | 21 ++++ .../spark/storage/BlockManagerSuite.scala | 83 ++++++++++++ 10 files changed, 328 insertions(+), 17 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/storage/RDDBlockSavedOnDecommissionedBlockManagerException.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cdb98db80158b..4f9d9fb8f01da 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -28,9 +28,8 @@ import scala.collection.Map import scala.collection.immutable import scala.collection.mutable.HashMap import scala.language.implicitConversions -import scala.reflect.{classTag, ClassTag} +import scala.reflect.{ClassTag, classTag} import scala.util.control.NonFatal - import com.google.common.collect.MapMaker import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -38,7 +37,6 @@ import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, Doub import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob} import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} - import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} @@ -57,7 +55,7 @@ import org.apache.spark.resource._ import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} import org.apache.spark.scheduler.local.LocalSchedulerBackend import org.apache.spark.shuffle.ShuffleDataIOUtils import org.apache.spark.shuffle.api.ShuffleDriverComponents @@ -1586,7 +1584,7 @@ class SparkContext(config: SparkConf) extends Logging { listenerBus.removeListener(listener) } - private[spark] def getExecutorIds(): Seq[String] = { + def getExecutorIds(): Seq[String] = { schedulerBackend match { case b: ExecutorAllocationClient => b.getExecutorIds() @@ -1721,6 +1719,17 @@ class SparkContext(config: SparkConf) extends Logging { } } + + @DeveloperApi + def decommissionExecutors(executorIds: Seq[String]): Unit = { + schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => + executorIds.foreach(b.decommissionExecutor) + case _ => + logWarning("Decommissioning executors is not supported by current scheduler.") + } + } + /** The version of Spark on which this application is running. 
*/ def version: String = SPARK_VERSION diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index f3195d978ec6d..f8b39ea46dab8 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -412,6 +412,14 @@ package object config { .intConf .createWithDefault(1) + private[spark] val STORAGE_DECOMMISSION_ENABLED = + ConfigBuilder("spark.storage.decommission.enabled") + .booleanConf.createWithDefault(false) + + private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE = + ConfigBuilder("spark.storage.decommission.maxReplicationFailures") + .intConf.createWithDefault(3) + private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE = ConfigBuilder("spark.storage.replication.topologyFile") .version("2.1.0") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 6e1efdaf5beb2..ab4b4c57ee105 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -432,6 +432,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logError(s"Unexpected error during decommissioning ${e.toString}", e) } logInfo(s"Finished decommissioning executor $executorId.") + + if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { + try { + logInfo(s"Starting decommissioning block manager corresponding to " + + s"executor $executorId.") + scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId)) + } catch { + case e: Exception => + logError(s"Unexpected error during block manager " + + s"decommissioning for executor $executorId: ${e.toString}", e) + } + logInfo(s"Finished decommissioning block manager corresponding to $executorId.") + } } else { logInfo(s"Skipping decommissioning of executor $executorId.") } @@ -568,7 +581,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ private[spark] def decommissionExecutor(executorId: String): Unit = { if (driverEndpoint != null) { - logInfo("Propegating executor decommission to driver.") + logInfo("Propogating executor decommission to driver.") driverEndpoint.send(DecommissionExecutor(executorId)) } } @@ -652,7 +665,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp /** * Update the cluster manager on our scheduling needs. Three bits of information are included * to help it make decisions. - * @param resourceProfileToNumExecutors The total number of executors we'd like to have per + * @param resourceProfileIdToNumExecutors The total number of executors we'd like to have per * ResourceProfile. 
The cluster manager shouldn't kill any * running executor to reach this number, but, if all * existing executors were to die, this is the number diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e7f8de5ab7e4a..9492d9aa147d9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -54,6 +54,7 @@ import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.serializer.{SerializerInstance, SerializerManager} import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter} +import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock import org.apache.spark.storage.memory._ import org.apache.spark.unsafe.Platform import org.apache.spark.util._ @@ -241,6 +242,9 @@ private[spark] class BlockManager( private var blockReplicationPolicy: BlockReplicationPolicy = _ + private var blockManagerDecommissioning: Boolean = false + private var decommissionManager: Option[BlockManagerDecommissionManager] = None + // A DownloadFileManager used to track all the files of remote blocks which are above the // specified memory threshold. Files will be deleted automatically based on weak reference. // Exposed for test @@ -1281,6 +1285,9 @@ private[spark] class BlockManager( require(blockId != null, "BlockId is null") require(level != null && level.isValid, "StorageLevel is null or invalid") + if (blockManagerDecommissioning && blockId.isRDD) { + throw new RDDBlockSavedOnDecommissionedBlockManagerException(blockId.asRDDId.get) + } val putBlockInfo = { val newInfo = new BlockInfo(level, classTag, tellMaster) @@ -1560,9 +1567,13 @@ private[spark] class BlockManager( def replicateBlock( blockId: BlockId, existingReplicas: Set[BlockManagerId], - maxReplicas: Int): Unit = { + maxReplicas: Int, + forceFetch: Boolean = true, + maxReplicationFailures: Option[Int] = None): Boolean = { + var replicatedSuccessfully = true logInfo(s"Using $blockManagerId to pro-actively replicate $blockId") blockInfoManager.lockForReading(blockId).foreach { info => + replicatedSuccessfully = false val data = doGetLocalBytes(blockId, info) val storageLevel = StorageLevel( useDisk = info.level.useDisk, @@ -1570,16 +1581,16 @@ private[spark] class BlockManager( useOffHeap = info.level.useOffHeap, deserialized = info.level.deserialized, replication = maxReplicas) - // we know we are called as a result of an executor removal, so we refresh peer cache - // this way, we won't try to replicate to a missing executor with a stale reference - getPeers(forceFetch = true) + getPeers(forceFetch) try { - replicate(blockId, data, storageLevel, info.classTag, existingReplicas) + replicatedSuccessfully = replicate( + blockId, data, storageLevel, info.classTag, existingReplicas, maxReplicationFailures) } finally { logDebug(s"Releasing lock for $blockId") releaseLockAndDispose(blockId, data) } } + replicatedSuccessfully } /** @@ -1591,9 +1602,11 @@ private[spark] class BlockManager( data: BlockData, level: StorageLevel, classTag: ClassTag[_], - existingReplicas: Set[BlockManagerId] = Set.empty): Unit = { + existingReplicas: Set[BlockManagerId] = Set.empty, + maxReplicationFailures: Option[Int] = None): Boolean = { - val maxReplicationFailures = conf.get(config.STORAGE_MAX_REPLICATION_FAILURE) + val maxReplicationFailureCount = maxReplicationFailures.getOrElse( + 
conf.get(config.STORAGE_MAX_REPLICATION_FAILURE)) val tLevel = StorageLevel( useDisk = level.useDisk, useMemory = level.useMemory, @@ -1617,7 +1630,7 @@ private[spark] class BlockManager( blockId, numPeersToReplicateTo) - while(numFailures <= maxReplicationFailures && + while(numFailures <= maxReplicationFailureCount && !peersForReplication.isEmpty && peersReplicatedTo.size < numPeersToReplicateTo) { val peer = peersForReplication.head @@ -1665,9 +1678,11 @@ private[spark] class BlockManager( if (peersReplicatedTo.size < numPeersToReplicateTo) { logWarning(s"Block $blockId replicated to only " + s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers") + return false } - logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}") + logInfo(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}") + return true } /** @@ -1761,6 +1776,54 @@ private[spark] class BlockManager( blocksToRemove.size } + def decommissionBlockManager(): Unit = { + if (!blockManagerDecommissioning) { + logInfo("Starting block manager decommissioning process") + blockManagerDecommissioning = true + decommissionManager = Some(new BlockManagerDecommissionManager) + decommissionManager.foreach(_.start()) + } + } + + // Visible for testing + def replicateRddCacheBlocks(): Unit = { + val replicateBlocksInfo: Seq[ReplicateBlock] = + master.getReplicateInfoForRDDBlocks(blockManagerId) + + if (replicateBlocksInfo.nonEmpty) { + logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " + + s"for block manager decommissioning") + // Refresh peer list once before starting replication + getPeers(true) + } + + // Maximum number of storage replication failure which replicateBlock can handle + // before giving up for one block + val maxReplicationFailures = conf.get( + config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE) + + val blocksFailedReplication = replicateBlocksInfo.filterNot { + case ReplicateBlock(blockId, existingReplicas, maxReplicas) => + val replicatedSuccessfully = replicateBlock( + blockId, + existingReplicas.toSet, + maxReplicas, + forceFetch = false, + maxReplicationFailures = Some(maxReplicationFailures)) + if (replicatedSuccessfully) { + logInfo(s"Block $blockId offloaded successfully, Removing block now") + removeBlock(blockId) + logInfo(s"Block $blockId removed") + } else { + logWarning(s"Failed to offload block $blockId") + } + replicatedSuccessfully + } + if (blocksFailedReplication.nonEmpty) { + logWarning(s"Blocks failed replication: ${blocksFailedReplication.mkString(",")}") + } + } + /** * Remove all blocks belonging to the given broadcast. 
*/ @@ -1829,7 +1892,44 @@ private[spark] class BlockManager( data.dispose() } + class BlockManagerDecommissionManager { + @volatile private var stopped = false + private val cacheReplicationThread = new Thread { + override def run(): Unit = { + while (blockManagerDecommissioning && !stopped) { + try { + logDebug(s"Attempting to replicate all cached RDD blocks") + replicateRddCacheBlocks() + logInfo(s"Attempt to replicate all cached blocks done") + Thread.sleep(30000) + } catch { + case _: InterruptedException => + // no-op + case NonFatal(e) => + logError("Error occurred while trying to " + + "replicate cached RDD blocks for block manager decommissioning", e) + } + } + } + } + cacheReplicationThread.setDaemon(true) + cacheReplicationThread.setName("cache-replication-thread") + + def start(): Unit = { + cacheReplicationThread.start() + } + + def stop(): Unit = { + if (!stopped) { + stopped = true + cacheReplicationThread.interrupt() + cacheReplicationThread.join() + } + } + } + def stop(): Unit = { + decommissionManager.foreach(_.stop()) blockTransferService.close() if (blockStoreClient ne blockTransferService) { // Closing should be idempotent, but maybe not for the NioBlockTransferService. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index e335eb6ddb761..eb42c27fb4742 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -43,6 +43,19 @@ class BlockManagerMaster( logInfo("Removed " + execId + " successfully in removeExecutor") } + /** Decommission block managers corresponding to given set of executors */ + def decommissionBlockManagers(executorIds: Seq[String]): Unit = { + driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds)) + } + + /** Get Replication Info for all the RDD blocks stored in given blockManagerId */ + def getReplicateInfoForRDDBlocks( + blockManagerId: BlockManagerId): Seq[ReplicateBlock] = { + driverEndpoint.askSync[Seq[ReplicateBlock]]( + GetReplicateInfoForRDDBlocks(blockManagerId)) + } + + /** Request removal of a dead executor from the driver endpoint. * This is only called on the driver side. Non-blocking */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index d7f7eedc7f33b..3a4a42d9d4965 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -65,6 +65,10 @@ class BlockManagerMasterEndpoint( // Mapping from executor ID to block manager ID. private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId] + // Set of block managers which are decommissioning + private val decommissioningBlockManagerSet = new mutable.HashSet[BlockManagerId] + + // Mapping from block id to the set of block managers that have the block. 
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] @@ -153,6 +157,14 @@ class BlockManagerMasterEndpoint( removeExecutor(execId) context.reply(true) + case DecommissionBlockManagers(executorIds) => + val bmIds = executorIds.flatMap(blockManagerIdByExecutor.get) + decommissionBlockManagers(bmIds) + context.reply(true) + + case GetReplicateInfoForRDDBlocks(blockManagerId) => + context.reply(getReplicateInfoForRDDBlocks(blockManagerId)) + case StopBlockManagerMaster => context.reply(true) stop() @@ -257,6 +269,7 @@ class BlockManagerMasterEndpoint( // Remove the block manager from blockManagerIdByExecutor. blockManagerIdByExecutor -= blockManagerId.executorId + decommissioningBlockManagerSet.remove(blockManagerId) // Remove it from blockManagerInfo and remove all the blocks. blockManagerInfo.remove(blockManagerId) @@ -299,6 +312,42 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } + /** + * Decommission the given Seq of blockmanagers + * - Adds these block managers to decommissioningBlockManagerSet Set + * - Sends the DecommissionBlockManager message to each of the [[BlockManagerSlaveEndpoint]] + */ + def decommissionBlockManagers( + blockManagerIds: Seq[BlockManagerId]): Future[Seq[Unit]] = { + + val newBlockManagersToDecommission = blockManagerIds.toSet.diff(decommissioningBlockManagerSet) + val futures = newBlockManagersToDecommission.map { blockManagerId => + decommissioningBlockManagerSet.add(blockManagerId) + val info = blockManagerInfo(blockManagerId) + info.slaveEndpoint.ask[Unit](DecommissionBlockManager) + } + Future.sequence{ futures.toSeq } + } + + /** + * Returns a Seq of ReplicateBlock for each RDD block stored by given blockManagerId + * @param blockManagerId - block manager id for which ReplicateBlock info is needed + * @return Seq of ReplicateBlock + */ + private def getReplicateInfoForRDDBlocks( + blockManagerId: BlockManagerId): Seq[ReplicateBlock] = { + val info = blockManagerInfo(blockManagerId) + + val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD) + rddBlocks.map { blockId => + val currentBlockLocations = blockLocations.get(blockId) + val maxReplicas = currentBlockLocations.size + 1 + val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId) + val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas) + replicateMsg + }.toSeq + } + // Remove a block from the slaves that have it. This can only be used to remove // blocks that the master knows about. 
private def removeBlockFromWorkers(blockId: BlockId): Unit = { @@ -536,7 +585,11 @@ class BlockManagerMasterEndpoint( private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = { val blockManagerIds = blockManagerInfo.keySet if (blockManagerIds.contains(blockManagerId)) { - blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq + blockManagerIds + .filterNot { _.isDriver } + .filterNot { _ == blockManagerId } + .diff(decommissioningBlockManagerSet) + .toSeq } else { Seq.empty } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 895f48d0709fb..37b927a9c4d32 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -36,6 +36,9 @@ private[spark] object BlockManagerMessages { case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int) extends ToBlockManagerSlave + case object DecommissionBlockManager + extends ToBlockManagerSlave + // Remove all blocks belonging to a specific RDD. case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave @@ -125,6 +128,11 @@ private[spark] object BlockManagerMessages { case object GetStorageStatus extends ToBlockManagerMaster + case class DecommissionBlockManagers(executorIds: Seq[String]) extends ToBlockManagerMaster + + case class GetReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId) + extends ToBlockManagerMaster + case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true) extends ToBlockManagerMaster diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala index 29e21142ce449..a3a7149103491 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala @@ -61,6 +61,9 @@ class BlockManagerSlaveEndpoint( SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId) } + case DecommissionBlockManager => + context.reply(blockManager.decommissionBlockManager()) + case RemoveBroadcast(broadcastId, _) => doAsync[Int]("removing broadcast " + broadcastId, context) { blockManager.removeBroadcast(broadcastId, tellMaster = true) diff --git a/core/src/main/scala/org/apache/spark/storage/RDDBlockSavedOnDecommissionedBlockManagerException.scala b/core/src/main/scala/org/apache/spark/storage/RDDBlockSavedOnDecommissionedBlockManagerException.scala new file mode 100644 index 0000000000000..e6cef4dcc5e38 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/RDDBlockSavedOnDecommissionedBlockManagerException.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +class RDDBlockSavedOnDecommissionedBlockManagerException(blockId: RDDBlockId) + extends Exception(s"RDD Block $blockId cannot be saved on decommissioned executor") diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 8d06768a2b284..8b7644364e594 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1706,6 +1706,89 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE verify(liveListenerBus, never()).post(SparkListenerBlockUpdated(BlockUpdatedInfo(updateInfo))) } + test("test decommissioning block manager should not accept any new block directly " + + "or indirectly (via replication)") { + val store = makeBlockManager(2000, "exec1") + val store2 = makeBlockManager(2000, "exec2") + + val data1 = new Array[Byte](400) + val block1 = rdd(0, 0) + store.putSingle(block1, data1, StorageLevel.DISK_ONLY_2) + assert(master.getLocations(block1).size === 2, "master did not report 2 locations for a1") + + store.decommissionBlockManager() + val data2 = new Array[Byte](400) + val block2 = rdd(0, 1) + intercept[Exception] { + store.putSingle(block2, data2, StorageLevel.DISK_ONLY_2) + } + assert(master.getLocations(block2).size === 0, "block manager accepted blocks even" + + " after decommissioning") + + val data3 = new Array[Byte](400) + val block3 = rdd(0, 2) + store2.putSingle(block3, data3, StorageLevel.DISK_ONLY_2) + assert(master.getLocations(block3).size === 1) + } + + test("test decommission block manager should not be part of peers") { + val exec1 = "exec1" + val exec2 = "exec2" + val exec3 = "exec3" + val store1 = makeBlockManager(2000, exec1) + val store2 = makeBlockManager(2000, exec2) + val store3 = makeBlockManager(2000, exec3) + + assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === Set(exec1, exec2)) + + val data = new Array[Byte](400) + val blockId = rdd(0, 0) + store1.putSingle(blockId, data, StorageLevel.MEMORY_ONLY_2) + assert(master.getLocations(blockId).size === 2) + + master.decommissionBlockManagers(Seq(exec1)) + // store1 is decommissioned, so it should not be part of peer list for store3 + assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === Set(exec2)) + } + + test("test replicateRddCacheBlocks should offload all cached blocks") { + val store1 = makeBlockManager(2000, "exec1") + val store2 = makeBlockManager(2000, "exec2") + val store3 = makeBlockManager(2000, "exec3") + + val data = new Array[Byte](400) + val blockId = rdd(0, 0) + store1.putSingle(blockId, data, StorageLevel.MEMORY_ONLY_2) + assert(master.getLocations(blockId).size === 2) + assert(master.getLocations(blockId).contains(store1.blockManagerId)) + + store1.replicateRddCacheBlocks() + assert(master.getLocations(blockId).size === 2) + assert(master.getLocations(blockId).toSet === Set(store2.blockManagerId, + store3.blockManagerId)) + } + + test("test replicateRddCacheBlocks should keep the block if it is not able to offload") { + val store1 = makeBlockManager(12000, "exec1") + val store2 = makeBlockManager(2000, "exec2") + + val dataLarge = new Array[Byte](5000) + val blockIdLarge = rdd(0, 0) + val dataSmall = new Array[Byte](500) + val blockIdSmall = rdd(0, 1) + + 
store1.putSingle(blockIdLarge, dataLarge, StorageLevel.MEMORY_ONLY) + store1.putSingle(blockIdSmall, dataSmall, StorageLevel.MEMORY_ONLY) + assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId)) + assert(master.getLocations(blockIdSmall) === Seq(store1.blockManagerId)) + + store1.replicateRddCacheBlocks() + // Smaller block offloaded to store2 + assert(master.getLocations(blockIdSmall) === Seq(store2.blockManagerId)) + // Larger block still present in store1 as it can't be offloaded + assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId)) + } + class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService { var numCalls = 0 var tempFileManager: DownloadFileManager = null From 6a47615223e5b1454811da68fa5f52e58d3b8076 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Tue, 10 Mar 2020 18:48:44 +0530 Subject: [PATCH 02/17] changes --- .../scala/org/apache/spark/SparkContext.scala | 19 +++++-------------- .../apache/spark/storage/BlockManager.scala | 1 + 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4f9d9fb8f01da..cdb98db80158b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -28,8 +28,9 @@ import scala.collection.Map import scala.collection.immutable import scala.collection.mutable.HashMap import scala.language.implicitConversions -import scala.reflect.{ClassTag, classTag} +import scala.reflect.{classTag, ClassTag} import scala.util.control.NonFatal + import com.google.common.collect.MapMaker import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -37,6 +38,7 @@ import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, Doub import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob} import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} + import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} @@ -55,7 +57,7 @@ import org.apache.spark.resource._ import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} +import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend import org.apache.spark.scheduler.local.LocalSchedulerBackend import org.apache.spark.shuffle.ShuffleDataIOUtils import org.apache.spark.shuffle.api.ShuffleDriverComponents @@ -1584,7 +1586,7 @@ class SparkContext(config: SparkConf) extends Logging { listenerBus.removeListener(listener) } - def getExecutorIds(): Seq[String] = { + private[spark] def getExecutorIds(): Seq[String] = { schedulerBackend match { case b: ExecutorAllocationClient => b.getExecutorIds() @@ -1719,17 +1721,6 @@ class SparkContext(config: SparkConf) extends Logging { } } - - @DeveloperApi - def decommissionExecutors(executorIds: Seq[String]): Unit = { - schedulerBackend match { - case b: CoarseGrainedSchedulerBackend => - executorIds.foreach(b.decommissionExecutor) - case _ => - logWarning("Decommissioning executors is not supported by current scheduler.") - } - } - /** The version of 
Spark on which this application is running. */ def version: String = SPARK_VERSION diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 9492d9aa147d9..e921daf970c54 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1922,6 +1922,7 @@ private[spark] class BlockManager( def stop(): Unit = { if (!stopped) { stopped = true + logInfo("Stopping cache replication thread") cacheReplicationThread.interrupt() cacheReplicationThread.join() } From 622e1ba22aa2b143480282ade1e0b8cff0067d42 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Fri, 13 Mar 2020 21:55:34 +0530 Subject: [PATCH 03/17] minor changes --- .../org/apache/spark/storage/BlockManager.scala | 15 +++++++++------ .../apache/spark/storage/BlockManagerSuite.scala | 4 ++-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e921daf970c54..af8b0830b286b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1785,10 +1785,12 @@ private[spark] class BlockManager( } } - // Visible for testing - def replicateRddCacheBlocks(): Unit = { - val replicateBlocksInfo: Seq[ReplicateBlock] = - master.getReplicateInfoForRDDBlocks(blockManagerId) + /** + * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers + * Visible for testing + */ + def offloadRddCacheBlocks(): Unit = { + val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId) if (replicateBlocksInfo.nonEmpty) { logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " + @@ -1820,7 +1822,8 @@ private[spark] class BlockManager( replicatedSuccessfully } if (blocksFailedReplication.nonEmpty) { - logWarning(s"Blocks failed replication: ${blocksFailedReplication.mkString(",")}") + logWarning(s"Blocks failed replication in cache decommissioning " + + s"process: ${blocksFailedReplication.mkString(",")}") } } @@ -1899,7 +1902,7 @@ private[spark] class BlockManager( while (blockManagerDecommissioning && !stopped) { try { logDebug(s"Attempting to replicate all cached RDD blocks") - replicateRddCacheBlocks() + offloadRddCacheBlocks() logInfo(s"Attempt to replicate all cached blocks done") Thread.sleep(30000) } catch { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 8b7644364e594..806ca9787965a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1762,7 +1762,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getLocations(blockId).size === 2) assert(master.getLocations(blockId).contains(store1.blockManagerId)) - store1.replicateRddCacheBlocks() + store1.offloadRddCacheBlocks() assert(master.getLocations(blockId).size === 2) assert(master.getLocations(blockId).toSet === Set(store2.blockManagerId, store3.blockManagerId)) @@ -1782,7 +1782,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId)) assert(master.getLocations(blockIdSmall) === 
Seq(store1.blockManagerId)) - store1.replicateRddCacheBlocks() + store1.offloadRddCacheBlocks() // Smaller block offloaded to store2 assert(master.getLocations(blockIdSmall) === Seq(store2.blockManagerId)) // Larger block still present in store1 as it can't be offloaded From d792092569a3f63da6aa303f962974b547197078 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Sat, 14 Mar 2020 03:30:42 +0530 Subject: [PATCH 04/17] review comments addressed --- .../apache/spark/internal/config/package.scala | 15 +++++++++++---- .../org/apache/spark/storage/BlockManager.scala | 12 +++++++++--- .../apache/spark/storage/BlockManagerMaster.scala | 7 ++----- .../storage/BlockManagerMasterEndpoint.scala | 8 ++------ .../spark/storage/BlockManagerMessages.scala | 3 +-- 5 files changed, 25 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index f8b39ea46dab8..f9eea7583f29f 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -414,11 +414,18 @@ package object config { private[spark] val STORAGE_DECOMMISSION_ENABLED = ConfigBuilder("spark.storage.decommission.enabled") - .booleanConf.createWithDefault(false) + .doc("Whether to decommission the block manager when decommissioning executor") + .version("3.1.0") + .booleanConf + .createWithDefault(false) - private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE = - ConfigBuilder("spark.storage.decommission.maxReplicationFailures") - .intConf.createWithDefault(3) + private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK = + ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock") + .doc("Maximum number of failures to tolerate for offloading " + + "one block in single decommission cache blocks iteration") + .version("3.1.0") + .intConf + .createWithDefault(3) private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE = ConfigBuilder("spark.storage.replication.topologyFile") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index af8b0830b286b..159ccc33a596d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1681,7 +1681,7 @@ private[spark] class BlockManager( return false } - logInfo(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}") + logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}") return true } @@ -1782,6 +1782,8 @@ private[spark] class BlockManager( blockManagerDecommissioning = true decommissionManager = Some(new BlockManagerDecommissionManager) decommissionManager.foreach(_.start()) + } else { + logDebug(s"Block manager already in decommissioning state") } } @@ -1802,7 +1804,7 @@ private[spark] class BlockManager( // Maximum number of storage replication failure which replicateBlock can handle // before giving up for one block val maxReplicationFailures = conf.get( - config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE) + config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK) val blocksFailedReplication = replicateBlocksInfo.filterNot { case ReplicateBlock(blockId, existingReplicas, maxReplicas) => @@ -1895,7 +1897,11 @@ private[spark] class BlockManager( data.dispose() } - class BlockManagerDecommissionManager { + /** + * Class to handle block manager 
decommissioning retries + * It creates a Thread to retry offloading all RDD cache blocks + */ + private class BlockManagerDecommissionManager { @volatile private var stopped = false private val cacheReplicationThread = new Thread { override def run(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index eb42c27fb4742..3ce989fc26d81 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -49,13 +49,10 @@ class BlockManagerMaster( } /** Get Replication Info for all the RDD blocks stored in given blockManagerId */ - def getReplicateInfoForRDDBlocks( - blockManagerId: BlockManagerId): Seq[ReplicateBlock] = { - driverEndpoint.askSync[Seq[ReplicateBlock]]( - GetReplicateInfoForRDDBlocks(blockManagerId)) + def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = { + driverEndpoint.askSync[Seq[ReplicateBlock]](GetReplicateInfoForRDDBlocks(blockManagerId)) } - /** Request removal of a dead executor from the driver endpoint. * This is only called on the driver side. Non-blocking */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 3a4a42d9d4965..38c8d6fa3d5fd 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -68,7 +68,6 @@ class BlockManagerMasterEndpoint( // Set of block managers which are decommissioning private val decommissioningBlockManagerSet = new mutable.HashSet[BlockManagerId] - // Mapping from block id to the set of block managers that have the block. 
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] @@ -317,9 +316,7 @@ class BlockManagerMasterEndpoint( * - Adds these block managers to decommissioningBlockManagerSet Set * - Sends the DecommissionBlockManager message to each of the [[BlockManagerSlaveEndpoint]] */ - def decommissionBlockManagers( - blockManagerIds: Seq[BlockManagerId]): Future[Seq[Unit]] = { - + def decommissionBlockManagers(blockManagerIds: Seq[BlockManagerId]): Future[Seq[Unit]] = { val newBlockManagersToDecommission = blockManagerIds.toSet.diff(decommissioningBlockManagerSet) val futures = newBlockManagersToDecommission.map { blockManagerId => decommissioningBlockManagerSet.add(blockManagerId) @@ -334,8 +331,7 @@ class BlockManagerMasterEndpoint( * @param blockManagerId - block manager id for which ReplicateBlock info is needed * @return Seq of ReplicateBlock */ - private def getReplicateInfoForRDDBlocks( - blockManagerId: BlockManagerId): Seq[ReplicateBlock] = { + private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = { val info = blockManagerInfo(blockManagerId) val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 37b927a9c4d32..7d4f2fff5c34c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -36,8 +36,7 @@ private[spark] object BlockManagerMessages { case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int) extends ToBlockManagerSlave - case object DecommissionBlockManager - extends ToBlockManagerSlave + case object DecommissionBlockManager extends ToBlockManagerSlave // Remove all blocks belonging to a specific RDD. 
case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave From b9906c2d48bd34ec7e181c12d5175ba2dbcd2adc Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Sun, 29 Mar 2020 15:04:11 +0530 Subject: [PATCH 05/17] review comments --- .../CoarseGrainedSchedulerBackend.scala | 2 +- .../apache/spark/storage/BlockManager.scala | 19 +++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index ab4b4c57ee105..72681829c2f64 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -581,7 +581,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ private[spark] def decommissionExecutor(executorId: String): Unit = { if (driverEndpoint != null) { - logInfo("Propogating executor decommission to driver.") + logInfo("Propagating executor decommission to driver.") driverEndpoint.send(DecommissionExecutor(executorId)) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 159ccc33a596d..381cca5465e88 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1558,22 +1558,22 @@ private[spark] class BlockManager( } /** - * Called for pro-active replenishment of blocks lost due to executor failures - * * @param blockId blockId being replicate * @param existingReplicas existing block managers that have a replica * @param maxReplicas maximum replicas needed + * @param forceFetchPeers whether to force refresh the peer list or not + * @param maxReplicationFailures number of replication failures to tolerate before + * giving up. 
+ * @return whether block was successfully replicated or not */ def replicateBlock( blockId: BlockId, existingReplicas: Set[BlockManagerId], maxReplicas: Int, - forceFetch: Boolean = true, + forceFetchPeers: Boolean = true, maxReplicationFailures: Option[Int] = None): Boolean = { - var replicatedSuccessfully = true logInfo(s"Using $blockManagerId to pro-actively replicate $blockId") - blockInfoManager.lockForReading(blockId).foreach { info => - replicatedSuccessfully = false + blockInfoManager.lockForReading(blockId).forall { info => val data = doGetLocalBytes(blockId, info) val storageLevel = StorageLevel( useDisk = info.level.useDisk, @@ -1581,16 +1581,15 @@ private[spark] class BlockManager( useOffHeap = info.level.useOffHeap, deserialized = info.level.deserialized, replication = maxReplicas) - getPeers(forceFetch) + getPeers(forceFetchPeers) try { - replicatedSuccessfully = replicate( + replicate( blockId, data, storageLevel, info.classTag, existingReplicas, maxReplicationFailures) } finally { logDebug(s"Releasing lock for $blockId") releaseLockAndDispose(blockId, data) } } - replicatedSuccessfully } /** @@ -1812,7 +1811,7 @@ private[spark] class BlockManager( blockId, existingReplicas.toSet, maxReplicas, - forceFetch = false, + forceFetchPeers = false, maxReplicationFailures = Some(maxReplicationFailures)) if (replicatedSuccessfully) { logInfo(s"Block $blockId offloaded successfully, Removing block now") From 076dd671d9a9ac3c8d5926279f54517b8b39426a Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Thu, 2 Apr 2020 19:35:53 +0530 Subject: [PATCH 06/17] fix comment --- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 381cca5465e88..e6ebc278c2a2b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1558,6 +1558,8 @@ private[spark] class BlockManager( } /** + * Replicates a block to peer block managers based on existingReplicas and maxReplicas + * * @param blockId blockId being replicate * @param existingReplicas existing block managers that have a replica * @param maxReplicas maximum replicas needed From d12dbffa3402fbfdb7d7b38e6b1a9cc94a518630 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Mon, 6 Apr 2020 19:28:48 +0530 Subject: [PATCH 07/17] accept new blocks even on a decommissiong block manager --- .../apache/spark/storage/BlockManager.scala | 18 ++- ...nDecommissionedBlockManagerException.scala | 21 ---- .../BlockManagerDecommissionSuite.scala | 114 ++++++++++++++++++ .../spark/storage/BlockManagerSuite.scala | 25 ---- 4 files changed, 122 insertions(+), 56 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/storage/RDDBlockSavedOnDecommissionedBlockManagerException.scala create mode 100644 core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e6ebc278c2a2b..c212e84957c07 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1285,9 +1285,6 @@ private[spark] class BlockManager( require(blockId != null, "BlockId is null") require(level != null && level.isValid, "StorageLevel is null or invalid") - if 
(blockManagerDecommissioning && blockId.isRDD) { - throw new RDDBlockSavedOnDecommissionedBlockManagerException(blockId.asRDDId.get) - } val putBlockInfo = { val newInfo = new BlockInfo(level, classTag, tellMaster) @@ -1563,7 +1560,6 @@ private[spark] class BlockManager( * @param blockId blockId being replicate * @param existingReplicas existing block managers that have a replica * @param maxReplicas maximum replicas needed - * @param forceFetchPeers whether to force refresh the peer list or not * @param maxReplicationFailures number of replication failures to tolerate before * giving up. * @return whether block was successfully replicated or not @@ -1572,7 +1568,6 @@ private[spark] class BlockManager( blockId: BlockId, existingReplicas: Set[BlockManagerId], maxReplicas: Int, - forceFetchPeers: Boolean = true, maxReplicationFailures: Option[Int] = None): Boolean = { logInfo(s"Using $blockManagerId to pro-actively replicate $blockId") blockInfoManager.lockForReading(blockId).forall { info => @@ -1583,7 +1578,10 @@ private[spark] class BlockManager( useOffHeap = info.level.useOffHeap, deserialized = info.level.deserialized, replication = maxReplicas) - getPeers(forceFetchPeers) + // we know we are called as a result of an executor removal or because the current executor + // is getting decommissioned. so we refresh peer cache before trying replication, we won't + // try to replicate to a missing executor/another decommissioning executor + getPeers(forceFetch = true) try { replicate( blockId, data, storageLevel, info.classTag, existingReplicas, maxReplicationFailures) @@ -1798,8 +1796,6 @@ private[spark] class BlockManager( if (replicateBlocksInfo.nonEmpty) { logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " + s"for block manager decommissioning") - // Refresh peer list once before starting replication - getPeers(true) } // Maximum number of storage replication failure which replicateBlock can handle @@ -1807,13 +1803,14 @@ private[spark] class BlockManager( val maxReplicationFailures = conf.get( config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK) + // TODO: We can sort these blocks based on some policy (LRU/blockSize etc) + // so that we end up prioritize them over each other val blocksFailedReplication = replicateBlocksInfo.filterNot { case ReplicateBlock(blockId, existingReplicas, maxReplicas) => val replicatedSuccessfully = replicateBlock( blockId, existingReplicas.toSet, maxReplicas, - forceFetchPeers = false, maxReplicationFailures = Some(maxReplicationFailures)) if (replicatedSuccessfully) { logInfo(s"Block $blockId offloaded successfully, Removing block now") @@ -1911,7 +1908,8 @@ private[spark] class BlockManager( logDebug(s"Attempting to replicate all cached RDD blocks") offloadRddCacheBlocks() logInfo(s"Attempt to replicate all cached blocks done") - Thread.sleep(30000) + val sleepInterval = if (Utils.isTesting) 100 else 30000 + Thread.sleep(sleepInterval) } catch { case _: InterruptedException => // no-op diff --git a/core/src/main/scala/org/apache/spark/storage/RDDBlockSavedOnDecommissionedBlockManagerException.scala b/core/src/main/scala/org/apache/spark/storage/RDDBlockSavedOnDecommissionedBlockManagerException.scala deleted file mode 100644 index e6cef4dcc5e38..0000000000000 --- a/core/src/main/scala/org/apache/spark/storage/RDDBlockSavedOnDecommissionedBlockManagerException.scala +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.storage - -class RDDBlockSavedOnDecommissionedBlockManagerException(blockId: RDDBlockId) - extends Exception(s"RDD Block $blockId cannot be saved on decommissioned executor") diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala new file mode 100644 index 0000000000000..eeffcc1d9bbe0 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.util.concurrent.Semaphore + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, Success} +import org.apache.spark.internal.config +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart} +import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend +import org.apache.spark.util.ThreadUtils + +class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext { + + override def beforeEach(): Unit = { + val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true) + .set(config.STORAGE_DECOMMISSION_ENABLED, true) + + sc = new SparkContext("local-cluster[3, 1, 1024]", "test", conf) + } + + test(s"verify that an already running task which is going to cache data succeeds " + + s"on a decommissioned executor") { + // Create input RDD with 10 partitions + val input = sc.parallelize(1 to 10, 10) + val accum = sc.longAccumulator("mapperRunAccumulator") + // Do a count to wait for the executors to be registered. 
+ input.count() + + // Create a new RDD where we have sleep in each partition, we are also increasing + // the value of accumulator in each partition + val sleepyRdd = input.mapPartitions { x => + Thread.sleep(500) + accum.add(1) + x + } + + // Listen for the job + val sem = new Semaphore(0) + val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd] + sc.addSparkListener(new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + sem.release() + } + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + taskEndEvents.append(taskEnd) + } + }) + + // Cache the RDD lazily + sleepyRdd.persist() + + // Start the computation of RDD - this step will also cache the RDD + val asyncCount = sleepyRdd.countAsync() + + // Wait for the job to have started + sem.acquire(1) + + // Decommission one of the executor + val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] + val execs = sched.getExecutorIds() + assert(execs.size == 3, s"Expected 3 executors but found ${execs.size}") + val execToDecommission = execs.head + sched.decommissionExecutor(execToDecommission) + + // Wait for job to finish + val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 3.seconds) + assert(asyncCountResult === 10) + // All 10 tasks finished, so accum should have been increased 10 times + assert(accum.value === 10) + + // All tasks should be successful, nothing should have failed + sc.listenerBus.waitUntilEmpty() + assert(taskEndEvents.size === 10) // 10 mappers + assert(taskEndEvents.map(_.reason).toSet === Set(Success)) + + // Since the RDD is cached, so further usage of same RDD should use the + // cached data. Original RDD partitions should not be recomputed i.e. accum + // should have same value like before + assert(sleepyRdd.count() === 10) + assert(accum.value === 10) + + // all cache block should have been shifted from decommissioned block manager + // after some time + Thread.sleep(1000) + val storageStatus = sc.env.blockManager.master.getStorageStatus + val execIdToBlocksMapping = storageStatus.map( + status => (status.blockManagerId.executorId, status.blocks)).toMap + // No cached blocks should be present on executor which was decommissioned + assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq()) + // There should still be all 10 RDD blocks cached + assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === 10) + } +} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 806ca9787965a..48b8c1a584323 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1706,31 +1706,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE verify(liveListenerBus, never()).post(SparkListenerBlockUpdated(BlockUpdatedInfo(updateInfo))) } - test("test decommissioning block manager should not accept any new block directly " + - "or indirectly (via replication)") { - val store = makeBlockManager(2000, "exec1") - val store2 = makeBlockManager(2000, "exec2") - - val data1 = new Array[Byte](400) - val block1 = rdd(0, 0) - store.putSingle(block1, data1, StorageLevel.DISK_ONLY_2) - assert(master.getLocations(block1).size === 2, "master did not report 2 locations for a1") - - store.decommissionBlockManager() - val data2 = new Array[Byte](400) - val block2 = rdd(0, 1) - intercept[Exception] { - 
store.putSingle(block2, data2, StorageLevel.DISK_ONLY_2)
-    }
-    assert(master.getLocations(block2).size === 0, "block manager accepted blocks even" +
-      " after decommissioning")
-
-    val data3 = new Array[Byte](400)
-    val block3 = rdd(0, 2)
-    store2.putSingle(block3, data3, StorageLevel.DISK_ONLY_2)
-    assert(master.getLocations(block3).size === 1)
-  }
-
   test("test decommission block manager should not be part of peers") {
     val exec1 = "exec1"
     val exec2 = "exec2"

From f6b4f7cfbcd0f3d6f2ab3ef0a1d37de97c012957 Mon Sep 17 00:00:00 2001
From: Prakhar Jain
Date: Wed, 8 Apr 2020 18:52:59 +0530
Subject: [PATCH 08/17] review comments addressed

---
 .../spark/internal/config/package.scala      | 12 +++++++
 .../CoarseGrainedSchedulerBackend.scala      |  2 +-
 .../apache/spark/storage/BlockManager.scala  | 35 ++++++++++---------
 .../storage/BlockManagerMasterEndpoint.scala |  3 +-
 .../BlockManagerDecommissionSuite.scala      |  1 +
 5 files changed, 33 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 6cecc6d52158c..94fcfa1111dc4 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -422,12 +422,24 @@ package object config {

   private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
     ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
+      .internal()
       .doc("Maximum number of failures to tolerate for offloading " +
         "one block in single decommission cache blocks iteration")
       .version("3.1.0")
       .intConf
       .createWithDefault(3)

+  private[spark] val STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL =
+    ConfigBuilder("spark.storage.decommission.replicationReattemptInterval")
+      .internal()
+      .doc("The interval of time between consecutive cache block replication reattempts " +
+        "happening on each decommissioning executor (due to storage decommissioning).")
+      .version("3.1.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(_ > 0, "Time interval between two consecutive attempts of " +
+        "cache block replication should be positive.")
+      .createWithDefaultString("30s")
+
   private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
     ConfigBuilder("spark.storage.replication.topologyFile")
       .version("2.1.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 1f8eded0f0294..9bf13b487ed9f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -441,7 +441,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

       if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
         try {
-          logInfo(s"Starting decommissioning block manager corresponding to " +
+          logInfo("Starting decommissioning block manager corresponding to " +
             s"executor $executorId.")
           scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
         } catch {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c212e84957c07..81eb00e2edad5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1779,10 +1779,10 @@ private[spark] class BlockManager(
     if (!blockManagerDecommissioning) {
       logInfo("Starting block manager decommissioning process")
       blockManagerDecommissioning = true
-      decommissionManager = Some(new BlockManagerDecommissionManager)
+      decommissionManager = Some(new BlockManagerDecommissionManager(conf))
       decommissionManager.foreach(_.start())
     } else {
-      logDebug(s"Block manager already in decommissioning state")
+      logDebug("Block manager already in decommissioning state")
     }
   }

@@ -1795,11 +1795,10 @@ private[spark] class BlockManager(

     if (replicateBlocksInfo.nonEmpty) {
       logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " +
-        s"for block manager decommissioning")
+        "for block manager decommissioning")
     }

     // Maximum number of storage replication failure which replicateBlock can handle
-    // before giving up for one block
     val maxReplicationFailures = conf.get(
       config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)

@@ -1822,7 +1821,7 @@ private[spark] class BlockManager(
         replicatedSuccessfully
     }
     if (blocksFailedReplication.nonEmpty) {
-      logWarning(s"Blocks failed replication in cache decommissioning " +
+      logWarning("Blocks failed replication in cache decommissioning " +
         s"process: ${blocksFailedReplication.mkString(",")}")
     }
   }
@@ -1899,20 +1898,21 @@ private[spark] class BlockManager(
    * Class to handle block manager decommissioning retries
    * It creates a Thread to retry offloading all RDD cache blocks
    */
-  private class BlockManagerDecommissionManager {
+  private class BlockManagerDecommissionManager(conf: SparkConf) {
     @volatile private var stopped = false
-    private val cacheReplicationThread = new Thread {
+    private val blockReplicationThread = new Thread {
       override def run(): Unit = {
         while (blockManagerDecommissioning && !stopped) {
           try {
-            logDebug(s"Attempting to replicate all cached RDD blocks")
+            logDebug("Attempting to replicate all cached RDD blocks")
             offloadRddCacheBlocks()
-            logInfo(s"Attempt to replicate all cached blocks done")
-            val sleepInterval = if (Utils.isTesting) 100 else 30000
+            logInfo("Attempt to replicate all cached blocks done")
+            val sleepInterval = conf.get(
+              config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
             Thread.sleep(sleepInterval)
           } catch {
             case _: InterruptedException =>
-              // no-op
+            // no-op
             case NonFatal(e) =>
               logError("Error occurred while trying to " +
                 "replicate cached RDD blocks for block manager decommissioning", e)
@@ -1920,19 +1920,20 @@ private[spark] class BlockManager(
         }
       }
     }
-    cacheReplicationThread.setDaemon(true)
-    cacheReplicationThread.setName("cache-replication-thread")
+    blockReplicationThread.setDaemon(true)
+    blockReplicationThread.setName("block-replication-thread")

     def start(): Unit = {
-      cacheReplicationThread.start()
+      logInfo("Starting block replication thread ")
+      blockReplicationThread.start()
     }

     def stop(): Unit = {
       if (!stopped) {
         stopped = true
-        logInfo("Stopping cache replication thread")
-        cacheReplicationThread.interrupt()
-        cacheReplicationThread.join()
+        logInfo("Stopping block replication thread")
+        blockReplicationThread.interrupt()
+        blockReplicationThread.join()
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 38c8d6fa3d5fd..d936420a99276 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -157,8 +157,7 @@ class BlockManagerMasterEndpoint(
       context.reply(true)

     case DecommissionBlockManagers(executorIds) =>
-      val bmIds = executorIds.flatMap(blockManagerIdByExecutor.get)
-      decommissionBlockManagers(bmIds)
+      decommissionBlockManagers(executorIds.flatMap(blockManagerIdByExecutor.get))
       context.reply(true)

     case GetReplicateInfoForRDDBlocks(blockManagerId) =>
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
index eeffcc1d9bbe0..bed4453ff1880 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
@@ -33,6 +33,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
   override def beforeEach(): Unit = {
     val conf = new SparkConf().setAppName("test").setMaster("local")
       .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
+      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 100L)
       .set(config.STORAGE_DECOMMISSION_ENABLED, true)

     sc = new SparkContext("local-cluster[3, 1, 1024]", "test", conf)

From 9c6bdb626d228eb2ebf0d0c82e0a0aa8d0e9ec21 Mon Sep 17 00:00:00 2001
From: Prakhar Jain
Date: Wed, 8 Apr 2020 19:42:50 +0530
Subject: [PATCH 09/17] remove extra space

---
 .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 9bf13b487ed9f..b224983c7abd2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -446,7 +446,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
         } catch {
           case e: Exception =>
-            logError(s"Unexpected error during block manager " +
+            logError("Unexpected error during block manager " +
               s"decommissioning for executor $executorId: ${e.toString}", e)
         }
         logInfo(s"Finished decommissioning block manager corresponding to $executorId.")
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 81eb00e2edad5..3a47a667ef050 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1924,7 +1924,7 @@ private[spark] class BlockManager(
       blockReplicationThread.setName("block-replication-thread")

     def start(): Unit = {
-      logInfo("Starting block replication thread ")
+      logInfo("Starting block replication thread")
       blockReplicationThread.start()
     }

From bb324f946019b8d700c517cc5eb2f7c11dc70cfc Mon Sep 17 00:00:00 2001
From: Prakhar Jain
Date: Thu, 23 Apr 2020 19:52:33 +0530
Subject: [PATCH 10/17] Replicate blocks in parallel

---
 .../org/apache/spark/internal/config/package.scala |  5 +++--
 .../cluster/CoarseGrainedSchedulerBackend.scala     |  2 +-
 .../org/apache/spark/storage/BlockManager.scala     | 13 +++++++------
 .../storage/BlockManagerDecommissionSuite.scala     | 11 -----------
 .../apache/spark/storage/BlockManagerSuite.scala    |  8 ++++----
 5 files changed, 15 insertions(+), 24 deletions(-)

diff
--git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 94fcfa1111dc4..2a80e422cb161 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -423,8 +423,9 @@ package object config { private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK = ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock") .internal() - .doc("Maximum number of failures to tolerate for offloading " + - "one block in single decommission cache blocks iteration") + .doc("Maximum number of failures which can be handled for the replication of " + + "one RDD block when block manager is decommissioning and trying to move its " + + "existing blocks.") .version("3.1.0") .intConf .createWithDefault(3) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index b224983c7abd2..69f30f2d21773 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -449,7 +449,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logError("Unexpected error during block manager " + s"decommissioning for executor $executorId: ${e.toString}", e) } - logInfo(s"Finished decommissioning block manager corresponding to $executorId.") + logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.") } } else { logInfo(s"Skipping decommissioning of executor $executorId.") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 3a47a667ef050..aa15d1253b3f7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1790,7 +1790,7 @@ private[spark] class BlockManager( * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers * Visible for testing */ - def offloadRddCacheBlocks(): Unit = { + def decommissionRddCacheBlocks(): Unit = { val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId) if (replicateBlocksInfo.nonEmpty) { @@ -1803,8 +1803,9 @@ private[spark] class BlockManager( config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK) // TODO: We can sort these blocks based on some policy (LRU/blockSize etc) - // so that we end up prioritize them over each other - val blocksFailedReplication = replicateBlocksInfo.filterNot { + // so that we end up prioritize them over each other + val blocksFailedReplication = ThreadUtils.parmap( + replicateBlocksInfo, "decommissionRddCacheBlocks", 4) { case ReplicateBlock(blockId, existingReplicas, maxReplicas) => val replicatedSuccessfully = replicateBlock( blockId, @@ -1818,8 +1819,8 @@ private[spark] class BlockManager( } else { logWarning(s"Failed to offload block $blockId") } - replicatedSuccessfully - } + (blockId, replicatedSuccessfully) + }.filterNot(_._2).map(_._1) if (blocksFailedReplication.nonEmpty) { logWarning("Blocks failed replication in cache decommissioning " + s"process: ${blocksFailedReplication.mkString(",")}") @@ -1905,7 +1906,7 @@ private[spark] class BlockManager( while (blockManagerDecommissioning && 
!stopped) { try { logDebug("Attempting to replicate all cached RDD blocks") - offloadRddCacheBlocks() + decommissionRddCacheBlocks() logInfo("Attempt to replicate all cached blocks done") val sleepInterval = conf.get( config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index bed4453ff1880..59fb056dae628 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -100,16 +100,5 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // should have same value like before assert(sleepyRdd.count() === 10) assert(accum.value === 10) - - // all cache block should have been shifted from decommissioned block manager - // after some time - Thread.sleep(1000) - val storageStatus = sc.env.blockManager.master.getStorageStatus - val execIdToBlocksMapping = storageStatus.map( - status => (status.blockManagerId.executorId, status.blocks)).toMap - // No cached blocks should be present on executor which was decommissioned - assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq()) - // There should still be all 10 RDD blocks cached - assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === 10) } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 48b8c1a584323..eb875dcc44223 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1726,7 +1726,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === Set(exec2)) } - test("test replicateRddCacheBlocks should offload all cached blocks") { + test("test decommissionRddCacheBlocks should offload all cached blocks") { val store1 = makeBlockManager(2000, "exec1") val store2 = makeBlockManager(2000, "exec2") val store3 = makeBlockManager(2000, "exec3") @@ -1737,13 +1737,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getLocations(blockId).size === 2) assert(master.getLocations(blockId).contains(store1.blockManagerId)) - store1.offloadRddCacheBlocks() + store1.decommissionRddCacheBlocks() assert(master.getLocations(blockId).size === 2) assert(master.getLocations(blockId).toSet === Set(store2.blockManagerId, store3.blockManagerId)) } - test("test replicateRddCacheBlocks should keep the block if it is not able to offload") { + test("test decommissionRddCacheBlocks should keep the block if it is not able to offload") { val store1 = makeBlockManager(12000, "exec1") val store2 = makeBlockManager(2000, "exec2") @@ -1757,7 +1757,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId)) assert(master.getLocations(blockIdSmall) === Seq(store1.blockManagerId)) - store1.offloadRddCacheBlocks() + store1.decommissionRddCacheBlocks() // Smaller block offloaded to store2 assert(master.getLocations(blockIdSmall) === Seq(store2.blockManagerId)) // Larger block still present in store1 as it can't be offloaded From 
a2a81f66d2e419a08682dfe7afe5b43a19483e68 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Thu, 30 Apr 2020 21:08:42 +0530 Subject: [PATCH 11/17] commenting code to debug test failure --- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index aa15d1253b3f7..388cb3fd23c5f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1940,7 +1940,7 @@ private[spark] class BlockManager( } def stop(): Unit = { - decommissionManager.foreach(_.stop()) + // decommissionManager.foreach(_.stop()) blockTransferService.close() if (blockStoreClient ne blockTransferService) { // Closing should be idempotent, but maybe not for the NioBlockTransferService. From 6ab11e3b49b49c25f1fdb1c6f3d2af1455e4b8fb Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Sat, 2 May 2020 15:10:27 +0530 Subject: [PATCH 12/17] more changes done to debug test failures --- .../scala/org/apache/spark/storage/BlockManager.scala | 10 +++++----- .../spark/storage/BlockManagerDecommissionSuite.scala | 1 - 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 388cb3fd23c5f..df9c49a57fca3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1804,8 +1804,7 @@ private[spark] class BlockManager( // TODO: We can sort these blocks based on some policy (LRU/blockSize etc) // so that we end up prioritize them over each other - val blocksFailedReplication = ThreadUtils.parmap( - replicateBlocksInfo, "decommissionRddCacheBlocks", 4) { + val blocksFailedReplication = replicateBlocksInfo.map { case ReplicateBlock(blockId, existingReplicas, maxReplicas) => val replicatedSuccessfully = replicateBlock( blockId, @@ -1901,6 +1900,9 @@ private[spark] class BlockManager( */ private class BlockManagerDecommissionManager(conf: SparkConf) { @volatile private var stopped = false + private val sleepInterval = conf.get( + config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL) + private val blockReplicationThread = new Thread { override def run(): Unit = { while (blockManagerDecommissioning && !stopped) { @@ -1908,8 +1910,6 @@ private[spark] class BlockManager( logDebug("Attempting to replicate all cached RDD blocks") decommissionRddCacheBlocks() logInfo("Attempt to replicate all cached blocks done") - val sleepInterval = conf.get( - config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL) Thread.sleep(sleepInterval) } catch { case _: InterruptedException => @@ -1940,7 +1940,7 @@ private[spark] class BlockManager( } def stop(): Unit = { - // decommissionManager.foreach(_.stop()) + decommissionManager.foreach(_.stop()) blockTransferService.close() if (blockStoreClient ne blockTransferService) { // Closing should be idempotent, but maybe not for the NioBlockTransferService. 
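Stepping back from the debugging churn in PATCH 08-12, the BlockManagerDecommissionManager settles into a simple shape: a daemon thread that repeatedly asks the block manager to offload its cached RDD blocks, sleeps for the configured replication reattempt interval, and exits when interrupted. The following is only a rough standalone sketch of that loop for orientation; `offloadOnce` is a hypothetical stand-in for decommissionRddCacheBlocks() and is not part of the patch.

import scala.util.control.NonFatal

// Illustrative sketch (not patch code) of the decommissioning retry loop.
// `offloadOnce` stands in for a single pass of decommissionRddCacheBlocks().
class DecommissionRetryLoop(sleepIntervalMs: Long)(offloadOnce: () => Unit) {
  @volatile private var stopped = false

  private val thread = new Thread("block-replication-thread") {
    override def run(): Unit = {
      while (!stopped && !Thread.interrupted()) {
        try {
          offloadOnce()                 // best-effort pass over the cached RDD blocks
          Thread.sleep(sleepIntervalMs) // wait before the next reattempt
        } catch {
          case _: InterruptedException => stopped = true // stop() interrupts the thread
          case NonFatal(_) => // log and retry on the next iteration
        }
      }
    }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.interrupt() }
}

The later patches in the series refine exactly this skeleton: moving the sleep interval out of the loop, treating interrupts as a stop signal, and bounding the number of tolerated failures.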
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index 59fb056dae628..b7ddbe0020499 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -33,7 +33,6 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext override def beforeEach(): Unit = { val conf = new SparkConf().setAppName("test").setMaster("local") .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true) - .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 100L) .set(config.STORAGE_DECOMMISSION_ENABLED, true) sc = new SparkContext("local-cluster[3, 1, 1024]", "test", conf) From c645582e2df06fe4736dc3b1673b377d4baf96f0 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Sun, 3 May 2020 21:48:01 +0530 Subject: [PATCH 13/17] test debug: use two execs instead of 3 --- .../scala/org/apache/spark/storage/BlockManager.scala | 4 ++-- .../spark/storage/BlockManagerDecommissionSuite.scala | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index df9c49a57fca3..78cf953355b43 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1913,7 +1913,8 @@ private[spark] class BlockManager( Thread.sleep(sleepInterval) } catch { case _: InterruptedException => - // no-op + logInfo("Interrupted during migration, will not refresh migrations.") + stopped = true case NonFatal(e) => logError("Error occurred while trying to " + "replicate cached RDD blocks for block manager decommissioning", e) @@ -1934,7 +1935,6 @@ private[spark] class BlockManager( stopped = true logInfo("Stopping block replication thread") blockReplicationThread.interrupt() - blockReplicationThread.join() } } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index b7ddbe0020499..95b15fd348794 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -35,7 +35,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true) .set(config.STORAGE_DECOMMISSION_ENABLED, true) - sc = new SparkContext("local-cluster[3, 1, 1024]", "test", conf) + sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf) } test(s"verify that an already running task which is going to cache data succeeds " + @@ -76,15 +76,17 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // Wait for the job to have started sem.acquire(1) + // Give Spark a tiny bit to start the tasks after the listener says hello + Thread.sleep(100) // Decommission one of the executor val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] val execs = sched.getExecutorIds() - assert(execs.size == 3, s"Expected 3 executors but found ${execs.size}") + assert(execs.size == 2, s"Expected 2 executors but found ${execs.size}") val execToDecommission = execs.head sched.decommissionExecutor(execToDecommission) // Wait for job to finish - val 
asyncCountResult = ThreadUtils.awaitResult(asyncCount, 3.seconds) + val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 6.seconds) assert(asyncCountResult === 10) // All 10 tasks finished, so accum should have been increased 10 times assert(accum.value === 10) From 37ad18953dbefa808af94f024e33303e50ce0b92 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Thu, 14 May 2020 13:36:59 +0530 Subject: [PATCH 14/17] cherry-pick fixes from https://github.com/holdenk/spark/tree/SPARK-20732-rddcache-1-merge --- .../apache/spark/storage/BlockManager.scala | 7 ++++++ .../BlockManagerDecommissionSuite.scala | 7 +++--- .../spark/storage/BlockManagerSuite.scala | 24 +++++++++---------- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 78cf953355b43..58b7acfd95831 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1653,6 +1653,10 @@ private[spark] class BlockManager( peersForReplication = peersForReplication.tail peersReplicatedTo += peer } catch { + // Rethrow interrupt exception + case e: InterruptedException => + throw e + // Everything else we may retry case NonFatal(e) => logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e) peersFailedToReplicateTo += peer @@ -1796,6 +1800,9 @@ private[spark] class BlockManager( if (replicateBlocksInfo.nonEmpty) { logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " + "for block manager decommissioning") + } else { + logWarning(s"Asked to decommission RDD cache blocks, but no blocks to migrate") + return } // Maximum number of storage replication failure which replicateBlock can handle diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index 95b15fd348794..7456ca7f02a2e 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -26,12 +26,13 @@ import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSui import org.apache.spark.internal.config import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart} import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend -import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.{ResetSystemProperties, ThreadUtils} -class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext { +class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext + with ResetSystemProperties { override def beforeEach(): Unit = { - val conf = new SparkConf().setAppName("test").setMaster("local") + val conf = new SparkConf().setAppName("test") .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true) .set(config.STORAGE_DECOMMISSION_ENABLED, true) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index eb875dcc44223..bfef8f1ab29d8 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1710,13 +1710,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val exec1 
= "exec1" val exec2 = "exec2" val exec3 = "exec3" - val store1 = makeBlockManager(2000, exec1) - val store2 = makeBlockManager(2000, exec2) - val store3 = makeBlockManager(2000, exec3) + val store1 = makeBlockManager(800, exec1) + val store2 = makeBlockManager(800, exec2) + val store3 = makeBlockManager(800, exec3) assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === Set(exec1, exec2)) - val data = new Array[Byte](400) + val data = new Array[Byte](4) val blockId = rdd(0, 0) store1.putSingle(blockId, data, StorageLevel.MEMORY_ONLY_2) assert(master.getLocations(blockId).size === 2) @@ -1727,11 +1727,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } test("test decommissionRddCacheBlocks should offload all cached blocks") { - val store1 = makeBlockManager(2000, "exec1") - val store2 = makeBlockManager(2000, "exec2") - val store3 = makeBlockManager(2000, "exec3") + val store1 = makeBlockManager(800, "exec1") + val store2 = makeBlockManager(800, "exec2") + val store3 = makeBlockManager(800, "exec3") - val data = new Array[Byte](400) + val data = new Array[Byte](4) val blockId = rdd(0, 0) store1.putSingle(blockId, data, StorageLevel.MEMORY_ONLY_2) assert(master.getLocations(blockId).size === 2) @@ -1744,12 +1744,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } test("test decommissionRddCacheBlocks should keep the block if it is not able to offload") { - val store1 = makeBlockManager(12000, "exec1") - val store2 = makeBlockManager(2000, "exec2") + val store1 = makeBlockManager(3500, "exec1") + val store2 = makeBlockManager(1000, "exec2") - val dataLarge = new Array[Byte](5000) + val dataLarge = new Array[Byte](1500) val blockIdLarge = rdd(0, 0) - val dataSmall = new Array[Byte](500) + val dataSmall = new Array[Byte](1) val blockIdSmall = rdd(0, 1) store1.putSingle(blockIdLarge, dataLarge, StorageLevel.MEMORY_ONLY) From b36592191b32e347ae2bc3ffd9a3887fc2a3891d Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Thu, 14 May 2020 22:57:54 +0530 Subject: [PATCH 15/17] empty commit to trigger test From 75d6daacb542575b63da4e19e375ca621ef5f36c Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Thu, 14 May 2020 23:52:36 +0530 Subject: [PATCH 16/17] added Thread.interrupted check and max failure limit --- .../scala/org/apache/spark/storage/BlockManager.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 58b7acfd95831..e0478ad09601d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1912,7 +1912,11 @@ private[spark] class BlockManager( private val blockReplicationThread = new Thread { override def run(): Unit = { - while (blockManagerDecommissioning && !stopped) { + var failures = 0 + while (blockManagerDecommissioning + && !stopped + && !Thread.interrupted() + && failures < 20) { try { logDebug("Attempting to replicate all cached RDD blocks") decommissionRddCacheBlocks() @@ -1923,8 +1927,9 @@ private[spark] class BlockManager( logInfo("Interrupted during migration, will not refresh migrations.") stopped = true case NonFatal(e) => - logError("Error occurred while trying to " + - "replicate cached RDD blocks for block manager decommissioning", e) + failures += 1 + logError("Error occurred while trying to replicate cached RDD blocks" + + s" for block 
manager decommissioning (failure count: $failures)", e)
           }
         }
       }

From c34305662cd91d05b5160c7de72e35e4108b6755 Mon Sep 17 00:00:00 2001
From: Prakhar Jain
Date: Fri, 15 May 2020 00:37:24 +0530
Subject: [PATCH 17/17] Add set -ex in dev/test-dependencies.sh for debugging

---
 dev/test-dependencies.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh
index 936ac00f6b9e7..e7efba59db927 100755
--- a/dev/test-dependencies.sh
+++ b/dev/test-dependencies.sh
@@ -17,7 +17,7 @@
 # limitations under the License.
 #

-set -e
+set -ex

 FWDIR="$(cd "`dirname $0`"/..; pwd)"
 cd "$FWDIR"
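Taken together, the series gates RDD cache-block migration behind spark.storage.decommission.enabled and triggers it from the scheduler backend when an executor is decommissioned. A rough end-to-end sketch of how the feature is exercised, modeled on BlockManagerDecommissionSuite above (REPL-style, untested, and relying on Spark-internal APIs such as the private[spark] decommissionExecutor):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.config
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf().setAppName("decommission-demo")
  .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
  .set(config.STORAGE_DECOMMISSION_ENABLED, true)
val sc = new SparkContext("local-cluster[2, 1, 1024]", "decommission-demo", conf)

// Cache some RDD blocks across the two executors.
val rdd = sc.parallelize(1 to 10, 2).persist(StorageLevel.MEMORY_ONLY)
rdd.count()

// Decommission one executor; with storage decommissioning enabled its block
// manager should replicate its cached blocks to the surviving peer.
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
val target = sched.getExecutorIds().head
sched.decommissionExecutor(target)

// The cached data stays available afterwards.
assert(rdd.count() === 10)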