Skip to content

Commit ef062c1

Browse files
carsonwangAndrew Or
authored andcommitted
[SPARK-9731] Standalone scheduling incorrect cores if spark.executor.cores is not set
The issue only happens if `spark.executor.cores` is not set and executor memory is set to a high value. For example, if we have a worker with 4G and 10 cores and we set `spark.executor.memory` to 3G, then only 1 core is assigned to the executor. The correct number should be 10 cores. I've added a unit test to illustrate the issue. Author: Carson Wang <[email protected]> Closes #8017 from carsonwang/SPARK-9731 and squashes the following commits: d09ec48 [Carson Wang] Fix code style 86b651f [Carson Wang] Simplify the code 943cc4c [Carson Wang] fix scheduling correct cores to executors
1 parent c564b27 commit ef062c1

File tree

2 files changed

+29
-12
lines changed

2 files changed

+29
-12
lines changed

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -581,20 +581,22 @@ private[deploy] class Master(
581581

582582
/** Return whether the specified worker can launch an executor for this app. */
583583
def canLaunchExecutor(pos: Int): Boolean = {
584+
val keepScheduling = coresToAssign >= minCoresPerExecutor
585+
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
586+
584587
// If we allow multiple executors per worker, then we can always launch new executors.
585-
// Otherwise, we may have already started assigning cores to the executor on this worker.
588+
// Otherwise, if there is already an executor on this worker, just give it more cores.
586589
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
587-
val underLimit =
588-
if (launchingNewExecutor) {
589-
assignedExecutors.sum + app.executors.size < app.executorLimit
590-
} else {
591-
true
592-
}
593-
val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
594-
usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor &&
595-
usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor &&
596-
coresToAssign >= minCoresPerExecutor &&
597-
underLimit
590+
if (launchingNewExecutor) {
591+
val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
592+
val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
593+
val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
594+
keepScheduling && enoughCores && enoughMemory && underLimit
595+
} else {
596+
// We're adding cores to an existing executor, so no need
597+
// to check memory and executor limits
598+
keepScheduling && enoughCores
599+
}
598600
}
599601

600602
// Keep launching executors until no more workers can accommodate any

core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,14 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
151151
basicScheduling(spreadOut = false)
152152
}
153153

154+
test("basic scheduling with more memory - spread out") {
155+
basicSchedulingWithMoreMemory(spreadOut = true)
156+
}
157+
158+
test("basic scheduling with more memory - no spread out") {
159+
basicSchedulingWithMoreMemory(spreadOut = false)
160+
}
161+
154162
test("scheduling with max cores - spread out") {
155163
schedulingWithMaxCores(spreadOut = true)
156164
}
@@ -214,6 +222,13 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
214222
assert(scheduledCores === Array(10, 10, 10))
215223
}
216224

225+
private def basicSchedulingWithMoreMemory(spreadOut: Boolean): Unit = {
226+
val master = makeMaster()
227+
val appInfo = makeAppInfo(3072)
228+
val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
229+
assert(scheduledCores === Array(10, 10, 10))
230+
}
231+
217232
private def schedulingWithMaxCores(spreadOut: Boolean): Unit = {
218233
val master = makeMaster()
219234
val appInfo1 = makeAppInfo(1024, maxCores = Some(8))

0 commit comments

Comments
 (0)