Python, How to make an asynchronous data generator?












1















I have a program that loads data and processes it. Both loading and processing take time, and I'd like to do them in parallel.



Here is the synchronous version of my program (where the "loading" and "processing" are done in sequence, and are trivial operations here for the sake of the example):



import time

def data_loader():
    for i in range(4):
        time.sleep(1) # Simulated loading time
        yield i

def main():
    start = time.time()
    for data in data_loader():
        time.sleep(1) # Simulated processing time
        processed_data = -data*2
        print(f'At t={time.time()-start:.3g}, processed data {data} into {processed_data}')

if __name__ == '__main__':
    main()


When I run this, I get output:



At t=2.01, processed data 0 into 0
At t=4.01, processed data 1 into -2
At t=6.02, processed data 2 into -4
At t=8.02, processed data 3 into -6


The loop runs every 2s, with 1s for loading and 1s for processing.



Now, I'd like to make an asynchronous version, where the loading and processing are done concurrently (so the loader gets the next data ready while the processor is processing it). It should then take 2s for the first statement to be printed, and 1s for each statement after that. Expected output would be similar to:



At t=2.01, processed data 0 into 0
At t=3.01, processed data 1 into -2
At t=4.02, processed data 2 into -4
At t=5.02, processed data 3 into -6


Ideally, only contents of the main function would have to change (as the data_loader code should not care that it may be used in an asynchronous way).












































      python python-3.x asynchronous
















      asked Jan 2 at 16:08









      Peter
























          3 Answers


















          3














          The multiprocessing module's utilities may be what you want.



          import time
          import multiprocessing

          def data_loader():
              for i in range(4):
                  time.sleep(1) # Simulated loading time
                  yield i


          def process_item(item):
              time.sleep(1) # Simulated processing time
              return (item, -item*2) # Return the original too.


          def main():
              start = time.time()
              with multiprocessing.Pool() as p:
                  data_iterator = data_loader()
                  for (data, processed_data) in p.imap(process_item, data_iterator):
                      print(f'At t={time.time()-start:.3g}, processed data {data} into {processed_data}')

          if __name__ == '__main__':
              main()


          This outputs



          At t=2.03, processed data 0 into 0
          At t=3.03, processed data 1 into -2
          At t=4.04, processed data 2 into -4
          At t=5.04, processed data 3 into -6


          Depending on your requirements, you may find .imap_unordered() to be faster, since it yields results as they complete rather than in input order. It's also worth knowing that there's a thread-based version of Pool available as multiprocessing.dummy.Pool. This may be useful to avoid IPC overhead if your data is large and your processing is not done in pure Python (for example, in C extensions that release the GIL).
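As a rough illustration of the thread-based variant mentioned above, here is a minimal sketch that swaps multiprocessing.Pool for multiprocessing.dummy.Pool. The DELAY constant, the run() helper, and the pool size of 2 are assumptions made for this example and are not part of the original answer. Threads only give real overlap when the work releases the GIL, as time.sleep() does here.

```python
import time
from multiprocessing.dummy import Pool  # thread-based drop-in for multiprocessing.Pool

DELAY = 0.05  # assumed stand-in for the 1 s delays used in the answer


def data_loader(n=4):
    for i in range(n):
        time.sleep(DELAY)  # simulated loading time
        yield i


def process_item(item):
    time.sleep(DELAY)  # simulated processing time
    return item, -item * 2  # return the original too


def run(n=4):
    """Process items in worker threads while the loader keeps producing."""
    with Pool(2) as pool:
        # imap keeps input order; imap_unordered would yield results as they finish.
        return list(pool.imap(process_item, data_loader(n)))


if __name__ == '__main__':
    print(run())
```

Because no data crosses a process boundary, nothing has to be pickled, which is the IPC saving the answer refers to.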






























          • Thank you for this nice answer. Now I neglected this for simplicity, but what if the processor is stateful? i.e. we now make it: processed_data = -data*2 - processed_data with processed_data=0 before the loop? It would be nice to have a generic "generator wrapper" that we can just use to make generators asynchronous.

            – Peter
            Jan 2 at 16:30













          • You could use multiprocessing.Value to schlep the state between processes. However, if you have state that depends on values processed so far, you can't process new values in parallel anyway, so you won't even need multiprocessing.

            – AKX
            Jan 2 at 17:05
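For the stateful case raised in the comments, processing has to stay sequential, but the loader can still run one item ahead. The sketch below is one possible "generator wrapper" of that kind, not something taken from the answers: iter_with_lookahead, the _DONE sentinel, and the delay parameter are hypothetical names chosen for illustration. It uses a thread, so it only overlaps work when loading releases the GIL (I/O, sleeping, C extensions).

```python
import time
from concurrent.futures import ThreadPoolExecutor

_DONE = object()  # sentinel marking the end of the underlying iterator


def iter_with_lookahead(iterable):
    """Yield items from `iterable`, loading the next one in a background thread."""
    it = iter(iterable)
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(next, it, _DONE)
        while True:
            item = future.result()  # wait for the item that is being loaded
            if item is _DONE:
                return
            # Start loading the following item before handing this one out.
            future = executor.submit(next, it, _DONE)
            yield item


def data_loader(delay):
    for i in range(4):
        time.sleep(delay)  # simulated loading time
        yield i


def main(delay=0.05):
    processed_data = 0  # state carried from one iteration to the next
    results = []
    for data in iter_with_lookahead(data_loader(delay)):
        time.sleep(delay)  # simulated processing time
        processed_data = -data * 2 - processed_data
        results.append(processed_data)
    return results


if __name__ == '__main__':
    print(main())
```

Only main() changes relative to the synchronous version: data_loader stays an ordinary generator, and the state update runs in the main thread in the original order.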



















          1














          The key to your problem is the actual processing of the data. I don't know what you're doing with the data in your real program, but it must be an asynchronous operation for asynchronous programming to help. If you're doing active, blocking CPU-bound processing, you might be better off offloading it to a separate process instead, so that you can use multiple CPU cores and do things concurrently. If the actual processing of the data is in fact just the consumption of some asynchronous service, then it can be wrapped in a single asynchronous concurrent thread very effectively.



          In your example, you're using time.sleep() to simulate the processing. Since that example operation can be done asynchronously (by using asyncio.sleep() instead) then the conversion is simple:



          import itertools
          import asyncio

          async def data_loader():
              for i in itertools.count(0):
                  await asyncio.sleep(1) # Simulated loading time
                  yield i

          async def process(data):
              await asyncio.sleep(1) # Simulated processing time
              processed_data = -data*2
              print(f'At t={loop.time()-start:.3g}, processed data {data} into {processed_data}')

          async def main():
              tasks = []
              async for data in data_loader():
                  tasks.append(loop.create_task(process(data)))
              # Only reached if data_loader() is finite; itertools.count() never ends
              await asyncio.wait(tasks) # wait for all remaining tasks

          if __name__ == '__main__':
              loop = asyncio.get_event_loop()
              start = loop.time()
              loop.run_until_complete(main())
              loop.close()


          The results, as you expect:



          At t=2, processed data 0 into 0
          At t=3, processed data 1 into -2
          At t=4, processed data 2 into -4
          ...


          Remember that it only works because time.sleep() has an asynchronous alternative in the form of asyncio.sleep(). Check the operation you're using, to see if it can be written in asynchronous form.
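When an operation has no asynchronous form, one option is to hand the blocking call to an executor thread with loop.run_in_executor(). The following is a minimal sketch under that assumption, not part of the original answer: the _DONE sentinel, process_item, and the short delays are made up for the example. A sentinel is passed as the default to next() because StopIteration cannot be propagated through a future.

```python
import asyncio
import time

_DONE = object()  # sentinel returned by next() when the generator is exhausted


def data_loader(n=4, delay=0.05):
    for i in range(n):
        time.sleep(delay)  # blocking, simulated loading time
        yield i


def process_item(data, delay=0.05):
    time.sleep(delay)  # blocking, simulated processing time
    return -data * 2


async def main(n=4):
    loop = asyncio.get_running_loop()
    it = data_loader(n)
    results = []
    # Load the first item in the default thread pool.
    pending = loop.run_in_executor(None, next, it, _DONE)
    while True:
        data = await pending
        if data is _DONE:
            break
        # Start loading the next item, then process the current one meanwhile.
        pending = loop.run_in_executor(None, next, it, _DONE)
        processed = await loop.run_in_executor(None, process_item, data)
        results.append((data, processed))
    return results


if __name__ == '__main__':
    print(asyncio.run(main()))
```

This keeps data_loader as a plain generator. As with any thread-based approach, pure-Python CPU-bound work would still contend for the GIL, in which case a process-based executor is the usual alternative.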






























          • Thanks for this. In my application, the "loading" and "processing" are both blocking, CPU-bound operations, so I guess this approach won't work for me, is that correct?

            – Peter
            Jan 2 at 16:48











          • @Peter a python process can only run one piece of code at a time, because of the global interpreter lock (GIL). That said, you're probably mistaken about the nature of your process. In particular the "loading" part is hardly a CPU-bound operation, and is probably related with data input/output which doesn't use the CPU at all and can be written in a non-blocking manner; Are you sure? Can you describe your loading/processing part with more detail, preferably code?

            – nosklo
            Jan 2 at 18:56





















          0














          Here is a solution that allows you to wrap the data loader with an iter_asynchronously function. It solves the problem for now. (Note, however, that if the data loader is faster than the processing loop, the queue will grow indefinitely. This could easily be solved by adding a wait in _async_queue_manager if the queue gets too big, but sadly Queue.qsize() is not supported on Mac!)



          import time
          from multiprocessing import Queue, Process

          class PoisonPill:
              pass

          def _async_queue_manager(gen_func, queue: Queue):
              for item in gen_func():
                  queue.put(item)
              queue.put(PoisonPill)  # Signal that the generator is exhausted

          def iter_asynchronously(gen_func):
              """ Given a generator function, make it asynchronous. """
              q = Queue()
              p = Process(target=_async_queue_manager, args=(gen_func, q))
              p.start()
              while True:
                  item = q.get()
                  if item is PoisonPill:
                      break
                  else:
                      yield item
              p.join()  # Reap the loader process once it has finished

          def data_loader():
              for i in range(4):
                  time.sleep(1) # Simulated loading time
                  yield i

          def main():
              start = time.time()
              for data in iter_asynchronously(data_loader):
                  time.sleep(1) # Simulated processing time
                  processed_data = -data*2
                  print(f'At t={time.time()-start:.3g}, processed data {data} into {processed_data}')

          if __name__ == '__main__':
              main()


          The output is now as desired:



          At t=2.03, processed data 0 into 0
          At t=3.03, processed data 1 into -2
          At t=4.04, processed data 2 into -4
          At t=5.04, processed data 3 into -6
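The unbounded-queue concern noted in this answer can be handled without qsize(): a queue created with a maxsize makes put() block while the queue is full. The sketch below shows the idea with threading and queue.Queue so that it stays self-contained; iter_in_background, the _DONE sentinel, and the default maxsize of 2 are assumptions for illustration. multiprocessing.Queue accepts a maxsize argument in the same way, though across processes the class-based PoisonPill from the answer should be kept, because an object() sentinel does not keep its identity after pickling.

```python
import queue
import threading
import time

_DONE = object()  # end-of-stream marker (only valid within a single process)


def iter_in_background(gen_func, maxsize=2):
    """Run gen_func() in a background thread, buffering at most `maxsize` items."""
    q = queue.Queue(maxsize=maxsize)

    def producer():
        try:
            for item in gen_func():
                q.put(item)  # blocks while the queue is full
        finally:
            q.put(_DONE)  # always tell the consumer that no more items are coming

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = q.get()
        if item is _DONE:
            return
        yield item


def data_loader(n=4, delay=0.01):
    for i in range(n):
        time.sleep(delay)  # simulated loading time, faster than the consumer
        yield i


if __name__ == '__main__':
    for data in iter_in_background(data_loader):
        time.sleep(0.05)  # simulated processing time
        print(data, -data * 2)
```

With this shape the loader can run at most maxsize items ahead of the processing loop, so memory use stays bounded regardless of how fast the loader is.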























































            The multiprocessing.Pool answer above was answered Jan 2 at 16:19 by AKX.













            • Thank you for this nice answer. Now I neglected this for simplicity, but what if the processor is stateful? i.e. we now make it: processed_data = -data*2 - processed_data with processed_data=0 before the loop? It would be nice to have a generic "generator wrapper" that we can just use to make generators asynchronous.

              – Peter
              Jan 2 at 16:30













            • You could use multiprocessing.Value to schlep the state between processes. However, if you have state that depends on values processed so far, you can't process new values in parallel anyway, so you won't even need multiprocessing.

              – AKX
              Jan 2 at 17:05



















            • Thank you for this nice answer. Now I neglected this for simplicity, but what if the processor is stateful? i.e. we now make it: processed_data = -data*2 - processed_data with processed_data=0 before the loop? It would be nice to have a generic "generator wrapper" that we can just use to make generators asynchronous.

              – Peter
              Jan 2 at 16:30













            • You could use multiprocessing.Value to schlep the state between processes. However, if you have state that depends on values processed so far, you can't process new values in parallel anyway, so you won't even need multiprocessing.

              – AKX
              Jan 2 at 17:05

















            Thank you for this nice answer. Now I neglected this for simplicity, but what if the processor is stateful? i.e. we now make it: processed_data = -data*2 - processed_data with processed_data=0 before the loop? It would be nice to have a generic "generator wrapper" that we can just use to make generators asynchronous.

            – Peter
            Jan 2 at 16:30







            Thank you for this nice answer. Now I neglected this for simplicity, but what if the processor is stateful? i.e. we now make it: processed_data = -data*2 - processed_data with processed_data=0 before the loop? It would be nice to have a generic "generator wrapper" that we can just use to make generators asynchronous.

            – Peter
            Jan 2 at 16:30















            You could use multiprocessing.Value to schlep the state between processes. However, if you have state that depends on values processed so far, you can't process new values in parallel anyway, so you won't even need multiprocessing.

            – AKX
            Jan 2 at 17:05





            You could use multiprocessing.Value to schlep the state between processes. However, if you have state that depends on values processed so far, you can't process new values in parallel anyway, so you won't even need multiprocessing.

            – AKX
            Jan 2 at 17:05













            1














            The key of your problem is in the actual processing of the data. I don't know what you're doing with the data in your real program, but it must be an asynchronous operation to use asynchronous programming. If you're doing active, blocking CPU-bound processing, you might be better offloading to a separate process, instead, to be able to use multiple CPU cores and do things concurrently. If the actual processing of the data is in fact just the consumption of some asynchronous service, then it can be wrapped in a single asynchronous concurrent thread very effectively.



            In your example, you're using time.sleep() to simulate the processing. Since that example operation can be done asynchronously (by using asyncio.sleep() instead) then the conversion is simple:



            import itertools
            import asyncio

            async def data_loader():
            for i in itertools.count(0):
            await asyncio.sleep(1) # Simulated loading time
            yield i

            async def process(data):
            await asyncio.sleep(1) # Simulated processing time
            processed_data = -data*2
            print(f'At t={loop.time()-start:.3g}, processed data {data} into {processed_data}')

            async def main():
            tasks =
            async for data in data_loader():
            tasks.append(loop.create_task(process(data)))
            await asyncio.wait(tasks) # wait for all remaining tasks

            if __name__ == '__main__':
            loop = asyncio.get_event_loop()
            start = loop.time()
            loop.run_until_complete(main())
            loop.close()


            The results, as you expect:



            At t=2, processed data 0 into 0
            At t=3, processed data 1 into -2
            At t=4, processed data 2 into -4
            ...


            Remember that it only works because time.sleep() has an asynchronous alternative in the form of asyncio.sleep(). Check the operation you're using, to see if it can be written in asynchronous form.






            share|improve this answer
























            • Thanks for this. In my application, the "loading" and "processing" are both blocking, CPU-bound operations, so I guess this approach won't work for me, is that correct?

              – Peter
              Jan 2 at 16:48











            • @Peter a python process can only run one piece of code at a time, because of the global interpreter lock (GIL). That said, you're probably mistaken about the nature of your process. In particular the "loading" part is hardly a CPU-bound operation, and is probably related with data input/output which doesn't use the CPU at all and can be written in a non-blocking manner; Are you sure? Can you describe your loading/processing part with more detail, preferably code?

              – nosklo
              Jan 2 at 18:56


















            1














            The key of your problem is in the actual processing of the data. I don't know what you're doing with the data in your real program, but it must be an asynchronous operation to use asynchronous programming. If you're doing active, blocking CPU-bound processing, you might be better offloading to a separate process, instead, to be able to use multiple CPU cores and do things concurrently. If the actual processing of the data is in fact just the consumption of some asynchronous service, then it can be wrapped in a single asynchronous concurrent thread very effectively.



            In your example, you're using time.sleep() to simulate the processing. Since that example operation can be done asynchronously (by using asyncio.sleep() instead) then the conversion is simple:



            import itertools
            import asyncio

            async def data_loader():
            for i in itertools.count(0):
            await asyncio.sleep(1) # Simulated loading time
            yield i

            async def process(data):
            await asyncio.sleep(1) # Simulated processing time
            processed_data = -data*2
            print(f'At t={loop.time()-start:.3g}, processed data {data} into {processed_data}')

            async def main():
            tasks =
            async for data in data_loader():
            tasks.append(loop.create_task(process(data)))
            await asyncio.wait(tasks) # wait for all remaining tasks

            if __name__ == '__main__':
            loop = asyncio.get_event_loop()
            start = loop.time()
            loop.run_until_complete(main())
            loop.close()


            The results, as you expect:



            At t=2, processed data 0 into 0
            At t=3, processed data 1 into -2
            At t=4, processed data 2 into -4
            ...


            Remember that it only works because time.sleep() has an asynchronous alternative in the form of asyncio.sleep(). Check the operation you're using, to see if it can be written in asynchronous form.






            • Thanks for this. In my application, the "loading" and "processing" are both blocking, CPU-bound operations, so I guess this approach won't work for me, is that correct?

              – Peter
              Jan 2 at 16:48











            • @Peter A Python process can only run one piece of Python code at a time, because of the global interpreter lock (GIL). That said, you're probably mistaken about the nature of your workload. In particular, the "loading" part is unlikely to be CPU-bound; it is probably data input/output, which barely uses the CPU and can be written in a non-blocking manner. Are you sure? Can you describe your loading/processing parts in more detail, preferably with code?

              – nosklo
              Jan 2 at 18:56


























            answered Jan 2 at 16:26

            nosklo



























            Here is a solution that lets you wrap the data loader with an iter_asynchronously function. It solves the problem for now. (Note, however, that if the data loader is faster than the processing loop, the queue will grow indefinitely. This could easily be solved by adding a wait in _async_queue_manager when the queue gets too big, but sadly Queue.qsize() is not supported on Mac!)



            import time
            from multiprocessing import Queue, Process

            class PoisonPill:
                """ Sentinel placed on the queue to signal that the generator is exhausted. """
                pass

            def _async_queue_manager(gen_func, queue: Queue):
                # Runs in the child process: push every item, then the sentinel
                for item in gen_func():
                    queue.put(item)
                queue.put(PoisonPill)

            def iter_asynchronously(gen_func):
                """ Given a generator function, make it asynchronous. """
                q = Queue()
                p = Process(target=_async_queue_manager, args=(gen_func, q))
                p.start()
                while True:
                    item = q.get()
                    if item is PoisonPill:
                        break
                    else:
                        yield item
                p.join() # Clean up the child process once everything has been consumed

            def data_loader():
                for i in range(4):
                    time.sleep(1) # Simulated loading time
                    yield i

            def main():
                start = time.time()
                for data in iter_asynchronously(data_loader):
                    time.sleep(1) # Simulated processing time
                    processed_data = -data*2
                    print(f'At t={time.time()-start:.3g}, processed data {data} into {processed_data}')

            if __name__ == '__main__':
                main()


            The output is now as desired:



            At t=2.03, processed data 0 into 0
            At t=3.03, processed data 1 into -2
            At t=4.04, processed data 2 into -4
            At t=5.04, processed data 3 into -6
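The unbounded-queue caveat can probably be handled without qsize(): a queue created with a maxsize makes put() block while the queue is full, so the producer waits for the consumer. The following is a minimal thread-based sketch of that idea, not the code from this answer; iter_prefetched, _producer and _DONE are hypothetical names. It uses queue.Queue and a thread so that it stays short; multiprocessing.Queue accepts the same maxsize argument, which is the assumption to check before applying this to the process-based version.

```python
import queue
import threading
import time

_DONE = object()  # hypothetical sentinel marking the end of the stream

def _producer(gen_func, q):
    for item in gen_func():
        q.put(item)  # blocks while the queue already holds maxsize items
    q.put(_DONE)

def iter_prefetched(gen_func, maxsize=2):
    """Iterate over gen_func() while a background thread loads ahead,
    holding at most `maxsize` items in the queue."""
    q = queue.Queue(maxsize=maxsize)
    threading.Thread(target=_producer, args=(gen_func, q), daemon=True).start()
    while True:
        item = q.get()
        if item is _DONE:
            return
        yield item

def data_loader():
    for i in range(4):
        time.sleep(0.05)  # simulated loading time
        yield i

if __name__ == '__main__':
    for data in iter_prefetched(data_loader):
        time.sleep(0.05)  # simulated processing time
        print(data, -data * 2)
```

With maxsize=2 the loader can run at most two items ahead of the processing loop, which bounds memory use however fast the loader is.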













                edited Jan 2 at 17:42

                answered Jan 2 at 16:45

                Peter





























