From d1fe96a61b0c0631c33d555ad5be790cab8aa61f Mon Sep 17 00:00:00 2001 From: Guoqiang Li Date: Thu, 30 Mar 2017 22:17:49 +0800 Subject: [PATCH 1/4] SPARK-20079: Re registration of AM hangs spark cluster in yarn-client mode. --- .../spark/ExecutorAllocationManager.scala | 4 +++- .../ExecutorAllocationManagerSuite.scala | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 261b3329a7b9c..d7cc8fb144065 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -249,7 +249,9 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { - initializing = true + if (maxNumExecutorsNeeded() == 0) { + initializing = true + } numExecutorsTarget = initialNumExecutors numExecutorsToAdd = 1 diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 4ea42fc7d5c22..7e110965151a4 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -854,6 +854,20 @@ class ExecutorAllocationManagerSuite assert(maxNumExecutorsNeeded(manager) === 1) } + test("SPARK-20079: Re registration of AM hangs spark cluster in yarn-client mode") { + sc = createSparkContext() + val manager = sc.executorAllocationManager.get + assert(initializing(manager)) + val stageInfo = createStageInfo(0, 2) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo)) + assert(!initializing(manager)) + manager.reset() + assert(!initializing(manager)) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stageInfo)) + manager.reset() + assert(initializing(manager)) + } + test("reset the state of allocation manager") { sc = createSparkContext() val manager = sc.executorAllocationManager.get @@ -988,6 +1002,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { | Helper methods for accessing private methods and fields | * ------------------------------------------------------- */ + private val _initializing = PrivateMethod[Boolean]('initializing) private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd) private val _numExecutorsTarget = PrivateMethod[Int]('numExecutorsTarget) private val _maxNumExecutorsNeeded = PrivateMethod[Int]('maxNumExecutorsNeeded) @@ -1011,6 +1026,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks) private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount) + private def initializing(manager: ExecutorAllocationManager): Boolean = { + manager invokePrivate _initializing() + } + private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = { manager invokePrivate _numExecutorsToAdd() } From 2cc4dcea51b075a3c7796d7f443f9627ed79035a Mon Sep 17 00:00:00 2001 From: Guoqiang Li Date: Tue, 4 Apr 2017 12:33:45 +0800 Subject: [PATCH 2/4] review commits --- .../scala/org/apache/spark/ExecutorAllocationManager.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index d7cc8fb144065..d9385e51d261e 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -249,6 +249,11 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { + /** + * When some tasks need to be scheduled, resetting the initializing field may cause + * it to not be set to false in yarn. + * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079 + */ if (maxNumExecutorsNeeded() == 0) { initializing = true } From 013474a8a9ce9038446e6a9a7e458ca8ad8eb93d Mon Sep 17 00:00:00 2001 From: Guoqiang Li Date: Fri, 7 Apr 2017 18:32:29 +0800 Subject: [PATCH 3/4] review commits --- .../scala/org/apache/spark/ExecutorAllocationManager.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index d9385e51d261e..8ef514c30c0d5 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -250,8 +250,8 @@ private[spark] class ExecutorAllocationManager( */ def reset(): Unit = synchronized { /** - * When some tasks need to be scheduled, resetting the initializing field may cause - * it to not be set to false in yarn. + * When some tasks need to be scheduled and initial executor = 0, resetting the initializing + * field may cause it to not be set to false in yarn. * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079 */ if (maxNumExecutorsNeeded() == 0) { From d3e69cf66d77ba02cfa13e8e27273e59248885f1 Mon Sep 17 00:00:00 2001 From: Guoqiang Li Date: Sun, 23 Apr 2017 12:56:58 +0800 Subject: [PATCH 4/4] Delete "initializing = true" in ExecutorAllocationManager.reset --- .../org/apache/spark/ExecutorAllocationManager.scala | 8 -------- .../org/apache/spark/ExecutorAllocationManagerSuite.scala | 3 --- 2 files changed, 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 8ef514c30c0d5..1a6453f779d2a 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -249,14 +249,6 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { - /** - * When some tasks need to be scheduled and initial executor = 0, resetting the initializing - * field may cause it to not be set to false in yarn. - * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079 - */ - if (maxNumExecutorsNeeded() == 0) { - initializing = true - } numExecutorsTarget = initialNumExecutors numExecutorsToAdd = 1 diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 7e110965151a4..480df6da525af 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -863,9 +863,6 @@ class ExecutorAllocationManagerSuite assert(!initializing(manager)) manager.reset() assert(!initializing(manager)) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stageInfo)) - manager.reset() - assert(initializing(manager)) } test("reset the state of allocation manager") {