
Conversation

@gjoseph92 (Collaborator) commented Nov 9, 2022

This consolidates the logic used in three different cases into one objective function. In decide_worker_rootish_queuing_disabled, decide_worker_rootish_queuing_enabled, and the no-dependencies case of decide_worker_non_rootish, we're selecting the best worker just based on how busy it is, without regard to where dependencies are.

These three cases had subtly different criteria for picking the best worker. By switching to use the same objective function everywhere, we smooth over the edge cases caused by the changed definition of idle and #7274.

The one downside is using occupancy as the objective. It can be quite expensive: #7256.
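As a rough illustration of what "one objective function" means here, a minimal, self-contained sketch (the `Worker` class and all values are made up for illustration; this is not the PR's actual code, which operates on the scheduler's `WorkerState`):

```python
from dataclasses import dataclass

@dataclass
class Worker:
    occupancy: float   # estimated seconds of work already assigned
    nthreads: int
    nbytes: int        # bytes of managed data held
    memory_limit: int

def objective_no_deps(ws: Worker) -> tuple[float, float]:
    # Primary key: expected time to drain the worker's queue per thread.
    # Tie-break: relative memory pressure.
    return (ws.occupancy / ws.nthreads, ws.nbytes / (ws.memory_limit or 1))

workers = [
    Worker(occupancy=4.0, nthreads=2, nbytes=1_000_000, memory_limit=4_000_000_000),
    Worker(occupancy=4.0, nthreads=8, nbytes=8_000_000, memory_limit=16_000_000_000),
]
best = min(workers, key=objective_no_deps)  # picks the 8-thread worker (0.5 < 2.0)
```

All three decide-worker cases above would call `min(...)` with this one key function instead of three slightly different criteria.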

Supersedes #7248

Closes #7197, closes #7248

  • Tests added / passed
  • Passes pre-commit run --all-files

@gjoseph92 mentioned this pull request Nov 9, 2022
github-actions bot (Contributor) commented Nov 9, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files +1 · 15 suites +1 · 6h 52m 56s ⏱️ +1h 10m 8s
3 223 tests +9 · 3 138 ✔️ +20 · 85 💤 -11 · 0 ±0
23 833 runs +1 322 · 22 916 ✔️ +1 366 · 917 💤 -44 · 0 ±0

Results for commit e4f62f9. ± Comparison against base commit 27a91dd.

♻️ This comment has been updated with latest results.

@gjoseph92 (Collaborator, Author):

distributed/tests/test_client.py::test_wait_first_completed also passes here with worker-saturation: 1.1, confirming this is a fix for #7197 and a path to unblocking #7213.

@gjoseph92 (Collaborator, Author):

distributed/tests/test_steal.py::test_stop_in_flight failed once on macOS, which is the only test I'd be concerned about. However, I couldn't reproduce the failure locally with 10,000 repeats. All task submission in that test uses worker restrictions, so it doesn't use code paths affected in this PR.

@gjoseph92 (Collaborator, Author):

Tests for @crusaderky's "select worker with the least memory in an idle cluster" concern (copied from #7248) also pass with this PR on inf, 1.0, and 1.1. So we'd be improving behavior even when queuing is off.

  # Last-used worker is full, unknown, retiring, or paused;
  # pick a new worker for the next few tasks
- ws = min(pool, key=partial(self.worker_objective, ts))
+ ws = min(pool, key=self.worker_objective_no_deps)
@gjoseph92 (Collaborator, Author):

Queued tasks would now consider occupancy too (as well as memory). This is fine, since the definition of idle will still prevent oversaturation. If root tasks had different duration estimates, we'd maybe assign to workers a little less evenly: workers with quick tasks would fully fill before workers with slow tasks. But all workers would be fully utilized regardless.

  n_workers: int = len(wp_vals)
  if n_workers < 20:  # smart but linear in small case
-     ws = min(wp_vals, key=operator.attrgetter("occupancy"))
+     ws = min(wp_vals, key=self.worker_objective_no_deps)
@gjoseph92 (Collaborator, Author):

This is a small behavioral change: root-ish tasks won't have affinity for workers that hold their dependencies. By definition, root-ish tasks shouldn't consider the location of their dependencies when scheduling, so this is arguably an improvement—it could slightly reduce dogpiling towards an open Zarr dataset, for instance.

  for i in range(n_workers):
      wp_i = wp_vals[(i + start) % n_workers]
-     if wp_i.occupancy == 0:
+     if sum(self.worker_objective_no_deps(wp_i)) == 0:
@gjoseph92 (Collaborator, Author):

I'm keeping this round-robin behavior around for now, but hopefully we'd remove it in a separate PR later.

    Minimize worker occupancy. If a tie then break with data storage.
    """
    return (ws.occupancy / ws.nthreads, ws.nbytes / (ws.memory_limit or 1))
@gjoseph92 (Collaborator, Author):

Happy with not dividing by memory_limit if @crusaderky would prefer. See my thoughts in #7266 (comment).
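To make the trade-off concrete, a tiny illustration (hypothetical numbers, not from the PR): the absolute and relative tie-breakers can disagree.

```python
# Worker A: 1 GiB of data with a 4 GiB limit  -> 25% full
# Worker B: 2 GiB of data with a 64 GiB limit -> ~3% full
a_nbytes, a_limit = 1 * 2**30, 4 * 2**30
b_nbytes, b_limit = 2 * 2**30, 64 * 2**30

min_by_absolute = "A" if a_nbytes < b_nbytes else "B"                      # "A": fewer bytes stored
min_by_relative = "A" if a_nbytes / a_limit < b_nbytes / b_limit else "B"  # "B": less of its limit used
```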

Collaborator:

Yes, I'd rather use the absolute value.

@gjoseph92 (Collaborator, Author):

Sure. We can keep it consistent for now and then decide what to do everywhere in #7266.

@crusaderky (Collaborator) commented Nov 15, 2022:

The first half of the tuple sneakily has the same effect, at least as long as the amount of available RAM is roughly correlated with the number of threads.
E.g. if you have a 4-thread and a 16-thread worker, and both have occupancy=1, the above algorithm will always prefer the larger worker. To me it would make more sense if it were just ws.occupancy, without dividing by the number of threads.

@gjoseph92 (Collaborator, Author):

Dividing by nthreads here is the key change that fixes test_wait_first_completed and #7197. This is also consistent with worker_objective for non-root tasks:

stack_time: float = ws.occupancy / ws.nthreads

If you don't consider nthreads, then when you have two workers with equal occupancy, and one has 1 thread open, and the other has 0 threads open, you might pick the worker with 0 threads open. That's obviously a very bad choice. Indeed, even if the worker with 1 thread open had higher total occupancy—by any factor—you'd still want to pick it.

Or, imagine all workers are saturated (when queuing is disabled, for example). There's a worker with 2 threads, and a worker with 16 threads, both with the same total occupancy. Clearly, if you assign the task to the 2-threaded worker, it will take much longer to be completed. If you treat them the same, then the bigger (more expensive) workers will be under-utilized, and the small workers will be over-utilized, becoming a long tail that requires rebalancing later.
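Concretely (toy numbers, not from the PR): with equal total occupancy, dividing by nthreads reflects how long each worker actually needs to drain its queue.

```python
occupancy = 100.0      # seconds of assigned work, same on both workers
small = occupancy / 2  # 50.0 s per thread on the 2-thread worker
big = occupancy / 16   # 6.25 s per thread on the 16-thread worker
# occupancy / nthreads ranks the 16-thread worker first, so it keeps
# receiving tasks until the per-thread estimates even out
```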


If you don't divide by nthreads, heterogeneous clusters are imbalanced. The small workers get a disproportionate share of the load. I don't think that's reasonable default behavior for Dask. There might be cases where you want that; then you should use resource restrictions.

If I use multiple instance types in AWS to get better spot availability and save money, I would be extremely surprised to see Dask oversaturating my small workers and under-utilizing my big ones. I would think it was a bug.

If I made a mixed CPU/GPU cluster, and saw my GPU workers running more "normal" tasks than I wanted, I would not be surprised. I would think, "that's reasonable, Dask wants to balance the load. It doesn't know that I want to save space on these, so I'll use worker restrictions to tell it". I would not think it was a bug.

@gjoseph92 (Collaborator, Author):

The truly buggy case I'm fixing here is when worker-saturation > 1, and you're picking between workers that still have open threads, and some that don't. You should always pick a worker with an open thread over one that's full, no matter the occupancy.

This might fix the worst-case behavior I'm worried about, but still schedule equally (though not equitably) when there's not much work:

(0 if len(ws.processing) < ws.nthreads else 1, ws.occupancy, ...)
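A minimal sketch of that tuple as a key function (the function name is illustrative and the trailing "..." keys from the line above are left out rather than guessed; not the PR's code):

```python
def prefer_open_threads(ws: WorkerState) -> tuple[int, float]:
    # Never pick a fully-occupied worker over one with a free thread,
    # regardless of occupancy; among equals, fall back to total occupancy.
    saturated = 0 if len(ws.processing) < ws.nthreads else 1
    return (saturated, ws.occupancy)
```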

gjoseph92 added a commit to gjoseph92/snakebench that referenced this pull request Nov 10, 2022
@gjoseph92 marked this pull request as ready for review November 10, 2022 16:54
Comment on lines 3240 to 3249
def worker_objective_no_deps(self, ws: WorkerState) -> tuple[float, float]:
    """
    Objective function to determine the least-busy worker.
    Meant for use with tasks where dependencies are not relevant for scheduling
    (no dependencies, or widely-shared).
    Minimize worker occupancy. If a tie then break with data storage.
    """
    return (ws.occupancy / ws.nthreads, ws.nbytes / (ws.memory_limit or 1))
Member:

Why do we need a different objective function? Without dependencies, the original reduces to almost the same thing. It prioritizes by

(nactors, start_time, nbytes)

i.e.

(nactors, ws.occupancy / ws.nthreads, nbytes)

which is the same thing as you are proposing here, with the exception that you're dividing nbytes by the memory limit.

Looks like we should just use the same objective function. I suggest not dividing for now (and doing/discussing this in another PR).

@gjoseph92 (Collaborator, Author):

This one doesn't iterate over dependencies. The other does. By the definition of is_rootish, it's possible for there to be dependencies. But the point of root-ish-ness is that we should be ignoring them.
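For context, a simplified paraphrase of the dependency-aware worker_objective being contrasted here (hedged: this is an approximation of the scheduler method at the time, not the exact code):

```python
def worker_objective(self, ts: TaskState, ws: WorkerState) -> tuple:
    # Bytes that would need transferring because ws doesn't hold them yet
    comm_bytes = sum(
        dts.nbytes for dts in ts.dependencies if ws not in dts.who_has
    )
    stack_time = ws.occupancy / ws.nthreads
    start_time = stack_time + comm_bytes / self.bandwidth
    if ts.actor:
        return (len(ws.actors), start_time, ws.nbytes)
    return (start_time, ws.nbytes)
```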

@fjetter (Member) left a comment:

Overall, I do like that we're consolidating the objective function and using the same measure everywhere.

I do not understand why we need a specialized objective function for tasks without deps.

@gjoseph92 (Collaborator, Author):

> I do not understand why we need a specialized objective function for tasks without deps

See #7280 (comment) and f42a050

I don't know that using this instead of our main worker_objective makes a difference (I doubt it does). I just know that all of the decide_worker_rootish functions assume that dependencies don't matter, so I want to be certain we don't introduce a weird interaction by using an objective function that does consider dependencies.

The reason for two objective functions is a bit like #7278: worker_objective already is intended for something very specific; this avoids overloading it for a new purpose. We have two separate codepaths for root vs non-root tasks; splitting the objective functions helps keep them separate.

with client.get_executor(retries=5, pure=False) as e:
    future = e.submit(varying(args))
    assert future.result() == 42
    del future
Collaborator:

These seem redundant to me?

@gjoseph92 (Collaborator, Author):

Without this, the future is held on the worker, increasing nbytes and changing which worker we pick. (This test is highly dependent on how the worker selection logic works, because it runs tasks which change their outputs depending on how many times they've been called on a particular worker; if the order of worker selection changes, the number of retries it takes to get a failing/non-failing result changes.)

Collaborator:

Ah. You're relying on the release message to deterministically reach the scheduler before the next submit message. Got it.

@gjoseph92 mentioned this pull request Nov 16, 2022

Development

Successfully merging this pull request may close these issues.

Round-robin worker selection makes poor choices with worker-saturation > 1.0
