Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commita474e04

Browse files
itamarojbower-fbwillingc
authored
gh-97696: asyncio eager tasks factory (#102853)
Co-authored-by: Jacob Bower <jbower@meta.com>Co-authored-by: Carol Willing <carolcode@willingconsulting.com>
1 parent59bc36a commita474e04

File tree

12 files changed

+945
-47
lines changed

12 files changed

+945
-47
lines changed

‎Doc/library/asyncio-task.rst‎

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,42 @@ Running Tasks Concurrently
527527
and there is no running event loop.
528528

529529

530+
Eager Task Factory
531+
==================
532+
533+
..function::eager_task_factory(loop, coro, *, name=None, context=None)
534+
535+
A task factory for eager task execution.
536+
537+
When using this factory (via:meth:`loop.set_task_factory(asyncio.eager_task_factory) <loop.set_task_factory>`),
538+
coroutines begin execution synchronously during:class:`Task` construction.
539+
Tasks are only scheduled on the event loop if they block.
540+
This can be a performance improvement as the overhead of loop scheduling
541+
is avoided for coroutines that complete synchronously.
542+
543+
A common example where this is beneficial is coroutines which employ
544+
caching or memoization to avoid actual I/O when possible.
545+
546+
..note::
547+
548+
Immediate execution of the coroutine is a semantic change.
549+
If the coroutine returns or raises, the task is never scheduled
550+
to the event loop. If the coroutine execution blocks, the task is
551+
scheduled to the event loop. This change may introduce behavior
552+
changes to existing applications. For example,
553+
the application's task execution order is likely to change.
554+
555+
..versionadded::3.12
556+
557+
..function::create_eager_task_factory(custom_task_constructor)
558+
559+
Create an eager task factory, similar to:func:`eager_task_factory`,
560+
using the provided *custom_task_constructor* when creating a new task instead
561+
of the default:class:`Task`.
562+
563+
..versionadded::3.12
564+
565+
530566
Shielding From Cancellation
531567
===========================
532568

‎Doc/whatsnew/3.12.rst‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,11 @@ Optimizations
613613
* Speed up:class:`asyncio.Task` creation by deferring expensive string formatting.
614614
(Contributed by Itamar O in:gh:`103793`.)
615615

616+
* Added:func:`asyncio.eager_task_factory` and:func:`asyncio.create_eager_task_factory`
617+
functions to allow opting an event loop in to eager task execution,
618+
speeding up some use-cases by up to 50%.
619+
(Contributed by Jacob Bower & Itamar O in:gh:`102853`)
620+
616621

617622
CPython bytecode changes
618623
========================

‎Include/internal/pycore_global_objects_fini_generated.h‎

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.

‎Include/internal/pycore_global_strings.h‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ struct _Py_global_strings {
370370
STRUCT_FOR_ID(dst_dir_fd)
371371
STRUCT_FOR_ID(duration)
372372
STRUCT_FOR_ID(e)
373+
STRUCT_FOR_ID(eager_start)
373374
STRUCT_FOR_ID(effective_ids)
374375
STRUCT_FOR_ID(element_factory)
375376
STRUCT_FOR_ID(encode)
@@ -460,6 +461,7 @@ struct _Py_global_strings {
460461
STRUCT_FOR_ID(instructions)
461462
STRUCT_FOR_ID(intern)
462463
STRUCT_FOR_ID(intersection)
464+
STRUCT_FOR_ID(is_running)
463465
STRUCT_FOR_ID(isatty)
464466
STRUCT_FOR_ID(isinstance)
465467
STRUCT_FOR_ID(isoformat)

‎Include/internal/pycore_runtime_init_generated.h‎

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.

‎Include/internal/pycore_unicodeobject_generated.h‎

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.

‎Lib/asyncio/base_tasks.py‎

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@ def _task_repr_info(task):
1515

1616
info.insert(1,'name=%r'%task.get_name())
1717

18-
coro=coroutines._format_coroutine(task._coro)
19-
info.insert(2,f'coro=<{coro}>')
20-
2118
iftask._fut_waiterisnotNone:
22-
info.insert(3,f'wait_for={task._fut_waiter!r}')
19+
info.insert(2,f'wait_for={task._fut_waiter!r}')
20+
21+
iftask._coro:
22+
coro=coroutines._format_coroutine(task._coro)
23+
info.insert(2,f'coro=<{coro}>')
24+
2325
returninfo
2426

2527

‎Lib/asyncio/tasks.py‎

Lines changed: 100 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
'wait','wait_for','as_completed','sleep',
77
'gather','shield','ensure_future','run_coroutine_threadsafe',
88
'current_task','all_tasks',
9+
'create_eager_task_factory','eager_task_factory',
910
'_register_task','_unregister_task','_enter_task','_leave_task',
1011
)
1112

@@ -43,22 +44,26 @@ def all_tasks(loop=None):
4344
"""Return a set of all tasks for the loop."""
4445
ifloopisNone:
4546
loop=events.get_running_loop()
46-
# Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
47-
# thread while we do so. Therefore we cast it to list prior to filtering. The list
48-
# cast itself requires iteration, so we repeat it several times ignoring
49-
# RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
50-
# details.
47+
# capturing the set of eager tasks first, so if an eager task "graduates"
48+
# to a regular task in another thread, we don't risk missing it.
49+
eager_tasks=list(_eager_tasks)
50+
# Looping over the WeakSet isn't safe as it can be updated from another
51+
# thread, therefore we cast it to list prior to filtering. The list cast
52+
# itself requires iteration, so we repeat it several times ignoring
53+
# RuntimeErrors (which are not very likely to occur).
54+
# See issues 34970 and 36607 for details.
55+
scheduled_tasks=None
5156
i=0
5257
whileTrue:
5358
try:
54-
tasks=list(_all_tasks)
59+
scheduled_tasks=list(_scheduled_tasks)
5560
exceptRuntimeError:
5661
i+=1
5762
ifi>=1000:
5863
raise
5964
else:
6065
break
61-
return {tfortintasks
66+
return {tfortinitertools.chain(scheduled_tasks,eager_tasks)
6267
iffutures._get_loop(t)isloopandnott.done()}
6368

6469

@@ -93,7 +98,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
9398
# status is still pending
9499
_log_destroy_pending=True
95100

96-
def__init__(self,coro,*,loop=None,name=None,context=None):
101+
def__init__(self,coro,*,loop=None,name=None,context=None,
102+
eager_start=False):
97103
super().__init__(loop=loop)
98104
ifself._source_traceback:
99105
delself._source_traceback[-1]
@@ -117,8 +123,11 @@ def __init__(self, coro, *, loop=None, name=None, context=None):
117123
else:
118124
self._context=context
119125

120-
self._loop.call_soon(self.__step,context=self._context)
121-
_register_task(self)
126+
ifeager_startandself._loop.is_running():
127+
self.__eager_start()
128+
else:
129+
self._loop.call_soon(self.__step,context=self._context)
130+
_register_task(self)
122131

123132
def__del__(self):
124133
ifself._state==futures._PENDINGandself._log_destroy_pending:
@@ -250,6 +259,25 @@ def uncancel(self):
250259
self._num_cancels_requested-=1
251260
returnself._num_cancels_requested
252261

262+
def__eager_start(self):
263+
prev_task=_swap_current_task(self._loop,self)
264+
try:
265+
_register_eager_task(self)
266+
try:
267+
self._context.run(self.__step_run_and_handle_result,None)
268+
finally:
269+
_unregister_eager_task(self)
270+
finally:
271+
try:
272+
curtask=_swap_current_task(self._loop,prev_task)
273+
assertcurtaskisself
274+
finally:
275+
ifself.done():
276+
self._coro=None
277+
self=None# Needed to break cycles when an exception occurs.
278+
else:
279+
_register_task(self)
280+
253281
def__step(self,exc=None):
254282
ifself.done():
255283
raiseexceptions.InvalidStateError(
@@ -258,11 +286,17 @@ def __step(self, exc=None):
258286
ifnotisinstance(exc,exceptions.CancelledError):
259287
exc=self._make_cancelled_error()
260288
self._must_cancel=False
261-
coro=self._coro
262289
self._fut_waiter=None
263290

264291
_enter_task(self._loop,self)
265-
# Call either coro.throw(exc) or coro.send(None).
292+
try:
293+
self.__step_run_and_handle_result(exc)
294+
finally:
295+
_leave_task(self._loop,self)
296+
self=None# Needed to break cycles when an exception occurs.
297+
298+
def__step_run_and_handle_result(self,exc):
299+
coro=self._coro
266300
try:
267301
ifexcisNone:
268302
# We use the `send` method directly, because coroutines
@@ -334,7 +368,6 @@ def __step(self, exc=None):
334368
self._loop.call_soon(
335369
self.__step,new_exc,context=self._context)
336370
finally:
337-
_leave_task(self._loop,self)
338371
self=None# Needed to break cycles when an exception occurs.
339372

340373
def__wakeup(self,future):
@@ -897,17 +930,41 @@ def callback():
897930
returnfuture
898931

899932

900-
# WeakSet containing all alive tasks.
901-
_all_tasks=weakref.WeakSet()
933+
defcreate_eager_task_factory(custom_task_constructor):
934+
935+
if"eager_start"notininspect.signature(custom_task_constructor).parameters:
936+
raiseTypeError(
937+
"Provided constructor does not support eager task execution")
938+
939+
deffactory(loop,coro,*,name=None,context=None):
940+
returncustom_task_constructor(
941+
coro,loop=loop,name=name,context=context,eager_start=True)
942+
943+
944+
returnfactory
945+
946+
eager_task_factory=create_eager_task_factory(Task)
947+
948+
949+
# Collectively these two sets hold references to the complete set of active
950+
# tasks. Eagerly executed tasks use a faster regular set as an optimization
951+
# but may graduate to a WeakSet if the task blocks on IO.
952+
_scheduled_tasks=weakref.WeakSet()
953+
_eager_tasks=set()
902954

903955
# Dictionary containing tasks that are currently active in
904956
# all running event loops. {EventLoop: Task}
905957
_current_tasks= {}
906958

907959

908960
def_register_task(task):
909-
"""Register a new task in asyncio as executed by loop."""
910-
_all_tasks.add(task)
961+
"""Register an asyncio Task scheduled to run on an event loop."""
962+
_scheduled_tasks.add(task)
963+
964+
965+
def_register_eager_task(task):
966+
"""Register an asyncio Task about to be eagerly executed."""
967+
_eager_tasks.add(task)
911968

912969

913970
def_enter_task(loop,task):
@@ -926,28 +983,49 @@ def _leave_task(loop, task):
926983
del_current_tasks[loop]
927984

928985

986+
def_swap_current_task(loop,task):
987+
prev_task=_current_tasks.get(loop)
988+
iftaskisNone:
989+
del_current_tasks[loop]
990+
else:
991+
_current_tasks[loop]=task
992+
returnprev_task
993+
994+
929995
def_unregister_task(task):
930-
"""Unregister a task."""
931-
_all_tasks.discard(task)
996+
"""Unregister a completed, scheduled Task."""
997+
_scheduled_tasks.discard(task)
998+
999+
1000+
def_unregister_eager_task(task):
1001+
"""Unregister a task which finished its first eager step."""
1002+
_eager_tasks.discard(task)
9321003

9331004

9341005
_py_current_task=current_task
9351006
_py_register_task=_register_task
1007+
_py_register_eager_task=_register_eager_task
9361008
_py_unregister_task=_unregister_task
1009+
_py_unregister_eager_task=_unregister_eager_task
9371010
_py_enter_task=_enter_task
9381011
_py_leave_task=_leave_task
1012+
_py_swap_current_task=_swap_current_task
9391013

9401014

9411015
try:
942-
from_asyncioimport (_register_task,_unregister_task,
943-
_enter_task,_leave_task,
944-
_all_tasks,_current_tasks,
1016+
from_asyncioimport (_register_task,_register_eager_task,
1017+
_unregister_task,_unregister_eager_task,
1018+
_enter_task,_leave_task,_swap_current_task,
1019+
_scheduled_tasks,_eager_tasks,_current_tasks,
9451020
current_task)
9461021
exceptImportError:
9471022
pass
9481023
else:
9491024
_c_current_task=current_task
9501025
_c_register_task=_register_task
1026+
_c_register_eager_task=_register_eager_task
9511027
_c_unregister_task=_unregister_task
1028+
_c_unregister_eager_task=_unregister_eager_task
9521029
_c_enter_task=_enter_task
9531030
_c_leave_task=_leave_task
1031+
_c_swap_current_task=_swap_current_task

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp