Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork32.4k
gh-115258: Fix failed tests on threading queue shutdown#115940
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
gh-115258: Fix failed tests on threading queue shutdown#115940
Uh oh!
There was an error while loading.Please reload this page.
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Thanks for the change! Had some minor comments when reviewing, please see inline
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
8b24985
tof246bf8
CompareHmm I might be missing something but it looks like I can't see the latest changes? |
Hi, I was on vacation. The changes are scheduled for early next week. |
Ah sorry for that! Enjoy your holiday :D |
f246bf8
tofd811b5
CompareThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Test hangs when adding manytime.sleep(random.random() / 10.0)
calls between each statement
Diff (click to expand)
diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.pyindex 9dc7f62999..d223c358e0 100644--- a/Lib/test/test_queue.py+++ b/Lib/test/test_queue.py@@ -320,58 +320,90 @@ def test_shutdown_immediate_all_methods_in_one_thread(self): def _write_msg_thread(self, q, n, results, i_when_exec_shutdown, event_shutdown, barrier_start):+ time.sleep(random.random() / 10.0) # All `write_msg_threads` # put several items into the queue. for i in range(0, i_when_exec_shutdown//2):+ time.sleep(random.random() / 10.0) q.put((i, 'LOYD'))+ time.sleep(random.random() / 10.0) # Wait for the barrier to be complete. barrier_start.wait()+ time.sleep(random.random() / 10.0) for i in range(i, n):+ time.sleep(random.random() / 10.0) try: q.put((i, "YDLO")) except self.queue.ShutDown:+ time.sleep(random.random() / 10.0) results.append(False)+ time.sleep(random.random() / 10.0) break+ time.sleep(random.random() / 10.0) # Trigger queue shutdown. if i == i_when_exec_shutdown:+ time.sleep(random.random() / 10.0) # Only once thread do it. if not event_shutdown.is_set():+ time.sleep(random.random() / 10.0) event_shutdown.set()+ time.sleep(random.random() / 10.0) results.append(True)+ time.sleep(random.random() / 10.0) q.join()+ time.sleep(random.random() / 10.0) def _read_msg_thread(self, q, results, barrier_start): # Wait for the barrier to be complete.+ time.sleep(random.random() / 10.0) barrier_start.wait()+ time.sleep(random.random() / 10.0) while True:+ time.sleep(random.random() / 10.0) try: q.get(False)+ time.sleep(random.random() / 10.0) q.task_done() except self.queue.ShutDown:+ time.sleep(random.random() / 10.0) results.append(True)+ time.sleep(random.random() / 10.0) break except self.queue.Empty: pass+ time.sleep(random.random() / 10.0)+ time.sleep(random.random() / 10.0) q.join()+ time.sleep(random.random() / 10.0) def _shutdown_thread(self, q, results, event_end, immediate):+ time.sleep(random.random() / 10.0) event_end.wait()+ time.sleep(random.random() / 10.0) q.shutdown(immediate)+ time.sleep(random.random() / 10.0) results.append(q.qsize() == 0)+ time.sleep(random.random() / 10.0) q.join()+ time.sleep(random.random() / 10.0) def _join_thread(self, q, barrier_start):+ time.sleep(random.random() / 10.0) # Wait for the barrier to be complete. barrier_start.wait()+ time.sleep(random.random() / 10.0) q.join()+ time.sleep(random.random() / 10.0) def _shutdown_all_methods_in_many_threads(self, immediate): # Run a 'multi-producers/consumers queue' use case, # with enough items into the queue. # When shutdown, all running threads will be concerned.+ time.sleep(random.random() / 10.0) q = self.type2test()+ time.sleep(random.random() / 10.0) ps = [] res_puts = [] res_gets = []@@ -382,11 +414,14 @@ def _shutdown_all_methods_in_many_threads(self, immediate): nb_msgs = 1024*64 nb_msgs_w = nb_msgs // write_threads when_exec_shutdown = nb_msgs_w // 2+ time.sleep(random.random() / 10.0) # Use of a `threading.Barrier`` to ensure that all `_write_msg_threads` # put their part of items into the queue. And trigger the start of # other threads as `_read_msg_thread`and `_join_thread`. barrier_start = threading.Barrier(write_threads+read_threads+join_threads)+ time.sleep(random.random() / 10.0) ev_exec_shutdown = threading.Event()+ time.sleep(random.random() / 10.0) lprocs = ( (self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts, when_exec_shutdown, ev_exec_shutdown,@@ -395,19 +430,34 @@ def _shutdown_all_methods_in_many_threads(self, immediate): (self._join_thread, join_threads, (q, barrier_start)), (self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, immediate)), )+ time.sleep(random.random() / 10.0) # start all threads. for func, n, args in lprocs:+ time.sleep(random.random() / 10.0) for i in range(n):+ time.sleep(random.random() / 10.0) ps.append(threading.Thread(target=func, args=args))+ time.sleep(random.random() / 10.0) ps[-1].start()+ time.sleep(random.random() / 10.0)+ time.sleep(random.random() / 10.0)+ time.sleep(random.random() / 10.0) for thread in ps:+ time.sleep(random.random() / 10.0) thread.join()+ time.sleep(random.random() / 10.0)+ time.sleep(random.random() / 10.0) self.assertEqual(res_puts.count(True), 1)+ time.sleep(random.random() / 10.0) self.assertLessEqual(res_gets.count(True), read_threads)+ time.sleep(random.random() / 10.0) if immediate:+ time.sleep(random.random() / 10.0) self.assertListEqual(res_shutdown, [True])+ time.sleep(random.random() / 10.0) self.assertTrue(q.empty())+ time.sleep(random.random() / 10.0) def test_shutdown_all_methods_in_many_threads(self): return self._shutdown_all_methods_in_many_threads(False)
Also, perhaps look intothreading_helper.join_thread(thread)
andthreading_helper.start_threads(threads)
(fromtest.support
) for better thread management. They should fail the test instead of hanging
Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
YvesDup commentedMar 13, 2024 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
When I run your modified version of the test, it hangs, but only because |
@YvesDup it wasn't missing, just was outside of the diff context
Tests finish also for me when I instead use FAIL:test_shutdown_all_methods_in_many_threads (test.test_queue.PyPriorityQueueTest.test_shutdown_all_methods_in_many_threads)----------------------------------------------------------------------Traceback (mostrecentcalllast):File"~/src/cpython/Lib/test/test_queue.py",line463,intest_shutdown_all_methods_in_many_threadsreturnself._shutdown_all_methods_in_many_threads(False)~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^File"~/src/cpython/Lib/test/test_queue.py",line451,in_shutdown_all_methods_in_many_threadsself.assertEqual(res_puts.count(True),1)~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^AssertionError:2!=1 |
Thank for adding all these
About this message, I have got this failed test once. That means here that there are 2 That said, I am going to:
I will submit a new version of this test. |
of 'test_shutdown_[immediate_]all_methods_in_many_threads' unittests
…_many_threads` methods, with a code refactoring.Add a `results` list to the `_shutdown_thread` method. Add tests.Fix nit.
50bddd3
tob9ee958
CompareStart `join_thread` only when shutdown is immediate.Update tests.
Please don't force-push. It makes reviewing harder. We squash-merge ultimately anyway. Unless you're not ready to have this reviewed, in which case the PR should be in Draft mode. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I believe I am now following what this does. Let me summarize, please check my understanding.
- We start 4 writer threads, each of which is going to put 16*1024 items into the queue.
- (Actually, one more per thread, probably by mistake.)
- When the first of these threads reaches half that number, it calls shutdown().
- All writer threads also have to reach halfway before passing the barrier.
- There are also 6 reader threads. These each get at least one item, wait for the barrier, and then busy-wait until the queue gets shut down.
- There are two variants, one for shutdown(True), one for shutdown(False).
- For shutdown(immediate=True), we also start a join thread.
- All threads must complete for the test to pass.
I am a little unclear on what the test is trying to prove. It seems to be stress testing concurrent reading and writing of the queue plus shutdown. Is that what it is after?
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
Your understanding is correct. The aim is to have a queue not empty and check that all running threads stop correctly when queue shutdows. There are 3 threads types which used each one of methods of Queue class (
if you or/and@EpicWink think this test case is not really useful, I'll let you decide what to do with it. |
Then shouldn't the assert be self.assertEqual(res_gets.count(True),read_threads) (instead of |
Yes |
Fix start value of range.Change `self.assertLessEqual` to `self.assertEqual`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Okay, LGTM.
@EpicWink Do you agree that this test is worth having and that this version looks bullet-proof?
bedevere-bot commentedMar 15, 2024
🤖 New build scheduled with the buildbot fleet by@gvanrossum for commit349d08a 🤖 If you want to schedule another build, you need to add the🔨 test-with-buildbots label again. |
I've run the modified tests ~50 times, both with and without GIL, both single-process and 15-processes, both with and without the I agree the purpose of the test is not very indicative of any real-world scenario, but it does seem to test invariants so I didn't push back against its inclusion. However, the NEWS entry still hasn't been removed. |
Thanks@EpicWink for the mention in thewhatsnew3.13. |
…on#115940)This reinstates `test_shutdown_immediate_all_methods_in_many_threads`and improves `test_shutdown_all_methods_in_many_threads`.
…on#115940)This reinstates `test_shutdown_immediate_all_methods_in_many_threads`and improves `test_shutdown_all_methods_in_many_threads`.
…on#115940)This reinstates `test_shutdown_immediate_all_methods_in_many_threads`and improves `test_shutdown_all_methods_in_many_threads`.
Uh oh!
There was an error while loading.Please reload this page.
Fix infinite loop in
_read_msg_thread
oftest_shutdown_[immediate_]all_methods_in_many_threads
unittests.test_queue
times out #115258