Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork32.4k
gh-96471: Add asyncio queue shutdown#104228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes fromall commits
440a702
fb458db
e5951ac
a72aedd
d5e925d
f3517fb
bd2a7c3
e9ac8de
1e7813a
1275bb6
eec29bb
2c6156f
17f1f32
a233830
420a247
25ad2ac
f3321b4
6d9edd6
1135d85
ddc6ad6
2fa1bd9
aef4063
d49c6dd
5a435a6
c8db40e
ca01ee1
b02c4dd
8deca77
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -62,6 +62,9 @@ Queue | ||
Remove and return an item from the queue. If queue is empty, | ||
wait until an item is available. | ||
Raises:exc:`QueueShutDown` if the queue has been shut down and | ||
is empty, or if the queue has been shut down immediately. | ||
..method::get_nowait() | ||
Return an item if one is immediately available, else raise | ||
@@ -82,6 +85,8 @@ Queue | ||
Put an item into the queue. If the queue is full, wait until a | ||
free slot is available before adding the item. | ||
Raises:exc:`QueueShutDown` if the queue has been shut down. | ||
..method::put_nowait(item) | ||
Put an item into the queue without blocking. | ||
@@ -92,6 +97,21 @@ Queue | ||
Return the number of items in the queue. | ||
..method::shutdown(immediate=False) | ||
Shut down the queue, making:meth:`~Queue.get` and:meth:`~Queue.put` | ||
raise:exc:`QueueShutDown`. | ||
By default,:meth:`~Queue.get` on a shut down queue will only | ||
raise once the queue is empty. Set *immediate* to true to make | ||
:meth:`~Queue.get` raise immediately instead. | ||
All blocked callers of:meth:`~Queue.put` will be unblocked. If | ||
*immediate* is true, also unblock callers of:meth:`~Queue.get` | ||
and:meth:`~Queue.join`. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Sorry but I have a doubt, shouldn't this documentation block be rather:
In event of change, the docstring of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more.
Also, I think the threading queue docs are the same. Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more.
It's very precise, better.
Yes, I commented here so as not to forget (see#117532 (comment)). English is your native language, I think it'is best if you update documentations and docstrings. Update: but I can create the follow-up PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. I've made a PR:#117621 | ||
..versionadded::3.13 | ||
..method::task_done() | ||
Indicate that a formerly enqueued task is complete. | ||
@@ -105,6 +125,9 @@ Queue | ||
call was received for every item that had been:meth:`~Queue.put` | ||
into the queue). | ||
``shutdown(immediate=True)`` calls:meth:`task_done` for each | ||
remaining item in the queue. | ||
Raises:exc:`ValueError` if called more times than there were | ||
items placed in the queue. | ||
@@ -145,6 +168,14 @@ Exceptions | ||
on a queue that has reached its *maxsize*. | ||
..exception::QueueShutDown | ||
Exception raised when:meth:`~Queue.put` or:meth:`~Queue.get` is | ||
called on a queue which has been shut down. | ||
..versionadded::3.13 | ||
Examples | ||
======== | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -296,6 +296,10 @@ asyncio | ||
with the tasks being completed. | ||
(Contributed by Justin Arthur in :gh:`77714`.) | ||
* Add :meth:`asyncio.Queue.shutdown` (along with | ||
:exc:`asyncio.QueueShutDown`) for queue termination. | ||
(Contributed by Laurie Opperman in :gh:`104228`.) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. We've worked well together, I'd be nice if you'd mention me. | ||
base64 | ||
------ | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,11 @@ | ||
__all__= ( | ||
'Queue', | ||
'PriorityQueue', | ||
'LifoQueue', | ||
'QueueFull', | ||
'QueueEmpty', | ||
'QueueShutDown', | ||
) | ||
importcollections | ||
importheapq | ||
@@ -18,6 +25,11 @@ class QueueFull(Exception): | ||
pass | ||
classQueueShutDown(Exception): | ||
"""Raised when putting on to or getting from a shut-down Queue.""" | ||
pass | ||
classQueue(mixins._LoopBoundMixin): | ||
"""A queue, useful for coordinating producer and consumer coroutines. | ||
@@ -41,6 +53,7 @@ def __init__(self, maxsize=0): | ||
self._finished=locks.Event() | ||
self._finished.set() | ||
self._init(maxsize) | ||
self._is_shutdown=False | ||
# These three are overridable in subclasses. | ||
@@ -81,6 +94,8 @@ def _format(self): | ||
result+=f' _putters[{len(self._putters)}]' | ||
ifself._unfinished_tasks: | ||
result+=f' tasks={self._unfinished_tasks}' | ||
ifself._is_shutdown: | ||
result+=' shutdown' | ||
returnresult | ||
defqsize(self): | ||
@@ -112,8 +127,12 @@ async def put(self, item): | ||
Put an item into the queue. If the queue is full, wait until a free | ||
slot is available before adding item. | ||
Raises QueueShutDown if the queue has been shut down. | ||
""" | ||
whileself.full(): | ||
ifself._is_shutdown: | ||
raiseQueueShutDown | ||
putter=self._get_loop().create_future() | ||
self._putters.append(putter) | ||
try: | ||
@@ -125,7 +144,7 @@ async def put(self, item): | ||
self._putters.remove(putter) | ||
exceptValueError: | ||
# The putter could be removed from self._putters by a | ||
# previous get_nowait call or a shutdown call. | ||
pass | ||
ifnotself.full()andnotputter.cancelled(): | ||
# We were woken up by get_nowait(), but can't take | ||
@@ -138,7 +157,11 @@ def put_nowait(self, item): | ||
"""Put an item into the queue without blocking. | ||
If no free slot is immediately available, raise QueueFull. | ||
Raises QueueShutDown if the queue has been shut down. | ||
""" | ||
ifself._is_shutdown: | ||
raiseQueueShutDown | ||
ifself.full(): | ||
raiseQueueFull | ||
self._put(item) | ||
@@ -150,8 +173,13 @@ async def get(self): | ||
"""Remove and return an item from the queue. | ||
If queue is empty, wait until an item is available. | ||
Raises QueueShutDown if the queue has been shut down and is empty, or | ||
if the queue has been shut down immediately. | ||
""" | ||
whileself.empty(): | ||
ifself._is_shutdownandself.empty(): | ||
raiseQueueShutDown | ||
getter=self._get_loop().create_future() | ||
self._getters.append(getter) | ||
try: | ||
@@ -163,7 +191,7 @@ async def get(self): | ||
self._getters.remove(getter) | ||
exceptValueError: | ||
# The getter could be removed from self._getters by a | ||
# previous put_nowait call, or a shutdown call. | ||
pass | ||
ifnotself.empty()andnotgetter.cancelled(): | ||
# We were woken up by put_nowait(), but can't take | ||
@@ -176,8 +204,13 @@ def get_nowait(self): | ||
"""Remove and return an item from the queue. | ||
Return an item if one is immediately available, else raise QueueEmpty. | ||
Raises QueueShutDown if the queue has been shut down and is empty, or | ||
if the queue has been shut down immediately. | ||
""" | ||
ifself.empty(): | ||
EpicWink marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
ifself._is_shutdown: | ||
raiseQueueShutDown | ||
raiseQueueEmpty | ||
item=self._get() | ||
self._wakeup_next(self._putters) | ||
@@ -194,6 +227,9 @@ def task_done(self): | ||
been processed (meaning that a task_done() call was received for every | ||
item that had been put() into the queue). | ||
shutdown(immediate=True) calls task_done() for each remaining item in | ||
the queue. | ||
Raises ValueError if called more times than there were items placed in | ||
the queue. | ||
""" | ||
@@ -214,6 +250,32 @@ async def join(self): | ||
ifself._unfinished_tasks>0: | ||
awaitself._finished.wait() | ||
defshutdown(self,immediate=False): | ||
"""Shut-down the queue, making queue gets and puts raise QueueShutDown. | ||
By default, gets will only raise once the queue is empty. Set | ||
'immediate' to True to make gets raise immediately instead. | ||
All blocked callers of put() will be unblocked, and also get() | ||
and join() if 'immediate'. | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Docstring to modify depending of agree/disagree on my first remark about `blocked callers'.
| ||
self._is_shutdown=True | ||
ifimmediate: | ||
whilenotself.empty(): | ||
self._get() | ||
ifself._unfinished_tasks>0: | ||
self._unfinished_tasks-=1 | ||
ifself._unfinished_tasks==0: | ||
self._finished.set() | ||
whileself._getters: | ||
getter=self._getters.popleft() | ||
ifnotgetter.done(): | ||
getter.set_result(None) | ||
whileself._putters: | ||
putter=self._putters.popleft() | ||
ifnotputter.done(): | ||
putter.set_result(None) | ||
classPriorityQueue(Queue): | ||
"""A subclass of Queue; retrieves entries in priority order (lowest first). | ||
Uh oh!
There was an error while loading.Please reload this page.