Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork32.4k
gh-96471: Add queue shutdown, next step.#102499
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Conversation
… add an empty `shutdown` method
Add an unittest `test_shutdown`transition
Change "shutdown_immediate" to "shutdown-immediate"
Refactoring tests
Hi@gvanrossum , I readyour previous comment in Dec 2022 from the originalPR 'EpicWink: Adding the queue stop' |
gvanrossum commentedApr 11, 2023 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
Hi@YvesDup, unfortunately this has stalled and I don't think I will have time to get back to it before PyCon. And after PyCon there are only about two weeks until beta 1 (feature freeze), during which time I'll likely be occupied with helping out with the PEP 695 implementation. I'm a bit concerned that this PR of yours has 67 commits, most of which have meaningless commit messages like "Update of queues.py". Perhaps you can explain how it relates to@EpicWink's original PR? One way to proceed might be to split the work into three new, clean PRs, each targeting one of asyncio, threading, and multiprocessing. Then you could get separate reviews. For example, the only two people able to review asyncio PRs are@kumaraditya303 and myself; for threading there are several core devs (but none eager to jump); for multiprocessing it may be near impossible to find a reviewer who's willing and capable. And really, it would be okay if not all three PRs made it into 3.12. |
EpicWink commentedApr 12, 2023 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
Having one PR for each target would help with the architecture here with one assigned reviewer per PR. Might I suggest using aninteractive rebase (on top of You should put the list of high-level changes frommy PR in this PR's description. Reminder to update the documentation: check outmy PR against this branch doing this. |
Also, my test scriptabove still hangs. Are you sure you fixed the problem? |
YvesDup commentedApr 13, 2023 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
Sometimes, your test script freezes. I identify two sources not in the implementation but in your script:
But may be I miss something If you decrease the |
I agree with splitting the PRs but following Guido's answer, should we continue the multiprocessing target ?
I agree, this is a good solution, but I have never done it before. Your link will be very helpful, I will do it ASAP (I will out of office the next two weeks).
Can you explain what are you expecting please ?
Okay, but how about changing the |
My PR didn't have multiprocessing working, and I hadn't changed
You're free to make commits to the branch, you have write access. |
Here's the current state of the difference frommy draft PR Miscellansous
diffdiff --git a/Misc/NEWS.d/next/Library/2023-03-07-15-42-27.gh-issue-96471.oWZtwQ.rst b/Misc/NEWS.d/next/Library/2023-03-07-15-42-27.gh-issue-96471.oWZtwQ.rstnew file mode 100644index 0000000000..61af8d77ae--- /dev/null+++ b/Misc/NEWS.d/next/Library/2023-03-07-15-42-27.gh-issue-96471.oWZtwQ.rst@@ -0,0 +1 @@+Add "shutdown" method to multithreading.Queue, asyncio.Queue and multiprocessing.queue.Queue and multiprocessing.queue.JoinableQueue Asyncio
diffdiff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.pyindex d591d0ebab..ae2d554783 100644--- a/Lib/asyncio/queues.py+++ b/Lib/asyncio/queues.py@@ -32,9 +32,9 @@ class QueueShutDown(Exception): class _QueueState(enum.Enum):- alive = "alive"- shutdown = "shutdown"- shutdown_immediate = "shutdown-immediate"+ ALIVE = "alive"+ SHUTDOWN = "shutdown"+ SHUTDOWN_IMMEDIATE = "shutdown-immediate" class Queue(mixins._LoopBoundMixin):@@ -60,7 +60,7 @@ def __init__(self, maxsize=0): self._finished = locks.Event() self._finished.set() self._init(maxsize)- self._shutdown_state = _QueueState.alive+ self._shutdown_state = _QueueState.ALIVE # These three are overridable in subclasses.@@ -101,6 +101,8 @@ def _format(self): result += f' _putters[{len(self._putters)}]' if self._unfinished_tasks: result += f' tasks={self._unfinished_tasks}'+ if not self._is_alive():+ result += f' state={self._shutdown_state.value}' return result def qsize(self):@@ -132,8 +134,10 @@ async def put(self, item): Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.++ Raises QueueShutDown if the queue has been shut down. """- if self._shutdown_state != _QueueState.alive:+ if not self._is_alive(): raise QueueShutDown while self.full(): putter = self._get_loop().create_future()@@ -147,14 +151,14 @@ async def put(self, item): self._putters.remove(putter) except ValueError: # The putter could be removed from self._putters by a- # previous get_nowait call.+ # previous get_nowait call or a shutdown call. pass if not self.full() and not putter.cancelled(): # We were woken up by get_nowait(), but can't take # the call. Wake up the next in line. self._wakeup_next(self._putters) raise- if self._shutdown_state != _QueueState.alive:+ if not self._is_alive(): raise QueueShutDown return self.put_nowait(item)@@ -162,8 +166,10 @@ def put_nowait(self, item): """Put an item into the queue without blocking. If no free slot is immediately available, raise QueueFull.++ Raises QueueShutDown if the queue has been shut down. """- if self._shutdown_state != _QueueState.alive:+ if not self._is_alive(): raise QueueShutDown if self.full(): raise QueueFull@@ -176,11 +182,14 @@ async def get(self): """Remove and return an item from the queue. If queue is empty, wait until an item is available.++ Raises QueueShutDown if the queue has been shut down and is empty, or+ if the queue has been shut down immediately. """- if self._shutdown_state == _QueueState.shutdown_immediate:+ if self._is_shutdown_immediate(): raise QueueShutDown while self.empty():- if self._shutdown_state != _QueueState.alive:+ if self._is_shutdown(): raise QueueShutDown getter = self._get_loop().create_future() self._getters.append(getter)@@ -193,14 +202,15 @@ async def get(self): self._getters.remove(getter) except ValueError: # The getter could be removed from self._getters by a- # previous put_nowait call.+ # previous put_nowait call,+ # or a shutdown call. pass if not self.empty() and not getter.cancelled(): # We were woken up by put_nowait(), but can't take # the call. Wake up the next in line. self._wakeup_next(self._getters) raise- if self._shutdown_state == _QueueState.shutdown_immediate:+ if self._is_shutdown_immediate(): raise QueueShutDown return self.get_nowait()@@ -208,13 +218,16 @@ def get_nowait(self): """Remove and return an item from the queue. Return an item if one is immediately available, else raise QueueEmpty.++ Raises QueueShutDown if the queue has been shut down and is empty, or+ if the queue has been shut down immediately. """+ if self._is_shutdown_immediate():+ raise QueueShutDown if self.empty():- if self._shutdown_state != _QueueState.alive:+ if self._is_shutdown(): raise QueueShutDown raise QueueEmpty- elif self._shutdown_state == _QueueState.shutdown_immediate:- raise QueueShutDown item = self._get() self._wakeup_next(self._putters) return item@@ -232,7 +245,11 @@ def task_done(self): Raises ValueError if called more times than there were items placed in the queue.++ Raises QueueShutDown if the queue has been shut down immediately. """+ if self._is_shutdown_immediate():+ raise QueueShutDown if self._unfinished_tasks <= 0: raise ValueError('task_done() called too many times') self._unfinished_tasks -= 1@@ -246,9 +263,15 @@ async def join(self): queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.++ Raises QueueShutDown if the queue has been shut down immediately. """+ if self._is_shutdown_immediate():+ raise QueueShutDown if self._unfinished_tasks > 0: await self._finished.wait()+ if self._is_shutdown_immediate():+ raise QueueShutDown def shutdown(self, immediate=False): """Shut-down the queue, making queue gets and puts raise.@@ -259,20 +282,39 @@ def shutdown(self, immediate=False): All blocked callers of put() will be unblocked, and also get() and join() if 'immediate'. The QueueShutDown exception is raised. """+ if self._is_shutdown_immediate():+ return+ # here _shutdown_state is ALIVE or SHUTDOWN if immediate:- self._shutdown_state = _QueueState.shutdown_immediate+ self._set_shutdown_immediate() while self._getters: getter = self._getters.popleft() if not getter.done(): getter.set_result(None)+ # Release all 'blocked' tasks/coros in `join()`+ self._finished.set() else:- self._shutdown_state = _QueueState.shutdown+ self._set_shutdown() while self._putters: putter = self._putters.popleft() if not putter.done(): putter.set_result(None)- # Release 'joined' tasks/coros- self._finished.set()++ def _is_alive(self):+ return self._shutdown_state is _QueueState.ALIVE++ def _is_shutdown(self):+ return self._shutdown_state is _QueueState.SHUTDOWN++ def _is_shutdown_immediate(self):+ return self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE++ def _set_shutdown(self):+ self._shutdown_state = _QueueState.SHUTDOWN++ def _set_shutdown_immediate(self):+ self._shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE+ class PriorityQueue(Queue): """A subclass of Queue; retrieves entries in priority order (lowest first).diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.pyindex 75b016f399..bf4a5a78a8 100644--- a/Lib/test/test_asyncio/test_queues.py+++ b/Lib/test/test_asyncio/test_queues.py@@ -525,7 +525,80 @@ class PriorityQueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCa class _QueueShutdownTestMixin: q_class = None- async def test_empty(self):+ async def asyncSetUp(self):+ await super().asyncSetUp()+ self.delay = 0.001++ async def _get(self, q, go, results, shutdown=False):+ await go.wait()+ try:+ msg = await q.get()+ results.append(not shutdown)+ return msg+ except asyncio.QueueShutDown:+ results.append(shutdown)+ return shutdown++ async def _get_nowait(self, q, go, results, shutdown=False):+ await go.wait()+ try:+ msg = q.get_nowait()+ results.append(not shutdown)+ return msg+ except asyncio.QueueShutDown:+ results.append(shutdown)+ return shutdown++ async def _get_task_done(self, q, go, results):+ await go.wait()+ try:+ msg = await q.get()+ q.task_done()+ results.append(True)+ return msg+ except asyncio.QueueShutDown:+ results.append(False)+ return False++ async def _put(self, q, go, msg, results, shutdown=False):+ await go.wait()+ try:+ await q.put(msg)+ results.append(not shutdown)+ return not shutdown+ except asyncio.QueueShutDown:+ results.append(shutdown)+ return shutdown++ async def _put_nowait(self, q, go, msg, results, shutdown=False):+ await go.wait()+ try:+ q.put_nowait(msg)+ results.append(False)+ return not shutdown+ except asyncio.QueueShutDown:+ results.append(True)+ return shutdown++ async def _shutdown(self, q, go, immediate):+ q.shutdown(immediate)+ await asyncio.sleep(self.delay)+ go.set()+ await asyncio.sleep(self.delay)++ async def _join(self, q, results, shutdown=False):+ try:+ await q.join()+ results.append(not shutdown)+ return True+ except asyncio.QueueShutDown:+ results.append(shutdown)+ return False+ except asyncio.CancelledError:+ results.append(shutdown)+ raise++ async def test_shutdown_empty(self): q = self.q_class() q.shutdown() with self.assertRaises(@@ -537,7 +610,7 @@ async def test_empty(self): ): await q.get()- async def test_nonempty(self):+ async def test_shutdown_nonempty(self): q = self.q_class() q.put_nowait("data") q.shutdown()@@ -547,7 +620,7 @@ async def test_nonempty(self): ): await q.get()- async def test_immediate(self):+ async def test_shutdown_immediate(self): q = self.q_class() q.put_nowait("data") q.shutdown(immediate=True)@@ -555,98 +628,258 @@ async def test_immediate(self): asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" ): await q.get()- async def test_repr_shutdown(self):- q = self.q_class()- q.shutdown()++ async def test_shutdown_repr(self):+ q = self.q_class(4)+ # when alive, not in repr+ self.assertNotIn("alive", repr(q))++ q = self.q_class(6)+ q.shutdown(immediate=False) self.assertIn("shutdown", repr(q))- q = self.q_class()+ q = self.q_class(8) q.shutdown(immediate=True) self.assertIn("shutdown-immediate", repr(q))- async def test_get_shutdown_immediate(self):+ async def test_shutdown_allowed_transitions(self):+ # allowed transitions would be from alive via shutdown to immediate+ q = self.q_class()+ self.assertEqual("alive", q._shutdown_state.value)++ q.shutdown()+ self.assertEqual("shutdown", q._shutdown_state.value)++ q.shutdown(immediate=True)+ self.assertEqual("shutdown-immediate", q._shutdown_state.value)++ q.shutdown(immediate=False)+ self.assertNotEqual("shutdown", q._shutdown_state.value)++ async def _shutdown_all_methods_in_one_task(self, immediate):+ q = asyncio.Queue()+ await q.put("L")+ q.put_nowait("O")+ q.shutdown(immediate)+ with self.assertRaises(asyncio.QueueShutDown):+ await q.put("E")+ with self.assertRaises(asyncio.QueueShutDown):+ q.put_nowait("W")++ if immediate:+ with self.assertRaises(asyncio.QueueShutDown):+ await q.get()+ with self.assertRaises(asyncio.QueueShutDown):+ q.get_nowait()+ with self.assertRaises(asyncio.QueueShutDown):+ q.task_done()+ with self.assertRaises(asyncio.QueueShutDown):+ await q.join()+ else:+ self.assertIn(await q.get(), "LO")+ q.task_done()+ self.assertIn(q.get_nowait(), "LO")+ q.task_done()+ await q.join()+ # on shutdown(immediate=False)+ # when queue is empty, should raise ShutDown Exception+ with self.assertRaises(asyncio.QueueShutDown):+ await q.get()+ with self.assertRaises(asyncio.QueueShutDown):+ q.get_nowait()++ async def test_shutdown_all_methods_in_one_task(self):+ return await self._shutdown_all_methods_in_one_task(False)++ async def test_shutdown_immediate_all_methods_in_one_task(self):+ return await self._shutdown_all_methods_in_one_task(True)++ async def _shutdown_putters(self, immediate):+ delay = self.delay+ q = self.q_class(2) results = []- maxsize = 2- delay = 1e-3-- async def get_q(q):- try:- msg = await q.get()- results.append(False)- except asyncio.QueueShutDown:- results.append(True)- return True-- async def shutdown(q, delay, immediate):- await asyncio.sleep(delay)+ await q.put("E")+ await q.put("W")+ # queue full+ t = asyncio.create_task(q.put("Y"))+ await asyncio.sleep(delay)+ self.assertTrue(len(q._putters) == 1)+ with self.assertRaises(asyncio.QueueShutDown):+ # here `t` raises a QueueShuDown q.shutdown(immediate)- return True+ await t+ self.assertTrue(not q._putters)- q = self.q_class(maxsize)- t = [asyncio.create_task(get_q(q)) for _ in range(maxsize)]- t += [asyncio.create_task(shutdown(q, delay, True))]- res = await asyncio.gather(*t)+ async def test_shutdown_putters_deque(self):+ return await self._shutdown_putters(False)- self.assertEqual(results, [True]*maxsize)+ async def test_shutdown_immediate_putters_deque(self):+ return await self._shutdown_putters(True)- async def test_put_shutdown(self):- maxsize = 2+ async def _shutdown_getters(self, immediate):+ delay = self.delay+ q = self.q_class(1) results = []- delay = 1e-3+ await q.put("Y")+ nb = q.qsize()+ # queue full- async def put_twice(q, delay, msg):- await q.put(msg)- await asyncio.sleep(delay)- try:- await q.put(msg+maxsize)- results.append(False)- except asyncio.QueueShutDown:- results.append(True)- return msg-- async def shutdown(q, delay, immediate):- await asyncio.sleep(delay)+ asyncio.create_task(q.get())+ await asyncio.sleep(delay)+ t = asyncio.create_task(q.get())+ await asyncio.sleep(delay)+ self.assertTrue(len(q._getters) == 1)+ if immediate:+ # here `t` raises a QueueShuDown+ with self.assertRaises(asyncio.QueueShutDown):+ q.shutdown(immediate)+ await t+ self.assertTrue(not q._getters)+ else:+ # here `t` is always pending q.shutdown(immediate)+ await asyncio.sleep(delay)+ self.assertTrue(q._getters)+ self.assertEqual(q._unfinished_tasks, nb)- q = self.q_class(maxsize)- t = [asyncio.create_task(put_twice(q, delay, i+1)) for i in range(maxsize)]- t += [asyncio.create_task(shutdown(q, delay*2, False))]- res = await asyncio.gather(*t)- self.assertEqual(results, [True]*maxsize)+ async def test_shutdown_getters_deque(self):+ return await self._shutdown_getters(False)- async def test_put_and_join_shutdown(self):- maxsize = 2+ async def test_shutdown_immediate_getters_deque(self):+ return await self._shutdown_getters(True)++ async def _shutdown_get(self, immediate):+ q = self.q_class(2) results = []- delay = 1e-3+ go = asyncio.Event()+ await q.put("Y")+ await q.put("D")+ nb = q.qsize()+ # queue full++ if immediate:+ coros = (+ (self._get(q, go, results, shutdown=True)),+ (self._get_nowait(q, go, results, shutdown=True)),+ )+ else:+ coros = (+ # one of these tasks shoud raise Shutdown+ (self._get(q, go, results)),+ (self._get_nowait(q, go, results)),+ (self._get_nowait(q, go, results)),+ )+ t = []+ for coro in coros:+ t.append(asyncio.create_task(coro))+ t.append(asyncio.create_task(self._shutdown(q, go, immediate)))+ res = await asyncio.gather(*t)+ if immediate:+ self.assertEqual(results, [True]*len(coros))+ else:+ self.assertListEqual(sorted(results), [False] + [True]*(len(coros)-1))- async def put_twice(q, delay, msg):- await q.put(msg)- await asyncio.sleep(delay)- try:- await q.put(msg+maxsize)- results.append(False)- except asyncio.QueueShutDown:- results.append(True)- return msg-- async def shutdown(q, delay, immediate):- await asyncio.sleep(delay)- q.shutdown(immediate)+ async def test_shutdown_get(self):+ return await self._shutdown_get(False)- async def join(q, delay):- await asyncio.sleep(delay)- await q.join()- results.append(True)- return True+ async def test_shutdown_immediate_get(self):+ return await self._shutdown_get(True)++ async def test_shutdown_get_task_done_join(self):+ q = self.q_class(2)+ results = []+ go = asyncio.Event()+ await q.put("Y")+ await q.put("D")+ self.assertEqual(q._unfinished_tasks, q.qsize())++ # queue full++ coros = (+ (self._get_task_done(q, go, results)),+ (self._get_task_done(q, go, results)),+ (self._join(q, results)),+ (self._join(q, results)),+ )+ t = []+ for coro in coros:+ t.append(asyncio.create_task(coro))+ t.append(asyncio.create_task(self._shutdown(q, go, False)))+ res = await asyncio.gather(*t)++ self.assertEqual(results, [True]*len(coros))+ self.assertIn(t[0].result(), "YD")+ self.assertIn(t[1].result(), "YD")+ self.assertNotEqual(t[0].result(), t[1].result())+ self.assertEqual(q._unfinished_tasks, 0)- q = self.q_class(maxsize)- t = [asyncio.create_task(put_twice(q, delay, i+1)) for i in range(maxsize)]- t += [asyncio.create_task(shutdown(q, delay*2, True)),- asyncio.create_task(join(q, delay))]+ async def _shutdown_put(self, immediate):+ q = self.q_class()+ results = []+ go = asyncio.Event()+ # queue not empty++ coros = (+ (self._put(q, go, "Y", results, shutdown=True)),+ (self._put_nowait(q, go, "D", results, shutdown=True)),+ )+ t = []+ for coro in coros:+ t.append(asyncio.create_task(coro))+ t.append(asyncio.create_task(self._shutdown(q, go, immediate))) res = await asyncio.gather(*t)- self.assertEqual(results, [True]*(maxsize+1))+ self.assertEqual(results, [True]*len(coros))++ async def test_shutdown_put(self):+ return await self._shutdown_put(False)++ async def test_shutdown_immediate_put(self):+ return await self._shutdown_put(True)++ async def _shutdown_put_join(self, immediate):+ q = self.q_class(2)+ results = []+ go = asyncio.Event()+ await q.put("Y")+ await q.put("D")+ nb = q.qsize()+ # queue fulled++ async def _cancel_join_task(q, delay, t):+ await asyncio.sleep(delay)+ t.cancel()+ await asyncio.sleep(0)+ q._finished.set()++ coros = (+ (self._put(q, go, "E", results, shutdown=True)),+ (self._put_nowait(q, go, "W", results, shutdown=True)),+ (self._join(q, results, shutdown=True)),+ )+ t = []+ for coro in coros:+ t.append(asyncio.create_task(coro))+ t.append(asyncio.create_task(self._shutdown(q, go, immediate)))+ if not immediate:+ # Here calls `join` is a blocking operation+ # so wait for a delay and cancel this blocked task+ t.append(asyncio.create_task(_cancel_join_task(q, 0.01, t[2])))+ with self.assertRaises(asyncio.CancelledError) as e:+ await asyncio.gather(*t)+ else:+ res = await asyncio.gather(*t)++ self.assertEqual(results, [True]*len(coros))+ self.assertTrue(q._finished.is_set())++ async def test_shutdown_put_join(self):+ return await self._shutdown_put_join(False)++ async def test_shutdown_immediate_put_and_join(self):+ return await self._shutdown_put_join(True)+ class QueueShutdownTests( _QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase Threading
diffdiff --git a/Lib/queue.py b/Lib/queue.pyindex f08dbd47f1..f8a7ba0722 100644--- a/Lib/queue.py+++ b/Lib/queue.py@@ -33,7 +33,6 @@ class ShutDown(Exception): _queue_shutdown = "shutdown" _queue_shutdown_immediate = "shutdown-immediate"- class Queue: '''Create a queue object with a given maximum size.@@ -63,7 +62,7 @@ def __init__(self, maxsize=0): self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0- # Queue shut-down state+ # Queue shutdown state self.shutdown_state = _queue_alive def task_done(self):@@ -79,8 +78,12 @@ def task_done(self): Raises a ValueError if called more times than there were items placed in the queue.++ Raises ShutDown if the queue has been shut down immediately. ''' with self.all_tasks_done:+ if self._is_shutdown_immediate():+ raise ShutDown unfinished = self.unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0:@@ -96,12 +99,16 @@ def join(self): to indicate the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.++ Raises ShutDown if the queue has been shut down immediately. ''' with self.all_tasks_done:+ if self._is_shutdown_immediate():+ raise ShutDown while self.unfinished_tasks:- if self.shutdown_state == _queue_shutdown_immediate:- return self.all_tasks_done.wait()+ if self._is_shutdown_immediate():+ raise ShutDown def qsize(self): '''Return the approximate size of the queue (not reliable!).'''@@ -143,10 +150,12 @@ def put(self, item, block=True, timeout=None): Otherwise ('block' is false), put an item on the queue if a free slot is immediately available, else raise the Full exception ('timeout' is ignored in that case).++ Raises ShutDown if the queue has been shut down. '''- if self.shutdown_state != _queue_alive:- raise ShutDown with self.not_full:+ if not self._is_alive():+ raise ShutDown if self.maxsize > 0: if not block: if self._qsize() >= self.maxsize:@@ -154,7 +163,7 @@ def put(self, item, block=True, timeout=None): elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait()- if self.shutdown_state != _queue_alive:+ if not self._is_alive(): raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number")@@ -165,7 +174,7 @@ def put(self, item, block=True, timeout=None): if remaining <= 0.0: raise Full self.not_full.wait(remaining)- if self.shutdown_state != _queue_alive:+ if not self._is_alive(): raise ShutDown self._put(item) self.unfinished_tasks += 1@@ -181,37 +190,33 @@ def get(self, block=True, timeout=None): Otherwise ('block' is false), return an item if one is immediately available, else raise the Empty exception ('timeout' is ignored in that case).++ Raises ShutDown if the queue has been shut down and is empty,+ or if the queue has been shut down immediately. '''- if self.shutdown_state == _queue_shutdown_immediate:- raise ShutDown with self.not_empty:+ if self._is_shutdown_immediate() or\+ (self._is_shutdown() and not self._qsize()):+ raise ShutDown if not block: if not self._qsize():- if self.shutdown_state != _queue_alive:- raise ShutDown raise Empty elif timeout is None: while not self._qsize():- if self.shutdown_state != _queue_alive:- raise ShutDown self.not_empty.wait()- if self.shutdown_state != _queue_alive:+ if self._is_shutdown_immediate(): raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout while not self._qsize():- if self.shutdown_state != _queue_alive:- raise ShutDown remaining = endtime - time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining)- if self.shutdown_state != _queue_alive:+ if self._is_shutdown_immediate(): raise ShutDown- if self.shutdown_state == _queue_shutdown_immediate:- raise ShutDown item = self._get() self.not_full.notify() return item@@ -242,18 +247,33 @@ def shutdown(self, immediate=False): and join() if 'immediate'. The ShutDown exception is raised. ''' with self.mutex:+ if self._is_shutdown_immediate():+ return if immediate:- self.shutdown_state = _queue_shutdown_immediate+ self._set_shutdown_immediate() self.not_empty.notify_all()- # set self.unfinished_tasks to 0- # to break the loop in 'self.join()'- # when quits from `wait()`- self.unfinished_tasks = 0+ # release all blocked threads in `join()` self.all_tasks_done.notify_all() else:- self.shutdown_state = _queue_shutdown+ self._set_shutdown() self.not_full.notify_all()+ def _is_alive(self):+ return self.shutdown_state == _queue_alive++ def _is_shutdown(self):+ return self.shutdown_state == _queue_shutdown++ def _is_shutdown_immediate(self):+ return self.shutdown_state == _queue_shutdown_immediate++ def _set_shutdown(self):+ self.shutdown_state = _queue_shutdown++ def _set_shutdown_immediate(self):+ self.shutdown_state = _queue_shutdown_immediate++ # Override these methods to implement other queue organizations # (e.g. stack or priority queue). # These will only be called with appropriate locks helddiff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.pyindex 354299b9a5..d9e840a7c8 100644--- a/Lib/test/test_queue.py+++ b/Lib/test/test_queue.py@@ -244,38 +244,405 @@ def test_shrinking_queue(self): def test_shutdown_empty(self): q = self.type2test() q.shutdown()- try:+ with self.assertRaises(self.queue.ShutDown): q.put("data")- self.fail("Didn't appear to shut-down queue")- except self.queue.ShutDown:- pass- try:+ with self.assertRaises(self.queue.ShutDown): q.get()- self.fail("Didn't appear to shut-down queue")- except self.queue.ShutDown:- pass def test_shutdown_nonempty(self): q = self.type2test() q.put("data") q.shutdown() q.get()- try:+ with self.assertRaises(self.queue.ShutDown): q.get()- self.fail("Didn't appear to shut-down queue")- except self.queue.ShutDown:- pass def test_shutdown_immediate(self): q = self.type2test() q.put("data") q.shutdown(immediate=True)- try:+ with self.assertRaises(self.queue.ShutDown): q.get()- self.fail("Didn't appear to shut-down queue")++ def test_shutdown_allowed_transitions(self):+ # allowed transitions would be from alive via shutdown to immediate+ q = self.type2test()+ self.assertEqual("alive", q.shutdown_state)++ q.shutdown()+ self.assertEqual("shutdown", q.shutdown_state)++ q.shutdown(immediate=True)+ self.assertEqual("shutdown-immediate", q.shutdown_state)++ q.shutdown(immediate=False)+ self.assertNotEqual("shutdown", q.shutdown_state)++ def _shutdown_all_methods_in_one_thread(self, immediate):+ q = self.type2test(2)+ q.put("L")+ q.put_nowait("O")+ q.shutdown(immediate)++ with self.assertRaises(self.queue.ShutDown):+ q.put("E")+ with self.assertRaises(self.queue.ShutDown):+ q.put_nowait("W")+ if immediate:+ with self.assertRaises(self.queue.ShutDown):+ q.get()+ with self.assertRaises(self.queue.ShutDown):+ q.get_nowait()+ with self.assertRaises(self.queue.ShutDown):+ q.task_done()+ with self.assertRaises(self.queue.ShutDown):+ q.join()+ else:+ self.assertIn(q.get(), "LO")+ q.task_done()+ self.assertIn(q.get(), "LO")+ q.task_done()+ q.join()+ # on shutdown(immediate=False)+ # when queue is empty, should raise ShutDown Exception+ with self.assertRaises(self.queue.ShutDown):+ q.get() # p.get(True)+ with self.assertRaises(self.queue.ShutDown):+ q.get_nowait() # p.get(False)+ with self.assertRaises(self.queue.ShutDown):+ q.get(True, 1.0)++ def test_shutdown_all_methods_in_one_thread(self):+ return self._shutdown_all_methods_in_one_thread(False)++ def test_shutdown_immediate_all_methods_in_one_thread(self):+ return self._shutdown_all_methods_in_one_thread(True)++ def _write_msg_thread(self, q, n, results, delay,+ i_when_exec_shutdown,+ event_start, event_end):+ event_start.wait()+ for i in range(1, n+1):+ try:+ q.put((i, "YDLO"))+ results.append(True)+ except self.queue.ShutDown:+ results.append(False)+ # triggers shutdown of queue+ if i == i_when_exec_shutdown:+ event_end.set()+ time.sleep(delay)+ # end of all puts+ try:+ q.join()+ except self.queue.ShutDown:+ pass++ def _read_msg_thread(self, q, nb, results, delay, event_start):+ event_start.wait()+ block = True+ while nb:+ time.sleep(delay)+ try:+ # Get at least one message+ q.get(block)+ block = False+ q.task_done()+ results.append(True)+ nb -= 1+ except self.queue.ShutDown:+ results.append(False)+ nb -= 1+ except self.queue.Empty:+ pass+ try:+ q.join()+ except self.queue.ShutDown:+ pass++ def _shutdown_thread(self, q, event_end, immediate):+ event_end.wait()+ q.shutdown(immediate)+ try:+ q.join() except self.queue.ShutDown: pass+ def _join_thread(self, q, delay, event_start):+ event_start.wait()+ time.sleep(delay)+ try:+ q.join()+ except self.queue.ShutDown:+ pass++ def _shutdown_all_methods_in_many_threads(self, immediate):+ q = self.type2test()+ ps = []+ ev_start = threading.Event()+ ev_exec_shutdown = threading.Event()+ res_puts = []+ res_gets = []+ delay = 1e-4+ read_process = 4+ nb_msgs = read_process * 16+ nb_msgs_r = nb_msgs // read_process+ when_exec_shutdown = nb_msgs // 2+ lprocs = (+ (self._write_msg_thread, 1, (q, nb_msgs, res_puts, delay,+ when_exec_shutdown,+ ev_start, ev_exec_shutdown)),+ (self._read_msg_thread, read_process, (q, nb_msgs_r,+ res_gets, delay*2,+ ev_start)),+ (self._join_thread, 2, (q, delay*2, ev_start)),+ (self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)),+ )+ # start all threds+ for func, n, args in lprocs:+ for i in range(n):+ ps.append(threading.Thread(target=func, args=args))+ ps[-1].start()+ # set event in order to run q.shutdown()+ ev_start.set()++ if not immediate:+ assert(len(res_gets) == len(res_puts))+ assert(res_gets.count(True) == res_puts.count(True))+ else:+ assert(len(res_gets) <= len(res_puts))+ assert(res_gets.count(True) <= res_puts.count(True))++ def test_shutdown_all_methods_in_many_threads(self):+ return self._shutdown_all_methods_in_many_threads(False)++ def test_shutdown_immediate_all_methods_in_many_threads(self):+ return self._shutdown_all_methods_in_many_threads(True)++ def _get(self, q, go, results, shutdown=False):+ go.wait()+ try:+ msg = q.get()+ results.append(not shutdown)+ return not shutdown+ except self.queue.ShutDown:+ results.append(shutdown)+ return shutdown++ def _get_shutdown(self, q, go, results):+ return self._get(q, go, results, True)++ def _get_task_done(self, q, go, results):+ go.wait()+ try:+ msg = q.get()+ q.task_done()+ results.append(True)+ return msg+ except self.queue.ShutDown:+ results.append(False)+ return False++ def _put(self, q, msg, go, results, shutdown=False):+ go.wait()+ try:+ q.put(msg)+ results.append(not shutdown)+ return not shutdown+ except self.queue.ShutDown:+ results.append(shutdown)+ return shutdown++ def _put_shutdown(self, q, msg, go, results):+ return self._put(q, msg, go, results, True)++ def _join(self, q, results, shutdown=False):+ try:+ q.join()+ results.append(not shutdown)+ return not shutdown+ except self.queue.ShutDown:+ results.append(shutdown)+ return shutdown++ def _join_shutdown(self, q, results):+ return self._join(q, results, True)++ def _shutdown_get(self, immediate):+ q = self.type2test(2)+ results = []+ go = threading.Event()+ q.put("Y")+ q.put("D")+ # queue full++ if immediate:+ thrds = (+ (self._get_shutdown, (q, go, results)),+ (self._get_shutdown, (q, go, results)),+ )+ else:+ thrds = (+ # on shutdown(immediate=False)+ # one of these threads shoud raise Shutdown+ (self._get, (q, go, results)),+ (self._get, (q, go, results)),+ (self._get, (q, go, results)),+ )+ threads = []+ for func, params in thrds:+ threads.append(threading.Thread(target=func, args=params))+ threads[-1].start()+ q.shutdown(immediate)+ go.set()+ for t in threads:+ t.join()+ if immediate:+ self.assertListEqual(results, [True, True])+ else:+ self.assertListEqual(sorted(results), [False] + [True]*(len(thrds)-1))++ def test_shutdown_get(self):+ return self._shutdown_get(False)++ def test_shutdown_immediate_get(self):+ return self._shutdown_get(True)++ def _shutdown_put(self, immediate):+ q = self.type2test(2)+ results = []+ go = threading.Event()+ q.put("Y")+ q.put("D")+ # queue fulled++ thrds = (+ (self._put_shutdown, (q, "E", go, results)),+ (self._put_shutdown, (q, "W", go, results)),+ )+ threads = []+ for func, params in thrds:+ threads.append(threading.Thread(target=func, args=params))+ threads[-1].start()+ q.shutdown()+ go.set()+ for t in threads:+ t.join()++ self.assertEqual(results, [True]*len(thrds))++ def test_shutdown_put(self):+ return self._shutdown_put(False)++ def test_shutdown_immediate_put(self):+ return self._shutdown_put(True)++ def _shutdown_join(self, immediate):+ q = self.type2test()+ results = []+ q.put("Y")+ go = threading.Event()+ nb = q.qsize()++ if immediate:+ thrds = (+ (self._join_shutdown, (q, results)),+ (self._join_shutdown, (q, results)),+ )+ else:+ thrds = (+ (self._join, (q, results)),+ (self._join, (q, results)),+ )+ threads = []+ for func, params in thrds:+ threads.append(threading.Thread(target=func, args=params))+ threads[-1].start()+ if not immediate:+ res = []+ for i in range(nb):+ threads.append(threading.Thread(target=self._get_task_done, args=(q, go, res)))+ threads[-1].start()+ q.shutdown(immediate)+ go.set()+ for t in threads:+ t.join()++ self.assertEqual(results, [True]*len(thrds))++ def test_shutdown_immediate_join(self):+ return self._shutdown_join(True)++ def test_shutdown_join(self):+ return self._shutdown_join(False)++ def _shutdown_put_join(self, immediate):+ q = self.type2test(2)+ results = []+ go = threading.Event()+ q.put("Y")+ nb = q.qsize()+ # queue not fulled++ if immediate:+ thrds = (+ (self._put_shutdown, (q, "E", go, results)),+ (self._join_shutdown, (q, results)),+ )+ else:+ thrds = (+ (self._put_shutdown, (q, "E", go, results)),+ (self._join, (q, results)),+ )+ threads = []+ for func, params in thrds:+ threads.append(threading.Thread(target=func, args=params))+ threads[-1].start()+ if not immediate:+ self.assertEqual(q.unfinished_tasks, nb)+ for i in range(nb):+ t = threading.Thread(target=q.task_done)+ t.start()+ threads.append(t)+ go.set()+ q.shutdown(immediate)+ for t in threads:+ t.join()++ self.assertEqual(results, [True]*len(thrds))++ def test_shutdown_immediate_put_join(self):+ return self._shutdown_put_join(True)++ def test_shutdown_put_join(self):+ return self._shutdown_put_join(False)++ def test_shutdown_get_task_done_join(self):+ q = self.type2test(2)+ results = []+ go = threading.Event()+ q.put("Y")+ q.put("D")+ self.assertEqual(q.unfinished_tasks, q.qsize())++ thrds = (+ (self._get_task_done, (q, go, results)),+ (self._get_task_done, (q, go, results)),+ (self._join, (q, results)),+ (self._join, (q, results)),+ )+ threads = []+ for func, params in thrds:+ threads.append(threading.Thread(target=func, args=params))+ threads[-1].start()+ go.set()+ q.shutdown(False)+ for t in threads:+ t.join()++ self.assertEqual(results, [True]*len(thrds))++ class QueueTest(BaseQueueTestMixin): def setUp(self): Multiprocessing
diffdiff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.pyindex 5220504369..c9d5d4b567 100644--- a/Lib/multiprocessing/queues.py+++ b/Lib/multiprocessing/queues.py@@ -17,7 +17,6 @@ import types import weakref import errno-import ctypes from queue import Empty, Full, ShutDown@@ -55,9 +54,7 @@ def __init__(self, maxsize=0, *, ctx): # For use by concurrent.futures self._ignore_epipe = False self._reset()- self._shutdown_state = context._default_context.Value(- ctypes.c_uint8, lock=self._rlock- )+ self._shutdown_state = ctx.Value('i', _queue_alive) if sys.platform != 'win32': register_after_fork(self, Queue._after_fork)@@ -65,11 +62,13 @@ def __init__(self, maxsize=0, *, ctx): def __getstate__(self): context.assert_spawning(self) return (self._ignore_epipe, self._maxsize, self._reader, self._writer,- self._rlock, self._wlock, self._sem, self._opid)+ self._rlock, self._wlock, self._sem, self._opid,+ self._shutdown_state) def __setstate__(self, state): (self._ignore_epipe, self._maxsize, self._reader, self._writer,- self._rlock, self._wlock, self._sem, self._opid) = state+ self._rlock, self._wlock, self._sem, self._opid,+ self._shutdown_state) = state self._reset() def _after_fork(self):@@ -91,30 +90,45 @@ def _reset(self, after_fork=False): self._recv_bytes = self._reader.recv_bytes self._poll = self._reader.poll+ def _is_alive(self):+ return self._shutdown_state.value == _queue_alive++ def _is_shutdown(self):+ return self._shutdown_state.value == _queue_shutdown++ def _is_shutdown_immediate(self):+ return self._shutdown_state.value == _queue_shutdown_immediate++ def _set_shutdown(self):+ self._shutdown_state.value = _queue_shutdown++ def _set_shutdown_immediate(self):+ self._shutdown_state.value = _queue_shutdown_immediate+ def put(self, obj, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed")- if self._shutdown_state.value != _queue_alive:+ if not self._is_alive(): raise ShutDown if not self._sem.acquire(block, timeout):+ if not self._is_alive():+ raise ShutDown raise Full with self._notempty:- if self._shutdown_state.value != _queue_alive:- raise ShutDown if self._thread is None: self._start_thread() self._buffer.append(obj) self._notempty.notify() def get(self, block=True, timeout=None):- if self._shutdown_state.value == _queue_shutdown_immediate:- raise ShutDown if self._closed: raise ValueError(f"Queue {self!r} is closed") if block and timeout is None: with self._rlock:- if self._shutdown_state.value != _queue_alive:+ # checks shutdown state+ if (self._is_shutdown_immediate()+ or (self._is_shutdown() and self.empty())): raise ShutDown res = self._recv_bytes() self._sem.release()@@ -122,24 +136,31 @@ def get(self, block=True, timeout=None): if block: deadline = time.monotonic() + timeout if not self._rlock.acquire(block, timeout):+ if (self._is_shutdown_immediate()+ or (self._is_shutdown() and self.empty())):+ raise ShutDown raise Empty try: if block: timeout = deadline - time.monotonic() if not self._poll(timeout):- if self._shutdown_state.value != _queue_alive:+ if not self._is_alive(): raise ShutDown raise Empty- if self._shutdown_state.value != _queue_alive :- raise ShutDown elif not self._poll():+ if not self._is_alive():+ raise ShutDown raise Empty++ # here queue is not empty+ if self._is_shutdown_immediate():+ raise ShutDown+ # here shutdown state queue is alive or shutdown res = self._recv_bytes() self._sem.release() finally: self._rlock.release()- if self._shutdown_state.value == _queue_shutdown:- raise ShutDown+ # unserialize the data after having released the lock return _ForkingPickler.loads(res)@@ -159,6 +180,19 @@ def get_nowait(self): def put_nowait(self, obj): return self.put(obj, False)+ def shutdown(self, immediate=False):+ if self._closed:+ raise ValueError(f"Queue {self!r} is closed")+ with self._shutdown_state.get_lock():+ if self._is_shutdown_immediate():+ return+ if immediate:+ self._set_shutdown_immediate()+ with self._notempty:+ self._notempty.notify_all()+ else:+ self._set_shutdown()+ def close(self): self._closed = True close = self._close@@ -180,14 +214,6 @@ def cancel_join_thread(self): except AttributeError: pass- def shutdown(self, immediate=False):- with self._rlock:- if immediate:- self._shutdown_state = _queue_shutdown_immediate- self._notempty.notify_all()- else:- self._shutdown_state = _queue_shutdown- def _start_thread(self): debug('Queue._start_thread()')@@ -340,7 +366,11 @@ def __setstate__(self, state): def put(self, obj, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed")+ if not self._is_alive():+ raise ShutDown if not self._sem.acquire(block, timeout):+ if not self._is_alive():+ raise ShutDown raise Full with self._notempty, self._cond:@@ -352,6 +382,8 @@ def put(self, obj, block=True, timeout=None): def task_done(self): with self._cond:+ if self._is_shutdown_immediate():+ raise ShutDown if not self._unfinished_tasks.acquire(False): raise ValueError('task_done() called too many times') if self._unfinished_tasks._semlock._is_zero():@@ -359,10 +391,19 @@ def task_done(self): def join(self): with self._cond:- if self._shutdown_state.value == _queue_shutdown_immediate:- return+ if self._is_shutdown_immediate():+ raise ShutDown if not self._unfinished_tasks._semlock._is_zero(): self._cond.wait()+ if self._is_shutdown_immediate():+ raise ShutDown++ def shutdown(self, immediate=False):+ with self._cond:+ is_alive = self._is_alive()+ super().shutdown(immediate)+ if is_alive:+ self._cond.notify_all() # # Simplified Queue type -- really just a locked pipediff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.pyindex d9264ed62f..ae76c697c7 100644--- a/Lib/test/_test_multiprocessing.py+++ b/Lib/test/_test_multiprocessing.py@@ -1279,35 +1279,260 @@ def test_closed_queue_put_get_exceptions(self): q.get() def test_shutdown_empty(self):- q = multiprocessing.Queue()- q.shutdown()- with self.assertRaises(- pyqueue.ShutDown, msg="Didn't appear to shut-down queue"- ):- q.put("data")- with self.assertRaises(- pyqueue.ShutDown, msg="Didn't appear to shut-down queue"- ):- q.get()+ for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():+ q.shutdown()+ _wait()+ with self.assertRaises(+ pyqueue.ShutDown, msg="Didn't appear to shut-down queue"+ ):+ q.put("data")+ with self.assertRaises(+ pyqueue.ShutDown, msg="Didn't appear to shut-down queue"+ ):+ q.get() def test_shutdown_nonempty(self):- q = multiprocessing.Queue()- q.put("data")- q.shutdown()- q.get()- with self.assertRaises(- pyqueue.ShutDown, msg="Didn't appear to shut-down queue"- ):+ for q in multiprocessing.Queue(1), multiprocessing.JoinableQueue(1):+ q.put("data")+ q.shutdown()+ _wait() q.get()+ with self.assertRaises(+ pyqueue.ShutDown, msg="Didn't appear to shut-down queue"+ ):+ q.get() def test_shutdown_immediate(self):- q = multiprocessing.Queue()- q.put("data")- q.shutdown(immediate=True)- with self.assertRaises(- pyqueue.ShutDown, msg="Didn't appear to shut-down queue"- ):- q.get()+ for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():+ q.put("data")+ q.shutdown(immediate=True)+ _wait()+ with self.assertRaises(+ pyqueue.ShutDown, msg="Didn't appear to shut-down queue"+ ):+ q.get()++ def test_shutdown_allowed_transitions(self):+ # allowed transitions would be from `alive`` via `shutdown` to `shutdown_immediate``+ mod_q = multiprocessing.queues+ for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():+ self.assertEqual(mod_q._queue_alive, q._shutdown_state.value)++ # default -> immediate=False+ q.shutdown()+ self.assertEqual(mod_q._queue_shutdown, q._shutdown_state.value)++ q.shutdown(immediate=True)+ self.assertEqual(mod_q._queue_shutdown_immediate, q._shutdown_state.value)++ q.shutdown(immediate=False)+ self.assertNotEqual(mod_q._queue_shutdown, q._shutdown_state.value)++ def _shutdown_all_methods_in_one_process(self, immediate):+ # part 1: Queue+ q = multiprocessing.Queue(2)+ q.put("L")+ _wait() # Give time to simulate many processes+ q.put_nowait("O")+ q.shutdown(immediate)+ _wait() # simulate time of synchro primitive++ with self.assertRaises(pyqueue.ShutDown):+ q.put("E")+ with self.assertRaises(pyqueue.ShutDown):+ q.put_nowait("W")+ if immediate:+ with self.assertRaises(pyqueue.ShutDown):+ q.get()+ with self.assertRaises(pyqueue.ShutDown):+ q.get_nowait()+ else:+ # Neither `task_done`, neither `join`methods` to test+ self.assertEqual(q.get(), "L")+ self.assertEqual(q.get_nowait(), "O")+ _wait()++ # on shutdown(immediate=False)+ # when queue is empty, should raise ShutDown Exception+ with self.assertRaises(pyqueue.ShutDown):+ q.get() # p.get(True)+ with self.assertRaises(pyqueue.ShutDown):+ q.get_nowait() # q.get(False)+ with self.assertRaises(pyqueue.ShutDown):+ q.get(True, 1.0)++ # part 2: JoinableQueue+ q = multiprocessing.JoinableQueue(2)+ q.put("L")+ _wait()+ q.put_nowait("O")+ q.shutdown(immediate)+ _wait()++ with self.assertRaises(pyqueue.ShutDown):+ q.put("E")+ with self.assertRaises(pyqueue.ShutDown):+ q.put_nowait("W")+ if immediate:+ with self.assertRaises(pyqueue.ShutDown):+ q.get()+ with self.assertRaises(pyqueue.ShutDown):+ q.get_nowait()+ with self.assertRaises(pyqueue.ShutDown):+ q.task_done()+ with self.assertRaises(pyqueue.ShutDown):+ q.join()+ else:+ self.assertEqual(q.get(), "L")+ q.task_done()+ _wait()+ self.assertEqual(q.get(), "O")+ q.task_done()+ _wait()+ q.join()+ # when `shutdown` queue is empty, should raise ShutDown Exception+ with self.assertRaises(pyqueue.ShutDown):+ q.get() # p.get(True)+ with self.assertRaises(pyqueue.ShutDown):+ q.get_nowait() # p.get(False)+ with self.assertRaises(pyqueue.ShutDown):+ q.get(True, 1.0)++ def test_shutdown_all_methods_in_one_process(self):+ return self._shutdown_all_methods_in_one_process(False)++ def test_shutdown_immediate_all_methods_in_one_process(self):+ return self._shutdown_all_methods_in_one_process(True)++ @classmethod+ def _write_msg_process(cls, q, n, results, delay,+ i_when_exec_shutdown,+ event_start, event_end):+ event_start.wait()+ for i in range(1, n+1):+ try:+ q.put((i, "YDLO"))+ results.append(True)+ except pyqueue.ShutDown:+ results.append(False)+ # triggers shutdown of queue+ if i == i_when_exec_shutdown:+ event_end.set()+ time.sleep(delay)+ # end of all puts+ if isinstance(q, type(multiprocessing.JoinableQueue())):+ try:+ q.join()+ except pyqueue.ShutDown:+ pass++ @classmethod+ def _read_msg_process(cls, q, nb, results, delay, event_start):+ event_start.wait()+ block = True+ while nb:+ time.sleep(delay)+ try:+ # Get at least one message+ q.get(block)+ block = False+ if isinstance(q, type(multiprocessing.JoinableQueue())):+ q.task_done()+ results.append(True)+ nb -= 1+ except pyqueue.ShutDown:+ results.append(False)+ nb -= 1+ except pyqueue.Empty:+ pass+ # end of all gets+ if isinstance(q, type(multiprocessing.JoinableQueue())):+ try:+ q.join()+ except pyqueue.ShutDown:+ pass++ @classmethod+ def _shutdown_process(cls, q, event_end, immediate):+ event_end.wait()+ q.shutdown(immediate)+ if isinstance(q, type(multiprocessing.JoinableQueue())):+ try:+ q.join()+ except pyqueue.ShutDown:+ pass++ @classmethod+ def _join_process(cls, q, delay, event_start):+ event_start.wait()+ time.sleep(delay)+ try:+ q.join()+ except pyqueue.ShutDown:+ pass++ #@classmethod+ def _shutdown_all_methods_in_many_processes(self, immediate):+ for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():+ ps = []+ ev_start = multiprocessing.Event()+ ev_exec_shutdown = multiprocessing.Event()+ m = multiprocessing.Manager()+ res_puts = m.list()+ res_gets = m.list()+ delay = 1e-4+ read_process = 4+ nb_msgs = read_process * 16+ nb_msgs_r = nb_msgs // read_process+ when_exec_shutdown = nb_msgs // 2+ if isinstance(q, type(multiprocessing.Queue())):+ lprocs = (+ (self._write_msg_process, 1, (q, nb_msgs, res_puts, delay,+ when_exec_shutdown,+ ev_start, ev_exec_shutdown)),+ (self._read_msg_process, read_process, (q, nb_msgs_r,+ res_gets, delay*2,+ ev_start)),+ (self._shutdown_process, 1, (q, ev_exec_shutdown, immediate)),+ )+ else:+ # add 2 self._join process processes+ lprocs = (+ (self._write_msg_process, 1, (q, nb_msgs, res_puts, delay,+ when_exec_shutdown,+ ev_start, ev_exec_shutdown)),+ (self._read_msg_process, read_process, (q, nb_msgs_r,+ res_gets, delay*2,+ ev_start)),+ (self._join_process, 2, (q, delay*2, ev_start)),+ (self._shutdown_process, 1, (q, ev_exec_shutdown, immediate)),+ )+ # start all processes+ for func, n, args in lprocs:+ for i in range(n):+ ps.append(multiprocessing.Process(target=func, args=args))+ ps[-1].start()+ # set event in order to run q.shutdown()+ ev_start.set()+ _wait()+ # wait+ if isinstance(q, type(multiprocessing.Queue())):+ for p in ps:+ p.join()++ if not immediate:+ self.assertTrue(q.empty())+ self.assertEqual(res_gets.count(True), res_puts.count(True))+ else:+ self.assertTrue(res_gets.count(True) <= res_puts.count(True))++ def test_shutdown_all_methods_in_many_processes(self):+ return self._shutdown_all_methods_in_many_processes(False)++ def test_shutdown_immediate_all_methods_in_many_processes(self):+ return self._shutdown_all_methods_in_many_processes(True)++ # # # |
I agree that the reviews would be easier if this PR is split into three. Feel free to ping me when you open one for asyncio. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Please split this PR into 3 PRs to make it easier for reviewers. Thanks.
@YvesDup If you want I can split them up and include your authorship |
I am very busy at this time, so yes I agree with your proposal. |
I'm planning to squash all commits into two (for each library), one for each of my work and yours, and include the changes in longer commit messages (which I'll copy to the PR description). Almost all PRs in this repo are squashed to one commit anyway, which GitHub adds co-authorship commit message footers for each commit author. |
Uh oh!
There was an error while loading.Please reload this page.
Linked topic, issue and PR
Reminders
I just created this new PR from@EpicWink's original PR, to resume and continue working on the 'queue shutdown' feature with sufficient rights. Currently, I'm on Mac OSX and I need to have the results of the tests on Win and Linux.
About this PR, here is the list of concerned classes:
threading.Queue
and its derived classes,asyncio.Queue
and its derived classes,multiprocessing.queue.Queue
andmultiprocessing.queue.JoinableQueue
.As a reminder, here is a list of features, which I I've collected fromdiscuss.org, thegh-96471 issue and thefirst PR:
A queue is created with a new
shutdown_state
attribute with a list of possible values as:alive
,shutdown
orshutdown-immediate
. When created, the new attribute is set toalive
.The allowed transitions are:
alive
toshutdown
,alive
toshutdown-immediate
,shutdown
toshutdown-immediate
.When the
shutdown
method is called to stop the queue:put
,put_nowait
raise aShutDown
exception.put
methods are released and raise aShutDown
exception.get
,get_nowait
,task_done
andjoin
raiseShutDown
exception as soon as the queue is empty.get
,get_nowait
,task_done
,join
raise theShutDown
exception,get
,join
(here including processes) methods are released and raise aShutDown
exception.Currently, there is a solution for each class, with dedicated tests.
Thanks you all for your comments.