Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitaffffc7

Browse files
miss-islingtongraingertZeroIntensity
authored
[3.12]gh-124309: fix staggered race on eager tasks (GH-124847) (#125340)
gh-124309: fix staggered race on eager tasks (GH-124847)This patch is entirely by Thomas and Peter(cherry picked from commit979c0df)Co-authored-by: Thomas Grainger <tagrain@gmail.com>Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
1 parent2264c09 commitaffffc7

File tree

4 files changed

+88
-3
lines changed

4 files changed

+88
-3
lines changed

‎Lib/asyncio/staggered.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,11 @@ async def staggered_race(coro_fns, delay, *, loop=None):
6969
exceptions= []
7070
running_tasks= []
7171

72-
asyncdefrun_one_coro(previous_failed)->None:
72+
asyncdefrun_one_coro(ok_to_start,previous_failed)->None:
73+
# in eager tasks this waits for the calling task to append this task
74+
# to running_tasks, in regular tasks this wait is a no-op that does
75+
# not yield a future. See gh-124309.
76+
awaitok_to_start.wait()
7377
# Wait for the previous task to finish, or for delay seconds
7478
ifprevious_failedisnotNone:
7579
withcontextlib.suppress(exceptions_mod.TimeoutError):
@@ -85,8 +89,12 @@ async def run_one_coro(previous_failed) -> None:
8589
return
8690
# Start task that will run the next coroutine
8791
this_failed=locks.Event()
88-
next_task=loop.create_task(run_one_coro(this_failed))
92+
next_ok_to_start=locks.Event()
93+
next_task=loop.create_task(run_one_coro(next_ok_to_start,this_failed))
8994
running_tasks.append(next_task)
95+
# next_task has been appended to running_tasks so next_task is ok to
96+
# start.
97+
next_ok_to_start.set()
9098
assertlen(running_tasks)==this_index+2
9199
# Prepare place to put this coroutine's exceptions if not won
92100
exceptions.append(None)
@@ -116,8 +124,11 @@ async def run_one_coro(previous_failed) -> None:
116124
ifi!=this_index:
117125
t.cancel()
118126

119-
first_task=loop.create_task(run_one_coro(None))
127+
ok_to_start=locks.Event()
128+
first_task=loop.create_task(run_one_coro(ok_to_start,None))
120129
running_tasks.append(first_task)
130+
# first_task has been appended to running_tasks so first_task is ok to start.
131+
ok_to_start.set()
121132
try:
122133
# Wait for a growing list of tasks to all finish: poor man's version of
123134
# curio's TaskGroup or trio's nursery

‎Lib/test/test_asyncio/test_eager_task_factory.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,52 @@ async def run():
218218

219219
self.run_coro(run())
220220

221+
deftest_staggered_race_with_eager_tasks(self):
222+
# See https://github.com/python/cpython/issues/124309
223+
224+
asyncdeffail():
225+
awaitasyncio.sleep(0)
226+
raiseValueError("no good")
227+
228+
asyncdefrun():
229+
winner,index,excs=awaitasyncio.staggered.staggered_race(
230+
[
231+
lambda:asyncio.sleep(2,result="sleep2"),
232+
lambda:asyncio.sleep(1,result="sleep1"),
233+
lambda:fail()
234+
],
235+
delay=0.25
236+
)
237+
self.assertEqual(winner,'sleep1')
238+
self.assertEqual(index,1)
239+
self.assertIsNone(excs[index])
240+
self.assertIsInstance(excs[0],asyncio.CancelledError)
241+
self.assertIsInstance(excs[2],ValueError)
242+
243+
self.run_coro(run())
244+
245+
deftest_staggered_race_with_eager_tasks_no_delay(self):
246+
# See https://github.com/python/cpython/issues/124309
247+
asyncdeffail():
248+
raiseValueError("no good")
249+
250+
asyncdefrun():
251+
winner,index,excs=awaitasyncio.staggered.staggered_race(
252+
[
253+
lambda:fail(),
254+
lambda:asyncio.sleep(1,result="sleep1"),
255+
lambda:asyncio.sleep(0,result="sleep0"),
256+
],
257+
delay=None
258+
)
259+
self.assertEqual(winner,'sleep1')
260+
self.assertEqual(index,1)
261+
self.assertIsNone(excs[index])
262+
self.assertIsInstance(excs[0],ValueError)
263+
self.assertEqual(len(excs),2)
264+
265+
self.run_coro(run())
266+
221267

222268
classPyEagerTaskFactoryLoopTests(EagerTaskFactoryLoopTests,test_utils.TestCase):
223269
Task=tasks._PyTask

‎Lib/test/test_asyncio/test_staggered.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,30 @@ async def coro(index):
9595
self.assertEqual(len(excs),2)
9696
self.assertIsInstance(excs[0],ValueError)
9797
self.assertIsInstance(excs[1],ValueError)
98+
99+
100+
asyncdeftest_multiple_winners(self):
101+
event=asyncio.Event()
102+
103+
asyncdefcoro(index):
104+
awaitevent.wait()
105+
returnindex
106+
107+
asyncdefdo_set():
108+
event.set()
109+
awaitasyncio.Event().wait()
110+
111+
winner,index,excs=awaitstaggered_race(
112+
[
113+
lambda:coro(0),
114+
lambda:coro(1),
115+
do_set,
116+
],
117+
delay=0.1,
118+
)
119+
self.assertIs(winner,0)
120+
self.assertIs(index,0)
121+
self.assertEqual(len(excs),3)
122+
self.assertIsNone(excs[0],None)
123+
self.assertIsInstance(excs[1],asyncio.CancelledError)
124+
self.assertIsInstance(excs[2],asyncio.CancelledError)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed:exc:`AssertionError` when using:func:`!asyncio.staggered.staggered_race` with:attr:`asyncio.eager_task_factory`.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp