How to reset an asyncio event loop from a worker?
I'm working with an asyncio event loop that is started with run_forever(). Now I want to restart the loop (stop it and create a new one) after some process finishes, a signal arrives, or a file changes, but I'm having trouble doing that.
Here are three simplified snippets that demonstrate some coroutine workers and a coroutine that restarts the loop:
#1st try:
    import asyncio

    async def coro_worker(proc):
        print(f'Worker: {proc} started.')
        while True:
            print(f'Worker: {proc} process.')
            await asyncio.sleep(proc)

    async def reset_loop(loop):
        # Some process
        for i in range(5):  # Like a process.
            print(f'{i} counting for reset the eventloop.')
            await asyncio.sleep(1)

        main(loop)  # Expected close the current loop and start a new loop!

    def main(previous_loop=None):
        offset = 0
        if previous_loop is not None:  # Trying for close the last loop if exist.
            offset = 1  # An offset to change the process name.
            for task in asyncio.Task.all_tasks():
                print('Cancel the tasks')  # Why it increase up?
                task.cancel()
                # task.clear()
                # task.close()
                # task.stop()
            print("Done cancelling tasks")
            asyncio.get_event_loop().stop()

        process = [1 + offset, 2 + offset]
        loop = asyncio.get_event_loop()
        futures = [loop.create_task(coro_worker(proc)) for proc in process]
        futures.append(loop.create_task(reset_loop(loop)))

        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        except asyncio.CancelledError:
            print('Tasks has been canceled')
            main()  # Recursively
        finally:
            print("Closing Loop")
            loop.close()

    main()
Out[1]:
Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Done cancelling tasks
Closing Loop
Closing Loop
Task exception was never retrieved
future: <Task cancelling coro=<reset_loop() done, defined at reset_asycio.py:11> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 40, in main
loop.run_forever()
File "/usr/lib/python3.6/asyncio/base_events.py", line 425, in run_forever
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "reset_asycio.py", line 17, in reset_loop
main(loop) # Expected close the current loop and start a new loop!
File "reset_asycio.py", line 48, in main
loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<reset_loop() running at reset_asycio.py:11>>
reset_asycio.py:51: RuntimeWarning: coroutine 'reset_loop' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
reset_asycio.py:51: RuntimeWarning: coroutine 'coro_worker' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
#2 try:
.
.
.
    def main(previous_loop=None):
        offset = 0
        if previous_loop is not None:  # Trying for close the last loop if exist.
            previous_loop.stop()
            previous_loop.close()
            offset = 1  # An offset to change the process name.

        process = [1 + offset, 2 + offset]
        loop = asyncio.get_event_loop()
        futures = [loop.create_task(coro_worker(proc)) for proc in process]
        futures.append(loop.create_task(reset_loop(loop)))

        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        except asyncio.CancelledError:
            print('Tasks has been canceled')
            main()  # Recursively
        finally:
            print("Closing Loop")
            loop.close()

    main()
Out[2]:
Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Closing Loop
Task exception was never retrieved
future: <Task finished coro=<reset_loop() done, defined at reset_asycio.py:9> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 15, in reset_loop
main(loop) # Expected close the current loop and start new loop!
File "reset_asycio.py", line 21, in main
previous_loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f138>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f048>()]>>
#3 try:
.
.
.
    def main(previous_loop=None):
        offset = 0
        if previous_loop is not None:  # Trying for close the last loop if exist.
            offset = 1  # An offset to change the process name.
            for task in asyncio.Task.all_tasks():
                print('Cancel the tasks')  # Why it increase up?
                task.cancel()

        process = [1 + offset, 2 + offset]
        loop = asyncio.get_event_loop()
        futures = [loop.create_task(coro_worker(proc)) for proc in process]
        futures.append(loop.create_task(reset_loop(loop)))

        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        except asyncio.CancelledError:
            print('Tasks has been canceled')
            main()  # Recursively
        finally:
            print("Closing Loop")
            loop.close()

    main()
Out[3]:
Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
0 counting for reset the eventloop.
1 counting for reset the eventloop.
Worker: 2 process.
2 counting for reset the eventloop.
Worker: 3 process.
3 counting for reset the eventloop.
Worker: 2 process.
4 counting for reset the eventloop.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
.
.
.
Problem:
The third attempt apparently works, but the number of times print('Cancel the tasks') runs increases after each restart. What is the reason? Is there a better approach to this problem?
Forgive the long question; I tried to simplify it.
[NOTE]:
- I'm not looking for asyncio.timeout().
- I also tried restarting the event loop from another thread, without success.
- I'm using Python 3.6.
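For comparison, here is a minimal sketch of the "stop the loop, then create a new one" idea done from outside the loop rather than from inside a coroutine. The tracebacks in Out[1] show run_forever() and close() being rejected because the loop is still running when main() is called from reset_loop(); assuming the same happens in the third attempt, every restart adds tasks to the one loop, which would be consistent with the growing count. In the sketch, reset_loop() only calls loop.stop(), and an ordinary for loop in main() builds a fresh loop for each round. The names run_once, delay and restarts are illustrative assumptions, not part of the original code, and the short delay stands in for the real trigger.

```python
import asyncio


async def coro_worker(proc):
    # Same shape as the worker in the question: runs until cancelled.
    while True:
        await asyncio.sleep(proc)


async def reset_loop(loop, delay):
    # Only request a stop; run_forever() returns after the current iteration.
    await asyncio.sleep(delay)
    loop.stop()


def run_once(process, delay=0.05):
    """Run one fresh loop until reset_loop() stops it.

    Returns the number of tasks that were still pending and had to be cancelled.
    """
    loop = asyncio.new_event_loop()
    tasks = [loop.create_task(coro_worker(p)) for p in process]
    tasks.append(loop.create_task(reset_loop(loop, delay)))
    try:
        loop.run_forever()  # returns once loop.stop() has been called
    finally:
        pending = [t for t in tasks if not t.done()]
        for task in pending:
            task.cancel()
        if pending:
            # Let the cancelled tasks finish before the loop is closed.
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True))
        loop.close()
    return len(pending)


def main(restarts=2):
    cleaned = []
    for offset in range(restarts):
        # Each round gets its own loop and its own task list.
        cleaned.append(run_once([1 + offset, 2 + offset]))
    return cleaned
```

Because each round owns its loop and its task list, the number of tasks cancelled per restart stays at the number of workers instead of growing.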
python python-3.x asynchronous python-asyncio event-loop
Why do you want to reset the event loop? That is an unusual idea.
– Klaus D.
Jan 1 at 20:00
Can you explain what you are trying to achieve with this - i.e. what problem are you solving?
– user4815162342
Jan 1 at 21:39
@user4815162342 I'm trying to implement an SNMP collector with several configurations that are stored in a file. I parse these configs, and each config is given to an SNMP collector coroutine (like coro_worker() in the question) when the loop is set up (loop.create_task()). My SNMP collector (coro_worker()) contains an infinite loop. The problem is that when the config file changes, I can't stop this event loop (run_forever()) and recreate the SNMP collector coroutines with the new config.
– Benyamin Jafari
Jan 2 at 5:46
This could be an XY problem. While you are focusing on how to reset the eventloop you should focus on how to properly end your task.
– Klaus D.
Jan 2 at 6:18
@KlausD. Is it necessary to ensure the end of the tasks to stop or close the event loop?
– Benyamin Jafari
Jan 2 at 6:39
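The suggestion in the comments, ending the tasks instead of the loop, could look roughly like the sketch below. It keeps a single event loop and lets one supervising coroutine cancel the workers, wait for them to finish, and start new ones. The names supervisor, configs, log and run are hypothetical, and the short sleep is only a placeholder for "wait until the config file changes"; it avoids asyncio.run() so that it should also work on Python 3.6.

```python
import asyncio


async def coro_worker(proc, log):
    # Stand-in for the SNMP collector: loops until it is cancelled.
    log.append(f'Worker: {proc} started.')
    try:
        while True:
            await asyncio.sleep(proc)
    except asyncio.CancelledError:
        log.append(f'Worker: {proc} cancelled.')
        raise


async def supervisor(configs, log):
    # `configs` is a hypothetical list with one set of worker settings per restart.
    for process in configs:
        tasks = [asyncio.ensure_future(coro_worker(p, log)) for p in process]
        await asyncio.sleep(0.05)  # placeholder for "config file changed"
        for task in tasks:
            task.cancel()
        # Wait until every cancelled worker has actually finished.
        await asyncio.gather(*tasks, return_exceptions=True)
        log.append('Workers stopped.')


def run(configs):
    log = []
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(supervisor(configs, log))
    finally:
        loop.close()
    return log
```

Calling run([[1, 2], [2, 3]]) starts workers 1 and 2, stops them, then starts workers 2 and 3, all on the same loop. Awaiting the cancelled tasks before starting the next batch is what keeps old workers from lingering.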
|
show 6 more comments
I'm working with an asyncio forever()
eventloop. Now I want to restart the loop (stop the loop and recreate a new loop) after a process or a signal or a change in a file, but I have some problem to do that:
Here are three simplified snippet codes which demonstrate some coroutine worker and a coroutine loop restarter:
#1st try:
import asyncio
async def coro_worker(proc):
print(f'Worker: {proc} started.')
while True:
print(f'Worker: {proc} process.')
await asyncio.sleep(proc)
async def reset_loop(loop):
# Some process
for i in range(5): # Like a process.
print(f'{i} counting for reset the eventloop.')
await asyncio.sleep(1)
main(loop) # Expected close the current loop and start a new loop!
def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
offset = 1 # An offset to change the process name.
for task in asyncio.Task.all_tasks():
print('Cancel the tasks') # Why it increase up?
task.cancel()
# task.clear()
# task.close()
# task.stop()
print("Done cancelling tasks")
asyncio.get_event_loop().stop()
process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()
Out[1]:
Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Done cancelling tasks
Closing Loop
Closing Loop
Task exception was never retrieved
future: <Task cancelling coro=<reset_loop() done, defined at reset_asycio.py:11> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 40, in main
loop.run_forever()
File "/usr/lib/python3.6/asyncio/base_events.py", line 425, in run_forever
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "reset_asycio.py", line 17, in reset_loop
main(loop) # Expected close the current loop and start a new loop!
File "reset_asycio.py", line 48, in main
loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<reset_loop() running at reset_asycio.py:11>>
reset_asycio.py:51: RuntimeWarning: coroutine 'reset_loop' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
reset_asycio.py:51: RuntimeWarning: coroutine 'coro_worker' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
#2 try:
.
.
.
def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
previous_loop.stop()
previous_loop.close()
offset = 1 # An offset to change the process name.
process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()
Out[2]:
Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Closing Loop
Task exception was never retrieved
future: <Task finished coro=<reset_loop() done, defined at reset_asycio.py:9> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 15, in reset_loop
main(loop) # Expected close the current loop and start new loop!
File "reset_asycio.py", line 21, in main
previous_loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f138>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f048>()]>>
#3 try:
.
.
.
def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
offset = 1 # An offset to change the process name.
for task in asyncio.Task.all_tasks():
print('Cancel the tasks') # Why it increase up?
task.cancel()
process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()
Out[3]:
Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
0 counting for reset the eventloop.
1 counting for reset the eventloop.
Worker: 2 process.
2 counting for reset the eventloop.
Worker: 3 process.
3 counting for reset the eventloop.
Worker: 2 process.
4 counting for reset the eventloop.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
.
.
.
Problem:
In the #3 try, apparently I did it, but
print('Cancel the tasks')
increase up after each restarting, what's the reason?!Is there a better approach to overcome this problem?
Forgive me for the long question I tried to simplify it!
[NOTE]:
- I'm not looking for the
asyncio.timeout()
- I also tried with another thread in order to restart the eventloop with unsuccessfully result.
- I'm using Python 3.6
python python-3.x asynchronous python-asyncio event-loop
I'm working with an asyncio forever()
eventloop. Now I want to restart the loop (stop the loop and recreate a new loop) after a process or a signal or a change in a file, but I have some problem to do that:
Here are three simplified snippet codes which demonstrate some coroutine worker and a coroutine loop restarter:
#1st try:
import asyncio
async def coro_worker(proc):
print(f'Worker: {proc} started.')
while True:
print(f'Worker: {proc} process.')
await asyncio.sleep(proc)
async def reset_loop(loop):
# Some process
for i in range(5): # Like a process.
print(f'{i} counting for reset the eventloop.')
await asyncio.sleep(1)
main(loop) # Expected close the current loop and start a new loop!
def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
offset = 1 # An offset to change the process name.
for task in asyncio.Task.all_tasks():
print('Cancel the tasks') # Why it increase up?
task.cancel()
# task.clear()
# task.close()
# task.stop()
print("Done cancelling tasks")
asyncio.get_event_loop().stop()
process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()
Out[1]:
Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Done cancelling tasks
Closing Loop
Closing Loop
Task exception was never retrieved
future: <Task cancelling coro=<reset_loop() done, defined at reset_asycio.py:11> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 40, in main
loop.run_forever()
File "/usr/lib/python3.6/asyncio/base_events.py", line 425, in run_forever
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "reset_asycio.py", line 17, in reset_loop
main(loop) # Expected close the current loop and start a new loop!
File "reset_asycio.py", line 48, in main
loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<reset_loop() running at reset_asycio.py:11>>
reset_asycio.py:51: RuntimeWarning: coroutine 'reset_loop' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
reset_asycio.py:51: RuntimeWarning: coroutine 'coro_worker' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
#2 try:
.
.
.
def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
previous_loop.stop()
previous_loop.close()
offset = 1 # An offset to change the process name.
process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()
Out[2]:
Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Closing Loop
Task exception was never retrieved
future: <Task finished coro=<reset_loop() done, defined at reset_asycio.py:9> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 15, in reset_loop
main(loop) # Expected close the current loop and start new loop!
File "reset_asycio.py", line 21, in main
previous_loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f138>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f048>()]>>
#3 try:
.
.
.
def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
offset = 1 # An offset to change the process name.
for task in asyncio.Task.all_tasks():
print('Cancel the tasks') # Why it increase up?
task.cancel()
process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()
Out[3]:
Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
0 counting for reset the eventloop.
1 counting for reset the eventloop.
Worker: 2 process.
2 counting for reset the eventloop.
Worker: 3 process.
3 counting for reset the eventloop.
Worker: 2 process.
4 counting for reset the eventloop.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
.
.
.
Problem:
In the #3 try it apparently works, but the number of lines printed by print('Cancel the tasks') grows after each restart. What is the reason? Is there a better approach to this problem?
Forgive me for the long question; I tried to simplify it.
[NOTE]:
- I'm not looking for asyncio.timeout().
- I also tried restarting the event loop from another thread, without success.
- I'm using Python 3.6.
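A likely explanation for the growing count (an assumption based on the documented behaviour of Python 3.6, not something confirmed in this thread): asyncio.Task.all_tasks() returns every task of the loop that still exists, finished and cancelled ones included, so tasks cancelled in an earlier round are visited again as long as something still references them. From Python 3.7 on, asyncio.all_tasks() returns only unfinished tasks. The sketch below needs Python 3.7+ and uses made-up names (demo, keep_alive, run_demo) to show that the count then stays constant:

```python
import asyncio


async def coro_worker():
    while True:
        await asyncio.sleep(0.01)


async def demo(rounds=3):
    counts = []
    keep_alive = []  # hypothetical: mimics old Task objects staying referenced
    me = asyncio.current_task()
    for _round in range(rounds):
        keep_alive += [asyncio.ensure_future(coro_worker()) for _i in range(2)]
        await asyncio.sleep(0.03)
        # asyncio.all_tasks() (3.7+) skips finished tasks, so tasks that were
        # cancelled in earlier rounds do not show up again.
        pending = [t for t in asyncio.all_tasks() if t is not me]
        counts.append(len(pending))
        for t in pending:
            t.cancel()
        # Wait until the cancelled tasks have really finished.
        await asyncio.gather(*pending, return_exceptions=True)
    return counts


def run_demo():
    return asyncio.run(demo())
```

On Python 3.6 a similar effect can be had by keeping your own list of the current tasks and cancelling only those.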
python python-3.x asynchronous python-asyncio event-loop
edited Jan 2 at 22:05
Benyamin Jafari
asked Jan 1 at 19:20


Why do you want to reset the event loop? That is an unusual idea.
– Klaus D.
Jan 1 at 20:00
Can you explain what you are trying to achieve with this - i.e. what problem are you solving?
– user4815162342
Jan 1 at 21:39
@user4815162342 I'm trying to implement an SNMP collector with several configurations stored in a file. I parse these configs, and each config is given to an SNMP collector coroutine (like coro_worker() in the question) when the loop is set up (loop.create_task()). My SNMP collector (coro_worker()) has an infinite loop. The problem is that when the config file changes, I can't stop this event loop (run_forever()) and recreate the SNMP collector coroutines with the new config.
– Benyamin Jafari
Jan 2 at 5:46
This could be an XY problem. While you are focusing on how to reset the eventloop you should focus on how to properly end your task.
– Klaus D.
Jan 2 at 6:18
@KlausD. Is it necessary to ensure the end of the tasks to stop or close the event loop?
– Benyamin Jafari
Jan 2 at 6:39
1 Answer
The recursive call to main() and the new event loop add unnecessary complication. Here is a simpler prototype to play with: it monitors an external source (the file system) and, when a file is created, it just stops the loop. main() contains a loop that takes care of both (re-)creating and cancelling the tasks:
import os, asyncio, random

async def monitor():
    loop = asyncio.get_event_loop()
    while True:
        if os.path.exists('reset'):
            print('reset!')
            os.unlink('reset')
            loop.stop()  # makes run_forever() in main() return
        await asyncio.sleep(1)

async def work(workid):
    while True:
        t = random.random()
        print(workid, 'sleeping for', t)
        await asyncio.sleep(t)

def main():
    loop = asyncio.get_event_loop()
    loop.create_task(monitor())
    offset = 0
    while True:
        workers = []
        workers.append(loop.create_task(work(offset + 1)))
        workers.append(loop.create_task(work(offset + 2)))
        workers.append(loop.create_task(work(offset + 3)))
        loop.run_forever()  # returns once monitor() calls loop.stop()
        for t in workers:
            t.cancel()
        offset += 3

if __name__ == '__main__':
    main()
Another option would be to never even stop the event loop, but to simply trigger a reset event:
async def monitor(evt):
    while True:
        if os.path.exists('reset'):
            print('reset!')
            os.unlink('reset')
            evt.set()
        await asyncio.sleep(1)
In this design main() can be a coroutine:
async def main():
    loop = asyncio.get_event_loop()
    reset_evt = asyncio.Event()
    loop.create_task(monitor(reset_evt))
    offset = 0
    while True:
        workers = []
        workers.append(loop.create_task(work(offset + 1)))
        workers.append(loop.create_task(work(offset + 2)))
        workers.append(loop.create_task(work(offset + 3)))
        await reset_evt.wait()  # suspend until monitor() signals a reset
        reset_evt.clear()
        for t in workers:
            t.cancel()
        offset += 3

if __name__ == '__main__':
    asyncio.run(main())
    # or asyncio.get_event_loop().run_until_complete(main())
Note that in both variants a task is cancelled by making the await it is suspended on raise a CancelledError exception. A task must not swallow that exception with a catch-all try: ... except: ...; if it does catch it, it needs to re-raise it.
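As a minimal sketch of that last point (careful_worker, cleanup_log and run_demo are names made up for this illustration), a worker that needs to clean up can catch CancelledError, do its cleanup, and then re-raise so that the task still ends up cancelled:

```python
import asyncio

cleanup_log = []  # hypothetical record of which workers cleaned up


async def careful_worker(workid):
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        # Cleanup is fine here, but re-raise so the task really ends as cancelled.
        cleanup_log.append(workid)
        raise


async def demo():
    task = asyncio.ensure_future(careful_worker(1))
    await asyncio.sleep(0.05)  # let the worker start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


def run_demo():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(demo())
    finally:
        loop.close()
```

Up to Python 3.7 CancelledError derives from Exception, so a plain except Exception: inside a worker also catches it; since Python 3.8 it derives from BaseException and only a bare except: or except BaseException: does.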
Apparently works fine, Thanks again +1
– Benyamin Jafari
Jan 2 at 22:06
Is the second section in your answer compatible with Python 3.7?
– Benyamin Jafari
Jan 4 at 16:10
@BenyaminJafari The second section is written specifically with Python 3.7 in mind. It can also work in Python 3.5/3.6; just replace the asyncio.run line with the commented-out line below it.
– user4815162342
Jan 4 at 23:02
Sometimes I encounter this error in my workers (work()): concurrent.futures._base.CancelledError. Then, after the change, the previous task runs simultaneously with the new task.
– Benyamin Jafari
Jan 6 at 7:52
@BenyaminJafari What do you mean by "encountered the error"? Are you perhaps catching all exceptions? Raising a CancelledError is how asyncio terminates a task. If your code is catching that exception, it should immediately re-raise it.
– user4815162342
Jan 6 at 8:48
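Related to the last two comments: cancel() only requests cancellation, and the task actually stops the next time the loop runs it. One possible way to make sure the old workers are gone before new ones are created is to await the cancelled tasks with asyncio.gather(..., return_exceptions=True). This is a sketch with the hypothetical helper restart_workers and a work() variant that records its start in a list, not part of the answer above:

```python
import asyncio


async def work(workid, started):
    started.append(workid)  # record that this worker has begun
    while True:
        await asyncio.sleep(0.01)


async def restart_workers(old_tasks, new_ids, started):
    # Request cancellation, then wait until every old task has finished.
    for t in old_tasks:
        t.cancel()
    # return_exceptions=True collects the CancelledError results instead of raising.
    await asyncio.gather(*old_tasks, return_exceptions=True)
    return [asyncio.ensure_future(work(i, started)) for i in new_ids]


async def demo():
    started = []
    first = [asyncio.ensure_future(work(i, started)) for i in (1, 2, 3)]
    await asyncio.sleep(0.05)
    second = await restart_workers(first, (4, 5, 6), started)
    old_done = all(t.done() for t in first)  # old workers finished before new ones run
    await asyncio.sleep(0.05)
    await restart_workers(second, (), started)  # stop everything before exiting
    return old_done, started


def run_demo():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(demo())
    finally:
        loop.close()
```

If a worker swallows CancelledError inside its loop, the gather() call above would never return, which makes that kind of bug visible instead of leaving old and new workers running side by side.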
edited Jan 6 at 21:04
answered Jan 2 at 11:49
user4815162342