Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork33.7k
gh-97696: asyncio eager tasks factory#102853
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes from31 commits
a127f9845316d8ac9b7b0402c317563ffd46f2a47ae7743f610a03a05e8ae51b4fea1c441fd924c46a728686a3d14b6f580f9185c679534a70bb3d47edcf3f45e5c8cfbf8d91873a6459c2bc9a9522c542acdc51fef81401eb540c3cef856887771657ccce357b197fc5456450c09767a255ec805870d5a2587a10101742b83ed94b17f605File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -527,6 +527,42 @@ Running Tasks Concurrently | ||
| and there is no running event loop. | ||
| Eager Task Factory | ||
| ================== | ||
| .. function:: eager_task_factory(loop, coro, *, name=None, context=None) | ||
| A task factory for eager task execution. | ||
| When using this factory (via :meth:`loop.set_task_factory(asyncio.eager_task_factory) <loop.set_task_factory>`), | ||
| coroutines begin execution synchronously during :class:`Task` construction. | ||
| Tasks are only scheduled on the event loop if they block. | ||
| This can be a performance improvement as the overhead of loop scheduling | ||
| is avoided for coroutines that complete synchronously. | ||
| A common example where this is beneficial is coroutines which employ | ||
| caching or memoization to avoid actual I/O when possible. | ||
| .. note:: | ||
| Immediate execution of the coroutine is a semantic change. | ||
| If the coroutine blocks, the task is never scheduled to the event loop. | ||
| If the coroutine execution returns or raises, the task is scheduled to | ||
| the event loop. This change may introduce behavior changes to existing | ||
itamaro marked this conversation as resolved. OutdatedShow resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| applications. For example, the application's task execution order is | ||
| likely to change. | ||
| .. versionadded:: 3.12 | ||
| .. function:: create_eager_task_factory(custom_task_constructor) | ||
| Create an eager task factory, similar to :func:`eager_task_factory`, | ||
| using the provided *custom_task_constructor* when creating a new task instead | ||
| of the default :class:`Task`. | ||
| .. versionadded:: 3.12 | ||
| Shielding From Cancellation | ||
| =========================== | ||
Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.
Uh oh!
There was an error while loading.Please reload this page.
Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.
Uh oh!
There was an error while loading.Please reload this page.
Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.
Uh oh!
There was an error while loading.Please reload this page.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -6,6 +6,7 @@ | ||
| 'wait', 'wait_for', 'as_completed', 'sleep', | ||
| 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', | ||
| 'current_task', 'all_tasks', | ||
| 'create_eager_task_factory', 'eager_task_factory', | ||
| '_register_task', '_unregister_task', '_enter_task', '_leave_task', | ||
| ) | ||
| @@ -43,22 +44,25 @@ def all_tasks(loop=None): | ||
| """Return a set of all tasks for the loop.""" | ||
| if loop is None: | ||
| loop = events.get_running_loop() | ||
| # Looping over these sets isn't safe as they can be updated from another thread, | ||
| # therefore we cast to lists prior to filtering. The list cast itself requires | ||
itamaro marked this conversation as resolved. OutdatedShow resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| # iteration, so we repeat it several times ignoring RuntimeErrors (which are not | ||
| # very likely to occur). See issues 34970 and 36607 for details. | ||
| scheduled_tasks = None | ||
| eager_tasks = None | ||
| i = 0 | ||
| while True: | ||
| try: | ||
| if scheduled_tasks is None: | ||
| scheduled_tasks = list(_scheduled_tasks) | ||
| eager_tasks = list(_eager_tasks) | ||
carljm marked this conversation as resolved. OutdatedShow resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| except RuntimeError: | ||
| i += 1 | ||
| if i >= 1000: | ||
| raise | ||
| else: | ||
| break | ||
| return {t for t initertools.chain(scheduled_tasks, eager_tasks) | ||
| if futures._get_loop(t) is loop and not t.done()} | ||
| @@ -93,7 +97,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation | ||
| # status is still pending | ||
| _log_destroy_pending = True | ||
| def __init__(self, coro, *, loop=None, name=None, context=None, | ||
| eager_start=False): | ||
| super().__init__(loop=loop) | ||
| if self._source_traceback: | ||
| del self._source_traceback[-1] | ||
| @@ -117,8 +122,11 @@ def __init__(self, coro, *, loop=None, name=None, context=None): | ||
| else: | ||
| self._context = context | ||
| if eager_start and self._loop.is_running(): | ||
| self.__eager_start() | ||
| else: | ||
| self._loop.call_soon(self.__step, context=self._context) | ||
| _register_task(self) | ||
| def __del__(self): | ||
| if self._state == futures._PENDING and self._log_destroy_pending: | ||
| @@ -250,6 +258,25 @@ def uncancel(self): | ||
| self._num_cancels_requested -= 1 | ||
| return self._num_cancels_requested | ||
| def __eager_start(self): | ||
| prev_task = _swap_current_task(self._loop, self) | ||
| try: | ||
| _register_eager_task(self) | ||
| try: | ||
| self._context.run(self.__step_run_and_handle_result, None) | ||
| finally: | ||
| _unregister_eager_task(self) | ||
| finally: | ||
| try: | ||
| curtask = _swap_current_task(self._loop, prev_task) | ||
| assert curtask is self | ||
| finally: | ||
| if self.done(): | ||
| self._coro = None | ||
| self = None # Needed to break cycles when an exception occurs. | ||
| else: | ||
| _register_task(self) | ||
| def __step(self, exc=None): | ||
| if self.done(): | ||
| raise exceptions.InvalidStateError( | ||
| @@ -258,11 +285,17 @@ def __step(self, exc=None): | ||
| if not isinstance(exc, exceptions.CancelledError): | ||
| exc = self._make_cancelled_error() | ||
| self._must_cancel = False | ||
| self._fut_waiter = None | ||
| _enter_task(self._loop, self) | ||
| try: | ||
| self.__step_run_and_handle_result(exc) | ||
| finally: | ||
| _leave_task(self._loop, self) | ||
| self = None # Needed to break cycles when an exception occurs. | ||
| def __step_run_and_handle_result(self, exc): | ||
| coro = self._coro | ||
| try: | ||
| if exc is None: | ||
| # We use the `send` method directly, because coroutines | ||
| @@ -334,7 +367,6 @@ def __step(self, exc=None): | ||
| self._loop.call_soon( | ||
| self.__step, new_exc, context=self._context) | ||
| finally: | ||
| self = None # Needed to break cycles when an exception occurs. | ||
| def __wakeup(self, future): | ||
| @@ -897,17 +929,41 @@ def callback(): | ||
| return future | ||
| def create_eager_task_factory(custom_task_constructor): | ||
| if "eager_start" not in inspect.signature(custom_task_constructor).parameters: | ||
itamaro marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| raise TypeError( | ||
| "Provided constructor does not support eager task execution") | ||
| def factory(loop, coro, *, name=None, context=None): | ||
| return custom_task_constructor( | ||
| coro, loop=loop, name=name, context=context, eager_start=True) | ||
| return factory | ||
| eager_task_factory = create_eager_task_factory(Task) | ||
| # Collectively these two sets hold references to the complete set of active | ||
| # tasks. Eagerly executed tasks use a faster regular set as an optimization | ||
| # but may graduate to a WeakSet if the task blocks on IO. | ||
| _scheduled_tasks = weakref.WeakSet() | ||
| _eager_tasks = set() | ||
| # Dictionary containing tasks that are currently active in | ||
| # all running event loops. {EventLoop: Task} | ||
| _current_tasks = {} | ||
| def _register_task(task): | ||
| """Register an asyncio Task scheduled to run on an event loop.""" | ||
| _scheduled_tasks.add(task) | ||
| def _register_eager_task(task): | ||
| """Register an asyncio Task about to be eagerly executed.""" | ||
| _eager_tasks.add(task) | ||
| def _enter_task(loop, task): | ||
| @@ -926,28 +982,49 @@ def _leave_task(loop, task): | ||
| del _current_tasks[loop] | ||
| def _swap_current_task(loop, task): | ||
| prev_task = _current_tasks.get(loop) | ||
| if task is None: | ||
| del _current_tasks[loop] | ||
| else: | ||
| _current_tasks[loop] = task | ||
| return prev_task | ||
| def _unregister_task(task): | ||
| """Unregister a completed, scheduled Task.""" | ||
| _scheduled_tasks.discard(task) | ||
| def _unregister_eager_task(task): | ||
| """Unregister a task which finished its first eager step.""" | ||
| _eager_tasks.discard(task) | ||
| _py_current_task = current_task | ||
| _py_register_task = _register_task | ||
| _py_register_eager_task = _register_eager_task | ||
| _py_unregister_task = _unregister_task | ||
| _py_unregister_eager_task = _unregister_eager_task | ||
| _py_enter_task = _enter_task | ||
| _py_leave_task = _leave_task | ||
| _py_swap_current_task = _swap_current_task | ||
| try: | ||
| from _asyncio import (_register_task, _register_eager_task, | ||
| _unregister_task, _unregister_eager_task, | ||
| _enter_task, _leave_task, _swap_current_task, | ||
| _scheduled_tasks, _eager_tasks, _current_tasks, | ||
| current_task) | ||
| except ImportError: | ||
| pass | ||
| else: | ||
| _c_current_task = current_task | ||
| _c_register_task = _register_task | ||
| _c_register_eager_task = _register_eager_task | ||
| _c_unregister_task = _unregister_task | ||
| _c_unregister_eager_task = _unregister_eager_task | ||
| _c_enter_task = _enter_task | ||
| _c_leave_task = _leave_task | ||
| _c_swap_current_task = _swap_current_task | ||
Uh oh!
There was an error while loading.Please reload this page.