Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

gh-96471: Add threading queue shutdown#104750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
Show all changes
13 commits
Select commitHold shift + click to select a range
cd8ceaf
Add threading queue shutdown
EpicWinkSep 1, 2022
9c2971b
Fix queue shutdown
YvesDupFeb 10, 2023
089eb96
📜🤖 Added by blurb_it.
blurb-it[bot]May 6, 2023
ca118d7
Shut-down immediate consumes queue
EpicWinkMay 22, 2023
88f918d
Update test typing
EpicWinkDec 8, 2023
ab8d975
Remove queue-size tasks instead of setting to zero
EpicWinkFeb 5, 2024
f279f5c
Merge remote-tracking branch 'upstream/main' into threading-queue-shu…
EpicWinkFeb 8, 2024
971f699
Improve doc wording, reference methods
EpicWinkFeb 8, 2024
ddfb8c2
Reference method in news entry
EpicWinkFeb 8, 2024
3570bd8
Remove typing in test script
EpicWinkFeb 8, 2024
9072c6f
Fix join after task-done with no get
EpicWinkFeb 8, 2024
e0927aa
More explicitly update 'q.unfinished_tasks'
EpicWinkFeb 9, 2024
22adada
Add what's new entry
EpicWinkFeb 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletionsDoc/library/queue.rst
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -93,6 +93,14 @@ The :mod:`queue` module defines the following classes and exceptions:
on a :class:`Queue` object which is full.


.. exception:: ShutDown

Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is called on
a :class:`Queue` object which has been shut down.

.. versionadded:: 3.13


.. _queueobjects:

Queue Objects
Expand DownExpand Up@@ -135,6 +143,8 @@ provide the public methods described below.
immediately available, else raise the :exc:`Full` exception (*timeout* is
ignored in that case).

Raises :exc:`ShutDown` if the queue has been shut down.


.. method:: Queue.put_nowait(item)

Expand All@@ -155,6 +165,9 @@ provide the public methods described below.
an uninterruptible wait on an underlying lock. This means that no exceptions
can occur, and in particular a SIGINT will not trigger a :exc:`KeyboardInterrupt`.

Raises :exc:`ShutDown` if the queue has been shut down and is empty, or if
the queue has been shut down immediately.


.. method:: Queue.get_nowait()

Expand All@@ -177,6 +190,8 @@ fully processed by daemon consumer threads.
Raises a :exc:`ValueError` if called more times than there were items placed in
the queue.

Raises :exc:`ShutDown` if the queue has been shut down immediately.


.. method:: Queue.join()

Expand All@@ -187,6 +202,8 @@ fully processed by daemon consumer threads.
indicate that the item was retrieved and all work on it is complete. When the
count of unfinished tasks drops to zero, :meth:`join` unblocks.

Raises :exc:`ShutDown` if the queue has been shut down immediately.


Example of how to wait for enqueued tasks to be completed::

Expand DownExpand Up@@ -214,6 +231,27 @@ Example of how to wait for enqueued tasks to be completed::
print('All work completed')


Terminating queues
^^^^^^^^^^^^^^^^^^

:class:`Queue` objects can be made to prevent further interaction by shutting
them down.

.. method:: Queue.shutdown(immediate=False)

Shut down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put` raise
:exc:`ShutDown`.

By default, :meth:`~Queue.get` on a shut down queue will only raise once the
queue is empty. Set *immediate* to true to make :meth:`~Queue.get` raise
immediately instead.

All blocked callers of :meth:`~Queue.put` will be unblocked. If *immediate*
is true, also unblock callers of :meth:`~Queue.get` and :meth:`~Queue.join`.

.. versionadded:: 3.13


SimpleQueue Objects
-------------------

Expand Down
7 changes: 7 additions & 0 deletionsDoc/whatsnew/3.13.rst
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -392,6 +392,13 @@ pdb
command line option or :envvar:`PYTHONSAFEPATH` environment variable).
(Contributed by Tian Gao and Christian Walther in :gh:`111762`.)

queue
-----

* Add :meth:`queue.Queue.shutdown` (along with :exc:`queue.ShutDown`) for queue
termination.
(Contributed by Laurie Opperman and Yves Duprat in :gh:`104750`.)

re
--
* Rename :exc:`!re.error` to :exc:`re.PatternError` for improved clarity.
Expand Down
50 changes: 50 additions & 0 deletionsLib/queue.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -25,6 +25,10 @@ class Full(Exception):
pass


class ShutDown(Exception):
'''Raised when put/get with shut-down queue.'''


class Queue:
'''Create a queue object with a given maximum size.

Expand DownExpand Up@@ -54,6 +58,9 @@ def __init__(self, maxsize=0):
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0

# Queue shutdown state
self.is_shutdown = False

def task_done(self):
'''Indicate that a formerly enqueued task is complete.

Expand All@@ -67,6 +74,8 @@ def task_done(self):

Raises a ValueError if called more times than there were items
placed in the queue.

Raises ShutDown if the queue has been shut down immediately.
'''
with self.all_tasks_done:
unfinished = self.unfinished_tasks - 1
Expand All@@ -84,6 +93,8 @@ def join(self):
to indicate the item was retrieved and all work on it is complete.

When the count of unfinished tasks drops to zero, join() unblocks.

Raises ShutDown if the queue has been shut down immediately.
'''
with self.all_tasks_done:
while self.unfinished_tasks:
Expand DownExpand Up@@ -129,15 +140,21 @@ def put(self, item, block=True, timeout=None):
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).

Raises ShutDown if the queue has been shut down.
'''
with self.not_full:
if self.is_shutdown:
raise ShutDown
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
raise Full
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
if self.is_shutdown:
raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
Expand All@@ -147,6 +164,8 @@ def put(self, item, block=True, timeout=None):
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
if self.is_shutdown:
raise ShutDown
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
Expand All@@ -161,14 +180,21 @@ def get(self, block=True, timeout=None):
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).

Raises ShutDown if the queue has been shut down and is empty,
or if the queue has been shut down immediately.
'''
with self.not_empty:
if self.is_shutdown and not self._qsize():
raise ShutDown
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
if self.is_shutdown and not self._qsize():
raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
Expand All@@ -178,6 +204,8 @@ def get(self, block=True, timeout=None):
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
if self.is_shutdown and not self._qsize():
raise ShutDown
item = self._get()
self.not_full.notify()
return item
Expand All@@ -198,6 +226,28 @@ def get_nowait(self):
'''
return self.get(block=False)

def shutdown(self, immediate=False):
'''Shut-down the queue, making queue gets and puts raise.

By default, gets will only raise once the queue is empty. Set
'immediate' to True to make gets raise immediately instead.

All blocked callers of put() will be unblocked, and also get()
and join() if 'immediate'. The ShutDown exception is raised.
'''
with self.mutex:
self.is_shutdown = True
if immediate:
n_items = self._qsize()
while self._qsize():
self._get()
if self.unfinished_tasks > 0:
self.unfinished_tasks -= 1
self.not_empty.notify_all()
# release all blocked threads in `join()`
self.all_tasks_done.notify_all()
self.not_full.notify_all()

# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp