Skip to content

Commit e38a2d5

Browse files
[Fix] Provide restart limit during initialization (#938)
* [Fix] Provide restart limit during initialization
* [pre-commit.ci] auto fixes from pre-commit.com hooks

  for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 7eae4a2 commit e38a2d5

6 files changed

Lines changed: 51 additions & 0 deletions

File tree

src/executorlib/executor/flux.py

Lines changed: 12 additions & 0 deletions
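Original file line number | Diff line number | Diff line change
(Gutter key: the two numbers are printed fused, e.g. `1213` = original line 12, new line 13; `12+` = line added at new line 12.)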
@@ -9,6 +9,7 @@
99
check_plot_dependency_graph,
1010
check_pmi,
1111
check_refresh_rate,
12+
check_restart_limit,
1213
check_wait_on_shutdown,
1314
validate_number_of_cores,
1415
)
@@ -70,6 +71,7 @@ class FluxJobExecutor(BaseExecutor):
7071
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
7172
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
7273
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
74+
restart_limit (int): The maximum number of restarting worker processes.
7375
openmpi_oversubscribe (bool): adds the `--oversubscribe` command flag (OpenMPI and SLURM) - default False
7476
7577
Examples:
@@ -113,6 +115,7 @@ def __init__(
113115
export_workflow_filename: Optional[str] = None,
114116
log_obj_size: bool = False,
115117
wait: bool = True,
118+
restart_limit: int = 0,
116119
openmpi_oversubscribe: bool = False,
117120
):
118121
"""
@@ -164,6 +167,7 @@ def __init__(
164167
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
165168
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
166169
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
170+
restart_limit (int): The maximum number of restarting worker processes.
167171
openmpi_oversubscribe (bool): adds the `--oversubscribe` command flag (OpenMPI and SLURM) - default False
168172
169173
"""
@@ -180,6 +184,9 @@ def __init__(
180184
resource_dict.update(
181185
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
182186
)
187+
check_restart_limit(
188+
restart_limit=restart_limit, block_allocation=block_allocation
189+
)
183190
if not disable_dependencies:
184191
super().__init__(
185192
executor=DependencyTaskScheduler(
@@ -197,6 +204,7 @@ def __init__(
197204
init_function=init_function,
198205
log_obj_size=log_obj_size,
199206
wait=wait,
207+
restart_limit=restart_limit,
200208
),
201209
max_cores=max_cores,
202210
refresh_rate=refresh_rate,
@@ -223,6 +231,7 @@ def __init__(
223231
init_function=init_function,
224232
log_obj_size=log_obj_size,
225233
wait=wait,
234+
restart_limit=restart_limit,
226235
)
227236
)
228237

@@ -464,6 +473,7 @@ def create_flux_executor(
464473
init_function: Optional[Callable] = None,
465474
log_obj_size: bool = False,
466475
wait: bool = True,
476+
restart_limit: int = 0,
467477
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
468478
"""
469479
Create a flux executor
@@ -504,6 +514,7 @@ def create_flux_executor(
504514
init_function (None): optional function to preset arguments for functions which are submitted later
505515
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
506516
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
517+
restart_limit (int): The maximum number of restarting worker processes.
507518
508519
Returns:
509520
InteractiveStepExecutor/ InteractiveExecutor
@@ -551,6 +562,7 @@ def create_flux_executor(
551562
max_workers=max_workers,
552563
executor_kwargs=resource_dict,
553564
spawner=FluxPythonSpawner,
565+
restart_limit=restart_limit,
554566
)
555567
else:
556568
return OneProcessTaskScheduler(

src/executorlib/executor/single.py

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
check_init_function,
88
check_plot_dependency_graph,
99
check_refresh_rate,
10+
check_restart_limit,
1011
check_wait_on_shutdown,
1112
validate_number_of_cores,
1213
)
@@ -61,6 +62,7 @@ class SingleNodeExecutor(BaseExecutor):
6162
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
6263
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
6364
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
65+
restart_limit (int): The maximum number of restarting worker processes.
6466
openmpi_oversubscribe (bool): adds the `--oversubscribe` command flag (OpenMPI and SLURM) - default False
6567
6668
Examples:
@@ -100,6 +102,7 @@ def __init__(
100102
export_workflow_filename: Optional[str] = None,
101103
log_obj_size: bool = False,
102104
wait: bool = True,
105+
restart_limit: int = 0,
103106
openmpi_oversubscribe: bool = False,
104107
):
105108
"""
@@ -146,6 +149,7 @@ def __init__(
146149
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
147150
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
148151
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
152+
restart_limit (int): The maximum number of restarting worker processes.
149153
openmpi_oversubscribe (bool): adds the `--oversubscribe` command flag (OpenMPI and SLURM) - default False
150154
151155
"""
@@ -162,6 +166,9 @@ def __init__(
162166
resource_dict.update(
163167
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
164168
)
169+
check_restart_limit(
170+
restart_limit=restart_limit, block_allocation=block_allocation
171+
)
165172
if not disable_dependencies:
166173
super().__init__(
167174
executor=DependencyTaskScheduler(
@@ -175,6 +182,7 @@ def __init__(
175182
init_function=init_function,
176183
log_obj_size=log_obj_size,
177184
wait=wait,
185+
restart_limit=restart_limit,
178186
),
179187
max_cores=max_cores,
180188
refresh_rate=refresh_rate,
@@ -197,6 +205,7 @@ def __init__(
197205
init_function=init_function,
198206
log_obj_size=log_obj_size,
199207
wait=wait,
208+
restart_limit=restart_limit,
200209
)
201210
)
202211

@@ -397,6 +406,7 @@ def create_single_node_executor(
397406
init_function: Optional[Callable] = None,
398407
log_obj_size: bool = False,
399408
wait: bool = True,
409+
restart_limit: int = 0,
400410
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
401411
"""
402412
Create a single node executor
@@ -432,6 +442,7 @@ def create_single_node_executor(
432442
init_function (None): optional function to preset arguments for functions which are submitted later
433443
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
434444
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
445+
restart_limit (int): The maximum number of restarting worker processes.
435446
436447
Returns:
437448
InteractiveStepExecutor/ InteractiveExecutor
@@ -466,6 +477,7 @@ def create_single_node_executor(
466477
),
467478
executor_kwargs=resource_dict,
468479
spawner=MpiExecSpawner,
480+
restart_limit=restart_limit,
469481
)
470482
else:
471483
return OneProcessTaskScheduler(

src/executorlib/executor/slurm.py

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
check_log_obj_size,
77
check_plot_dependency_graph,
88
check_refresh_rate,
9+
check_restart_limit,
910
check_wait_on_shutdown,
1011
validate_number_of_cores,
1112
)
@@ -288,6 +289,7 @@ class SlurmJobExecutor(BaseExecutor):
288289
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
289290
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
290291
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
292+
restart_limit (int): The maximum number of restarting worker processes.
291293
openmpi_oversubscribe (bool): adds the `--oversubscribe` command flag (OpenMPI and SLURM) - default False
292294
293295
Examples:
@@ -328,6 +330,7 @@ def __init__(
328330
export_workflow_filename: Optional[str] = None,
329331
log_obj_size: bool = False,
330332
wait: bool = True,
333+
restart_limit: int = 0,
331334
openmpi_oversubscribe: bool = False,
332335
):
333336
"""
@@ -378,6 +381,7 @@ def __init__(
378381
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
379382
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
380383
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
384+
restart_limit (int): The maximum number of restarting worker processes.
381385
openmpi_oversubscribe (bool): adds the `--oversubscribe` command flag (OpenMPI and SLURM) - default False
382386
383387
"""
@@ -394,6 +398,9 @@ def __init__(
394398
resource_dict.update(
395399
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
396400
)
401+
check_restart_limit(
402+
restart_limit=restart_limit, block_allocation=block_allocation
403+
)
397404
if not disable_dependencies:
398405
super().__init__(
399406
executor=DependencyTaskScheduler(
@@ -408,6 +415,7 @@ def __init__(
408415
init_function=init_function,
409416
log_obj_size=log_obj_size,
410417
wait=wait,
418+
restart_limit=restart_limit,
411419
),
412420
max_cores=max_cores,
413421
refresh_rate=refresh_rate,
@@ -431,6 +439,7 @@ def __init__(
431439
init_function=init_function,
432440
log_obj_size=log_obj_size,
433441
wait=wait,
442+
restart_limit=restart_limit,
434443
)
435444
)
436445

@@ -446,6 +455,7 @@ def create_slurm_executor(
446455
init_function: Optional[Callable] = None,
447456
log_obj_size: bool = False,
448457
wait: bool = True,
458+
restart_limit: int = 0,
449459
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
450460
"""
451461
Create a SLURM executor
@@ -487,6 +497,7 @@ def create_slurm_executor(
487497
init_function (None): optional function to preset arguments for functions which are submitted later
488498
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
489499
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
500+
restart_limit (int): The maximum number of restarting worker processes.
490501
491502
Returns:
492503
InteractiveStepExecutor/ InteractiveExecutor
@@ -517,6 +528,7 @@ def create_slurm_executor(
517528
max_workers=max_workers,
518529
executor_kwargs=resource_dict,
519530
spawner=SrunSpawner,
531+
restart_limit=restart_limit,
520532
)
521533
else:
522534
return OneProcessTaskScheduler(

src/executorlib/standalone/inputcheck.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -158,6 +158,13 @@ def check_hostname_localhost(hostname_localhost: Optional[bool]) -> None:
158158
)
159159

160160

161+
def check_restart_limit(restart_limit: int, block_allocation: bool = True) -> None:
162+
if not block_allocation and restart_limit != 0:
163+
raise ValueError(
164+
"The option to specify a restart limit for worker processes is only available with block_allocation=True."
165+
)
166+
167+
161168
def check_pmi_mode(pmi_mode: Optional[str]) -> None:
162169
if pmi_mode is not None:
163170
raise ValueError(

src/executorlib/task_scheduler/interactive/blockallocation.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@ class BlockAllocationTaskScheduler(TaskSchedulerBase):
3838
max_workers (int): defines the number workers which can execute functions in parallel
3939
executor_kwargs (dict): keyword arguments for the executor
4040
spawner (BaseSpawner): interface class to initiate python processes
41+
restart_limit (int): The maximum number of restarting worker processes.
4142
4243
Examples:
4344
@@ -65,13 +66,15 @@ def __init__(
6566
max_workers: int = 1,
6667
executor_kwargs: Optional[dict] = None,
6768
spawner: type[BaseSpawner] = MpiExecSpawner,
69+
restart_limit: int = 0,
6870
):
6971
if executor_kwargs is None:
7072
executor_kwargs = {}
7173
super().__init__(max_cores=executor_kwargs.get("max_cores"))
7274
executor_kwargs["future_queue"] = self._future_queue
7375
executor_kwargs["spawner"] = spawner
7476
executor_kwargs["queue_join_on_shutdown"] = False
77+
executor_kwargs["restart_limit"] = restart_limit
7578
self._process_kwargs = executor_kwargs
7679
self._max_workers = max_workers
7780
self_id = random.getrandbits(128)

tests/unit/standalone/test_inputcheck.py

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414
check_refresh_rate,
1515
check_resource_dict,
1616
check_resource_dict_is_empty,
17+
check_restart_limit,
1718
check_pmi_mode,
1819
check_max_workers_and_cores,
1920
check_hostname_localhost,
@@ -70,6 +71,10 @@ def test_check_pmi(self):
7071
with self.assertRaises(ValueError):
7172
check_pmi(backend="flux_allocation", pmi="test")
7273

74+
def test_check_restart_limit(self):
75+
with self.assertRaises(ValueError):
76+
check_restart_limit(restart_limit=1, block_allocation=False)
77+
7378
def test_check_nested_flux_executor(self):
7479
with self.assertRaises(ValueError):
7580
check_nested_flux_executor(nested_flux_executor=True)

0 commit comments

Comments
 (0)