asyncio, multiprocessing – missing link

Part I: asyncio

One of the advantages of async programming is the light weight of the underlying coroutines: they have nearly no overhead compared to threads or processes. So it is highly recommended to issue many of them in parallel.

import asyncio
import random

async def aio_producer(pending: set):
    loop = asyncio.get_event_loop()
    for _ in range(100):
        work = random.randint(0, 100) / 100
        # schedule the consumer immediately, without awaiting it here
        task = loop.create_task(consumer_coro(work))
        pending.add(task)
    # main() waits until the consumers have eaten all the work
    print("Producer done")

# this is a simple consumer coroutine now.
async def consumer_coro(work):
    # simulates some i/o bound task
    await asyncio.sleep(work)
    return f"done {work}"


async def main():
    pending = set()  # keep references to the pending tasks so we can wait for them and fetch the results
    await aio_producer(pending)
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            print(task.result())
        print(f"{len(done)=}, {len(pending)=}")
    print("Bye")

asyncio.run(main(), debug=True)

In the above code snippet we issue as many coroutines as possible and wait for them to finish. So each single piece of work runs (virtually) in parallel to all the others.
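The same fan-out can be written more compactly with asyncio.gather, which schedules all coroutines concurrently and collects their results in order. A minimal sketch under the same assumptions as above (the shortened sleep times are my own choice to keep it quick):

```python
import asyncio
import random

async def consumer_coro(work):
    # simulates some i/o bound task
    await asyncio.sleep(work)
    return f"done {work}"

async def main():
    # create 100 coroutines up front; gather runs them concurrently
    jobs = [consumer_coro(random.randint(0, 100) / 1000) for _ in range(100)]
    return await asyncio.gather(*jobs)

results = asyncio.run(main())
print(f"{len(results)=}")
```

The trade-off: gather keeps the results in submission order, while the asyncio.wait loop above lets you react to each task the moment it finishes.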

A typical anti-pattern here would be to simply await the consumer coroutine and then issue the next one. This typically happens when one tries to implement the producer-consumer pattern with queue synchronization.

import asyncio
import random

async def consumer_coro(work):
    # simulates some i/o bound task
    await asyncio.sleep(work)
    return f"done {work}"

# Anti-pattern: awaiting each consumer before issuing the next one
# serializes all the work, so nothing runs concurrently.
async def aio_producer_serial():
    for _ in range(100):
        work = random.randint(0, 100) / 100
        result = await consumer_coro(work)  # blocks the producer until done
        print(result)
    print("Producer done")

asyncio.run(aio_producer_serial(), debug=True)
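A producer/consumer design that stays concurrent can be built with asyncio.Queue: the producer feeds the queue while several long-lived consumer tasks drain it in parallel. A sketch, where the worker count and the None sentinel are my own choices:

```python
import asyncio
import random

async def producer(queue: asyncio.Queue, n_items: int, n_consumers: int):
    for _ in range(n_items):
        await queue.put(random.randint(0, 100) / 1000)
    # one sentinel per consumer signals the end of the work
    for _ in range(n_consumers):
        await queue.put(None)

async def consumer(queue: asyncio.Queue, results: list):
    while (work := await queue.get()) is not None:
        await asyncio.sleep(work)  # simulates some i/o bound task
        results.append(f"done {work}")

async def main():
    queue = asyncio.Queue()
    results = []
    n_consumers = 5
    consumers = [consumer(queue, results) for _ in range(n_consumers)]
    # producer and all consumers run concurrently
    await asyncio.gather(producer(queue, 20, n_consumers), *consumers)
    return results

results = asyncio.run(main())
print(f"{len(results)=}")
```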

Now that we know how to implement a highly parallelized asynchronous design, the next obvious snare one could fall over is blocking the event loop with CPU bound tasks or other blocking calls.

In general, blocking the event loop delays all other subsequent coroutines and therefore disturbs the scheduling. You can easily test this by exchanging asyncio.sleep with time.sleep in the above examples.
As long as the event loop is blocked, already finished coroutines cannot be processed and new coroutines cannot be fired, so everything waits longer than necessary. In consequence CPU bound work can pile up, creating a positive feedback loop that makes your system hang.
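The delay is easy to measure. In this sketch (names and timings are my own), a coroutine that should wake up after 10 ms is held back for the full duration of a time.sleep running elsewhere on the same loop:

```python
import asyncio
import time

async def blocker():
    # time.sleep blocks the whole event loop, not just this coroutine
    time.sleep(0.2)

async def ticker():
    start = time.monotonic()
    await asyncio.sleep(0.01)        # should wake up after ~10 ms
    return time.monotonic() - start  # actual delay

async def main():
    # ticker yields at its await, then blocker freezes the loop
    delay, _ = await asyncio.gather(ticker(), blocker())
    return delay

delay = asyncio.run(main())
print(f"ticker was delayed by {delay:.2f}s")
```

Even though the ticker only asked for 10 ms of sleep, it cannot be rescheduled before the blocking call returns.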

In consequence there are many ways to block the event loop. A typical one is calling a synchronous function of a package, module or API. The good thing here is that addressing this is very easy: instead of calling the function directly from the coroutine, we pass it to another thread and await the result. Python's asyncio package brings a simple solution. There are two possibilities here: loop.run_in_executor or asyncio.to_thread. The latter is essentially a wrapper around the former.

import asyncio
import time


def blocking_api_function():
    print("working heavily")
    time.sleep(1)
    print("done")
    return "42"

async def block_main():
    # calling the function directly blocks the event loop for a second
    # (asyncio's debug mode will at least log a warning about it)
    res = blocking_api_function()
    print(res)

asyncio.run(block_main(), debug=True)

async def main():
    # now we unblock the blocking api call
    loop = asyncio.get_running_loop()

    # run_in_executor wraps the function call into a future;
    # None selects the default thread pool executor
    fut = loop.run_in_executor(None, blocking_api_function)
    # roughly equivalent to: await asyncio.to_thread(blocking_api_function)

    # we can then await the future; the event loop is free to do other things
    res = await fut
    print(res)

asyncio.run(main(), debug=True)
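Since Python 3.9, asyncio.to_thread offers the same mechanism with less boilerplate; a minimal sketch (the shortened sleep is my own choice):

```python
import asyncio
import time

def blocking_api_function():
    time.sleep(0.1)  # stands in for a blocking library call
    return "42"

async def main():
    # runs the blocking call in the default thread pool executor
    return await asyncio.to_thread(blocking_api_function)

res = asyncio.run(main())
print(res)
```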

More dangerous are CPU bound things which are not that obvious. For example, a function performing some parsing operation is called several times while iterating over something. Each single line of code adds load and delays the event loop. As long as there is enough CPU time remaining, this may or may not be a problem. In some cases it is obvious; in many other scenarios it is hard to figure out, especially in a highly parallelized design with a lot of CPU bound work. This will always create a bottleneck, piling up CPU load, even when there is a lot of I/O bound work going on. On this particular topic I will write another post in the future.
A typical estimation: running the event loop on a CPU that is more than 30% saturated will bring you into difficulties, leaving the event loop unable to react timely.
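One rough way to make such creeping delays visible is a watchdog coroutine that measures how late its own wake-ups are; everything beyond the requested sleep interval is time the loop spent elsewhere. The interval, round count and simulated CPU burst below are arbitrary choices of mine:

```python
import asyncio
import time

async def loop_lag(interval: float = 0.05, rounds: int = 10) -> float:
    """Return the worst observed scheduling delay of the event loop."""
    worst = 0.0
    for _ in range(rounds):
        start = time.monotonic()
        await asyncio.sleep(interval)
        # anything beyond `interval` is time the loop spent elsewhere
        worst = max(worst, time.monotonic() - start - interval)
    return worst

async def main():
    # run the watchdog next to some simulated CPU bound bursts
    watchdog = asyncio.create_task(loop_lag())
    for _ in range(5):
        sum(i * i for i in range(200_000))  # blocks the loop briefly
        await asyncio.sleep(0)              # yield back to the loop
    return await watchdog

lag = asyncio.run(main())
print(f"worst loop lag: {lag * 1000:.1f} ms")
```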

Even if the underlying code is an I/O bound task, many packages do not unblock such calls because they are not async aware. (That is not absolutely correct: I/O bound Python code makes the interpreter release the GIL to the operating system, which is why the thread pool pattern above does work.) Again, the most common advice at first glance is to pass such calls to the thread pool.

In my real world problems I never have pure CPU bound code (that would anyway be a show stopper for using asyncio), and I also never have a purely I/O or peripheral bound task (which is the dreamland of all asyncio programmers). So an event loop will always need to manage some CPU bound work, even if you try to externalize it to a thread pool.

Before saturating the CPU and adding more and more delay, it may be worth the idea to pass load to other cores of your machine without losing the ability of concurrency and "cost free" I/O tasks.
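One option in that direction: loop.run_in_executor also accepts a concurrent.futures.ProcessPoolExecutor, which moves CPU bound work to other cores while the event loop keeps serving I/O. A sketch under my own assumptions (the worker function and its workload are made up for illustration):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound(n: int) -> int:
    # heavy pure-Python work that would otherwise block the loop
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # each call runs in its own process, potentially on its own core
        return await asyncio.gather(
            *(loop.run_in_executor(pool, cpu_bound, 100_000) for _ in range(4))
        )

if __name__ == "__main__":
    results = asyncio.run(main())
    print(len(results), results[0])
```

Note the __main__ guard: process pools may spawn workers by re-importing the module, so the entry point must be protected. Crossing the process boundary also costs pickling overhead, so this pays off only for sufficiently chunky work items.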

In the next chapter we will see how to utilize more than one event loop using only Python's included batteries.
