Async-IO
Awaitables
There are three main types of awaitables:
Coroutine Objects, which are created by calling a Coroutine Function
asyncio Tasks, which are Future-like objects that wrap and schedule a coroutine
asyncio Futures
Coroutines
A Coroutine Function is declared with the async keyword, and accepts parameters just as a normal function does.
A Coroutine Object is created by invoking a Coroutine Function with specific arguments.
import asyncio

# Coroutine Function
async def ajob(seconds):
    # invoking and awaiting a Coroutine Function from the asyncio lib
    await asyncio.sleep(seconds)
    return "success"

# Calling a Coroutine Function returns a Coroutine Object, which is awaitable.
# This will print "success" after 2 seconds
# (top-level await works in a notebook or the asyncio REPL; in a script,
# put it inside an async function and run that with asyncio.run())
print(await ajob(2))

# Calling a Coroutine Function returns a Coroutine Object
# Creating a Coroutine object doesn't start or run it
job = ajob(1)

# A coroutine Object can be awaited
# This will print "success" after 1 second
print(await job)
Behavior and Usage
Creating a Coroutine Object (by invoking a Coroutine Function) doesn’t actually run it.
A coroutine object can be started in one of two ways:
By being awaited directly via the await keyword. When awaited, the caller pauses and yields control to the event loop until a result is returned or an exception is raised.
By being passed to create_task(), gather(), shield() or wait_for(), or by being passed to as_completed() and iterating over its result.
A coroutine object can be awaited (or executed) only once. A second await raises a RuntimeError.
If the coroutine raises an exception, it propagates to the caller.
Avoid passing coroutines directly to wait(): this was deprecated in Python 3.8 and is an error as of Python 3.11.
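The single-use rule can be checked with a short sketch. It is written as a standalone script (hence asyncio.run() rather than a top-level await), and the short sleep duration is an arbitrary choice for illustration.

```python
import asyncio

async def ajob(seconds):
    await asyncio.sleep(seconds)
    return "success"

async def main():
    job = ajob(0.01)       # creates the coroutine object; nothing runs yet
    first = await job      # runs the coroutine to completion
    try:
        await job          # a coroutine object cannot be awaited a second time
    except RuntimeError as exc:
        second = f"RuntimeError: {exc}"
    return first, second

first, second = asyncio.run(main())
print(first)
print(second)
```

If the same work has to be awaited from several places, wrapping the coroutine in a Task is the usual alternative, since a finished Task can be awaited repeatedly.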
asyncio Tasks
A task is mainly created by calling the asyncio.create_task(coro) function, passing a coroutine object as the argument.
When a task is created, the wrapped coroutine is scheduled to start as soon as possible, whether or not the task is ever awaited.
A task can (optionally) be awaited to wait for and retrieve the result (or the raised exception) of the wrapped coroutine object.
import asyncio

async def ajob(seconds):
    await asyncio.sleep(seconds)
    return 2 * seconds

# create a task, wrapping and starting the coroutine
t1 = asyncio.create_task(ajob(1))
# create another task, wrapping and starting another coroutine
t2 = asyncio.create_task(ajob(2))

# awaiting the tasks and returning their results
r1 = await t1  # r1 = 2
r2 = await t2  # r2 = 4
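To see that a task runs even when it is never awaited, here is a minimal sketch written as a standalone script. The log list and the job name are illustrative only; main() merely sleeps long enough for the task to finish.

```python
import asyncio

log = []

async def ajob(name):
    log.append(f"{name} started")
    await asyncio.sleep(0.01)
    log.append(f"{name} finished")

async def main():
    task = asyncio.create_task(ajob("t1"))  # scheduled, but not running yet
    log.append("main yields")
    # main() never awaits the task; it only yields to the event loop,
    # which is enough for the task to start and run to completion
    await asyncio.sleep(0.05)
    return task.done()

done = asyncio.run(main())
print(log, done)
```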
Behavior and Usage
A Task can be created directly by the create_task(coro) function, passing a coroutine object.
A Task can be created indirectly by passing a coroutine to gather(), shield() or as_completed().
A task is automatically scheduled to start as soon as possible once created.
When the task is awaited:
If the task hasn’t completed yet, the caller will wait (be suspended) until the execution completes or an exception is raised.
If it has already completed, the await returns the result or propagates the exception.
If the task hasn’t completed yet, it can be cancelled via asyncio.Task.cancel(), which requests cancellation and may be able to cancel the task before completion. If it does, CancelledError is raised within the coroutine, and in the caller if the cancelled task is awaited.
Passing a coroutine directly to wait() is deprecated and should be avoided.
Getting the Task result via Task.result() directly should be avoided, as it raises InvalidStateError if the task has not finished yet. A Task should be awaited instead.
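Cancellation can be sketched as follows, as a standalone script. The slow_job coroutine and its durations are made up for the example; the except block shows where cleanup could go before the error is re-raised.

```python
import asyncio

async def slow_job():
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        # cleanup could go here; re-raise so the task is marked as cancelled
        raise

async def main():
    task = asyncio.create_task(slow_job())
    await asyncio.sleep(0.01)   # give the task a chance to start
    task.cancel()               # request cancellation
    try:
        await task              # awaiting a cancelled task raises CancelledError
    except asyncio.CancelledError:
        return task.cancelled()

was_cancelled = asyncio.run(main())
print(was_cancelled)
```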
asyncio Future
A Future is a special low-level awaitable object that represents an eventual result of an asynchronous operation.
A Future object is mainly created and returned by:
The asyncio.gather(*awaitables) function, called with a sequence of awaitable objects (coroutines, tasks and futures).
The shield(awaitable) function, which wraps an awaitable in another awaitable that, if cancelled, shields the wrapped awaitable from cancellation.
If one or more of the awaitables passed to gather(*aws) or shield(awaitable) are coroutines, they are automatically scheduled as tasks.
A Gather-Future can (optionally) be awaited to wait for all the provided awaitables and retrieve their results
import asyncio

async def square(num):
    await asyncio.sleep(0.1)
    return num ** 2

some_awaitables = [
    square(2),                             # a coroutine
    square(3),                             # a coroutine
    asyncio.create_task(square(4)),        # a Task
    asyncio.gather(square(5), square(6)),  # a Future
]

# Returns a Future, which is awaitable
gather_future = asyncio.gather(*some_awaitables)

# awaiting the Future will wait for all awaitables to complete, and print
# their return values
print(await gather_future)  # prints [4, 9, 16, [25, 36]]
High Level asyncio functions
create_task(coroutine)
Returns a Task object.
Takes a coroutine, schedules it to start automatically, and returns a Task that can be awaited or cancelled.
Only accepts coroutine objects, not other awaitables such as a Future or a Task.
gather(*awaitables, return_exceptions=False)
Returns a Future object.
Takes one or more awaitables and returns a Future object that can (optionally) be:
Awaited, which returns a list with the result of each awaitable, in the order the awaitables were passed, or
Cancelled, which attempts to cancel all the provided awaitables that have not completed yet.
If return_exceptions=False, the first exception raised in any of the awaitables is propagated to the caller if and when gather() is awaited.
Otherwise, the raised exception object is simply returned as that awaitable’s result.
If a provided awaitable is a coroutine, it is automatically scheduled and run as a Task.
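The two return_exceptions modes can be compared in a short sketch, written as a standalone script. The ok() and fail() coroutines are hypothetical helpers for the example.

```python
import asyncio

async def ok(value):
    await asyncio.sleep(0.01)
    return value

async def fail():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def main():
    # return_exceptions=True: the exception object takes the place of a result
    collected = await asyncio.gather(ok(1), fail(), ok(3), return_exceptions=True)
    # default (False): the exception propagates to whoever awaits gather()
    try:
        await asyncio.gather(ok(1), fail(), ok(3))
        propagated = None
    except ValueError as exc:
        propagated = exc
    return collected, propagated

collected, propagated = asyncio.run(main())
print(collected)
print(repr(propagated))
```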
shield(awaitable)
Returns a Future object.
Wraps an awaitable in another awaitable that, if cancelled, shields the wrapped awaitable from cancellation.
If the provided awaitable is a coroutine, it is automatically scheduled and run as a Task.
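One common use of shield() is to apply a timeout without cancelling the underlying work. The sketch below is a standalone script; important_job and the durations are assumptions for illustration.

```python
import asyncio

async def important_job():
    await asyncio.sleep(0.05)
    return "saved"

async def main():
    inner = asyncio.create_task(important_job())
    try:
        # on timeout, wait_for() cancels what it waits on;
        # here that is only the shield, not the inner task
        await asyncio.wait_for(asyncio.shield(inner), timeout=0.01)
    except asyncio.TimeoutError:
        pass
    # the inner task survived and can still be awaited for its result
    return inner.cancelled(), await inner

result = asyncio.run(main())
print(result)
```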
wait_for(awaitable, timeout)
Returns a Coroutine
Waits for the awaitable to complete, with a timeout. If a timeout occurs, the awaitable is cancelled and an asyncio.TimeoutError is raised in the caller.
If the awaitable is a coroutine, it is scheduled and run as a Task.
When awaited, it returns the result of the wrapped awaitable.
timeout can be None (no timeout) or a number of seconds (int or float) until the timeout.
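A minimal sketch of the timeout behavior, as a standalone script. The slow_job coroutine and the durations are illustrative.

```python
import asyncio

async def slow_job():
    await asyncio.sleep(10)
    return "finished"

async def main():
    task = asyncio.create_task(slow_job())
    try:
        return await asyncio.wait_for(task, timeout=0.01), task.cancelled()
    except asyncio.TimeoutError:
        # by the time TimeoutError is raised, the task has been cancelled
        return "timed out", task.cancelled()

outcome = asyncio.run(main())
print(outcome)
```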
wait(awaitables, *, timeout=None, return_when=ALL_COMPLETED)
Returns a tuple (done, pending) of two sets of awaitables, which are instances of either Task or Future, depending on the provided awaitables.
Takes an iterable of one or more awaitables, runs and/or waits for them concurrently, and suspends the caller until the condition specified by return_when is met. return_when is one of:
ALL_COMPLETED
FIRST_COMPLETED
FIRST_EXCEPTION
Unlike wait_for(), wait() does not cancel the futures when a timeout occurs.
Coroutines should not be passed directly to wait().
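A short sketch of wait() with FIRST_COMPLETED, as a standalone script. The job() coroutine and its names are made up for the example, and the coroutines are wrapped in Tasks before being passed in.

```python
import asyncio

async def job(name, seconds):
    await asyncio.sleep(seconds)
    return name

async def main():
    tasks = {
        asyncio.create_task(job("fast", 0.01)),
        asyncio.create_task(job("slow", 0.2)),
    }
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finished = [t.result() for t in done]   # safe: tasks in done have completed
    for t in pending:
        t.cancel()   # wait() leaves pending tasks running; cancel them explicitly
    return finished, len(pending)

finished, pending_count = asyncio.run(main())
print(finished, pending_count)
```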
as_completed(awaitables, *, timeout=None)
Returns an iterator (a normal generator) of coroutines.
Runs the awaitable objects in the given iterable concurrently.
Each coroutine returned can be awaited to get the earliest next result from the remaining awaitables.
Raises asyncio.TimeoutError if the timeout occurs before all Futures are done.
If an awaitable is a coroutine, it is scheduled and run as a Task when iteration of the returned iterator begins.
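The completion-order behavior can be sketched like this, as a standalone script. The job() coroutine and the delays are illustrative.

```python
import asyncio

async def job(name, seconds):
    await asyncio.sleep(seconds)
    return name

async def main():
    aws = [job("slow", 0.1), job("fast", 0.01), job("medium", 0.05)]
    order = []
    # results arrive in completion order, not in the order they were passed
    for next_result in asyncio.as_completed(aws):
        order.append(await next_result)
    return order

order = asyncio.run(main())
print(order)
```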
Running Functions from One Context to Another
Sometimes we need to run blocking code, or CPU intensive code off the asyncio loop, initiated from within the asyncio loop. Other times we need to do the opposite: run code on the event loop, which is initiated from outside the event loop. Another way of phrasing it is the idea of running blocking code from within a non-blocking context, and vice-versa: run asyncio code from a blocking context.
Run Blocking code from an asyncio context
This is typically needed when we need to use legacy code or blocking libraries to read / write to and from disk or the network, where an asyncio implementation isn’t available.
One approach is to use the loop.run_in_executor() function, which takes a concurrent.futures.Executor instance that pools a number of threads or processes as needed, or None to use the default executor of the current asyncio loop.
The following helper function can be used as is to accomplish this.
import asyncio
from typing import Any, Callable, Optional
from functools import partial
from concurrent.futures import Executor

def run_in_executor(executor: Optional[Executor],
                    func: Callable, *args, **kwargs) -> asyncio.Future:
    """
    This function must only be called from an async context

    Run the given callable, with the given *args and **kwargs, which may be a blocking
    function, or a CPU intensive function with the given executor.

    If callable is IO blocking, executor is recommended to be a ThreadPoolExecutor.
    If callable is a CPU intensive function, executor should be a ProcessPoolExecutor.

    Returns an Asyncio Future, which is an awaitable
    """
    # run_in_executor() only forwards positional arguments, so bind kwargs with partial
    ft: Any = asyncio.get_running_loop().run_in_executor(
        executor, partial(func, *args, **kwargs))
    return ft
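As a usage sketch, the standalone script below calls loop.run_in_executor() directly with the default executor. blocking_io and the file name are hypothetical stand-ins for a blocking library call; the polling loop is only there to show that the event loop keeps running while the thread is busy.

```python
import asyncio
import time
from functools import partial

def blocking_io(path, delay=0.05):
    # stand-in for a blocking library call (hypothetical)
    time.sleep(delay)
    return f"read {path}"

async def main():
    loop = asyncio.get_running_loop()
    # None selects the loop's default executor (a thread pool)
    future = loop.run_in_executor(None, partial(blocking_io, "a.txt", delay=0.05))
    ticks = 0
    while not future.done():
        await asyncio.sleep(0.01)   # the loop stays responsive meanwhile
        ticks += 1
    return await future, ticks

value, ticks = asyncio.run(main())
print(value, ticks)
```

In real code the returned future would simply be awaited; the polling is not needed.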
Run async code from other non-asyncio threads
This is done with the asyncio.run_coroutine_threadsafe() function. The main challenge is that this function needs a reference to the running event loop. The following code is a well tested class that wraps a loop, starts it in a separate thread when needed, and provides the necessary API to run coroutines on the loop and stop the loop when needed.
import asyncio
import threading
import sys
from typing import Optional, Coroutine
from concurrent.futures import Future


class LoopWorker:
    def __init__(self):
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._lock = threading.Lock()

    def _loop_thread_target(self):
        # runs in the worker thread until stop_loop() is called
        try:
            self._loop.run_forever()
        finally:
            self._loop.close()
            self._loop = None

    def _init_loop(self):
        if self._loop:
            return  # do nothing if loop has already been started
        self._loop = asyncio.new_event_loop()
        threading.Thread(target=self._loop_thread_target).start()

    @staticmethod
    async def _coro_wrapper(coro):
        # log any exception to stderr, then re-raise so it reaches the returned Future
        try:
            return await coro
        except Exception as ex:
            sys.stderr.write(f"Exception: {ex}\n")
            raise

    def run_in_loop(self, coro: Coroutine) -> Future:
        # returns a concurrent.futures.Future; call .result() to block for the result
        with self._lock:
            if not self._loop:
                self._init_loop()
            return asyncio.run_coroutine_threadsafe(LoopWorker._coro_wrapper(coro), self._loop)

    def stop_loop(self):
        if self._loop:
            async def closer():
                self._loop.stop()
            self.run_in_loop(closer())

    def __del__(self):
        self.stop_loop()
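Stripped of the wrapper class, the underlying mechanism can be sketched in a few lines. This is a minimal standalone illustration rather than a replacement for the class above: the add() coroutine is hypothetical, and the loop is stopped with loop.call_soon_threadsafe(), which is a different choice from the closer() coroutine used in the class.

```python
import asyncio
import threading

async def add(a, b):
    await asyncio.sleep(0.01)
    return a + b

# an event loop running in a background thread
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# from the (blocking) main thread: submit a coroutine and wait for its result
future = asyncio.run_coroutine_threadsafe(add(2, 3), loop)
result = future.result(timeout=5)   # concurrent.futures.Future: blocks this thread

# stop the loop from outside its own thread, then clean up
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
print(result)
```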
Async-IO Constructs and Patterns
Running a Number of Tasks Concurrently
In general, if you have a few dozen to a few hundred I/O tasks that need to be executed without returning any values, creating Tasks via the asyncio.create_task(coro) function is sufficient.
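import asyncio

# a database as a python dict
database = {}
# keep a reference to each running task so it is not garbage collected mid-execution
background_tasks = set()

# this Task takes a long time (1 second) to execute, and store the result in a database
async def square(num):
    await asyncio.sleep(1)
    x = num ** 2
    database[num] = x

async def main():
    # Create and start 100 concurrent tasks; main() returns without waiting for them
    for x in range(100):
        task = asyncio.create_task(square(x))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

await main()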
If the Coroutine already takes care of running the job and processing the result, there is generally no need to do a gather(), unless you need to ensure that all tasks have completed, successfully and without exceptions, before proceeding.
import asyncio

# a database as a python dict
database = {}

# this Task takes a long time (1 second) to execute, and store the result in a database
async def square(num):
    await asyncio.sleep(1)
    x = num ** 2
    database[num] = x

async def main():
    # Create and run 100 concurrent tasks simultaneously
    tasks = [asyncio.create_task(square(x)) for x in range(100)]
    await asyncio.gather(*tasks)
    # Here we are certain that all tasks have been completed successfully without exceptions

await main()
Running an Infinite number of tasks with limited Concurrency
Say you have hundreds of millions of tasks to run (such as HTTP requests, database queries or other network or disk I/O operations). The ideal solution would be as follows:
Generating Coroutine Objects as needed, each representing one task (via a generator / iterator)
Setting a maximum number of concurrent operations running at any given time
Executing each coroutine, and returning the results as they come
The example below is a function that takes a generator of coroutines and a max_concurrency parameter, and returns an async generator of results that can be iterated over as tasks complete.
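from typing import Any, AsyncIterator
import itertools
import asyncio

async def run_limited_concurrency(coro_gen, max_concurrency=5) -> AsyncIterator[Any]:
    # coro_gen must be an iterator (e.g. a generator), so islice() resumes where it left off
    pending = set()
    while True:
        # top up to max_concurrency, wrapping each coroutine in a Task
        # (asyncio.wait() should not be given bare coroutines)
        for coro in itertools.islice(coro_gen, max_concurrency - len(pending)):
            pending.add(asyncio.create_task(coro))
        if len(pending) == 0:
            break
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for item in done:
            yield await item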
An async generator can be used in an async for loop to receive results as they come.
Here is an example of how this function may be used:
import asyncio

# An example async job that takes some time to figure out and return the square of a number
async def example_squaring_job(num):
    await asyncio.sleep(1)
    return num ** 2

# Generator of Coroutines
def coro_generator(total_jobs):
    for x in range(total_jobs):
        yield example_squaring_job(x)

async def main():
    # a generator of a billion coroutines
    job_generator = coro_generator(1000_000_000)
    # maximum concurrency is 10 tasks at a time
    agen = run_limited_concurrency(job_generator, 10)
    # async for each result as it comes
    async for res in agen:
        print(res)

await main()