Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

gh-115258: Fix failed tests on threading queue shutdown#115940

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged

Conversation

YvesDup
Copy link
Contributor

@YvesDupYvesDup commentedFeb 26, 2024
edited by gvanrossum
Loading

Fix infinite loop in_read_msg_thread oftest_shutdown_[immediate_]all_methods_in_many_threads unittests.

@bedevere-appbedevere-appbot mentioned this pull requestFeb 26, 2024
@bedevere-appbedevere-appbot added the testsTests in the Lib/test dir labelFeb 26, 2024
@rhettingerrhettinger removed their request for reviewFebruary 26, 2024 12:55
Copy link
Contributor

@Jason-Y-ZJason-Y-Z left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Thanks for the change! Had some minor comments when reviewing, please see inline

YvesDup reacted with eyes emoji
@YvesDupYvesDupforce-pushed theThreading-queue-shutdown-test-failed branch from8b24985 tof246bf8CompareMarch 9, 2024 12:01
@Jason-Y-Z
Copy link
Contributor

Hmm I might be missing something but it looks like I can't see the latest changes?

@YvesDup
Copy link
ContributorAuthor

Hmm I might be missing something but it looks like I can't see the latest changes?

Hi, I was on vacation. The changes are scheduled for early next week.

@Jason-Y-Z
Copy link
Contributor

Ah sorry for that! Enjoy your holiday :D

Copy link
Contributor

@EpicWinkEpicWink left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Test hangs when adding manytime.sleep(random.random() / 10.0) calls between each statement

Diff (click to expand)
diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.pyindex 9dc7f62999..d223c358e0 100644--- a/Lib/test/test_queue.py+++ b/Lib/test/test_queue.py@@ -320,58 +320,90 @@ def test_shutdown_immediate_all_methods_in_one_thread(self):     def _write_msg_thread(self, q, n, results,                             i_when_exec_shutdown, event_shutdown,                             barrier_start):+        time.sleep(random.random() / 10.0)         # All `write_msg_threads`         # put several items into the queue.         for i in range(0, i_when_exec_shutdown//2):+            time.sleep(random.random() / 10.0)             q.put((i, 'LOYD'))+        time.sleep(random.random() / 10.0)         # Wait for the barrier to be complete.         barrier_start.wait()+        time.sleep(random.random() / 10.0)          for i in range(i, n):+            time.sleep(random.random() / 10.0)             try:                 q.put((i, "YDLO"))             except self.queue.ShutDown:+                time.sleep(random.random() / 10.0)                 results.append(False)+                time.sleep(random.random() / 10.0)                 break+            time.sleep(random.random() / 10.0)              # Trigger queue shutdown.             if i == i_when_exec_shutdown:+                time.sleep(random.random() / 10.0)                 # Only once thread do it.                 if not event_shutdown.is_set():+                    time.sleep(random.random() / 10.0)                     event_shutdown.set()+                    time.sleep(random.random() / 10.0)                     results.append(True)+        time.sleep(random.random() / 10.0)         q.join()+        time.sleep(random.random() / 10.0)      def _read_msg_thread(self, q, results, barrier_start):         # Wait for the barrier to be complete.+        time.sleep(random.random() / 10.0)         barrier_start.wait()+        time.sleep(random.random() / 10.0)         while True:+            time.sleep(random.random() / 10.0)             try:                 q.get(False)+                time.sleep(random.random() / 10.0)                 q.task_done()             except self.queue.ShutDown:+                time.sleep(random.random() / 10.0)                 results.append(True)+                time.sleep(random.random() / 10.0)                 break             except self.queue.Empty:                 pass+            time.sleep(random.random() / 10.0)+        time.sleep(random.random() / 10.0)         q.join()+        time.sleep(random.random() / 10.0)      def _shutdown_thread(self, q, results, event_end, immediate):+        time.sleep(random.random() / 10.0)         event_end.wait()+        time.sleep(random.random() / 10.0)         q.shutdown(immediate)+        time.sleep(random.random() / 10.0)         results.append(q.qsize() == 0)+        time.sleep(random.random() / 10.0)         q.join()+        time.sleep(random.random() / 10.0)      def _join_thread(self, q, barrier_start):+        time.sleep(random.random() / 10.0)         # Wait for the barrier to be complete.         barrier_start.wait()+        time.sleep(random.random() / 10.0)         q.join()+        time.sleep(random.random() / 10.0)      def _shutdown_all_methods_in_many_threads(self, immediate):         # Run a 'multi-producers/consumers queue' use case,         # with enough items into the queue.         # When shutdown, all running threads will be concerned.+        time.sleep(random.random() / 10.0)         q = self.type2test()+        time.sleep(random.random() / 10.0)         ps = []         res_puts = []         res_gets = []@@ -382,11 +414,14 @@ def _shutdown_all_methods_in_many_threads(self, immediate):         nb_msgs = 1024*64         nb_msgs_w = nb_msgs // write_threads         when_exec_shutdown = nb_msgs_w // 2+        time.sleep(random.random() / 10.0)         # Use of a `threading.Barrier`` to ensure that all `_write_msg_threads`         # put their part of items into the queue. And trigger the start of         # other threads as `_read_msg_thread`and `_join_thread`.         barrier_start = threading.Barrier(write_threads+read_threads+join_threads)+        time.sleep(random.random() / 10.0)         ev_exec_shutdown = threading.Event()+        time.sleep(random.random() / 10.0)         lprocs = (             (self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts,                                             when_exec_shutdown, ev_exec_shutdown,@@ -395,19 +430,34 @@ def _shutdown_all_methods_in_many_threads(self, immediate):             (self._join_thread, join_threads, (q, barrier_start)),             (self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, immediate)),             )+        time.sleep(random.random() / 10.0)         # start all threads.         for func, n, args in lprocs:+            time.sleep(random.random() / 10.0)             for i in range(n):+                time.sleep(random.random() / 10.0)                 ps.append(threading.Thread(target=func, args=args))+                time.sleep(random.random() / 10.0)                 ps[-1].start()+                time.sleep(random.random() / 10.0)+            time.sleep(random.random() / 10.0)+        time.sleep(random.random() / 10.0)         for thread in ps:+            time.sleep(random.random() / 10.0)             thread.join()+            time.sleep(random.random() / 10.0)+        time.sleep(random.random() / 10.0)          self.assertEqual(res_puts.count(True), 1)+        time.sleep(random.random() / 10.0)         self.assertLessEqual(res_gets.count(True), read_threads)+        time.sleep(random.random() / 10.0)         if immediate:+            time.sleep(random.random() / 10.0)             self.assertListEqual(res_shutdown, [True])+            time.sleep(random.random() / 10.0)             self.assertTrue(q.empty())+            time.sleep(random.random() / 10.0)      def test_shutdown_all_methods_in_many_threads(self):         return self._shutdown_all_methods_in_many_threads(False)

Also, perhaps look intothreading_helper.join_thread(thread) andthreading_helper.start_threads(threads) (fromtest.support) for better thread management. They should fail the test instead of hanging

@YvesDup
Copy link
ContributorAuthor

YvesDup commentedMar 13, 2024
edited
Loading

Test hangs when adding manytime.sleep(random.random() / 10.0) calls between each statement

When I run your modified version of the test, it hangs, but only because_read_msg_thread threads are not started.
The line(self._read_msg_thread, read_threads, (q, res_gets, barrier_start)), in theprocs tuple is missing: so the barrier waits for ever,partiesis never reached. Could you check that please ?
When I add the missing line, test succeeds. I reduce number of threads and msg because alltime.sleep() are too much long ;-).

@EpicWink
Copy link
Contributor

The line (self._read_msg_thread, read_threads, (q, res_gets, barrier_start)), in the procs tuple is missing: so the barrier waits for ever, partiesis never reached. Could you check that please ?

@YvesDup it wasn't missing, just was outside of the diff context


test succeeds. I reduce number of threads and msg because all time.sleep() are too much long

Tests finish also for me when I instead usetime.sleep(random.random() / 1000.0) (and/ 10000.0). However, sometimes I get:

FAIL:test_shutdown_all_methods_in_many_threads (test.test_queue.PyPriorityQueueTest.test_shutdown_all_methods_in_many_threads)----------------------------------------------------------------------Traceback (mostrecentcalllast):File"~/src/cpython/Lib/test/test_queue.py",line463,intest_shutdown_all_methods_in_many_threadsreturnself._shutdown_all_methods_in_many_threads(False)~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^File"~/src/cpython/Lib/test/test_queue.py",line451,in_shutdown_all_methods_in_many_threadsself.assertEqual(res_puts.count(True),1)~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^AssertionError:2!=1

@YvesDup
Copy link
ContributorAuthor

Thank for adding all thesetime.sleep() in the test. It's very instructive.

File"~/src/cpython/Lib/test/test_queue.py",line451,in_shutdown_all_methods_in_many_threadsself.assertEqual(res_puts.count(True),1)~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^AssertionError:2!=1

About this message, I have got this failed test once. That means here that there are 2_write_msg_threads threads which reach thei_when_exec_shutdown. I'am going to modify it to check there is at least one True value.

That said, I am going to:

  • modify the test above.
  • modify the _read_msg_thread to ensure at least one msg was got, and check that aShutDown was raised for each thread.
  • removeq.join() instructions from_write_msg_thread,_read_msg_thread and_shutdown_thread. They can cause hangs when shutdown is not immediate.
  • start_join_thread only when shutdow, is immediate.

I will submit a new version of this test.

YvesDupand others added5 commitsMarch 13, 2024 16:22
of 'test_shutdown_[immediate_]all_methods_in_many_threads' unittests
…_many_threads` methods, with a code refactoring.Add a `results` list to the `_shutdown_thread` method. Add tests.Fix nit.
@YvesDupYvesDupforce-pushed theThreading-queue-shutdown-test-failed branch from50bddd3 tob9ee958CompareMarch 13, 2024 15:22
Start `join_thread` only when shutdown is immediate.Update tests.
@gvanrossum
Copy link
Member

Please don't force-push. It makes reviewing harder. We squash-merge ultimately anyway. Unless you're not ready to have this reviewed, in which case the PR should be in Draft mode.

Copy link
Member

@gvanrossumgvanrossum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I believe I am now following what this does. Let me summarize, please check my understanding.

  • We start 4 writer threads, each of which is going to put 16*1024 items into the queue.
  • (Actually, one more per thread, probably by mistake.)
  • When the first of these threads reaches half that number, it calls shutdown().
  • All writer threads also have to reach halfway before passing the barrier.
  • There are also 6 reader threads. These each get at least one item, wait for the barrier, and then busy-wait until the queue gets shut down.
  • There are two variants, one for shutdown(True), one for shutdown(False).
  • For shutdown(immediate=True), we also start a join thread.
  • All threads must complete for the test to pass.

I am a little unclear on what the test is trying to prove. It seems to be stress testing concurrent reading and writing of the queue plus shutdown. Is that what it is after?

@YvesDup
Copy link
ContributorAuthor

I am a little unclear on what the test is trying to prove. It seems to be stress testing concurrent reading and writing of the queue plus shutdown. Is that what it is after?

Your understanding is correct. The aim is to have a queue not empty and check that all running threads stop correctly when queue shutdows.

There are 3 threads types which used each one of methods of Queue class (put, coupleget+task_done andjoin). Each king of thread is started several times. All threads are really doing operations before the shutdown.
At this end of test case, we check:

  • all threads are stopped,
  • allread_msg threads threads raised ofShutDown exception,
  • and queue is empty when shutdown is immediate.

if you or/and@EpicWink think this test case is not really useful, I'll let you decide what to do with it.

@gvanrossum
Copy link
Member

  • allread_msg threads threads raised ofShutDown exception,

Then shouldn't the assert be

self.assertEqual(res_gets.count(True),read_threads)

(instead ofassertLessEqual)?

@YvesDup
Copy link
ContributorAuthor

Then shouldn't the assert be

self.assertEqual(res_gets.count(True),read_threads)

(instead ofassertLessEqual)?

Yes

Fix start value of range.Change `self.assertLessEqual` to `self.assertEqual`.
Copy link
Member

@gvanrossumgvanrossum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Okay, LGTM.

@EpicWink Do you agree that this test is worth having and that this version looks bullet-proof?

@gvanrossumgvanrossum added the 🔨 test-with-buildbotsTest PR w/ buildbots; report in status section labelMar 15, 2024
@bedevere-bot
Copy link

🤖 New build scheduled with the buildbot fleet by@gvanrossum for commit349d08a 🤖

If you want to schedule another build, you need to add the🔨 test-with-buildbots label again.

@bedevere-botbedevere-bot removed the 🔨 test-with-buildbotsTest PR w/ buildbots; report in status section labelMar 15, 2024
@EpicWink
Copy link
Contributor

I've run the modified tests ~50 times, both with and without GIL, both single-process and 15-processes, both with and without thetime.sleep()s; after updating withmain. No failures, so the test fix looks good to me.

I agree the purpose of the test is not very indicative of any real-world scenario, but it does seem to test invariants so I didn't push back against its inclusion.

However, the NEWS entry still hasn't been removed.

@gvanrossumgvanrossumenabled auto-merge (squash)March 18, 2024 15:51
@gvanrossumgvanrossum merged commit7707b14 intopython:mainMar 18, 2024
@YvesDup
Copy link
ContributorAuthor

Thanks@EpicWink for the mention in thewhatsnew3.13.

Jason-Y-Z reacted with thumbs up emoji

vstinner pushed a commit to vstinner/cpython that referenced this pull requestMar 20, 2024
…on#115940)This reinstates `test_shutdown_immediate_all_methods_in_many_threads`and improves `test_shutdown_all_methods_in_many_threads`.
adorilson pushed a commit to adorilson/cpython that referenced this pull requestMar 25, 2024
…on#115940)This reinstates `test_shutdown_immediate_all_methods_in_many_threads`and improves `test_shutdown_all_methods_in_many_threads`.
diegorusso pushed a commit to diegorusso/cpython that referenced this pull requestApr 17, 2024
…on#115940)This reinstates `test_shutdown_immediate_all_methods_in_many_threads`and improves `test_shutdown_all_methods_in_many_threads`.
@YvesDupYvesDup deleted the Threading-queue-shutdown-test-failed branchOctober 21, 2024 20:26
Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment
Reviewers

@Jason-Y-ZJason-Y-ZJason-Y-Z left review comments

@EpicWinkEpicWinkEpicWink requested changes

@gvanrossumgvanrossumgvanrossum approved these changes

Assignees
No one assigned
Labels
skip newstestsTests in the Lib/test dir
Projects
None yet
Milestone
No milestone
Development

Successfully merging this pull request may close these issues.

5 participants
@YvesDup@Jason-Y-Z@EpicWink@gvanrossum@bedevere-bot

[8]ページ先頭

©2009-2025 Movatter.jp