asyncio, multiprocessing – missing link

Part III: The missing link.

We have seen that it is not that tough to run multiple event loops on different cores. But in typical use cases you will need to communicate with a process and its event loop, for example to get some state information or simply to distribute the work.

The multiprocessing package comes with some possible candidates. A very low-level one is multiprocessing.Pipe. As we need more than just sending and receiving bytes, we quickly land at the multiprocessing.Queue class. The Queue class itself is already synchronized, so we do not need to rack our brains about parallel access to resources (thread-safety). But the problem with the mp.Queue class is that it is not designed for use in the asynchronous world, because it "tends" to block. And polling is not a good idea either, because it consumes CPU.

import asyncio
import multiprocessing as mp
from queue import Empty

async def main():
    q = mp.Queue()
    q.put_nowait("stuff")
    print(q.get())  # note that this would block if there were no stuff on the queue
    try:
        q.get(timeout=.31)  # and now it will block for at least 0.31 seconds
    except Empty:
        print("queue is finally empty")

if __name__ == '__main__':
    asyncio.run(main(), debug=True)
    # prints:
    # stuff
    # queue is finally empty
    # Executing <Task finished name='Task-1' coro=<main() done, defined at C:\ ... tasks.py:634> took 0.329 seconds
    

As we learned above, we could easily unblock the get or put call by handing it to the thread pool executor. However, when there is a lot of traffic on the queue, this will quickly saturate the thread pool, with all its disadvantages.

For simplicity the code below does not wrap the worker into a separate process holding its own event loop, but the outcome is the same.

async def worker(q: mp.Queue, uid: str):
    # this worker will not block the event loop.
    # but it will consume one thread until it is unblocked
    loop = asyncio.get_event_loop()
    while True:
        try:
            item = await loop.run_in_executor(None, q.get)
        except asyncio.CancelledError:
            return
        # .... do some useful stuff
        print(f"{uid} did something with {item}")


async def main():
    loop = asyncio.get_event_loop()
    q = mp.Queue()
    # create ten workers waiting for work,
    # which means consuming 10 threads
    workers = [loop.create_task(worker(q, f"worker-{uid}")) for uid in range(10)]
    await asyncio.sleep(0)  # yield to the event loop so it can schedule above tasks
    # feed them a bit
    for key in range(5):
        q.put_nowait(f"food-{key}")
    await asyncio.sleep(0.5)  # sleep here to let the worker consume that food
    # enough we want to stop:
    for w in workers:
        w.cancel()
    await asyncio.wait(workers)


if __name__ == '__main__':
    asyncio.run(main(), debug=True)
    # prints:
    # worker-0 did something with food-0
    # worker-1 did something with food-1
    # worker-2 did something with food-2
    # worker-3 did something with food-3
    # worker-4 did something with food-4
    #
    # and then hangs forever, or raises an exception complaining about a broken pipe

This happens because the task is cancelled while the underlying thread is still blocked inside q.get, and the mp.Queue instance holds a lock at this point. So we can easily deadlock our system with this pattern, because there is no way to clean up before we finally cancel. I know this pattern is recommended very often on Stack Overflow, but sooner or later it will cause huge problems.
In better cases you will at least receive a broken pipe exception and your system stays responsive, and sometimes it will shut down as intended, but even that happens in a non-deterministic way.
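
One way around the hang is to unblock the threads before the tasks end: feed one sentinel per worker, so every blocking get returns and each worker exits on its own instead of being cancelled. A minimal sketch of that shutdown, assuming the workers treat None as a stop signal:

async def worker(q: mp.Queue, uid: str):
    loop = asyncio.get_event_loop()
    while True:
        item = await loop.run_in_executor(None, q.get)
        if item is None:  # sentinel received: the thread is free again, leave cleanly
            return
        print(f"{uid} did something with {item}")

async def main():
    loop = asyncio.get_event_loop()
    q = mp.Queue()
    workers = [loop.create_task(worker(q, f"worker-{uid}")) for uid in range(10)]
    for key in range(5):
        q.put_nowait(f"food-{key}")
    # shutdown: one sentinel per worker instead of cancel()
    for _ in workers:
        q.put_nowait(None)
    await asyncio.wait(workers)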

Now, what can we do?

What I did first was to wrap the mp.Queue class into an adapter handling the thread pool executor pattern (like the package aiomultiprocess does). Of course this solves nothing, but it gives a handy interface.
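
The adapter below decides between its sync and async code paths with a small helper from_coroutine(), which is not shown in the post. A minimal sketch of how such a helper could look, assuming it inspects the calling frame's code flags (the level argument is an assumption about the call depth):

import inspect
import sys

def from_coroutine(level: int = 2) -> bool:
    # frame 0 is this function, frame 1 the adapter method,
    # frame 2 the code that called the adapter
    f_code = sys._getframe(level).f_code
    return bool(f_code.co_flags & (inspect.CO_COROUTINE
                                   | inspect.CO_ITERABLE_COROUTINE
                                   | inspect.CO_ASYNC_GENERATOR))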

import concurrent.futures

class Queue:
    """
    An adapter to make multiprocessing.Queue available for async
    """

    def __init__(self, maxsize: int = 0, *, ctx=None, queue: mp.Queue = None):
        self.maxsize = maxsize
        if not queue:
            if not ctx:
                self._mp_queue = mp.JoinableQueue(maxsize)
            else:
                self._mp_queue = ctx.JoinableQueue(maxsize)
        else:
            self._mp_queue = queue
        self._t_executor: None | concurrent.futures.ThreadPoolExecutor = None
        self._loop = None

    @property
    def loop(self):
        if not self._loop:
            self._loop = asyncio.get_event_loop()
        return self._loop

    @property
    def _executor(self):
        if not self._t_executor:
            self._t_executor = concurrent.futures.ThreadPoolExecutor()
        return self._t_executor

    async def get_async(self):
        item = await self.loop.run_in_executor(self._executor, self._mp_queue.get)
        return item

    def get_sync(self):
        return self._mp_queue.get()

    def get_nowait(self):
        return self._mp_queue.get_nowait()

    def get(self):
        if from_coroutine():
            return self.get_async()
        else:
            return self.get_sync()

    def put_sync(self, item):
        self._mp_queue.put(item)

    async def put_async(self, item):
        item = await self.loop.run_in_executor(self._executor, self._mp_queue.put, item)
        return item

    def put_nowait(self, item):
        return self._mp_queue.put_nowait(item)

    def put(self, item):
        if from_coroutine():
            return self.put_async(item)
        else:
            return self.put_sync(item)

    def task_done(self):
        return self._mp_queue.task_done()

    async def join(self):
        return await asyncio.to_thread(self._mp_queue.join)

    def sync_join(self):
        return self._mp_queue.join()

    def flush(self):
        self._mp_queue.put_nowait(None)

    def empty(self):
        return self._mp_queue.empty()

    def qsize(self):
        return self._mp_queue.qsize()

    def full(self):
        return self._mp_queue.full()

    def close(self):
        return self._mp_queue.close()

    def __str__(self):
        return self._mp_queue.__str__()

    def __repr__(self):
        return f"<Queue at {id(self)} maxsize={self.maxsize}>"

Next, I needed to add a lot of fuses with mp.Event to communicate start and shutdown to the workers, turning them into a kind of sequencer. This makes a worker aware of an upcoming shutdown, so it can flush the queue with some poison pills; when all ends are poisoned and have finally died, the Queue.close method is called to let the child process die. This worked well as long as the count of getters and putters was not large. But as soon as thousands of coroutines try to put to or get from the queue, the thread pool gets saturated, turning the system into a virtual deadlock. It gets worse when there are also interdependencies, for example when one worker puts and gets on the same queue (= on the same thread pool).
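
A rough sketch of such a fuse, assuming a shared mp.Event announces the shutdown and None serves as the poison pill (all names are illustrative):

def worker_loop(q: Queue, shutdown_event):
    # runs in a child process; shutdown_event is an mp.Event()
    while True:
        item = q.get_sync()
        if item is None:  # poison pill: this consumer dies
            q.task_done()
            return
        # ... do the actual work with item
        q.task_done()
        if shutdown_event.is_set():
            q.flush()  # pass a pill on so the next consumer dies too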

Then I found this very impressive YouTube clip by David Beazley, in which he shows in a live coding session how one can connect asyncio with threading by using the concurrent.futures.Future class, avoiding the thread pool pattern.

But it turns out that some of the parts he uses do not work with multiprocessing, because we need to serialize things before transmitting them.
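
The core trick, sketched minimally: a plain concurrent.futures.Future can be fulfilled from anywhere, here from a thread, and awaited on the event loop via asyncio.wrap_future, without parking a pool thread on the awaiting side (the helper is illustrative):

import asyncio
import threading
import time
from concurrent.futures import Future

def fulfil_later(fut: Future, value):
    # stands in for whatever party eventually produces the result
    def run():
        time.sleep(0.1)
        fut.set_result(value)
    threading.Thread(target=run).start()

async def main():
    fut = Future()  # no executor involved
    fulfil_later(fut, "hello")
    print(await asyncio.wrap_future(fut))  # suspends the coroutine only

asyncio.run(main())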

Instead of pushing the blocking get method onto the thread pool, one can wrap that blocking call into a concurrent.futures.Future. asyncio can then wait for it with await asyncio.wrap_future(fut) without running into the thread pool problems described above. But to manage the waiting state of putters and getters, we again need to communicate it between all parties. David Beazley uses the collections.deque class for this. That works great for threading applications, where Python objects can be shared directly. But deque fails with multiprocessing: because of the RLock involved, it is not serializable. And this is exactly where I am working at the moment. Some time ago I published a deque-mimicking class based on the multiprocessing.shared_memory package:


shared-memory-queue by cloasdata

The StaticQueue class mimics the collections.deque interface. The goal of this class was to improve on the performance of the mp.Queue class.

But this class still lacks synchronization, so it cannot be used thread-safely at the moment. I am also working on a shared-memory-based semaphore to promote the shared-memory-queue to a thread-safe queue.
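
For readers who have not used it: multiprocessing.shared_memory exposes a raw buffer that any process can attach to by name, which is the primitive such a queue can be built on. A tiny illustration (not the StaticQueue implementation itself):

from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(create=True, size=16)
shm.buf[:5] = b"hello"  # bytes written here are visible to other processes

# a second handle, e.g. in a child process, attaches by name
other = shared_memory.SharedMemory(name=shm.name)
print(bytes(other.buf[:5]))  # b'hello'

other.close()
shm.close()
shm.unlink()  # release the segment once all handles are closed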

Below you see the Queue implementation for multiprocessing inspired by David, but still without communication over the getter/putter channel, as the deque ends up as a dead link when child processes are spawned (each process only sees its own copy).

from collections import deque
from concurrent.futures import Future
from queue import Empty, Full
from typing import TypeVar

T = TypeVar("T")

class Queue3:
    """
    An adapter to make multiprocessing.Queue available for async,
    using Futures instead of threads.
    However, the communication between putters and getters does not work
    across processes: the deques below only live inside one worker and fail
    to communicate with other workers.
    """

    def __init__(self, maxsize: int = 0, *, ctx=None, queue: "mp.Queue[T]" = None):
        self.maxsize = maxsize
        if not queue:
            if not ctx:
                self._mp_queue: "mp.JoinableQueue[T]" = mp.JoinableQueue(maxsize)
            else:
                self._mp_queue: "mp.JoinableQueue[T]" = ctx.JoinableQueue(maxsize)
        else:
            self._mp_queue = queue
        self.putters: deque[Future] = deque()
        self.getters: deque[Future] = deque()

    def get_noblock(self) -> tuple[T | None, Future | None]:
        item = None
        fut = None
        print("get_noblock", mp.current_process().name)
        try:
            item = self._mp_queue.get(block=False)
            if self.putters:
                self.putters.popleft().set_result(True)
        except Empty:
            # queue is empty: park a future that a putter can resolve later
            print("empty", mp.current_process().name)
            fut = Future()
            self.getters.append(fut)
        return item, fut

    async def get_async(self, timeout=None):
        item, fut = self.get_noblock()
        if fut:
            print("wait for future at", mp.current_process())
            item = await asyncio.wait_for(asyncio.wrap_future(fut), timeout)
        return item

    def get_sync(self, timeout=None):
        item, fut = self.get_noblock()
        if fut:
            print("wait sync for future", mp.current_process().name)
            item = fut.result(timeout)  # block until the future is resolved or timeout expires
        return item

    def get_nowait(self):
        return self._mp_queue.get_nowait()

    def get(self, timeout=None):
        if from_coroutine():
            return self.get_async(timeout)
        else:
            return self.get_sync(timeout)

    def put_noblock(self, item: T) -> Future | None:
        fut = None
        try:
            if self.getters:
                # hand the item directly to a waiting getter
                self.getters.pop().set_result(item)
            else:
                self._mp_queue.put(item, block=False)
        except Full:
            # queue is full: park a future that a getter can resolve later
            fut = Future()
            self.putters.append(fut)
        return fut

    def put_sync(self, item: T, timeout=None):
        while True:
            fut = self.put_noblock(item)
            if fut is None:
                return
            fut.result(timeout)  # block until a slot is free or timeout expires

    async def put_async(self, item: T, timeout=None):
        while True:
            fut = self.put_noblock(item)
            if fut is None:
                return
            await asyncio.wait_for(asyncio.wrap_future(fut), timeout)

    def put(self, item: T, timeout=None):
        if from_coroutine():
            return self.put_async(item, timeout)
        else:
            return self.put_sync(item, timeout)

    def put_nowait(self, item):
        return self._mp_queue.put_nowait(item)

    def task_done(self):
        return self._mp_queue.task_done()

    async def join(self):
        return await asyncio.to_thread(self._mp_queue.join)

    def sync_join(self):
        return self._mp_queue.join()

    def flush(self):
        self._mp_queue.put_nowait(None)

    def empty(self):
        return self._mp_queue.empty()

    def qsize(self):
        return self._mp_queue.qsize()

    def full(self):
        return self._mp_queue.full()

    def close(self):
        return self._mp_queue.close()

    def __str__(self):
        return self._mp_queue.__str__()

    def __repr__(self):
        return f"<Queue at {id(self)} maxsize={self.maxsize}>"

Another way could be, for example, to communicate the items satisfying getters and putters via additional mp.Queues. This should be possible because the bandwidth on the getter/putter channel can be assumed to be much smaller than on the main queue.
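
Sketched very roughly, that idea could look like this: one extra mp.Queue per direction, drained by a single pump task per process that wakes the locally parked futures, so each process needs only one blocking thread instead of one per waiter (all names here are hypothetical):

async def getter_pump(wakeup_q: mp.Queue, getters: "deque[Future]"):
    # one pump per process: the only place that still blocks a thread
    loop = asyncio.get_event_loop()
    while True:
        item = await loop.run_in_executor(None, wakeup_q.get)
        if getters:
            getters.popleft().set_result(item)  # satisfy the oldest local getter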
