33import queue
44import sys
55import time
6- from concurrent .futures import Future
6+ from asyncio .exceptions import CancelledError
7+ from concurrent .futures import Future , TimeoutError
8+ from threading import Thread
79from time import sleep
810from typing import Any , Callable , Optional , Union
911
1921)
2022from executorlib .standalone .interactive .spawner import BaseSpawner , MpiExecSpawner
2123from executorlib .standalone .serialize import serialize_funct_h5
22- from executorlib .standalone .thread import RaisingThread
2324
2425
2526class ExecutorBroker (ExecutorBase ):
@@ -88,7 +89,7 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
8889 self ._process = None
8990 self ._future_queue = None
9091
91- def _set_process (self , process : list [RaisingThread ]): # type: ignore
92+ def _set_process (self , process : list [Thread ]): # type: ignore
9293 """
9394 Set the process for the executor.
9495
@@ -148,7 +149,7 @@ def __init__(
148149 executor_kwargs ["queue_join_on_shutdown" ] = False
149150 self ._set_process (
150151 process = [
151- RaisingThread (
152+ Thread (
152153 target = execute_parallel_tasks ,
153154 kwargs = executor_kwargs ,
154155 )
@@ -204,7 +205,7 @@ def __init__(
204205 executor_kwargs ["max_cores" ] = max_cores
205206 executor_kwargs ["max_workers" ] = max_workers
206207 self ._set_process (
207- RaisingThread (
208+ Thread (
208209 target = execute_separate_tasks ,
209210 kwargs = executor_kwargs ,
210211 )
@@ -361,15 +362,19 @@ def execute_tasks_with_dependencies(
361362 task_dict is not None and "fn" in task_dict and "future" in task_dict
362363 ):
363364 future_lst , ready_flag = _get_future_objects_from_input (task_dict = task_dict )
364- if len (future_lst ) == 0 or ready_flag :
365- # No future objects are used in the input or all future objects are already done
366- task_dict ["args" ], task_dict ["kwargs" ] = _update_futures_in_input (
367- args = task_dict ["args" ], kwargs = task_dict ["kwargs" ]
368- )
369- executor_queue .put (task_dict )
370- else : # Otherwise add the function to the wait list
371- task_dict ["future_lst" ] = future_lst
372- wait_lst .append (task_dict )
365+ exception_lst = _get_exception_lst (future_lst = future_lst )
366+ if not _get_exception (future_obj = task_dict ["future" ]):
367+ if len (exception_lst ) > 0 :
368+ task_dict ["future" ].set_exception (exception_lst [0 ])
369+ elif len (future_lst ) == 0 or ready_flag :
370+ # No future objects are used in the input or all future objects are already done
371+ task_dict ["args" ], task_dict ["kwargs" ] = _update_futures_in_input (
372+ args = task_dict ["args" ], kwargs = task_dict ["kwargs" ]
373+ )
374+ executor_queue .put (task_dict )
375+ else : # Otherwise add the function to the wait list
376+ task_dict ["future_lst" ] = future_lst
377+ wait_lst .append (task_dict )
373378 future_queue .task_done ()
374379 elif len (wait_lst ) > 0 :
375380 number_waiting = len (wait_lst )
@@ -455,7 +460,10 @@ def _submit_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> l
455460 """
456461 wait_tmp_lst = []
457462 for task_wait_dict in wait_lst :
458- if all (future .done () for future in task_wait_dict ["future_lst" ]):
463+ exception_lst = _get_exception_lst (future_lst = task_wait_dict ["future_lst" ])
464+ if len (exception_lst ) > 0 :
465+ task_wait_dict ["future" ].set_exception (exception_lst [0 ])
466+ elif all (future .done () for future in task_wait_dict ["future_lst" ]):
459467 del task_wait_dict ["future_lst" ]
460468 task_wait_dict ["args" ], task_wait_dict ["kwargs" ] = _update_futures_in_input (
461469 args = task_wait_dict ["args" ], kwargs = task_wait_dict ["kwargs" ]
@@ -483,6 +491,8 @@ def get_result(arg: Union[list[Future], Future]) -> Any:
483491 return arg .result ()
484492 elif isinstance (arg , list ):
485493 return [get_result (arg = el ) for el in arg ]
494+ elif isinstance (arg , dict ):
495+ return {k : get_result (arg = v ) for k , v in arg .items ()}
486496 else :
487497 return arg
488498
@@ -510,6 +520,8 @@ def find_future_in_list(lst):
510520 future_lst .append (el )
511521 elif isinstance (el , list ):
512522 find_future_in_list (lst = el )
523+ elif isinstance (el , dict ):
524+ find_future_in_list (lst = el .values ())
513525
514526 find_future_in_list (lst = task_dict ["args" ])
515527 find_future_in_list (lst = task_dict ["kwargs" ].values ())
@@ -578,7 +590,7 @@ def _submit_function_to_separate_process(
578590 "init_function" : None ,
579591 }
580592 )
581- process = RaisingThread (
593+ process = Thread (
582594 target = execute_parallel_tasks ,
583595 kwargs = task_kwargs ,
584596 )
@@ -599,14 +611,13 @@ def _execute_task(
599611 future_queue (Queue): Queue for receiving new tasks.
600612 """
601613 f = task_dict .pop ("future" )
602- if f .set_running_or_notify_cancel ():
614+ if not f . done () and f .set_running_or_notify_cancel ():
603615 try :
604616 f .set_result (interface .send_and_receive_dict (input_dict = task_dict ))
605617 except Exception as thread_exception :
606618 interface .shutdown (wait = True )
607619 future_queue .task_done ()
608620 f .set_exception (exception = thread_exception )
609- raise thread_exception
610621 else :
611622 future_queue .task_done ()
612623
@@ -659,3 +670,15 @@ def _execute_task_with_cache(
659670 future = task_dict ["future" ]
660671 future .set_result (result )
661672 future_queue .task_done ()
673+
674+
675+ def _get_exception_lst (future_lst : list [Future ]) -> list :
676+ return [f .exception () for f in future_lst if _get_exception (future_obj = f )]
677+
678+
679+ def _get_exception (future_obj : Future ) -> bool :
680+ try :
681+ excp = future_obj .exception (timeout = 10 ** - 10 )
682+ return excp is not None and not isinstance (excp , CancelledError )
683+ except TimeoutError :
684+ return False
0 commit comments