Skip to content

Commit accc8f6

Browse files
author
Andrew Or
committed
Address comments
1 parent ee686a8 commit accc8f6

File tree

2 files changed

+11
-18
lines changed

2 files changed

+11
-18
lines changed

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,6 @@ private[spark] class ApplicationInfo(
101101

102102
private[master] def coresLeft: Int = requestedCores - coresGranted
103103

104-
/**
105-
* Return whether this application should launch at most one executor per worker.
106-
*
107-
* This is true if cores per executor is not defined, in which case the executor should
108-
* grab all the available cores on the worker instead.
109-
*/
110-
private[master] def oneExecutorPerWorker(): Boolean = desc.coresPerExecutor.isEmpty
111-
112104
private var _retryCount = 0
113105

114106
private[master] def retryCount = _retryCount

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,7 @@ private[master] class Master(
572572
spreadOutApps: Boolean): Array[Int] = {
573573
val coresPerExecutor = app.desc.coresPerExecutor
574574
val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
575+
val oneExecutorPerWorker = coresPerExecutor.isEmpty
575576
val memoryPerExecutor = app.desc.memoryPerExecutorMB
576577
val numUsable = usableWorkers.length
577578
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
@@ -580,16 +581,16 @@ private[master] class Master(
580581

581582
/** Return whether the specified worker can launch an executor for this app. */
582583
def canLaunchExecutor(pos: Int): Boolean = {
583-
val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
584+
// If we allow multiple executors per worker, then we can always launch new executors.
585+
// Otherwise, we may have already started assigning cores to the executor on this worker.
586+
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
584587
val underLimit =
585-
if (app.oneExecutorPerWorker() && assignedExecutors(pos) == 1) {
586-
// We only have one executor per worker and have already started to assign cores to it,
587-
// so assigning more to it does not change the number of executors we'll end up with
588-
true
589-
} else {
590-
// Otherwise, we should launch a new executor only if we do not exceed the limit
588+
if (launchingNewExecutor) {
591589
assignedExecutors.sum + app.executors.size < app.executorLimit
590+
} else {
591+
true
592592
}
593+
val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
593594
usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor &&
594595
usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor &&
595596
coresToAssign >= minCoresPerExecutor &&
@@ -608,7 +609,7 @@ private[master] class Master(
608609

609610
// If we are launching one executor per worker, then every iteration assigns 1 core
610611
// to the executor. Otherwise, every iteration assigns cores to a new executor.
611-
if (app.oneExecutorPerWorker()) {
612+
if (oneExecutorPerWorker) {
612613
assignedExecutors(pos) = 1
613614
} else {
614615
assignedExecutors(pos) += 1
@@ -851,8 +852,8 @@ private[master] class Master(
851852
* Handle a kill request from the given application.
852853
*
853854
* This method assumes the executor limit has already been adjusted downwards through
854-
* a separate [[RequestExecutors]] message, such that we do not immediately launch new
855-
* executors immediately after the old ones are removed.
855+
* a separate [[RequestExecutors]] message, such that we do not launch new executors
856+
* immediately after the old ones are removed.
856857
*
857858
* @return whether the application has previously registered with this Master.
858859
*/

0 commit comments

Comments
 (0)