- Notifications
You must be signed in to change notification settings - Fork5.7k
Updater improvements#1018
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Updater improvements#1018
Changes fromall commits
ce447b8
fc98577
c5f0ed8
b7c9ace
66a03ee
597c9f5
152d034
aa35961
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -149,15 +149,15 @@ def _thread_wrapper(self, target, *args, **kwargs): | ||
target(*args, **kwargs) | ||
except Exception: | ||
self.__exception_event.set() | ||
self.logger.exception('unhandled exception in %s', thr_name) | ||
raise | ||
self.logger.debug('{0} - ended'.format(thr_name)) | ||
def start_polling(self, | ||
poll_interval=0.0, | ||
timeout=10, | ||
clean=False, | ||
bootstrap_retries=-1, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. The docstring doesn;t reflect this change to the default value There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. will fix | ||
read_latency=2., | ||
allowed_updates=None): | ||
"""Starts polling updates from Telegram. | ||
@@ -171,8 +171,8 @@ def start_polling(self, | ||
bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the | ||
`Updater` will retry on failures on the Telegram server. | ||
* < 0 - retry indefinitely (default) | ||
* 0 - no retries | ||
* > 0 - retry up to X times | ||
allowed_updates (List[:obj:`str`], optional): Passed to | ||
@@ -229,8 +229,8 @@ def start_webhook(self, | ||
bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the | ||
`Updater` will retry on failures on the Telegram server. | ||
* < 0 - retry indefinitely (default) | ||
* 0 - no retries | ||
Comment on lines +232 to +233 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. documentation was updated to state indefinite retries, but the default value in the signature was not updated | ||
* > 0 - retry up to X times | ||
webhook_url (:obj:`str`, optional): Explicitly specify the webhook url. Useful behind | ||
@@ -242,7 +242,6 @@ def start_webhook(self, | ||
:obj:`Queue`: The update queue that can be filled from the main thread. | ||
""" | ||
with self.__lock: | ||
if not self.running: | ||
self.running = True | ||
@@ -262,46 +261,72 @@ def _start_polling(self, poll_interval, timeout, read_latency, bootstrap_retries | ||
# updates from Telegram and inserts them in the update queue of the | ||
# Dispatcher. | ||
self.logger.debug('Updater thread started (polling)') | ||
self._bootstrap(bootstrap_retries, clean=clean, webhook_url='', allowed_updates=None) | ||
self.logger.debug('Bootstrap done') | ||
def polling_action_cb(): | ||
updates = self.bot.get_updates( | ||
self.last_update_id, timeout=timeout, read_latency=read_latency, | ||
allowed_updates=allowed_updates) | ||
if updates: | ||
if not self.running: | ||
self.logger.debug('Updates ignored and will be pulled again on restart') | ||
else: | ||
for update in updates: | ||
self.update_queue.put(update) | ||
self.last_update_id = updates[-1].update_id + 1 | ||
return True | ||
def polling_onerr_cb(exc): | ||
# Put the error into the update queue and let the Dispatcher | ||
# broadcast it | ||
self.update_queue.put(exc) | ||
self._network_loop_retry(polling_action_cb, polling_onerr_cb, 'getting Updates', | ||
poll_interval) | ||
def _network_loop_retry(self, action_cb, onerr_cb, description, interval): | ||
"""Perform a loop calling `action_cb`, retrying after network errors. | ||
Stop condition for loop: `self.running` evaluates False or return value of `action_cb` | ||
evaluates False. | ||
Args: | ||
action_cb (:obj:`callable`): Network oriented callback function to call. | ||
onerr_cb (:obj:`callable`): Callback to call when TelegramError is caught. Receives the | ||
exception object as a parameter. | ||
description (:obj:`str`): Description text to use for logs and exception raised. | ||
interval (:obj:`float` | :obj:`int`): Interval to sleep between each call to | ||
`action_cb`. | ||
""" | ||
self.logger.debug('Start network loop retry %s', description) | ||
cur_interval = interval | ||
while self.running: | ||
try: | ||
if not action_cb(): | ||
break | ||
except RetryAfter as e: | ||
self.logger.info('%s', e) | ||
cur_interval = 0.5 + e.retry_after | ||
except TimedOut as toe: | ||
self.logger.debug('Timed out%s: %s', description, toe) | ||
# Iffailure is due to timeout, we should retry asap. | ||
cur_interval = 0 | ||
except InvalidToken as pex: | ||
self.logger.error('Invalid token; aborting') | ||
raise pex | ||
except TelegramError as te: | ||
self.logger.error('Error while %s: %s', description, te) | ||
onerr_cb(te) | ||
cur_interval = self._increase_poll_interval(cur_interval) | ||
else: | ||
cur_interval = interval | ||
if cur_interval: | ||
sleep(cur_interval) | ||
@@ -319,7 +344,7 @@ def _increase_poll_interval(current_interval): | ||
def _start_webhook(self, listen, port, url_path, cert, key, bootstrap_retries, clean, | ||
webhook_url, allowed_updates): | ||
self.logger.debug('Updater thread started (webhook)') | ||
use_ssl = cert is not None and key is not None | ||
if not url_path.startswith('/'): | ||
url_path = '/{0}'.format(url_path) | ||
@@ -370,39 +395,56 @@ def _check_ssl_cert(self, cert, key): | ||
def _gen_webhook_url(listen, port, url_path): | ||
return 'https://{listen}:{port}{path}'.format(listen=listen, port=port, path=url_path) | ||
def _bootstrap(self, max_retries, clean, webhook_url, allowed_updates, cert=None, | ||
bootstrap_interval=5): | ||
retries = [0] | ||
def bootstrap_del_webhook(): | ||
self.bot.delete_webhook() | ||
return False | ||
def bootstrap_clean_updates(): | ||
self.logger.debug('Cleaning updates from Telegram server') | ||
updates = self.bot.get_updates() | ||
while updates: | ||
updates = self.bot.get_updates(updates[-1].update_id + 1) | ||
return False | ||
def bootstrap_set_webhook(): | ||
self.bot.set_webhook( | ||
url=webhook_url, certificate=cert, allowed_updates=allowed_updates) | ||
return False | ||
def bootstrap_onerr_cb(exc): | ||
if not isinstance(exc, Unauthorized) and (max_retries < 0 or retries[0] < max_retries): | ||
retries[0] += 1 | ||
self.logger.warning('Failed bootstrap phase; try=%s max_retries=%s', | ||
retries[0], max_retries) | ||
else: | ||
self.logger.error('Failed bootstrap phase after %s retries (%s)', retries[0], exc) | ||
raise exc | ||
# Cleaning pending messages is done by polling for them - so we need to delete webhook if | ||
# one is configured. | ||
# We also take this chance to delete pre-configured webhook if this is a polling Updater. | ||
# NOTE: We don't know ahead if a webhook is configured, so we just delete. | ||
if clean or not webhook_url: | ||
self._network_loop_retry(bootstrap_del_webhook, bootstrap_onerr_cb, | ||
'bootstrap del webhook', bootstrap_interval) | ||
retries[0] = 0 | ||
# Clean pending messages, if requested. | ||
if clean: | ||
self._network_loop_retry(bootstrap_clean_updates, bootstrap_onerr_cb, | ||
'bootstrap clean updates', bootstrap_interval) | ||
retries[0] = 0 | ||
sleep(1) | ||
# Restore/set webhook settings, if needed. Again, we don't know ahead if a webhook is set, | ||
# so we set it anyhow. | ||
if webhook_url: | ||
self._network_loop_retry(bootstrap_set_webhook, bootstrap_onerr_cb, | ||
'bootstrap set webhook', bootstrap_interval) | ||
def stop(self): | ||
"""Stops the polling/webhook thread, the dispatcher and the job queue.""" | ||