Consistent worker selection for no-deps cases #7280
Conversation
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files +1   15 suites +1   6h 52m 56s ⏱️ +1h 10m 8s

Results for commit e4f62f9. ± Comparison against base commit 27a91dd.

♻️ This comment has been updated with latest results.
Tests for @crusaderky's "select worker with the least memory in an idle cluster" concern (copied from #7248) also pass with this PR with worker-saturation set to inf, 1.0, and 1.1. So we'd be improving behavior even when queuing is off.
distributed/scheduler.py (outdated)

```diff
  # Last-used worker is full, unknown, retiring, or paused;
  # pick a new worker for the next few tasks
- ws = min(pool, key=partial(self.worker_objective, ts))
+ ws = min(pool, key=self.worker_objective_no_deps)
```
Queued tasks would now consider occupancy too (as well as memory). This is fine, since the definition of idle will still prevent oversaturation. If root tasks had different duration estimates, we'd maybe assign to workers a little less evenly: workers with quick tasks would fully fill before workers with slow tasks. But all workers would be fully utilized regardless.
distributed/scheduler.py (outdated)

```diff
  n_workers: int = len(wp_vals)
  if n_workers < 20:  # smart but linear in small case
-     ws = min(wp_vals, key=operator.attrgetter("occupancy"))
+     ws = min(wp_vals, key=self.worker_objective_no_deps)
```
This is a small behavioral change: root-ish tasks won't have affinity for workers that hold their dependencies. By definition, root-ish tasks shouldn't consider the location of their dependencies when scheduling, so this is arguably an improvement—it could slightly reduce dogpiling towards an open Zarr dataset, for instance.
distributed/scheduler.py (outdated)

```diff
  for i in range(n_workers):
      wp_i = wp_vals[(i + start) % n_workers]
-     if wp_i.occupancy == 0:
+     if sum(self.worker_objective_no_deps(wp_i)) == 0:
```
I'm keeping this round-robin behavior around for now, but hopefully we'd remove it in a separate PR later.
distributed/scheduler.py (outdated)

```python
    Minimize worker occupancy. If a tie then break with data storage.
    """
    return (ws.occupancy / ws.nthreads, ws.nbytes / (ws.memory_limit or 1))
```
Happy with not dividing by memory_limit if @crusaderky would prefer. See my thoughts in #7266 (comment).
Yes, I'd rather use the absolute value.
Sure. We can keep it consistent for now and then decide what to do everywhere in #7266.
The first half of the tuple is sneakily having the same effect, at least as long as the amount of available RAM is roughly correlated with the number of threads.
For example, if you have a 4-thread worker and a 16-thread worker, and both have occupancy=1, the above algorithm will always prefer the larger worker. To me it would make more sense if it were just ws.occupancy, without dividing by the number of threads.
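A quick numeric check of that scenario (a minimal sketch using hypothetical stand-in objects, not the scheduler's WorkerState):

```python
# Stand-in worker records for illustration only; not distributed's WorkerState.
from collections import namedtuple

W = namedtuple("W", ["name", "occupancy", "nthreads", "nbytes", "memory_limit"])

small = W("small", occupancy=1.0, nthreads=4, nbytes=0, memory_limit=2**30)
large = W("large", occupancy=1.0, nthreads=16, nbytes=0, memory_limit=2**30)

def proposed_key(ws):
    # same shape as the worker_objective_no_deps proposed in this PR
    return (ws.occupancy / ws.nthreads, ws.nbytes / (ws.memory_limit or 1))

print(proposed_key(small))  # (0.25, 0.0)
print(proposed_key(large))  # (0.0625, 0.0)
print(min([small, large], key=proposed_key).name)  # "large": the bigger worker is preferred
```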
Dividing by nthreads here is the key change that fixes test_wait_first_completed and #7197. This is also consistent with worker_objective for non-root tasks:
distributed/scheduler.py, line 3236 in 176a84c:

```python
stack_time: float = ws.occupancy / ws.nthreads
```
If you don't consider nthreads, then when you have two workers with equal occupancy, and one has 1 thread open, and the other has 0 threads open, you might pick the worker with 0 threads open. That's obviously a very bad choice. Indeed, even if the worker with 1 thread open had higher total occupancy—by any factor—you'd still want to pick it.
Or, imagine all workers are saturated (when queuing is disabled, for example). There's a worker with 2 threads, and a worker with 16 threads, both with the same total occupancy. Clearly, if you assign the task to the 2-threaded worker, it will take much longer to be completed. If you treat them the same, then the bigger (more expensive) workers will be under-utilized, and the small workers will be over-utilized, becoming a long tail that requires rebalancing later.
If you don't divide by nthreads, heterogeneous clusters are imbalanced. The small workers get a disproportionate share of the load. I don't think that's reasonable default behavior for Dask. There might be cases where you want that; then you should use resource restrictions.
If I use multiple instance types in AWS to get better spot availability and save money, I would be extremely surprised to see Dask oversaturating my small workers and under-utilizing my big ones. I would think it was a bug.
If I made a mixed CPU/GPU cluster, and saw my GPU workers running more "normal" tasks than I wanted, I would not be surprised. I would think, "that's reasonable, Dask wants to balance the load. It doesn't know that I want to save space on these, so I'll use worker restrictions to tell it". I would not think it was a bug.
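As a small illustration of the open-thread argument above (hypothetical stand-in objects only; real scheduling also depends on how idle and saturated workers are filtered):

```python
# Stand-in worker records: equal total occupancy, but only "a" has a thread free.
from collections import namedtuple

W = namedtuple("W", ["name", "occupancy", "nthreads"])

a = W("a", occupancy=10.0, nthreads=2)  # 1 task running, 1 thread open
b = W("b", occupancy=10.0, nthreads=1)  # 1 task running, 0 threads open

workers = [b, a]
# Raw occupancy can't tell them apart, so the full worker may be chosen:
print(min(workers, key=lambda w: w.occupancy).name)               # "b"
# Occupancy per thread breaks the tie towards the worker with spare capacity:
print(min(workers, key=lambda w: w.occupancy / w.nthreads).name)  # "a"
```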
The truly buggy case I'm fixing here is when worker-saturation > 1 and you're picking between some workers that still have open threads and some that don't. You should always pick a worker with an open thread over one that's full, no matter the occupancy.
This might fix the worst-case behavior I'm worried about, but still schedule equally (though not equitably) when there's not much work:
(0 if len(ws.processing) < ws.nthreads else 1, ws.occupancy, ...)
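A minimal sketch of how that alternative key would behave, using hypothetical stand-in objects rather than real WorkerState:

```python
# Stand-in worker records for illustration only; `processing` mimics the set of
# tasks currently assigned to the worker.
from collections import namedtuple

W = namedtuple("W", ["name", "processing", "nthreads", "occupancy"])

full = W("full", processing={"t1", "t2"}, nthreads=2, occupancy=1.0)
spare = W("spare", processing={"t1"}, nthreads=2, occupancy=50.0)

def alt_key(ws):
    # 0 for any worker with an open thread, 1 otherwise; occupancy only breaks ties
    return (0 if len(ws.processing) < ws.nthreads else 1, ws.occupancy)

# The worker with a free thread wins regardless of its (much higher) occupancy:
print(min([full, spare], key=alt_key).name)  # "spare"
```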
distributed/scheduler.py (outdated)

```python
def worker_objective_no_deps(self, ws: WorkerState) -> tuple[float, float]:
    """
    Objective function to determine the least-busy worker.

    Meant for use with tasks where dependencies are not relevant for scheduling
    (no dependencies, or widely-shared).

    Minimize worker occupancy. If a tie then break with data storage.
    """
    return (ws.occupancy / ws.nthreads, ws.nbytes / (ws.memory_limit or 1))
```
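For completeness, a tiny usage sketch of the proposed key showing the memory tiebreak (hypothetical stand-in objects, not WorkerState):

```python
# Stand-in worker records; nbytes and memory_limit in bytes.
from collections import namedtuple

W = namedtuple("W", ["name", "occupancy", "nthreads", "nbytes", "memory_limit"])

def worker_objective_no_deps(ws):
    # same expression as the method proposed in this PR
    return (ws.occupancy / ws.nthreads, ws.nbytes / (ws.memory_limit or 1))

a = W("a", occupancy=2.0, nthreads=2, nbytes=8e9, memory_limit=16e9)  # 50% of memory used
b = W("b", occupancy=2.0, nthreads=2, nbytes=2e9, memory_limit=16e9)  # 12.5% of memory used

# Equal occupancy per thread, so relative memory use breaks the tie:
print(min([a, b], key=worker_objective_no_deps).name)  # "b"
```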
Why do we need a different objective function? Without dependencies the original reduces to almost the same thing. It prioritizes by
(nactors, start_time, nbytes)
i.e.
(nactors, ws.occupancy / ws.nthreads, nbytes)
which is the same thing as you are proposing here, with the exception that you're dividing nbytes by the memory limit.
Looks like we should just use the same objective function. I suggest not dividing for now (and doing this / discussing it in another PR).
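To make that reduction concrete, here is a rough paraphrase of the existing objective using hypothetical stand-in types (not a verbatim copy of worker_objective):

```python
# Illustrative paraphrase with stand-in objects; not the scheduler's real classes.
from collections import namedtuple

Task = namedtuple("Task", ["dependencies"])            # deps as (nbytes, holders) pairs
Worker = namedtuple("Worker", ["actors", "occupancy", "nthreads", "nbytes"])

def original_objective(ts, ws, bandwidth=100e6):
    # bytes that would need transferring for deps not already on ws
    comm_bytes = sum(nbytes for nbytes, holders in ts.dependencies if ws not in holders)
    start_time = ws.occupancy / ws.nthreads + comm_bytes / bandwidth
    return (len(ws.actors), start_time, ws.nbytes)

ws = Worker(actors=(), occupancy=4.0, nthreads=2, nbytes=123)
no_deps = Task(dependencies=[])

# With no dependencies, the key collapses to (nactors, occupancy / nthreads, nbytes):
print(original_objective(no_deps, ws))  # (0, 2.0, 123)
```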
This one doesn't iterate over dependencies. The other does. By the definition of is_rootish, it's possible for there to be dependencies. But the point of root-ish-ness is that we should be ignoring them.
fjetter left a comment:
Overall I do like that we're consolidating the objective function and using the same measure everywhere.
I do not understand why we need a specialized objective function for tasks without deps.
See #7280 (comment) and f42a050. I don't know that using this instead of our main […] The reason for two objective functions is a bit like #7278.
```python
with client.get_executor(retries=5, pure=False) as e:
    future = e.submit(varying(args))
    assert future.result() == 42
    del future
```
These seem redundant to me?
Without this, the future is held on the worker, increasing nbytes and changing which worker we pick. (This test is highly dependent on the worker selection logic because it runs tasks that change their outputs depending on how many times they've been called on a particular worker; if the order of worker selection changes, the number of retries it takes to get a failing/non-failing result changes.)
Ah. You're relying on the release message to deterministically reach the scheduler before the next submit message. Got it.
This consolidates the logic used in three different cases into one objective function. In decide_worker_rootish_queuing_disabled, decide_worker_rootish_queuing_enabled, and the no-dependencies case of decide_worker_non_rootish, we're selecting the best worker based purely on how busy it is, without regard to where dependencies are.

These three cases had subtly different criteria for picking the best worker. By switching to the same objective function everywhere, we smooth over the edge cases caused by the changed definition of idle and #7274.

The one downside is using occupancy as the objective: it can be quite expensive (#7256).

Supersedes #7248
Closes #7197, closes #7248
Passes pre-commit run --all-files