From c912a2ae4c907505e30d9a465644ea245116679e Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Thu, 24 Sep 2015 15:31:29 -0700
Subject: [PATCH 1/4] Fix initial executor number not being set and clean up the code

---
 .../spark/deploy/yarn/ClientArguments.scala   | 20 +--------------
 .../spark/deploy/yarn/YarnAllocator.scala     |  7 +-----
 .../deploy/yarn/YarnSparkHadoopUtil.scala     | 25 ++++++++++++++++++-
 .../cluster/YarnClusterSchedulerBackend.scala | 18 ++-----------
 4 files changed, 28 insertions(+), 42 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 54f62e6b723ac..a20ef00eb8cca 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -81,25 +81,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
     .orNull
   // If dynamic allocation is enabled, start at the configured initial number of executors.
   // Default to minExecutors if no initialExecutors is set.
-  if (isDynamicAllocationEnabled) {
-    val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
-    val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
-    val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
-    val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
-    val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors)
-    val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE)
-
-    // If defined, initial executors must be between min and max
-    if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) {
-      throw new IllegalArgumentException(
-        s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!")
-    }
-
-    numExecutors = initialNumExecutors
-  } else {
-    val numExecutorsConf = "spark.executor.instances"
-    numExecutors = sparkConf.getInt(numExecutorsConf, numExecutors)
-  }
+  numExecutors = YarnSparkHadoopUtil.getTargetExecutorNumber(sparkConf)
   principal = Option(principal)
     .orElse(sparkConf.getOption("spark.yarn.principal"))
     .orNull
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index fd88b8b7fe3b9..0050117c60996 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -88,12 +88,7 @@ private[yarn] class YarnAllocator(
   private var executorIdCounter = 0
   @volatile private var numExecutorsFailed = 0
 
-  @volatile private var targetNumExecutors =
-    if (Utils.isDynamicAllocationEnabled(sparkConf)) {
-      sparkConf.getInt("spark.dynamicAllocation.initialExecutors", 0)
-    } else {
-      sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
-    }
+  @volatile private var targetNumExecutors = YarnSparkHadoopUtil.getTargetExecutorNumber(sparkConf)
 
   // Executor loss reason requests that are pending - maps from executor ID for inquiry to a
   // list of requesters that should be responded to once we find out why the given executor
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 445d3dcd266db..f343a4d9b3edb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -39,7 +39,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.launcher.YarnCommandBuilderUtils
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{IntParam, Utils}
 
 /**
  * Contains util methods to interact with Hadoop from spark.
@@ -314,5 +314,28 @@ object YarnSparkHadoopUtil {
   def getClassPathSeparator(): String = {
     classPathSeparatorField.get(null).asInstanceOf[String]
   }
+
+  /** Get the initial target number of executors depends on dynamic allocation is enabled or not */
+  def getTargetExecutorNumber(conf: SparkConf): Int = {
+    if (Utils.isDynamicAllocationEnabled(conf)) {
+      val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
+      val initialNumExecutors =
+        conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
+      val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
+      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
+        s"initial executor number $initialNumExecutors must be between min executor number " +
+          s"$minNumExecutors and max executor number $maxNumExecutors")
+
+      initialNumExecutors
+    } else {
+      var targetNumExecutors = DEFAULT_NUMBER_EXECUTORS
+      if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
+        targetNumExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
+          .getOrElse(targetNumExecutors)
+      }
+      // System property can override environment variable.
+      conf.getInt("spark.executor.instances", targetNumExecutors)
+    }
+  }
 }
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 1aed5a1675075..a34d4fdfd3068 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -17,21 +17,13 @@
 
 package org.apache.spark.scheduler.cluster
 
-import java.net.NetworkInterface
-
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.yarn.api.records.NodeState
-import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.{IntParam, Utils}
+import org.apache.spark.util.Utils
 
 private[spark] class YarnClusterSchedulerBackend(
   scheduler: TaskSchedulerImpl,
@@ -40,13 +32,7 @@ private[spark] class YarnClusterSchedulerBackend(
 
   override def start() {
     super.start()
-    totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
-    if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
-      totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
-        .getOrElse(totalExpectedExecutors)
-    }
-    // System property can override environment variable.
-    totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
+    totalExpectedExecutors = YarnSparkHadoopUtil.getTargetExecutorNumber(sc.conf)
   }
 
   override def applicationId(): String =
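To make the consolidated behavior concrete, the sketch below exercises the new `getTargetExecutorNumber` helper under both modes. It is an illustration only, not part of the patch: the configuration values are invented, and the first assertion assumes nothing else in the conf disables dynamic allocation.

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

    // Dynamic allocation enabled: the helper starts from initialExecutors
    // (here 5) and requires it to lie between minExecutors and maxExecutors.
    val dynConf = new SparkConf(loadDefaults = false)
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.initialExecutors", "5")
      .set("spark.dynamicAllocation.maxExecutors", "10")
    assert(YarnSparkHadoopUtil.getTargetExecutorNumber(dynConf) == 5)

    // Dynamic allocation disabled: spark.executor.instances wins over both
    // the SPARK_EXECUTOR_INSTANCES environment variable and the default.
    val staticConf = new SparkConf(loadDefaults = false).set("spark.executor.instances", "8")
    assert(YarnSparkHadoopUtil.getTargetExecutorNumber(staticConf) == 8)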
(System.getenv("SPARK_EXECUTOR_INSTANCES") != null) { + IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).get + } else { + DEFAULT_NUMBER_EXECUTORS } // System property can override environment variable. conf.getInt("spark.executor.instances", targetNumExecutors) diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index a34d4fdfd3068..50b699f11b21c 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -32,7 +32,7 @@ private[spark] class YarnClusterSchedulerBackend( override def start() { super.start() - totalExpectedExecutors = YarnSparkHadoopUtil.getTargetExecutorNumber(sc.conf) + totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf) } override def applicationId(): String = From d53d5c8071463133c27c23e09a8303b2575d5207 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 25 Sep 2015 11:56:55 -0700 Subject: [PATCH 3/4] Comment and style changes --- .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 8263fcea6c640..7d8bbade2fe3e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -315,7 +315,10 @@ object YarnSparkHadoopUtil { classPathSeparatorField.get(null).asInstanceOf[String] } - /** Get the initial target number of executors depends on dynamic allocation is enabled or not */ + /** + * Getting the initial target number of executors depends on whether dynamic allocation is + * enabled. + */ def getInitialTargetExecutorNumber(conf: SparkConf): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) @@ -328,11 +331,8 @@ object YarnSparkHadoopUtil { initialNumExecutors } else { - val targetNumExecutors = if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) { - IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).get - } else { - DEFAULT_NUMBER_EXECUTORS - } + val targetNumExecutors = + sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS) // System property can override environment variable. 
conf.getInt("spark.executor.instances", targetNumExecutors) } From 0e54fb1941f05f919f9223318dc86cb7cb9e276a Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 25 Sep 2015 13:43:59 -0700 Subject: [PATCH 4/4] Remove unused import --- .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 7d8bbade2fe3e..f276e7efde9d7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -39,7 +39,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.launcher.YarnCommandBuilderUtils import org.apache.spark.{SecurityManager, SparkConf, SparkException} -import org.apache.spark.util.{IntParam, Utils} +import org.apache.spark.util.Utils /** * Contains util methods to interact with Hadoop from spark.