Async-IO

Awaitables

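There are three main types of awaitable objects:

  • Coroutines: strictly, the coroutine object returned by calling a coroutine function (the coroutine function itself is not awaitable)

  • asyncio Tasks, which are Future-like objects (Task is a subclass of Future)

  • asyncio Futures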
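As a quick check of these categories, the sketch below asks inspect.isawaitable() about each kind of object. The coroutine function name ajob is an illustrative placeholder. The sketch also shows that a coroutine function has to be called before there is anything to await.

```python
import asyncio
import inspect

async def ajob():
    return 1

async def main():
    coro = ajob()                                     # coroutine object
    task = asyncio.create_task(ajob())                # Task
    fut = asyncio.get_running_loop().create_future()  # bare Future
    fut.set_result(None)

    checks = {
        "coroutine function": inspect.isawaitable(ajob),       # not awaitable itself
        "coroutine object": inspect.isawaitable(coro),
        "Task": inspect.isawaitable(task),
        "Future": inspect.isawaitable(fut),
        "Task is a Future": isinstance(task, asyncio.Future),  # Task subclasses Future
    }
    # await everything that was created so nothing is left pending
    await coro
    await task
    await fut
    return checks

checks = asyncio.run(main())
print(checks)
```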

Coroutines

A coroutine function is declared with async def, and accepts the same kinds of parameters as a normal function.

A coroutine object is created by calling a coroutine function with specific arguments.

import asyncio

# Coroutine function
async def ajob(seconds):
    # calling the asyncio.sleep() coroutine function and awaiting the coroutine object
    await asyncio.sleep(seconds)
    return "success"

async def main():
    # Calling the coroutine function returns a coroutine object,
    # which is awaited right away.
    # This will print "success" after 2 seconds
    print(await ajob(2))

    # Creating a coroutine object doesn't start or run it
    job = ajob(1)

    # A coroutine object can be awaited
    # This will print "success" after 1 second
    print(await job)

# await is only valid inside a coroutine function (or in an async-aware REPL
# such as Jupyter), so the event loop is started with asyncio.run()
asyncio.run(main())

Behavior and Usage

  • Creating a coroutine object (by calling a coroutine function) doesn’t actually run it.

  • A coroutine object can be started in one of two ways:

    • By awaiting it directly with the await keyword. The caller is suspended until the coroutine returns a result or raises an exception; whenever the coroutine waits on something (such as I/O or a sleep), control goes back to the event loop.

    • By passing it to create_task(), gather(), shield() or wait_for(), which wrap it in a Task, or by passing it to as_completed() and iterating over the result.

  • A coroutine object can be awaited (or executed) only once. Awaiting it a second time raises RuntimeError.

  • If the coroutine raises an exception, the exception propagates to the caller that awaits it.

  • Avoid passing coroutines directly to wait(): this was deprecated in Python 3.8, and since Python 3.11 wait() raises TypeError for coroutines.
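The "await only once" and "exceptions propagate" rules can be checked with a small sketch like the following; ajob and failing_job are placeholder coroutines, and the results are collected in a list instead of printed one by one.

```python
import asyncio

async def ajob():
    await asyncio.sleep(0.01)
    return "success"

async def failing_job():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def main():
    log = []
    job = ajob()           # creating the coroutine object runs nothing yet
    log.append(await job)  # the first await runs it and returns "success"

    # a coroutine object can only be awaited once
    try:
        await job
    except RuntimeError as ex:
        log.append(f"RuntimeError: {ex}")

    # an exception raised inside the coroutine propagates to the awaiting caller
    try:
        await failing_job()
    except ValueError as ex:
        log.append(f"ValueError: {ex}")
    return log

log = asyncio.run(main())
print(log)
```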

asyncio Tasks

A Task is usually created by calling asyncio.create_task(coro), passing a coroutine object.

When a Task is created, it is scheduled to run the wrapped coroutine on the event loop as soon as possible, whether or not the Task is ever awaited. (The coroutine actually starts the next time the code that created the Task yields control to the event loop, for example at its next await.)

A Task can (optionally) be awaited to wait for and retrieve the result (or the raised exception) of the wrapped coroutine.

import asyncio

async def ajob(seconds):
    await asyncio.sleep(seconds)
    return 2 * seconds

async def main():
    # create a task, wrapping and scheduling the coroutine
    t1 = asyncio.create_task(ajob(1))

    # create another task, wrapping and scheduling another coroutine
    t2 = asyncio.create_task(ajob(2))

    # awaiting the tasks and retrieving their results; both tasks run
    # concurrently, so this takes about 2 seconds in total, not 3
    r1 = await t1  # r1 = 2
    r2 = await t2  # r2 = 4

asyncio.run(main())

Behavior and Usage

  • A Task can be created directly by the create_task(coro) function, passing a coroutine object

  • A Task can be created indirectly by passing a Coroutine to:

    • gather()

    • shield()

    • as_completed()

  • A Task is automatically scheduled to start as soon as possible once created.

  • When the Task is awaited:

    • If it hasn’t completed yet, the caller is suspended until the Task completes or raises an exception.

    • If it has already completed, awaiting it immediately returns the result, or re-raises the exception.

  • A Task that hasn’t completed yet can be cancelled via Task.cancel(). This only requests cancellation: the Task may still finish first, or the coroutine may catch the exception and suppress it.

  • If cancellation succeeds, asyncio.CancelledError is raised inside the wrapped coroutine (at the await where it is suspended), and in any caller that awaits the cancelled Task.

  • Wrap coroutines in Tasks before passing them to wait(); passing coroutines directly was deprecated in Python 3.8 and is rejected since Python 3.11.

  • Avoid reading the Task result via Task.result() directly, as it raises asyncio.InvalidStateError if the Task is not done yet. Await the Task instead.
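The cancellation and result() rules above can be sketched as follows. slow_job is a placeholder coroutine, and the short sleep in main() is an assumption that merely gives the Task time to reach its await before it is cancelled.

```python
import asyncio

async def slow_job():
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        # raised inside the coroutine, at the await where it is suspended
        print("slow_job: cancellation received")
        raise  # re-raise so the Task is marked as cancelled

async def main():
    task = asyncio.create_task(slow_job())
    await asyncio.sleep(0.05)  # let the task start and reach its await

    # result() on a Task that is not done yet raises InvalidStateError
    try:
        task.result()
        early_result_error = False
    except asyncio.InvalidStateError:
        early_result_error = True

    task.cancel()  # request cancellation
    try:
        await task
        awaited_raised = False
    except asyncio.CancelledError:
        # the caller awaiting a cancelled Task receives CancelledError too
        awaited_raised = True
    return early_result_error, awaited_raised, task.cancelled()

outcome = asyncio.run(main())
print(outcome)  # prints (True, True, True)
```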

asyncio Future

A Future is a special low-level awaitable object that represents an eventual result of an asynchronous operation.

A Future object is mainly created and returned by:

  • the asyncio.gather(*awaitables) function, given a sequence of awaitable objects (coroutines, Tasks and Futures)

  • the asyncio.shield(awaitable) function, which wraps an awaitable in another awaitable; if the outer one is cancelled, the wrapped awaitable is shielded from the cancellation

If any of the awaitables passed to gather(*aws) or shield(awaitable) are coroutines, they are automatically scheduled as Tasks. The Future returned by gather() can (optionally) be awaited to wait for all the provided awaitables and retrieve their results, in the order the awaitables were passed.

import asyncio

async def square(num):
    await asyncio.sleep(0.1)
    return num ** 2

async def main():
    # create_task() and gather() need a running event loop, hence main()
    some_awaitables = [
        square(2),  # a coroutine
        square(3),  # a coroutine
        asyncio.create_task(square(4)),  # a Task
        asyncio.gather(square(5), square(6)),  # a Future
    ]

    # Returns a Future, which is awaitable
    gather_future = asyncio.gather(*some_awaitables)

    # awaiting the Future waits for all awaitables to complete, and returns
    # their results in the order the awaitables were passed
    print(await gather_future)  # prints [4, 9, 16, [25, 36]]

asyncio.run(main())
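A Future can also be created directly with loop.create_future(). This low-level sketch shows what "an eventual result" means: awaiting the Future simply waits until something sets its result. Here a timer callback scheduled with loop.call_later() stands in for whatever would normally complete it; the 0.05 s delay is arbitrary.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()  # a pending Future with no result yet

    # something else sets the result later; a timer callback stands in for it
    loop.call_later(0.05, fut.set_result, "done")

    done_before = fut.done()  # False: nothing has set the result yet
    result = await fut        # suspends until set_result() is called
    return done_before, fut.done(), result

outcome = asyncio.run(main())
print(outcome)  # prints (False, True, 'done')
```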

High-level asyncio functions

create_task(coroutine)

Returns a Task Object

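Takes a coroutine object, schedules it to run on the running event loop, and returns a Task that can be awaited or cancelled. It must be called while an event loop is running (for example, from inside a coroutine); otherwise it raises RuntimeError. It accepts only coroutine objects, not other awaitables such as Futures or Tasks.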
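Both restrictions can be observed in a short sketch; ajob is a placeholder coroutine. Outside a loop the call fails with RuntimeError, and inside a loop passing an existing Task fails with TypeError.

```python
import asyncio

async def ajob():
    await asyncio.sleep(0.01)
    return "ok"

# Outside a running event loop, create_task() raises RuntimeError
coro = ajob()
try:
    asyncio.create_task(coro)
    outside_error = None
except RuntimeError as ex:
    outside_error = ex
finally:
    coro.close()  # the coroutine never ran; close it to avoid a "never awaited" warning

async def main():
    task = asyncio.create_task(ajob())
    # a Task is not a coroutine object, so passing it to create_task() raises TypeError
    try:
        asyncio.create_task(task)
        rejected = False
    except TypeError:
        rejected = True
    return rejected, await task

outcome = asyncio.run(main())
print(outside_error, outcome)
```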

gather(*awaitables, return_exceptions=False)

Returns a Future Object

Takes one or more awaitables and returns a Future object that can (optionally) be:

  • awaited to get the results of all the awaitables, as a list in the order they were passed, or

  • cancelled, which cancels all the provided awaitables that have not completed yet

If return_exceptions is False (the default), the first exception raised by any of the awaitables is propagated to the caller that awaits gather(); the other awaitables are not cancelled and keep running. If return_exceptions is True, a raised exception object is simply returned in the result list in place of that awaitable's result.

If a provided awaitable is a coroutine, it is automatically scheduled and run as a Task.
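The two return_exceptions modes can be compared with a small sketch; divide is a placeholder coroutine that fails on a zero divisor.

```python
import asyncio

async def divide(a, b):
    await asyncio.sleep(0.01)
    return a / b

async def main():
    # return_exceptions=True: the exception object takes the place of the result
    results = await asyncio.gather(divide(6, 3), divide(1, 0), return_exceptions=True)

    # return_exceptions=False (default): the first exception propagates to the caller
    try:
        await asyncio.gather(divide(6, 3), divide(1, 0))
        propagated = False
    except ZeroDivisionError:
        propagated = True
    return results, propagated

results, propagated = asyncio.run(main())
print(results)     # prints [2.0, ZeroDivisionError('division by zero')]
print(propagated)  # prints True
```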

shield(awaitable)

Returns a Future Object

Wraps an awaitable in another awaitable (a Future); if that outer Future is cancelled, the wrapped awaitable is shielded from the cancellation and keeps running.

If the provided awaitable is a coroutine, it is automatically scheduled and run as a Task.
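A minimal sketch of that behavior follows; critical_save is a placeholder for work that must not be interrupted, and the delays are arbitrary. Cancelling the outer Future does not stop the inner Task.

```python
import asyncio

async def critical_save():
    await asyncio.sleep(0.2)
    return "saved"

async def main():
    inner = asyncio.create_task(critical_save())
    outer = asyncio.shield(inner)

    await asyncio.sleep(0.05)
    outer.cancel()  # cancels only the outer Future returned by shield()

    try:
        await outer
    except asyncio.CancelledError:
        pass  # awaiting the cancelled outer Future raises CancelledError

    # the inner Task was not cancelled and still runs to completion
    return outer.cancelled(), await inner

outcome = asyncio.run(main())
print(outcome)  # prints (True, 'saved')
```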

wait_for(awaitable, timeout)

Returns a Coroutine

Waits for the awaitable to complete, with a timeout. If the timeout expires, the awaitable is cancelled and asyncio.TimeoutError is raised in the caller (since Python 3.11, asyncio.TimeoutError is an alias of the built-in TimeoutError).

If the awaitable is a coroutine, it is scheduled and run as a Task.

When awaited, it returns the result of the wrapped awaitable.

timeout can be None (wait indefinitely) or a number of seconds (an int or a float).
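Both outcomes of wait_for() in one sketch; slow is a placeholder coroutine and the timeouts are arbitrary.

```python
import asyncio

async def slow(seconds):
    await asyncio.sleep(seconds)
    return seconds

async def main():
    # finishes within the timeout: wait_for() returns the result
    fast_result = await asyncio.wait_for(slow(0.01), timeout=1)

    # exceeds the timeout: slow(10) is cancelled and TimeoutError is raised
    try:
        await asyncio.wait_for(slow(10), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return fast_result, timed_out

outcome = asyncio.run(main())
print(outcome)  # prints (0.01, True)
```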

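wait(awaitables, *, timeout=None, return_when=ALL_COMPLETED)

Returns a Coroutine; awaiting it returns a tuple (done, pending) of two sets containing the Tasks and Futures that were passed in.

Takes an iterable of awaitables, waits for them concurrently, and returns when the condition specified by return_when is met, or when the timeout expires:

  • ALL_COMPLETED (the default): when all awaitables have finished or been cancelled

  • FIRST_COMPLETED: when any awaitable finishes or is cancelled

  • FIRST_EXCEPTION: when any awaitable finishes by raising an exception; if none raises, this is equivalent to ALL_COMPLETED

Unlike wait_for(), wait() does not cancel the awaitables when a timeout occurs and does not raise TimeoutError; unfinished ones are simply returned in the pending set.

Coroutines should not be passed directly to wait(); wrap them with create_task() first.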
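A sketch of wait() with FIRST_COMPLETED; job is a placeholder coroutine, and the three delays are chosen far enough apart that only the fastest one is done when wait() returns.

```python
import asyncio

async def job(seconds):
    await asyncio.sleep(seconds)
    return seconds

async def main():
    # wait() expects Tasks/Futures, so wrap the coroutines with create_task() first
    tasks = {asyncio.create_task(job(s)) for s in (0.01, 0.5, 1.0)}

    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    # wait() never cancels anything itself; cancel the leftovers explicitly
    for task in pending:
        task.cancel()
    return sorted(t.result() for t in done), len(pending)

outcome = asyncio.run(main())
print(outcome)  # prints ([0.01], 2)
```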

as_completed(awaitables, *, timeout=None)

Returns an iterator (a normal, non-async generator) of coroutines.

Runs the awaitable objects in the awaitables iterable concurrently.

Each coroutine returned can be awaited to get the next earliest result from the remaining awaitables.

Raises asyncio.TimeoutError if the timeout occurs before all awaitables are done.

If an awaitable is a coroutine, it is scheduled and run as a Task when iteration over the returned iterator begins.
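A sketch showing that results arrive in completion order rather than submission order; job is a placeholder coroutine with deliberately spaced delays.

```python
import asyncio

async def job(seconds):
    await asyncio.sleep(seconds)
    return seconds

async def main():
    coros = [job(0.3), job(0.1), job(0.2)]
    results = []
    for next_done in asyncio.as_completed(coros):
        # each awaited item yields the next result to finish
        results.append(await next_done)
    return results

results = asyncio.run(main())
print(results)  # prints [0.1, 0.2, 0.3]
```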

Running Functions from One Context in Another

Sometimes code running on the asyncio event loop needs to run blocking or CPU-intensive code off the loop, so that the loop is not stalled. Other times we need the opposite: code running outside the event loop (for example, in another thread) needs to run a coroutine on the loop. In other words: running blocking code from a non-blocking context, and vice versa, running asyncio code from a blocking context.

Run Blocking code from an asyncio context

This is typically needed when we use legacy code or blocking libraries to read from and write to disk or the network, where no asyncio implementation is available.

The first approach is the loop.run_in_executor(executor, func, *args) method. executor is a concurrent.futures.Executor instance that pools a number of threads or processes, or None to use the current event loop's default executor (a ThreadPoolExecutor). run_in_executor() passes only positional arguments to func, so keyword arguments must be bound with functools.partial().

The following helper function wraps it and can be used as is.

import asyncio
from typing import Callable, Optional
from functools import partial
from concurrent.futures import Executor

def run_in_executor(executor: Optional[Executor],
                    func: Callable, *args, **kwargs) -> asyncio.Future:
    """
    This function must only be called from an async context
    (i.e. while an event loop is running).

    Run the given callable, with the given *args and **kwargs, which may be a
    blocking function or a CPU-intensive function, in the given executor.
    If executor is None, the loop's default executor (a ThreadPoolExecutor) is used.

    If the callable is blocking I/O, executor is recommended to be a ThreadPoolExecutor.
    If the callable is CPU-intensive, executor should be a ProcessPoolExecutor
    (func and its arguments must then be picklable).

    Returns an asyncio Future, which is an awaitable
    """
    # run_in_executor() only forwards positional arguments,
    # so keyword arguments are bound with functools.partial
    return asyncio.get_running_loop().run_in_executor(
        executor, partial(func, *args, **kwargs))
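A usage sketch follows. The helper is repeated (without its docstring) so the snippet is self-contained, and blocking_read is a hypothetical stand-in for legacy blocking I/O. The four blocking calls run in a thread pool, so they overlap instead of stalling the event loop one after another.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def run_in_executor(executor, func, *args, **kwargs):
    return asyncio.get_running_loop().run_in_executor(
        executor, partial(func, *args, **kwargs))

# hypothetical blocking function standing in for legacy I/O code
def blocking_read(name, delay=0.1):
    time.sleep(delay)  # blocks its worker thread, not the event loop
    return f"data from {name}"

async def main():
    with ThreadPoolExecutor(max_workers=4) as pool:
        start = time.perf_counter()
        results = await asyncio.gather(
            *(run_in_executor(pool, blocking_read, f"file{i}", delay=0.2)
              for i in range(4)))
        elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)
# the four 0.2 s calls overlap, so this is roughly 0.2 s rather than 0.8 s
print(f"took {elapsed:.2f}s")
```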

Run async code from other non-asyncio threads

This is done with the asyncio.run_coroutine_threadsafe(coro, loop) function, which returns a concurrent.futures.Future that the calling thread can block on. The main challenge is that this function needs a reference to an event loop that is running in another thread. The following class wraps such a loop: it starts the loop in a background thread when first needed, and provides the necessary API to run coroutines on the loop and to stop the loop.

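import asyncio
import threading
import sys
from typing import Any, Coroutine, Optional
from concurrent.futures import Future

class LoopWorker:
    """Runs an asyncio event loop in a background thread, so that other
    (non-asyncio) threads can submit coroutines to it."""

    def __init__(self):
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()

    @staticmethod
    def _loop_thread_target(loop: asyncio.AbstractEventLoop):
        # takes the loop (not self), so the thread holds no reference to the
        # worker and __del__ can run once the worker is no longer used
        try:
            loop.run_forever()
        finally:
            loop.close()

    def _init_loop(self):
        # must be called with self._lock held
        if self._loop: return  # do nothing if the loop has already been started
        self._loop = asyncio.new_event_loop()
        # daemon=True: a forgotten stop_loop() won't keep the process alive at exit
        self._thread = threading.Thread(target=LoopWorker._loop_thread_target,
                                        args=(self._loop,), daemon=True)
        self._thread.start()

    @staticmethod
    async def _coro_wrapper(coro: Coroutine) -> Any:
        try:
            return await coro
        except Exception as ex:
            sys.stderr.write(f"Exception: {ex}\n")
            raise  # re-raise, keeping the original traceback

    def run_in_loop(self, coro: Coroutine) -> Future:
        """Schedule coro on the worker loop. Safe to call from any thread; call
        .result() on the returned concurrent.futures.Future to block for the result."""
        with self._lock:
            if not self._loop: self._init_loop()
            return asyncio.run_coroutine_threadsafe(LoopWorker._coro_wrapper(coro), self._loop)

    def stop_loop(self, wait: bool = True):
        """Stop the loop and, by default, wait for its thread to exit.
        Coroutines still pending at that point are abandoned."""
        with self._lock:
            loop, thread = self._loop, self._thread
            self._loop, self._thread = None, None
        if loop:
            loop.call_soon_threadsafe(loop.stop)
            # joining from the loop's own thread would deadlock
            if wait and thread is not threading.current_thread():
                thread.join()

    def __del__(self):
        self.stop_loop(wait=False)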
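For comparison, here is a minimal sketch of the bare mechanism that the class wraps, without the class: a loop running in a daemon thread, a coroutine submitted from the main thread with asyncio.run_coroutine_threadsafe(), and the loop stopped with call_soon_threadsafe(). The coroutine add() is a placeholder.

```python
import asyncio
import threading

async def add(a, b):
    await asyncio.sleep(0.01)
    return a + b

# start an event loop in a background thread
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# from this (non-asyncio) thread, submit a coroutine to the loop;
# run_coroutine_threadsafe() returns a concurrent.futures.Future
future = asyncio.run_coroutine_threadsafe(add(2, 3), loop)
print(future.result(timeout=5))  # blocks until done; prints 5

# stop the loop from outside its thread, wait for the thread, then close the loop
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```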

Async-IO Constructs and Patterns

Running a Number of Tasks Concurrently

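In general, if you have a few dozen to a few hundred I/O tasks that need to run without returning any values, creating Tasks with the asyncio.create_task(coro) function is sufficient. Keep a reference to each Task until it finishes: the event loop holds only weak references to Tasks, so a Task with no other references may be garbage collected before it completes.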

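import asyncio

# a database as a python dict
database = {}

# strong references to running tasks (the event loop only keeps weak ones)
background_tasks = set()

# this coroutine takes a long time (1 second) to execute, and stores the result in the database
async def square(num):
    await asyncio.sleep(1)
    x = num ** 2
    database[num] = x


async def main():
    # Create 100 tasks that run concurrently
    for x in range(100):
        task = asyncio.create_task(square(x))
        background_tasks.add(task)
        # drop the reference once the task is done
        task.add_done_callback(background_tasks.discard)

    # In a long-running application the event loop keeps running on its own.
    # In this standalone script, asyncio.run() cancels leftover tasks when
    # main() returns, so keep the loop alive long enough for them to finish.
    await asyncio.sleep(2)

asyncio.run(main())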

If the coroutine already takes care of doing the job and processing its result, there is generally no need to call gather(), unless you need to make sure that all tasks have completed, successfully and without exceptions, before proceeding.

import asyncio

# a database as a python dict
database = {}

# this coroutine takes a long time (1 second) to execute, and stores the result in the database
async def square(num):
    await asyncio.sleep(1)
    x = num ** 2
    database[num] = x


async def main():
    # Create 100 tasks that run concurrently (the list keeps references to them)
    tasks = [asyncio.create_task(square(x)) for x in range(100)]

    # wait for all of them; raises the first exception if any task failed
    await asyncio.gather(*tasks)

    # Here we are certain that all tasks have completed successfully without exceptions

asyncio.run(main())

Running an Infinite number of tasks with limited Concurrency

Say you have hundreds of millions of tasks to run (such as HTTP requests, database queries, or other network or disk I/O operations). The ideal solution would:

  • Generate coroutine objects lazily, as needed, each representing one task (via a generator / iterator), so they are never all held in memory at once

  • Cap the number of operations running concurrently at any given time

  • Execute each coroutine, and return the results as they arrive

The example below is a function that takes a coroutine generator and a max_concurrency parameter, and returns an async generator of results that can be iterated over as tasks complete. Results therefore arrive in completion order, not in submission order.

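from typing import Any, AsyncIterator, Coroutine, Iterable
import itertools
import asyncio

async def run_limited_concurrency(coro_gen: Iterable[Coroutine],
                                  max_concurrency: int = 5) -> AsyncIterator[Any]:
    # islice() must consume from one shared iterator; on a list it would
    # restart from the beginning every time
    coro_iter = iter(coro_gen)
    pending = set()

    while True:
        # fill the free slots with new Tasks (wait() does not accept bare coroutines)
        free_slots = max_concurrency - len(pending)
        pending.update(asyncio.create_task(coro)
                       for coro in itertools.islice(coro_iter, free_slots))
        if len(pending) == 0:
            break  # the generator is exhausted and every Task has finished

        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            # the Task is already done: await returns its result or re-raises its exception
            yield await task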

An async generator can be consumed with an async for loop, which receives the results as they arrive. Here is an example of how this function may be used:

import asyncio

# An example async job that takes some time to figure out and return the square of a number
async def example_squaring_job(num):
    await asyncio.sleep(1)
    return num ** 2

# Generator of coroutines: each coroutine object is created only when the
# limiter pulls it, so the billion jobs are never all in memory at once
def coro_generator(total_jobs):
    for x in range(total_jobs):
        yield example_squaring_job(x)


async def main():
    # a generator of a billion coroutines
    job_generator = coro_generator(1_000_000_000)

    # maximum concurrency is 10 tasks at a time
    # (run_limited_concurrency is the function defined above)
    agen = run_limited_concurrency(job_generator, 10)

    # async for each result as it comes
    async for res in agen:
        print(res)

asyncio.run(main())
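To check the concurrency cap on a small scale, the sketch below uses a self-contained version of the limiter that wraps each coroutine in a Task (asyncio.wait() rejects bare coroutines since Python 3.11) and records the peak number of jobs in flight. tracked_job, the 50-job count and the 0.01 s delay are arbitrary assumptions chosen for illustration.

```python
import asyncio
import itertools
from typing import Any, AsyncIterator, Coroutine, Iterable

async def run_limited_concurrency(coro_gen: Iterable[Coroutine],
                                  max_concurrency: int = 5) -> AsyncIterator[Any]:
    coro_iter = iter(coro_gen)
    pending = set()
    while True:
        free = max_concurrency - len(pending)
        pending.update(asyncio.create_task(c) for c in itertools.islice(coro_iter, free))
        if not pending:
            break
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            yield await task

in_flight = 0  # jobs currently running
peak = 0       # highest value in_flight has reached

async def tracked_job(num):
    # hypothetical job that records how many jobs are running at the same time
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return num ** 2

async def main():
    jobs = (tracked_job(x) for x in range(50))
    results = [res async for res in run_limited_concurrency(jobs, max_concurrency=10)]
    return sorted(results)

results = asyncio.run(main())
print(len(results), peak)  # prints 50 10
```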