asyncio, multiprocessing – the missing link

Part II: multiprocessing

As we know, it is not a good idea to block the event loop with anything. If no native asyncio interface is available for I/O-bound work, it can be a good idea to move it to a thread pool executor, as we saw in Part I. But when we need to handle CPU-bound load, this pattern will not help, because the threads still compete for the GIL.

In today’s world we have machines with more than one core, but we also have Python, which cannot utilize them within a single process due to its GIL “limitation”. To overcome this natural barrier, Python’s batteries include two packages.

The obvious one is multiprocessing, but it operates on a lower level, so we start with the concurrent.futures package first.
The package is a wrapper around multiprocessing and threading and provides a lightweight API that makes it quite easy to work in a multi-core environment.

from concurrent.futures import ProcessPoolExecutor


def cpu_bound_func(work):
    # burn CPU cycles proportional to the given amount of work
    result = 0
    for _ in range(work * 1000):
        result = result + 1
    return result


if __name__ == "__main__":
    # the context manager shuts the pool down cleanly when we are done
    with ProcessPoolExecutor() as executor:
        work = [1, 2, 3, 4]
        # map distributes the calls over the worker processes
        results = executor.map(cpu_bound_func, work)
        print(list(results))

asyncio also offers a lightweight interface for using the concurrent.futures classes. Instead of passing None as the executor (which would select the default thread pool), we explicitly pass a process pool and ask asyncio's event loop to wait for it.

import asyncio
from concurrent.futures import ProcessPoolExecutor

# cpu_bound_func as defined above


async def main():
    executor = ProcessPoolExecutor()
    loop = asyncio.get_running_loop()
    works = [1, 2, 3, 4]
    pending = set()
    for work in works:
        # schedule the call on the process pool instead of the
        # default (None) thread pool executor
        pending.add(
            loop.run_in_executor(executor, cpu_bound_func, work)
        )
    # the event loop stays responsive while the workers crunch numbers
    done, pending = await asyncio.wait(pending)
    print([t.result() for t in done])
    print("bye")


if __name__ == '__main__':
    asyncio.run(main(), debug=True)

While this looks easy enough, why shouldn't we simply place everything that looks like CPU load onto the process pool?

The answer is longer than one might expect. First of all, every piece of communication with a child process goes through an underlying operating system pipe or, even worse, a server/client architecture. And this is where all the problems start that you may encounter on your high-performance journey.
You cannot send everything to a child process: whatever crosses the process boundary must be serializable, i.e. pickle-able. For simple things like strings this is no problem, but there are surely many objects you would like to pass and work on that are not. As long as you can adapt the classes yourself, by inheritance for example, this can be handled in many cases. But when you try to pass C-implemented classes, things get harder and harder, adding more and more overhead to the communication problem, and to your head.
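
To make this concrete, here is a minimal sketch of what happens when a C-backed object, such as an open file handle, hits the pickle machinery that multiprocessing uses under the hood:

import pickle

# open file handles wrap OS-level state and cannot cross the
# process boundary
with open("example.txt", "w") as fh:
    try:
        pickle.dumps(fh)
    except TypeError as exc:
        print("not pickle-able:", exc)

The same error awaits you, just less visibly, the moment you pass such an object as an argument to a pool worker.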

Second, serializing takes time and adds load to the parent process (remember, this is exactly what we want to avoid). So if you plan to offload only a small bit of CPU-bound work, this may not be the solution, because the overhead described above can actually make things slower.
If you want to offload bigger chunks of work, this is the way to go. In conclusion, this design decision should always involve some profiling. Assumptions about code behavior are wrong in nearly every case, and code optimized on the basis of such assumptions will not show the hoped-for effects.
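
To see where the break-even point lies on your own machine, a rough timing sketch like the following (the chunk sizes are arbitrary; profile your real workload) compares offloading tiny versus large chunks:

import time
from concurrent.futures import ProcessPoolExecutor


def cpu_bound_func(work):
    result = 0
    for _ in range(work * 1000):
        result += 1
    return result


def timed(label, fn):
    start = time.perf_counter()
    fn()
    print(f"{label}: {time.perf_counter() - start:.3f}s")


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        # tiny chunks: pickling and pipe overhead dominate the runtime
        timed("pool, small", lambda: list(executor.map(cpu_bound_func, [1] * 1000)))
        timed("inline, small", lambda: [cpu_bound_func(1) for _ in range(1000)])
        # big chunks: the actual work dwarfs the communication cost
        timed("pool, big", lambda: list(executor.map(cpu_bound_func, [10_000] * 8)))
        timed("inline, big", lambda: [cpu_bound_func(10_000) for _ in range(8)])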

Third, there will be many more problems than just the ones sketched above, because multiprocessing deals with a lot of OS-bound things. For example, the underlying pipe does not have a large buffer to accumulate work before sending it to the worker, and pipes may break when they are not properly flushed, leaving the worker or parent process in a non-deterministic state. I will cover some of these pitfalls in the next chapter as well.
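
To get a feeling for how small that buffer really is, here is a quick Unix-only probe (my own sketch, not part of the multiprocessing API) that fills a raw OS pipe without a reader until the kernel refuses further writes:

import os

# probe the kernel pipe buffer: write without a reader until full
r, w = os.pipe()
os.set_blocking(w, False)
written = 0
try:
    while True:
        written += os.write(w, b"x" * 4096)
except BlockingIOError:
    # the kernel buffer is full; a blocking writer would now stall
    pass
print(f"pipe buffer full after {written} bytes")  # typically 65536 on Linux
os.close(r)
os.close(w)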

By now it may look impossible to run an event loop in a kind of mixed mode (between CPU- and I/O-bound) with high performance, but there is again a quite simple solution. Instead of running one event loop on one core, we now run several loops on several cores.

import asyncio
import multiprocessing
import time

# cpu_bound_func as defined above


async def main():
    for _ in range(10):
        cpu_bound_func(1)  # should not take longer than ~100 ms here
        await asyncio.sleep(0)  # yield control back to the event loop
    print("async main gone at", multiprocessing.current_process())


def async_worker():
    print("starting", multiprocessing.current_process())
    asyncio.run(main(), debug=True)
    print("bye from", multiprocessing.current_process())


def do_main():
    # one worker process (and therefore one event loop) per core
    processes = [multiprocessing.Process(target=async_worker, name=f"P<{count}>")
                 for count in range(multiprocessing.cpu_count())]
    for p in processes:
        p.start()

    # wait until every worker has finished
    while any(p.is_alive() for p in processes):
        print(".", end="", flush=True)
        time.sleep(0.1)


if __name__ == '__main__':
    do_main()

The above code implements an async worker which is then run on its own process. Simple so far. In most cases, though, we need some inter-process communication to put work onto the workers or to balance their load, as the sketch below hints at.
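
Here is a minimal sketch of one possible hand-off, assuming a shared multiprocessing.Queue as the work channel (this is my illustration, not the final design; note the deliberate flaw: the blocking queue.get() inside the coroutine stalls the event loop, which is exactly the kind of problem we will deal with next):

import asyncio
import multiprocessing


def cpu_bound_func(work):
    result = 0
    for _ in range(work * 1000):
        result += 1
    return result


async def main(queue):
    while True:
        work = queue.get()  # blocking call: stalls the loop (see next chapter)
        if work is None:  # sentinel meaning: no more work
            break
        print(cpu_bound_func(work), "by", multiprocessing.current_process().name)
        await asyncio.sleep(0)


def async_worker(queue):
    asyncio.run(main(queue))


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    processes = [multiprocessing.Process(target=async_worker, args=(queue,))
                 for _ in range(2)]
    for p in processes:
        p.start()
    for work in [1, 2, 3, 4]:
        queue.put(work)
    for _ in processes:
        queue.put(None)  # one sentinel per worker
    for p in processes:
        p.join()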

In the next chapter I will explain how basic communication can be implemented and what problems we will have to fight along the way.
