Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc741ad3

Browse files
JustinTArthurserhiy-storchakagvanrossum
authored
gh-77714: Provide an async iterator version of as_completed (GH-22491)
* as_completed returns object that is both iterator and async iterator* Existing tests adjusted to test both the old and new style* New test to ensure iterator can be resumed* New test to ensure async iterator yields any passed-in Futures as-isCo-authored-by: Serhiy Storchaka <storchaka@gmail.com>Co-authored-by: Guido van Rossum <gvanrossum@gmail.com>
1 parentddf814d commitc741ad3

File tree

5 files changed

+387
-120
lines changed

5 files changed

+387
-120
lines changed

‎Doc/library/asyncio-task.rst

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -867,19 +867,50 @@ Waiting Primitives
867867

868868
..function::as_completed(aws, *, timeout=None)
869869

870-
Run:ref:`awaitable objects<asyncio-awaitables>` in the *aws*
871-
iterable concurrently. Return an iterator of coroutines.
872-
Each coroutine returned can be awaited to get the earliest next
873-
result from the iterable of the remaining awaitables.
874-
875-
Raises:exc:`TimeoutError` if the timeout occurs before
876-
all Futures are done.
877-
878-
Example::
879-
880-
for coro in as_completed(aws):
881-
earliest_result = await coro
882-
# ...
870+
Run:ref:`awaitable objects<asyncio-awaitables>` in the *aws* iterable
871+
concurrently. The returned object can be iterated to obtain the results
872+
of the awaitables as they finish.
873+
874+
The object returned by ``as_completed()`` can be iterated as an
875+
:term:`asynchronous iterator` or a plain:term:`iterator`. When asynchronous
876+
iteration is used, the originally-supplied awaitables are yielded if they
877+
are tasks or futures. This makes it easy to correlate previously-scheduled
878+
tasks with their results. Example::
879+
880+
ipv4_connect = create_task(open_connection("127.0.0.1", 80))
881+
ipv6_connect = create_task(open_connection("::1", 80))
882+
tasks = [ipv4_connect, ipv6_connect]
883+
884+
async for earliest_connect in as_completed(tasks):
885+
# earliest_connect is done. The result can be obtained by
886+
# awaiting it or calling earliest_connect.result()
887+
reader, writer = await earliest_connect
888+
889+
if earliest_connect is ipv6_connect:
890+
print("IPv6 connection established.")
891+
else:
892+
print("IPv4 connection established.")
893+
894+
During asynchronous iteration, implicitly-created tasks will be yielded for
895+
supplied awaitables that aren't tasks or futures.
896+
897+
When used as a plain iterator, each iteration yields a new coroutine that
898+
returns the result or raises the exception of the next completed awaitable.
899+
This pattern is compatible with Python versions older than 3.13::
900+
901+
ipv4_connect = create_task(open_connection("127.0.0.1", 80))
902+
ipv6_connect = create_task(open_connection("::1", 80))
903+
tasks = [ipv4_connect, ipv6_connect]
904+
905+
for next_connect in as_completed(tasks):
906+
# next_connect is not one of the original task objects. It must be
907+
# awaited to obtain the result value or raise the exception of the
908+
# awaitable that finishes next.
909+
reader, writer = await next_connect
910+
911+
A:exc:`TimeoutError` is raised if the timeout occurs before all awaitables
912+
are done. This is raised by the ``async for`` loop during asynchronous
913+
iteration or by the coroutines yielded during plain iteration.
883914

884915
..versionchanged::3.10
885916
Removed the *loop* parameter.
@@ -891,6 +922,10 @@ Waiting Primitives
891922
..versionchanged::3.12
892923
Added support for generators yielding tasks.
893924

925+
..versionchanged::3.13
926+
The result can now be used as either an:term:`asynchronous iterator`
927+
or as a plain:term:`iterator` (previously it was only a plain iterator).
928+
894929

895930
Running in Threads
896931
==================

‎Doc/whatsnew/3.13.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,13 @@ asyncio
289289
forcefully close an asyncio server.
290290
(Contributed by Pierre Ossman in:gh:`113538`.)
291291

292+
*:func:`asyncio.as_completed` now returns an object that is both an
293+
:term:`asynchronous iterator` and a plain:term:`iterator` of awaitables.
294+
The awaitables yielded by asynchronous iteration include original task or
295+
future objects that were passed in, making it easier to associate results
296+
with the tasks being completed.
297+
(Contributed by Justin Arthur in:gh:`77714`.)
298+
292299
base64
293300
------
294301

‎Lib/asyncio/tasks.py

Lines changed: 108 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from .importevents
2626
from .importexceptions
2727
from .importfutures
28+
from .importqueues
2829
from .importtimeouts
2930

3031
# Helper to generate new task names
@@ -564,62 +565,125 @@ async def _cancel_and_wait(fut):
564565
fut.remove_done_callback(cb)
565566

566567

567-
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
568+
class_AsCompletedIterator:
569+
"""Iterator of awaitables representing tasks of asyncio.as_completed.
570+
571+
As an asynchronous iterator, iteration yields futures as they finish. As a
572+
plain iterator, new coroutines are yielded that will return or raise the
573+
result of the next underlying future to complete.
574+
"""
575+
def__init__(self,aws,timeout):
576+
self._done=queues.Queue()
577+
self._timeout_handle=None
578+
579+
loop=events.get_event_loop()
580+
todo= {ensure_future(aw,loop=loop)forawinset(aws)}
581+
forfintodo:
582+
f.add_done_callback(self._handle_completion)
583+
iftodoandtimeoutisnotNone:
584+
self._timeout_handle= (
585+
loop.call_later(timeout,self._handle_timeout)
586+
)
587+
self._todo=todo
588+
self._todo_left=len(todo)
589+
590+
def__aiter__(self):
591+
returnself
592+
593+
def__iter__(self):
594+
returnself
595+
596+
asyncdef__anext__(self):
597+
ifnotself._todo_left:
598+
raiseStopAsyncIteration
599+
assertself._todo_left>0
600+
self._todo_left-=1
601+
returnawaitself._wait_for_one()
602+
603+
def__next__(self):
604+
ifnotself._todo_left:
605+
raiseStopIteration
606+
assertself._todo_left>0
607+
self._todo_left-=1
608+
returnself._wait_for_one(resolve=True)
609+
610+
def_handle_timeout(self):
611+
forfinself._todo:
612+
f.remove_done_callback(self._handle_completion)
613+
self._done.put_nowait(None)# Sentinel for _wait_for_one().
614+
self._todo.clear()# Can't do todo.remove(f) in the loop.
615+
616+
def_handle_completion(self,f):
617+
ifnotself._todo:
618+
return# _handle_timeout() was here first.
619+
self._todo.remove(f)
620+
self._done.put_nowait(f)
621+
ifnotself._todoandself._timeout_handleisnotNone:
622+
self._timeout_handle.cancel()
623+
624+
asyncdef_wait_for_one(self,resolve=False):
625+
# Wait for the next future to be done and return it unless resolve is
626+
# set, in which case return either the result of the future or raise
627+
# an exception.
628+
f=awaitself._done.get()
629+
iffisNone:
630+
# Dummy value from _handle_timeout().
631+
raiseexceptions.TimeoutError
632+
returnf.result()ifresolveelsef
633+
634+
568635
defas_completed(fs,*,timeout=None):
569-
"""Return an iteratorwhose values are coroutines.
636+
"""Create an iteratorof awaitables or their results in completion order.
570637
571-
When waiting for the yielded coroutines you'll get the results (or
572-
exceptions!) of the original Futures (or coroutines), in the order
573-
in which and as soon as they complete.
638+
Run the supplied awaitables concurrently. The returned object can be
639+
iterated to obtain the results of the awaitables as they finish.
574640
575-
This differs from PEP 3148; the proper way to use this is:
641+
The object returned can be iterated as an asynchronous iterator or a plain
642+
iterator. When asynchronous iteration is used, the originally-supplied
643+
awaitables are yielded if they are tasks or futures. This makes it easy to
644+
correlate previously-scheduled tasks with their results:
576645
577-
for f in as_completed(fs):
578-
result = await f # The 'await' may raise.
579-
# Use result.
646+
ipv4_connect = create_task(open_connection("127.0.0.1", 80))
647+
ipv6_connect = create_task(open_connection("::1", 80))
648+
tasks = [ipv4_connect, ipv6_connect]
580649
581-
If a timeout is specified, the 'await' will raise
582-
TimeoutError when the timeout occurs before all Futures are done.
650+
async for earliest_connect in as_completed(tasks):
651+
# earliest_connect is done. The result can be obtained by
652+
# awaiting it or calling earliest_connect.result()
653+
reader, writer = await earliest_connect
583654
584-
Note: The futures 'f' are not necessarily members of fs.
585-
"""
586-
iffutures.isfuture(fs)orcoroutines.iscoroutine(fs):
587-
raiseTypeError(f"expect an iterable of futures, not{type(fs).__name__}")
655+
if earliest_connect is ipv6_connect:
656+
print("IPv6 connection established.")
657+
else:
658+
print("IPv4 connection established.")
588659
589-
from .queuesimportQueue# Import here to avoid circular import problem.
590-
done=Queue()
660+
During asynchronous iteration, implicitly-created tasks will be yielded for
661+
supplied awaitables that aren't tasks or futures.
591662
592-
loop=events.get_event_loop()
593-
todo= {ensure_future(f,loop=loop)forfinset(fs)}
594-
timeout_handle=None
663+
When used as a plain iterator, each iteration yields a new coroutine that
664+
returns the result or raises the exception of the next completed awaitable.
665+
This pattern is compatible with Python versions older than 3.13:
595666
596-
def_on_timeout():
597-
forfintodo:
598-
f.remove_done_callback(_on_completion)
599-
done.put_nowait(None)# Queue a dummy value for _wait_for_one().
600-
todo.clear()# Can't do todo.remove(f) in the loop.
667+
ipv4_connect = create_task(open_connection("127.0.0.1", 80))
668+
ipv6_connect = create_task(open_connection("::1", 80))
669+
tasks = [ipv4_connect, ipv6_connect]
601670
602-
def_on_completion(f):
603-
ifnottodo:
604-
return# _on_timeout() was here first.
605-
todo.remove(f)
606-
done.put_nowait(f)
607-
ifnottodoandtimeout_handleisnotNone:
608-
timeout_handle.cancel()
671+
for next_connect in as_completed(tasks):
672+
# next_connect is not one of the original task objects. It must be
673+
# awaited to obtain the result value or raise the exception of the
674+
# awaitable that finishes next.
675+
reader, writer = await next_connect
609676
610-
asyncdef_wait_for_one():
611-
f=awaitdone.get()
612-
iffisNone:
613-
# Dummy value from _on_timeout().
614-
raiseexceptions.TimeoutError
615-
returnf.result()# May raise f.exception().
677+
A TimeoutError is raised if the timeout occurs before all awaitables are
678+
done. This is raised by the async for loop during asynchronous iteration or
679+
by the coroutines yielded during plain iteration.
680+
"""
681+
ifinspect.isawaitable(fs):
682+
raiseTypeError(
683+
f"expects an iterable of awaitables, not{type(fs).__name__}"
684+
)
616685

617-
forfintodo:
618-
f.add_done_callback(_on_completion)
619-
iftodoandtimeoutisnotNone:
620-
timeout_handle=loop.call_later(timeout,_on_timeout)
621-
for_inrange(len(todo)):
622-
yield_wait_for_one()
686+
return_AsCompletedIterator(fs,timeout)
623687

624688

625689
@types.coroutine

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp