Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

gh-96471: Add asyncio queue shutdown#104228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
gvanrossum merged 28 commits intopython:mainfromEpicWink:asyncio-queue-shutdown
Apr 6, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
Show all changes
28 commits
Select commitHold shift + click to select a range
440a702
Add asyncio queue shutdown
EpicWinkSep 1, 2022
fb458db
Fix queue shutdown
YvesDupFeb 10, 2023
e5951ac
📜🤖 Added by blurb_it.
blurb-it[bot]May 6, 2023
a72aedd
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWinkFeb 20, 2024
d5e925d
Add references in docs and news entry
EpicWinkFeb 20, 2024
f3517fb
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWinkMar 20, 2024
bd2a7c3
Improve docs
EpicWinkMar 20, 2024
e9ac8de
Consume queue on immediate shutdown
EpicWinkMar 20, 2024
1e7813a
Fix links in what's-new
EpicWinkMar 22, 2024
1275bb6
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWinkMar 22, 2024
eec29bb
Fix formatting in news entry
EpicWinkMar 22, 2024
2c6156f
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWinkMar 22, 2024
17f1f32
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWinkMar 26, 2024
a233830
Improve tests
EpicWinkMar 26, 2024
420a247
Improve tests even more
EpicWinkMar 26, 2024
25ad2ac
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWinkMar 26, 2024
f3321b4
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWinkMar 27, 2024
6d9edd6
Document tests
EpicWinkMar 27, 2024
1135d85
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWinkMar 28, 2024
ddc6ad6
Always allow getters to re-check queue empty
EpicWinkMar 28, 2024
2fa1bd9
Merge branch 'main' into asyncio-queue-shutdown
gvanrossumApr 3, 2024
aef4063
Simplify shutdown-check in put and get
EpicWinkApr 4, 2024
d49c6dd
Format shutdown docstring
EpicWinkApr 4, 2024
5a435a6
Check for 0 unfinised tasks in shutdown
EpicWinkApr 4, 2024
c8db40e
Use asyncio.sleep to run other tasks
EpicWinkApr 4, 2024
ca01ee1
Use public method to shut down queue in format test
EpicWinkApr 4, 2024
b02c4dd
Only start queue join after shutdown in test
EpicWinkApr 4, 2024
8deca77
Test join before failing task-done
EpicWinkApr 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletionsDoc/library/asyncio-queue.rst
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -62,6 +62,9 @@ Queue
Remove and return an item from the queue. If queue is empty,
wait until an item is available.

Raises:exc:`QueueShutDown` if the queue has been shut down and
is empty, or if the queue has been shut down immediately.

..method::get_nowait()

Return an item if one is immediately available, else raise
Expand All@@ -82,6 +85,8 @@ Queue
Put an item into the queue. If the queue is full, wait until a
free slot is available before adding the item.

Raises:exc:`QueueShutDown` if the queue has been shut down.

..method::put_nowait(item)

Put an item into the queue without blocking.
Expand All@@ -92,6 +97,21 @@ Queue

Return the number of items in the queue.

..method::shutdown(immediate=False)

Shut down the queue, making:meth:`~Queue.get` and:meth:`~Queue.put`
raise:exc:`QueueShutDown`.

By default,:meth:`~Queue.get` on a shut down queue will only
raise once the queue is empty. Set *immediate* to true to make
:meth:`~Queue.get` raise immediately instead.

All blocked callers of:meth:`~Queue.put` will be unblocked. If
*immediate* is true, also unblock callers of:meth:`~Queue.get`
and:meth:`~Queue.join`.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Sorry but I have a doubt, shouldn't this documentation block be rather:

    All blocked callers of :meth:`~Queue.put` and :meth:`~Queue.get`     will be unblocked. If *immediate* is true, also unblock callers of     :meth:`~Queue.join`.

In event of change, the docstring of theshutdown method must be updated.

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

join callers aren't necessarily even unblocked anyway, if consumers are processing any items. I should probably say that a task is marked as done for each item in the queue if immediate shutdown.

Also, I think the threading queue docs are the same.

Copy link
Contributor

@YvesDupYvesDupApr 7, 2024
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

a task is marked as done for each item in the queue if immediate shutdown.

It's very precise, better.

Also, I think the threading queue docs are the same.

Yes, I commented here so as not to forget (see#117532 (comment)).

English is your native language, I think it'is best if you update documentations and docstrings.

Update: but I can create the follow-up PR.

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I've made a PR:#117621

YvesDup reacted with thumbs up emoji
..versionadded::3.13

..method::task_done()

Indicate that a formerly enqueued task is complete.
Expand All@@ -105,6 +125,9 @@ Queue
call was received for every item that had been:meth:`~Queue.put`
into the queue).

``shutdown(immediate=True)`` calls:meth:`task_done` for each
remaining item in the queue.

Raises:exc:`ValueError` if called more times than there were
items placed in the queue.

Expand DownExpand Up@@ -145,6 +168,14 @@ Exceptions
on a queue that has reached its *maxsize*.


..exception::QueueShutDown

Exception raised when:meth:`~Queue.put` or:meth:`~Queue.get` is
called on a queue which has been shut down.

..versionadded::3.13


Examples
========

Expand Down
4 changes: 4 additions & 0 deletionsDoc/whatsnew/3.13.rst
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -296,6 +296,10 @@ asyncio
with the tasks being completed.
(Contributed by Justin Arthur in :gh:`77714`.)

* Add :meth:`asyncio.Queue.shutdown` (along with
:exc:`asyncio.QueueShutDown`) for queue termination.
(Contributed by Laurie Opperman in :gh:`104228`.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

We've worked well together, I'd be nice if you'd mention me.
:-)

base64
------

Expand Down
68 changes: 65 additions & 3 deletionsLib/asyncio/queues.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
__all__= ('Queue','PriorityQueue','LifoQueue','QueueFull','QueueEmpty')
__all__= (
'Queue',
'PriorityQueue',
'LifoQueue',
'QueueFull',
'QueueEmpty',
'QueueShutDown',
)

importcollections
importheapq
Expand All@@ -18,6 +25,11 @@ class QueueFull(Exception):
pass


classQueueShutDown(Exception):
"""Raised when putting on to or getting from a shut-down Queue."""
pass


classQueue(mixins._LoopBoundMixin):
"""A queue, useful for coordinating producer and consumer coroutines.
Expand All@@ -41,6 +53,7 @@ def __init__(self, maxsize=0):
self._finished=locks.Event()
self._finished.set()
self._init(maxsize)
self._is_shutdown=False

# These three are overridable in subclasses.

Expand DownExpand Up@@ -81,6 +94,8 @@ def _format(self):
result+=f' _putters[{len(self._putters)}]'
ifself._unfinished_tasks:
result+=f' tasks={self._unfinished_tasks}'
ifself._is_shutdown:
result+=' shutdown'
returnresult

defqsize(self):
Expand DownExpand Up@@ -112,8 +127,12 @@ async def put(self, item):
Put an item into the queue. If the queue is full, wait until a free
slot is available before adding item.
Raises QueueShutDown if the queue has been shut down.
"""
whileself.full():
ifself._is_shutdown:
raiseQueueShutDown
putter=self._get_loop().create_future()
self._putters.append(putter)
try:
Expand All@@ -125,7 +144,7 @@ async def put(self, item):
self._putters.remove(putter)
exceptValueError:
# The putter could be removed from self._putters by a
# previous get_nowait call.
# previous get_nowait call or a shutdown call.
pass
ifnotself.full()andnotputter.cancelled():
# We were woken up by get_nowait(), but can't take
Expand All@@ -138,7 +157,11 @@ def put_nowait(self, item):
"""Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
Raises QueueShutDown if the queue has been shut down.
"""
ifself._is_shutdown:
raiseQueueShutDown
ifself.full():
raiseQueueFull
self._put(item)
Expand All@@ -150,8 +173,13 @@ async def get(self):
"""Remove and return an item from the queue.
If queue is empty, wait until an item is available.
Raises QueueShutDown if the queue has been shut down and is empty, or
if the queue has been shut down immediately.
"""
whileself.empty():
ifself._is_shutdownandself.empty():
raiseQueueShutDown
getter=self._get_loop().create_future()
self._getters.append(getter)
try:
Expand All@@ -163,7 +191,7 @@ async def get(self):
self._getters.remove(getter)
exceptValueError:
# The getter could be removed from self._getters by a
# previous put_nowait call.
# previous put_nowait call, or a shutdown call.
pass
ifnotself.empty()andnotgetter.cancelled():
# We were woken up by put_nowait(), but can't take
Expand All@@ -176,8 +204,13 @@ def get_nowait(self):
"""Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
Raises QueueShutDown if the queue has been shut down and is empty, or
if the queue has been shut down immediately.
"""
ifself.empty():
ifself._is_shutdown:
raiseQueueShutDown
raiseQueueEmpty
item=self._get()
self._wakeup_next(self._putters)
Expand All@@ -194,6 +227,9 @@ def task_done(self):
been processed (meaning that a task_done() call was received for every
item that had been put() into the queue).
shutdown(immediate=True) calls task_done() for each remaining item in
the queue.
Raises ValueError if called more times than there were items placed in
the queue.
"""
Expand All@@ -214,6 +250,32 @@ async def join(self):
ifself._unfinished_tasks>0:
awaitself._finished.wait()

defshutdown(self,immediate=False):
"""Shut-down the queue, making queue gets and puts raise QueueShutDown.
By default, gets will only raise once the queue is empty. Set
'immediate' to True to make gets raise immediately instead.
All blocked callers of put() will be unblocked, and also get()
and join() if 'immediate'.
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Docstring to modify depending of agree/disagree on my first remark about `blocked callers'.
Changes could be:

All blocked callers of put() and get() will be unblocked, andalso join() if 'immediate

self._is_shutdown=True
ifimmediate:
whilenotself.empty():
self._get()
ifself._unfinished_tasks>0:
self._unfinished_tasks-=1
ifself._unfinished_tasks==0:
self._finished.set()
whileself._getters:
getter=self._getters.popleft()
ifnotgetter.done():
getter.set_result(None)
whileself._putters:
putter=self._putters.popleft()
ifnotputter.done():
putter.set_result(None)


classPriorityQueue(Queue):
"""A subclass of Queue; retrieves entries in priority order (lowest first).
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp