{"id":25,"date":"2022-09-22T17:53:46","date_gmt":"2022-09-22T17:53:46","guid":{"rendered":"https:\/\/cloasdata.de\/?p=25"},"modified":"2022-09-25T15:30:11","modified_gmt":"2022-09-25T15:30:11","slug":"asyncio-multiprocessing-missing-link","status":"publish","type":"post","link":"https:\/\/cloasdata.de\/?p=25","title":{"rendered":"asyncio, multiprocessing &#8211; missing link"},"content":{"rendered":"\n<p class=\"has-text-align-left\">In this blog post you learn first some basic design patterns in python asyncio to have a common knowledge for the further parts of this article. It also shows possible problems which occur during development with asynchronous programming in general. <\/p>\n\n\n\n<p>I also recommend here to read my other post on parallelism.<\/p>\n\n\n<div class=\"vlp-link-container vlp-layout-basic wp-block-visual-link-preview-link\"><a href=\"https:\/\/cloasdata.de\/?p=172\" class=\"vlp-link\" title=\"horizontal and vertical Parallelism\"><\/a><div class=\"vlp-layout-zone-main\"><div class=\"vlp-block-0 vlp-link-title\">horizontal and vertical Parallelism<\/div><div class=\"vlp-block-1 vlp-link-summary\">In this post I will try to give you some insights in my mental model about parallelism when using asynchronous or threading style parallelism. And I also show how you can utilize both types simultaneous. Disclaimer This may not be a super valid article about parallelism in Python. It may also not be an proper &hellip;<p class=\"read-more\"> <a class=\"\" href=\"\">  Read More &raquo;<\/a><\/p><\/div><\/div><\/div>\n\n\n<p>The second part first gives an introduction to pythons multiprocessing world and then dives into occurring problems when using more than one event loop simultaneous. 
<\/p>\n\n\n\n<p>The last part examines the underlying multiprocessing architecture of python which is not very compatible with python asynchronous programming implementation and sketches some possible solution on inter process communication between different event loops.<\/p>\n\n\n\n<p>At the end one could write a high performance multi event loop software but still with some unsolved problems.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\">Disclaimer<\/h2>\n\n\n\n<p>You need to have at least basic knowledge about asyncio or asynchronous programming. Including a mental model for coroutines and what await, async keywords do. If you do not have such model in you head, this article may not be worth to read it.<br>Myself learned asyncio from the <a href=\"https:\/\/docs.python.org\/3\/library\/asyncio.html\">python docs<\/a> and later used this <a href=\"https:\/\/amzn.to\/3S1w3hQ\">book<\/a><\/p>\n\n\n\n<p><sub><sup>Title <a href=\"https:\/\/commons.wikimedia.org\/wiki\/File:Lemming_stopper,_graffiti_on_wall.jpg\">Photo<\/a> &#8211; r2hox \/ <a href=\"https:\/\/creativecommons.org\/licenses\/by-sa\/2.0\/deed.en\">CC2<\/a><\/sup><\/sub><\/p>\n\n\n\n<!--nextpage-->\n\n\n\n<h2 class=\"wp-block-heading\">Part I: asyncio<\/h2>\n\n\n\n<p>One of the advantage of async programming is the light weight of the underlying coroutines  having nearly no overhead compared the threads or processes. 
So it is very recommended to issue many of them parallel.<\/p>\n\n\n\n<pre title=\"producer\/consumer in asyncio \" class=\"wp-block-code has-small-font-size\"><code lang=\"python\" class=\"language-python line-numbers\">import asyncio\n\nasync def aio_producer(pending: set):\n    loop = asyncio.get_event_loop()\n    for _ in range(100):\n        work = random.randint(0,100) \/ 100\n        task = loop.create_task(consumer_coro(work))\n        pending.add(task)\n    # Wait until consumer has eaten all work\n    print(\"Producer done\")\n\n# this is a simple consumer coroutine now.\nasync def consumer_coro(work):\n    # simulates some i\/o bound task\n    await asyncio.sleep(work)\n    return f\"done {work}\"\n\n\nasync def main():\n    pending = set() # we keep reference to the pending task. To wait for them and get the result\n    await aio_producer(pending)\n    while pending:\n        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)\n        for task in done:\n            print(task.result())\n        print(f\"{len(done)=}, {len(pending)=}\")\n    print(\"Bye\")\n\nasyncio.run(main(), debug=True)\n\n<\/code><\/pre>\n\n\n\n<p>In the above code snippet we issue as many as possible coroutines and wait for them to finish. So the execution of each single piece of work runs (virtual) in parallel to all other ones. <\/p>\n\n\n\n<p>A typical anti pattern would be here to simple await that consumer coroutine and than issue another one. 
This happens typical when one tries to implement the producer consumer pattern with a queue synchronization.<\/p>\n\n\n\n<pre title=\"producer consumer anti pattern\" class=\"wp-block-code has-small-font-size\"><code lang=\"python\" class=\"language-python line-numbers\">async def aio_producer(pending: set):\n    loop = asyncio.get_event_loop()\n    for _ in range(100):\n        work = random.randint(0,100) \/ 100\n        task = loop.create_task(consumer_coro(work))\n        pending.add(task)\n    # Wait until consumer has eaten all work\n    print(\"Producer done\")\n\n# this is a simple consumer coroutine now.\nasync def consumer_coro(work):\n    # simultates some i\/o bound task\n    await asyncio.sleep(work)\n    return f\"done {work}\"\n\n\nasync def main():\n    pending = set() # we keep reference to the pending task. To wait for them and get the result\n    await aio_producer(pending)\n    while pending:\n        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)\n        for task in done:\n            print(task.result())\n        print(f\"{len(done)=}, {len(pending)=}\")\n    print(\"Bye\")\n\nasyncio.run(main(), debug=True)<\/code><\/pre>\n\n\n\n<p>As we now know how to implement a high parallelized asynchronous design the next most obvious snare one could fall over is to block the event loop with CPU bound tasks or other blocking calls.<\/p>\n\n\n\n<p>In general blocking the event loop will delay all other subsequent coroutines and will therefore disturb the scheduling. You can easily test this by exchange the <code>asyncio.sleep<\/code> with <code>time.sleep<\/code> in the above examples.<br>As long as the event loop is blocked as long other already finished coroutines cannot be processed or new coroutines cannot be fired and need to wait longer than necessary. 
In consequence this could than pile CPU bound work, so that it may have a positive feedback loop making you system hang.<\/p>\n\n\n\n<p>In consequence there are many ways to block the event loop.  A typical one is to call a synchronous function of a package, module or API. The good thing here is addressing this is very easy. Instead of calling the function directly from the coroutine we pass it to another thread and wait for the thread execution. Pythons asyncio package brings a simple solution. There are to possibilities here. Use <code>loop.run_in_executor<\/code> or <code>loop.to_task<\/code>. The later one is wrapper of first one. <\/p>\n\n\n\n<pre title=\"unblock blocking calls\" class=\"wp-block-code has-small-font-size\"><code lang=\"python\" class=\"language-python line-numbers\">import asyncio\nimport time\n\n\ndef blocking_api_function():\n    print(\"working heavily\")\n    time.sleep(1)\n    print(\"done\")\n    return \"42\"\n\nasync def block_main():\n    # this will block and raise at least a warning. So it blocks the event loop for a second.\n    res = blocking_api_function()\n    print(res)\n\nasyncio.run(block_main(), debug=True)\n\nasync def main():\n    # now we want to unblock the blocking api call\n    loop = asyncio.get_event_loop()\n\n    # we wrap that function into a future\n    fut = loop.run_in_executor(None, blocking_api_function)\n    # same as loop.to_task(...)\n\n    # we than can await the future if we need to\n    res = await fut # event loop could do other things now.\n    print(res)\n\nasyncio.run(main(), debug=True)<\/code><\/pre>\n\n\n\n<p>More dangerous are CPU bound things which are not that obvious. For example one is running a function with a some parsing operation. The function is called several times while iterating something. Now each single line of code will add load and will delay the event loop. As long as there is enough CPU time remaining this may or may not be a problem. 
In some cases this could be obvious in many other scenarios it is hard to figure out. Especially when you do high parallelized design with a lot of CPU bound work. This will always make a bottle neck, pilling up CPU load, even when there is a lot I\/O bound stuff one going. On this particular topic I will write another post in future.<br>A typical estimation is running the even loop on more than 30% saturated CPU will bring you in difficulties, leaving the event loop unable to react timely.<\/p>\n\n\n\n<p>Even if the underlying code is an I\/O bound task many packages do not unblock such calls (that is not absolute correct because I\/O bound python code will make the interpreter to release the GIL to the operating system and this why the thread pool pattern above does work) because there are not async aware. Again the most common advise at first glance is to pass this call to the thread pool.<\/p>\n\n\n\n<p>In my real world problems I do never have pure CPU bound code (anyway this would be a show stopper for using asyncio) and I also never have a pure I\/O or peripheral task (which is the dreamland of all asyncio programmers). So a event loop will always need to manage CPU bound things, even if you try to externalize it to a thread pool. <\/p>\n\n\n\n<p>Before satirizing now the CPU and adding more and more delay it may be a worth an idea to pass load to other cores of you machine without loosing the ability of concurrency and &#8220;cost free&#8221; I\/O tasks.<\/p>\n\n\n\n<p>In the next chapter we will see how to utilize more than one event loop within the python battery. <\/p>\n\n\n\n<!--nextpage-->\n\n\n\n<h2 class=\"wp-block-heading\">Part II: multiprocessing<\/h2>\n\n\n\n<p>As we know it is not a good idea to block the event loop with anything. If no native asyncio interface is available for  I\/O bound stuff  it could be a good idea to move it to the thread pool executor as we have seen in Part 1. 
But when we still need to handle CPU load than this pattern may not help even briefly.<\/p>\n\n\n\n<p>In today&#8217;s world we have machines with more than one core but we have also python which cannot utilize them natively due to its GIL &#8220;limitations&#8221;. To overcome this natural barrier pythons battery has two packages.<\/p>\n\n\n\n<p>The obviously one is <code>multiprocessing<\/code> but is used on a lower level.  So we start with <code>concurrent.futures<\/code> package first.<br>The package is a kind of wrapper around multiprocessing and threading and gives a lightweight API. It is quiet a ease to work on a multi core environment.<\/p>\n\n\n\n<pre title=\"concurrent.futures\" class=\"wp-block-code has-small-font-size\"><code lang=\"python\" class=\"language-python line-numbers\">from concurrent.futures import ProcessPoolExecutor\n\n\ndef cpu_bound_func(work):\n    result = 0\n    for _ in range(work * 1000):\n        result = result + 1\n    return result\n\n\nif __name__ == \"__main__\":\n    executor = ProcessPoolExecutor()\n    work = [1, 2, 3, 4]\n    results = executor.map(cpu_bound_func, work)\n    print(list(results))<\/code><\/pre>\n\n\n\n<p>Also asyncio does have a lightweight interface for the usage of concurrent.futures classes. 
Instead of passing <code>None<\/code> we explicitly pass a process pool and ask asyncios event loop to wait for it.<\/p>\n\n\n\n<pre title=\"Using the process pool in asyncio\" class=\"wp-block-code has-small-font-size\"><code lang=\"python\" class=\"language-python line-numbers\">async def main():\n    executor = ProcessPoolExecutor()\n    loop = asyncio.get_event_loop()\n    works = [1, 2, 3, 4]\n    pending = set()\n    for work in works:\n        pending.add(\n            loop.run_in_executor(executor, cpu_bound_func, work)\n        )\n    done, pending = await asyncio.wait(pending)\n    print([t.result() for t in done])\n    print(\"bye\")\n\nif __name__ == '__main__':\n    asyncio.run(main(), debug=True)<\/code><\/pre>\n\n\n\n<p>While this looks sure as hell why then we should not place everything on to the process pool which looks like CPU load?<\/p>\n\n\n\n<p>The answer is longer then one may except. At first all communication to a process goes through underlying operating system pipe or even worse uses a server\/client architecture. And here all the problems are starting you could endeavor through the high performance journey. <br>You cannot send everything to child process and therefore, you must take care of information needs to be serialize-able or pickle-able. For simple things like strings this is not a problem but there are surely a lot of objects you may like to pass and work on. As long as you can manipulate the class objects by inheritance for example this can be handled in many cases but when you are trying to provide for example C implemented classes than things getting harder and  harder and will add surely more and more overhead to the communication problem but also to you head. <\/p>\n\n\n\n<p>Second, serializing takes time and adds load to the parent process (remember, this is what we want to avoid). 
So if you plan to shift only a bit of CPU bound work this may not be the solution because due the above overhead needed this makes things slower. <br>If you want to solve bigger chunks of work this is the way for you. In conclusion this design decision should always involve some profiling. <strong>In general assumption here about code behavior are in nearly every case wrong and therefore, optimized code on that assumption will not show positive effects.<\/strong><\/p>\n\n\n\n<p>Third there will be a lot of more problems then just the one sketched above because multiprocessing is handling a lot of OS bounded things. For example the underlying pipe does not have a large buffer to accumulate work before sending it to the worker or pipes may break when they are not properly flushed and will leave the worker or parent process in a non deterministic state. I will cover some of the pitfalls in the next chapter as well.<\/p>\n\n\n\n<p>It looks now like it is impossible to run a event loop in a kind of mixed mode (between CPU and i\/o bound) for high performance but there is again a quiet simple solution for it. Instead of running one event loop on one core, we now try to run several loops on several core. 
<\/p>\n\n\n\n<pre title=\"asynchronous multiprocessing\" class=\"wp-block-code has-small-font-size\"><code lang=\"python\" class=\"language-python line-numbers\">async def main():\n    for _ in range(10):\n        cpu_bound_func(1)  # should not take longer than 100 ms here\n        await asyncio.sleep(0)\n    print(\"async main gone at\", multiprocessing.current_process())\n\n\ndef async_worker():\n    print(\"starting\", multiprocessing.current_process())\n    asyncio.run(main(), debug=True)\n    print(\"bye from\", multiprocessing.current_process())\n\ndef do_main():\n    processes = [multiprocessing.Process(target=async_worker, name=f\"P&lt;{count}&gt;\")\n                 for count in range(multiprocessing.cpu_count())]\n    for p in processes:\n        p.start()\n\n    while not all(p.is_alive() for p in processes):\n        print(\".\", end=\"\")\n\nif __name__ == '__main__':\n    do_main()<\/code><\/pre>\n\n\n\n<p>The above code implements an async worker which is running than later on a process. Simple so far. In most cases though we need some inter process communication to put work onto the worker or to balance the worker loads.<\/p>\n\n\n\n<p>In the next chapter I will explain how basic communication could be implemented and what problems we are going to fight with it.<\/p>\n\n\n\n<!--nextpage-->\n\n\n\n<h2 class=\"wp-block-heading\">Part III: The missing link.<\/h2>\n\n\n\n<p>We basically have seen it is not that tough to have multiple event loops running on different cores. But in typical use cases you will need to communicate to or from a process and its event loop. For example to get some state information or to simply distribute the work. <\/p>\n\n\n\n<p>The package multiprocessing comes with some possible candidates. A very low level one is<code> multiprocessing.Pipe<\/code>. As we do need more then just sending and receiving bytes we quickly land at the <code>multiprocessing.Queue<\/code> class. 
The Queue class itself is already synchronized, so we do not rank our brains about parallel access to resources etc (thread-safeness). But the problem with the mp.Queue class is, that its not designed for using it in the asynchronous world. Because it &#8220;tends&#8221; to block. And polling may also not be a good idea because it will consume CPU.<\/p>\n\n\n\n<pre title=\"mp.Queue blocking call\" class=\"wp-block-code has-small-font-size\"><code lang=\"python\" class=\"language-python\">import asyncio\nimport multiprocessing as mp\nfrom queue import Empty\n\nasync def main():\n    q = mp.Queue()\n    q.put_nowait(\"stuff\")\n    print(q.get())  # note that this would block if there is now stuff on the queue\n    try:\n        q.get(timeout=.31) # and now it will block for at least .31 seconds\n    except Empty:\n        print(\"queue is finally empty\")\n\nif __name__ == '__main__':\n    asyncio.run(main(), debug=True)\n    # prints:\n    # stuff\n    # queue is finally empty\n    # Executing &lt;Task finished name='Task-1' coro=&lt;main() done, defined at C:\\ ... tasks.py:634&gt; took 0.329 seconds\n    <\/code><\/pre>\n\n\n\n<p>As we learned above, we could easily unblock the get \/ or put call, by passing it to the thread pool executor. However, this will, when there is a lot of bandwidth on the queue, easily saturate the thread pool with all its disadvantages. 
<\/p>\n\n\n\n<p class=\"has-small-font-size\">For simplicity the code below does not wrap the worker into a process holding the event loop, but the outcome is the same.<\/p>\n\n\n\n<pre title=\"Broken pipes: Run this code only in safe environtment\" class=\"wp-block-code has-small-font-size\"><code lang=\"python\" class=\"language-python\">async def worker(q: mp.Queue, uid: str):\n    # this worker will not block the event loop.\n    # but it will consume one thread until it is unblocked\n    loop = asyncio.get_event_loop()\n    while True:\n        try:\n            item = await loop.run_in_executor(None, q.get)\n        except asyncio.CancelledError:\n            return\n        # .... do some useful stuff\n        print(f\"{uid} did something with {item}\")\n\n\nasync def main():\n    loop = asyncio.get_event_loop()\n    q = mp.Queue()\n    # create then workers waiting for work\n    # means consuming 10 threads\n    workers = [loop.create_task(worker(q, f\"worker-{uid}\")) for uid in range(10)]\n    await asyncio.sleep(0)  # yield to the event loop so it can schedule above tasks\n    # feed them a bit\n    for key in range(5):\n        q.put_nowait(f\"food-{key}\")\n    await asyncio.sleep(0.5)  # sleep here to let the worker consume that food\n    # enough we want to stop:\n    [worker.cancel() for worker in workers]\n    await asyncio.wait(workers)\n\n\nif __name__ == '__main__':\n    asyncio.run(main(), debug=True)\n    # prints:\n    # worker-0 did something with food-0\n    # worker-1 did something with food-1\n    # worker-2 did something with food-2\n    # worker-3 did something with food-3\n    # worker-4 did something with food-4\n    #\n    # and then hangs forever or throws exception and complains about broken pipe<\/code><\/pre>\n\n\n\n<p>This happens because the thread is canceled and the <code>mp.Queue <\/code>instance holds a lock at this point. 
So we easily deadlock our system with that pattern, because there is no way to clean before we finally cancel the thread. I know this pattern is very often recommended on stack overflow but it will sooner or later make huge problems.<br>In better cases you will receive at least a broken pipe exception and you system stays responsive and sometimes it will shutdown as thought but still this happens in a non deterministic way.<\/p>\n\n\n\n<p>Now what we can do?<\/p>\n\n\n\n<p>What I have first done was to wrap up the <code>mp.Queue<\/code> class into an adapter handling the thread pool executor pattern (like the package aiomultiprocess does). Of course this solves nothing, but gives a handy interface. <\/p>\n\n\n\n<pre title=\"A wrapper using thread pool\" class=\"wp-block-code has-small-font-size\"><code lang=\"python\" class=\"language-python\">class Queue:\n    \"\"\"\n    An adapter to make multiprocessing.Queue available for async\n    \"\"\"\n\n    def __init__(self, maxsize: int = 0, *, ctx=None, queue: mp.Queue = None):\n        self.maxsize = maxsize\n        if not queue:\n            if not ctx:\n                self._mp_queue = mp.JoinableQueue(maxsize)\n            else:\n                self._mp_queue = mp.JoinableQueue(maxsize, ctx=ctx)\n        else:\n            self._mp_queue = queue\n        self._t_executor: None | concurrent.futures.ThreadPoolExecutor = None\n        self._loop = None\n\n    @property\n    def loop(self):\n        if not self._loop:\n            self._loop = asyncio.get_event_loop()\n        return self._loop\n\n    @property\n    def _executor(self):\n        if not self._t_executor:\n            self._t_executor = concurrent.futures.ThreadPoolExecutor()\n        return self._t_executor\n\n    async def get_async(self):\n        item = await self.loop.run_in_executor(self._executor, self._mp_queue.get)\n        return item\n\n    def get_sync(self):\n        return self._mp_queue.get()\n\n    def get_nowait(self):\n        return 
self._mp_queue.get_nowait()\n\n    def get(self):\n        if from_coroutine():\n            return self.get_async()\n        else:\n            return self.get_sync()\n\n    def put_sync(self, item):\n        self._mp_queue.put(item)\n\n    async def put_async(self, item):\n        item = await self.loop.run_in_executor(self._executor, self._mp_queue.put, item)\n        return item\n\n    def put_nowait(self, item):\n        return self._mp_queue.put_nowait(item)\n\n    def put(self, item):\n        if from_coroutine():\n            return self.put_async(item)\n        else:\n            return self.put_sync(item)\n\n    def task_done(self):\n        return self._mp_queue.task_done()\n\n    async def join(self):\n        return await asyncio.to_thread(self._mp_queue.join)\n\n    def sync_join(self):\n        return self._mp_queue.join()\n\n    def flush(self):\n        self._mp_queue.put_nowait(None)\n\n    def empty(self):\n        return self._mp_queue.empty()\n\n    def qsize(self):\n        return self._mp_queue.qsize()\n\n    def full(self):\n        return self._mp_queue.full()\n\n    def close(self):\n        return self._mp_queue.close()\n\n    def __str__(self):\n        return self._mp_queue.__str__()\n\n    def __repr__(self):\n        return f\"&lt;Queue at {id(self)} maxsize={self.maxsize}&gt;\"\n<\/code><\/pre>\n\n\n\n<p>As next, I needed to add a lot of fuses with <code>mp.Event<\/code> to communicate the start and shutdown to the workers making them a kind of sequencer. That makes the worker aware of an upcoming shutdown and he than could flush the queue with some poison pills and when all ends are poisoned and finally have been died calling the <code>Queue.close<\/code> method to let die the child process. This was working good, as long as the count of getters and putters was not large. But as soon as there are thousands of coroutines trying to put or get the queue and\/or the thread pool gets saturated returning into a virtual deadlocked system. 
This getting worse when there are also inter dependencies, for example when one worker does put and get onto the same queue (=on the same thread pool). <\/p>\n\n\n\n<p>Then I found this very impressive YouTube clip from Mr. David Beazley in which he shows in a live coding session how one could connect asyncio with threading by using the <code>concurrent.future.Future<\/code> class avoiding the thread pool pattern.<\/p>\n\n\n\n<figure class=\"wp-block-embed is-type-video is-provider-youtube wp-block-embed-youtube wp-embed-aspect-4-3 wp-has-aspect-ratio\"><div class=\"wp-block-embed__wrapper\">\n<div class=\"ast-oembed-container \" style=\"height: 100%;\"><iframe loading=\"lazy\" title=\"The Other Async (Threads + Async = \u2764\ufe0f)\" width=\"500\" height=\"375\" src=\"https:\/\/www.youtube.com\/embed\/x1ndXuw7S0s?feature=oembed\" frameborder=\"0\" allow=\"accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share\" referrerpolicy=\"strict-origin-when-cross-origin\" allowfullscreen><\/iframe><\/div>\n<\/div><\/figure>\n\n\n\n<p>But it turns out, that some of the used parts are not working with multiprocessing because we need to serialize things before transmitting them. <\/p>\n\n\n\n<p>Instead of pushing the blocking get method onto the thread pool one could wrap that blocking call into a concurrent.future.Future. And asyncio could wait for it with <code><em>await <\/em>asyncio.wrap_future(concurrent.futures.Future())<\/code> without having the above problems from the thread pool. But to mange the waiting state of putters and getters we need to communicate them again between all parties. David Beazley uses the <code>collections.deque()<\/code> class. This works super for threading applications where you could directly communicate python objects. But deque fails when using it with multiprocessing because of its involved <code>RLock<\/code> its not serializable. And now I&#8217;m here where I&#8217;m working at. 
Some time ago I issued a deque miming class based on the <code>multiprocessing.shared-memory<\/code> package: <\/p>\n\n\n<div class=\"ebg-br-wrapper ebg-br-wrapper-dark-mode-on\">\n\t<div class=\"ebg-br-background-image\"><\/div>\n\t<div class=\"ebg-br-editmode egb-br-darkmode-status\">\n\t\t<span class=\"egb-br-darkmode-status-img\">Dark Mode<\/span>\n\t<\/div>\n\t<div class=\"ebg-br-avatar\">\n\t\t<img loading=\"lazy\" decoding=\"async\" class=\"ebg-br-header-avatar\" src=\"https:\/\/avatars.githubusercontent.com\/u\/84661606?v=4\" alt=\"\" width=\"150\" height=\"150\" \/>\n\t<\/div>\n\t<div class=\"ebg-br-main\">\n\t\t<p class=\"ebg-br-title\">\n\t\t\t<strong>\n\t\t\t\t<a target=\"_blank\" rel=\"noopener noreferrer\" href=\"https:\/\/github.com\/cloasdata\/shared-memory-queue\">\n\t\t\t\t\tshared-memory-queue\n\t\t\t\t\t<span class=\"screen-reader-text\">(this link opens in a new window)<\/span>\n\t\t\t\t<\/a>\n\t\t\t<\/strong>\n\t\t\t<em>\n\t\t\t\tby<a target=\"_blank\" rel=\"noopener noreferrer\" href=\"https:\/\/github.com\/cloasdata\">\n\t\t\t\t\tcloasdata\n\t\t\t\t\t<span class=\"screen-reader-text\">(this link opens in a new window)<\/span>\n\t\t\t\t<\/a>\n\t\t\t<\/em>\n\t\t<\/p>\n\t\t<p class=\"ebg-br-description\">StaticQueue class memes the collections.deque interface. 
The goal for this class was to improve performance of mp.Queue class.<\/p>\n\t\t<p class=\"ebg-br-footer\">\n\t\t\t<span class=\"ebg-br-subscribers\">\n\t\t\t\t<span class=\"ebg-br-background-image\"><\/span>\n\t\t\t\t1 Subscriber\t\t\t<\/span>\n\t\t\t<span class=\"ebg-br-watchers\">\n\t\t\t\t<span class=\"ebg-br-background-image\"><\/span>\n\t\t\t\t0 Watchers\t\t\t<\/span>\n\t\t\t<span class=\"ebg-br-forks\">\n\t\t\t\t<span class=\"ebg-br-background-image\"><\/span>\n\t\t\t\t1 Fork\t\t\t<\/span>\n\t\t\t<a target=\"_blank\" rel=\"noopener noreferrer\" class=\"ebg-br-link\" href=\"https:\/\/github.com\/cloasdata\/shared-memory-queue\">\n\t\t\t\tCheck out this repository on GitHub.com\t\t\t\t<span class=\"screen-reader-text\">(this link opens in a new window)<\/span>\n\t\t\t<\/a>\n\t\t<\/p>\n\t<\/div>\n<\/div>\n\n\n<p>But this class still lacks of synchronization so that it is not possible to run it thread safe at the moment. I&#8217;m also working on a semaphore based on shared memory to level the shared-memory-queue to a thread safe queue.<\/p>\n\n\n\n<p>Below you see the Queue implementation for multiprocessing inspired by David. But still without communication of the getter\/putter channel as deque will be dead linked when child processes are spwaned. <br><\/p>\n\n\n\n<pre title=\"A possible candidate for future development\" class=\"wp-block-code has-small-font-size\"><code lang=\"python\" class=\"language-python\">class Queue3:\n    \"\"\"\n    An adapter to make multiprocessing.Queue available for async\n    using Futures instead of threads.\n    However, the communication of the putters and getters will not work,\n    as they are not connected. 
The use of deque will only work at that worker but fails\n    to communicate to other workers\n    \"\"\"\n\n    def __init__(self, maxsize: int = 0, *, ctx=None, queue: \"mp.Queue[T]\" = None):\n        self.maxsize = maxsize\n        if not queue:\n            if not ctx:\n                self._mp_queue: \"mp.JoinableQueue[T]\" = mp.JoinableQueue(maxsize)\n            else:\n                self._mp_queue: \"mp.JoinableQueue[T]\" = mp.JoinableQueue(maxsize, ctx=ctx)\n        else:\n            self._mp_queue = queue\n        self.putters: deque[Future] = deque()\n        self.getters: deque[Future] = deque()\n\n    def get_noblock(self) -&gt; tuple[T | None, Future | None]:\n        item = None\n        fut = None\n        print(\"get_noblock\", mp.current_process().name)\n        try:\n            item = self._mp_queue.get(block=False)\n            if self.putters:\n                self.putters.popleft().set_result(True)\n        except Empty:\n            print(\"empty\", mp.current_process().name)\n            fut = Future()\n            self.getters.append(fut)\n        except Exception as e:\n            breakpoint()\n        finally:\n            return item, fut\n\n    async def get_async(self, timeout=None):\n        item, fut = self.get_noblock()\n        if fut:\n            print(\"wait for future at\", mp.current_process())\n            item = await asyncio.wait_for(asyncio.wrap_future(fut), timeout)\n        return item\n\n    def get_sync(self, timeout=None):\n        item, fut = self.get_noblock()\n        if fut:\n            print(\"wait sync for future\", mp.current_process().name)\n            item = fut.result(timeout) # block until\n        return item\n\n    def get_nowait(self):\n        return self._mp_queue.get_nowait()\n\n    def get(self, timeout=None):\n        if from_coroutine():\n            return self.get_async(timeout)\n        else:\n            return self.get_sync(timeout)\n\n    def put_noblock(self, item: T) -&gt; Future | 
None:\n        fut = None\n        try:\n            if self.getters:\n                self.getters.pop().set_result(item)\n            else:\n                self._mp_queue.put(item, block=False)\n        except Full:\n            fut = Future()\n            self.putters.append(fut)\n        finally:\n            return fut\n\n    def put_sync(self, item:T, timeout=None):\n        while True:\n            fut = self.put_noblock(item)\n            if fut is None:\n                return\n            fut.result(timeout) # block until timeout\n\n    async def put_async(self, item:T, timeout=None):\n        while True:\n            fut = self.put_noblock(item)\n            if fut is None:\n                return\n            await asyncio.wait_for(asyncio.wrap_future(fut), timeout)\n\n    def put(self, item:T, timeout=None):\n        if from_coroutine():\n            return self.put_async(item, timeout)\n        else:\n            return self.put_sync(item, timeout)\n\n    def put_nowait(self, item):\n        return self._mp_queue.put_nowait(item)\n\n    def task_done(self):\n        return self._mp_queue.task_done()\n\n    async def join(self):\n        return await asyncio.to_thread(self._mp_queue.join)\n\n    def sync_join(self):\n        return self._mp_queue.join()\n\n    def flush(self):\n        self._mp_queue.put_nowait(None)\n\n    def empty(self):\n        return self._mp_queue.empty()\n\n    def qsize(self):\n        return self._mp_queue.qsize()\n\n    def full(self):\n        return self._mp_queue.full()\n\n    def close(self):\n        return self._mp_queue.close()\n\n    def __str__(self):\n        return self._mp_queue.__str__()\n\n    def __repr__(self):\n        return f\"&lt;Queue at {id(self)} maxsize={self.maxsize}&gt;\"<\/code><\/pre>\n\n\n\n<p>Another way could be for example to communicate the items satisfying getters and putters via additional <code>mp.Queue<\/code>&#8216;s .This should be possible because, the getter\/putter channel bandwidth 
can be assumed to be much smaller than that of the main queue.<\/p>\n","protected":false}}