Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

gh-96471: Add asyncio queue shutdown#104228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
gvanrossum merged 28 commits intopython:mainfromEpicWink:asyncio-queue-shutdown
Apr 6, 2024

Conversation

EpicWink
Copy link
Contributor

@EpicWinkEpicWink commentedMay 6, 2023
edited
Loading

asyncio-only changes from#102499 (which supercedes#96474), updated to match the API introduced by#104750


📚 Documentation preview 📚:https://cpython-previews--104228.org.readthedocs.build/

heckad and aalekhpatel07 reacted with thumbs up emoji
EpicWinkand others added2 commitsMay 6, 2023 14:50
* Queue state enum members are capitalised* Termination state in str/repr* Include raised exception in docstrings* Factor out queue-state checks and updates to methods* Logic fixes in get_nowait and shutdown* Handle queue shutdown in task_done and join* Updated tests* Document feature added in 3.13
@EpicWink
Copy link
ContributorAuthor

@willingc ready for review

@gvanrossum
Copy link
Member

asyncio-only changes from#102499 (on top of#96474)

I assume the "on top of ..." part is obsolete, since that PR was closed without merging. I'll add@willingc as a reviewer per your request.

EpicWink reacted with thumbs up emoji

@gvanrossumgvanrossum requested a review fromwillingcMay 9, 2023 17:56
@gvanrossum
Copy link
Member

@EpicWink Could you merge main?

@EpicWink
Copy link
ContributorAuthor

Could you merge main?

Done, but like#104750, we may want to modify the implementation ofimmediate=True to simply consume the queue rather than having a separate state.

@gvanrossum
Copy link
Member

Eh,@EpicWink, did you see the hubbub about the hanging test in yourprevious PR? The test was disabled because it kept hanging in CI on various platforms. Could you look into what's wrong with it? (At least the feature wasn't reverted.)

YvesDup reacted with eyes emoji

@EpicWink
Copy link
ContributorAuthor

Eh,@EpicWink, did you see the hubbub about the hanging test in yourprevious PR? The test was disabled because it kept hanging in CI on various platforms. Could you look into what's wrong with it?

I did see the follow-on issue and pull-requests. Yves and I are working on fixing it (you can follow the changes here:main...EpicWink:cpython:fix-thread-queue-shutdown-test)

@gvanrossum
Copy link
Member

Awesome! (An acknowledgement that you were working on it would have lessened my stress. :-)

@gvanrossum
Copy link
Member

Looking at this with an eye towards review, I realized that the threaded-queue implementation doesn't appear to match its documentation: It appears thatq.join() doesnot raiseShutDown when the queue is shutdown immediately. The tests don't seem to be testing for this either. (Come to think of it, the test never seems to take theexcept self.queue.ShutDown path -- if I put a breakpoint there it never gets hit duringany of the tests. The test logic is pretty convoluted, which is my only excuse for not having caught this in the review ofgh-104750.)

@EpicWink
Copy link
ContributorAuthor

EpicWink commentedFeb 22, 2024 via email

Now that the immediate-consume implementation of the threading queue shutdown has been accepted, I'm going to do the same here and for multiprocessing. I'll personally rewrite the tests to be more readable and obvious.Laurie
________________________________From: Guido van Rossum ***@***.***>Sent: Thursday, February 22, 2024 11:00:09 AMTo: python/cpython ***@***.***>Cc: Laurie O ***@***.***>; Mention ***@***.***>Subject: Re: [python/cpython]gh-96471: Add asyncio queue shutdown (PR#104228)Looking at this with an eye towards review, I realized that the threaded-queue implementation doesn't appear to match its documentation: It appears that q.join() does not raise ShutDown when the queue is shutdown immediately. The tests don't seem to be testing for this either. (Come to think of it, the test never seems to take the except self.queue.ShutDown path -- if I put a breakpoint there it never gets hit during any of the tests. The test logic is pretty convoluted, which is my only excuse for not having caught this in the review ofgh-104750<#104750>.)—Reply to this email directly, view it on GitHub<#104228 (comment)>, or unsubscribe<https://github.com/notifications/unsubscribe-auth/AF72GRNKMPJRCRIJHOB4NUDYU2KBTAVCNFSM6AAAAAAXX3PWIGVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMYTSNJYGQ3TENZYGQ>.You are receiving this because you were mentioned.Message ID: ***@***.***>

@gvanrossum
Copy link
Member

Now that the immediate-consume implementation of the threading queue shutdown has been accepted, I'm going to do the same here and for multiprocessing. I'll personally rewrite the tests to be more readable and obvious.

And what aboutqueue.Queue.join()? Should it raise aftershutdown(immediate=True)? Either way some changes are needed -- either in the implementation or in the docs. And the tests there too (what you wrote aboveappears to apply to the asyncio and multiprocessing versions only.)

Copy link
Member

@gvanrossumgvanrossum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I'll wait reviewing until you've rewritten shutdown, but here's one doc markup nit.

Also, please mark PRs that aren't ready for review as Draft.

@EpicWinkEpicWink marked this pull request as draftFebruary 22, 2024 01:14
@EpicWink
Copy link
ContributorAuthor

And what aboutqueue.Queue.join()? Should it raise aftershutdown(immediate=True)? Either way some changes are needed -- either in the implementation or in the docs. And the tests there too (what you wrote aboveappears to apply to the asyncio and multiprocessing versions only.)

The behaviour should (and I intend to implement it to) be the same between the threading, multiprocessing and asyncio queues

@YvesDup
Copy link
Contributor

YvesDup commentedFeb 22, 2024
edited
Loading

By the way, shouldtask_done() raise an exception aftershutdown or not ?
update; seehttps://docs.python.org/3.13/library/queue.html#queue.Queue.task_done

@EpicWink
Copy link
ContributorAuthor

EpicWink commentedFeb 25, 2024
edited
Loading

By the way, shouldtask_done() raise an exception aftershutdown or not ?

@YvesDup No (except the usualValueError); see#115838

@EpicWink
Copy link
ContributorAuthor

I want to trust that you two have done the right thing there

@gvanrossum please don't trust, have you seenxz-utils's backdoor?

but not push until the big test run completes? (Buildbot tests can take many hours, sorry.)

Oops, sorry. I forgot to hold off my most recent push

ALso, why is the DO-NOT-MERGE label set? Is it because of#104228?

No, I think it was because I hadn't updated the implementation to match the new agreed-upon behaviour (this PR was in draft at that time anyway, so I don't think the label was needed).

wondering why you always wake up all getters -- and came to the conclusion that it is needed

I should probably add a comment

@EpicWink
Copy link
ContributorAuthor

there is the same bug on threading queue shutdown. I will open an issue.

@YvesDup I've created issue#117531, and fix in PR#117532

# Setup queue with 2 items (1 retrieved), and a join() task
q = self.q_class()
loop = asyncio.get_running_loop()
q.put_nowait("data")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I suggest:

q.put_nowait("data1")q.put_nowait("data2")

q.put_nowait("data")
q.put_nowait("data")
join_task = loop.create_task(q.join())
self.assertEqual(await q.get(), "data")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

and testing on"data1".

Copy link
ContributorAuthor

@EpicWinkEpicWinkApr 4, 2024
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Except whenq is an instance ofLifoQueue, when the value should be"data2". Is it necessary to test queue ordering in the shutdown tests?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I forgot theLifoQueue class. Is this worth commenting on?

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I have implemented your suggestion, but I have not committed. I am considering it. I don't think it's necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

The test case (thus includingLifoQueue) is implicit. A comment would be enough.

@YvesDup
Copy link
Contributor

there is the same bug on threading queue shutdown. I will open an issue.

@YvesDup I've created issue#117531, and fix in PR#117532

I have seen and commented

Copy link
Member

@gvanrossumgvanrossum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Yup! This version looks great. Let me know if you agree -- I've removed the DO-NOT-MERGE label, but I'll wait until you and@YvesDup are happy too.

Mohsin1910 reacted with thumbs up emoji
@EpicWink
Copy link
ContributorAuthor

EpicWink commentedApr 6, 2024
edited
Loading

I think it's good to go

wondering why you always wake up all getters -- and came to the conclusion that it is needed

I should probably add a comment

I could do this, but I don't think it's worth revoking the PR's approval

@gvanrossumgvanrossum merged commitdf4d84c intopython:mainApr 6, 2024
All blocked callers of:meth:`~Queue.put` will be unblocked. If
*immediate* is true, also unblock callers of:meth:`~Queue.get`
and:meth:`~Queue.join`.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Sorry but I have a doubt, shouldn't this documentation block be rather:

    All blocked callers of :meth:`~Queue.put` and :meth:`~Queue.get`     will be unblocked. If *immediate* is true, also unblock callers of     :meth:`~Queue.join`.

In event of change, the docstring of theshutdown method must be updated.

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

join callers aren't necessarily even unblocked anyway, if consumers are processing any items. I should probably say that a task is marked as done for each item in the queue if immediate shutdown.

Also, I think the threading queue docs are the same.

Copy link
Contributor

@YvesDupYvesDupApr 7, 2024
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

a task is marked as done for each item in the queue if immediate shutdown.

It's very precise, better.

Also, I think the threading queue docs are the same.

Yes, I commented here so as not to forget (see#117532 (comment)).

English is your native language, I think it'is best if you update documentations and docstrings.

Update: but I can create the follow-up PR.

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I've made a PR:#117621

YvesDup reacted with thumbs up emoji
All blocked callers of put() will be unblocked, and also get()
and join() if 'immediate'.
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Docstring to modify depending of agree/disagree on my first remark about `blocked callers'.
Changes could be:

All blocked callers of put() and get() will be unblocked, andalso join() if 'immediate

* Add :meth:`asyncio.Queue.shutdown` (along with
:exc:`asyncio.QueueShutDown`) for queue termination.
(Contributed by Laurie Opperman in :gh:`104228`.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

We've worked well together, I'd be nice if you'd mention me.
:-)

@gvanrossum
Copy link
Member

Sorry for merging prematurely.@EpicWink Can you prepare a follow-up PR? I will merge it as soon as@YvesDup approves it. (Or, alternatively,@YvesDup can make a PR and I will merge when@EpicWink approves.)

Mohsin1910 reacted with thumbs up emojiEpicWink reacted with eyes emoji

@EpicWinkEpicWink deleted the asyncio-queue-shutdown branchApril 8, 2024 00:49
@EpicWink
Copy link
ContributorAuthor

@gvanrossum see#117621

YvesDup reacted with thumbs up emoji

diegorusso pushed a commit to diegorusso/cpython that referenced this pull requestApr 17, 2024
Co-authored-by: Duprat <yduprat@gmail.com>
@john-parton
Copy link

john-parton commentedAug 27, 2024
edited
Loading

I'd like to add that theshutdown() method and task groups makes certain patterns really easy to express. I was struggling with how to properly make a generic async map function with a fixed number of workers (mostly as an exercise), but when I saw the new shutdown method, it came together really easily:

asyncdefasync_map[T,R](func:Callable[[T],Awaitable[R]],iterable:AsyncIterable[T],*,limit:int,maxsize:int=-1,)->AsyncIterator[R]:ifmaxsize<0:maxsize=limitarguments_queue=Queue[T](maxsize=maxsize)results_queue=Queue[R](maxsize=maxsize)asyncdefdrain():asyncforargumentiniterable:awaitarguments_queue.put(argument)arguments_queue.shutdown(immediate=False)asyncdefworker():whileTrue:try:argument=awaitarguments_queue.get()exceptQueueShutDown:breakawaitresults_queue.put(awaitfunc(argument))asyncdefbackground():asyncwithasyncio.TaskGroup()asbackground_task_group:background_task_group.create_task(drain())for_inrange(limit):background_task_group.create_task(worker())results_queue.shutdown(immediate=False)asyncwithasyncio.TaskGroup()astask_group:task_group.create_task(background())whileTrue:try:yieldawaitresults_queue.get()exceptQueueShutDown:break

I believe this has a common issue that a lot of async generators do, in that if you don't consume the entire generator, it will still continue processing and end up with a lot of futures never awaited.

I know there's somebackports. Is there a backports package for asyncio stuff already?

aalekhpatel07 reacted with heart emoji

Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment
Reviewers

@YvesDupYvesDupYvesDup left review comments

@gvanrossumgvanrossumgvanrossum approved these changes

@willingcwillingcAwaiting requested review from willingcwillingc is a code owner

Assignees
No one assigned
Projects
None yet
Milestone
No milestone
Development

Successfully merging this pull request may close these issues.

6 participants
@EpicWink@gvanrossum@YvesDup@bedevere-bot@john-parton@willingc

[8]ページ先頭

©2009-2025 Movatter.jp