 package org.apache.spark.storage

 import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicReference

 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -57,6 +58,11 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       .set(config.STORAGE_DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist)
       .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
+      // Force exactly one executor per worker such that all block managers
+      // get the shuffle and RDD blocks.
+      .set(config.EXECUTOR_CORES.key, "1")
+      .set(config.CPUS_PER_TASK.key, "1")
+      .set(config.EXECUTOR_MEMORY.key, "1024m")
       // Just replicate blocks as fast as we can during testing, there isn't another
       // workload we need to worry about.
       .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
@@ -92,6 +98,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val executorRemovedSem = new Semaphore(0)
     val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
     val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
+    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
+    val execToDecommission = new AtomicReference[String](null)
     sc.addSparkListener(new SparkListener {

       override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
@@ -107,6 +115,21 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       }

       override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+        if (blockUpdated.blockUpdatedInfo.blockId.isRDD && persist) {
+          // Persisted RDD blocks are a bit weirder than shuffle blocks: even though
+          // the tasks run on, say, executors (0, 1, 2), the RDD blocks might end up only
+          // on executors 0 and 1. So we cannot just indiscriminately decommission any executor.
+          // Instead we must decommission an executor that actually has an RDD block.
+          // Fortunately, this isn't the case for shuffle blocks, which are present on all
+          // executors, so any executor can be decommissioned when `persist` is false.
+          val candidateExecToDecom = blockUpdated.blockUpdatedInfo.blockManagerId.executorId
+          if (execToDecommission.compareAndSet(null, candidateExecToDecom)) {
+            val decomContext = s"Decommissioning executor ${candidateExecToDecom} for persist"
+            logInfo(decomContext)
+            sched.decommissionExecutor(candidateExecToDecom,
+              ExecutorDecommissionInfo(decomContext, false))
+          }
+        }
         // Once broadcast start landing on the executors we're good to proceed.
         // We don't only use task start as it can occur before the work is on the executor.
         if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
@@ -139,14 +162,17 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       ThreadUtils.awaitResult(asyncCount, 15.seconds)
     }

-    // Decommission one of the executors.
-    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     val execs = sched.getExecutorIds()
     assert(execs.size == numExecs, s"Expected ${numExecs} executors but found ${execs.size}")

-    val execToDecommission = execs.head
-    logDebug(s"Decommissioning executor ${execToDecommission}")
-    sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))
+    if (!persist && execToDecommission.compareAndSet(null, execs.head)) {
+      // For non-persisted blocks we can decommission any executor, since the shuffle blocks
+      // are present on every executor.
+      val decomContext = s"Decommissioning executor ${execToDecommission.get()}"
+      logInfo(decomContext)
+      sched.decommissionExecutor(execToDecommission.get(),
+        ExecutorDecommissionInfo(decomContext, false))
+    }

     // Wait for job to finish.
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
@@ -206,15 +232,15 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val execIdToBlocksMapping = storageStatus.map(
       status => (status.blockManagerId.executorId, status.blocks)).toMap
     // No cached blocks should be present on executor which was decommissioned
-    assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(),
+    assert(execIdToBlocksMapping(execToDecommission.get()).keys.filter(_.isRDD).toSeq === Seq(),
       "Cache blocks should be migrated")
     if (persist) {
       // There should still be all the RDD blocks cached
       assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts)
     }

     // Make the executor we decommissioned exit
-    sched.client.killExecutors(List(execToDecommission))
+    sched.client.killExecutors(List(execToDecommission.get()))

     // Wait for the executor to be removed
     executorRemovedSem.acquire(1)