Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

gh-77714: Implement as_completed as an asynchronous generator#10251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Closed
mivade wants to merge5 commits intopython:mainfrommivade:async-as-completed
Closed
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 55 additions & 38 deletionsLib/asyncio/tasks.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -570,62 +570,79 @@ async def _cancel_and_wait(fut):
fut.remove_done_callback(cb)


# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, timeout=None):
"""Return an iterator whose values are coroutines.
class as_completed(object):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Suggested change
classas_completed(object):
classas_completed:

Inheriting from object isn't necessary

bvd0 reacted with thumbs up emoji
"""Asynchronous iterator over awaitables which yields each future as it
is completed consistent with PEP 3148. Usage::

When waiting for the yielded coroutines you'll get the results (or
exceptions!) of the original Futures (or coroutines), in the order
in which and as soon as they complete.
async for future in as_completed(fs):
print(future.result())

This differs from PEP 3148; the proper way to use this is:
For backwards compatibility, this can also be used as a regular iterator::

for f in as_completed(fs):
result = await f # The 'await' may raise.
# Use result.

If a timeout is specified, the 'await' will raise
TimeoutError when the timeout occurs before all Futures are done.

Note: The futures 'f' are not necessarily members of fs.
"""
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
def __init__(self, fs, *, timeout=None):
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")

from .queues import Queue # Import here to avoid circular import problem.
done = Queue()
from .queues import Queue # Import here to avoid circular import problem.
self._loop = events.get_event_loop()
self._pending = set(ensure_future(f, loop=self._loop) for f in fs)
self._completed = Queue()
self._timeout = timeout
self._timeout_handle = None

loop = events.get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
timeout_handle = None
for future in self._pending:
future.add_done_callback(self._on_completion)

def _on_timeout():
for f in todo:
f.remove_done_callback(_on_completion)
done.put_nowait(None) # Queue a dummy value for _wait_for_one().
todo.clear() # Can't do todo.remove(f) in the loop.
def _on_completion(self, future):
self._pending.remove(future)
self._completed.put_nowait(future)

def _on_completion(f):
if not todo:
return # _on_timeout() was here first.
todo.remove(f)
done.put_nowait(f)
if not todo and timeout_handle is not None:
timeout_handle.cancel()
if not self._pending and self._timeout_handle is not None:
self._timeout_handle.cancel()

async def _wait_for_one():
f = awaitdone.get()
async def _wait_for_one(self):
f = awaitself._completed.get()
if f is None:
# Dummy value from _on_timeout().
raise exceptions.TimeoutError

return f.result() # May raise f.exception().

for f in todo:
f.add_done_callback(_on_completion)
if todo and timeout is not None:
timeout_handle = loop.call_later(timeout, _on_timeout)
for _ in range(len(todo)):
yield _wait_for_one()
def _on_timeout(self):
for f in self._pending:
f.remove_done_callback(self._on_completion)

def _start_timeout(self):
if self._pending and self._timeout is not None:
self._timeout_handle = self._loop.call_later(self._timeout,
self._on_timeout)

async def __aiter__(self):
self._start_timeout()

try:
while self._pending:
future = await self._completed.get()
yield future
finally:
# If an exception happened, we want to ensure that the done
# callback doesn't run anyway
for future in self._pending:
future.remove_done_callback(self._on_completion)

def __iter__(self):
for f in self._pending:
f.add_done_callback(self._on_completion)

self._start_timeout()

for _ in range(len(self._pending)):
yield self._wait_for_one()


@types.coroutine
Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp