Part III: The missing link.
We have seen that it is not that tough to have multiple event loops running on different cores. But in typical use cases you will need to communicate to and from a process and its event loop, for example to fetch some state information or simply to distribute work.
The package multiprocessing comes with some possible candidates. A very low-level one is multiprocessing.Pipe. As we need more than just sending and receiving bytes, we quickly land at the multiprocessing.Queue class. The Queue class itself is already synchronized, so we do not have to rack our brains about parallel access to resources (thread safety). But the problem with the mp.Queue class is that it is not designed for the asynchronous world, because it tends to block. And polling is not a good idea either, because it will consume CPU.
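For the record, the polling workaround would look something like this sketch (illustrative only, not code from this series); the loop stays responsive, but the coroutine burns CPU spinning on the empty check:

import asyncio
import multiprocessing as mp
from queue import Empty

async def poll_get(q: mp.Queue):
    # busy-waits on an empty queue: keeps the event loop responsive,
    # but wastes CPU cycles checking over and over again
    while True:
        try:
            return q.get_nowait()
        except Empty:
            await asyncio.sleep(0)

The blocking behaviour itself is easy to demonstrate: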
import asyncio
import multiprocessing as mp
from queue import Empty

async def main():
    q = mp.Queue()
    q.put_nowait("stuff")
    print(q.get())  # note that this would block if there were no stuff on the queue
    try:
        q.get(timeout=.31)  # and now it will block for at least .31 seconds
    except Empty:
        print("queue is finally empty")

if __name__ == '__main__':
    asyncio.run(main(), debug=True)

# prints:
# stuff
# queue is finally empty
# Executing <Task finished name='Task-1' coro=<main() done, defined at C:\ ... tasks.py:634> took 0.329 seconds
As we learned above, we could easily unblock the get or put call by passing it to the thread pool executor. However, when there is a lot of traffic on the queue, this will easily saturate the thread pool, with all its disadvantages.
For simplicity the code below does not wrap the worker into a process holding its own event loop, but the outcome is the same.
async def worker(q: mp.Queue, uid: str):
    # this worker will not block the event loop,
    # but it will occupy one thread until it is unblocked
    loop = asyncio.get_event_loop()
    while True:
        try:
            item = await loop.run_in_executor(None, q.get)
        except asyncio.CancelledError:
            return
        # ... do some useful stuff
        print(f"{uid} did something with {item}")

async def main():
    loop = asyncio.get_event_loop()
    q = mp.Queue()
    # create ten workers waiting for work,
    # which means consuming ten threads
    workers = [loop.create_task(worker(q, f"worker-{uid}")) for uid in range(10)]
    await asyncio.sleep(0)  # yield to the event loop so it can schedule the tasks above
    # feed them a bit
    for key in range(5):
        q.put_nowait(f"food-{key}")
    await asyncio.sleep(0.5)  # sleep here to let the workers consume that food
    # enough, we want to stop:
    [worker.cancel() for worker in workers]
    await asyncio.wait(workers)

if __name__ == '__main__':
    asyncio.run(main(), debug=True)
# prints:
# worker-0 did something with food-0
# worker-1 did something with food-1
# worker-2 did something with food-2
# worker-3 did something with food-3
# worker-4 did something with food-4
#
# and then hangs forever, or throws an exception complaining about a broken pipe
This happens because the thread is cancelled while the mp.Queue instance holds a lock at that point. So we can easily deadlock our system with this pattern, because there is no way to clean up before the thread is finally cancelled. I know this pattern is recommended very often on Stack Overflow, but sooner or later it will cause huge problems.
In better cases you will at least receive a broken pipe exception and your system stays responsive; sometimes it will shut down as intended, but this happens in a non-deterministic way.
Now, what can we do?
What I did first was wrap the mp.Queue class into an adapter handling the thread pool executor pattern (like the package aiomultiprocess does). Of course this solves nothing, but it gives a handy interface.
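The adapter dispatches get and put based on a from_coroutine() helper that detects whether the caller is running inside a coroutine. Its implementation is not shown here; a minimal sketch, adapted from the code-flag inspection technique used in David Beazley's curio project, could look like this (the level parameter is an assumption about the call depth):

import inspect
from sys import _getframe

# code-object flags that mark coroutines and async generators
_CO_FROM_COROUTINE = (
    inspect.CO_COROUTINE | inspect.CO_ITERABLE_COROUTINE | inspect.CO_ASYNC_GENERATOR
)

def from_coroutine(level: int = 2) -> bool:
    # inspect the caller of our caller (e.g. the code calling Queue.get)
    # and check whether its code object carries a coroutine flag
    return bool(_getframe(level).f_code.co_flags & _CO_FROM_COROUTINE)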
import concurrent.futures

class Queue:
    """
    An adapter to make multiprocessing.Queue available for async use.
    """
    def __init__(self, maxsize: int = 0, *, ctx=None, queue: mp.Queue = None):
        self.maxsize = maxsize
        if not queue:
            if not ctx:
                self._mp_queue = mp.JoinableQueue(maxsize)
            else:
                self._mp_queue = mp.JoinableQueue(maxsize, ctx=ctx)
        else:
            self._mp_queue = queue
        self._t_executor: None | concurrent.futures.ThreadPoolExecutor = None
        self._loop = None

    @property
    def loop(self):
        if not self._loop:
            self._loop = asyncio.get_event_loop()
        return self._loop

    @property
    def _executor(self):
        if not self._t_executor:
            self._t_executor = concurrent.futures.ThreadPoolExecutor()
        return self._t_executor

    async def get_async(self):
        item = await self.loop.run_in_executor(self._executor, self._mp_queue.get)
        return item

    def get_sync(self):
        return self._mp_queue.get()

    def get_nowait(self):
        return self._mp_queue.get_nowait()

    def get(self):
        if from_coroutine():
            return self.get_async()
        else:
            return self.get_sync()

    def put_sync(self, item):
        self._mp_queue.put(item)

    async def put_async(self, item):
        item = await self.loop.run_in_executor(self._executor, self._mp_queue.put, item)
        return item

    def put_nowait(self, item):
        return self._mp_queue.put_nowait(item)

    def put(self, item):
        if from_coroutine():
            return self.put_async(item)
        else:
            return self.put_sync(item)

    def task_done(self):
        return self._mp_queue.task_done()

    async def join(self):
        return await asyncio.to_thread(self._mp_queue.join)

    def sync_join(self):
        return self._mp_queue.join()

    def flush(self):
        self._mp_queue.put_nowait(None)

    def empty(self):
        return self._mp_queue.empty()

    def qsize(self):
        return self._mp_queue.qsize()

    def full(self):
        return self._mp_queue.full()

    def close(self):
        return self._mp_queue.close()

    def __str__(self):
        return self._mp_queue.__str__()

    def __repr__(self):
        return f"<Queue at {id(self)} maxsize={self.maxsize}>"
Next, I needed to add a lot of fuses with mp.Event to communicate the start and shutdown to the workers, turning them into a kind of sequencer. That makes a worker aware of an upcoming shutdown, so it can flush the queue with some poison pills; when all ends are poisoned and have finally died, the Queue.close method is called to let the child process die (see the sketch below). This worked well as long as the count of getters and putters was not large. But as soon as thousands of coroutines try to put to or get from the queue, the thread pool gets saturated, turning the system into a virtual deadlock. This gets worse when there are also interdependencies, for example when one worker puts and gets on the same queue (= on the same thread pool).
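A rough sketch of the fuse idea (hypothetical code, not the actual sequencer; the names shutdown and POISON are illustrative):

POISON = None

def sequenced_worker(q: "mp.JoinableQueue", shutdown: "mp.Event"):
    while not shutdown.is_set():
        item = q.get()
        if item is POISON:
            # pass the pill on so the remaining consumers die too
            q.put_nowait(POISON)
            break
        ...  # do some useful stuff
    # once every worker has died, the owning process calls q.close()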
Then I found this very impressive YouTube clip by David Beazley, in which he shows in a live coding session how one could connect asyncio with threading by using the concurrent.futures.Future class, avoiding the thread pool pattern.
But it turns out that some of the parts he uses do not work with multiprocessing, because we need to serialize things before transmitting them.
Instead of pushing the blocking get method onto the thread pool, one could wrap that blocking call into a concurrent.futures.Future, which asyncio can wait for with await asyncio.wrap_future(fut), without the thread pool problems described above.
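Reduced to its core, the handoff looks like this sketch (illustrative code, not from the clip):

import asyncio
import concurrent.futures
import threading

def blocking_producer(fut: concurrent.futures.Future):
    # imagine a blocking q.get() happening here on a plain thread
    fut.set_result("stuff")

async def main():
    fut = concurrent.futures.Future()
    threading.Thread(target=blocking_producer, args=(fut,)).start()
    # the event loop stays free while waiting; no pool thread is parked on get()
    print(await asyncio.wrap_future(fut))

asyncio.run(main())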
But to manage the waiting state of putters and getters, we again need to communicate them between all parties. David Beazley uses the collections.deque class. This works great for threading applications, where you can directly communicate Python objects. But deque fails when used with multiprocessing: because of the RLock involved, it is not serializable. And that is where I am working now. Some time ago I published a deque-mimicking class based on the multiprocessing.shared_memory package:
shared-memory-queue by cloasdata
The StaticQueue class mimics the collections.deque interface. The goal of this class was to improve on the performance of the mp.Queue class.
But it still lacks synchronization, so it is not thread safe at the moment. I am also working on a semaphore based on shared memory to level up shared-memory-queue to a thread-safe queue.
Below you see the Queue implementation for multiprocessing inspired by David Beazley. But it is still without communication over the getter/putter channel, as the deques end up as dead links when child processes are spawned.
from collections import deque
from concurrent.futures import Future
from queue import Empty, Full
from typing import TypeVar

T = TypeVar("T")

class Queue3:
    """
    An adapter to make multiprocessing.Queue available for async,
    using Futures instead of threads.
    However, the communication between putters and getters will not work,
    as they are not connected. The deques only work within one worker and
    fail to communicate with other workers.
    """
    def __init__(self, maxsize: int = 0, *, ctx=None, queue: "mp.Queue[T]" = None):
        self.maxsize = maxsize
        if not queue:
            if not ctx:
                self._mp_queue: "mp.JoinableQueue[T]" = mp.JoinableQueue(maxsize)
            else:
                self._mp_queue: "mp.JoinableQueue[T]" = mp.JoinableQueue(maxsize, ctx=ctx)
        else:
            self._mp_queue = queue
        self.putters: deque[Future] = deque()
        self.getters: deque[Future] = deque()

    def get_noblock(self) -> tuple[T | None, Future | None]:
        item = None
        fut = None
        print("get_noblock", mp.current_process().name)
        try:
            item = self._mp_queue.get(block=False)
            if self.putters:
                # a putter was waiting for free space; wake it up
                self.putters.popleft().set_result(True)
        except Empty:
            print("empty", mp.current_process().name)
            fut = Future()
            self.getters.append(fut)
        except Exception:
            breakpoint()  # left in for debugging while this is work in progress
        finally:
            return item, fut

    async def get_async(self, timeout=None):
        item, fut = self.get_noblock()
        if fut:
            print("wait for future at", mp.current_process())
            item = await asyncio.wait_for(asyncio.wrap_future(fut), timeout)
        return item

    def get_sync(self, timeout=None):
        item, fut = self.get_noblock()
        if fut:
            print("wait sync for future", mp.current_process().name)
            item = fut.result(timeout)  # block until timeout
        return item

    def get_nowait(self):
        return self._mp_queue.get_nowait()

    def get(self, timeout=None):
        if from_coroutine():
            return self.get_async(timeout)
        else:
            return self.get_sync(timeout)

    def put_noblock(self, item: T) -> Future | None:
        fut = None
        try:
            if self.getters:
                # a getter is already waiting; hand the item over directly
                self.getters.pop().set_result(item)
            else:
                self._mp_queue.put(item, block=False)
        except Full:
            fut = Future()
            self.putters.append(fut)
        finally:
            return fut

    def put_sync(self, item: T, timeout=None):
        while True:
            fut = self.put_noblock(item)
            if fut is None:
                return
            fut.result(timeout)  # block until timeout

    async def put_async(self, item: T, timeout=None):
        while True:
            fut = self.put_noblock(item)
            if fut is None:
                return
            await asyncio.wait_for(asyncio.wrap_future(fut), timeout)

    def put(self, item: T, timeout=None):
        if from_coroutine():
            return self.put_async(item, timeout)
        else:
            return self.put_sync(item, timeout)

    def put_nowait(self, item):
        return self._mp_queue.put_nowait(item)

    def task_done(self):
        return self._mp_queue.task_done()

    async def join(self):
        return await asyncio.to_thread(self._mp_queue.join)

    def sync_join(self):
        return self._mp_queue.join()

    def flush(self):
        self._mp_queue.put_nowait(None)

    def empty(self):
        return self._mp_queue.empty()

    def qsize(self):
        return self._mp_queue.qsize()

    def full(self):
        return self._mp_queue.full()

    def close(self):
        return self._mp_queue.close()

    def __str__(self):
        return self._mp_queue.__str__()

    def __repr__(self):
        return f"<Queue at {id(self)} maxsize={self.maxsize}>"
Another way could be, for example, to communicate the items satisfying getters and putters via additional mp.Queue's. This should be possible because the bandwidth on the getter/putter channel should, by assumption, not be as large as on the main queue.
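Purely as a thought experiment (none of this is implemented yet): each process could own a private delivery mp.Queue, with a small daemon thread blocking on it and resolving the local getter Futures, so that only the low-bandwidth waiting state crosses process borders:

import threading

def delivery_pump(delivery_q: "mp.Queue", getters: "deque[Future]"):
    # one daemon thread per process: drains the delivery channel and
    # fulfils the local Futures that coroutines are awaiting
    while True:
        item = delivery_q.get()  # low traffic: only items for parked getters
        if getters:
            getters.popleft().set_result(item)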