- Notifications
You must be signed in to change notification settings - Fork95
fix: Major refactoring of Polling, Retry and Timeout logic#462
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes fromall commits
94e4c216a63d34ea8b176ad03684eba0f3823c02c6ac0d65fcb7a0b2fc72914a5a5f2e15db4b4e382041File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -50,10 +50,13 @@ class ExtendedOperation(polling.PollingFuture): | ||
| refresh (Callable[[], type(extended_operation)]): A callable that returns | ||
| the latest state of the operation. | ||
| cancel (Callable[[], None]): A callable that tries to cancel the operation. | ||
| polling Optional(google.api_core.retry.Retry): The configuration used | ||
| for polling. This can be used to control how often :meth:`done` | ||
| is polled. If the ``timeout`` argument to :meth:`result` is | ||
| specified it will override the ``polling.timeout`` property. | ||
| retry Optional(google.api_core.retry.Retry): DEPRECATED use ``polling`` | ||
| instead. If specified it will override ``polling`` parameter to | ||
| maintain backward compatibility. | ||
| Note: Most long-running API methods use google.api_core.operation.Operation | ||
| This class is a wrapper for a subset of methods that use alternative | ||
| @@ -68,9 +71,14 @@ class ExtendedOperation(polling.PollingFuture): | ||
| """ | ||
| def __init__( | ||
atulep marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| self, | ||
| extended_operation, | ||
| refresh, | ||
| cancel, | ||
| polling=polling.DEFAULT_POLLING, | ||
| **kwargs, | ||
| ): | ||
| super().__init__(polling=polling, **kwargs) | ||
| self._extended_operation = extended_operation | ||
| self._refresh = refresh | ||
| self._cancel = cancel | ||
| @@ -114,7 +122,7 @@ def error_message(self): | ||
| def __getattr__(self, name): | ||
| return getattr(self._extended_operation, name) | ||
| def done(self, retry=None): | ||
| self._refresh_and_update(retry) | ||
atulep marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| return self._extended_operation.done | ||
| @@ -137,9 +145,11 @@ def cancelled(self): | ||
| self._refresh_and_update() | ||
| return self._extended_operation.done | ||
| def _refresh_and_update(self, retry=None): | ||
| if not self._extended_operation.done: | ||
| self._extended_operation = ( | ||
| self._refresh(retry=retry) if retry else self._refresh() | ||
atulep marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| ) | ||
| self._handle_refreshed_operation() | ||
| def _handle_refreshed_operation(self): | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -18,7 +18,7 @@ | ||
| import concurrent.futures | ||
| from google.api_core import exceptions | ||
| from google.api_core import retry as retries | ||
| from google.api_core.future import _helpers | ||
| from google.api_core.future import base | ||
| @@ -29,14 +29,37 @@ class _OperationNotComplete(Exception): | ||
| pass | ||
| # DEPRECATED as it conflates RPC retry and polling concepts into one. | ||
| # Use POLLING_PREDICATE instead to configure polling. | ||
| RETRY_PREDICATE = retries.if_exception_type( | ||
| _OperationNotComplete, | ||
| exceptions.TooManyRequests, | ||
| exceptions.InternalServerError, | ||
| exceptions.BadGateway, | ||
| exceptions.ServiceUnavailable, | ||
| ) | ||
| # DEPRECATED: use DEFAULT_POLLING to configure LRO polling logic. Construct | ||
| # Retry object using its default values as a baseline for any custom retry logic | ||
| # (not to be confused with polling logic). | ||
| DEFAULT_RETRY = retries.Retry(predicate=RETRY_PREDICATE) | ||
| # POLLING_PREDICATE is supposed to poll only on _OperationNotComplete. | ||
| # Any RPC-specific errors (like ServiceUnavailable) will be handled | ||
| # by retry logic (not to be confused with polling logic) which is triggered for | ||
| # every polling RPC independently of polling logic but within its context. | ||
| POLLING_PREDICATE = retries.if_exception_type( | ||
| _OperationNotComplete, | ||
| ) | ||
| # Default polling configuration | ||
| DEFAULT_POLLING = retries.Retry( | ||
| predicate=POLLING_PREDICATE, | ||
| initial=1.0, # seconds | ||
| maximum=20.0, # seconds | ||
| multiplier=1.5, | ||
| timeout=900, # seconds | ||
| ) | ||
| class PollingFuture(base.Future): | ||
| @@ -45,21 +68,29 @@ class PollingFuture(base.Future): | ||
| The :meth:`done` method should be implemented by subclasses. The polling | ||
| behavior will repeatedly call ``done`` until it returns True. | ||
| The actuall polling logic is encapsulated in :meth:`result` method. See | ||
| documentation for that method for details on how polling works. | ||
| .. note:: | ||
| Privacy here is intended to prevent the final class from | ||
| overexposing, not to prevent subclasses from accessing methods. | ||
| Args: | ||
| polling (google.api_core.retry.Retry): The configuration used for polling. | ||
| This parameter controls how often :meth:`done` is polled. If the | ||
| ``timeout`` argument is specified in :meth:`result` method it will | ||
| override the ``polling.timeout`` property. | ||
| retry (google.api_core.retry.Retry): DEPRECATED use ``polling`` instead. | ||
| If set, it will override ``polling`` paremeter for backward | ||
| compatibility. | ||
| """ | ||
| _DEFAULT_VALUE = object() | ||
| def __init__(self, polling=DEFAULT_POLLING, **kwargs): | ||
| super(PollingFuture, self).__init__() | ||
| self._polling =kwargs.get("retry", polling) | ||
| self._result = None | ||
| self._exception = None | ||
| self._result_set = False | ||
| @@ -69,57 +100,150 @@ def __init__(self, retry=DEFAULT_RETRY): | ||
| self._done_callbacks = [] | ||
| @abc.abstractmethod | ||
| def done(self, retry=None): | ||
| """Checks to see if the operation is complete. | ||
| Args: | ||
| retry (google.api_core.retry.Retry): (Optional) How to retry the | ||
| polling RPC (to not be confused with polling configuration. See | ||
| the documentation for :meth:`result` for details). | ||
| Returns: | ||
| bool: True if the operation is complete, False otherwise. | ||
| """ | ||
| # pylint: disable=redundant-returns-doc, missing-raises-doc | ||
| raise NotImplementedError() | ||
| def _done_or_raise(self, retry=None): | ||
| """Check if the future is done and raise if it's not.""" | ||
| if not self.done(retry=retry): | ||
| raise _OperationNotComplete() | ||
| def running(self): | ||
| """True if the operation is currently running.""" | ||
| return not self.done() | ||
| def _blocking_poll(self, timeout=_DEFAULT_VALUE, retry=None, polling=None): | ||
Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. It's not obvious whether this bw compatible if timeout and/or retry are not provided. What's the easiest way to check? ContributorAuthor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Here it is not a refactoring it is a fix, so it does not intend to be backward-compatible (as old behavior was wrong).
That is being fixed here:
| ||
| """Poll and wait for the Future to be resolved.""" | ||
| if self._result_set: | ||
| return | ||
| polling = polling or self._polling | ||
atulep marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Helpful to add a comment of what type Polling is. Doesn't need to be a formal Pytype type, but something to help the reader. ContributorAuthor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. What do you mean by "type polling"? Which types of polling are there? Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Python type of the Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. +1 ContributorAuthor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. I don't understand what you mean. Every polling config is of type Retry. | ||
| if timeout is not PollingFuture._DEFAULT_VALUE: | ||
| polling = polling.with_timeout(timeout) | ||
| try: | ||
Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. FYI: this was a breaking change.#477 Member There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Also for python-aiplatform:googleapis/python-aiplatform#1870 ContributorAuthor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. that rety logic line never worked (the retry had been i. What broke you is most likely the new default timeout value (instead of None). | ||
| polling(self._done_or_raise)(retry=retry) | ||
| except exceptions.RetryError: | ||
| raise concurrent.futures.TimeoutError( | ||
| f"Operation did not complete within the designated timeout of " | ||
| f"{polling.timeout} seconds." | ||
| ) | ||
| def result(self, timeout=_DEFAULT_VALUE, retry=None, polling=None): | ||
| """Get the result of the operation. | ||
| This method will poll for operation status periodically, blocking if | ||
| necessary. If you just want to make sure that this method does not block | ||
| for more than X seconds and you do not care about the nitty-gritty of | ||
| how this method operates, just call it with ``result(timeout=X)``. The | ||
| other parameters are for advanced use only. | ||
| Every call to this method is controlled by the following three | ||
| parameters, each of which has a specific, distinct role, even though all three | ||
| may look very similar: ``timeout``, ``retry`` and ``polling``. In most | ||
| cases users do not need to specify any custom values for any of these | ||
| parameters and may simply rely on default ones instead. | ||
| If you choose to specify custom parameters, please make sure you've | ||
| read the documentation below carefully. | ||
| First, please check :class:`google.api_core.retry.Retry` | ||
| class documentation for the proper definition of timeout and deadline | ||
| terms and for the definition the three different types of timeouts. | ||
| This class operates in terms of Retry Timeout and Polling Timeout. It | ||
| does not let customizing RPC timeout and the user is expected to rely on | ||
| default behavior for it. | ||
| The roles of each argument of this method are as follows: | ||
| ``timeout`` (int): (Optional) The Polling Timeout as defined in | ||
| :class:`google.api_core.retry.Retry`. If the operation does not complete | ||
| within this timeout an exception will be thrown. This parameter affects | ||
| neither Retry Timeout nor RPC Timeout. | ||
| ``retry`` (google.api_core.retry.Retry): (Optional) How to retry the | ||
| polling RPC. The ``retry.timeout`` property of this parameter is the | ||
| Retry Timeout as defined in :class:`google.api_core.retry.Retry`. | ||
| This parameter defines ONLY how the polling RPC call is retried | ||
| (i.e. what to do if the RPC we used for polling returned an error). It | ||
| does NOT define how the polling is done (i.e. how frequently and for | ||
| how long to call the polling RPC); use the ``polling`` parameter for that. | ||
| If a polling RPC throws and error and retrying it fails, the whole | ||
| future fails with the corresponding exception. If you want to tune which | ||
| server response error codes are not fatal for operation polling, use this | ||
| parameter to control that (``retry.predicate`` in particular). | ||
| ``polling`` (google.api_core.retry.Retry): (Optional) How often and | ||
| for how long to call the polling RPC periodically (i.e. what to do if | ||
| a polling rpc returned successfully but its returned result indicates | ||
| that the long running operation is not completed yet, so we need to | ||
| check it again at some point in future). This parameter does NOT define | ||
| how to retry each individual polling RPC in case of an error; use the | ||
| ``retry`` parameter for that. The ``polling.timeout`` of this parameter | ||
| is Polling Timeout as defined in as defined in | ||
| :class:`google.api_core.retry.Retry`. | ||
| For each of the arguments, there are also default values in place, which | ||
| will be used if a user does not specify their own. The default values | ||
| for the three parameters are not to be confused with the default values | ||
| for the corresponding arguments in this method (those serve as "not set" | ||
| markers for the resolution logic). | ||
| If ``timeout`` is provided (i.e.``timeout is not _DEFAULT VALUE``; note | ||
| the ``None`` value means "infinite timeout"), it will be used to control | ||
| the actual Polling Timeout. Otherwise, the ``polling.timeout`` value | ||
| will be used instead (see below for how the ``polling`` config itself | ||
| gets resolved). In other words, this parameter effectively overrides | ||
| the ``polling.timeout`` value if specified. This is so to preserve | ||
| backward compatibility. | ||
| If ``retry`` is provided (i.e. ``retry is not None``) it will be used to | ||
| control retry behavior for the polling RPC and the ``retry.timeout`` | ||
| will determine the Retry Timeout. If not provided, the | ||
| polling RPC will be called with whichever default retry config was | ||
| specified for the polling RPC at the moment of the construction of the | ||
| polling RPC's client. For example, if the polling RPC is | ||
| ``operations_client.get_operation()``, the ``retry`` parameter will be | ||
| controlling its retry behavior (not polling behavior) and, if not | ||
| specified, that specific method (``operations_client.get_operation()``) | ||
| will be retried according to the default retry config provided during | ||
| creation of ``operations_client`` client instead. This argument exists | ||
| mainly for backward compatibility; users are very unlikely to ever need | ||
| to set this parameter explicitly. | ||
| If ``polling`` is provided (i.e. ``polling is not None``), it will be used | ||
| to controll the overall polling behavior and ``polling.timeout`` will | ||
| controll Polling Timeout unless it is overridden by ``timeout`` parameter | ||
| as described above. If not provided, the``polling`` parameter specified | ||
| during construction of this future (the ``polling`` argument in the | ||
| constructor) will be used instead. Note: since the ``timeout`` argument may | ||
| override ``polling.timeout`` value, this parameter should be viewed as | ||
| coupled with the ``timeout`` parameter as described above. | ||
| Args: | ||
| timeout (int): (Optional) How long (in seconds) to wait for the | ||
| operation to complete. If None, wait indefinitely. | ||
| retry (google.api_core.retry.Retry): (Optional) How to retry the | ||
| polling RPC. This defines ONLY how the polling RPC call is | ||
| retried (i.e. what to do if the RPC we used for polling returned | ||
| an error). It does NOT define how the polling is done (i.e. how | ||
| frequently and for how long to call the polling RPC). | ||
| polling (google.api_core.retry.Retry): (Optional) How often and | ||
Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. We should also repeat here what will be used if not provided. ContributorAuthor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. It is all described in the bigger comment above. | ||
| for how long to call polling RPC periodically. This parameter | ||
| does NOT define how to retry each individual polling RPC call | ||
| (use the ``retry`` parameter for that). | ||
| Returns: | ||
| google.protobuf.Message: The Operation's result. | ||
| @@ -128,8 +252,8 @@ def result(self, timeout=None, retry=DEFAULT_RETRY): | ||
| google.api_core.GoogleAPICallError: If the operation errors or if | ||
| the timeout is reached before the operation completes. | ||
| """ | ||
| self._blocking_poll(timeout=timeout,retry=retry, polling=polling) | ||
Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. The default value for ContributorAuthor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. It is not backward compatible by design, because it is a fix (this whole PR is not a refactoring only it also contains many fixes, this is one of them. | ||
| if self._exception is not None: | ||
| # pylint: disable=raising-bad-type | ||
| @@ -138,12 +262,18 @@ def result(self, timeout=None, retry=DEFAULT_RETRY): | ||
| return self._result | ||
| def exception(self, timeout=_DEFAULT_VALUE): | ||
| """Get the exception from the operation, blocking if necessary. | ||
| See the documentation for the :meth:`result` method for details on how | ||
| this method operates, as both ``result`` and this method rely on the | ||
| exact same polling logic. The only difference is that this method does | ||
| not accept ``retry`` and ``polling`` arguments but relies on the default ones | ||
| instead. | ||
| Args: | ||
| timeout (int): How long to wait for the operation to complete. | ||
| If None, wait indefinitely. | ||
| Returns: | ||
| Optional[google.api_core.GoogleAPICallError]: The operation's | ||
Uh oh!
There was an error while loading.Please reload this page.