diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 261b3329a7b9c..1a6453f779d2a 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -249,7 +249,6 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { - initializing = true numExecutorsTarget = initialNumExecutors numExecutorsToAdd = 1 diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 4ea42fc7d5c22..480df6da525af 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -854,6 +854,17 @@ class ExecutorAllocationManagerSuite assert(maxNumExecutorsNeeded(manager) === 1) } + test("SPARK-20079: Re registration of AM hangs spark cluster in yarn-client mode") { + sc = createSparkContext() + val manager = sc.executorAllocationManager.get + assert(initializing(manager)) + val stageInfo = createStageInfo(0, 2) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo)) + assert(!initializing(manager)) + manager.reset() + assert(!initializing(manager)) + } + test("reset the state of allocation manager") { sc = createSparkContext() val manager = sc.executorAllocationManager.get @@ -988,6 +999,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { | Helper methods for accessing private methods and fields | * ------------------------------------------------------- */ + private val _initializing = PrivateMethod[Boolean]('initializing) private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd) private val _numExecutorsTarget = PrivateMethod[Int]('numExecutorsTarget) private val _maxNumExecutorsNeeded = PrivateMethod[Int]('maxNumExecutorsNeeded) @@ -1011,6 +1023,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks) private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount) + private def initializing(manager: ExecutorAllocationManager): Boolean = { + manager invokePrivate _initializing() + } + private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = { manager invokePrivate _numExecutorsToAdd() }