Asyncio, the concurrent Python programmer’s dream: write borderline synchronous code and let Python work out the rest. It’s import antigravity all over again…
Except it isn’t quite there yet. Concurrent programming is hard, and while coroutines let us avoid callback hell, they can only get you so far: you still need to think about creating tasks, retrieving results and handling errors gracefully. Sad face.
The good news is that all of that is possible in asyncio. The bad news is that it’s not always immediately obvious what went wrong and how to fix it. Here are a few patterns I’ve noticed while working with asyncio.
Really quick before we start: I’ll be using the lovely aiohttp library to make asynchronous HTTP requests and the Hacker News API, because it’s a simple, well-known site that makes for a familiar use case. I’ll also be using the async/await syntax introduced in Python 3.5; following feedback from my previous article, I will assume the reader is familiar with the concepts described there. And finally, all examples are available in this article’s GitHub repo.
Right, let’s get started!
Creating and scheduling tasks is trivial in asyncio. The API includes several methods in the AbstractEventLoop class, as well as functions in the library, for that purpose. But usually you want to combine the results of those tasks and process them in some way, and recursion is a perfect example of this pattern; it also shows off the simplicity of coroutines versus other means of concurrency.
A common use case for asyncio is to create some kind of web crawler. Imagine we’re just too busy to check Hacker News, or maybe you just like a good ol’ flamewar, so you want to implement a system that retrieves the number of comments for a particular HN post and notifies you if it’s over a threshold. You google for a bit and find the HN API docs, just what we need, but then you notice this:
Want to know the total number of comments on an article? Traverse the tree and count.
Challenge accepted!
[14:47:32] > Calculating comments took 2.23 seconds and 73 fetches
[14:47:32] -- Post 8863 has 72 comments
Let’s skip the boilerplate and go directly to the recursive coroutine; notice it reads almost exactly as it would in synchronous code:
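Here’s a minimal sketch of that coroutine using the Hacker News item endpoint; URL_TEMPLATE and the module-level fetch_counter are names I’m assuming for the boilerplate we skipped (the full version lives in the repo):

```python
import asyncio

import aiohttp

# assumed boilerplate: HN item endpoint and a global counter for the log line above
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json"
fetch_counter = 0


async def fetch(session, url):
    """Perform an HTTP GET and decode the JSON response."""
    global fetch_counter
    fetch_counter += 1
    async with session.get(url) as response:
        return await response.json()


async def post_number_of_comments(loop, session, post_id):
    """Retrieve a post and recursively count the comments in its subtree."""
    response = await fetch(session, URL_TEMPLATE.format(post_id))

    if response is None or 'kids' not in response:  # base case: no comments
        return 0

    # fan-out: one recursive coroutine per direct child comment
    tasks = [post_number_of_comments(loop, session, kid_id)
             for kid_id in response['kids']]

    # fan-in: wait for all of them and reduce the results
    results = await asyncio.gather(*tasks)
    return sum(results) + len(response['kids'])
```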
This is a perfect example of what Brett Slatkin describes as fan-in and fan-out: we fan-out to retrieve the data for the descendants and fan-in, reducing the data retrieved, to calculate the number of comments.
Asyncio’s API has a couple of ways to perform this fan-out operation; here I’m using gather, which effectively waits until all coroutines are done and returns a list of their results.
Note how using a coroutine also fits nicely with recursion: at any one point there are any number of coroutines awaiting their responses on the gather call and resuming execution after the I/O operation is done, allowing us to express fairly complicated behaviour in a single, nice and readable coroutine.
“Too easy”, you say? OK, let’s turn it up a notch.
Imagine you want to send yourself an email for any post whose number of comments is above a certain threshold, and you’d like to do that as we traverse the tree of posts. We could simply add an if statement at the end of our recursive function to do so:
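Something along these lines, building on the sketch above; log_post here is just a stand-in that sleeps for a random interval and logs, and MIN_COMMENTS is a threshold I’m making up:

```python
import random

MIN_COMMENTS = 2  # assumed threshold above which a post is worth an email


async def log_post(post):
    """Stand-in for sending the email: sleep a little, then log."""
    await asyncio.sleep(random.random() * 3)
    print("Post logged")


async def post_number_of_comments(loop, session, post_id):
    """As before, but now logging popular posts as we find them."""
    response = await fetch(session, URL_TEMPLATE.format(post_id))
    if response is None or 'kids' not in response:
        return 0

    tasks = [post_number_of_comments(loop, session, kid_id)
             for kid_id in response['kids']]
    results = await asyncio.gather(*tasks)
    number_of_comments = sum(results) + len(response['kids'])

    # the new bit: notify ourselves about popular posts as we traverse
    if number_of_comments > MIN_COMMENTS:
        await log_post(response)  # this await blocks the whole subtree

    return number_of_comments
```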
[09:41:02] Post logged
[09:41:04] Post logged
[09:41:06] Post logged
[09:41:06] > Calculating comments took 6.35 seconds and 73 fetches
[09:41:06] -- Post 8863 has 72 comments
That is quite a bit slower than before! The reason is that, as we discussed before, await suspends the coroutine until the future is done, but since we don’t need the result of the logging there’s no real reason to do so.
We need to fire and forget our coroutine, but since we can’t await it, we need another way to schedule the execution of the coroutine without waiting on it. A quick look at the asyncio API yields ensure_future, which will schedule a coroutine to be run, wrap it in a Task object and return it. Remember that, once scheduled, the event loop will yield control to our coroutine at some point in the future, when another coroutine awaits. Cool, let’s swap await log_post for it then:
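The change is just the final if block inside post_number_of_comments, sketched here on its own:

```python
# inside post_number_of_comments, replacing the previous "await log_post(...)":
if number_of_comments > MIN_COMMENTS:
    asyncio.ensure_future(log_post(response))  # schedule it and move straight on
```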
[09:42:57] > Calculating comments took 1.69 seconds and 73 fetches
[09:42:57] -- Post 8863 has 72 comments
[09:42:57] Task was destroyed but it is pending!
task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1109197f8>()]>>
[09:42:57] Task was destroyed but it is pending!
task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919948>()]>>
[09:42:57] Task was destroyed but it is pending!
task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919978>()]>>
Ah, the dreaded Task was destroyed but it is pending!, haunting users of asyncio around the globe. The good news is we’re back to the times we were getting before (1.69 seconds); the bad news is asyncio doesn’t like our fire-and-forget.
The problem is that we’re forcibly closing the loop right after the post_number_of_comments coroutine returns, leaving our log_post tasks no time to complete.
We have two options: we either let the loop run forever and manually abort the script, or we use the all_tasks Task class method to find any pending tasks and await them once we’re done calculating comments. Let’s try that out with a quick change after our call to post_number_of_comments:
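A rough sketch of that change, assuming the script drives the loop with run_until_complete as in the snippets above (the main wrapper is mine, just to satisfy aiohttp’s async with session requirement; asyncio.Task.all_tasks was the spelling in Python 3.5/3.6, newer versions expose asyncio.all_tasks instead):

```python
post_id = 8863  # the post from the example output


async def main(loop, post_id):
    async with aiohttp.ClientSession() as session:
        return await post_number_of_comments(loop, session, post_id)


loop = asyncio.get_event_loop()
comments = loop.run_until_complete(main(loop, post_id))
print("-- Post {} has {} comments".format(post_id, comments))

# the quick change: find any tasks that are still pending (our log_post ones)
# and let them run to completion before closing the loop
pending_tasks = [task for task in asyncio.Task.all_tasks()
                 if not task.done()]
loop.run_until_complete(asyncio.gather(*pending_tasks))

loop.close()
```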
[09:47:29] > Calculating comments took 1.72 seconds and 73 fetches
[09:47:29] -- Post 8863 has 72 comments
[09:47:30] Post logged
[09:47:31] Post logged
[09:47:32] Post logged
Now we’re ensuring the logging tasks complete. Relying on all_tasks works fine in situations where we have a good idea of which tasks are due to be executed in our event loop, but in more complex examples there could be any number of pending tasks whose origin might not even be in our code.
Another approach is to clean up after ourselves by registering any and all coroutines we’ve scheduled and allowing the pending ones to be executed once we’re done calculating comments. As we know, ensure_future returns a Task object we can use to register our low-priority tasks. Let’s simply define a task_registry list and store the futures in it:
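A sketch of that version, again building on the snippets above:

```python
task_registry = []  # our own registry of fire-and-forget tasks

# inside post_number_of_comments we now keep a reference to each scheduled task:
#     if number_of_comments > MIN_COMMENTS:
#         task_registry.append(asyncio.ensure_future(log_post(response)))

loop = asyncio.get_event_loop()
comments = loop.run_until_complete(main(loop, post_id))
print("-- Post {} has {} comments".format(post_id, comments))

# only wait on the tasks we registered ourselves
pending_tasks = [task for task in task_registry if not task.done()]
loop.run_until_complete(asyncio.gather(*pending_tasks))

loop.close()
```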
[09:53:46] > Calculating comments took 1.68 seconds and 73 fetches
[09:53:46] -- Post 8863 has 72 comments
[09:53:46] Post logged
[09:53:48] Post logged
[09:53:49] Post logged
The lesson here is that asyncio should not be treated as a distributed job queue such as Celery. It all runs in a single thread, and the event loop needs to be managed accordingly, allowing it time to complete the tasks.
Which leads to another common pattern:
Continuing with our HN example, and since we’ve done such a great job already, we’ve decided it’s absolutely crucial to calculate the number of comments on HN stories as they appear, and while they’re in the top 5 new entries.
A quick look at the HN API reveals an endpoint that returns the 500 latest stories. Perfect, so we can simply poll that endpoint to retrieve new stories and calculate the number of comments on them every, say, five seconds.
Right, so since we are going to poll periodically we can just use an infinite while loop, await the polling task and sleep for the necessary period of time. I’ve added a few minor changes to retrieve the top stories instead of a specific post’s URL:
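Here’s a sketch of that loop; TOP_STORIES_URL is the HN endpoint for the top stories, and I’m leaving out the timing and fetch-count reporting you can see in the output below:

```python
TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json"


async def get_comments_of_top_stories(loop, session, limit, iteration):
    """Fetch the top `limit` stories and count the comments on each of them."""
    response = await fetch(session, TOP_STORIES_URL)
    tasks = [post_number_of_comments(loop, session, post_id)
             for post_id in response[:limit]]
    results = await asyncio.gather(*tasks)
    for post_id, num_comments in zip(response[:limit], results):
        print("Post {} has {} comments ({})".format(
            post_id, num_comments, iteration))


async def poll_top_stories_for_comments(loop, session, period, limit):
    """Poll the top stories forever, waiting `period` seconds between runs."""
    iteration = 1
    while True:
        print("Calculating comments for top {} stories. ({})".format(
            limit, iteration))
        await get_comments_of_top_stories(loop, session, limit, iteration)
        print("Waiting for {} seconds...".format(period))
        await asyncio.sleep(period)
        iteration += 1


async def main(loop, period, limit):
    async with aiohttp.ClientSession() as session:
        await poll_top_stories_for_comments(loop, session, period, limit)


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop, period=5, limit=5))
```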
[10:14:03] Calculating comments for top 5 stories. (1)
[10:14:06] Post 13848196 has 31 comments (1)
[10:14:06] Post 13849430 has 37 comments (1)
[10:14:06] Post 13849037 has 15 comments (1)
[10:14:06] Post 13845337 has 128 comments (1)
[10:14:06] Post 13847465 has 27 comments (1)
[10:14:06] > Calculating comments took 2.96 seconds and 244 fetches
[10:14:06] Waiting for 5 seconds…
[10:14:11] Calculating comments for top 5 stories. (2)
[10:14:14] Post 13848196 has 31 comments (2)
[10:14:14] Post 13849430 has 37 comments (2)
[10:14:14] Post 13849037 has 15 comments (2)
[10:14:14] Post 13845337 has 128 comments (2)
[10:14:14] Post 13847465 has 27 comments (2)
[10:14:14] > Calculating comments took 3.04 seconds and 244 fetches
[10:14:14] Waiting for 5 seconds…
Nice, but there’s a slight problem: if you look at the timestamps it’s not strictly running the task every 5 seconds, it runs it 5 seconds after get_comments_of_top_stories finishes. Again, a consequence of using await and blocking until we get our results back. That might not be a problem, except in cases where the task takes longer than five seconds. Also, it feels kind of wrong to use run_until_complete on a coroutine designed to be infinite.
Good news is we’re experts on ensure_future now; instead of awaiting we can just slap that guy in and…
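A sketch of the modified loop; I’ve kept a simple timing report so the zero-seconds effect below makes sense, but left the fetch counting out:

```python
from datetime import datetime


async def poll_top_stories_for_comments(loop, session, period, limit):
    """Same loop as before, but now fire-and-forget the polling task."""
    iteration = 1
    while True:
        now = datetime.now()
        print("Calculating comments for top {} stories. ({})".format(
            limit, iteration))

        # schedule the work instead of awaiting it...
        asyncio.ensure_future(
            get_comments_of_top_stories(loop, session, limit, iteration))

        # ...so execution falls straight through to the report below,
        # which is why it shows 0.00 seconds
        print("> Calculating comments took {:.2f} seconds".format(
            (datetime.now() - now).total_seconds()))
        print("Waiting for {} seconds...".format(period))
        await asyncio.sleep(period)
        iteration += 1
```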
[10:55:40] Calculating comments for top 5 stories. (1)
[10:55:40] > Calculating comments took 0.00 seconds and 0 fetches
[10:55:40] Waiting for 5 seconds…
[10:55:43] Post 13848196 has 32 comments (1)
[10:55:43] Post 13849430 has 48 comments (1)
[10:55:43] Post 13849037 has 16 comments (1)
[10:55:43] Post 13845337 has 129 comments (1)
[10:55:43] Post 13847465 has 29 comments (1)
[10:55:45] Calculating comments for top 5 stories. (2)
[10:55:45] > Calculating comments took 0.00 seconds and 260 fetches
[10:55:45] Waiting for 5 seconds…
[10:55:48] Post 13848196 has 32 comments (2)
[10:55:48] Post 13849430 has 48 comments (2)
[10:55:48] Post 13849037 has 16 comments (2)
[10:55:48] Post 13845337 has 129 comments (2)
[10:55:48] Post 13847465 has 29 comments (2)
O… k… the good news is the timestamps are spaced exactly five seconds apart, but what’s with the zero seconds and no fetches? And then the next iteration takes zero seconds but does 260 fetches?
This is one of the consequences of moving away from await: since we’re no longer blocking, the coroutine simply moves on to the next line, which prints the zero-seconds and, the first time around, zero-fetches message. These are fairly trivial problems, since we can live without the messages, but what if we needed the result of the task?
Then, my friend, we’d need to resort to… callbacks (shudder).
I know, I know, the whole point of coroutines is to not use callbacks, but this is why the article’s dramatic subtitle is “Beyond await”. We’re not in await-land anymore, we’re adventuring into manually scheduling tasks to meet our use case. Do you have what it takes? (Hint: it’s not that bad.)
As we discussed before, ensure_future returns a Future object to which we can add a callback using add_done_callback.
Before we do that, and in order to count fetches properly, we’re going to encapsulate our fetch coroutine in a class called URLFetcher and make an instance per task, also removing that global variable that was bugging me anyway:
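Something like this (a sketch; the full class lives in the repo):

```python
class URLFetcher:
    """Bundles the fetch coroutine with its own, per-instance fetch counter."""

    def __init__(self):
        self.fetch_counter = 0

    async def fetch(self, session, url):
        self.fetch_counter += 1
        async with session.get(url) as response:
            return await response.json()
```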
[12:23:40] Calculating comments for top 5 stories. (1)
[12:23:40] Waiting for 5 seconds...
[12:23:43] Post 13848196 has 38 comments (1)
[12:23:43] Post 13849430 has 72 comments (1)
[12:23:43] Post 13849037 has 19 comments (1)
[12:23:43] Post 13848283 has 64 comments (1)
[12:23:43] Post 13847465 has 34 comments (1)
[12:23:43] > Calculating comments took 3.17 seconds and 233 fetches
[12:23:45] Calculating comments for top 5 stories. (2)
[12:23:45] Waiting for 5 seconds...
[12:23:47] Post 13848196 has 38 comments (2)
[12:23:47] Post 13849430 has 72 comments (2)
[12:23:47] Post 13849037 has 19 comments (2)
[12:23:47] Post 13848283 has 64 comments (2)
[12:23:47] Post 13847465 has 34 comments (2)
[12:23:47] > Calculating comments took 2.47 seconds and 233 fetches
[12:23:50] Calculating comments for top 5 stories. (3)
[12:23:50] Waiting for 5 seconds...
Right, that’s better, but let’s focus on the callback section:
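A sketch of that part of the loop, assuming get_comments_of_top_stories now receives the URLFetcher instance, threads it down to post_number_of_comments, and returns fetcher.fetch_counter; I’m binding now through a default argument so each callback keeps its own start time:

```python
from datetime import datetime


async def poll_top_stories_for_comments(loop, session, period, limit):
    iteration = 1
    while True:
        print("Calculating comments for top {} stories. ({})".format(
            limit, iteration))

        fetcher = URLFetcher()  # one instance per task for accurate counting
        now = datetime.now()
        task = asyncio.ensure_future(
            get_comments_of_top_stories(loop, session, limit, iteration, fetcher))

        def callback(fut, start=now):
            # called with the future it was attached to; its result is whatever
            # get_comments_of_top_stories returned, i.e. the fetch count
            fetch_count = fut.result()
            print("> Calculating comments took {:.2f} seconds and {} fetches".format(
                (datetime.now() - start).total_seconds(), fetch_count))

        task.add_done_callback(callback)

        print("Waiting for {} seconds...".format(period))
        await asyncio.sleep(period)
        iteration += 1
```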
Notice the callback function needs to accept a single argument, the future it’s assigned to. We’re also returning the fetch count from the URLFetcher instance as the result of get_comments_of_top_stories and retrieving it as the result of the future.
See? I told you it wasn’t that bad, but it’s no await, that’s for sure.
While we’re on the subject of callbacks, on your inevitable journeys into the asyncio API docs you’re likely to find a couple of methods on AbstractEventLoop by the name of call_later and its cousin call_at, which sound like something useful for implementing periodic coroutines. And you would be right, they can be used; we just need to make a couple of changes:
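Roughly like this; note this sketch assumes get_comments_of_top_stories now opens its own aiohttp.ClientSession (see the notes just below), and functools.partial is simply my way of packaging up the re-scheduling call:

```python
from datetime import datetime
from functools import partial


def poll_top_stories_for_comments(loop, period, limit, iteration=1):
    """Plain function (not a coroutine) that schedules the work, then re-schedules itself."""
    print("Calculating comments for top {} stories. ({})".format(limit, iteration))

    fetcher = URLFetcher()
    now = datetime.now()
    # get_comments_of_top_stories is assumed to open its own ClientSession here,
    # since there is no enclosing coroutine to do it for us
    task = asyncio.ensure_future(
        get_comments_of_top_stories(loop, limit, iteration, fetcher))

    def callback(fut, start=now):
        fetch_count = fut.result()
        print("> Calculating comments took {:.2f} seconds and {} fetches".format(
            (datetime.now() - start).total_seconds(), fetch_count))

    task.add_done_callback(callback)

    print("Waiting for {} seconds...".format(period))
    # re-schedule this very function to run again `period` seconds from now
    loop.call_later(period,
                    partial(poll_top_stories_for_comments,
                            loop, period, limit, iteration + 1))


loop = asyncio.get_event_loop()
poll_top_stories_for_comments(loop, period=5, limit=5)
loop.run_forever()
```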
Resulting in a similar output as before. Notice a few changes though:
- There is no main entry point coroutine; instead poll_top_stories_for_comments is now a standard function that schedules tasks. Because of this we moved the creation of the aiohttp.ClientSession into get_comments_of_top_stories, which is our first coroutine. Note this is simply to work around aiohttp’s requirement of async with when creating a session.
- The important bit is using ensure_future to schedule the coroutine and then having the function schedule itself for later execution. One could argue this approach is less explicit.
- Finally, since poll_top_stories_for_comments uses the loop to schedule itself, we obviously have to use run_forever for the loop to always be running.
OK, that’s all fine and dandy, but what if, God forbid, our connection broke in the middle of a task? What would happen to our lovely system? Let’s simulate that by introducing a raised exception after a number of URL fetches:
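For instance, by having URLFetcher blow up after a handful of fetches; MAXIMUM_FETCHES is just an arbitrary number I’m picking for the simulation:

```python
MAXIMUM_FETCHES = 5  # arbitrary point at which we pretend the connection dies


class URLFetcher:

    def __init__(self):
        self.fetch_counter = 0

    async def fetch(self, session, url):
        self.fetch_counter += 1
        # simulate our connection breaking mid-crawl
        if self.fetch_counter > MAXIMUM_FETCHES:
            raise Exception('BOOM!')
        async with session.get(url) as response:
            return await response.json()
```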
[12:51:00] Calculating comments for top 5 stories. (1)
[12:51:00] Waiting for 5 seconds…
[12:51:01] Exception in callback poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121
handle: <Handle poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121>
Traceback (most recent call last):
File "/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "05_periodic_coroutines.py", line 122, in callback
fetch_count = fut.result()
File "05_periodic_coroutines.py", line 100, in get_comments_of_top_stories
results = await asyncio.gather(*tasks)
File "05_periodic_coroutines.py", line 69, in post_number_of_comments
response = await fetcher.fetch(session, url)
File "05_periodic_coroutines.py", line 58, in fetch
raise Exception('BOOM!')
Exception: BOOM!
[12:51:05] Calculating comments for top 5 stories. (2)
[12:51:05] Waiting for 5 seconds…
Not exactly graceful, is it?
What to do? Head over to the next part of this series, where I explore the options we have for error handling and other issues: Asyncio Coroutine Patterns: Errors and Cancellation.