Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork33.7k
gh-97696: asyncio eager tasks factory#102853
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes fromall commits
a127f9845316d8ac9b7b0402c317563ffd46f2a47ae7743f610a03a05e8ae51b4fea1c441fd924c46a728686a3d14b6f580f9185c679534a70bb3d47edcf3f45e5c8cfbf8d91873a6459c2bc9a9522c542acdc51fef81401eb540c3cef856887771657ccce357b197fc5456450c09767a255ec805870d5a2587a10101742b83ed94b17f605File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.
Uh oh!
There was an error while loading.Please reload this page.
Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.
Uh oh!
There was an error while loading.Please reload this page.
Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.
Uh oh!
There was an error while loading.Please reload this page.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -6,6 +6,7 @@ | ||
| 'wait', 'wait_for', 'as_completed', 'sleep', | ||
| 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', | ||
| 'current_task', 'all_tasks', | ||
| 'create_eager_task_factory', 'eager_task_factory', | ||
| '_register_task', '_unregister_task', '_enter_task', '_leave_task', | ||
| ) | ||
| @@ -43,22 +44,26 @@ def all_tasks(loop=None): | ||
| """Return a set of all tasks for the loop.""" | ||
| if loop is None: | ||
| loop = events.get_running_loop() | ||
| # capturing the set of eager tasks first, so if an eager task "graduates" | ||
| # to a regular task in another thread, we don't risk missing it. | ||
| eager_tasks = list(_eager_tasks) | ||
| # Looping over the WeakSet isn't safe as it can be updated from another | ||
| # thread, therefore we cast it to list prior to filtering. The list cast | ||
| # itself requires iteration, so we repeat it several times ignoring | ||
| # RuntimeErrors (which are not very likely to occur). | ||
| # See issues 34970 and 36607 for details. | ||
| scheduled_tasks = None | ||
| i = 0 | ||
| while True: | ||
| try: | ||
| scheduled_tasks = list(_scheduled_tasks) | ||
| except RuntimeError: | ||
| i += 1 | ||
| if i >= 1000: | ||
| raise | ||
| else: | ||
| break | ||
| return {t for t initertools.chain(scheduled_tasks, eager_tasks) | ||
| if futures._get_loop(t) is loop and not t.done()} | ||
| @@ -93,7 +98,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation | ||
| # status is still pending | ||
| _log_destroy_pending = True | ||
| def __init__(self, coro, *, loop=None, name=None, context=None, | ||
| eager_start=False): | ||
| super().__init__(loop=loop) | ||
| if self._source_traceback: | ||
| del self._source_traceback[-1] | ||
| @@ -117,8 +123,11 @@ def __init__(self, coro, *, loop=None, name=None, context=None): | ||
| else: | ||
| self._context = context | ||
| if eager_start and self._loop.is_running(): | ||
| self.__eager_start() | ||
| else: | ||
| self._loop.call_soon(self.__step, context=self._context) | ||
| _register_task(self) | ||
| def __del__(self): | ||
| if self._state == futures._PENDING and self._log_destroy_pending: | ||
| @@ -250,6 +259,25 @@ def uncancel(self): | ||
| self._num_cancels_requested -= 1 | ||
| return self._num_cancels_requested | ||
| def __eager_start(self): | ||
| prev_task = _swap_current_task(self._loop, self) | ||
| try: | ||
| _register_eager_task(self) | ||
| try: | ||
| self._context.run(self.__step_run_and_handle_result, None) | ||
| finally: | ||
| _unregister_eager_task(self) | ||
| finally: | ||
| try: | ||
| curtask = _swap_current_task(self._loop, prev_task) | ||
| assert curtask is self | ||
| finally: | ||
| if self.done(): | ||
| self._coro = None | ||
| self = None # Needed to break cycles when an exception occurs. | ||
| else: | ||
| _register_task(self) | ||
| def __step(self, exc=None): | ||
| if self.done(): | ||
| raise exceptions.InvalidStateError( | ||
| @@ -258,11 +286,17 @@ def __step(self, exc=None): | ||
| if not isinstance(exc, exceptions.CancelledError): | ||
| exc = self._make_cancelled_error() | ||
| self._must_cancel = False | ||
| self._fut_waiter = None | ||
| _enter_task(self._loop, self) | ||
| try: | ||
| self.__step_run_and_handle_result(exc) | ||
| finally: | ||
| _leave_task(self._loop, self) | ||
| self = None # Needed to break cycles when an exception occurs. | ||
| def __step_run_and_handle_result(self, exc): | ||
| coro = self._coro | ||
| try: | ||
| if exc is None: | ||
| # We use the `send` method directly, because coroutines | ||
| @@ -334,7 +368,6 @@ def __step(self, exc=None): | ||
| self._loop.call_soon( | ||
| self.__step, new_exc, context=self._context) | ||
| finally: | ||
| self = None # Needed to break cycles when an exception occurs. | ||
| def __wakeup(self, future): | ||
| @@ -897,17 +930,41 @@ def callback(): | ||
| return future | ||
| def create_eager_task_factory(custom_task_constructor): | ||
| if "eager_start" not in inspect.signature(custom_task_constructor).parameters: | ||
itamaro marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| raise TypeError( | ||
| "Provided constructor does not support eager task execution") | ||
| def factory(loop, coro, *, name=None, context=None): | ||
| return custom_task_constructor( | ||
| coro, loop=loop, name=name, context=context, eager_start=True) | ||
| return factory | ||
| eager_task_factory = create_eager_task_factory(Task) | ||
| # Collectively these two sets hold references to the complete set of active | ||
| # tasks. Eagerly executed tasks use a faster regular set as an optimization | ||
| # but may graduate to a WeakSet if the task blocks on IO. | ||
| _scheduled_tasks = weakref.WeakSet() | ||
| _eager_tasks = set() | ||
| # Dictionary containing tasks that are currently active in | ||
| # all running event loops. {EventLoop: Task} | ||
| _current_tasks = {} | ||
| def _register_task(task): | ||
| """Register an asyncio Task scheduled to run on an event loop.""" | ||
| _scheduled_tasks.add(task) | ||
| def _register_eager_task(task): | ||
| """Register an asyncio Task about to be eagerly executed.""" | ||
| _eager_tasks.add(task) | ||
| def _enter_task(loop, task): | ||
| @@ -926,28 +983,49 @@ def _leave_task(loop, task): | ||
| del _current_tasks[loop] | ||
| def _swap_current_task(loop, task): | ||
| prev_task = _current_tasks.get(loop) | ||
| if task is None: | ||
| del _current_tasks[loop] | ||
| else: | ||
| _current_tasks[loop] = task | ||
| return prev_task | ||
| def _unregister_task(task): | ||
| """Unregister a completed, scheduled Task.""" | ||
| _scheduled_tasks.discard(task) | ||
| def _unregister_eager_task(task): | ||
| """Unregister a task which finished its first eager step.""" | ||
| _eager_tasks.discard(task) | ||
| _py_current_task = current_task | ||
| _py_register_task = _register_task | ||
| _py_register_eager_task = _register_eager_task | ||
| _py_unregister_task = _unregister_task | ||
| _py_unregister_eager_task = _unregister_eager_task | ||
| _py_enter_task = _enter_task | ||
| _py_leave_task = _leave_task | ||
| _py_swap_current_task = _swap_current_task | ||
| try: | ||
| from _asyncio import (_register_task, _register_eager_task, | ||
| _unregister_task, _unregister_eager_task, | ||
| _enter_task, _leave_task, _swap_current_task, | ||
| _scheduled_tasks, _eager_tasks, _current_tasks, | ||
| current_task) | ||
| except ImportError: | ||
| pass | ||
| else: | ||
| _c_current_task = current_task | ||
| _c_register_task = _register_task | ||
| _c_register_eager_task = _register_eager_task | ||
| _c_unregister_task = _unregister_task | ||
| _c_unregister_eager_task = _unregister_eager_task | ||
| _c_enter_task = _enter_task | ||
| _c_leave_task = _leave_task | ||
| _c_swap_current_task = _swap_current_task | ||
Uh oh!
There was an error while loading.Please reload this page.