Python, How to make an asynchronous data generator?
I have a program that loads data and processes it. Both loading and processing take time, and I'd like to do them in parallel.
Here is the synchronous version of my program (where the "loading" and "processing" are done in sequence, and are trivial operations here for the sake of the example):
import time

def data_loader():
    for i in range(4):
        time.sleep(1)  # Simulated loading time
        yield i

def main():
    start = time.time()
    for data in data_loader():
        time.sleep(1)  # Simulated processing time
        processed_data = -data*2
        print(f'At t={time.time()-start:.3g}, processed data {data} into {processed_data}')

if __name__ == '__main__':
    main()
When I run this, I get output:
At t=2.01, processed data 0 into 0
At t=4.01, processed data 1 into -2
At t=6.02, processed data 2 into -4
At t=8.02, processed data 3 into -6
The loop runs every 2s, with 1s for loading and 1s for processing.
Now, I'd like to make an asynchronous version, where the loading and processing are done concurrently (so the loader gets the next data ready while the processor is processing it). It should then take 2s for the first statement to be printed, and 1s for each statement after that. Expected output would be similar to:
At t=2.01, processed data 0 into 0
At t=3.01, processed data 1 into -2
At t=4.02, processed data 2 into -4
At t=5.02, processed data 3 into -6
Ideally, only the contents of the main function would have to change (as the data_loader code should not care that it may be used in an asynchronous way).
python python-3.x asynchronous
asked Jan 2 at 16:08 – Peter
3 Answers
The multiprocessing module's utilities may be what you want.
import time
import multiprocessing

def data_loader():
    for i in range(4):
        time.sleep(1)  # Simulated loading time
        yield i

def process_item(item):
    time.sleep(1)  # Simulated processing time
    return (item, -item*2)  # Return the original too.

def main():
    start = time.time()
    with multiprocessing.Pool() as p:
        data_iterator = data_loader()
        for (data, processed_data) in p.imap(process_item, data_iterator):
            print(f'At t={time.time()-start:.3g}, processed data {data} into {processed_data}')

if __name__ == '__main__':
    main()
This outputs
At t=2.03, processed data 0 into 0
At t=3.03, processed data 1 into -2
At t=4.04, processed data 2 into -4
At t=5.04, processed data 3 into -6
Depending on your requirements, you may find .imap_unordered() to be faster, and it's also worth knowing that there's a thread-based version of Pool available as multiprocessing.dummy.Pool – this may be useful to avoid IPC overhead if your data is large and your processing is not done in Python (so you can avoid the GIL).
– AKX, answered Jan 2 at 16:19
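For illustration, here is a minimal sketch of that thread-based variant: the same pipeline with multiprocessing.dummy.Pool swapped in (the worker count of 2 is an arbitrary choice of mine, not from the answer). Because time.sleep() releases the GIL, the threads still overlap loading and processing:

```python
import time
from multiprocessing.dummy import Pool  # thread-backed, same API as multiprocessing.Pool

def data_loader():
    for i in range(4):
        time.sleep(1)  # Simulated loading time
        yield i

def process_item(item):
    time.sleep(1)  # Simulated processing time
    return (item, -item*2)  # Return the original too.

def main():
    start = time.time()
    with Pool(2) as p:  # two worker threads instead of worker processes
        for (data, processed_data) in p.imap(process_item, data_loader()):
            print(f'At t={time.time()-start:.3g}, processed data {data} into {processed_data}')

if __name__ == '__main__':
    main()
```

Since no data crosses a process boundary here, nothing needs to be pickled, which is the IPC saving the answer alludes to.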
Thank you for this nice answer. Now I neglected this for simplicity, but what if the processor is stateful? i.e. we now make it processed_data = -data*2 - processed_data with processed_data = 0 before the loop? It would be nice to have a generic "generator wrapper" that we can just use to make generators asynchronous.
– Peter, Jan 2 at 16:30
You could use multiprocessing.Value to schlep the state between processes. However, if you have state that depends on values processed so far, you can't process new values in parallel anyway, so you won't even need multiprocessing.
– AKX, Jan 2 at 17:05
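As a hypothetical illustration of the multiprocessing.Value suggestion (the 'i' typecode and the stateful_processor helper are my own names, not from the comment), a worker process can apply the stateful update from the comment while the main process reads the shared result afterwards:

```python
import multiprocessing

def stateful_processor(state, items):
    # The stateful update from the comment: each result depends on the
    # previous one, so the items must be processed strictly in order.
    for item in items:
        with state.get_lock():  # Value's lock serializes cross-process access
            state.value = -item * 2 - state.value

if __name__ == '__main__':
    state = multiprocessing.Value('i', 0)  # shared signed int, initially 0
    p = multiprocessing.Process(target=stateful_processor, args=(state, [0, 1, 2, 3]))
    p.start()
    p.join()
    print(state.value)  # the main process sees the worker's final state
```

Note that this just moves the sequential loop into another process; as the comment says, the dependency between iterations means nothing is actually parallelized.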
The key to your problem is the actual processing of the data. I don't know what you're doing with the data in your real program, but it must be an asynchronous operation to benefit from asynchronous programming. If you're doing active, blocking CPU-bound processing, you might be better off offloading it to a separate process instead, to be able to use multiple CPU cores and do things concurrently. If the actual processing of the data is in fact just the consumption of some asynchronous service, then it can be wrapped in a single asynchronous concurrent thread very effectively.
In your example, you're using time.sleep() to simulate the processing. Since that example operation can be done asynchronously (by using asyncio.sleep() instead), the conversion is simple:
import itertools
import asyncio

async def data_loader():
    for i in itertools.count(0):
        await asyncio.sleep(1)  # Simulated loading time
        yield i

async def process(data):
    await asyncio.sleep(1)  # Simulated processing time
    processed_data = -data*2
    print(f'At t={loop.time()-start:.3g}, processed data {data} into {processed_data}')

async def main():
    tasks = []
    async for data in data_loader():
        tasks.append(loop.create_task(process(data)))
    await asyncio.wait(tasks)  # wait for all remaining tasks

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    start = loop.time()
    loop.run_until_complete(main())
    loop.close()
The results, as you expect:
At t=2, processed data 0 into 0
At t=3, processed data 1 into -2
At t=4, processed data 2 into -4
...
Remember that it only works because time.sleep() has an asynchronous alternative in the form of asyncio.sleep(). Check the operation you're using to see if it can be written in asynchronous form.
– nosklo, answered Jan 2 at 16:26
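When the blocking call has no native asyncio counterpart, one common workaround (a sketch of mine, not part of this answer) is to push it onto a thread pool with run_in_executor, which keeps the event loop free to schedule other coroutines in the meantime:

```python
import asyncio
import time

def blocking_load(i):
    time.sleep(1)  # stands in for a blocking call with no async variant
    return i

async def main():
    loop = asyncio.get_running_loop()
    start = time.time()
    # None selects the default thread-pool executor; the four blocking
    # loads run concurrently in worker threads while the loop stays live.
    results = await asyncio.gather(
        *(loop.run_in_executor(None, blocking_load, i) for i in range(4))
    )
    print(f'Loaded {results} in {time.time() - start:.3g}s')

asyncio.run(main())
```

For genuinely CPU-bound work, a concurrent.futures.ProcessPoolExecutor could be passed in place of None so the work also escapes the GIL.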
Thanks for this. In my application, the "loading" and "processing" are both blocking, CPU-bound operations, so I guess this approach won't work for me, is that correct?
– Peter, Jan 2 at 16:48
@Peter A Python process can only run one piece of code at a time, because of the global interpreter lock (GIL). That said, you're probably mistaken about the nature of your process. In particular, the "loading" part is hardly a CPU-bound operation; it is probably related to data input/output, which doesn't use the CPU at all and can be written in a non-blocking manner. Are you sure? Can you describe your loading/processing part in more detail, preferably with code?
– nosklo, Jan 2 at 18:56
Here is a solution that allows you to wrap the dataloader with an iter_asynchronously function. It solves the problem for now. (Note, however, that there is still the problem that if the dataloader is faster than the processing loop, the queue will grow indefinitely. This could easily be solved by adding a wait in _async_queue_manager if the queue gets too big – but sadly Queue.qsize() is not supported on Mac!)
import time
from multiprocessing import Queue, Process

class PoisonPill:
    pass

def _async_queue_manager(gen_func, queue: Queue):
    for item in gen_func():
        queue.put(item)
    queue.put(PoisonPill)

def iter_asynchronously(gen_func):
    """ Given a generator function, make it asynchronous. """
    q = Queue()
    p = Process(target=_async_queue_manager, args=(gen_func, q))
    p.start()
    while True:
        item = q.get()
        if item is PoisonPill:
            break
        else:
            yield item

def data_loader():
    for i in range(4):
        time.sleep(1)  # Simulated loading time
        yield i

def main():
    start = time.time()
    for data in iter_asynchronously(data_loader):
        time.sleep(1)  # Simulated processing time
        processed_data = -data*2
        print(f'At t={time.time()-start:.3g}, processed data {data} into {processed_data}')

if __name__ == '__main__':
    main()
The output is now as desired:
At t=2.03, processed data 0 into 0
At t=3.03, processed data 1 into -2
At t=4.04, processed data 2 into -4
At t=5.04, processed data 3 into -6
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f54009542%2fpython-how-to-make-an-asynchronous-data-generator%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
3 Answers
3
active
oldest
votes
3 Answers
3
active
oldest
votes
active
oldest
votes
active
oldest
votes
The multiprocessing
module's utilities may be what you want.
import time
import multiprocessing
def data_loader():
for i in range(4):
time.sleep(1) # Simulated loading time
yield i
def process_item(item):
time.sleep(1) # Simulated processing time
return (item, -item*2) # Return the original too.
def main():
start = time.time()
with multiprocessing.Pool() as p:
data_iterator = data_loader()
for (data, processed_data) in p.imap(process_item, data_iterator):
print(f'At t={time.time()-start:.3g}, processed data {data} into {processed_data}')
if __name__ == '__main__':
main()
This outputs
At t=2.03, processed data 0 into 0
At t=3.03, processed data 1 into -2
At t=4.04, processed data 2 into -4
At t=5.04, processed data 3 into -6
Depending on your requirements, you may find .imap_unordered()
to be faster, and it's also worth knowing that there's a thread-based version of Pool
available as multiprocessing.dummy.Pool
– this may be useful to avoid IPC overhead if your data is large, and your processing is not done in Python (so you can avoid the GIL).
Thank you for this nice answer. Now I neglected this for simplicity, but what if the processor is stateful? i.e. we now make it:processed_data = -data*2 - processed_data
withprocessed_data=0
before the loop? It would be nice to have a generic "generator wrapper" that we can just use to make generators asynchronous.
– Peter
Jan 2 at 16:30
You could usemultiprocessing.Value
to schlep the state between processes. However, if you have state that depends on values processed so far, you can't process new values in parallel anyway, so you won't even needmultiprocessing
.
– AKX
Jan 2 at 17:05
add a comment |
The multiprocessing
module's utilities may be what you want.
import time
import multiprocessing
def data_loader():
for i in range(4):
time.sleep(1) # Simulated loading time
yield i
def process_item(item):
time.sleep(1) # Simulated processing time
return (item, -item*2) # Return the original too.
def main():
start = time.time()
with multiprocessing.Pool() as p:
data_iterator = data_loader()
for (data, processed_data) in p.imap(process_item, data_iterator):
print(f'At t={time.time()-start:.3g}, processed data {data} into {processed_data}')
if __name__ == '__main__':
main()
This outputs
At t=2.03, processed data 0 into 0
At t=3.03, processed data 1 into -2
At t=4.04, processed data 2 into -4
At t=5.04, processed data 3 into -6
Depending on your requirements, you may find .imap_unordered()
to be faster, and it's also worth knowing that there's a thread-based version of Pool
available as multiprocessing.dummy.Pool
– this may be useful to avoid IPC overhead if your data is large, and your processing is not done in Python (so you can avoid the GIL).
Thank you for this nice answer. Now I neglected this for simplicity, but what if the processor is stateful? i.e. we now make it:processed_data = -data*2 - processed_data
withprocessed_data=0
before the loop? It would be nice to have a generic "generator wrapper" that we can just use to make generators asynchronous.
– Peter
Jan 2 at 16:30
You could usemultiprocessing.Value
to schlep the state between processes. However, if you have state that depends on values processed so far, you can't process new values in parallel anyway, so you won't even needmultiprocessing
.
– AKX
Jan 2 at 17:05
add a comment |
The multiprocessing
module's utilities may be what you want.
import time
import multiprocessing
def data_loader():
for i in range(4):
time.sleep(1) # Simulated loading time
yield i
def process_item(item):
time.sleep(1) # Simulated processing time
return (item, -item*2) # Return the original too.
def main():
start = time.time()
with multiprocessing.Pool() as p:
data_iterator = data_loader()
for (data, processed_data) in p.imap(process_item, data_iterator):
print(f'At t={time.time()-start:.3g}, processed data {data} into {processed_data}')
if __name__ == '__main__':
main()
This outputs
At t=2.03, processed data 0 into 0
At t=3.03, processed data 1 into -2
At t=4.04, processed data 2 into -4
At t=5.04, processed data 3 into -6
Depending on your requirements, you may find .imap_unordered()
to be faster, and it's also worth knowing that there's a thread-based version of Pool
available as multiprocessing.dummy.Pool
– this may be useful to avoid IPC overhead if your data is large, and your processing is not done in Python (so you can avoid the GIL).
The multiprocessing
module's utilities may be what you want.
import time
import multiprocessing
def data_loader():
for i in range(4):
time.sleep(1) # Simulated loading time
yield i
def process_item(item):
time.sleep(1) # Simulated processing time
return (item, -item*2) # Return the original too.
def main():
start = time.time()
with multiprocessing.Pool() as p:
data_iterator = data_loader()
for (data, processed_data) in p.imap(process_item, data_iterator):
print(f'At t={time.time()-start:.3g}, processed data {data} into {processed_data}')
if __name__ == '__main__':
main()
This outputs
At t=2.03, processed data 0 into 0
At t=3.03, processed data 1 into -2
At t=4.04, processed data 2 into -4
At t=5.04, processed data 3 into -6
Depending on your requirements, you may find .imap_unordered()
to be faster, and it's also worth knowing that there's a thread-based version of Pool
available as multiprocessing.dummy.Pool
– this may be useful to avoid IPC overhead if your data is large, and your processing is not done in Python (so you can avoid the GIL).
answered Jan 2 at 16:19
AKXAKX
44k45670
44k45670
Thank you for this nice answer. Now I neglected this for simplicity, but what if the processor is stateful? i.e. we now make it:processed_data = -data*2 - processed_data
withprocessed_data=0
before the loop? It would be nice to have a generic "generator wrapper" that we can just use to make generators asynchronous.
– Peter
Jan 2 at 16:30
You could usemultiprocessing.Value
to schlep the state between processes. However, if you have state that depends on values processed so far, you can't process new values in parallel anyway, so you won't even needmultiprocessing
.
– AKX
Jan 2 at 17:05
add a comment |
Thank you for this nice answer. Now I neglected this for simplicity, but what if the processor is stateful? i.e. we now make it:processed_data = -data*2 - processed_data
withprocessed_data=0
before the loop? It would be nice to have a generic "generator wrapper" that we can just use to make generators asynchronous.
– Peter
Jan 2 at 16:30
You could usemultiprocessing.Value
to schlep the state between processes. However, if you have state that depends on values processed so far, you can't process new values in parallel anyway, so you won't even needmultiprocessing
.
– AKX
Jan 2 at 17:05
Thank you for this nice answer. Now I neglected this for simplicity, but what if the processor is stateful? i.e. we now make it:
processed_data = -data*2 - processed_data
with processed_data=0
before the loop? It would be nice to have a generic "generator wrapper" that we can just use to make generators asynchronous.– Peter
Jan 2 at 16:30
Thank you for this nice answer. Now I neglected this for simplicity, but what if the processor is stateful? i.e. we now make it:
processed_data = -data*2 - processed_data
with processed_data=0
before the loop? It would be nice to have a generic "generator wrapper" that we can just use to make generators asynchronous.– Peter
Jan 2 at 16:30
You could use
multiprocessing.Value
to schlep the state between processes. However, if you have state that depends on values processed so far, you can't process new values in parallel anyway, so you won't even need multiprocessing
.– AKX
Jan 2 at 17:05
You could use
multiprocessing.Value
to schlep the state between processes. However, if you have state that depends on values processed so far, you can't process new values in parallel anyway, so you won't even need multiprocessing
.– AKX
Jan 2 at 17:05
add a comment |
The key of your problem is in the actual processing of the data. I don't know what you're doing with the data in your real program, but it must be an asynchronous operation to use asynchronous programming. If you're doing active, blocking CPU-bound processing, you might be better offloading to a separate process, instead, to be able to use multiple CPU cores and do things concurrently. If the actual processing of the data is in fact just the consumption of some asynchronous service, then it can be wrapped in a single asynchronous concurrent thread very effectively.
In your example, you're using time.sleep()
to simulate the processing. Since that example operation can be done asynchronously (by using asyncio.sleep()
instead) then the conversion is simple:
import itertools
import asyncio
async def data_loader():
for i in itertools.count(0):
await asyncio.sleep(1) # Simulated loading time
yield i
async def process(data):
await asyncio.sleep(1) # Simulated processing time
processed_data = -data*2
print(f'At t={loop.time()-start:.3g}, processed data {data} into {processed_data}')
async def main():
tasks =
async for data in data_loader():
tasks.append(loop.create_task(process(data)))
await asyncio.wait(tasks) # wait for all remaining tasks
if __name__ == '__main__':
loop = asyncio.get_event_loop()
start = loop.time()
loop.run_until_complete(main())
loop.close()
The results, as you expect:
At t=2, processed data 0 into 0
At t=3, processed data 1 into -2
At t=4, processed data 2 into -4
...
Remember that it only works because time.sleep()
has an asynchronous alternative in the form of asyncio.sleep()
. Check the operation you're using, to see if it can be written in asynchronous form.
Thanks for this. In my application, the "loading" and "processing" are both blocking, CPU-bound operations, so I guess this approach won't work for me, is that correct?
– Peter
Jan 2 at 16:48
@Peter a python process can only run one piece of code at a time, because of the global interpreter lock (GIL). That said, you're probably mistaken about the nature of your process. In particular the "loading" part is hardly a CPU-bound operation, and is probably related with data input/output which doesn't use the CPU at all and can be written in a non-blocking manner; Are you sure? Can you describe your loading/processing part with more detail, preferably code?
– nosklo
Jan 2 at 18:56
add a comment |
The key of your problem is in the actual processing of the data. I don't know what you're doing with the data in your real program, but it must be an asynchronous operation to use asynchronous programming. If you're doing active, blocking CPU-bound processing, you might be better offloading to a separate process, instead, to be able to use multiple CPU cores and do things concurrently. If the actual processing of the data is in fact just the consumption of some asynchronous service, then it can be wrapped in a single asynchronous concurrent thread very effectively.
In your example, you're using time.sleep()
to simulate the processing. Since that example operation can be done asynchronously (by using asyncio.sleep()
instead) then the conversion is simple:
import itertools
import asyncio
async def data_loader():
for i in itertools.count(0):
await asyncio.sleep(1) # Simulated loading time
yield i
async def process(data):
await asyncio.sleep(1) # Simulated processing time
processed_data = -data*2
print(f'At t={loop.time()-start:.3g}, processed data {data} into {processed_data}')
async def main():
tasks =
async for data in data_loader():
tasks.append(loop.create_task(process(data)))
await asyncio.wait(tasks) # wait for all remaining tasks
if __name__ == '__main__':
loop = asyncio.get_event_loop()
start = loop.time()
loop.run_until_complete(main())
loop.close()
The results, as you expect:
At t=2, processed data 0 into 0
At t=3, processed data 1 into -2
At t=4, processed data 2 into -4
...
Remember that it only works because time.sleep()
has an asynchronous alternative in the form of asyncio.sleep()
. Check the operation you're using, to see if it can be written in asynchronous form.
Thanks for this. In my application, the "loading" and "processing" are both blocking, CPU-bound operations, so I guess this approach won't work for me, is that correct?
– Peter
Jan 2 at 16:48
@Peter a python process can only run one piece of code at a time, because of the global interpreter lock (GIL). That said, you're probably mistaken about the nature of your process. In particular the "loading" part is hardly a CPU-bound operation, and is probably related with data input/output which doesn't use the CPU at all and can be written in a non-blocking manner; Are you sure? Can you describe your loading/processing part with more detail, preferably code?
– nosklo
Jan 2 at 18:56
add a comment |
The key of your problem is in the actual processing of the data. I don't know what you're doing with the data in your real program, but it must be an asynchronous operation to use asynchronous programming. If you're doing active, blocking CPU-bound processing, you might be better offloading to a separate process, instead, to be able to use multiple CPU cores and do things concurrently. If the actual processing of the data is in fact just the consumption of some asynchronous service, then it can be wrapped in a single asynchronous concurrent thread very effectively.
In your example, you're using time.sleep()
to simulate the processing. Since that example operation can be done asynchronously (by using asyncio.sleep()
instead) then the conversion is simple:
import itertools
import asyncio
async def data_loader():
for i in itertools.count(0):
await asyncio.sleep(1) # Simulated loading time
yield i
async def process(data):
await asyncio.sleep(1) # Simulated processing time
processed_data = -data*2
print(f'At t={loop.time()-start:.3g}, processed data {data} into {processed_data}')
async def main():
tasks =
async for data in data_loader():
tasks.append(loop.create_task(process(data)))
await asyncio.wait(tasks) # wait for all remaining tasks
if __name__ == '__main__':
loop = asyncio.get_event_loop()
start = loop.time()
loop.run_until_complete(main())
loop.close()
The results, as you expect:
At t=2, processed data 0 into 0
At t=3, processed data 1 into -2
At t=4, processed data 2 into -4
...
Remember that it only works because time.sleep()
has an asynchronous alternative in the form of asyncio.sleep()
. Check the operation you're using, to see if it can be written in asynchronous form.
The key of your problem is in the actual processing of the data. I don't know what you're doing with the data in your real program, but it must be an asynchronous operation to use asynchronous programming. If you're doing active, blocking CPU-bound processing, you might be better offloading to a separate process, instead, to be able to use multiple CPU cores and do things concurrently. If the actual processing of the data is in fact just the consumption of some asynchronous service, then it can be wrapped in a single asynchronous concurrent thread very effectively.
In your example, you're using time.sleep()
to simulate the processing. Since that example operation can be done asynchronously (by using asyncio.sleep()
instead) then the conversion is simple:
import itertools
import asyncio
async def data_loader():
for i in itertools.count(0):
await asyncio.sleep(1) # Simulated loading time
yield i
async def process(data):
await asyncio.sleep(1) # Simulated processing time
processed_data = -data*2
print(f'At t={loop.time()-start:.3g}, processed data {data} into {processed_data}')
async def main():
tasks =
async for data in data_loader():
tasks.append(loop.create_task(process(data)))
await asyncio.wait(tasks) # wait for all remaining tasks
if __name__ == '__main__':
loop = asyncio.get_event_loop()
start = loop.time()
loop.run_until_complete(main())
loop.close()
The results, as you expect:
At t=2, processed data 0 into 0
At t=3, processed data 1 into -2
At t=4, processed data 2 into -4
...
Remember that it only works because time.sleep()
has an asynchronous alternative in the form of asyncio.sleep()
. Check the operation you're using, to see if it can be written in asynchronous form.
answered Jan 2 at 16:26
nosklonosklo
157k46253274
157k46253274
Thanks for this. In my application, the "loading" and "processing" are both blocking, CPU-bound operations, so I guess this approach won't work for me, is that correct?
– Peter
Jan 2 at 16:48
Thanks for this. In my application, the "loading" and "processing" are both blocking, CPU-bound operations, so I guess this approach won't work for me, is that correct?
– Peter
Jan 2 at 16:48

@Peter a Python process can only run one piece of Python code at a time, because of the global interpreter lock (GIL). That said, you're probably mistaken about the nature of your workload. In particular, the "loading" part is rarely CPU-bound; it usually involves data input/output, which doesn't occupy the CPU and can be written in a non-blocking manner. Are you sure? Can you describe your loading/processing parts in more detail, preferably with code?
– nosklo
Jan 2 at 18:56
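If the loading really is I/O-bound, as nosklo suggests, a plain background thread is enough to overlap loading with processing, since blocking I/O releases the GIL. A minimal sketch under that assumption (`iter_in_thread` and `_SENTINEL` are illustrative names, not from the question):

```python
import queue
import threading
import time

_SENTINEL = object()  # marks the end of the stream

def iter_in_thread(gen_func, maxsize=2):
    """Run gen_func in a background thread, yielding items as they arrive."""
    q = queue.Queue(maxsize=maxsize)  # bounded: the loader blocks if the consumer lags

    def worker():
        for item in gen_func():
            q.put(item)
        q.put(_SENTINEL)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = q.get()
        if item is _SENTINEL:
            return
        yield item

def data_loader():
    for i in range(4):
        time.sleep(1)  # simulated I/O wait (releases the GIL)
        yield i

start = time.time()
results = []
for data in iter_in_thread(data_loader):
    time.sleep(1)  # simulated processing time
    results.append(-data * 2)
elapsed = time.time() - start
print(results)
```

Because the thread prefetches the next item while the main loop processes the current one, the total time drops from roughly 8s to roughly 5s for this example. This only helps when the loader spends its time waiting rather than computing.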
Here is a solution that allows you to wrap the data loader with an iter_asynchronously
function. It solves the problem for now. (Note, however, that there is still the problem that if the data loader is faster than the processing loop, the queue will grow indefinitely. This could easily be solved by adding a wait in _async_queue_manager
if the queue gets too big (but sadly Queue.qsize()
is not supported on Mac!))

import time
from multiprocessing import Queue, Process

class PoisonPill:
    pass

def _async_queue_manager(gen_func, queue: Queue):
    for item in gen_func():
        queue.put(item)
    queue.put(PoisonPill)

def iter_asynchronously(gen_func):
    """ Given a generator function, make it asynchronous. """
    q = Queue()
    p = Process(target=_async_queue_manager, args=(gen_func, q))
    p.start()
    while True:
        item = q.get()
        if item is PoisonPill:
            break
        else:
            yield item

def data_loader():
    for i in range(4):
        time.sleep(1)  # Simulated loading time
        yield i

def main():
    start = time.time()
    for data in iter_asynchronously(data_loader):
        time.sleep(1)  # Simulated processing time
        processed_data = -data*2
        print(f'At t={time.time()-start:.3g}, processed data {data} into {processed_data}')

if __name__ == '__main__':
    main()

The output is now as desired:

At t=2.03, processed data 0 into 0
At t=3.03, processed data 1 into -2
At t=4.04, processed data 2 into -4
At t=5.04, processed data 3 into -6
edited Jan 2 at 17:42
answered Jan 2 at 16:45
Peter
3,78543045