Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,12 @@ def _ipaddr_info(host, port, family, type, proto):


def _run_until_complete_cb(fut):
exc = fut._exception
if isinstance(exc, BaseException) and not isinstance(exc, Exception):
# Issue #22429: run_forever() already finished, no need to
# stop it.
return
if not fut.cancelled():
exc = fut.exception()
if isinstance(exc, BaseException) and not isinstance(exc, Exception):
# Issue #22429: run_forever() already finished, no need to
# stop it.
return
fut._loop.stop()


Expand Down
103 changes: 61 additions & 42 deletions Lib/asyncio/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,7 @@ def cancel(self):


def gather(*coros_or_futures, loop=None, return_exceptions=False):
"""Return a future aggregating results from the given coroutines
or futures.
"""Return a future aggregating results from the given coroutines/futures.

Coroutines will be wrapped in a future and scheduled in the event
loop. They will not necessarily be scheduled in the same order as
Expand Down Expand Up @@ -605,56 +604,76 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
outer.set_result([])
return outer

arg_to_fut = {}
for arg in set(coros_or_futures):
if not futures.isfuture(arg):
fut = ensure_future(arg, loop=loop)
if loop is None:
loop = fut._loop
# The caller cannot control this future, the "destroy pending task"
# warning should not be emitted.
fut._log_destroy_pending = False
else:
fut = arg
if loop is None:
loop = fut._loop
elif fut._loop is not loop:
raise ValueError("futures are tied to different event loops")
arg_to_fut[arg] = fut

children = [arg_to_fut[arg] for arg in coros_or_futures]
nchildren = len(children)
outer = _GatheringFuture(children, loop=loop)
nfinished = 0
results = [None] * nchildren

def _done_callback(i, fut):
def _done_callback(fut):
nonlocal nfinished
nfinished += 1

if outer.done():
if not fut.cancelled():
# Mark exception retrieved.
fut.exception()
return

if fut.cancelled():
res = futures.CancelledError()
if not return_exceptions:
outer.set_exception(res)
return
elif fut._exception is not None:
res = fut.exception() # Mark exception retrieved.
if not return_exceptions:
outer.set_exception(res)
if not return_exceptions:
if fut.cancelled():
# Check if 'fut' is cancelled first, as
# 'fut.exception()' will *raise* a CancelledError
# instead of returning it.
exc = futures.CancelledError()
outer.set_exception(exc)
return
else:
res = fut._result
results[i] = res
nfinished += 1
if nfinished == nchildren:
else:
exc = fut.exception()
if exc is not None:
outer.set_exception(exc)
return

if nfinished == nfuts:
# All futures are done; create a list of results
# and set it to the 'outer' future.
results = []

for fut in children:
if fut.cancelled():
# Check if 'fut' is cancelled first, as
# 'fut.exception()' will *raise* a CancelledError
# instead of returning it.
res = futures.CancelledError()
else:
res = fut.exception()
if res is None:
res = fut.result()
results.append(res)

outer.set_result(results)

for i, fut in enumerate(children):
fut.add_done_callback(functools.partial(_done_callback, i))
arg_to_fut = {}
children = []
nfuts = 0
nfinished = 0
for arg in coros_or_futures:
if arg not in arg_to_fut:
fut = ensure_future(arg, loop=loop)
if loop is None:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like the check for multiple loops (see line 622) has been removed. Is this intentional?
If it is, you may want to add a test for that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ensure_future does it. No need to do it twice.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ensure_future validates that the passed in loop argument matches that of the future passed in. If I were to pass 3 futures each having their own different loop, a new _GatheringFuture will be created tied to the loop that was passed to gather (which can be a separate loop as well).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I understand what you mean by that, ensure_future will make sure that the future/coro passed is bound to the same loop - either current or what was passed in. Thanks for explaining.

loop = fut._loop
if fut is not arg:
# 'arg' was not a Future, therefore, 'fut' is a new
# Future created specifically for 'arg'. Since the caller
# can't control it, disable the "destroy pending task"
# warning.
fut._log_destroy_pending = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated question: maybe add log_destroy_pending keyword-only parameter?
Often I want to see cancelled coroutines in logs.


nfuts += 1
arg_to_fut[arg] = fut
fut.add_done_callback(_done_callback)

else:
# There's a duplicate Future object in coros_or_futures.
fut = arg_to_fut[arg]

children.append(fut)

outer = _GatheringFuture(children, loop=loop)
return outer


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimize asyncio.gather(); now up to 15% faster.