
Commit 558962a

Authored by WangTaoTheTonic and committed by andrewor14
[SPARK-3411] Improve load-balancing of concurrently-submitted drivers across workers
If the waiting driver array is too big, the drivers in it will all be dispatched to the first worker we get (if it has enough resources), with or without randomization. We should do randomization every time we dispatch a driver, in order to better balance drivers.

Author: WangTaoTheTonic <[email protected]>
Author: WangTao <[email protected]>

Closes apache#1106 from WangTaoTheTonic/fixBalanceDrivers and squashes the following commits:

d1a928b [WangTaoTheTonic] Minor adjustment
b6560cf [WangTaoTheTonic] solve the shuffle problem for HashSet
f674e59 [WangTaoTheTonic] add comment and minor fix
2835929 [WangTao] solve the failed test and avoid filtering
2ca3091 [WangTao] fix checkstyle
bc91bb1 [WangTao] Avoid shuffle every time we schedule the driver using round robin
bbc7087 [WangTaoTheTonic] Optimize the schedule in Master
1 parent: e4f4886 · commit: 558962a

File tree

1 file changed (+15, -3 lines changed)

  • core/src/main/scala/org/apache/spark/deploy/master


core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 15 additions & 3 deletions
@@ -487,13 +487,25 @@ private[spark] class Master(
     if (state != RecoveryState.ALIVE) { return }

     // First schedule drivers, they take strict precedence over applications
-    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
-    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
-      for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers
+    // Randomization helps balance drivers
+    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
+    val aliveWorkerNum = shuffledAliveWorkers.size
+    var curPos = 0
+    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
+      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
+      // start from the last worker that was assigned a driver, and continue onwards until we have
+      // explored all alive workers.
+      curPos = (curPos + 1) % aliveWorkerNum
+      val startPos = curPos
+      var launched = false
+      while (curPos != startPos && !launched) {
+        val worker = shuffledAliveWorkers(curPos)
         if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
           launchDriver(worker, driver)
           waitingDrivers -= driver
+          launched = true
         }
+        curPos = (curPos + 1) % aliveWorkerNum
       }
     }

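As a standalone illustration of the round-robin placement this commit introduces, here is a minimal Scala sketch. Worker, Driver, and RoundRobinDriverScheduling.schedule are simplified, hypothetical stand-ins invented for this example (the real code works with Master's WorkerInfo/DriverInfo state and launchDriver), and the sketch bounds the walk with an explicit visited counter rather than the start-position check used in the committed diff; it illustrates the idea, it is not the Master.scala implementation.

import scala.collection.mutable
import scala.util.Random

// Hypothetical, simplified stand-ins for Spark's internal WorkerInfo and DriverInfo.
case class Worker(id: String, var coresFree: Int, var memoryFree: Int)
case class Driver(id: String, cores: Int, mem: Int)

object RoundRobinDriverScheduling {

  // Place each waiting driver on the next alive worker (in shuffled order) that has enough
  // free cores and memory, continuing round-robin from wherever the previous driver landed.
  def schedule(aliveWorkers: Seq[Worker], waitingDrivers: Seq[Driver]): Map[String, String] = {
    val shuffled = Random.shuffle(aliveWorkers) // shuffle once, then walk the list round-robin
    val numAlive = shuffled.size
    val placements = mutable.Map.empty[String, String]
    var curPos = 0

    for (driver <- waitingDrivers) {
      var launched = false
      var visited = 0
      // Visit each alive worker at most once per driver, starting after the last placement.
      while (visited < numAlive && !launched) {
        val worker = shuffled(curPos)
        visited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          worker.coresFree -= driver.cores
          worker.memoryFree -= driver.mem
          placements(driver.id) = worker.id
          launched = true
        }
        curPos = (curPos + 1) % numAlive
      }
    }
    placements.toMap
  }

  def main(args: Array[String]): Unit = {
    val workers = Seq(Worker("w1", 4, 4096), Worker("w2", 4, 4096), Worker("w3", 4, 4096))
    val drivers = (1 to 5).map(i => Driver(s"d$i", cores = 1, mem = 1024))
    schedule(workers, drivers).toSeq.sorted.foreach { case (d, w) => println(s"$d -> $w") }
  }
}

With three identical workers and five one-core drivers, this spreads the drivers 2/2/1 across the (shuffled) workers instead of piling them all onto the first worker that happens to have room, which is the imbalance the commit message describes.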