# Async-IO

## Awaitables

There are three main types of awaitables:

* A Coroutine Function, or a Coroutine Object
* An asyncio Task, which is a Future-like object
* An asyncio Future

### Coroutines

A **Coroutine Function** is declared with the ``async`` keyword (``async def``), and takes parameters just as a normal function does.

A **Coroutine Object** is created by invoking a Coroutine Function with specific arguments.

```python
import asyncio

# Coroutine Function
async def ajob(seconds):
    # invoking and awaiting a Coroutine Function from the asyncio lib
    await asyncio.sleep(seconds)
    return "success"

async def main():
    # calling a Coroutine Function gives an awaitable.
    # This will return and print "success" after 2 seconds
    print(await ajob(2))

    # Calling a Coroutine Function returns a Coroutine Object
    # Creating a Coroutine Object doesn't start or run it
    job = ajob(1)

    # A Coroutine Object can be awaited
    # This will return and print "success" after 1 second
    print(await job)

# await is only valid inside a coroutine, so run main() on a new event loop.
# (In a notebook or the asyncio REPL, python -m asyncio, you can await main() directly.)
asyncio.run(main())
```

#### Behavior and Usage

* Creating a Coroutine Object (by invoking a Coroutine Function) doesn't actually run it
* A Coroutine Object can be started (or awaited) in one of two ways:
    * Directly awaited via the ``await`` keyword. The caller is suspended until the coroutine returns a result or raises an exception; whenever the coroutine itself waits (e.g. on ``asyncio.sleep()``), control goes back to the event loop
    * Passed to ``create_task()``, ``gather()``, ``shield()`` or ``wait_for()``, or passed to ``as_completed()`` and then iterated over
* A Coroutine Object can be awaited (or executed) only once. A second ``await`` raises a ``RuntimeError``
* If the coroutine raises an exception, it propagates to the caller
* Avoid passing coroutines directly to ``wait()``: this was deprecated in Python 3.8 and is forbidden since Python 3.11

### asyncio Tasks

A Task is mainly created directly by calling the ``asyncio.create_task(coro)`` function with a coroutine object argument. When a task is created, the wrapped coroutine is automatically scheduled to start as soon as possible.
So, even if a task is never awaited, the wrapped coroutine still runs. A task can (optionally) be awaited to wait for and retrieve the result (or the raised exception) of the wrapped coroutine.

```python
import asyncio

async def ajob(seconds):
    await asyncio.sleep(seconds)
    return 2 * seconds

async def main():
    # create a task, wrapping and scheduling the coroutine
    t1 = asyncio.create_task(ajob(1))
    # create another task, wrapping and scheduling another coroutine
    t2 = asyncio.create_task(ajob(2))
    # await the tasks and retrieve their results
    r1 = await t1  # r1 = 2
    r2 = await t2  # r2 = 4
    print(r1, r2)

asyncio.run(main())
```

#### Behavior and Usage

* A Task can be created **directly** by the ``create_task(coro)`` function, passing a coroutine object
* A Task can be created **indirectly** by passing a coroutine to:
    * ``gather()``
    * ``shield()``
    * ``as_completed()``
* A task is automatically scheduled to start as soon as possible once created
* When the task is awaited:
    * If the task hasn't completed yet, the caller waits (is suspended) until the execution completes or an exception is raised
    * If it has already completed, the result is returned immediately, or the exception is propagated
* If the task hasn't completed yet, it can be cancelled via ``Task.cancel()``, which *may* be able to cancel the task before completion
    * If it does, a ``CancelledError`` is raised inside the coroutine, and also in the caller if the cancelled task is awaited
* Passing coroutines directly to ``wait()`` was deprecated in Python 3.8 and is forbidden since Python 3.11; wrap them in Tasks first
* Avoid reading the result via ``Task.result()`` directly, as it raises ``asyncio.InvalidStateError`` if the task isn't done yet. Await the Task instead

### asyncio Future

A Future is a special low-level awaitable object that represents an eventual result of an asynchronous operation.
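As a minimal sketch of what "an eventual result" means, the example below creates a bare Future with the event loop's ``create_future()`` method and resolves it later from a ``call_later()`` callback. Application code rarely does this itself; the coroutine ``main()`` and the value ``"done"`` are illustrative only.

```python
import asyncio

async def main() -> str:
    loop = asyncio.get_running_loop()
    # A bare Future: it holds no result yet and runs no code by itself
    fut = loop.create_future()
    # Schedule a callback that resolves the Future after 0.1 seconds
    loop.call_later(0.1, fut.set_result, "done")
    # Awaiting suspends main() until set_result() is called
    return await fut

result = asyncio.run(main())
print(result)  # done
```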
A Future object is mainly created and returned by:

* The ``asyncio.gather(*awaitables)`` function, given a sequence of awaitable objects (coroutines, tasks and futures)
* The ``shield(awaitable)`` function, which wraps an awaitable in another awaitable; if the outer one is cancelled, the wrapped awaitable is shielded from the cancellation

If any of the awaitables passed to ``gather(*aws)`` or ``shield(awaitable)`` are coroutines, they are automatically scheduled as tasks.

The Future returned by ``gather()`` can (optionally) be awaited to wait for all the provided awaitables and retrieve their results.

```python
import asyncio

async def square(num):
    await asyncio.sleep(0.1)
    return num ** 2

async def main():
    # create_task() and gather() need a running event loop,
    # so the awaitables are built inside a coroutine
    some_awaitables = [
        square(2),                             # a coroutine
        square(3),                             # a coroutine
        asyncio.create_task(square(4)),        # a Task
        asyncio.gather(square(5), square(6)),  # a Future
    ]

    # Returns a Future, which is awaitable
    gather_future = asyncio.gather(*some_awaitables)

    # awaiting the Future waits for all awaitables to complete;
    # results come back in the order the awaitables were passed
    print(await gather_future)  # prints [4, 9, 16, [25, 36]]

asyncio.run(main())
```

## High Level asyncio functions

### ``create_task(coroutine)``

Returns a ``Task`` object.

This function takes a coroutine, schedules it to start automatically, and returns a ``Task`` that can be awaited or cancelled. It only accepts coroutine objects, not other awaitables such as a ``Future`` or a ``Task``.

### ``gather(*awaitables, return_exceptions=False)``

Returns a ``Future`` object.

Takes one or more awaitables and returns a ``Future`` object that can (optionally) be:

* Awaited, to return the results of all the awaitables (in the order they were passed), or
* Cancelled, which attempts to cancel all the provided awaitables

If ``return_exceptions=False``, the first exception raised in any of the awaitables is propagated to the caller when ``gather()`` is awaited; the other awaitables are not cancelled and keep running. Otherwise, the raised exception object is simply returned as that awaitable's result.
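A small sketch of the two ``return_exceptions`` modes; the coroutines ``ok()`` and ``fail()`` are hypothetical stand-ins for real work.

```python
import asyncio

async def ok(value):
    await asyncio.sleep(0.01)
    return value

async def fail():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def main():
    # return_exceptions=True: the exception object is placed in the result list
    results = await asyncio.gather(ok(1), fail(), ok(3), return_exceptions=True)

    # return_exceptions=False (the default): the first exception propagates
    propagated = None
    try:
        await asyncio.gather(ok(1), fail())
    except ValueError as ex:
        propagated = ex
    return results, propagated

results, propagated = asyncio.run(main())
print(results)     # [1, ValueError('boom'), 3]
print(propagated)  # boom
```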
If a provided awaitable is a coroutine, it is automatically scheduled and run as a ``Task``.

### ``shield(awaitable)``

Returns a ``Future`` object.

Wraps an awaitable in another awaitable; if the outer one is cancelled, the wrapped awaitable is shielded from the cancellation. If the provided awaitable is a coroutine, it is automatically scheduled and run as a Task.

### ``wait_for(awaitable, timeout)``

Returns a coroutine.

Waits for the awaitable to complete, with a timeout. If the timeout expires, the awaitable is cancelled and an ``asyncio.TimeoutError`` is raised in the caller.

If the awaitable is a coroutine, it is scheduled and run as a ``Task``.

When awaited, it returns the result of the wrapped awaitable.

``timeout`` can be ``None`` (wait indefinitely) or a ``float`` number of seconds until the timeout.

### ``wait(awaitables, timeout=None, return_when=ALL_COMPLETED)``

Returns a ``tuple`` of two sets, ``(done, pending)``, containing ``Task`` or ``Future`` instances, depending on the provided awaitables.

Takes an iterable of awaitables, runs and/or waits for them concurrently, and waits until the condition specified by ``return_when`` is met:

* ``ALL_COMPLETED``: all awaitables have finished or been cancelled (the default)
* ``FIRST_COMPLETED``: any awaitable has finished or been cancelled
* ``FIRST_EXCEPTION``: any awaitable has raised an exception; if none does, this is equivalent to ``ALL_COMPLETED``

Unlike ``wait_for()``, ``wait()`` does not cancel the awaitables when a timeout occurs; the unfinished ones are simply returned in ``pending``.

Coroutines should not be passed directly to ``wait()``: this was deprecated in Python 3.8 and raises a ``TypeError`` since Python 3.11.

### ``as_completed(awaitables, timeout=None)``

Returns a plain (synchronous) iterator of coroutines.

Runs the awaitable objects in the ``awaitables`` iterable concurrently. Each coroutine returned by the iterator can be awaited to get the next earliest result from the remaining awaitables.

Raises ``asyncio.TimeoutError`` if the timeout occurs before all awaitables are done.

If an awaitable is a coroutine, it is scheduled and run as a ``Task`` when iteration over the returned iterator begins.

## Running Functions from One Context in Another

Sometimes we need to run blocking or CPU-intensive code off the asyncio loop, initiated from within the asyncio loop.
Other times we need to do the opposite: run code on the event loop, initiated from outside the event loop.

Put another way: we may need to run blocking code from within a non-blocking context, or, vice versa, run asyncio code from a blocking context.

### Run Blocking code from an asyncio context

This is typically needed when using legacy code or blocking libraries to read from and write to disk or the network, where no asyncio implementation is available.

One approach is the ``loop.run_in_executor()`` method. Its first argument is either a ``concurrent.futures.Executor`` instance that pools a number of threads or processes, or ``None`` to use the loop's default executor (a ``ThreadPoolExecutor`` that the loop creates on first use).

The following helper wraps it for this purpose:

```python
import asyncio
from typing import Any, Callable, Optional
from functools import partial
from concurrent.futures import Executor

def run_in_executor(executor: Optional[Executor], func: Callable, *args, **kwargs) -> asyncio.Future:
    """
    This function must only be called from an async context (with a running loop).

    Run the given callable with the given *args and **kwargs, which may be a
    blocking function or a CPU-intensive function, in the given executor.

    If the callable is IO-blocking, the executor is recommended to be a ThreadPoolExecutor.
    If the callable is CPU-intensive, the executor should be a ProcessPoolExecutor
    (in which case func and its arguments must be picklable).

    Returns an asyncio Future, which is awaitable.
    """
    # loop.run_in_executor() only forwards positional args, so bind everything with partial()
    ft: Any = asyncio.get_running_loop().run_in_executor(
        executor, partial(func, *args, **kwargs))
    return ft
```

### Run async code from other non-asyncio threads

This is done with the ``asyncio.run_coroutine_threadsafe(coro, loop)`` function, which submits a coroutine to an event loop from another thread and returns a ``concurrent.futures.Future``. The main challenge is that this function needs a reference to an event loop that is already running in another thread.
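A minimal sketch of the underlying mechanism, without any wrapper class: an event loop runs forever in a background thread, and the main (non-asyncio) thread submits a coroutine to it and blocks on the returned ``concurrent.futures.Future``. The coroutine ``add()`` is a hypothetical example.

```python
import asyncio
import threading

async def add(a, b):
    await asyncio.sleep(0.1)
    return a + b

# An event loop running forever in a dedicated background thread
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# From this (non-asyncio) thread: submit the coroutine to the loop.
# run_coroutine_threadsafe returns a concurrent.futures.Future.
future = asyncio.run_coroutine_threadsafe(add(2, 3), loop)
result = future.result(timeout=5)  # blocks this thread until the coroutine finishes
print(result)  # 5

# loop.stop() is not thread-safe, so schedule it on the loop's own thread
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```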
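The following code is a well-tested class that creates an event loop, starts it in a background thread on first use, and provides the API needed to run coroutines on the loop and to stop the loop.

```python
import asyncio
import threading
import sys
from typing import Optional, Coroutine
from concurrent.futures import Future

class LoopWorker:
    def __init__(self):
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._lock = threading.Lock()

    def _loop_thread_target(self, loop: asyncio.AbstractEventLoop):
        try:
            loop.run_forever()
        finally:
            loop.close()
            with self._lock:
                # only forget the loop if it hasn't already been replaced
                if self._loop is loop:
                    self._loop = None

    def _init_loop(self):
        # must be called with self._lock held
        if self._loop:
            return  # do nothing if the loop has already been started
        self._loop = asyncio.new_event_loop()
        threading.Thread(target=self._loop_thread_target, args=(self._loop,)).start()

    @staticmethod
    async def _coro_wrapper(coro):
        try:
            return await coro
        except Exception as ex:
            sys.stderr.write(f"Exception: {ex}\n")
            raise  # re-raise with the original traceback

    def run_in_loop(self, coro: Coroutine) -> Future:
        with self._lock:
            if not self._loop:
                self._init_loop()
            return asyncio.run_coroutine_threadsafe(LoopWorker._coro_wrapper(coro), self._loop)

    def stop_loop(self):
        with self._lock:
            if self._loop:
                # loop.stop() is not thread-safe; schedule it on the loop's own thread
                self._loop.call_soon_threadsafe(self._loop.stop)
                # forget the loop now, so later run_in_loop() calls start a fresh one
                self._loop = None

    def __del__(self):
        # Safety net only: while the loop thread runs it holds a reference to this
        # object, so call stop_loop() explicitly rather than relying on __del__.
        self.stop_loop()
```

## Async-IO Constructs and Patterns

### Running a Number of Tasks Concurrently

In general, if you have a few dozen to a few hundred I/O tasks that need to be executed without returning any values, creating Tasks via the ``asyncio.create_task(coro)`` function is sufficient, as long as you keep a reference to each Task (the event loop only keeps weak references to tasks, so an unreferenced task may be garbage-collected before it finishes) and the event loop keeps running until they are done.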
```python
import asyncio

# a database as a python dict
database = {}

# keep strong references to running tasks: the event loop itself only keeps
# weak references, so an unreferenced task may be garbage-collected mid-run
background_tasks = set()

# this Task takes a long time (1 second) to execute, and stores the result in the database
async def square(num):
    await asyncio.sleep(1)
    x = num ** 2
    database[num] = x

async def main():
    # Create and start 100 concurrent tasks
    for x in range(100):
        task = asyncio.create_task(square(x))
        background_tasks.add(task)
        # drop the reference once the task is done
        task.add_done_callback(background_tasks.discard)
    # asyncio.run() cancels tasks still pending when main() returns, so keep the
    # loop alive here; a long-running application would already be doing so
    await asyncio.sleep(1.5)

asyncio.run(main())
```

If the coroutine already takes care of running the job and processing the result, there is generally no need to ``gather()`` the tasks, unless you need to ensure that all tasks have completed, and completed successfully without exceptions, before proceeding.

```python
import asyncio

# a database as a python dict
database = {}

# this Task takes a long time (1 second) to execute, and stores the result in the database
async def square(num):
    await asyncio.sleep(1)
    x = num ** 2
    database[num] = x

async def main():
    # Create and start 100 concurrent tasks
    tasks = [asyncio.create_task(square(x)) for x in range(100)]
    await asyncio.gather(*tasks)
    # Here we are certain that all tasks have completed successfully without exceptions

asyncio.run(main())
```

### Running an Infinite Number of Tasks with Limited Concurrency

Say you have hundreds of millions of tasks to run (such as HTTP requests, database queries, or other network or disk I/O operations). The ideal solution would:

* Generate coroutine objects as needed, each representing one task (via a generator / iterator)
* Set a maximum number of concurrent operations running at any given time
* Execute each coroutine, and return the results as they come

The example below is a function that takes a coroutine generator and a ``max_concurrency`` parameter, and returns an async generator of results that can be iterated over as tasks complete.

```python
from typing import Any, AsyncIterator, Coroutine, Iterator
import itertools
import asyncio

async def run_limited_concurrency(coro_gen: Iterator[Coroutine], max_concurrency: int = 5) -> AsyncIterator[Any]:
    pending = set()
    while True:
        # top up the pending set with new Tasks, up to max_concurrency in total;
        # wait() no longer accepts bare coroutines (Python 3.11+), so wrap them
        pending.update(asyncio.create_task(coro)
                       for coro in itertools.islice(coro_gen, max_concurrency - len(pending)))
        if len(pending) == 0:
            break  # the generator is exhausted and every task has finished
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for item in done:
            yield await item  # item is already done, so this returns (or raises) immediately
```

An ``async`` generator can be used in an ``async for`` statement to yield results as they come. Here is an example of how this function may be used:

```python
import asyncio

# An example async job that takes some time to figure out and return the square of a number
async def example_squaring_job(num):
    await asyncio.sleep(1)
    return num ** 2

# Generator of Coroutines (created lazily, one at a time)
def coro_generator(total_jobs):
    for x in range(total_jobs):
        yield example_squaring_job(x)

async def main():
    # a generator of a billion coroutines
    job_generator = coro_generator(1_000_000_000)
    # maximum concurrency is 10 tasks at a time
    agen = run_limited_concurrency(job_generator, 10)
    # async for each result as it comes
    async for res in agen:
        print(res)

asyncio.run(main())
```
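For comparison, here is a sketch of a different technique for the same goal (not the approach above): a fixed pool of worker coroutines that all pull from one shared coroutine generator. ``run_with_workers()``, its ``on_result`` callback and ``square_job()`` are hypothetical names. Concurrency can never exceed ``num_workers``, and no more than ``num_workers`` job coroutines are in flight at any time, but results are handed to a callback rather than yielded to the caller.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterator

async def run_with_workers(coro_gen: Iterator[Awaitable[Any]],
                           num_workers: int = 5,
                           on_result: Callable[[Any], None] = print) -> None:
    async def worker() -> None:
        # Each worker repeatedly takes the next coroutine from the shared generator.
        # No lock is needed: every worker runs on the same event loop thread, and
        # next() on the generator never hands control back to the loop.
        for coro in coro_gen:
            on_result(await coro)

    # Exactly num_workers workers, so at most num_workers jobs run at once
    await asyncio.gather(*(worker() for _ in range(num_workers)))

# Example usage with a hypothetical job
async def square_job(num: int) -> int:
    await asyncio.sleep(0.01)
    return num ** 2

results: list = []
asyncio.run(run_with_workers((square_job(x) for x in range(20)), num_workers=4,
                             on_result=results.append))
print(sorted(results))
```

The worker pool trades the ``async for`` interface for simplicity: there is no ``wait()`` bookkeeping, because each worker only ever awaits one coroutine at a time.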