Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork33.3k
gh-124309: Modernize thestaggered_race implementation to support eager task factories#124390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
gh-124309: Modernize thestaggered_race implementation to support eager task factories#124390
Uh oh!
There was an error while loading.Please reload this page.
Conversation
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
You're using private methods of TaskGroup and starting tasks on the loop rather than the TaskGroup
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
I think I'm just going to refactor this to not use |
I think it's worth persevering with TaskGroup, you just need to write it without using add_done_callback or private attributes |
I'll try it, but I'm worried that it isn't possible when considering an eager task factory. The previous implementation used a variation of a task group (a list containing tasks, since it predated While we're here, |
A demo of what I mean wrt TaskGroup: """Support for running coroutines in parallel with staggered start times."""__all__='staggered_race',from .importlocksfrom .importtasksfrom .importtaskgroupsasyncdefstaggered_race(coro_fns,delay,*,loop=None):"""Run coroutines with staggered start times and take the first to finish. This method takes an iterable of coroutine functions. The first one is started immediately. From then on, whenever the immediately preceding one fails (raises an exception), or when *delay* seconds has passed, the next coroutine is started. This continues until one of the coroutines complete successfully, in which case all others are cancelled, or until all coroutines fail. The coroutines provided should be well-behaved in the following way: * They should only ``return`` if completed successfully. * They should always raise an exception if they did not complete successfully. In particular, if they handle cancellation, they should probably reraise, like this:: try: # do work except asyncio.CancelledError: # undo partially completed work raise Args: coro_fns: an iterable of coroutine functions, i.e. callables that return a coroutine object when called. Use ``functools.partial`` or lambdas to pass arguments. delay: amount of time, in seconds, between starting coroutines. If ``None``, the coroutines will run sequentially. loop: the event loop to use. Returns: tuple *(winner_result, winner_index, exceptions)* where - *winner_result*: the result of the winning coroutine, or ``None`` if no coroutines won. - *winner_index*: the index of the winning coroutine in ``coro_fns``, or ``None`` if no coroutines won. If the winning coroutine may return None on success, *winner_index* can be used to definitively determine whether any coroutine won. - *exceptions*: list of exceptions returned by the coroutines. ``len(exceptions)`` is equal to the number of coroutines actually started, and the order is the same as in ``coro_fns``. The winning coroutine's entry is ``None``. """# TODO: when we have aiter() and anext(), allow async iterables in coro_fns.winner_result=Nonewinner_index=Noneexceptions= []class_Done(Exception):passasyncdefrun_one_coro(this_index,coro_fn,this_failed):try:result=awaitcoro_fn()except (SystemExit,KeyboardInterrupt):raiseexceptBaseExceptionase:exceptions[this_index]=ethis_failed.set()# Kickstart the next coroutineelse:# Store winner's resultsnonlocalwinner_index,winner_result# There could be more than one winnerwinner_index=this_indexwinner_result=resultraise_Donetry:asyncwithtaskgroups.TaskGroup()astg:forthis_index,coro_fninenumerate(coro_fns):this_failed=locks.Event()exceptions.append(None)tg.create_task(run_one_coro(this_index,coro_fn,this_failed))try:awaittasks.wait_for(this_failed.wait(),delay)exceptTimeoutError:pass except*_Done:passreturnwinner_result,winner_index,exceptions |
Co-authored-by: Thomas Grainger <tagrain@gmail.com>
Uh oh!
There was an error while loading.Please reload this page.
Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Thanks@ZeroIntensity.
Uh oh!
There was an error while loading.Please reload this page.
Co-authored-by: Carol Willing <carolcode@willingconsulting.com>
de929f3 intopython:mainUh oh!
There was an error while loading.Please reload this page.
Thanks@ZeroIntensity for the PR, and@kumaraditya303 for merging it 🌮🎉.. I'm working now to backport this PR to: 3.12, 3.13. |
…port eager task factories (pythonGH-124390)(cherry picked from commitde929f3)Co-authored-by: Peter Bierma <zintensitydev@gmail.com>Co-authored-by: Thomas Grainger <tagrain@gmail.com>Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com>Co-authored-by: Carol Willing <carolcode@willingconsulting.com>Co-authored-by: Kumar Aditya <kumaraditya@python.org>
Sorry,@ZeroIntensity and@kumaraditya303, I could not cleanly backport this to |
GH-124573 is a backport of this pull request to the3.13 branch. |
GH-124574 is a backport of this pull request to the3.12 branch. |
…pport e… (#124574)gh-124309: Modernize the `staggered_race` implementation to support eager task factories (#124390)Co-authored-by: Thomas Grainger <tagrain@gmail.com>Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com>Co-authored-by: Carol Willing <carolcode@willingconsulting.com>Co-authored-by: Kumar Aditya <kumaraditya@python.org>(cherry picked from commitde929f3)Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
…n to support eager task factories (python#124390)"This reverts commitde929f3.
…wnstream (pythonGH-124810)* Revert "pythonGH-124639: add back loop param to staggered_race (pythonGH-124700)"This reverts commite0a41a5.* Revert "pythongh-124309: Modernize the `staggered_race` implementation to support eager task factories (pythonGH-124390)"This reverts commitde929f3.(cherry picked from commit133e929)Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
…ownstream (GH-124810) (#124817)gh-124309: Revert eager task factory fix to prevent breaking downstream (GH-124810)* Revert "GH-124639: add back loop param to staggered_race (GH-124700)"This reverts commite0a41a5.* Revert "gh-124309: Modernize the `staggered_race` implementation to support eager task factories (GH-124390)"This reverts commitde929f3.(cherry picked from commit133e929)Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
Uh oh!
There was an error while loading.Please reload this page.