-
-
Notifications
You must be signed in to change notification settings - Fork 33.5k
bpo-32355: Optimize asyncio.gather() #4913
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
441be3f
bpo-32355: Optimize asyncio.gather()
1st1 8bd9852
Use only public Future API
1st1 8ee2b69
Add a couple of comments clarifying cancelled() calls
1st1 b10fbe9
Use public Future API in base_events.py
1st1 a41d3e9
Fix cancelled future case
1st1 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -575,8 +575,7 @@ def cancel(self): | |
|
|
||
|
|
||
| def gather(*coros_or_futures, loop=None, return_exceptions=False): | ||
| """Return a future aggregating results from the given coroutines | ||
| or futures. | ||
| """Return a future aggregating results from the given coroutines/futures. | ||
|
|
||
| Coroutines will be wrapped in a future and scheduled in the event | ||
| loop. They will not necessarily be scheduled in the same order as | ||
|
|
@@ -605,56 +604,76 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): | |
| outer.set_result([]) | ||
| return outer | ||
|
|
||
| arg_to_fut = {} | ||
| for arg in set(coros_or_futures): | ||
| if not futures.isfuture(arg): | ||
| fut = ensure_future(arg, loop=loop) | ||
| if loop is None: | ||
| loop = fut._loop | ||
| # The caller cannot control this future, the "destroy pending task" | ||
| # warning should not be emitted. | ||
| fut._log_destroy_pending = False | ||
| else: | ||
| fut = arg | ||
| if loop is None: | ||
| loop = fut._loop | ||
| elif fut._loop is not loop: | ||
| raise ValueError("futures are tied to different event loops") | ||
| arg_to_fut[arg] = fut | ||
|
|
||
| children = [arg_to_fut[arg] for arg in coros_or_futures] | ||
| nchildren = len(children) | ||
| outer = _GatheringFuture(children, loop=loop) | ||
| nfinished = 0 | ||
| results = [None] * nchildren | ||
|
|
||
| def _done_callback(i, fut): | ||
| def _done_callback(fut): | ||
| nonlocal nfinished | ||
| nfinished += 1 | ||
|
|
||
| if outer.done(): | ||
| if not fut.cancelled(): | ||
| # Mark exception retrieved. | ||
| fut.exception() | ||
| return | ||
|
|
||
| if fut.cancelled(): | ||
| res = futures.CancelledError() | ||
| if not return_exceptions: | ||
| outer.set_exception(res) | ||
| return | ||
| elif fut._exception is not None: | ||
| res = fut.exception() # Mark exception retrieved. | ||
| if not return_exceptions: | ||
| outer.set_exception(res) | ||
| if not return_exceptions: | ||
| if fut.cancelled(): | ||
| # Check if 'fut' is cancelled first, as | ||
| # 'fut.exception()' will *raise* a CancelledError | ||
| # instead of returning it. | ||
| exc = futures.CancelledError() | ||
| outer.set_exception(exc) | ||
| return | ||
| else: | ||
| res = fut._result | ||
| results[i] = res | ||
| nfinished += 1 | ||
| if nfinished == nchildren: | ||
| else: | ||
| exc = fut.exception() | ||
| if exc is not None: | ||
| outer.set_exception(exc) | ||
| return | ||
|
|
||
| if nfinished == nfuts: | ||
| # All futures are done; create a list of results | ||
| # and set it to the 'outer' future. | ||
| results = [] | ||
|
|
||
| for fut in children: | ||
| if fut.cancelled(): | ||
| # Check if 'fut' is cancelled first, as | ||
| # 'fut.exception()' will *raise* a CancelledError | ||
| # instead of returning it. | ||
| res = futures.CancelledError() | ||
| else: | ||
| res = fut.exception() | ||
| if res is None: | ||
| res = fut.result() | ||
| results.append(res) | ||
|
|
||
| outer.set_result(results) | ||
|
|
||
| for i, fut in enumerate(children): | ||
| fut.add_done_callback(functools.partial(_done_callback, i)) | ||
| arg_to_fut = {} | ||
| children = [] | ||
| nfuts = 0 | ||
| nfinished = 0 | ||
| for arg in coros_or_futures: | ||
| if arg not in arg_to_fut: | ||
| fut = ensure_future(arg, loop=loop) | ||
| if loop is None: | ||
| loop = fut._loop | ||
| if fut is not arg: | ||
| # 'arg' was not a Future, therefore, 'fut' is a new | ||
| # Future created specifically for 'arg'. Since the caller | ||
| # can't control it, disable the "destroy pending task" | ||
| # warning. | ||
| fut._log_destroy_pending = False | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unrelated question: maybe add |
||
|
|
||
| nfuts += 1 | ||
| arg_to_fut[arg] = fut | ||
| fut.add_done_callback(_done_callback) | ||
|
|
||
| else: | ||
| # There's a duplicate Future object in coros_or_futures. | ||
| fut = arg_to_fut[arg] | ||
|
|
||
| children.append(fut) | ||
|
|
||
| outer = _GatheringFuture(children, loop=loop) | ||
| return outer | ||
|
|
||
|
|
||
|
|
||
1 change: 1 addition & 0 deletions
1
Misc/NEWS.d/next/Library/2017-12-17-21-42-24.bpo-32355.tbaTWA.rst
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Optimize asyncio.gather(); now up to 15% faster. |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like the check for multiple loops (see line 622) has been removed. Is this intentional?
If it is, you may want to add a test for that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ensure_future does it. No need to do it twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ensure_futurevalidates that the passed inloopargument matches that of the future passed in. If I were to pass 3 futures each having their own different loop, a new_GatheringFuturewill be created tied to the loop that was passed togather(which can be a separate loop as well).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I understand what you mean by that,
ensure_futurewill make sure that the future/coro passed is bound to the same loop - either current or what was passed in. Thanks for explaining.