Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitdf4d84c

Browse files
EpicWinkYvesDup
andauthored
gh-96471: Add asyncio queue shutdown (#104228)
Co-authored-by: Duprat <yduprat@gmail.com>
1 parent1d3225a commitdf4d84c

File tree

5 files changed

+301
-3
lines changed

5 files changed

+301
-3
lines changed

‎Doc/library/asyncio-queue.rst

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ Queue
6262
Remove and return an item from the queue. If queue is empty,
6363
wait until an item is available.
6464

65+
Raises:exc:`QueueShutDown` if the queue has been shut down and
66+
is empty, or if the queue has been shut down immediately.
67+
6568
..method::get_nowait()
6669

6770
Return an item if one is immediately available, else raise
@@ -82,6 +85,8 @@ Queue
8285
Put an item into the queue. If the queue is full, wait until a
8386
free slot is available before adding the item.
8487

88+
Raises:exc:`QueueShutDown` if the queue has been shut down.
89+
8590
..method::put_nowait(item)
8691

8792
Put an item into the queue without blocking.
@@ -92,6 +97,21 @@ Queue
9297

9398
Return the number of items in the queue.
9499

100+
..method::shutdown(immediate=False)
101+
102+
Shut down the queue, making:meth:`~Queue.get` and:meth:`~Queue.put`
103+
raise:exc:`QueueShutDown`.
104+
105+
By default,:meth:`~Queue.get` on a shut down queue will only
106+
raise once the queue is empty. Set *immediate* to true to make
107+
:meth:`~Queue.get` raise immediately instead.
108+
109+
All blocked callers of:meth:`~Queue.put` will be unblocked. If
110+
*immediate* is true, also unblock callers of:meth:`~Queue.get`
111+
and:meth:`~Queue.join`.
112+
113+
..versionadded::3.13
114+
95115
..method::task_done()
96116

97117
Indicate that a formerly enqueued task is complete.
@@ -105,6 +125,9 @@ Queue
105125
call was received for every item that had been:meth:`~Queue.put`
106126
into the queue).
107127

128+
``shutdown(immediate=True)`` calls:meth:`task_done` for each
129+
remaining item in the queue.
130+
108131
Raises:exc:`ValueError` if called more times than there were
109132
items placed in the queue.
110133

@@ -145,6 +168,14 @@ Exceptions
145168
on a queue that has reached its *maxsize*.
146169

147170

171+
..exception::QueueShutDown
172+
173+
Exception raised when:meth:`~Queue.put` or:meth:`~Queue.get` is
174+
called on a queue which has been shut down.
175+
176+
..versionadded::3.13
177+
178+
148179
Examples
149180
========
150181

‎Doc/whatsnew/3.13.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,10 @@ asyncio
296296
with the tasks being completed.
297297
(Contributed by Justin Arthur in:gh:`77714`.)
298298

299+
* Add:meth:`asyncio.Queue.shutdown` (along with
300+
:exc:`asyncio.QueueShutDown`) for queue termination.
301+
(Contributed by Laurie Opperman in:gh:`104228`.)
302+
299303
base64
300304
------
301305

‎Lib/asyncio/queues.py

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1-
__all__= ('Queue','PriorityQueue','LifoQueue','QueueFull','QueueEmpty')
1+
__all__= (
2+
'Queue',
3+
'PriorityQueue',
4+
'LifoQueue',
5+
'QueueFull',
6+
'QueueEmpty',
7+
'QueueShutDown',
8+
)
29

310
importcollections
411
importheapq
@@ -18,6 +25,11 @@ class QueueFull(Exception):
1825
pass
1926

2027

28+
classQueueShutDown(Exception):
29+
"""Raised when putting on to or getting from a shut-down Queue."""
30+
pass
31+
32+
2133
classQueue(mixins._LoopBoundMixin):
2234
"""A queue, useful for coordinating producer and consumer coroutines.
2335
@@ -41,6 +53,7 @@ def __init__(self, maxsize=0):
4153
self._finished=locks.Event()
4254
self._finished.set()
4355
self._init(maxsize)
56+
self._is_shutdown=False
4457

4558
# These three are overridable in subclasses.
4659

@@ -81,6 +94,8 @@ def _format(self):
8194
result+=f' _putters[{len(self._putters)}]'
8295
ifself._unfinished_tasks:
8396
result+=f' tasks={self._unfinished_tasks}'
97+
ifself._is_shutdown:
98+
result+=' shutdown'
8499
returnresult
85100

86101
defqsize(self):
@@ -112,8 +127,12 @@ async def put(self, item):
112127
113128
Put an item into the queue. If the queue is full, wait until a free
114129
slot is available before adding item.
130+
131+
Raises QueueShutDown if the queue has been shut down.
115132
"""
116133
whileself.full():
134+
ifself._is_shutdown:
135+
raiseQueueShutDown
117136
putter=self._get_loop().create_future()
118137
self._putters.append(putter)
119138
try:
@@ -125,7 +144,7 @@ async def put(self, item):
125144
self._putters.remove(putter)
126145
exceptValueError:
127146
# The putter could be removed from self._putters by a
128-
# previous get_nowait call.
147+
# previous get_nowait call or a shutdown call.
129148
pass
130149
ifnotself.full()andnotputter.cancelled():
131150
# We were woken up by get_nowait(), but can't take
@@ -138,7 +157,11 @@ def put_nowait(self, item):
138157
"""Put an item into the queue without blocking.
139158
140159
If no free slot is immediately available, raise QueueFull.
160+
161+
Raises QueueShutDown if the queue has been shut down.
141162
"""
163+
ifself._is_shutdown:
164+
raiseQueueShutDown
142165
ifself.full():
143166
raiseQueueFull
144167
self._put(item)
@@ -150,8 +173,13 @@ async def get(self):
150173
"""Remove and return an item from the queue.
151174
152175
If queue is empty, wait until an item is available.
176+
177+
Raises QueueShutDown if the queue has been shut down and is empty, or
178+
if the queue has been shut down immediately.
153179
"""
154180
whileself.empty():
181+
ifself._is_shutdownandself.empty():
182+
raiseQueueShutDown
155183
getter=self._get_loop().create_future()
156184
self._getters.append(getter)
157185
try:
@@ -163,7 +191,7 @@ async def get(self):
163191
self._getters.remove(getter)
164192
exceptValueError:
165193
# The getter could be removed from self._getters by a
166-
# previous put_nowait call.
194+
# previous put_nowait call, or a shutdown call.
167195
pass
168196
ifnotself.empty()andnotgetter.cancelled():
169197
# We were woken up by put_nowait(), but can't take
@@ -176,8 +204,13 @@ def get_nowait(self):
176204
"""Remove and return an item from the queue.
177205
178206
Return an item if one is immediately available, else raise QueueEmpty.
207+
208+
Raises QueueShutDown if the queue has been shut down and is empty, or
209+
if the queue has been shut down immediately.
179210
"""
180211
ifself.empty():
212+
ifself._is_shutdown:
213+
raiseQueueShutDown
181214
raiseQueueEmpty
182215
item=self._get()
183216
self._wakeup_next(self._putters)
@@ -194,6 +227,9 @@ def task_done(self):
194227
been processed (meaning that a task_done() call was received for every
195228
item that had been put() into the queue).
196229
230+
shutdown(immediate=True) calls task_done() for each remaining item in
231+
the queue.
232+
197233
Raises ValueError if called more times than there were items placed in
198234
the queue.
199235
"""
@@ -214,6 +250,32 @@ async def join(self):
214250
ifself._unfinished_tasks>0:
215251
awaitself._finished.wait()
216252

253+
defshutdown(self,immediate=False):
254+
"""Shut-down the queue, making queue gets and puts raise QueueShutDown.
255+
256+
By default, gets will only raise once the queue is empty. Set
257+
'immediate' to True to make gets raise immediately instead.
258+
259+
All blocked callers of put() will be unblocked, and also get()
260+
and join() if 'immediate'.
261+
"""
262+
self._is_shutdown=True
263+
ifimmediate:
264+
whilenotself.empty():
265+
self._get()
266+
ifself._unfinished_tasks>0:
267+
self._unfinished_tasks-=1
268+
ifself._unfinished_tasks==0:
269+
self._finished.set()
270+
whileself._getters:
271+
getter=self._getters.popleft()
272+
ifnotgetter.done():
273+
getter.set_result(None)
274+
whileself._putters:
275+
putter=self._putters.popleft()
276+
ifnotputter.done():
277+
putter.set_result(None)
278+
217279

218280
classPriorityQueue(Queue):
219281
"""A subclass of Queue; retrieves entries in priority order (lowest first).

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp