Share queue in event loop












1














Is it possible to share an asyncio.Queue over different tasks in one event loop?



The usecase:



Two tasks are publishing data on a queue, and one task is grabbing the new items from the Queue. All tasks in an asynchronous way.



main.py



import asyncio
import creator


async def pull_message(queue):
while True:
# Here I dont get messages, maybe the queue is always
# occupied by a other task?
msg = await queue.get()
print(msg)

if __name__ == "__main__"
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
future = asyncio.ensure_future(pull_message(queue))

creators = list()
for i in range(2):
creators.append(loop.create_task(cr.populate_msg(queue)))

# add future to creators for easy handling
creators.append(future)
loop.run_until_complete(asyncio.gather(*creators))


creator.py



import asyncio

async def populate_msg(queue):
while True:
msg = "Foo"
await queue.put(msg)









share|improve this question
























  • Yes - the whole point of the queue is that it can be shared by multiple tasks. Can you show the code that doesn't work like you'd like it to?
    – user4815162342
    Nov 19 '18 at 14:38










  • I added some code to clarify what I mean
    – loose11
    Nov 19 '18 at 15:06
















1














Is it possible to share an asyncio.Queue over different tasks in one event loop?



The usecase:



Two tasks are publishing data on a queue, and one task is grabbing the new items from the Queue. All tasks in an asynchronous way.



main.py



import asyncio
import creator


async def pull_message(queue):
while True:
# Here I dont get messages, maybe the queue is always
# occupied by a other task?
msg = await queue.get()
print(msg)

if __name__ == "__main__"
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
future = asyncio.ensure_future(pull_message(queue))

creators = list()
for i in range(2):
creators.append(loop.create_task(cr.populate_msg(queue)))

# add future to creators for easy handling
creators.append(future)
loop.run_until_complete(asyncio.gather(*creators))


creator.py



import asyncio

async def populate_msg(queue):
while True:
msg = "Foo"
await queue.put(msg)









share|improve this question
























  • Yes - the whole point of the queue is that it can be shared by multiple tasks. Can you show the code that doesn't work like you'd like it to?
    – user4815162342
    Nov 19 '18 at 14:38










  • I added some code to clarify what I mean
    – loose11
    Nov 19 '18 at 15:06














1












1








1


1





Is it possible to share an asyncio.Queue over different tasks in one event loop?



The usecase:



Two tasks are publishing data on a queue, and one task is grabbing the new items from the Queue. All tasks in an asynchronous way.



main.py



import asyncio
import creator


async def pull_message(queue):
while True:
# Here I dont get messages, maybe the queue is always
# occupied by a other task?
msg = await queue.get()
print(msg)

if __name__ == "__main__"
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
future = asyncio.ensure_future(pull_message(queue))

creators = list()
for i in range(2):
creators.append(loop.create_task(cr.populate_msg(queue)))

# add future to creators for easy handling
creators.append(future)
loop.run_until_complete(asyncio.gather(*creators))


creator.py



import asyncio

async def populate_msg(queue):
while True:
msg = "Foo"
await queue.put(msg)









share|improve this question















Is it possible to share an asyncio.Queue over different tasks in one event loop?



The usecase:



Two tasks are publishing data on a queue, and one task is grabbing the new items from the Queue. All tasks in an asynchronous way.



main.py



import asyncio
import creator


async def pull_message(queue):
while True:
# Here I dont get messages, maybe the queue is always
# occupied by a other task?
msg = await queue.get()
print(msg)

if __name__ == "__main__"
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
future = asyncio.ensure_future(pull_message(queue))

creators = list()
for i in range(2):
creators.append(loop.create_task(cr.populate_msg(queue)))

# add future to creators for easy handling
creators.append(future)
loop.run_until_complete(asyncio.gather(*creators))


creator.py



import asyncio

async def populate_msg(queue):
while True:
msg = "Foo"
await queue.put(msg)






python python-asyncio






share|improve this question















share|improve this question













share|improve this question




share|improve this question








edited Nov 19 '18 at 15:06

























asked Nov 19 '18 at 13:45









loose11

475322




475322












  • Yes - the whole point of the queue is that it can be shared by multiple tasks. Can you show the code that doesn't work like you'd like it to?
    – user4815162342
    Nov 19 '18 at 14:38










  • I added some code to clarify what I mean
    – loose11
    Nov 19 '18 at 15:06


















  • Yes - the whole point of the queue is that it can be shared by multiple tasks. Can you show the code that doesn't work like you'd like it to?
    – user4815162342
    Nov 19 '18 at 14:38










  • I added some code to clarify what I mean
    – loose11
    Nov 19 '18 at 15:06
















Yes - the whole point of the queue is that it can be shared by multiple tasks. Can you show the code that doesn't work like you'd like it to?
– user4815162342
Nov 19 '18 at 14:38




Yes - the whole point of the queue is that it can be shared by multiple tasks. Can you show the code that doesn't work like you'd like it to?
– user4815162342
Nov 19 '18 at 14:38












I added some code to clarify what I mean
– loose11
Nov 19 '18 at 15:06




I added some code to clarify what I mean
– loose11
Nov 19 '18 at 15:06












1 Answer
1






active

oldest

votes


















1














The problem in your code is that populate_msg doesn't yield to the event loop because the queue is unbounded. This is somewhat counter-intuitive because the coroutine clearly contains an await, but that await only suspends the execution of the coroutine if the coroutine would otherwise block. Since put() on an unbounded queue never blocks, populate_msg is the only thing executed by the event loop.



The problem will go away once you change populate_msg to actually do something else (like await a network event). For testing purposes you can add await asyncio.sleep(0) inside the loop, which will force the coroutine to yield control to the event loop at every iteration of the while loop. Note that this will cause the event loop to spend an entire core by continuously spinning the loop.






share|improve this answer





















  • Thank you. Is there a other approach get rid of the sleep? Because as you said, it is adding overhead to add a sleep.
    – loose11
    Nov 20 '18 at 6:59






  • 1




    @loose11 sleep(0) is only suggested for testing purposes. In production code populate_msg would produce the messages based on an external source, and that would where the real awaiting would occur. If that is not the case, then please add more information about your actual use case.
    – user4815162342
    Nov 20 '18 at 8:31











Your Answer






StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");

StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});

function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});


}
});














draft saved

draft discarded


















StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53375981%2fshare-queue-in-event-loop%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown

























1 Answer
1






active

oldest

votes








1 Answer
1






active

oldest

votes









active

oldest

votes






active

oldest

votes









1














The problem in your code is that populate_msg doesn't yield to the event loop because the queue is unbounded. This is somewhat counter-intuitive because the coroutine clearly contains an await, but that await only suspends the execution of the coroutine if the coroutine would otherwise block. Since put() on an unbounded queue never blocks, populate_msg is the only thing executed by the event loop.



The problem will go away once you change populate_msg to actually do something else (like await a network event). For testing purposes you can add await asyncio.sleep(0) inside the loop, which will force the coroutine to yield control to the event loop at every iteration of the while loop. Note that this will cause the event loop to spend an entire core by continuously spinning the loop.






share|improve this answer





















  • Thank you. Is there a other approach get rid of the sleep? Because as you said, it is adding overhead to add a sleep.
    – loose11
    Nov 20 '18 at 6:59






  • 1




    @loose11 sleep(0) is only suggested for testing purposes. In production code populate_msg would produce the messages based on an external source, and that would where the real awaiting would occur. If that is not the case, then please add more information about your actual use case.
    – user4815162342
    Nov 20 '18 at 8:31
















1














The problem in your code is that populate_msg doesn't yield to the event loop because the queue is unbounded. This is somewhat counter-intuitive because the coroutine clearly contains an await, but that await only suspends the execution of the coroutine if the coroutine would otherwise block. Since put() on an unbounded queue never blocks, populate_msg is the only thing executed by the event loop.



The problem will go away once you change populate_msg to actually do something else (like await a network event). For testing purposes you can add await asyncio.sleep(0) inside the loop, which will force the coroutine to yield control to the event loop at every iteration of the while loop. Note that this will cause the event loop to spend an entire core by continuously spinning the loop.






share|improve this answer





















  • Thank you. Is there a other approach get rid of the sleep? Because as you said, it is adding overhead to add a sleep.
    – loose11
    Nov 20 '18 at 6:59






  • 1




    @loose11 sleep(0) is only suggested for testing purposes. In production code populate_msg would produce the messages based on an external source, and that would where the real awaiting would occur. If that is not the case, then please add more information about your actual use case.
    – user4815162342
    Nov 20 '18 at 8:31














1












1








1






The problem in your code is that populate_msg doesn't yield to the event loop because the queue is unbounded. This is somewhat counter-intuitive because the coroutine clearly contains an await, but that await only suspends the execution of the coroutine if the coroutine would otherwise block. Since put() on an unbounded queue never blocks, populate_msg is the only thing executed by the event loop.



The problem will go away once you change populate_msg to actually do something else (like await a network event). For testing purposes you can add await asyncio.sleep(0) inside the loop, which will force the coroutine to yield control to the event loop at every iteration of the while loop. Note that this will cause the event loop to spend an entire core by continuously spinning the loop.






share|improve this answer












The problem in your code is that populate_msg doesn't yield to the event loop because the queue is unbounded. This is somewhat counter-intuitive because the coroutine clearly contains an await, but that await only suspends the execution of the coroutine if the coroutine would otherwise block. Since put() on an unbounded queue never blocks, populate_msg is the only thing executed by the event loop.



The problem will go away once you change populate_msg to actually do something else (like await a network event). For testing purposes you can add await asyncio.sleep(0) inside the loop, which will force the coroutine to yield control to the event loop at every iteration of the while loop. Note that this will cause the event loop to spend an entire core by continuously spinning the loop.







share|improve this answer












share|improve this answer



share|improve this answer










answered Nov 19 '18 at 15:27









user4815162342

60.6k490141




60.6k490141












  • Thank you. Is there a other approach get rid of the sleep? Because as you said, it is adding overhead to add a sleep.
    – loose11
    Nov 20 '18 at 6:59






  • 1




    @loose11 sleep(0) is only suggested for testing purposes. In production code populate_msg would produce the messages based on an external source, and that would where the real awaiting would occur. If that is not the case, then please add more information about your actual use case.
    – user4815162342
    Nov 20 '18 at 8:31


















  • Thank you. Is there a other approach get rid of the sleep? Because as you said, it is adding overhead to add a sleep.
    – loose11
    Nov 20 '18 at 6:59






  • 1




    @loose11 sleep(0) is only suggested for testing purposes. In production code populate_msg would produce the messages based on an external source, and that would where the real awaiting would occur. If that is not the case, then please add more information about your actual use case.
    – user4815162342
    Nov 20 '18 at 8:31
















Thank you. Is there a other approach get rid of the sleep? Because as you said, it is adding overhead to add a sleep.
– loose11
Nov 20 '18 at 6:59




Thank you. Is there a other approach get rid of the sleep? Because as you said, it is adding overhead to add a sleep.
– loose11
Nov 20 '18 at 6:59




1




1




@loose11 sleep(0) is only suggested for testing purposes. In production code populate_msg would produce the messages based on an external source, and that would where the real awaiting would occur. If that is not the case, then please add more information about your actual use case.
– user4815162342
Nov 20 '18 at 8:31




@loose11 sleep(0) is only suggested for testing purposes. In production code populate_msg would produce the messages based on an external source, and that would where the real awaiting would occur. If that is not the case, then please add more information about your actual use case.
– user4815162342
Nov 20 '18 at 8:31


















draft saved

draft discarded




















































Thanks for contributing an answer to Stack Overflow!


  • Please be sure to answer the question. Provide details and share your research!

But avoid



  • Asking for help, clarification, or responding to other answers.

  • Making statements based on opinion; back them up with references or personal experience.


To learn more, see our tips on writing great answers.





Some of your past answers have not been well-received, and you're in danger of being blocked from answering.


Please pay close attention to the following guidance:


  • Please be sure to answer the question. Provide details and share your research!

But avoid



  • Asking for help, clarification, or responding to other answers.

  • Making statements based on opinion; back them up with references or personal experience.


To learn more, see our tips on writing great answers.




draft saved


draft discarded














StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53375981%2fshare-queue-in-event-loop%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown





















































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown

































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown







Popular posts from this blog

MongoDB - Not Authorized To Execute Command

Npm cannot find a required file even through it is in the searched directory

in spring boot 2.1 many test slices are not allowed anymore due to multiple @BootstrapWith