Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb2d9d13

Browse files
EpicWinkYvesDup
andauthored
gh-96471: Add shutdown() method to queue.Queue (#104750)
Co-authored-by: Duprat <yduprat@gmail.com>
1 parentd4d5bae commitb2d9d13

File tree

5 files changed

+474
-0
lines changed

5 files changed

+474
-0
lines changed

‎Doc/library/queue.rst

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,14 @@ The :mod:`queue` module defines the following classes and exceptions:
9393
on a:class:`Queue` object which is full.
9494

9595

96+
..exception::ShutDown
97+
98+
Exception raised when:meth:`~Queue.put` or:meth:`~Queue.get` is called on
99+
a:class:`Queue` object which has been shut down.
100+
101+
..versionadded::3.13
102+
103+
96104
.. _queueobjects:
97105

98106
Queue Objects
@@ -135,6 +143,8 @@ provide the public methods described below.
135143
immediately available, else raise the:exc:`Full` exception (*timeout* is
136144
ignored in that case).
137145

146+
Raises:exc:`ShutDown` if the queue has been shut down.
147+
138148

139149
..method::Queue.put_nowait(item)
140150

@@ -155,6 +165,9 @@ provide the public methods described below.
155165
an uninterruptible wait on an underlying lock. This means that no exceptions
156166
can occur, and in particular a SIGINT will not trigger a:exc:`KeyboardInterrupt`.
157167

168+
Raises:exc:`ShutDown` if the queue has been shut down and is empty, or if
169+
the queue has been shut down immediately.
170+
158171

159172
..method::Queue.get_nowait()
160173

@@ -177,6 +190,8 @@ fully processed by daemon consumer threads.
177190
Raises a:exc:`ValueError` if called more times than there were items placed in
178191
the queue.
179192

193+
Raises:exc:`ShutDown` if the queue has been shut down immediately.
194+
180195

181196
..method::Queue.join()
182197

@@ -187,6 +202,8 @@ fully processed by daemon consumer threads.
187202
indicate that the item was retrieved and all work on it is complete. When the
188203
count of unfinished tasks drops to zero,:meth:`join` unblocks.
189204

205+
Raises:exc:`ShutDown` if the queue has been shut down immediately.
206+
190207

191208
Example of how to wait for enqueued tasks to be completed::
192209

@@ -214,6 +231,27 @@ Example of how to wait for enqueued tasks to be completed::
214231
print('All work completed')
215232

216233

234+
Terminating queues
235+
^^^^^^^^^^^^^^^^^^
236+
237+
:class:`Queue` objects can be made to prevent further interaction by shutting
238+
them down.
239+
240+
..method::Queue.shutdown(immediate=False)
241+
242+
Shut down the queue, making:meth:`~Queue.get` and:meth:`~Queue.put` raise
243+
:exc:`ShutDown`.
244+
245+
By default,:meth:`~Queue.get` on a shut down queue will only raise once the
246+
queue is empty. Set *immediate* to true to make:meth:`~Queue.get` raise
247+
immediately instead.
248+
249+
All blocked callers of:meth:`~Queue.put` will be unblocked. If *immediate*
250+
is true, also unblock callers of:meth:`~Queue.get` and:meth:`~Queue.join`.
251+
252+
..versionadded::3.13
253+
254+
217255
SimpleQueue Objects
218256
-------------------
219257

‎Doc/whatsnew/3.13.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,13 @@ pdb
403403
command line option or:envvar:`PYTHONSAFEPATH` environment variable).
404404
(Contributed by Tian Gao and Christian Walther in:gh:`111762`.)
405405

406+
queue
407+
-----
408+
409+
* Add:meth:`queue.Queue.shutdown` (along with:exc:`queue.ShutDown`) for queue
410+
termination.
411+
(Contributed by Laurie Opperman and Yves Duprat in:gh:`104750`.)
412+
406413
re
407414
--
408415
* Rename:exc:`!re.error` to:exc:`re.PatternError` for improved clarity.

‎Lib/queue.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ class Full(Exception):
2525
pass
2626

2727

28+
classShutDown(Exception):
29+
'''Raised when put/get with shut-down queue.'''
30+
31+
2832
classQueue:
2933
'''Create a queue object with a given maximum size.
3034
@@ -54,6 +58,9 @@ def __init__(self, maxsize=0):
5458
self.all_tasks_done=threading.Condition(self.mutex)
5559
self.unfinished_tasks=0
5660

61+
# Queue shutdown state
62+
self.is_shutdown=False
63+
5764
deftask_done(self):
5865
'''Indicate that a formerly enqueued task is complete.
5966
@@ -67,6 +74,8 @@ def task_done(self):
6774
6875
Raises a ValueError if called more times than there were items
6976
placed in the queue.
77+
78+
Raises ShutDown if the queue has been shut down immediately.
7079
'''
7180
withself.all_tasks_done:
7281
unfinished=self.unfinished_tasks-1
@@ -84,6 +93,8 @@ def join(self):
8493
to indicate the item was retrieved and all work on it is complete.
8594
8695
When the count of unfinished tasks drops to zero, join() unblocks.
96+
97+
Raises ShutDown if the queue has been shut down immediately.
8798
'''
8899
withself.all_tasks_done:
89100
whileself.unfinished_tasks:
@@ -129,15 +140,21 @@ def put(self, item, block=True, timeout=None):
129140
Otherwise ('block' is false), put an item on the queue if a free slot
130141
is immediately available, else raise the Full exception ('timeout'
131142
is ignored in that case).
143+
144+
Raises ShutDown if the queue has been shut down.
132145
'''
133146
withself.not_full:
147+
ifself.is_shutdown:
148+
raiseShutDown
134149
ifself.maxsize>0:
135150
ifnotblock:
136151
ifself._qsize()>=self.maxsize:
137152
raiseFull
138153
eliftimeoutisNone:
139154
whileself._qsize()>=self.maxsize:
140155
self.not_full.wait()
156+
ifself.is_shutdown:
157+
raiseShutDown
141158
eliftimeout<0:
142159
raiseValueError("'timeout' must be a non-negative number")
143160
else:
@@ -147,6 +164,8 @@ def put(self, item, block=True, timeout=None):
147164
ifremaining<=0.0:
148165
raiseFull
149166
self.not_full.wait(remaining)
167+
ifself.is_shutdown:
168+
raiseShutDown
150169
self._put(item)
151170
self.unfinished_tasks+=1
152171
self.not_empty.notify()
@@ -161,14 +180,21 @@ def get(self, block=True, timeout=None):
161180
Otherwise ('block' is false), return an item if one is immediately
162181
available, else raise the Empty exception ('timeout' is ignored
163182
in that case).
183+
184+
Raises ShutDown if the queue has been shut down and is empty,
185+
or if the queue has been shut down immediately.
164186
'''
165187
withself.not_empty:
188+
ifself.is_shutdownandnotself._qsize():
189+
raiseShutDown
166190
ifnotblock:
167191
ifnotself._qsize():
168192
raiseEmpty
169193
eliftimeoutisNone:
170194
whilenotself._qsize():
171195
self.not_empty.wait()
196+
ifself.is_shutdownandnotself._qsize():
197+
raiseShutDown
172198
eliftimeout<0:
173199
raiseValueError("'timeout' must be a non-negative number")
174200
else:
@@ -178,6 +204,8 @@ def get(self, block=True, timeout=None):
178204
ifremaining<=0.0:
179205
raiseEmpty
180206
self.not_empty.wait(remaining)
207+
ifself.is_shutdownandnotself._qsize():
208+
raiseShutDown
181209
item=self._get()
182210
self.not_full.notify()
183211
returnitem
@@ -198,6 +226,28 @@ def get_nowait(self):
198226
'''
199227
returnself.get(block=False)
200228

229+
defshutdown(self,immediate=False):
230+
'''Shut-down the queue, making queue gets and puts raise.
231+
232+
By default, gets will only raise once the queue is empty. Set
233+
'immediate' to True to make gets raise immediately instead.
234+
235+
All blocked callers of put() will be unblocked, and also get()
236+
and join() if 'immediate'. The ShutDown exception is raised.
237+
'''
238+
withself.mutex:
239+
self.is_shutdown=True
240+
ifimmediate:
241+
n_items=self._qsize()
242+
whileself._qsize():
243+
self._get()
244+
ifself.unfinished_tasks>0:
245+
self.unfinished_tasks-=1
246+
self.not_empty.notify_all()
247+
# release all blocked threads in `join()`
248+
self.all_tasks_done.notify_all()
249+
self.not_full.notify_all()
250+
201251
# Override these methods to implement other queue organizations
202252
# (e.g. stack or priority queue).
203253
# These will only be called with appropriate locks held

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp