Asyncio Introduction
In this step, we'll learn about cooperative multitasking in Python through the popular `asyncio` library.
To begin, check out the branch `step-5-asyncio-intro`:

git checkout step-5-asyncio-intro
We'll start with the first file, `01-sync_tasks.py`. Open it and take a look at the two synchronous Python functions it contains. There is a `worker()` function that simulates IO-bound work by calling `time.sleep()`. The `main()` function calls `worker()` once for each job that needs to be done.
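If you haven't checked out the repository yet, the file looks roughly like this sketch; the exact job names, delays and output format are assumptions:

```python
import time


def worker(delay: int, job: str) -> None:
    # Simulate IO-bound work by blocking on time.sleep()
    print(f"started {job}")
    time.sleep(delay)
    print(f"finished {job}")


def main() -> None:
    # Each job must finish before the next one starts
    worker(1, "order milk")
    worker(2, "order bread")


if __name__ == "__main__":
    main()
```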
Go ahead and run this file:
python 01-sync_tasks.py
As expected, the workers execute the jobs synchronously, one after another. How can we use coroutines to execute these jobs concurrently?
Coroutines and asyncio
Coroutines in Python are functions that enable cooperative multitasking: a form of multitasking where coroutines voluntarily yield control to one another. Contrast this with preemptive multitasking, where a scheduler involuntarily context-switches between different tasks or threads of execution. Coroutines can be entered, exited and resumed at many different but deterministic points [1].
We can visualise the difference between cooperative and preemptive multitasking using this diagram:
Coroutines are defined using the `async def` keyword, and may contain synchronisation keywords like `await` and `yield` [1].
`asyncio` is a popular library for writing concurrent IO-bound code using the `async`/`await` syntax. It allows you to schedule and run Python coroutines concurrently on an event loop, perform network IO, distribute tasks via queues and synchronise concurrent code [2].
- `await <awaitable>`: tells a coroutine to yield control while it waits for the `<awaitable>`, such as another coroutine, to finish executing
- `async def`: defines a coroutine function
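As a minimal illustration of the syntax (the coroutine names here are placeholders, not part of the workshop files):

```python
import asyncio


async def fetch_greeting() -> str:
    # `await` yields control to the event loop until the sleep finishes
    await asyncio.sleep(1)
    return "hello"


async def main() -> None:
    greeting = await fetch_greeting()
    print(greeting)


asyncio.run(main())
```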
At a high level, `asyncio` implements cooperative multitasking by scheduling coroutines on an event loop. For our purposes, we model the event loop as a priority queue (ordered left to right). At any given moment, `asyncio` runs the first `READY` task on the queue:
Using Coroutines
Let's turn our functions from `01-sync_tasks.py` into coroutines and run them on `asyncio`'s event loop. Open the file `02-async_tasks.py`.

We have turned the `worker()` function into a coroutine using the `async def` keyword. We also log the start and end times of each job so we can better understand how long the tasks take. We have also turned our `main()` function into a coroutine. In `main()`, we `await` two tasks, "order milk" and "order bread", taking 1 and 2 seconds, respectively. Finally, on L23, we start execution by running `main()` on the event loop. The program terminates when the event loop has no more tasks to run.
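The file should look roughly like this sketch; the exact logging format may differ, and the line numbers in the comments are the ones referenced in the walkthrough below:

```python
import asyncio
import time


async def worker(delay: int, job: str) -> None:
    print(f"{time.strftime('%X')} started {job}")
    await asyncio.sleep(delay)  # simulate IO-bound work without blocking the event loop (L9)
    print(f"{time.strftime('%X')} finished {job}")


async def main() -> None:
    # Awaited one after the other (L14-L15)
    await worker(1, "order milk")
    await worker(2, "order bread")

    # The concurrent alternative we uncomment later (L20)
    # await asyncio.gather(worker(1, "order milk"), worker(2, "order bread"))


if __name__ == "__main__":
    asyncio.run(main())  # run main() on the event loop (L23)
```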
What do you expect to happen when we run this program? Take a moment to think about the order of lines you expect to be printed to stdout.
Now, let’s run the program:
python 02-async_tasks.py
As you can see, the coroutines executed sequentially, and the tasks took 3s in total. Why is that?
Let’s take a look at the order of events:
- On L14, we schedule and run `worker(1, "order milk")`; let's call it `task1`.
- Because of the `await` on L14, the `main()` coroutine is paused there and yields control until `task1` is done executing.
- Once the `await asyncio.sleep(delay)` on L9 is done, `task1` is resumed, prints its final message to stdout and signals that it is done.
- This resumes the next ready coroutine on the event loop, which is `main()`, and advances execution to L15.
- Again, we schedule and run this task, which `await`s the sleep timer; once it is done, `main()` is resumed and signals its end.
- As there are no more tasks on the event loop, the program terminates.
So how do we make these coroutines run concurrently?
Comment out L14-15 and uncomment L20.
Now re-run this file.
You’ll notice that now, the tasks take 2s instead of 3s.
How does `asyncio.gather()` achieve this?
Concurrent Coroutines
Open the file `03-scheduling_tasks.py`. To better understand how `Task`s are scheduled and run on the event loop, we modify the example from `02-async_tasks.py` to schedule our tasks first and then `await` them.
TIP: In `asyncio` terminology, a Task is a future-like object that runs a coroutine on an event loop [3].
To do this, we use `asyncio.create_task()`, which creates a `Task` and schedules its execution. So on L18-L19, we have scheduled both `task1` and `task2`. Then on L23, we `await task1`.
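A sketch of what this looks like, reusing the same `worker()` coroutine as before (line positions in the comments are approximate):

```python
import asyncio
import time


async def worker(delay: int, job: str) -> None:
    print(f"{time.strftime('%X')} started {job}")
    await asyncio.sleep(delay)
    print(f"{time.strftime('%X')} finished {job}")


async def main() -> None:
    # Schedule both coroutines as Tasks: they are READY on the event loop,
    # but nothing runs until main() yields control (roughly L18-L19)
    task1 = asyncio.create_task(worker(1, "order milk"))
    task2 = asyncio.create_task(worker(2, "order bread"))

    # Yield control until task1 completes (roughly L23), then until task2 completes
    await task1
    await task2


if __name__ == "__main__":
    asyncio.run(main())
```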
So what order of execution do we expect now? Run this file and observe the order of steps:
python 03-scheduling_tasks.py
How does this happen? Here is the list of steps, showing the pseudo-state of the event loop after each step:

- T0: `task1` scheduled, `task2` scheduled, `main()` yields control at L23. Event loop:
- T1: `task1` prints its start time, then yields control as it awaits the sleep on L9. Event loop:
- T2: `task2` prints its start time, then yields control as it awaits the sleep on L9. Event loop:
- T3: the next ready task is `task1`; it prints its end time and terminates, which makes `main` ready. Event loop:
- T4: the next ready task is `main`; its execution advances to the next `await` on L24. Event loop:
- T5: `task2` prints its end time and terminates, which makes `main` ready. Event loop:
- T6: finally, `main` resumes at L24 and terminates; the event loop is now empty and the program terminates.
Test Your Understanding
- What happens if `task1` takes 2 minutes and `task2` takes 1 minute?
- What about if `task1` takes 0 minutes and `task2` takes 2 minutes? What order do we expect?
Now, we can go back to `asyncio.gather()` in `02-async_tasks.py`. It achieves concurrency by first scheduling all the tasks passed to it, and only then awaiting them.
Task Groups
Now that we understand how scheduling works, we can look at another useful way of scheduling and running tasks concurrently, introduced in Python 3.11: `asyncio.TaskGroup`. It provides stronger safety guarantees than `gather()` for scheduling a nesting of subtasks: if one task raises an exception, the remaining scheduled tasks in the group are cancelled [4].
Let’s look at how it’s used.
Open `04-task_groups.py`. We have updated our `main()` coroutine to create a `TaskGroup` using `async with` (an async context manager). We've named it `tg` and scheduled our tasks on it, instead of using `asyncio.create_task()` with no particular group association. Note that here, the `await` is implicit when the context manager exits [5].
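A minimal sketch of the pattern (job names and delays are assumptions; the file's details may differ):

```python
import asyncio
import time


async def worker(delay: int, job: str) -> None:
    print(f"{time.strftime('%X')} started {job}")
    await asyncio.sleep(delay)
    print(f"{time.strftime('%X')} finished {job}")


async def main() -> None:
    # The group implicitly awaits all of its tasks when the
    # `async with` block exits
    async with asyncio.TaskGroup() as tg:
        tg.create_task(worker(1, "order milk"))
        tg.create_task(worker(2, "order bread"))
    print("all tasks in the group are done")


if __name__ == "__main__":
    asyncio.run(main())  # requires Python 3.11+
```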
Run the file and play around with the delay values:
python 04-task_groups.py
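If you want to see the safety guarantee mentioned above in action, here is a small experiment of our own (not part of the workshop files): when one task raises, its siblings in the group are cancelled, and the errors are re-raised as an `ExceptionGroup`.

```python
import asyncio


async def fails_fast() -> None:
    await asyncio.sleep(0.1)
    raise RuntimeError("out of stock")


async def slow_job() -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("slow_job was cancelled because its sibling failed")
        raise  # re-raise so the cancellation completes


async def main() -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fails_fast())
            tg.create_task(slow_job())
    except* RuntimeError as eg:
        print(f"caught: {eg.exceptions[0]}")


asyncio.run(main())
```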
Using async for
Open the file `05-async_for.py`. This file introduces new syntax: `async for`. How does it work? It is used to loop over an asynchronous iterable.
An asynchronous iterator is an object that implements `__aiter__()` and `__anext__()`, the asynchronous counterparts to `__iter__()` and `__next__()` [6].
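For illustration, here is a small hand-rolled asynchronous iterator; the workshop file most likely uses an async generator instead, which provides `__aiter__()`/`__anext__()` for free:

```python
import asyncio


class OrderStream:
    """Yields order numbers 0..limit-1, one per second."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.current = 0

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        if self.current >= self.limit:
            raise StopAsyncIteration
        await asyncio.sleep(1)  # wait for the next value to be "produced"
        self.current += 1
        return self.current - 1


async def main() -> None:
    async for order in OrderStream(3):
        print(f"received order {order}")


asyncio.run(main())
```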
Go ahead and run the file:
python 05-async_for.py
You'll notice that each loop iteration runs once the async generator has produced a new value, which happens at 1s intervals. This is useful because you can schedule other work while waiting for values to be produced. As an example, uncomment L27 and the coroutine function `deliver_order()`, then re-run the file.
You can see that we can schedule other work to be done while waiting on the async generator.
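A sketch of the idea, assuming a simple async generator and a guessed body for `deliver_order()` (neither matches the file's code exactly):

```python
import asyncio


async def produce_orders(n: int):
    # async generator: produces one order per second
    for i in range(n):
        await asyncio.sleep(1)
        yield f"order {i}"


async def deliver_order() -> None:
    # hypothetical "other work" that runs concurrently with the loop
    await asyncio.sleep(1.5)
    print("delivered an earlier order")


async def main() -> None:
    # Schedule other work before we start consuming the generator
    delivery = asyncio.create_task(deliver_order())

    async for order in produce_orders(3):
        # Each iteration yields control while waiting for the next value,
        # which is when `delivery` gets a chance to run
        print(f"received {order}")

    await delivery


asyncio.run(main())
```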
Using Queues
The last concept we will look at today is `asyncio.Queue`. In our example `06-customers-shops`, we create two sets of tasks:
- `customers`: who place orders on the queue. The coroutine `place_order` places a random number of items, `num_items`, one at a time on the queue, with some browsing time in between; it terminates once all `num_items` have been placed.
- `shops`: who process orders from the queue. The coroutine `process_orders` runs indefinitely: it `await`s an item to be available on the queue, processes the item, then marks the order (task) as done.
In `main()`, we `await` the `customers` tasks to place all their orders, during which time some orders may be processed. We then `await` the queue to drain, which waits for all orders to be processed. Note that awaiting the `shops` tasks instead would cause our program to run indefinitely:

await asyncio.gather(*shops)

This is because the `shops` coroutines run indefinitely. Instead, we `await q.join()`, then call `shop.cancel()` on each of the shop tasks to terminate them.
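A condensed sketch of the whole pattern; item counts, delays and logging are simplified relative to the file:

```python
import asyncio
import random


async def place_order(q: asyncio.Queue, customer: int, num_items: int) -> None:
    # Producer: put num_items on the queue, one at a time, with browsing time in between
    for item in range(num_items):
        await asyncio.sleep(random.uniform(0.1, 0.5))
        await q.put(f"customer {customer} item {item}")


async def process_orders(q: asyncio.Queue, shop: int) -> None:
    # Consumer: runs indefinitely, processing one order at a time
    try:
        while True:
            order = await q.get()       # wait for an item to be available
            await asyncio.sleep(0.3)    # simulate processing the order
            print(f"shop {shop} processed {order}")
            q.task_done()               # mark the order as done
    except asyncio.CancelledError:
        print(f"shop {shop} closing")   # handled when the task is cancelled
        raise


async def main() -> None:
    q: asyncio.Queue = asyncio.Queue()
    customers = [asyncio.create_task(place_order(q, c, random.randint(1, 3))) for c in range(3)]
    shops = [asyncio.create_task(process_orders(q, s)) for s in range(2)]

    await asyncio.gather(*customers)  # all orders have been placed
    await q.join()                    # all orders have been processed

    for shop in shops:                # the shops would otherwise run forever
        shop.cancel()


asyncio.run(main())
```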
What happens if `main()` terminates while the `shops` tasks are still running? When `main()` terminates, `asyncio.run()` on L88 finishes, and it cancels the remaining tasks on the event loop:

This raises the `asyncio.exceptions.CancelledError` exception, which we have chosen to handle on L64.
References