poolitzer edited this page Apr 24, 2025 · 22 revisions

Concurrency in PTB

⚠️ Please make sure to read this page in its entirety and in particular the section on tailor-made concurrency.

PTB is built on top of Python's asyncio, which allows writing concurrent code using the async/await syntax. This greatly helps to design code that efficiently uses the wait time during I/O operations like network communication (e.g. sending a message to a user) or reading/writing on the hard drive.

Note: asyncio code is usually single-threaded and hence PTB currently does not aim to be thread safe (see the readme for more info).

Default behavior

By default, incoming updates and handler callbacks are processed sequentially, i.e. one after the other. So, if one callback function takes some time to execute, all other updates have to wait for it.

Example: You're running the Echobot and two users (User A and User B) send a message to the bot at the same time. Maybe User A was a bit quicker, so their request arrives first, in the form of an Update object (Update A). The Application checks the Update and decides it should be handled by the handler with the callback function named echo. At the same time, the Update of User B arrives (Update B). But the Application is not finished with Update A. It calls the echo function with Update A, which sends a reply to User A. Sending a reply takes some time, and Update B remains untouched during that time. Only after the echo function finishes for Update A does the Application repeat the same process for Update B.

If you have handlers in multiple groups, it gets a tiny bit more complicated. The following pseudocode explains roughly how Application.process_update works in the default case, i.e. sequential processing (we simplified a bit by e.g. skipping some arguments of the involved methods):

async def process_update(self, update):
    # self is the `Application` instance
    for group_number, handlers in self.handlers.items():
        # `handlers` is the list of handlers in group `group_number`
        for handler in handlers:
            if handler.check_update(update):
                # Here we `await`, i.e. we only continue after the callback is done!
                await handler.handle_update(update)
                break  # at most one handler per group handles the update
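To see the sequential behavior in isolation, here is a minimal sketch that uses only asyncio and no PTB classes. The echo coroutine and the update names "A" and "B" are stand-ins invented for this illustration, and asyncio.sleep plays the role of the network round trip.

```python
import asyncio


async def echo(update, events):
    # Hypothetical callback: record when handling starts and ends.
    events.append(f"start {update}")
    await asyncio.sleep(0.01)  # stands in for sending the reply
    events.append(f"end {update}")


async def process_sequentially(updates):
    events = []
    for update in updates:
        # Awaiting here means the next update has to wait.
        await echo(update, events)
    return events


events = asyncio.run(process_sequentially(["A", "B"]))
print(events)
```

Update B is only started after Update A has been handled completely, which is the waiting described in the example above.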

Using concurrency

We want to reply to both User A and User B as fast as possible, and while sending the reply to User A we'd like to already get started on handling Update B. PTB comes with three built-in mechanisms that can help with that.

Handler.block

Via the block parameter of Handler you can specify that Application.process_update should not wait for the callback to finish:

application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo, block=False))

Instead, it will run the callback as an asyncio.Task via asyncio.create_task. Now, when the Application has determined that the echo function should handle Update A, it creates a new task from echo(Update A). Immediately after that, it calls Application.process_update(Update B) and repeats the process for Update B without any further delay. Both replies are sent concurrently.

Again, let's have a look at pseudocode:

async def process_update(self, update):
    # self is the `Application` instance
    for group_number, handlers in self.handlers.items():
        # `handlers` is the list of handlers in group `group_number`
        for handler in handlers:
            if handler.check_update(update):
                # Here we *don't* `await`, such that the loop immediately continues
                asyncio.create_task(handler.handle_update(update))
                break  # at most one handler per group handles the update

This already helps for many use cases. However, by using block=False in a handler, you can no longer rely on handlers in different groups being called one after the other. Depending on your use case, this can be an issue. Hence, PTB comes with a second option.
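The loss of ordering between groups can be reproduced with plain asyncio. The following is only a sketch: run_groups, slow_callback and fast_callback are hypothetical names, and the two callbacks stand in for one handler in group 0 and one handler in group 1.

```python
import asyncio


async def run_groups(block):
    events = []

    async def slow_callback():
        # Hypothetical group 0 callback that waits for I/O.
        await asyncio.sleep(0.01)
        events.append("group 0 done")

    async def fast_callback():
        # Hypothetical group 1 callback without any waiting.
        events.append("group 1 done")

    tasks = []
    if block:
        await slow_callback()  # group 1 has to wait for group 0
    else:
        # Non-blocking: schedule the callback and move on immediately.
        tasks.append(asyncio.create_task(slow_callback()))
    await fast_callback()
    await asyncio.gather(*tasks)  # let pending tasks finish before returning
    return events


blocking_order = asyncio.run(run_groups(block=True))
non_blocking_order = asyncio.run(run_groups(block=False))
print(blocking_order)
print(non_blocking_order)
```

With block=True the group 0 callback finishes first. With block=False the group 1 callback finishes before the group 0 callback has completed.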

Application.concurrent_updates

Instead of running single handlers in a non-blocking way, we can tell the Application to run the whole call of Application.process_update concurrently:

Application.builder().token('TOKEN').concurrent_updates(True).build()

Now the Application will start Application.process_update(Update A) via asyncio.create_task and immediately afterwards do the same with Update B. Again, pseudocode:

while not application.update_queue.empty():
    update = await application.update_queue.get()
    asyncio.create_task(application.process_update(update))

This setting is independent of the block parameter of Handler, and within application.process_update concurrency still works as explained above.

You can further customize the concurrent handling of Application.process_update by implementing your own custom update processor as a subclass of the BaseUpdateProcessor interface class. Let's have a look at an example:

import asyncio

from telegram.ext import Application, BaseUpdateProcessor


class MyUpdateProcessor(BaseUpdateProcessor):
    async def do_process_update(self, update, coroutine) -> None:
        # This method is called for every update
        if update.callback_query:
            await asyncio.sleep(5)
        await coroutine

    async def initialize(self) -> None:
        pass

    async def shutdown(self) -> None:
        pass


Application.builder().token('TOKEN').concurrent_updates(MyUpdateProcessor(10)).build()

The above code processes every callback_query update with a delay of 5 seconds and handles up to 10 updates simultaneously. The pseudocode for this now looks something like this:

while not application.update_queue.empty():
    update = await application.update_queue.get()
    coroutine = application.process_update(update)
    asyncio.create_task(my_update_processor.do_process_update(update, coroutine))

This is just an example of how to use the BaseUpdateProcessor class to handle updates in the way you want; there are endless possibilities. For example, you can throttle update processing for specific users or ensure that inline queries are always processed sequentially. See the documentation for more information.
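As a rough illustration of per-user sequential processing, the sketch below uses one asyncio.Lock per user and nothing from PTB. The names handle and demo and the user names are invented for this example. In a real bot, similar logic could presumably live inside do_process_update, but that placement is an assumption and not taken from the PTB documentation.

```python
import asyncio
from collections import defaultdict


async def demo():
    # One lock per user: updates of the same user run one after the other,
    # updates of different users may still overlap.
    locks = defaultdict(asyncio.Lock)
    events = []

    async def handle(user, text):
        async with locks[user]:
            events.append(f"start {user}:{text}")
            await asyncio.sleep(0.01)  # simulated I/O
            events.append(f"end {user}:{text}")

    await asyncio.gather(
        handle("alice", "1"),
        handle("alice", "2"),
        handle("bob", "1"),
    )
    return events


per_user_events = asyncio.run(demo())
print(per_user_events)
```

The second update of alice only starts after her first one has ended, while the update of bob starts without waiting for alice.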

Note: The number of concurrently processed updates is limited (the limit defaults to 4096 updates at a time). This is a simple measure to avoid e.g. DDoS attacks.
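A common way to cap concurrency in asyncio is a semaphore. The sketch below shows the general idea with a limit of 3. It is not meant to describe how PTB enforces its limit internally, and process_with_limit is a hypothetical helper.

```python
import asyncio


async def process_with_limit(updates, max_concurrent):
    semaphore = asyncio.BoundedSemaphore(max_concurrent)
    running = 0
    peak = 0  # highest number of updates processed at the same time

    async def process_update(update):
        nonlocal running, peak
        async with semaphore:  # waits while `max_concurrent` updates are active
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulated handling of the update
            running -= 1

    await asyncio.gather(*(process_update(u) for u in updates))
    return peak


peak = asyncio.run(process_with_limit(range(10), max_concurrent=3))
print(peak)
```

Although ten updates are submitted at once, no more than three are ever handled at the same time.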

Application.create_task

Handler.block and Application.concurrent_updates allow running handler callbacks or the entirety of handling an update concurrently. In addition to that, PTB offers Application.create_task to run a specific coroutine concurrently. Application.create_task is a very thin wrapper around asyncio.create_task that adds some book-keeping that comes in handy for using it in PTB. Please consult the documentation of Application.create_task for more details.

This wrapper gives you fine-grained control over how you use concurrency in PTB. The next section gives you an idea about why that is helpful.
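To give an impression of what book-keeping around asyncio.create_task can look like, here is a small sketch. TaskTracker is a hypothetical class written for this page and is not PTB's implementation. It keeps a reference to every running task and collects exceptions instead of losing them.

```python
import asyncio


class TaskTracker:
    """Hypothetical helper, not part of PTB."""

    def __init__(self):
        self.tasks = set()
        self.errors = []

    def create_task(self, coroutine):
        task = asyncio.create_task(coroutine)
        self.tasks.add(task)  # keep a reference while the task is running
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task):
        self.tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            self.errors.append(task.exception())


async def demo():
    tracker = TaskTracker()

    async def ok():
        return 1

    async def boom():
        raise ValueError("boom")

    tracker.create_task(ok())
    tracker.create_task(boom())
    await asyncio.gather(*list(tracker.tasks), return_exceptions=True)
    await asyncio.sleep(0)  # give pending done-callbacks a chance to run
    return len(tracker.tasks), [type(e).__name__ for e in tracker.errors]


remaining, error_names = asyncio.run(demo())
print(remaining, error_names)
```

After both tasks are done, no task is tracked anymore and the exception of the failing task has been recorded.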

Tailor-made Concurrency

Even though asyncio is usually single-threaded, concurrent programming comes with a number of traps to fall into, and we'll try to give you a few hints on how to spot them. However, this wiki article does not replace a university lecture on concurrency.

Probably the biggest cause of concurrency issues is shared state, and those issues are hard to fix. So instead of showing you how to fix them, we'll show you how to avoid them altogether. More about that later.

A fair warning: In this section, we'll try to give you a simple talk (if that's possible) on a very complex topic. Many have written about it before, and we're certainly less qualified than most. As usual, we'll use an example to complement the text, and try to stay in the realm of what's important to you.

An example that is often used to illustrate this is that of a bank. Let's say you have been hired by a bank to write a Telegram bot to manage bank accounts. The bot has the command /transaction <amount> <recipient>, and because many people will be using this command, you think it's a good idea to make this command run concurrently. Some unpaid intern wrote the following (BAD AND DANGEROUS) callback function:

async def transaction(update, context):
    bot = context.bot
    chat_id = update.effective_user.id
    source_id, target_id, amount = parse_update(update)

    await bot.send_message(chat_id, 'Preparing...')
    bank.log(BEGINNING_TRANSACTION, amount, source_id, target_id)

    source = bank.read_account(source_id)
    target = bank.read_account(target_id)

    source.balance -= amount
    target.balance += amount

    await bot.send_message(chat_id, 'Transferring money...')
    bank.log(CALCULATED_TRANSACTION, amount, source_id, target_id)

    bank.write_account(source)
    await bot.send_message(chat_id, 'Source account updated...')
    await bot.send_message(chat_id, 'Target account updated...')
    bank.write_account(target)

    await bot.send_message(chat_id, 'Done!')
    bank.log(FINISHED_TRANSACTION, amount, source_id, target_id)

application.add_handler(CommandHandler('transaction', transaction, block=False))

We skipped some of the implementation details, so here's a short explanation:

  • parse_update extracts the user IDs of the sender (source_id) and receiver (target_id), as well as the amount, from the message
  • bank is a globally accessible object that exposes the Python API of the bank's operations
    • bank.read_account reads a bank account from the bank's database into a Python object
    • bank.write_account writes a bank account back to the bank's database
    • bank.log must be used to keep a log of all changes to make sure no money is lost

Sadly, that damn intern fell right into the trap. Let's say there are two morally corrupt customers, Customer A with Account A and Customer B with Account B, who both make a transaction simultaneously. Customer A sends Transaction AB of $10 to Customer B. At the same time, Customer B sends Transaction BA of $100 to Customer A.

Now the Application starts two tasks, Task AB and Task BA, almost simultaneously. Both tasks read the accounts from the database with the same balance and calculate a new balance for both of them. In most cases, one of the two transactions will simply overwrite the other. That's not too bad, but will at least be confusing to the customers. However, each await gives control back to the event loop, which may then continue another task. Hence, the following may occur:

  1. Task AB executes bank.write_account(source) and updates Account A with -$10
  2. Before updating Account B, Task AB sends two messages and during that time, the event loop continues Task BA
  3. Task BA executes bank.write_account(source) and updates Account B with -$100
  4. Before updating Account A, Task BA sends two messages and during that time, the event loop continues Task AB
  5. Task AB executes bank.write_account(target) and updates Account B with +$10
  6. When Task BA is resumed again, it executes bank.write_account(target) and updates Account A with +$100

In the end, Account A is at +$100 and Account B is at +$10, so $110 has appeared out of nowhere. Of course, this won't happen very often. And that's what makes this bug so critical. It will probably be missed by your tests and end up in production, potentially causing a lot of financial damage.
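The interleaving above can be reproduced without PTB. In the following sketch the bank is a plain dict, send_message is a hypothetical stand-in that merely yields to the event loop, and the starting balance of 1000 per account is made up for the illustration.

```python
import asyncio


async def send_message(text):
    # Stand-in for bot.send_message: hands control back to the event loop once.
    await asyncio.sleep(0)


async def transaction(db, source_id, target_id, amount):
    await send_message('Preparing...')
    source = db[source_id]  # both tasks read the same, unmodified balances
    target = db[target_id]
    source -= amount
    target += amount
    await send_message('Transferring money...')
    db[source_id] = source
    await send_message('Source account updated...')
    await send_message('Target account updated...')
    db[target_id] = target  # overwrites what the other task wrote in between
    await send_message('Done!')


async def main():
    db = {"A": 1000, "B": 1000}
    await asyncio.gather(
        transaction(db, "A", "B", 10),   # Transaction AB
        transaction(db, "B", "A", 100),  # Transaction BA
    )
    return db


balances = asyncio.run(main())
print(balances, sum(balances.values()))
```

Both accounts end up with more money than before, and the total grows from 2000 to 2110 even though money was only moved between the two accounts.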

Note: This kind of bug is called a race condition and has been the source of many, many security vulnerabilities. It's also one of the reasons why banking software is not written by unpaid interns.

To be fair, you probably don't write software for banks (if you do, you should already know about this), but this kind of bug can occur in much simpler situations. While in this case the shared state is the bank object, it can take many forms: a database, a dict, a list or any other kind of object that is modified by more than one task. Depending on the situation, race conditions are more or less likely to occur, and the damage they do is bigger or smaller, but as a rule of thumb, they're bad.

As promised in the first paragraph, let's discuss how to avoid such situations.That's not always as easy as it is in this case, but we're lucky:

  1. Our set of tools is very limited - Application.create_task is the only asyncio tool we're using
  2. Our goals are not very ambitious - we only want to speed up our I/O

There are two relatively simple steps you have to follow. First, identify those parts of the code that must run sequentially (the opposite of in parallel or concurrently). Usually, that is code that fits at least one of these criteria:

  1. Modifies shared state
  2. Reads shared state and relies on it being correct
  3. Modifies local state (e.g. a variable used later in the same function)

Make sure you have a good idea of what shared state means. Don't hesitate to do a quick Google search on it.

Let's go through our bank example line by line and note which of the criteria it matches:

async def transaction(update, context):
    bot = context.bot
    chat_id = update.effective_user.id  # 3
    source_id, target_id, amount = parse_update(update)  # 3

    await bot.send_message(chat_id, 'Preparing...')  # None
    bank.log(BEGINNING_TRANSACTION, amount, source_id, target_id)  # None

    source = bank.read_account(source_id)  # 2, 3
    target = bank.read_account(target_id)  # 2, 3

    source.balance -= amount  # 3
    target.balance += amount  # 3

    await bot.send_message(chat_id, 'Transferring money...')  # None
    bank.log(CALCULATED_TRANSACTION, amount, source_id, target_id)  # None

    bank.write_account(source)  # 1
    await bot.send_message(chat_id, 'Source account updated...')  # None
    await bot.send_message(chat_id, 'Target account updated...')  # None
    bank.write_account(target)  # 1

    await bot.send_message(chat_id, 'Done!')  # None
    bank.log(FINISHED_TRANSACTION, amount, source_id, target_id)  # None

application.add_handler(CommandHandler('transaction', transaction, block=False))

Note: One could argue that bank.log modifies shared state. However, logging libraries are usually thread-safe and it's unlikely that the log has a critical functional role. It's not being read from in this function, and let's assume it's not being read from anywhere else in the code, so maybe consider this an exception to the rule. Also, for the sake of this example, it'd be boring if only bot.send_message were safe to run in parallel. However, we will keep this in mind for the next step.

As you can see, there's a pretty obvious pattern here: bot.send_message and bank.log do not match any of the criteria we have set for strictly sequential code. That means we can run this code asynchronously without risk. Therefore, the second step is to extract that code into separate functions and run only those concurrently. Since our async code parts are all very similar, they can be replaced by a single function. We could have done that before, but then this moment would've been less cool.

# `bot` is passed in explicitly, since it is not a global object
async def log_and_notify(bot, action, amount, source_id, target_id, chat_id, message):
    bank.log(action, amount, source_id, target_id)
    await bot.send_message(chat_id, message)


async def transaction(update, context):
    bot = context.bot
    chat_id = update.message.chat_id  # 3
    source_id, target_id, amount = parse_update(update)  # 3

    context.application.create_task(
        log_and_notify(
            bot, BEGINNING_TRANSACTION, amount, source_id, target_id, chat_id, 'Preparing...'
        ),
        update=update,
    )

    source = bank.read_account(source_id)  # 2, 3
    target = bank.read_account(target_id)  # 2, 3

    source.balance -= amount  # 3
    target.balance += amount  # 3

    context.application.create_task(
        log_and_notify(
            bot, CALCULATED_TRANSACTION, amount, source_id, target_id, chat_id, 'Transferring money...'
        ),
        update=update,
    )

    bank.write_account(source)  # 1
    bank.write_account(target)  # 1

    context.application.create_task(
        log_and_notify(
            bot, FINISHED_TRANSACTION, amount, source_id, target_id, chat_id, 'Done!'
        ),
        update=update,
    )

application.add_handler(CommandHandler('transaction', transaction, block=True))

Note: You might have noticed that we moved bank.log before bot.send_message, so the log entries will be in order most of the time, assuming the database operations take long enough for the log to complete.

Note: It's likely that bank.read_account and bank.write_account require some I/O operations to interact with the bank's database. You see that it's not always possible to write code concurrently, at least with this simplified method. Read about Transactions to learn how databases solve this in "real life".
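If reading and writing accounts had to be awaited, the critical section itself would contain points where other tasks can step in. One option that goes beyond the simplified method of this page is to guard the section with an asyncio.Lock. The sketch below assumes awaitable read_account and write_account helpers, which are invented here, and uses a plain dict as the database.

```python
import asyncio


async def read_account(db, account_id):
    await asyncio.sleep(0)  # simulated database I/O
    return db[account_id]


async def write_account(db, account_id, balance):
    await asyncio.sleep(0)  # simulated database I/O
    db[account_id] = balance


async def transaction(db, lock, source_id, target_id, amount):
    # Only one task at a time may be between reading and writing the balances.
    async with lock:
        source = await read_account(db, source_id)
        target = await read_account(db, target_id)
        await write_account(db, source_id, source - amount)
        await write_account(db, target_id, target + amount)


async def main():
    db = {"A": 1000, "B": 1000}
    lock = asyncio.Lock()
    await asyncio.gather(
        transaction(db, lock, "A", "B", 10),
        transaction(db, lock, "B", "A", 100),
    )
    return db


locked_balances = asyncio.run(main())
print(locked_balances)
```

A single lock serializes all transactions, so this trades throughput for correctness. Real databases offer more fine-grained tools for the same purpose.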

By separating the strictly sequential code from the concurrent code, we made sure that no race conditions can occur. The transaction function won't be executed concurrently anymore, but we still managed to gain a substantial performance boost over completely sequential code, because the logging and user notification now run concurrently.
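The same separation can be tried out with plain asyncio. In this sketch the account updates contain no await and therefore cannot be interrupted, while the notifications are scheduled as background tasks. The dict-based bank, the notify helper and the starting balances are assumptions made for the illustration.

```python
import asyncio


async def notify(sent, text):
    await asyncio.sleep(0)  # simulated network I/O
    sent.append(text)


async def transaction(db, sent, background, source_id, target_id, amount):
    background.append(asyncio.create_task(notify(sent, 'Preparing...')))
    # Strictly sequential part: no `await`, so no other task can run in between.
    source = db[source_id]
    target = db[target_id]
    db[source_id] = source - amount
    db[target_id] = target + amount
    background.append(asyncio.create_task(notify(sent, 'Done!')))


async def main():
    db = {"A": 1000, "B": 1000}
    sent, background = [], []
    await asyncio.gather(
        transaction(db, sent, background, "A", "B", 10),
        transaction(db, sent, background, "B", "A", 100),
    )
    await asyncio.gather(*background)  # wait for all notifications
    return db, sent


safe_balances, sent_messages = asyncio.run(main())
print(safe_balances, len(sent_messages))
```

This time the total stays at 2000 and all four notifications are still delivered.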

That's basically it for now. To wrap up, here are a few helpful guidelines for writing concurrent code:

  • Avoid using shared state whenever possible
  • Write self-contained (pure) functions
  • When in doubt, make it sequential

Wiki of python-telegram-bot © Copyright 2015-2025 – Licensed by Creative Commons
