gh-96471: Add multiprocessing queue shutdown #104230


Draft: EpicWink wants to merge 11 commits into python:main from EpicWink:multiprocessing-queue-shutdown

Conversation

@EpicWink (Contributor) commented May 6, 2023 (edited)

Multiprocessing-only changes from #102499 (which supersedes #96474), updated to match the API introduced by #104750

Depends on #104750


📚 Documentation preview 📚: https://cpython-previews--104230.org.readthedocs.build/

EpicWink and others added 2 commits May 6, 2023 15:10
* Remove ctypes import (use string identifiers for value types)
* Use queue's context to get shared value for queue state
* Include queue state in pickle
* Factor out queue-state checks and updates to methods
* Logic fixes in put and get
* Move shutdown method to before close
* Raise when shutting down closed queue
* Don't re-notify if immediately shutting down a queue already immediately shut down
* Support shutdown in JoinableQueue
  * Handle in task_done and join
  * Logic fixes in put and shutdown
* Updated tests
* Document feature added in 3.13
@EpicWink (Contributor, Author)

@willingc ready for review

@EpicWink (Contributor, Author)

Like with #104750, we may want to modify the implementation of `immediate=True` to simply consume the queue rather than having a separate state.


@gvanrossum (Member)

> Multiprocessing-only changes from #102499 (on top of #96474)
>
> Depends on #104225

Looks like both those PRs were closed without merging -- maybe you can update the description? (I'm not super interested in the history of this PR any more, more in the similarity to what we've already added to queue.py.)

I'm also unlikely to be your reviewer for this PR -- I don't know who maintains multiprocessing these days; maybe @gpshead feels confident. (I've never even used it, nor have I ever read the code before or reviewed any part of it. :-)

@EpicWink marked this pull request as draft February 22, 2024 01:13
@EpicWink force-pushed the multiprocessing-queue-shutdown branch 2 times, most recently from d1ceccf to 90279ad on March 26, 2024 09:08
@YvesDup (Contributor) commented Apr 9, 2024 (edited)

Hi @EpicWink,

I've just done a quick review of the code (I see it is a draft). I have three main comments:

  • In the `__init__` method, I am not sure about the `lock=self._rlock` argument in the `ctx.Value` call. When I run the `shutdown` method on a new queue, my script hangs at the `self._is_shutdown.value = True` instruction. If I remove the `lock=self._rlock` argument, there is no hang; I cannot explain why. Maybe see the documentation here.
  • Using the `qsize` method of the `Queue` class is not recommended because it is not implemented on macOS. The `self._sem.release(self.qsize())` instruction in the `shutdown` method should be replaced.
  • Also in this method, the CM `with self._notempty: self._notempty.notify_all()` has to be moved one block left so it is executed whatever the `immediate` value (the getter-unlock case).

I ran my tests on macOS 13.4 (M1).
I hope this is helpful.

@EpicWink (Contributor, Author)

@YvesDup this PR is still a work in progress


@EpicWink force-pushed the multiprocessing-queue-shutdown branch from 90279ad to 19ab554 on April 12, 2024 07:05
@EpicWink force-pushed the multiprocessing-queue-shutdown branch from 19ab554 to 42fbea6 on April 12, 2024 07:08
@EpicWink (Contributor, Author) commented Apr 12, 2024 (edited)

> In the `__init__` method, I am not sure about the `lock=self._rlock` argument in the `ctx.Value` call. When I run the `shutdown` method on a new queue, my script hangs at the `self._is_shutdown.value = True` instruction. If I remove the `lock=self._rlock` argument, there is no hang; I cannot explain why. Maybe see the documentation here.

@YvesDup I had assumed `_rlock` would be an instance of `RLock`, but that's not the case; I think it is short for "read lock". I'll have `_is_shutdown` use its own lock.
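
A minimal sketch of that self-deadlock (assuming `shutdown()` already holds `_rlock` when it sets the flag -- this snippet intentionally hangs):

```python
import multiprocessing as mp

ctx = mp.get_context()
rlock = ctx.Lock()  # a plain, non-recursive Lock, despite the "_rlock" name
flag = ctx.Value("b", False, lock=rlock)  # the flag shares the queue's lock

with rlock:  # e.g. shutdown() has taken the lock...
    flag.value = True  # ...and the Value setter tries to take it again: hang
```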


> Using the `qsize` method of the `Queue` class is not recommended because it is not implemented on macOS. The `self._sem.release(self.qsize())` instruction in the `shutdown` method should be replaced.

Without some way to determine the number of putters to unblock, `shutdown()` can't work. I'll use `_maxsize`.


> Also in this method, the CM `with self._notempty: self._notempty.notify_all()` has to be moved one block left so it is executed whatever the `immediate` value (the getter-unlock case).

The `threading.Condition` `_notempty` is used to tell the feeder thread to check whether there are items to send, which is not terribly relevant here: I'll remove it.

I need to find some other way to wake up getters and have them check for empty-then-shutdown; otherwise they'll be blocked on `connection.recv_bytes()` forever.

@YvesDup (Contributor) commented Apr 12, 2024 (edited)

> > In the `__init__` method, I am not sure about the `lock=self._rlock` argument in the `ctx.Value` call. When I run the `shutdown` method on a new queue, my script hangs at the `self._is_shutdown.value = True` instruction. If I remove the `lock=self._rlock` argument, there is no hang; I cannot explain why.
>
> @YvesDup I had assumed `_rlock` would be an instance of `RLock`, but that's not the case; I think it is short for "read lock". I'll have `_is_shutdown` use its own lock.

That seems good to me. The default lock should be an `RLock`.

> > Using the `qsize` method of the `Queue` class is not recommended because it is not implemented on macOS. The `self._sem.release(self.qsize())` instruction in the `shutdown` method should be replaced.
>
> Without some way to determine the number of putters to unblock, `shutdown()` can't work. I'll use `_maxsize`.

Would it be possible to call `self._sem.release()` in the `_clear` method loop, once for each item removed?

> > Also in this method, the CM `with self._notempty: self._notempty.notify_all()` has to be moved one block left so it is executed whatever the `immediate` value (the getter-unlock case).
>
> The `threading.Condition` `_notempty` is used to tell the feeder thread to check whether there are items to send, which is not terribly relevant here: I'll remove it.

Good catch!

I have just seen that the unit tests run well, except on macOS (due to the `qsize()` method). Nice.

Update: I was in too much of a hurry; some tests failed, and not only on macOS. :-(

@EpicWink (Contributor, Author)

> Would it be possible to call `self._sem.release()` in the `_clear` method loop, once for each item removed?

@YvesDup No, `_clear` is for removing items from the queue when `immediate` is true, and is not relevant to unblocking putters.

Thinking about it, using `_maxsize` doesn't work anyway, because there can be more putters than the queue's maximum size.

@YvesDup (Contributor) commented Apr 16, 2024 (edited)

Hi @EpicWink,

I've done a lot of tests over the last few days. It's very difficult to rework the code to add the 'shutdown' features. I've succeeded in updating the `get` parts of the feature (with a `multiprocessing.Condition`), but it's not really maintainable (not only my fault).
I wonder if it wouldn't be better to rewrite the class itself using `Condition` synchronization (on the same model as the `threading.Queue` class) in order to handle the full and empty queue states. This would also make it much easier to manage pending get/put processes.
IMO, this work is mandatory before adding all the 'shutdown' features. I think it's really worth it.
Thanks for your comments, ideas, thoughts.

@EpicWink (Contributor, Author) commented Apr 17, 2024 (edited)

> It's very difficult to rework the code to add the 'shutdown' features.

@YvesDup I agree; it wasn't designed for interruptible gets.


> I wonder if it wouldn't be better to rewrite the class itself using `Condition` synchronization (on the same model as the `threading.Queue` class) in order to handle the full and empty queue states. This would also make it much easier to manage pending get/put processes.

Instead, it might be better to create a new `multiprocessing.queues.TerminableQueue` class with the same API as the threading `queue.Queue` (i.e. no feed-thread methods like `close`). If this becomes popular, `multiprocessing.queues.Queue` could be replaced with it. I think this would need more discussion (on Discourse).


> IMO, this work is mandatory before adding all the 'shutdown' features.

Perhaps. I think the current `self._reader.recv_bytes()` could be wrapped with a `select()` call combined with a timeout.
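
Something like this hedged sketch (the helper name and attributes are illustrative, not code from this PR; `queue.ShutDown` is the exception added for queue shutdown in 3.13; Unix-only as written, since `select()` on pipe handles doesn't work this way on Windows, where `multiprocessing.connection.wait()` would be needed instead):

```python
import select
from queue import ShutDown  # Python 3.13+


def recv_bytes_interruptible(reader, is_shutdown, poll_interval=0.1):
    # Poll the pipe's read end so a blocked getter can notice shutdown
    # instead of sleeping in recv_bytes() forever.
    while True:
        ready, _, _ = select.select([reader], [], [], poll_interval)
        if ready:
            return reader.recv_bytes()
        if is_shutdown.value:
            raise ShutDown
```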

@YvesDup (Contributor)

> Instead, it might be better to create a new `multiprocessing.queues.TerminableQueue` class with the same API as the threading `queue.Queue` (i.e. no feed-thread methods like `close`). If this becomes popular, `multiprocessing.queues.Queue` could be replaced with it. I think this would need more discussion (on Discourse).

It's a good idea, because `multiprocessing.queues.Queue` is used in many other parts of `multiprocessing`, such as `Pool`. It is a critical tool. So let's go to Discourse with your proposition.

> Perhaps. I think the current `self._reader.recv_bytes()` could be wrapped with a `select()` call combined with a timeout.

`select()` is really low level; IMO this should be encapsulated in a (derived) class.

For my side, I have been playing with a `Condition` to handle the `get` part, as a replacement for the `self._rlock` lock.
The partial `get` code looks like this:

```python
if block and timeout is None:
    with self._rcond:
        self._rcond.wait_for(self._notempty_or_shutdown_and_empty, 0.001)  # arbitrary timeout
        if self._is_shutdown.value and self.empty():
            raise ShutDown
        res = self._recv_bytes()
```

That seems to work well, but I haven't tested all the use cases, especially with a timeout or a non-blocking `get`.
I wonder if I should go on this way?

@EpicWink (Contributor, Author)

> `select()` is really low level; IMO this should be encapsulated in a (derived) class.

@YvesDup see `selectors`


> For my side, I have been playing with a `Condition` to handle the `get` part, as a replacement for the `self._rlock` lock.
> ...
> That seems to work well, but I haven't tested all the use cases, especially with a timeout or a non-blocking `get`. I wonder if I should go on this way?

Why do you say this is "not really maintainable"?

@YvesDup (Contributor)

> > For my side, I have been playing with a `Condition` to handle the `get` part, as a replacement for the `self._rlock` lock.
> > ...
> > That seems to work well, but I haven't tested all the use cases, especially with a timeout or a non-blocking `get`. I wonder if I should go on this way?
>
> Why do you say this is "not really maintainable"?

I said that because there will be a lot of modifications to make to this class before we have something well finished and therefore "maintainable" in the future. I was really hampered by not having an OS-portable semaphore whose current value we could read. And it's a pain to release pending "put" processes.

Whatever we decide, the most critical point seems to me to be managing the size of the queue (or the number of elements), whether with the wobbly semaphore or with an internal counter.

PS: I'll be away for the next 2 weeks.

@gvanrossum (Member)

Hey guys, do I smell a bit of scope creep here? The code is complex, yes, but I worry that creating a new implementation will just add more complexity to the multiprocessing package rather than simplifying anything. I am not hopeful that people will switch to a new queue class with sufficient enthusiasm that we will ever be able to replace the old one with confidence.

Would it perhaps be better to focus on the ability to shut down a queue without the "immediate" flag? That might be implemented just by transmitting a sentinel (which seems already supported). Or possibly `immediate=True` could be implemented in a "best effort" way, just flushing what's in the local buffer (you can't control what other processes have already buffered anyway).
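
For illustration, the sentinel pattern looks roughly like this (a sketch with hypothetical names, not code from this PR):

```python
import multiprocessing as mp

_DONE = "__queue-done__"  # hypothetical picklable sentinel value

def worker(q):
    while True:
        item = q.get()
        if item == _DONE:
            return  # consumer shuts down cleanly
        print("processing", item)

if __name__ == "__main__":
    q = mp.Queue()
    procs = [mp.Process(target=worker, args=(q,)) for _ in range(4)]
    for p in procs:
        p.start()
    for i in range(10):
        q.put(i)
    for _ in procs:
        q.put(_DONE)  # one sentinel per consumer unblocks every getter
    for p in procs:
        p.join()
```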

In any case it looks like it's best if you all take your time with this project -- don't rush it just because the 3.13 feature freeze is just around the corner. It may also indeed be a good idea to start a discussion on Discourse to see if there are others interested in helping out with the architecture of this feature for multiprocessing.

@EpicWink (Contributor, Author)

> Hey guys, do I smell a bit of scope creep here?

@gvanrossum yes, I wanted to discuss on Discourse: https://discuss.python.org/t/queue-termination-further-design/26630/13


> The code is complex, yes, but I worry that creating a new implementation will just add more complexity to the multiprocessing package rather than simplifying anything. I am not hopeful that people will switch to a new queue class with sufficient enthusiasm that we will ever be able to replace the old one with confidence.

I agree, I would prefer to make small changes to the behaviour of the existing queue class to support shutdown.

At the time, my thinking was to create a new class that could be a drop-in replacement, with some methods as no-ops (the feeder-thread-managing methods), and which would in some future release replace the existing queue class. I don't think this is worth it just for shutdown (especially as process termination is already easy).


> Would it perhaps be better to focus on the ability to shut down a queue without the "immediate" flag?

No, the problem here is unblocking getters on shutdown, which needs to happen regardless of `immediate`'s value. `immediate=True` is actually already implemented in this pull request in a similar way to the other queues.


> In any case it looks like it's best if you all take your time with this project -- don't rush it just because the 3.13 feature freeze is just around the corner.

I agree, especially because of what I said before: it's already easy to terminate worker/feeder processes (outside of queues).


> It may also indeed be a good idea to start a discussion on Discourse to see if there are others interested in helping out with the architecture of this feature for multiprocessing.

Also linked above


> For my side, I have been playing with a `Condition` to handle the `get` part, as a replacement for the `self._rlock` lock.

@YvesDup why are you setting a timeout in the following? I would think you simply wait for `_rcond` to be notified.

```python
self._rcond.wait_for(self._notempty_or_shutdown_and_empty, 0.001)
```

> Whatever we decide, the most critical point seems to me to be managing the size of the queue (or the number of elements), whether with the wobbly semaphore or with an internal counter.

A pretty standard pattern:

```python
self._n_items = ctx.Value("Q", 0)
...
with self._n_items.get_lock():
    self._n_items.value += 1  # or -= 1 in get()
```

@gvanrossum (Member)

Thanks for the update. Somehow I missed that Discourse topic (maybe because I've muted the Ideas tag as a whole). Do you want me to pipe up there? It's been 5 days since your update and you haven't gotten any feedback yet.

Also happy to sit back and relax -- you two seem to be able to duke out any problem thrown at you.

@EpicWink (Contributor, Author)

> Do you want me to pipe up there?

@gvanrossum no thanks, we'll just figure something out for 3.14.

@YvesDup (Contributor)

Hi @EpicWink, how are you?
I am back and available to move forward and finish this PR if you wish.


@YvesDup (Contributor) commented May 27, 2024 (edited)

For these last weeks, I've been doing a lot of thinking and testing to find the simplest solution for managing pending processes when the queue shuts down.

Pending “put” processes are blocked by the `_sem` `BoundedSemaphore`. Currently, there's no way of knowing how many processes are pending. If we had this number, we could call the `release` method as many times as necessary. An alternative would be to set up a `Condition` to manage these processes and unblock them with a `notify_all`. But this would mean rewriting the `put` method. Not so simple.

As for pending “get” processes, they are blocked in the `_recv_bytes` method. Here again, their number is not known. To prevent them from staying blocked, I first thought of rewriting the code using a `Condition` and its `wait` method: the `wait` would be exited only when an item is available. Useful, but it requires a lot of modifications to already complex code.
Another way to unblock a `_recv_bytes` is to put an item into the pipe. We could therefore send dummy data (like the sentinel data used when stopping the `_feed` thread) to unblock them. Once unblocked, we just have to raise a `ShutDown`. That seems simple and minimal, with a small impact on the current `get` method. You just need to know how many processes are pending.

To implement these two cases, all you need are two waiting-process counters, in the form of a shared array named `_n_pendings`. Then, when the `shutdown` method is called, I can release all pending processes and empty the queue if the shutdown is immediate.
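
A rough sketch of that flow (a hedged illustration only: `_SENTINEL_SHUTDOWN` and the attribute names are assumptions, not necessarily those in the branch):

```python
_SENTINEL_SHUTDOWN = b"__shutdown__"  # hypothetical wake-up payload

def shutdown_queue(q, immediate=False):
    """Sketch of the counter-based shutdown described above.

    Assumes the queue keeps a shared array ``q._n_pendings`` of
    [pending getters, pending putters], updated inside get()/put().
    """
    q._is_shutdown.value = True
    if immediate:
        while not q.empty():
            q._recv_bytes()  # drain buffered items
    n_getters, n_putters = q._n_pendings
    for _ in range(n_putters):
        q._sem.release()  # let each blocked put() acquire the semaphore
    for _ in range(n_getters):
        q._writer.send_bytes(_SENTINEL_SHUTDOWN)  # wake each blocked get()
```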

The proposed code here passes all current unit tests on Mac/Linux, but only for standard Python builds (no free-threading).
I've added dedicated unit tests to the `_test_multiprocessing.py` test file.

This working version is available in the last commit on my working branch. I hope I'm on a good path.
I look forward to your feedback, and am open to any suggestions.

Note: I've just updated my last commit.

@YvesDup (Contributor)

FYI, all tests on my working branch, including free-threading, now succeed.

@YvesDup (Contributor)

> Perhaps. I think the current `self._reader.recv_bytes()` could be wrapped with a `select()` call combined with a timeout.

@EpicWink Have you done any tests on this?

@EpicWink (Contributor, Author)

> > Perhaps. I think the current `self._reader.recv_bytes()` could be wrapped with a `select()` call combined with a timeout.
>
> @EpicWink Have you done any tests on this?

I looked into it; I don't think it could work.

@YvesDup (Contributor)

@EpicWink Did you look at the proposition I made last month?

Reviewers

@YvesDup requested changes
@gpshead awaiting requested review (code owner)

4 participants: @EpicWink, @gvanrossum, @YvesDup, @bedevere-bot
