Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork32.4k
gh-96471: Add asyncio queue shutdown#104228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Conversation
* Include docs
* Queue state enum members are capitalised* Termination state in str/repr* Include raised exception in docstrings* Factor out queue-state checks and updates to methods* Logic fixes in get_nowait and shutdown* Handle queue shutdown in task_done and join* Updated tests* Document feature added in 3.13
@willingc ready for review |
@EpicWink Could you merge main? |
Done, but like#104750, we may want to modify the implementation of |
Eh,@EpicWink, did you see the hubbub about the hanging test in yourprevious PR? The test was disabled because it kept hanging in CI on various platforms. Could you look into what's wrong with it? (At least the feature wasn't reverted.) |
I did see the follow-on issue and pull-requests. Yves and I are working on fixing it (you can follow the changes here:main...EpicWink:cpython:fix-thread-queue-shutdown-test) |
Awesome! (An acknowledgement that you were working on it would have lessened my stress. :-) |
Looking at this with an eye towards review, I realized that the threaded-queue implementation doesn't appear to match its documentation: It appears that |
Now that the immediate-consume implementation of the threading queue shutdown has been accepted, I'm going to do the same here and for multiprocessing. I'll personally rewrite the tests to be more readable and obvious.Laurie …________________________________From: Guido van Rossum ***@***.***>Sent: Thursday, February 22, 2024 11:00:09 AMTo: python/cpython ***@***.***>Cc: Laurie O ***@***.***>; Mention ***@***.***>Subject: Re: [python/cpython]gh-96471: Add asyncio queue shutdown (PR#104228)Looking at this with an eye towards review, I realized that the threaded-queue implementation doesn't appear to match its documentation: It appears that q.join() does not raise ShutDown when the queue is shutdown immediately. The tests don't seem to be testing for this either. (Come to think of it, the test never seems to take the except self.queue.ShutDown path -- if I put a breakpoint there it never gets hit during any of the tests. The test logic is pretty convoluted, which is my only excuse for not having caught this in the review ofgh-104750<#104750>.)—Reply to this email directly, view it on GitHub<#104228 (comment)>, or unsubscribe<https://github.com/notifications/unsubscribe-auth/AF72GRNKMPJRCRIJHOB4NUDYU2KBTAVCNFSM6AAAAAAXX3PWIGVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMYTSNJYGQ3TENZYGQ>.You are receiving this because you were mentioned.Message ID: ***@***.***> |
And what about |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I'll wait reviewing until you've rewritten shutdown, but here's one doc markup nit.
Also, please mark PRs that aren't ready for review as Draft.
Uh oh!
There was an error while loading.Please reload this page.
The behaviour should (and I intend to implement it to) be the same between the threading, multiprocessing and asyncio queues |
YvesDup commentedFeb 22, 2024 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
By the way, should |
EpicWink commentedFeb 25, 2024 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
@gvanrossum please don't trust, have you seen
Oops, sorry. I forgot to hold off my most recent push
No, I think it was because I hadn't updated the implementation to match the new agreed-upon behaviour (this PR was in draft at that time anyway, so I don't think the label was needed).
I should probably add a comment |
# Setup queue with 2 items (1 retrieved), and a join() task | ||
q = self.q_class() | ||
loop = asyncio.get_running_loop() | ||
q.put_nowait("data") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I suggest:
q.put_nowait("data1")q.put_nowait("data2")
q.put_nowait("data") | ||
q.put_nowait("data") | ||
join_task = loop.create_task(q.join()) | ||
self.assertEqual(await q.get(), "data") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
and testing on"data1"
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Except whenq
is an instance ofLifoQueue
, when the value should be"data2"
. Is it necessary to test queue ordering in the shutdown tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I forgot theLifoQueue
class. Is this worth commenting on?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I have implemented your suggestion, but I have not committed. I am considering it. I don't think it's necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
The test case (thus includingLifoQueue
) is implicit. A comment would be enough.
Uh oh!
There was an error while loading.Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Yup! This version looks great. Let me know if you agree -- I've removed the DO-NOT-MERGE label, but I'll wait until you and@YvesDup are happy too.
EpicWink commentedApr 6, 2024 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
I think it's good to go
I could do this, but I don't think it's worth revoking the PR's approval |
All blocked callers of:meth:`~Queue.put` will be unblocked. If | ||
*immediate* is true, also unblock callers of:meth:`~Queue.get` | ||
and:meth:`~Queue.join`. | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Sorry but I have a doubt, shouldn't this documentation block be rather:
All blocked callers of :meth:`~Queue.put` and :meth:`~Queue.get` will be unblocked. If *immediate* is true, also unblock callers of :meth:`~Queue.join`.
In event of change, the docstring of theshutdown
method must be updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
join
callers aren't necessarily even unblocked anyway, if consumers are processing any items. I should probably say that a task is marked as done for each item in the queue if immediate shutdown.
Also, I think the threading queue docs are the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
a task is marked as done for each item in the queue if immediate shutdown.
It's very precise, better.
Also, I think the threading queue docs are the same.
Yes, I commented here so as not to forget (see#117532 (comment)).
English is your native language, I think it'is best if you update documentations and docstrings.
Update: but I can create the follow-up PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I've made a PR:#117621
All blocked callers of put() will be unblocked, and also get() | ||
and join() if 'immediate'. | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Docstring to modify depending of agree/disagree on my first remark about `blocked callers'.
Changes could be:
All blocked callers of put() and get() will be unblocked, andalso join() if 'immediate
* Add :meth:`asyncio.Queue.shutdown` (along with | ||
:exc:`asyncio.QueueShutDown`) for queue termination. | ||
(Contributed by Laurie Opperman in :gh:`104228`.) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
We've worked well together, I'd be nice if you'd mention me.
:-)
Co-authored-by: Duprat <yduprat@gmail.com>
john-parton commentedAug 27, 2024 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
I'd like to add that the asyncdefasync_map[T,R](func:Callable[[T],Awaitable[R]],iterable:AsyncIterable[T],*,limit:int,maxsize:int=-1,)->AsyncIterator[R]:ifmaxsize<0:maxsize=limitarguments_queue=Queue[T](maxsize=maxsize)results_queue=Queue[R](maxsize=maxsize)asyncdefdrain():asyncforargumentiniterable:awaitarguments_queue.put(argument)arguments_queue.shutdown(immediate=False)asyncdefworker():whileTrue:try:argument=awaitarguments_queue.get()exceptQueueShutDown:breakawaitresults_queue.put(awaitfunc(argument))asyncdefbackground():asyncwithasyncio.TaskGroup()asbackground_task_group:background_task_group.create_task(drain())for_inrange(limit):background_task_group.create_task(worker())results_queue.shutdown(immediate=False)asyncwithasyncio.TaskGroup()astask_group:task_group.create_task(background())whileTrue:try:yieldawaitresults_queue.get()exceptQueueShutDown:break I believe this has a common issue that a lot of async generators do, in that if you don't consume the entire generator, it will still continue processing and end up with a lot of futures never awaited. I know there's some |
Uh oh!
There was an error while loading.Please reload this page.
asyncio-only changes from#102499 (which supercedes#96474), updated to match the API introduced by#104750
📚 Documentation preview 📚:https://cpython-previews--104228.org.readthedocs.build/