How to reset an asyncio event loop from a worker?



























I'm working with an asyncio event loop started with run_forever(). I want to restart the loop (stop it and create a new one) after some process, a signal, or a change in a file, but I'm having trouble doing that:





Here are three simplified code snippets that demonstrate some coroutine workers and a coroutine that restarts the loop:





#1 try:



import asyncio

async def coro_worker(proc):
    print(f'Worker: {proc} started.')
    while True:
        print(f'Worker: {proc} process.')
        await asyncio.sleep(proc)

async def reset_loop(loop):
    # Some process
    for i in range(5):  # Like a process.
        print(f'{i} counting for reset the eventloop.')
        await asyncio.sleep(1)

    main(loop)  # Expected close the current loop and start a new loop!

def main(previous_loop=None):
    offset = 0
    if previous_loop is not None:  # Trying for close the last loop if exist.
        offset = 1  # An offset to change the process name.
        for task in asyncio.Task.all_tasks():
            print('Cancel the tasks')  # Why it increase up?
            task.cancel()
            # task.clear()
            # task.close()
            # task.stop()

        print("Done cancelling tasks")
        asyncio.get_event_loop().stop()

    process = [1 + offset, 2 + offset]
    loop = asyncio.get_event_loop()
    futures = [loop.create_task(coro_worker(proc)) for proc in process]
    futures.append(loop.create_task(reset_loop(loop)))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except asyncio.CancelledError:
        print('Tasks has been canceled')
        main()  # Recursively
    finally:
        print("Closing Loop")
        loop.close()
main()


Out[1]:



Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Done cancelling tasks
Closing Loop
Closing Loop
Task exception was never retrieved
future: <Task cancelling coro=<reset_loop() done, defined at reset_asycio.py:11> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
  File "reset_asycio.py", line 40, in main
    loop.run_forever()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 425, in run_forever
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "reset_asycio.py", line 17, in reset_loop
    main(loop) # Expected close the current loop and start a new loop!
  File "reset_asycio.py", line 48, in main
    loop.close()
  File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
    super().close()
  File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
    raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<reset_loop() running at reset_asycio.py:11>>
reset_asycio.py:51: RuntimeWarning: coroutine 'reset_loop' was never awaited
  main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
reset_asycio.py:51: RuntimeWarning: coroutine 'coro_worker' was never awaited
  main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>




#2 try:



.
.
.

def main(previous_loop=None):
    offset = 0
    if previous_loop is not None:  # Trying for close the last loop if exist.
        previous_loop.stop()
        previous_loop.close()
        offset = 1  # An offset to change the process name.

    process = [1 + offset, 2 + offset]
    loop = asyncio.get_event_loop()
    futures = [loop.create_task(coro_worker(proc)) for proc in process]
    futures.append(loop.create_task(reset_loop(loop)))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except asyncio.CancelledError:
        print('Tasks has been canceled')
        main()  # Recursively
    finally:
        print("Closing Loop")
        loop.close()
main()


Out[2]:



Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Closing Loop
Task exception was never retrieved
future: <Task finished coro=<reset_loop() done, defined at reset_asycio.py:9> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
  File "reset_asycio.py", line 15, in reset_loop
    main(loop) # Expected close the current loop and start new loop!
  File "reset_asycio.py", line 21, in main
    previous_loop.close()
  File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
    super().close()
  File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
    raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f138>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f048>()]>>




#3 try:



.
.
.

def main(previous_loop=None):
    offset = 0
    if previous_loop is not None:  # Trying for close the last loop if exist.
        offset = 1  # An offset to change the process name.
        for task in asyncio.Task.all_tasks():
            print('Cancel the tasks')  # Why it increase up?
            task.cancel()

    process = [1 + offset, 2 + offset]
    loop = asyncio.get_event_loop()
    futures = [loop.create_task(coro_worker(proc)) for proc in process]
    futures.append(loop.create_task(reset_loop(loop)))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except asyncio.CancelledError:
        print('Tasks has been canceled')
        main()  # Recursively
    finally:
        print("Closing Loop")
        loop.close()
main()


Out[3]:



Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
0 counting for reset the eventloop.
1 counting for reset the eventloop.
Worker: 2 process.
2 counting for reset the eventloop.
Worker: 3 process.
3 counting for reset the eventloop.
Worker: 2 process.
4 counting for reset the eventloop.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
.
.
.




Problem:




  • The #3 try apparently works, but the number of 'Cancel the tasks' lines printed by print('Cancel the tasks') grows after each restart. What is the reason?


  • Is there a better approach to overcome this problem?
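
A possible explanation, offered as a sketch rather than a definitive diagnosis: in all three tries main() is called from inside reset_loop(), that is, while the loop is still running. A running loop refuses both run_forever() and close(), which matches the two RuntimeError messages in Out[1] and Out[2]. The helper names below (try_nested_restart, demo) are hypothetical and only reproduce those two errors in isolation.

```python
import asyncio


async def try_nested_restart(loop, errors):
    """Call run_forever() and close() while `loop` is running this coroutine."""
    for action in (loop.run_forever, loop.close):
        try:
            action()
        except RuntimeError as exc:
            # Record the message instead of letting the task fail.
            errors.append(str(exc))


def demo():
    """Run the coroutine on a fresh loop and return the collected messages."""
    errors = []
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(try_nested_restart(loop, errors))
    finally:
        loop.close()  # allowed here: the loop is no longer running
    return errors
```

If that reading is right, the #3 try never actually replaces the loop: the new tasks are scheduled on the old loop, which keeps running. In Python 3.6, asyncio.Task.all_tasks() also returns finished tasks that are still referenced, so the number of 'Cancel the tasks' lines likely grows because tasks from earlier rounds are counted again.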



Forgive me for the long question; I tried to simplify it!





[NOTE]:




  • I'm not looking for the asyncio.timeout()

  • I also tried to restart the event loop from another thread, without success.

  • I'm using Python 3.6



































  • Why do you want to reset the event loop? That is an unusual idea.

    – Klaus D.
    Jan 1 at 20:00











  • Can you explain what you are trying to achieve with this - i.e. what problem are you solving?

    – user4815162342
    Jan 1 at 21:39











  • @user4815162342 I'm trying to implement an SNMP collector with several configurations stored in a file. I parse these configs, and each config is given to an SNMP collector coroutine (like coro_worker() in the question) when the loop is set up (loop.create_task()). My SNMP collector (coro_worker()) runs an infinite loop. The problem is that when the config file changes, I can't stop the event loop (run_forever()) and recreate the SNMP collector coroutines with the new config.

    – Benyamin Jafari
    Jan 2 at 5:46











  • This could be an XY problem. While you are focusing on how to reset the eventloop you should focus on how to properly end your task.

    – Klaus D.
    Jan 2 at 6:18
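
One way to read this suggestion, as a minimal sketch under assumptions: keep a single loop and let a supervising coroutine cancel the workers, wait until they have really finished, and then start a new set. The names supervisor, run, configs, run_time and log are hypothetical, the short sleep stands in for whatever event should trigger the reset, and the list log replaces print() so the behaviour can be checked.

```python
import asyncio


async def coro_worker(proc, log):
    """Worker that runs until cancelled and records its activity in `log`."""
    log.append(f"Worker: {proc} started.")
    try:
        while True:
            log.append(f"Worker: {proc} process.")
            await asyncio.sleep(proc)
    except asyncio.CancelledError:
        log.append(f"Worker: {proc} cancelled.")
        raise  # re-raise so the task really ends as cancelled


async def supervisor(configs, run_time, log):
    """Run one set of workers per entry in `configs`, one set after another."""
    for procs in configs:
        tasks = [asyncio.ensure_future(coro_worker(p, log)) for p in procs]
        # Assumption: a plain sleep stands in for "wait until a reset is needed".
        await asyncio.sleep(run_time)
        for task in tasks:
            task.cancel()
        # Wait for the cancelled workers to finish before starting new ones.
        await asyncio.gather(*tasks, return_exceptions=True)


def run(configs, run_time=0.05):
    """Drive the supervisor on a fresh loop and return the activity log."""
    log = []
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(supervisor(configs, run_time, log))
    finally:
        loop.close()
    return log
```

With this shape the loop itself is never stopped or recreated while it is running; only the worker tasks are replaced, and the loop is closed once, after run_until_complete() has returned.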











  • @KlausD. Is it necessary to ensure the end of the tasks to stop or close the event loop?

    – Benyamin Jafari
    Jan 2 at 6:39
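
As far as the asyncio API goes, loop.stop() does not require the tasks to be finished, but loop.close() can only be called once run_forever() has returned, and tasks that are still pending when the loop is discarded tend to produce the 'Task was destroyed but it is pending!' warning seen in the outputs above. A hedged sketch of a restart that respects this order follows; request_restart, run_generation and main are hypothetical names, and the delay is a stand-in for the real restart trigger.

```python
import asyncio


async def coro_worker(proc):
    """Worker with an endless loop, as in the question."""
    while True:
        await asyncio.sleep(proc)


async def request_restart(loop, delay):
    """After `delay` seconds, ask run_forever() to return."""
    await asyncio.sleep(delay)
    loop.stop()  # only a request; nothing is closed from inside the loop


def run_generation(procs, delay=0.05):
    """Run one set of workers on a fresh loop until a restart is requested."""
    loop = asyncio.new_event_loop()
    tasks = [loop.create_task(coro_worker(p)) for p in procs]
    tasks.append(loop.create_task(request_restart(loop, delay)))
    try:
        loop.run_forever()  # returns once loop.stop() has been called
    finally:
        for task in tasks:
            task.cancel()
        # Let the cancelled tasks finish before the loop is closed.
        loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
        loop.close()
    return loop, tasks


def main(generations):
    """Run the generations one after another, each on its own loop."""
    results = []
    for procs in generations:
        loop, tasks = run_generation(procs)
        results.append(loop.is_closed() and all(t.done() for t in tasks))
    return results
```

The restart is driven by an ordinary for loop outside the event loop, so main() is never called recursively from a coroutine.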
















2















I'm working with an asyncio forever() eventloop. Now I want to restart the loop (stop the loop and recreate a new loop) after a process or a signal or a change in a file, but I have some problem to do that:





Here are three simplified snippet codes which demonstrate some coroutine worker and a coroutine loop restarter:





#1st try:



import asyncio

async def coro_worker(proc):
print(f'Worker: {proc} started.')
while True:
print(f'Worker: {proc} process.')
await asyncio.sleep(proc)

async def reset_loop(loop):
# Some process
for i in range(5): # Like a process.
print(f'{i} counting for reset the eventloop.')
await asyncio.sleep(1)

main(loop) # Expected close the current loop and start a new loop!

def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
offset = 1 # An offset to change the process name.
for task in asyncio.Task.all_tasks():
print('Cancel the tasks') # Why it increase up?
task.cancel()
# task.clear()
# task.close()
# task.stop()

print("Done cancelling tasks")
asyncio.get_event_loop().stop()

process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))

try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()


Out[1]:



Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Done cancelling tasks
Closing Loop
Closing Loop
Task exception was never retrieved
future: <Task cancelling coro=<reset_loop() done, defined at reset_asycio.py:11> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 40, in main
loop.run_forever()
File "/usr/lib/python3.6/asyncio/base_events.py", line 425, in run_forever
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "reset_asycio.py", line 17, in reset_loop
main(loop) # Expected close the current loop and start a new loop!
File "reset_asycio.py", line 48, in main
loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<reset_loop() running at reset_asycio.py:11>>
reset_asycio.py:51: RuntimeWarning: coroutine 'reset_loop' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
reset_asycio.py:51: RuntimeWarning: coroutine 'coro_worker' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>




#2 try:



.
.
.

def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
previous_loop.stop()
previous_loop.close()
offset = 1 # An offset to change the process name.

process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))

try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()


Out[2]:



Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Closing Loop
Task exception was never retrieved
future: <Task finished coro=<reset_loop() done, defined at reset_asycio.py:9> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 15, in reset_loop
main(loop) # Expected close the current loop and start new loop!
File "reset_asycio.py", line 21, in main
previous_loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f138>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f048>()]>>




#3 try:



.
.
.

def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
offset = 1 # An offset to change the process name.
for task in asyncio.Task.all_tasks():
print('Cancel the tasks') # Why it increase up?
task.cancel()

process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))

try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()


Out[3]:



Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
0 counting for reset the eventloop.
1 counting for reset the eventloop.
Worker: 2 process.
2 counting for reset the eventloop.
Worker: 3 process.
3 counting for reset the eventloop.
Worker: 2 process.
4 counting for reset the eventloop.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
.
.
.




Problem:




  • In the #3 try, apparently I did it, but print('Cancel the tasks') increase up after each restarting, what's the reason?!


  • Is there a better approach to overcome this problem?



Forgive me for the long question I tried to simplify it!





[NOTE]:




  • I'm not looking for the asyncio.timeout()

  • I also tried with another thread in order to restart the eventloop with unsuccessfully result.

  • I'm using Python 3.6










share|improve this question

























  • Why do you want to reset the event loop? That is an unusual idea.

    – Klaus D.
    Jan 1 at 20:00











  • Can you explain what you are trying to achieve with this - i.e. what problem are you solving?

    – user4815162342
    Jan 1 at 21:39











  • @user4815162342 I'm trying to implement an SNMP collector with the several configurations which stored in a file, I parse these configs, each config is given to the coroutine SNMP collector (like coro_worker() in question) in init loop (loop.create_task()). my SNMP collector (coro_worker()) has an indefinite loop. Problem is that when the config file changes, I can't stop this event loop (run_forever()) and recreate the coroutine SNMP collector with the new config.

    – Benyamin Jafari
    Jan 2 at 5:46











  • This could be an XY problem. While you are focusing on how to reset the eventloop you should focus on how to properly end your task.

    – Klaus D.
    Jan 2 at 6:18











  • @KlausD. Is it necessary to ensure the end of the tasks to stop or close the event loop?

    – Benyamin Jafari
    Jan 2 at 6:39














2












2








2


1






I'm working with an asyncio forever() eventloop. Now I want to restart the loop (stop the loop and recreate a new loop) after a process or a signal or a change in a file, but I have some problem to do that:





Here are three simplified snippet codes which demonstrate some coroutine worker and a coroutine loop restarter:





#1st try:



import asyncio

async def coro_worker(proc):
print(f'Worker: {proc} started.')
while True:
print(f'Worker: {proc} process.')
await asyncio.sleep(proc)

async def reset_loop(loop):
# Some process
for i in range(5): # Like a process.
print(f'{i} counting for reset the eventloop.')
await asyncio.sleep(1)

main(loop) # Expected close the current loop and start a new loop!

def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
offset = 1 # An offset to change the process name.
for task in asyncio.Task.all_tasks():
print('Cancel the tasks') # Why it increase up?
task.cancel()
# task.clear()
# task.close()
# task.stop()

print("Done cancelling tasks")
asyncio.get_event_loop().stop()

process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))

try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()


Out[1]:



Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Done cancelling tasks
Closing Loop
Closing Loop
Task exception was never retrieved
future: <Task cancelling coro=<reset_loop() done, defined at reset_asycio.py:11> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 40, in main
loop.run_forever()
File "/usr/lib/python3.6/asyncio/base_events.py", line 425, in run_forever
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "reset_asycio.py", line 17, in reset_loop
main(loop) # Expected close the current loop and start a new loop!
File "reset_asycio.py", line 48, in main
loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<reset_loop() running at reset_asycio.py:11>>
reset_asycio.py:51: RuntimeWarning: coroutine 'reset_loop' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
reset_asycio.py:51: RuntimeWarning: coroutine 'coro_worker' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>




#2 try:



.
.
.

def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
previous_loop.stop()
previous_loop.close()
offset = 1 # An offset to change the process name.

process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))

try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()


Out[2]:



Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Closing Loop
Task exception was never retrieved
future: <Task finished coro=<reset_loop() done, defined at reset_asycio.py:9> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 15, in reset_loop
main(loop) # Expected close the current loop and start new loop!
File "reset_asycio.py", line 21, in main
previous_loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f138>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f048>()]>>




#3 try:



.
.
.

def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
offset = 1 # An offset to change the process name.
for task in asyncio.Task.all_tasks():
print('Cancel the tasks') # Why it increase up?
task.cancel()

process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))

try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()


Out[3]:



Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
0 counting for reset the eventloop.
1 counting for reset the eventloop.
Worker: 2 process.
2 counting for reset the eventloop.
Worker: 3 process.
3 counting for reset the eventloop.
Worker: 2 process.
4 counting for reset the eventloop.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
.
.
.




Problem:




  • In the #3 try, apparently I did it, but print('Cancel the tasks') increase up after each restarting, what's the reason?!


  • Is there a better approach to overcome this problem?



Forgive me for the long question I tried to simplify it!





[NOTE]:




  • I'm not looking for the asyncio.timeout()

  • I also tried with another thread in order to restart the eventloop with unsuccessfully result.

  • I'm using Python 3.6










share|improve this question
















I'm working with an asyncio forever() eventloop. Now I want to restart the loop (stop the loop and recreate a new loop) after a process or a signal or a change in a file, but I have some problem to do that:





Here are three simplified snippet codes which demonstrate some coroutine worker and a coroutine loop restarter:





#1st try:



import asyncio

async def coro_worker(proc):
print(f'Worker: {proc} started.')
while True:
print(f'Worker: {proc} process.')
await asyncio.sleep(proc)

async def reset_loop(loop):
# Some process
for i in range(5): # Like a process.
print(f'{i} counting for reset the eventloop.')
await asyncio.sleep(1)

main(loop) # Expected close the current loop and start a new loop!

def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
offset = 1 # An offset to change the process name.
for task in asyncio.Task.all_tasks():
print('Cancel the tasks') # Why it increase up?
task.cancel()
# task.clear()
# task.close()
# task.stop()

print("Done cancelling tasks")
asyncio.get_event_loop().stop()

process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))

try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()


Out[1]:



Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Done cancelling tasks
Closing Loop
Closing Loop
Task exception was never retrieved
future: <Task cancelling coro=<reset_loop() done, defined at reset_asycio.py:11> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 40, in main
loop.run_forever()
File "/usr/lib/python3.6/asyncio/base_events.py", line 425, in run_forever
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "reset_asycio.py", line 17, in reset_loop
main(loop) # Expected close the current loop and start a new loop!
File "reset_asycio.py", line 48, in main
loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<reset_loop() running at reset_asycio.py:11>>
reset_asycio.py:51: RuntimeWarning: coroutine 'reset_loop' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
reset_asycio.py:51: RuntimeWarning: coroutine 'coro_worker' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>




#2 try:



.
.
.

def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
previous_loop.stop()
previous_loop.close()
offset = 1 # An offset to change the process name.

process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))

try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()


Out[2]:



Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Closing Loop
Task exception was never retrieved
future: <Task finished coro=<reset_loop() done, defined at reset_asycio.py:9> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 15, in reset_loop
main(loop) # Expected close the current loop and start new loop!
File "reset_asycio.py", line 21, in main
previous_loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f138>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f048>()]>>




#3 try:



.
.
.

def main(previous_loop=None):
    offset = 0
    if previous_loop is not None:  # Trying for close the last loop if exist.
        offset = 1  # An offset to change the process name.
        for task in asyncio.Task.all_tasks():
            print('Cancel the tasks')  # Why it increase up?
            task.cancel()

    process = [1 + offset, 2 + offset]
    loop = asyncio.get_event_loop()
    futures = [loop.create_task(coro_worker(proc)) for proc in process]
    futures.append(loop.create_task(reset_loop(loop)))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except asyncio.CancelledError:
        print('Tasks has been canceled')
        main()  # Recursively
    finally:
        print("Closing Loop")
        loop.close()

main()


Out[3]:



Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
0 counting for reset the eventloop.
1 counting for reset the eventloop.
Worker: 2 process.
2 counting for reset the eventloop.
Worker: 3 process.
3 counting for reset the eventloop.
Worker: 2 process.
4 counting for reset the eventloop.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
.
.
.




Problem:




  • The #3 try apparently works, but the number of 'Cancel the tasks' lines printed grows after each restart. What is the reason?


  • Is there a better approach to overcome this problem?



Forgive me for the long question; I tried to simplify it!





[NOTE]:




  • I'm not looking for the asyncio.timeout()

  • I also tried to restart the event loop from another thread, without success.

  • I'm using Python 3.6







python python-3.x asynchronous python-asyncio event-loop














edited Jan 2 at 22:05







Benyamin Jafari

















asked Jan 1 at 19:20









Benyamin Jafari













  • Why do you want to reset the event loop? That is an unusual idea.

    – Klaus D.
    Jan 1 at 20:00











  • Can you explain what you are trying to achieve with this - i.e. what problem are you solving?

    – user4815162342
    Jan 1 at 21:39











  • @user4815162342 I'm trying to implement an SNMP collector with several configurations stored in a file. I parse these configs, and each config is given to an SNMP collector coroutine (like coro_worker() in the question) when the loop is initialized (loop.create_task()). My SNMP collector (coro_worker()) runs an infinite loop. The problem is that when the config file changes, I can't stop this event loop (run_forever()) and recreate the SNMP collector coroutines with the new config.

    – Benyamin Jafari
    Jan 2 at 5:46











  • This could be an XY problem. While you are focusing on how to reset the eventloop you should focus on how to properly end your task.

    – Klaus D.
    Jan 2 at 6:18











  • @KlausD. Is it necessary to ensure the end of the tasks to stop or close the event loop?

    – Benyamin Jafari
    Jan 2 at 6:39































1 Answer


















1














The recursive call to main() and the new event loop add unnecessary complication. Here is a simpler prototype to play with: it monitors an external source (the file system) and, when a file named reset is created, it just stops the loop. main() contains a loop that takes care of both (re-)creating and cancelling the tasks:



import os, asyncio, random

async def monitor():
    loop = asyncio.get_event_loop()
    while True:
        # Creating a file named 'reset' is the external trigger.
        if os.path.exists('reset'):
            print('reset!')
            os.unlink('reset')
            loop.stop()
        await asyncio.sleep(1)

async def work(workid):
    while True:
        t = random.random()
        print(workid, 'sleeping for', t)
        await asyncio.sleep(t)

def main():
    loop = asyncio.get_event_loop()
    loop.create_task(monitor())
    offset = 0
    while True:
        workers = []
        workers.append(loop.create_task(work(offset + 1)))
        workers.append(loop.create_task(work(offset + 2)))
        workers.append(loop.create_task(work(offset + 3)))
        loop.run_forever()  # returns once monitor() calls loop.stop()
        for t in workers:
            t.cancel()
        offset += 3

if __name__ == '__main__':
    main()


Another option would be to never even stop the event loop, but to simply trigger a reset event:



async def monitor(evt):
    while True:
        if os.path.exists('reset'):
            print('reset!')
            os.unlink('reset')
            evt.set()
        await asyncio.sleep(1)


In this design main() can be a coroutine:



async def main():
    loop = asyncio.get_event_loop()
    reset_evt = asyncio.Event()
    loop.create_task(monitor(reset_evt))
    offset = 0
    while True:
        workers = []
        workers.append(loop.create_task(work(offset + 1)))
        workers.append(loop.create_task(work(offset + 2)))
        workers.append(loop.create_task(work(offset + 3)))
        await reset_evt.wait()  # sleeps until monitor() sets the event
        reset_evt.clear()
        for t in workers:
            t.cancel()
        offset += 3

if __name__ == '__main__':
    asyncio.run(main())
    # or asyncio.get_event_loop().run_until_complete(main())


Note that in both variants, cancelling a task works by making the await it is currently suspended on raise a CancelledError exception. A task should therefore not swallow all exceptions with try: ... except: ...; if it does catch CancelledError, it needs to re-raise it.
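To illustrate the point about re-raising, here is a minimal sketch of a worker that catches CancelledError only to do some cleanup and then re-raises it. The `log` list and the `demo()` helper are hypothetical names added for illustration; they are not part of the prototype above.

```python
import asyncio

async def work(workid, log):
    try:
        while True:
            log.append(f"{workid} working")
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        # Cleanup (closing sockets, flushing buffers, ...) would go here.
        log.append(f"{workid} cleaned up")
        raise  # re-raise so that asyncio marks the task as cancelled

async def demo():
    # Hypothetical driver: start one worker, cancel it, and wait for it.
    log = []
    task = asyncio.ensure_future(work(1, log))
    await asyncio.sleep(0.03)  # let the worker run a few iterations
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the worker re-raised the cancellation
    return task.cancelled(), log[-1]

if __name__ == '__main__':
    # On Python 3.5/3.6, use get_event_loop().run_until_complete(demo()).
    print(asyncio.run(demo()))
```

If the `raise` were left out, the task would finish normally instead of being marked as cancelled; and if the except block sat inside the while loop, the worker would simply keep running after the cancel request.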
































  • Apparently works fine, Thanks again +1

    – Benyamin Jafari
    Jan 2 at 22:06











  • Is the second section in your answer compatible with Python 3.7?

    – Benyamin Jafari
    Jan 4 at 16:10








  • @BenyaminJafari The second section was written specifically with Python 3.7 in mind. It also works in Python 3.5/3.6; just replace the asyncio.run line with the commented-out line below it.

    – user4815162342
    Jan 4 at 23:02











  • Sometimes I encounter this error in my workers (work()): concurrent.futures._base.CancelledError. After that, the previous task keeps running simultaneously with the new task created after the change.

    – Benyamin Jafari
    Jan 6 at 7:52











  • @BenyaminJafari What do you mean by "encountered the error"? Are you perhaps catching all exceptions? Raising a CancelledError is how asyncio terminates a task. If your code is catching that exception, it should immediately re-raise it.

    – user4815162342
    Jan 6 at 8:48
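The concern raised in this comment thread, that an old worker may still be running when its replacement starts, can also be approached by waiting for the cancelled tasks to actually finish before creating new ones. The following is only a sketch under that assumption; `restart_workers`, the `running` set and `demo()` are hypothetical names used for illustration and do not appear in the answer.

```python
import asyncio

async def work(workid, running):
    running.add(workid)  # record that this worker is alive
    try:
        while True:
            await asyncio.sleep(0.01)
    finally:
        running.discard(workid)  # runs on cancellation as well

async def restart_workers(old_tasks, new_ids, running):
    for t in old_tasks:
        t.cancel()
    if old_tasks:
        # Wait until every cancelled task has really finished.
        # return_exceptions=True collects each CancelledError as a
        # result instead of raising it in this coroutine.
        await asyncio.gather(*old_tasks, return_exceptions=True)
    return [asyncio.ensure_future(work(i, running)) for i in new_ids]

async def demo():
    running = set()
    tasks = await restart_workers([], [1, 2, 3], running)
    await asyncio.sleep(0.02)
    first = set(running)
    tasks = await restart_workers(tasks, [4, 5, 6], running)
    after_cancel = set(running)  # new tasks have not been scheduled yet
    await asyncio.sleep(0.02)
    second = set(running)
    await restart_workers(tasks, [], running)  # final shutdown
    return first, after_cancel, second, set(running)

if __name__ == '__main__':
    print(asyncio.run(demo()))
```

This only helps if the workers let CancelledError propagate; a worker that swallows it inside its loop would keep gather() waiting indefinitely.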
































edited Jan 6 at 21:04

























answered Jan 2 at 11:49









user4815162342
































