@@ -154,6 +154,7 @@ def __init__(self, log_handler: logging.Handler, persistent_data):
154154 self .engines = dict ()
155155 self .threads = dict ()
156156 self .loops = dict ()
157+ self ._tasks = set ()
157158
158159 def set_status_for_item (self , status , item ):
159160 raise NotImplementedError
@@ -163,7 +164,7 @@ def start_engine(self, engine_item: Dict[str, str]):
163164 engine_name = engine_item ["engine_name" ]
164165 uod_filename = engine_item ["filename" ]
165166
166- async def start_engine (loop : asyncio .EventLoop ):
167+ async def run_engine (loop : asyncio .EventLoop ):
167168 """Condensed version of main_async from openpectus.engine.main"""
168169 # Imported modules are cached in sys.modules. This causes issues
169170 # because variables defined in a module are shared across all
@@ -261,27 +262,29 @@ async def start_engine(loop: asyncio.EventLoop):
261262 async def on_steady_state ():
262263 assert engine is not None
263264 engine .run ()
265+
264266 runner .first_steady_state_callback = on_steady_state
265267 self .engines [engine_name ] = (engine , runner )
266268 await runner .run ()
267269 file_handler .close ()
268270
269271 self .loops [engine_name ] = asyncio .new_event_loop ()
272+
273+ def run_engine_task ():
274+ async def async_task (loop ):
275+ await run_engine (loop )
276+ self .set_status_for_item ("Not running" , engine_item )
277+ self .loops [engine_name ].stop ()
278+ log .info (f"Finished running { engine_name } " )
279+
280+ self .loops [engine_name ].run_until_complete (async_task (self .loops [engine_name ]))
281+ self .loops [engine_name ].close ()
282+
270283 self .threads [engine_name ] = threading .Thread (
271284 name = engine_name ,
272- target = self . loops [ engine_name ]. run_forever ,
285+ target = run_engine_task ,
273286 daemon = True ,
274287 )
275-
276- async def _start_engine (loop ):
277- await start_engine (loop )
278- self .set_status_for_item ("Not running" , engine_item )
279- self .loops [engine_name ].stop ()
280-
281- asyncio .run_coroutine_threadsafe (
282- _start_engine (self .loops [engine_name ]),
283- self .loops [engine_name ]
284- )
285288 self .threads [engine_name ].start ()
286289
287290 def stop_engine (self , engine_item : Dict [str , str ]):
@@ -290,18 +293,20 @@ def stop_engine(self, engine_item: Dict[str, str]):
290293
291294 async def cancel ():
292295 """Mimics close_async in openpectus.engine.main"""
296+ log .info (f"Stopping { engine_name } " )
293297 if engine_name in self .engines :
294298 engine , runner = self .engines [engine_name ]
295299 engine .stop ()
296300 await runner .shutdown ()
297- log .info ("Stopped" , engine_item )
301+ log .info (f "Stopped { engine_name } " )
298302
299303 if self .loops [engine_name ].is_running ():
300- asyncio .run_coroutine_threadsafe (
304+ task = asyncio .run_coroutine_threadsafe (
301305 cancel (),
302306 self .loops [engine_name ]
303307 )
304- self .loops [engine_name ].stop ()
308+ self ._tasks .add (task )
309+ task .add_done_callback (self ._tasks .discard )
305310
306311 def validate_engine (self , engine_item : Dict [str , str ]):
307312 engine_name = engine_item ["engine_name" ]
0 commit comments