Concurrency
⚠️ Please make sure to read this page in its entirety and in particular the section on tailor-made concurrency.
PTB is built on top of Python's `asyncio`, which allows writing concurrent code using the `async`/`await` syntax. This greatly helps to design code that efficiently uses the wait time during I/O operations like network communication (e.g. sending a message to a user) or reading/writing on the hard drive.
Note: `asyncio` code is usually single-threaded and hence PTB currently does not aim to be thread safe (see the readme for more info).
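To make the idea concrete without any PTB involvement, the following sketch uses plain `asyncio` and simulates I/O with `asyncio.sleep` (the 0.2 second delay is an arbitrary stand-in for a network round trip). Awaiting the two waits one after the other takes about the sum of the delays, while `asyncio.gather` lets them overlap on a single thread.

```python
import asyncio
import time


async def fake_io(delay: float) -> None:
    # Stand-in for an I/O operation such as sending a message
    await asyncio.sleep(delay)


async def run_sequentially() -> float:
    start = time.perf_counter()
    await fake_io(0.2)  # the second wait only starts after the first one is done
    await fake_io(0.2)
    return time.perf_counter() - start


async def run_concurrently() -> float:
    start = time.perf_counter()
    # Both waits overlap, so the total is about one delay instead of two
    await asyncio.gather(fake_io(0.2), fake_io(0.2))
    return time.perf_counter() - start


seq_time = asyncio.run(run_sequentially())
con_time = asyncio.run(run_concurrently())
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```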
By default, incoming updates and handler callbacks are processed sequentially, i.e. one after the other. So, if one callback function takes some time to execute, all other updates have to wait for it.
Example: You're running the Echobot and two users (User A and User B) send a message to the bot at the same time. Maybe User A was a bit quicker, so their request arrives first, in the form of an `Update` object (Update A). The `Application` checks the `Update` and decides it should be handled by the handler with the callback function named `echo`. At the same time, the `Update` of User B arrives (Update B). But the `Application` is not finished with Update A. It calls the `echo` function with Update A, which sends a reply to User A. Sending a reply takes some time, and Update B remains untouched during that time. Only after the `echo` function finishes for Update A does the `Application` repeat the same process for Update B.
If you have handlers in multiple groups, it gets a tiny bit more complicated. The following pseudocode explains how `Application.process_update` roughly works in the default case, i.e. sequential processing (we simplified a bit by e.g. skipping some arguments of the involved methods):
```python
async def process_update(self, update):
    # self is the `Application` instance
    for group_number, handlers in self.handlers.items():
        # `handlers` is the list of handlers in group `group_number`
        for handler in handlers:
            if handler.check_update(update):
                # Here we `await`, i.e. we only continue after the callback is done!
                await handler.handle_update(update)
                break  # at most one handler per group handles the update
```
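To see the sequential behavior in action, here is a stdlib-only toy model of the pseudocode above. `ToyHandler` and `ToyApplication` are hypothetical stand-ins, not PTB classes: the single handler accepts every update and simulates sending a reply with `asyncio.sleep`. Because each callback is awaited, Update B is only touched after Update A is completely done.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ToyHandler:
    # Hypothetical stand-in for a PTB handler; `log` records what happens when
    name: str
    delay: float
    log: list

    def check_update(self, update: str) -> bool:
        return True  # this toy handler accepts every update

    async def handle_update(self, update: str) -> None:
        self.log.append(f"start {self.name} {update}")
        await asyncio.sleep(self.delay)  # simulates sending a reply
        self.log.append(f"end {self.name} {update}")


@dataclass
class ToyApplication:
    # Maps a group number to the list of handlers in that group
    handlers: dict = field(default_factory=dict)

    async def process_update(self, update: str) -> None:
        for group_number, handlers in sorted(self.handlers.items()):
            for handler in handlers:
                if handler.check_update(update):
                    # We await, so this update is fully handled before we move on
                    await handler.handle_update(update)
                    break  # at most one handler per group handles the update


async def main() -> list:
    log: list = []
    app = ToyApplication({0: [ToyHandler("echo", 0.01, log)]})
    for update in ["A", "B"]:  # updates are processed one after the other
        await app.process_update(update)
    return log


events = asyncio.run(main())
print(events)  # ['start echo A', 'end echo A', 'start echo B', 'end echo B']
```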
We want to reply to both User A and User B as fast as possible, and while sending the reply to User A, we'd like to already get started on handling Update B. PTB comes with three built-in mechanisms that can help with that.
Via the `block` parameter of `Handler`, you can specify that `Application.process_update` should not wait for the callback to finish:
```python
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo, block=False))
```
Instead, it will run the callback as an `asyncio.Task` via `asyncio.create_task`. Now, when the `Application` determines that the `echo` function should handle Update A, it creates a new task from `echo(Update A)`. Immediately after that, it calls `Application.process_update(Update B)` and repeats the process for Update B without any further delay. Both replies are sent concurrently.
Again, let's have a look at pseudocode:
```python
async def process_update(self, update):
    # self is the `Application` instance
    for group_number, handlers in self.handlers.items():
        # `handlers` is the list of handlers in group `group_number`
        for handler in handlers:
            if handler.check_update(update):
                # Here we *don't* `await`, such that the loop immediately continues
                asyncio.create_task(handler.handle_update(update))
                break  # at most one handler per group handles the update
```
This already helps for many use cases. However, by using `block=False` in a handler, you can no longer rely on handlers in different groups being called one after the other. Depending on your use case, this can be an issue. Hence, PTB comes with a second option.
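The ordering caveat is easy to reproduce in a stdlib-only toy model (hypothetical names, not PTB internals; every handler is assumed to match, so `check_update` is omitted). There are two groups with one handler each: the handler in group 0 is slow and the one in group 1 is fast. Since every callback is merely scheduled with `asyncio.create_task`, both updates start right away, and the group 1 handler finishes before the group 0 handler for the same update.

```python
import asyncio


async def callback(name: str, update: str, delay: float, log: list) -> None:
    log.append(f"start {name} {update}")
    await asyncio.sleep(delay)  # simulates a network call
    log.append(f"end {name} {update}")


async def process_update_non_blocking(update: str, groups: dict, log: list, tasks: set) -> None:
    # groups maps a group number to one (name, delay) handler, for brevity
    for group_number in sorted(groups):
        name, delay = groups[group_number]
        # No await: schedule the callback and move on to the next group immediately
        tasks.add(asyncio.create_task(callback(name, update, delay, log)))


async def main() -> list:
    log: list = []
    tasks: set = set()  # keeps references to the scheduled tasks
    groups = {0: ("slow_group_0", 0.05), 1: ("fast_group_1", 0.01)}
    for update in ["A", "B"]:
        await process_update_non_blocking(update, groups, log, tasks)
    await asyncio.gather(*tasks)
    return log


events = asyncio.run(main())
print(events)
```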
Instead of running single handlers in a non-blocking way, we can tell the `Application` to run the whole call of `Application.process_update` concurrently:
```python
Application.builder().token('TOKEN').concurrent_updates(True).build()
```
Now the `Application` will start `Application.process_update(Update A)` via `asyncio.create_task` and immediately afterwards do the same with Update B. Again, pseudocode:
```python
while not application.update_queue.empty():
    update = await application.update_queue.get()
    asyncio.create_task(application.process_update(update))
```
This setting is independent of the `block` parameter of `Handler`, and within `application.process_update`, concurrency still works as explained above.
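Here is a comparable toy model of `concurrent_updates` (again stdlib only, with hypothetical names; PTB's real update loop does more). Each call of `process_update` becomes its own task, so Update B starts before Update A is done, yet within a single update the group 0 callback still finishes before the group 1 callback starts, because those callbacks are awaited.

```python
import asyncio


async def callback(name: str, update: str, delay: float, log: list) -> None:
    log.append(f"start {name} {update}")
    await asyncio.sleep(delay)  # simulates a network call
    log.append(f"end {name} {update}")


async def process_update(update: str, groups: dict, log: list) -> None:
    # Within a single update, handlers of different groups run one after the other
    for group_number in sorted(groups):
        name, delay = groups[group_number]
        await callback(name, update, delay, log)


async def main() -> list:
    log: list = []
    groups = {0: ("group_0", 0.02), 1: ("group_1", 0.02)}
    queue: asyncio.Queue = asyncio.Queue()
    for update in ["A", "B"]:
        queue.put_nowait(update)
    tasks = []
    while not queue.empty():
        update = await queue.get()
        # Concurrent updates: every call of process_update becomes its own task
        tasks.append(asyncio.create_task(process_update(update, groups, log)))
    await asyncio.gather(*tasks)
    return log


events = asyncio.run(main())
print(events)
```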
To further customize how calls of `Application.process_update` are handled concurrently, you can also implement your own custom update processor by subclassing the `BaseUpdateProcessor` interface class. Let's have a look at an example:
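```python
import asyncio

from telegram.ext import Application, BaseUpdateProcessor


class MyUpdateProcessor(BaseUpdateProcessor):
    async def do_process_update(self, update, coroutine) -> None:
        # This method is called for every update
        if update.callback_query:
            # Delay the processing of callback queries by 5 seconds
            await asyncio.sleep(5)
        await coroutine

    async def initialize(self) -> None:
        pass

    async def shutdown(self) -> None:
        pass


# Allow up to 10 updates to be processed concurrently
application = Application.builder().token('TOKEN').concurrent_updates(MyUpdateProcessor(10)).build()
```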
The above code delays the processing of every `callback_query` update by 5 seconds and processes up to 10 updates simultaneously. The pseudocode for this now looks something like this:
```python
while not application.update_queue.empty():
    update = await application.update_queue.get()
    coroutine = application.process_update(update)
    asyncio.create_task(my_update_processor.do_process_update(update, coroutine))
```
This is just an example of how to use the `BaseUpdateProcessor` class to handle updates the way you want; there are endless possibilities. For example, you can throttle update processing for specific users or ensure that inline queries are always processed sequentially. See the documentation for more information.
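As a sketch of the "process inline queries sequentially" idea, the code below uses a stand-in base class, `ToyBaseUpdateProcessor`, which only loosely mimics the shape of `BaseUpdateProcessor` (a concurrency limit plus a `do_process_update(update, coroutine)` hook) so that it runs without PTB. Updates are plain dicts and `fake_process_update` plays the role of `Application.process_update`; all of these names are hypothetical. An `asyncio.Lock` makes inline queries wait for each other while other updates keep running concurrently.

```python
import asyncio


class ToyBaseUpdateProcessor:
    """Hypothetical stand-in that loosely mimics the shape of BaseUpdateProcessor."""

    def __init__(self, max_concurrent_updates: int) -> None:
        self._semaphore = asyncio.Semaphore(max_concurrent_updates)

    async def process_update(self, update: dict, coroutine) -> None:
        # Limit how many updates are handled at the same time
        async with self._semaphore:
            await self.do_process_update(update, coroutine)

    async def do_process_update(self, update: dict, coroutine) -> None:
        raise NotImplementedError


class SequentialInlineQueries(ToyBaseUpdateProcessor):
    def __init__(self, max_concurrent_updates: int) -> None:
        super().__init__(max_concurrent_updates)
        self._inline_lock = asyncio.Lock()

    async def do_process_update(self, update: dict, coroutine) -> None:
        if update.get("inline_query"):
            # Inline queries wait for each other ...
            async with self._inline_lock:
                await coroutine
        else:
            # ... while all other updates run concurrently
            await coroutine


async def main() -> dict:
    # Create the processor inside the running event loop
    processor = SequentialInlineQueries(max_concurrent_updates=10)
    stats = {"running": 0, "peak": 0, "inline_running": 0, "inline_peak": 0}

    async def fake_process_update(update: dict) -> None:
        # Stand-in for Application.process_update that records overlapping updates
        is_inline = bool(update.get("inline_query"))
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        if is_inline:
            stats["inline_running"] += 1
            stats["inline_peak"] = max(stats["inline_peak"], stats["inline_running"])
        await asyncio.sleep(0.01)  # simulated handling time
        stats["running"] -= 1
        if is_inline:
            stats["inline_running"] -= 1

    updates = [{"inline_query": True}] * 3 + [{"message": True}] * 3
    await asyncio.gather(
        *(processor.process_update(u, fake_process_update(u)) for u in updates)
    )
    return stats


stats = asyncio.run(main())
print(stats)
```

In an actual PTB subclass, the same lock logic would presumably live in `do_process_update` and check `update.inline_query` instead of a dict key.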
Note: The number of concurrently processed updates is limited (the limit defaults to 4096 updates at a time). This is a simple measure against e.g. DDoS attacks.
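A cap like this can be sketched with an `asyncio.Semaphore`: at most `limit` updates are handled at once and the rest wait their turn. This is only an illustration with arbitrary numbers (20 updates, a limit of 4); we don't claim it mirrors PTB's implementation.

```python
import asyncio


async def bounded_processing(num_updates: int, limit: int) -> int:
    semaphore = asyncio.Semaphore(limit)  # at most `limit` updates in flight
    running = 0
    peak = 0

    async def process(update_id: int) -> None:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulated handling time
            running -= 1

    await asyncio.gather(*(process(i) for i in range(num_updates)))
    return peak


peak = asyncio.run(bounded_processing(num_updates=20, limit=4))
print(peak)  # 4
```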
`Handler.block` and `Application.concurrent_updates` allow running handler callbacks or the entirety of handling an update concurrently. In addition to that, PTB offers `Application.create_task` to run specific coroutines concurrently. `Application.create_task` is a very thin wrapper around `asyncio.create_task` that adds some book-keeping that comes in handy when using it in PTB. Please consult the documentation of `Application.create_task` for more details.
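To give an idea of what "book-keeping" around `asyncio.create_task` can mean, here is a hypothetical `TaskTracker` class (not PTB's implementation) that keeps strong references to its tasks, records exceptions instead of letting them go unnoticed, and can wait for all pending tasks, e.g. on shutdown. The `asyncio` documentation recommends keeping a reference to created tasks, since the event loop itself only keeps weak references.

```python
import asyncio
from typing import Any, Coroutine, List, Set


class TaskTracker:
    """Hypothetical helper illustrating book-keeping around asyncio.create_task."""

    def __init__(self) -> None:
        self._tasks: Set[asyncio.Task] = set()
        self.errors: List[Exception] = []

    def create_task(self, coroutine: Coroutine[Any, Any, Any]) -> "asyncio.Task":
        task = asyncio.create_task(self._guard(coroutine))
        # The event loop only keeps weak references, so we keep a strong one
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def _guard(self, coroutine: Coroutine[Any, Any, Any]) -> Any:
        try:
            return await coroutine
        except Exception as exc:
            # Record the error instead of letting it get lost
            self.errors.append(exc)
            return None

    async def wait_all(self) -> None:
        # Wait until no tracked task is pending anymore, e.g. on shutdown
        while self._tasks:
            await asyncio.gather(*self._tasks)


async def main():
    tracker = TaskTracker()
    results: list = []

    async def notify(text: str) -> None:
        await asyncio.sleep(0.01)  # stand-in for sending a message
        results.append(text)

    async def broken() -> None:
        raise ValueError("boom")

    tracker.create_task(notify("hello"))
    tracker.create_task(broken())
    await tracker.wait_all()
    return tracker, results


tracker, results = asyncio.run(main())
print(results, tracker.errors)  # ['hello'] [ValueError('boom')]
```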
This wrapper gives you fine-grained control over how you use concurrency in PTB. The next section gives you an idea of why that is helpful.
Even though `asyncio` is usually single-threaded, concurrent programming comes with a number of traps to fall into, and we'll try to give you a few hints on how to spot them. However, this wiki article does not replace ~~your psychiatrist~~ a university lecture on concurrency.
Probably the biggest cause of concurrency issues is shared state, and those issues are hard to fix. So instead of showing you how to fix them, we'll show you how to avoid them altogether. More about that later.
A fair warning: In this section, we'll try to give you a simple talk (if that's possible) on a very complex topic. Many have written about it before, and we're certainly less qualified than most. As usual, we'll use an example to complement the text, and try to stay in the realm of what's important to you.
An example that is often used to illustrate this is that of a bank. Let's say you have been hired by a bank to write a Telegram bot to manage bank accounts. The bot has the command `/transaction <amount> <recipient>`, and because many people will be using this command, you think it's a good idea to make this command run concurrently. ~~You~~ Some unpaid intern wrote the following (BAD AND DANGEROUS) callback function:
```python
async def transaction(update, context):
    bot = context.bot
    chat_id = update.effective_user.id
    source_id, target_id, amount = parse_update(update)
    await bot.send_message(chat_id, 'Preparing...')
    bank.log(BEGINNING_TRANSACTION, amount, source_id, target_id)
    source = bank.read_account(source_id)
    target = bank.read_account(target_id)
    source.balance -= amount
    target.balance += amount
    await bot.send_message(chat_id, 'Transferring money...')
    bank.log(CALCULATED_TRANSACTION, amount, source_id, target_id)
    bank.write_account(source)
    await bot.send_message(chat_id, 'Source account updated...')
    await bot.send_message(chat_id, 'Target account updated...')
    bank.write_account(target)
    await bot.send_message(chat_id, 'Done!')
    bank.log(FINISHED_TRANSACTION, amount, source_id, target_id)

application.add_handler(CommandHandler('transaction', transaction, block=False))
```
We skipped some of the implementation details, so here's a short explanation:
- `parse_update` extracts the user IDs of the sender (`source_id`) and receiver (`target_id`), as well as the `amount`, from the message
- `bank` is a globally accessible object that exposes the Python API of the bank's operations
- `bank.read_account` reads a bank account from the bank's database into a Python object
- `bank.write_account` writes a bank account back to the bank's database
- `bank.log` must be used to keep a log of all changes to make sure no money is lost
Sadly, ~~you~~ that damn intern fell right into the trap. Let's say there are two morally corrupt customers, Customer A with Account A and Customer B with Account B, who both make a transaction simultaneously. Customer A sends Transaction AB of $10 to Customer B. At the same time, Customer B sends a Transaction BA of $100 to Customer A.
Now the `Application` starts two tasks, Task AB and Task BA, almost simultaneously. Both tasks read the accounts from the database with the same balance and calculate a new balance for both of them. In most cases, one of the two transactions will simply overwrite the other. That's not too bad, but will at least be confusing to the customers. However, each `await` gives control back to the event loop, which may then continue another task. Hence, the following may occur:
- Task AB executes `bank.write_account(source)` and updates Account A with -$10
- Before updating Account B, Task AB sends two messages, and during that time, the event loop continues Task BA
- Task BA executes `bank.write_account(source)` and updates Account B with -$100
- Before updating Account A, Task BA sends two messages, and during that time, the event loop continues Task AB
- Task AB executes `bank.write_account(target)` and updates Account B with +$10
- When Task BA is resumed again, it executes `bank.write_account(target)` and updates Account A with +$100
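In the end, Account A is at +$100 and Account B is at +$10 (relative to their starting balances), whereas correctly executed transactions would have left Account A at +$90 and Account B at -$90. Of course, this won't happen very often. And that's what makes this bug so critical. It will probably be missed by your tests and end up in production, potentially causing a lot of financial damage.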
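The interleaving described above can be reproduced deterministically with plain `asyncio`. In this sketch, `ToyBank` is a hypothetical in-memory stand-in for `bank` (balances are plain integers starting at $0, and logging is left out), and `send_message` is a hypothetical stand-in for `bot.send_message` that only yields to the event loop via `asyncio.sleep(0)`. Running both transactions concurrently with `asyncio.gather` ends with money created out of thin air.

```python
import asyncio


class ToyBank:
    """Hypothetical in-memory stand-in for `bank`; balances are plain integers."""

    def __init__(self) -> None:
        self.accounts = {"A": 0, "B": 0}

    def read_account(self, account_id: str) -> int:
        return self.accounts[account_id]

    def write_account(self, account_id: str, balance: int) -> None:
        self.accounts[account_id] = balance


async def send_message(text: str) -> None:
    # Stand-in for bot.send_message: each await lets the event loop switch tasks
    await asyncio.sleep(0)


async def transaction(bank: ToyBank, source_id: str, target_id: str, amount: int) -> None:
    # Same structure as the BAD AND DANGEROUS callback above (logging omitted)
    await send_message("Preparing...")
    source = bank.read_account(source_id)
    target = bank.read_account(target_id)
    source -= amount
    target += amount
    await send_message("Transferring money...")
    bank.write_account(source_id, source)
    await send_message("Source account updated...")
    await send_message("Target account updated...")
    bank.write_account(target_id, target)
    await send_message("Done!")


async def main() -> dict:
    bank = ToyBank()
    # Run both callbacks concurrently, as block=False would
    await asyncio.gather(
        transaction(bank, "A", "B", 10),  # Transaction AB
        transaction(bank, "B", "A", 100),  # Transaction BA
    )
    return bank.accounts


accounts = asyncio.run(main())
print(accounts)  # {'A': 100, 'B': 10} instead of {'A': 90, 'B': -90}
```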
Note: This kind of bug is called a race condition and has been the source of many, many security vulnerabilities. It's also one of the reasons why banking software is not written by unpaid interns.
To be fair, you probably don't write software for banks (if you do, you should already know about this), but this kind of bug can occur in much simpler situations. While in this case the shared state is the `bank` object, it can take many forms: a database, a `dict`, a `list` or any other kind of object that is modified by more than one task. Depending on the situation, race conditions are more or less likely to occur, and the damage they do is bigger or smaller, but as a rule of thumb, they're bad.
As promised earlier, let's discuss how to avoid such situations. That's not always as easy as it is in this case, but we're lucky:
- Our set of tools is very limited: `Application.create_task` is the only `asyncio` tool we're using
- Our goals are not very ambitious: we only want to speed up our I/O
There are two relatively simple steps you have to follow. First, identify those parts of the code that must run sequentially (the opposite of in parallel or concurrently). Usually, that is code that fits at least one of these criteria:
1. Modifies shared state
2. Reads shared state and relies on it being correct
3. Modifies local state (e.g. a variable used later in the same function)
Make sure you have a good idea of what shared state means. Don't hesitate to do a quick Google search on it.
Let's go through our bank example line by line and note which of the criteria it matches:
```python
async def transaction(update, context):
    bot = context.bot
    chat_id = update.effective_user.id  # 3
    source_id, target_id, amount = parse_update(update)  # 3
    await bot.send_message(chat_id, 'Preparing...')  # None
    bank.log(BEGINNING_TRANSACTION, amount, source_id, target_id)  # None
    source = bank.read_account(source_id)  # 2, 3
    target = bank.read_account(target_id)  # 2, 3
    source.balance -= amount  # 3
    target.balance += amount  # 3
    await bot.send_message(chat_id, 'Transferring money...')  # None
    bank.log(CALCULATED_TRANSACTION, amount, source_id, target_id)  # None
    bank.write_account(source)  # 1
    await bot.send_message(chat_id, 'Source account updated...')  # None
    await bot.send_message(chat_id, 'Target account updated...')  # None
    bank.write_account(target)  # 1
    await bot.send_message(chat_id, 'Done!')  # None
    bank.log(FINISHED_TRANSACTION, amount, source_id, target_id)  # None

application.add_handler(CommandHandler('transaction', transaction, block=False))
```
Note: One could argue that `bank.log` modifies shared state. However, logging libraries are usually thread-safe and it's unlikely that the log has a critical functional role. It's not being read from in this function, and let's assume it's not being read from anywhere else in the code, so maybe consider this an exception to the rule. Also, for the sake of this example, it'd be boring if only `bot.send_message` were safe to run concurrently. However, we will keep this in mind for the next step.
As you can see, there's a pretty obvious pattern here: `bot.send_message` and `bank.log` do not match any of the criteria we have set for strictly sequential code. That means we can run this code concurrently without risk. Therefore, the second step is to extract that code into separate functions and run only those concurrently. Since our concurrent code parts are all very similar, they can be replaced by a single function. We could have done that before, but then this moment would've been less cool.
```python
async def log_and_notify(bot, action, amount, source_id, target_id, chat_id, message):
    # `bot` is passed in explicitly, since it is not available as a global here
    bank.log(action, amount, source_id, target_id)
    await bot.send_message(chat_id, message)


async def transaction(update, context):
    chat_id = update.message.chat_id  # 3
    source_id, target_id, amount = parse_update(update)  # 3
    context.application.create_task(
        log_and_notify(
            context.bot, BEGINNING_TRANSACTION, amount, source_id, target_id, chat_id, 'Preparing...'
        ),
        update=update,
    )
    source = bank.read_account(source_id)  # 2, 3
    target = bank.read_account(target_id)  # 2, 3
    source.balance -= amount  # 3
    target.balance += amount  # 3
    context.application.create_task(
        log_and_notify(
            context.bot, CALCULATED_TRANSACTION, amount, source_id, target_id, chat_id, 'Transferring money...'
        ),
        update=update,
    )
    bank.write_account(source)  # 1
    bank.write_account(target)  # 1
    context.application.create_task(
        log_and_notify(
            context.bot, FINISHED_TRANSACTION, amount, source_id, target_id, chat_id, 'Done!'
        ),
        update=update,
    )

application.add_handler(CommandHandler('transaction', transaction, block=True))
```
Note: You might have noticed that we moved `bank.log` before `bot.send_message`, so the log entries will be in order most of the time, assuming the database operations take long enough for the log to complete.
Note: It's likely that `bank.read_account` and `bank.write_account` require some I/O operations to interact with the bank's database. This shows that it's not always possible to write code concurrently, at least with this simplified method. Read about Transactions to learn how databases solve this in "real life".
By separating the strictly sequential code from the concurrent code, we made sure that no race conditions can occur. The `transaction` function won't be executed concurrently anymore, but we still gain a substantial performance boost over completely sequential code, because the logging and user notifications now run concurrently.
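The same toy setup can illustrate the fixed version. Again, `ToyBank`, the string actions (standing in for the `*_TRANSACTION` constants) and `asyncio.sleep` as a stand-in for `bot.send_message` are hypothetical, and `asyncio.create_task` plays the role of `context.application.create_task`. The transactions are awaited one after the other (as with `block=True`), only the logging and notifications run as tasks, and the total amount of money is preserved.

```python
import asyncio


class ToyBank:
    """Hypothetical in-memory stand-in for the `bank` object of the example."""

    def __init__(self) -> None:
        self.accounts = {"A": 0, "B": 0}
        self.entries: list = []  # stand-in for the bank's log

    def log(self, action: str, amount: int, source_id: str, target_id: str) -> None:
        self.entries.append((action, amount, source_id, target_id))


async def log_and_notify(bank, sent, action, amount, source_id, target_id, message):
    bank.log(action, amount, source_id, target_id)
    await asyncio.sleep(0.01)  # stand-in for bot.send_message
    sent.append(message)


async def transaction(bank, sent, tasks, source_id, target_id, amount) -> None:
    def notify(action: str, message: str) -> None:
        # Run the I/O as a separate task, like context.application.create_task
        task = asyncio.create_task(
            log_and_notify(bank, sent, action, amount, source_id, target_id, message)
        )
        tasks.add(task)  # keep a reference so we can wait for it later

    notify("BEGINNING", "Preparing...")
    # Strictly sequential part: no await between reading and writing shared state
    source = bank.accounts[source_id] - amount
    target = bank.accounts[target_id] + amount
    notify("CALCULATED", "Transferring money...")
    bank.accounts[source_id] = source
    bank.accounts[target_id] = target
    notify("FINISHED", "Done!")


async def main():
    bank = ToyBank()
    sent: list = []
    tasks: set = set()
    # Like block=True: one transaction callback after the other
    await transaction(bank, sent, tasks, "A", "B", 10)  # Transaction AB
    await transaction(bank, sent, tasks, "B", "A", 100)  # Transaction BA
    await asyncio.gather(*tasks)  # wait for the notifications to finish
    return bank, sent


bank, sent = asyncio.run(main())
print(bank.accounts)  # {'A': 90, 'B': -90}
```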
That's basically it for now. To wrap up, here are a few helpful guidelines for writing concurrent code:
- Avoid using shared state whenever possible
- Write self-contained (pure) functions
- When in doubt, make it sequential
Wiki of python-telegram-bot
© Copyright 2015-2025 – Licensed by Creative Commons