mdk.fr/content/blog/asyncio-0.md

482 lines
14 KiB
Markdown

Lang: en
Title: Python coroutines with async and await
This is a short introduction of the basic vocabulary and knowledge
needed to start with `async` and `await`, we'll go from "scratch", but
not from the whole "let's use generators" era, when everything
started, but from a contemporary (Python 3.5) point of view with
`natives coroutines`, `async`, `await`.
# Coroutine
A [coroutine](https://en.wikipedia.org/wiki/Coroutine) is a function
whose execution can be suspended.
Coroutines are a great tool to avoid the callback hell, still offering
a non-blocking way to express ourselves.
When a function has to wait for something, suspending it instead
of blocking allows to do something else. In other words, it permits
concurrency (without involving threads or other processes).
Without coroutines there are two solutions: Block or use callbacks.
A typical callback example in a imaginary language may look like:
function pong_handler(client)
{
client.on('data', function (data)
{
client.on('data_written', function ()
{
client.close()
});
client.write(data)
client.flush()
});
}
With coroutines, it would look like:
async function pong_handler()
{
client.write(await client.read())
await client.flush()
client.close()
}
## Coroutines in Python
To be exhaustive, there are two kinds of coroutines in Python:
- generator-based coroutines
- native coroutines
Generator-based coroutines, are the old-style ones, but you may
enconter some of them, they are written using the `@types.coroutine`
(or `@asyncio.coroutine`) decorator:
:::python
@types.coroutine
def get_then_print(url):
...
Native coroutines, the ones you should remember, are defined using the
`async` keyword:
:::python
async def get_then_print(url):
...
A `coroutine function`, when called returns a `coroutine object`:
:::pycon
>>> async def tum():
... print("tum")
...
>>> tum()
<coroutine object tum at 0x7fa294538468>
From this `coroutine object` the `coroutine function` can be manipulated:
:::pycon
>>> async def tum():
... print("tum")
...
>>> a_coroutine_object = tum()
>>> a_coroutine_object.send(None)
tum
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
As you can see, calling `tun()` did not execute the `print("tum")`,
but calling `.send(None)` on the coroutine object did (see PEP 342). Also, `send`
told us the coroutine is complete by [throwing a
`StopIteration`](https://www.python.org/dev/peps/pep-0342/).
## Coroutine invocation
As a coroutine just return a coroutine object when called, at some
point, some code have to call `send(None)` on it. But that's typically
the role of a "main loop". The main loop will also watch for
`StopIteration`.
## Getting result from a coroutine call
A coroutine give its returned value in a `StopIteration` exception,
meaning "Job is done, stop trying to un-suspend me", but typically a
scheduler / event loop will do that for you and provide the result
by one way or another.
There's also a keyword dedicated to pull a value from a coroutine:
`await`. Given an `awaitable`, `await` tries to get a value from it,
and if the `awaitable` suspends, `await` also suspends the current
coroutine, and so on, up to the `.send()` caller.
As long as no coroutine suspends themselves, there is no need call
`send(None)` a second time to restore the coroutines. So, there's no
need for a loop or a scheduler, simply calling `send(None)` once is
enough:
:::python
async def two():
return 2
async def four():
return await two() + await two()
async def eight():
return await four() + await four()
coro = eight()
coro.send(None)
Which gets `StopIteration: 8`, in a completly synchronous way, despite
the vocabulary used.
## Suspending a coroutine
This is just not possible for a coroutine to suspend itself. But the
whole chain of `await` can be suspended using a `yield` in a
"future-like object".
A `future-like` object is an object with an `__await__` method, which
can `yield` a value which will traverse the whole `await` chain up to
the caller of `.send(None)`, at this point, an event loop, a
scheduler, a tranpoline, whatever it's called need to understand why
the `future-like object` did it by reading the value received from the
`send(None)` call, act accordingly, like restoring another coroutine,
wait for an event on the network, whatever.
# Awaitables
[awaitable](https://www.python.org/dev/peps/pep-0492/#await-expression)
are objects that can be `awaited` with an `await` keyword.
Basically a `coroutine` or an object with an `__await__` method
returning an iterator are both awaitables.
# Managing coroutines
The Python library provide `asyncio` which contains an event loop
which take the scheduler role between coroutines, allowing them to
suspend, trying to understand why, and restoring them at the right
time.
## Coroutine manager, step 1
The first step for a coroutine manager is probably to call
`send(None)`, which is enough to do some work:
:::python
async def two():
return 2
async def four():
return await two() + await two()
async def eight():
return await four() + await four()
def coro_manager(coro):
try:
coro.send(None)
except StopIteration as stop:
return stop.value
print(coro_manager(eight()))
# prints 8
But this `coro_manager` will just don't terminate the work if a
suspension occurs:
:::python
class Awaitable:
def __await__(self):
yield
async def wont_terminate_here():
await Awaitable()
print("Terminated")
return 42
print(coro_manager(wont_terminate_here()))
# prints 'None', but no "Terminated" to be seen
So a `coro_manager` should probably restore a suspended coroutine, by
calling back `send(None)`, like this `frenetic_coro_manager`:
:::python
def frenetic_coro_manager(coro):
try:
while True:
coro.send(None)
except StopIteration as stop:
return stop.value
We're now getting "Terminated" followed by "42".
But here, this `frenetic_coro_manager` can only execute a single coroutine,
blindly restoring it if it suspends. Here is an implementation working
with a list of coroutines instead of a single one, restoring a random
one:
:::python
def frenetic_coro_manager(*coros):
coros = list(coros)
while coros:
coro = random.choice(coros)
try:
coro.send(None)
except StopIteration as stop:
coros.remove(coro)
Still print "Terminated" followed by "42", but can work with multiple
coroutines, like:
async def tum():
while True:
await Awaitable()
print("Tum")
async def pak():
while True:
await Awaitable()
print("Pak")
frenetic_coro_manager(tum(), pak())
This time, this frenetically print "Tum"s and "Pak"s. But the
interesting point is that both coroutines co-existed, by co-executing
their `while True`.
Here, `await Awaitable()` is only usefull to give the hand back to the `frenetic_coro_manager`, but it awaits nothing, which is semantically… empty.
But it's possible for the `yield` of the `awaitable object` to yield
something usefull to the coroutine manager, like an ETA before calling
`send` again:
:::python
class Sleep:
def __init__(self, seconds):
self.sleep_until = datetime.now() + timedelta(seconds=seconds)
def __await__(self):
yield self.sleep_until
async def tum():
while True:
await Sleep(1)
print("Tum")
async def pak():
while True:
await Sleep(2)
print("Pak")
def measured_coro_manager(*coros):
coros = [(datetime.now(), coro) for coro in coros]
heapq.heapify(coros)
while coros:
exec_at, coro = heapq.heappop(coros)
if exec_at > datetime.now():
sleep((exec_at - datetime.now()).total_seconds())
try:
exec_at = coro.send(None)
except StopIteration as stop:
pass
else:
heapq.heappush(coros, (exec_at, coro))
print(measured_coro_manager(tum(), pak()))
Gives:
Tum
Pak
Tum
Tum
Pak
Tum
Tum
Pak
In this implementation, the "main loop" `measured_coro_manager` waited
gracefully as specified by `Sleep` before calling `send(None)`.
From this point we can go further by playing with the [dining philosophers problem](https://en.wikipedia.org/wiki/Dining_philosophers_problem):
:::python
import heapq
from time import sleep
from datetime import datetime, timedelta
class Sleep:
"""Basic Sleep implementation, tightly paired with `measured_coro_manager`
which expect each `yield` to give an absolute datetime of revival.
"""
def __init__(self, seconds):
self.sleep_until = datetime.now() + timedelta(seconds=seconds)
def __await__(self):
yield self.sleep_until
class ForkTaking:
"""Asynchronous context manager describing the action of taking a
fork. The fork may be a context manager by itself, but it's nice
to get verbose logging by knowing which is taking what.
"""
def __init__(self, fork, philosopher):
self.philosopher = philosopher
self.fork = fork
async def __aenter__(self):
self.philosopher.speak("Need fork {}".format(self.fork.number))
while self.fork.held_by:
# self.philosopher.speak("Need fork {}".format(self.fork.number))
await Sleep(.1)
self.fork.held_by = self.philosopher
self.philosopher.speak("Took fork {}".format(self.fork.number))
return self
async def __aexit__(self, exc_type, exc, traceback):
self.philosopher.speak("Release fork {}".format(self.fork.number))
self.fork.held_by = None
class Fork:
"""Represent a fork a Philosopher can grab and release via
an asynchronous context manager.
"""
def __init__(self, number):
self.number = number
self.held_by = None
def take(self, philosopher):
return ForkTaking(self, philosopher)
def __str__(self):
return "<Fork {}>".format(self.number)
class Philosopher:
"""Having a reference to a left fork and a right fork so he can grab them,
the philosopher thinks and eat.
"""
def __init__(self, number, left_fork, right_fork):
self.number = number
print("Hello, I'm philosopher {} with forks {} and {}".format(
number, left_fork, right_fork))
self.left_fork = left_fork
self.right_fork = right_fork
self.hungry_since = None
def __str__(self):
return "<Philosopher {}>".format(self.number)
def speak(self, message):
print(' ' * self.number * 20, message)
async def behave(self):
"""Basic implementations of rules:
- think
- hungry, grab forks
- eat
in a loop.
"""
if self.left_fork.number > self.right_fork.number:
first_fork, second_fork = self.left_fork, self.right_fork
else:
first_fork, second_fork = self.right_fork, self.left_fork
while True:
self.speak("*thinking*")
await Sleep(.5)
self.hungry_since = datetime.now()
self.speak("I'm hungry!")
async with first_fork.take(self):
await Sleep(0)
async with second_fork.take(self):
self.hungry_since = None
self.speak("*eating*")
await Sleep(.5)
class Table():
"""The table is responsible of creating Forks and Philosophers,
and distributing forks to them, what a powerfull table.
"""
def __init__(self):
self.forks = [Fork(i) for i in range(5)]
self.philosophers = []
for i in range(5):
self.philosophers.append(Philosopher(
i,
self.forks[i - 1],
self.forks[i]))
async def check_life(philosophers):
while True:
await Sleep(1)
for philosopher in philosophers:
if philosopher.hungry_since is None:
continue
if philosopher.hungry_since < datetime.now() - timedelta(seconds=60):
print("Philosopher {} is dead".format(philosopher))
def measured_coro_manager(*coros):
coros = [(datetime.now(), i, coro) for i, coro in enumerate(coros)]
i = len(coros)
heapq.heapify(coros)
while coros:
exec_at, _, coro = heapq.heappop(coros)
now = datetime.now()
if exec_at > now:
sleep((exec_at - now).total_seconds())
try:
exec_at = coro.send(None)
except StopIteration as stop:
pass
else:
i += 1
heapq.heappush(coros, (exec_at, i, coro))
table = Table()
measured_coro_manager(check_life(table.philosophers),
*[philosopher.behave() for
philosopher in
table.philosophers])
And you can play with this a lot, typically in the `behave` method,
you can remove the fork order and always start by taking the left
fork, you can add or remove, increment or decrements `Sleep`s, and so
on...
# I/O
Now, the next step is almost obviously networking: Why not rewriting
`measured_coro_manager` so it can not only sleep by also use `select`,
`poll` to wait for network event, and combine this with asnchronous
`read` and `write` syscalls? That's exactly what's `asyncio` is, see
`selector_events.py` in the `asyncio` module. I hope now the
separation between `asyncio`, `async` and `await` is clearer.