11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -489,23 +489,24 @@ private[spark] class Master(
// First schedule drivers, they take strict precedence over applications
// Randomization helps balance drivers
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
-    val aliveWorkerNum = shuffledAliveWorkers.size
+    val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0

for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
curPos = (curPos + 1) % aliveWorkerNum
val startPos = curPos
var launched = false
Contributor:
I think the correct thing to do here is to keep a counter of how many workers we've looked at so far; then the while loop would look like:

var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
  ...
  numWorkersVisited += 1
}

Contributor:

Then you don't need startPos or stopPos

Member Author:
That's one way, but if many drivers are waiting, the while loop would have to start from shuffledAliveWorkers(0) for each driver. I think that's not efficient.

Contributor:

Wait, what do you mean? That's just another way of writing the same thing, and we still start at shuffledWorkers(curPos) as before. Why is it less efficient?

Member Author:

If we use numWorkersVisited and numWorkersAlive, numWorkersVisited is reset to 0 before entering the while loop even though we are still inside the for loop.
If we are still inside the for loop and the last visited index was N, the next worker to visit should be at index N+1.

Contributor:

I still don't understand what you mean.

numWorkersVisited is a counter, not an index into shuffledWorkers. The behavior we want here is this: for each driver, we start from the position where the last driver left off, and we want to loop through each worker at most once (i.e. we don't want to exit the loop as soon as we have looked at numWorkersAlive). If the last driver's last visited index is N, then we still start from N + 1 because we keep track of the position curPos. Does that make sense?


Member Author:

Yeah, I noticed and wrote the same code.
(2014/09/18 6:31), andrewor14 wrote:

In core/src/main/scala/org/apache/spark/deploy/master/Master.scala:

 for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
   // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
   // start from the last worker that was assigned a driver, and continue onwards until we have
   // explored all alive workers.
  curPos = (curPos + 1) % aliveWorkerNum
  val startPos = curPos

This is what I mean:
andrewor14/spark@apache:master...andrewor14:fix-standalone-cluster


https://github.com/apache/spark/pull/2436/files#r17695766.

Member Author:

I had a misunderstanding. I thought you meant removing startPos and curPos. Now I understand we keep curPos.

-    while (curPos != startPos && !launched) {
+    var numWorkersVisited = 0
+    while (numWorkersVisited < numWorkersAlive && !launched) {
       val worker = shuffledAliveWorkers(curPos)
+      numWorkersVisited += 1
       if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
         launchDriver(worker, driver)
         waitingDrivers -= driver
         launched = true
       }
-      curPos = (curPos + 1) % aliveWorkerNum
+      curPos = (curPos + 1) % numWorkersAlive
     }
   }
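
To make the final behavior concrete, here is a minimal, runnable Scala sketch of the counter-based round-robin loop agreed on above. Worker, Driver, and scheduleDrivers are simplified hypothetical stand-ins, not Spark's actual classes; the point is that curPos persists across drivers while numWorkersVisited is reset per driver, so each driver scans every worker at most once, continuing from where the previous driver left off.

```scala
// Simplified stand-ins for Spark's WorkerInfo and DriverInfo (hypothetical).
case class Worker(id: Int, var memoryFree: Int, var coresFree: Int)
case class Driver(id: Int, mem: Int, cores: Int)

def scheduleDrivers(workers: IndexedSeq[Worker],
                    waitingDrivers: List[Driver]): Map[Int, Int] = {
  val numWorkersAlive = workers.size
  var curPos = 0                        // persists across drivers (round-robin)
  var assignments = Map.empty[Int, Int] // driverId -> workerId

  for (driver <- waitingDrivers) {
    // For each driver, visit each worker at most once, starting from
    // the position where the previous driver left off.
    var numWorkersVisited = 0
    var launched = false
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = workers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
        worker.memoryFree -= driver.mem
        worker.coresFree -= driver.cores
        assignments += (driver.id -> worker.id)
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  assignments
}
```

With three identical workers and two small drivers, the second driver lands on the next worker rather than restarting the scan from workers(0), which is exactly the inefficiency the author was worried about.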
