`test_wait_first_completed` is failing in #7191 when the `worker-saturation` value is set to 1.1.
`distributed/distributed/tests/test_client.py`, lines 732 to 746 in 0983731:
```python
@gen_cluster(client=True)
async def test_wait_first_completed(c, s, a, b):
    event = Event()
    x = c.submit(block_on_event, event)
    y = c.submit(block_on_event, event)
    z = c.submit(inc, 2)
    done, not_done = await wait([x, y, z], return_when="FIRST_COMPLETED")
    assert done == {z}
    assert not_done == {x, y}
    assert z.status == "finished"
    assert x.status == "pending"
    assert y.status == "pending"
    await event.set()
```
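The test relies on `wait(..., return_when="FIRST_COMPLETED")` returning as soon as `z` finishes. A rough single-process analogue using the standard library's `asyncio.wait` (not `distributed`; the two coroutines below are hypothetical stand-ins for the test helpers) shows the expected outcome when the quick task is free to run:

```python
import asyncio


async def block_on_event(event: asyncio.Event) -> None:
    # Hypothetical stand-in for the blocking task: waits until the event is set.
    await event.wait()


async def inc(x: int) -> int:
    # Hypothetical stand-in for the quick task.
    return x + 1


async def main():
    event = asyncio.Event()
    x = asyncio.create_task(block_on_event(event))
    y = asyncio.create_task(block_on_event(event))
    z = asyncio.create_task(inc(2))
    done, not_done = await asyncio.wait({x, y, z}, return_when=asyncio.FIRST_COMPLETED)
    # Only the quick task has finished; the blocked ones are still pending.
    result = (done == {z}, not_done == {x, y}, z.result())
    event.set()  # release the blocked tasks so they can finish cleanly
    await asyncio.gather(x, y)
    return result


print(asyncio.run(main()))  # (True, True, 3)
```

In the failing case, `inc` can never start, so no future completes and the `wait` call never returns.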
It passes with 1.0 but fails with 1.1, because the round-up logic from #7116 allows workers to be oversaturated.
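A quick sketch of the arithmetic, assuming the per-worker task limit is `ceil(nthreads * worker-saturation)` floored at 1 (our reading of the round-up in #7116, not code copied from it). `task_slots` is a hypothetical helper; the thread counts match `gen_cluster`'s default two workers with 1 and 2 threads:

```python
import math


def task_slots(nthreads: int, saturation: float) -> int:
    # Assumed round-up rule: a worker may hold ceil(nthreads * saturation)
    # tasks, and always at least one.
    return max(math.ceil(nthreads * saturation), 1)


# Worker `a` has 1 thread, worker `b` has 2.
for saturation in (1.0, 1.1):
    slots = {name: task_slots(n, saturation) for name, n in (("a", 1), ("b", 2))}
    print(saturation, slots)
# 1.0 {'a': 1, 'b': 2}
# 1.1 {'a': 2, 'b': 3}
```

Under this assumption, at 1.1 the 1-thread worker has room for a second task, which is what makes the bad assignment possible.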
It blocks forever because the worker with 1 thread is assigned `[block_on_event, inc]` and the worker with 2 threads is assigned `[block_on_event]`. The single thread stays occupied by `block_on_event`, so `inc` never runs and `wait` never returns. It should be the other way around.
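A toy model (not scheduler code) of why the observed assignment hangs and the reverse does not. It assumes each worker starts tasks in the order they were assigned and that a `block_on_event` task holds its thread until the event is set; `inc_can_run` is a hypothetical helper:

```python
from typing import List, Tuple

# Each worker is (nthreads, queued tasks in start order). Simplified model:
# only `block_on_event` tasks hold a thread forever; anything else finishes.
Worker = Tuple[int, List[str]]


def inc_can_run(workers: List[Worker]) -> bool:
    for nthreads, queue in workers:
        if "inc" in queue:
            # Blocked tasks ahead of `inc` each pin one thread.
            blocked_ahead = queue[: queue.index("inc")].count("block_on_event")
            return blocked_ahead < nthreads
    return False  # `inc` was never assigned


observed = [(1, ["block_on_event", "inc"]), (2, ["block_on_event"])]
desired = [(1, ["block_on_event"]), (2, ["block_on_event", "inc"])]
print(inc_can_run(observed))  # False: the 1-thread worker is pinned
print(inc_can_run(desired))   # True: the 2-thread worker has a spare thread
```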
The culprit seems to be related to the round-robin logic, which only applies in rare situations like this one, where the cluster is small (fewer than 20 workers) but still larger than the TaskGroup being assigned.
`distributed/distributed/scheduler.py`, lines 2210 to 2236 in 0983731:
```python
# TODO if `is_rootish` would always return True for tasks without dependencies,
# we could remove all this logic. The rootish assignment logic would behave
# more or less the same as this, maybe without gauranteed round-robin though?
# This path is only reachable when `ts` doesn't have dependencies, but its
# group is also smaller than the cluster.
# Fastpath when there are no related tasks or restrictions
worker_pool = self.idle or self.workers
# FIXME idle and workers are SortedDict's declared as dicts
# because sortedcontainers is not annotated
wp_vals = cast("Sequence[WorkerState]", worker_pool.values())
n_workers: int = len(wp_vals)
if n_workers < 20:  # smart but linear in small case
    ws = min(wp_vals, key=operator.attrgetter("occupancy"))
    assert ws
    if ws.occupancy == 0:
        # special case to use round-robin; linear search
        # for next worker with zero occupancy (or just
        # land back where we started).
        wp_i: WorkerState
        start: int = self.n_tasks % n_workers
        i: int
        for i in range(n_workers):
            wp_i = wp_vals[(i + start) % n_workers]
            if wp_i.occupancy == 0:
                ws = wp_i
                break
```
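The `n_workers < 20` branch quoted above can be exercised outside the scheduler. This sketch re-implements only that branch on a hypothetical `FakeWorker` stand-in for `WorkerState`, with `self.n_tasks` turned into a plain `n_tasks` argument:

```python
import operator
from dataclasses import dataclass
from typing import List


@dataclass
class FakeWorker:
    # Hypothetical stand-in for WorkerState: only the fields used here.
    name: str
    nthreads: int
    occupancy: float


def pick_worker(workers: List[FakeWorker], n_tasks: int) -> FakeWorker:
    # Mirrors the quoted fastpath for a small worker pool.
    n_workers = len(workers)
    ws = min(workers, key=operator.attrgetter("occupancy"))
    if ws.occupancy == 0:
        # Round-robin among idle workers, starting at an offset that depends
        # only on how many tasks the scheduler has seen so far.
        start = n_tasks % n_workers
        for i in range(n_workers):
            candidate = workers[(i + start) % n_workers]
            if candidate.occupancy == 0:
                ws = candidate
                break
    return ws


workers = [FakeWorker("a", 1, 0.0), FakeWorker("b", 2, 0.0)]
print([pick_worker(workers, n).name for n in range(4)])  # ['a', 'b', 'a', 'b']
```

Nothing in this branch reads `nthreads`: when both workers report zero occupancy, the 1-thread worker gets every other task purely based on `n_tasks`.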
If I update `is_rootish` to return `True` for tasks without dependencies, like so:
```diff
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index cf240240..802df12d 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3043,6 +3043,8 @@ class SchedulerState:
         """
         if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
             return False
+        if not ts.dependencies:
+            return True
         tg = ts.group
         # TODO short-circuit to True if `not ts.dependencies`?
         return (
```

then the test passes.
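With the patch, none of the three tasks in the test has dependencies or restrictions, so all of them are classified as root-ish and skip the round-robin fastpath. A sketch of the patched decision order, using a hypothetical `FakeTask` stand-in for `TaskState`; the final group-size check is a placeholder, since the diff cuts off before the real heuristic:

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class FakeTask:
    # Hypothetical stand-in for TaskState with only the fields used below.
    dependencies: Set[str] = field(default_factory=set)
    resource_restrictions: dict = field(default_factory=dict)
    worker_restrictions: Set[str] = field(default_factory=set)
    host_restrictions: Set[str] = field(default_factory=set)
    group_size: int = 1


def is_rootish(ts: FakeTask, total_nthreads: int) -> bool:
    if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
        return False
    # The patched short-circuit: any task without dependencies is root-ish.
    if not ts.dependencies:
        return True
    # Placeholder for the group-size heuristic elided in the diff; this
    # threshold is an assumption, not the scheduler's actual rule.
    return ts.group_size > 2 * total_nthreads


print(is_rootish(FakeTask(), total_nthreads=3))  # True
```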