Skip to content

Commit 2a94f49

Browse files
[Feature] Implement run time limits (#930)
* [Feature] Implement run time limits * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * extend tests * another fix * Test flux job is reaching the run time limit * Fix error import * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add to __all__ * fix broken import * fix run time pysqa --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent f2275f4 commit 2a94f49

11 files changed

Lines changed: 63 additions & 2 deletions

File tree

src/executorlib/api.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from executorlib.executor.single import TestClusterExecutor
99
from executorlib.standalone.command import get_command_path
1010
from executorlib.standalone.interactive.communication import (
11+
ExecutorlibSocketError,
1112
SocketInterface,
1213
interface_bootup,
1314
interface_connect,
@@ -32,4 +33,5 @@
3233
"MpiExecSpawner",
3334
"SocketInterface",
3435
"SubprocessSpawner",
36+
"ExecutorlibSocketError",
3537
]

src/executorlib/executor/flux.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class FluxJobExecutor(BaseExecutor):
4545
- error_log_file (str): Name of the error log file to use for storing exceptions raised
4646
by the Python functions submitted to the Executor.
4747
- restart_limit (int): The maximum number of restarting worker processes. Default: 0
48+
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
4849
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
4950
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
5051
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
@@ -136,6 +137,7 @@ def __init__(
136137
compute nodes. Defaults to False.
137138
- error_log_file (str): Name of the error log file to use for storing exceptions
138139
raised by the Python functions submitted to the Executor.
140+
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
139141
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
140142
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
141143
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
@@ -246,6 +248,7 @@ class FluxClusterExecutor(BaseExecutor):
246248
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
247249
- error_log_file (str): Name of the error log file to use for storing exceptions raised
248250
by the Python functions submitted to the Executor.
251+
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
249252
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
250253
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
251254
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
@@ -333,6 +336,7 @@ def __init__(
333336
only)
334337
- error_log_file (str): Name of the error log file to use for storing exceptions
335338
raised by the Python functions submitted to the Executor.
339+
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
336340
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
337341
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
338342
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
@@ -364,6 +368,7 @@ def __init__(
364368
"cwd": None,
365369
"openmpi_oversubscribe": False,
366370
"slurm_cmd_args": [],
371+
"run_time_limit": None,
367372
}
368373
if resource_dict is None:
369374
resource_dict = {}
@@ -478,6 +483,7 @@ def create_flux_executor(
478483
compute nodes. Defaults to False.
479484
- error_log_file (str): Name of the error log file to use for storing exceptions raised
480485
by the Python functions submitted to the Executor.
486+
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
481487
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
482488
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
483489
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.

src/executorlib/executor/single.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class SingleNodeExecutor(BaseExecutor):
4242
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
4343
- error_log_file (str): Name of the error log file to use for storing exceptions raised
4444
by the Python functions submitted to the Executor.
45+
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
4546
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
4647
context of an HPC cluster this is essential to be able to communicate to an
4748
Executor running on a different compute node within the same allocation. And
@@ -126,6 +127,7 @@ def __init__(
126127
- error_log_file (str): Name of the error log file to use for storing exceptions
127128
raised by the Python functions submitted to the Executor.
128129
- restart_limit (int): The maximum number of restarting worker processes. Default: 0
130+
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
129131
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
130132
context of an HPC cluster this is essential to be able to communicate to an
131133
Executor running on a different compute node within the same allocation. And
@@ -219,6 +221,7 @@ class TestClusterExecutor(BaseExecutor):
219221
- cwd (str/None): current working directory where the parallel python task is executed
220222
- error_log_file (str): Name of the error log file to use for storing exceptions raised
221223
by the Python functions submitted to the Executor.
224+
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
222225
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
223226
context of an HPC cluster this is essential to be able to communicate to an
224227
Executor running on a different compute node within the same allocation. And
@@ -296,6 +299,7 @@ def __init__(
296299
- cwd (str/None): current working directory where the parallel python task is executed
297300
- error_log_file (str): Name of the error log file to use for storing exceptions
298301
raised by the Python functions submitted to the Executor.
302+
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
299303
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
300304
context of an HPC cluster this is essential to be able to communicate to an
301305
Executor running on a different compute node within the same allocation. And

src/executorlib/executor/slurm.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class SlurmClusterExecutor(BaseExecutor):
4545
- error_log_file (str): Name of the error log file to use for storing exceptions raised
4646
by the Python functions submitted to the Executor.
4747
- restart_limit (int): The maximum number of restarting worker processes. Default: 0
48+
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
4849
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
4950
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
5051
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
@@ -132,6 +133,7 @@ def __init__(
132133
only)
133134
- error_log_file (str): Name of the error log file to use for storing exceptions
134135
raised by the Python functions submitted to the Executor.
136+
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
135137
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
136138
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
137139
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
@@ -267,6 +269,7 @@ class SlurmJobExecutor(BaseExecutor):
267269
compute nodes. Defaults to False.
268270
- error_log_file (str): Name of the error log file to use for storing exceptions raised
269271
by the Python functions submitted to the Executor.
272+
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
270273
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
271274
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
272275
context of an HPC cluster this is essential to be able to communicate to an
@@ -356,6 +359,7 @@ def __init__(
356359
compute nodes. Defaults to False.
357360
- error_log_file (str): Name of the error log file to use for storing exceptions
358361
raised by the Python functions submitted to the Executor.
362+
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
359363
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
360364
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
361365
context of an HPC cluster this is essential to be able to communicate to an
@@ -469,6 +473,7 @@ def create_slurm_executor(
469473
compute nodes. Defaults to False.
470474
- error_log_file (str): Name of the error log file to use for storing exceptions raised
471475
by the Python functions submitted to the Executor.
476+
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
472477
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
473478
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
474479
context of an HPC cluster this is essential to be able to communicate to an

src/executorlib/standalone/command.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ def generate_slurm_command(
126126
openmpi_oversubscribe: bool = False,
127127
slurm_cmd_args: Optional[list[str]] = None,
128128
pmi_mode: Optional[str] = None,
129+
run_time_limit: Optional[int] = None,
129130
) -> list[str]:
130131
"""
131132
Generate the command list for the SLURM interface.
@@ -140,6 +141,7 @@ def generate_slurm_command(
140141
openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
141142
slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
142143
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
144+
run_time_limit (int): The maximum runtime in seconds for each task. Default: None
143145
144146
Returns:
145147
list[str]: The generated command list.
@@ -159,6 +161,8 @@ def generate_slurm_command(
159161
command_prepend_lst += ["--exact"]
160162
if openmpi_oversubscribe:
161163
command_prepend_lst += ["--oversubscribe"]
164+
if run_time_limit is not None:
165+
command_prepend_lst += ["--time=" + str(run_time_limit // 60 + 1)]
162166
if slurm_cmd_args is not None and len(slurm_cmd_args) > 0:
163167
command_prepend_lst += slurm_cmd_args
164168
return command_prepend_lst

src/executorlib/task_scheduler/file/spawner_pysqa.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ def execute_with_pysqa(
6969
"command": " ".join(command),
7070
"dependency_list": [str(qid) for qid in task_dependent_lst],
7171
"working_directory": os.path.abspath(cwd),
72+
"run_time_max": resource_dict.get("run_time_limit"),
7273
}
7374
if "cwd" in resource_dict:
7475
del resource_dict["cwd"]

src/executorlib/task_scheduler/interactive/spawner_flux.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class FluxPythonSpawner(BaseSpawner):
4444
flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None.
4545
flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False.
4646
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
47+
run_time_limit (int): The maximum runtime in seconds for each task. Default: None
4748
"""
4849

4950
def __init__(
@@ -61,6 +62,7 @@ def __init__(
6162
flux_executor: Optional[flux.job.FluxExecutor] = None,
6263
flux_executor_nesting: bool = False,
6364
flux_log_files: bool = False,
65+
run_time_limit: Optional[int] = None,
6466
):
6567
super().__init__(
6668
cwd=cwd,
@@ -78,6 +80,7 @@ def __init__(
7880
self._flux_log_files = flux_log_files
7981
self._priority = priority
8082
self._future = None
83+
self._run_time_limit = run_time_limit
8184

8285
def bootup(
8386
self,
@@ -128,6 +131,8 @@ def bootup(
128131
if self._cwd is not None:
129132
jobspec.cwd = self._cwd
130133
os.makedirs(self._cwd, exist_ok=True)
134+
if self._run_time_limit is not None:
135+
jobspec.duration = self._run_time_limit
131136
file_prefix = "flux_" + str(self._worker_id)
132137
if self._flux_log_files and self._cwd is not None:
133138
jobspec.stderr = os.path.join(self._cwd, file_prefix + ".err")

src/executorlib/task_scheduler/interactive/spawner_pysqa.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def __init__(
3131
pmi_mode: Optional[str] = None,
3232
config_directory: Optional[str] = None,
3333
backend: Optional[str] = None,
34+
run_time_limit: Optional[int] = None,
3435
**kwargs,
3536
):
3637
"""
@@ -50,6 +51,7 @@ def __init__(
5051
pmi_mode (str, optional): PMI interface to use (OpenMPI v5 requires pmix) default is None
5152
config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
5253
backend (str): name of the backend used to spawn tasks.
54+
run_time_limit (int): The maximum runtime in seconds for each task. Default: None
5355
"""
5456
super().__init__(
5557
cwd=cwd,
@@ -68,6 +70,7 @@ def __init__(
6870
self._pysqa_submission_kwargs = kwargs
6971
self._process: Optional[int] = None
7072
self._queue_adapter: Optional[QueueAdapter] = None
73+
self._run_time_limit = run_time_limit
7174

7275
def bootup(
7376
self,
@@ -191,6 +194,7 @@ def _start_process_helper(
191194
command=" ".join(self.generate_command(command_lst=command_lst)),
192195
working_directory=working_directory,
193196
cores=int(self._cores * self._threads_per_core),
197+
run_time_max=self._run_time_limit,
194198
**self._pysqa_submission_kwargs,
195199
)
196200

src/executorlib/task_scheduler/interactive/spawner_slurm.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def __init__(
3232
openmpi_oversubscribe: bool = False,
3333
slurm_cmd_args: Optional[list[str]] = None,
3434
pmi_mode: Optional[str] = None,
35+
run_time_limit: Optional[int] = None,
3536
):
3637
"""
3738
Srun interface implementation.
@@ -47,6 +48,7 @@ def __init__(
4748
openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
4849
slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
4950
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
51+
run_time_limit (int): The maximum runtime in seconds for each task. Default: None
5052
"""
5153
super().__init__(
5254
cwd=cwd,
@@ -60,6 +62,7 @@ def __init__(
6062
self._num_nodes = num_nodes
6163
self._exclusive = exclusive
6264
self._pmi_mode = pmi_mode
65+
self._run_time_limit = run_time_limit
6366

6467
def generate_command(self, command_lst: list[str]) -> list[str]:
6568
"""
@@ -81,6 +84,7 @@ def generate_command(self, command_lst: list[str]) -> list[str]:
8184
openmpi_oversubscribe=self._openmpi_oversubscribe,
8285
slurm_cmd_args=self._slurm_cmd_args,
8386
pmi_mode=self._pmi_mode,
87+
run_time_limit=self._run_time_limit,
8488
)
8589
return super().generate_command(
8690
command_lst=command_prepend_lst + command_lst,

tests/unit/executor/test_flux_job.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import os
22
import unittest
3+
from time import sleep
34

45
import numpy as np
56

67
from executorlib import FluxJobExecutor
8+
from executorlib.api import ExecutorlibSocketError
79

810

911
try:
@@ -20,6 +22,11 @@ def calc(i):
2022
return i
2123

2224

25+
def delayed_calc(i):
26+
sleep(2)
27+
return i
28+
29+
2330
def mpi_funct(i):
2431
from mpi4py import MPI
2532

@@ -110,6 +117,24 @@ def test_single_task(self):
110117
[[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]],
111118
)
112119

120+
def test_run_time_limit(self):
121+
with FluxJobExecutor(
122+
max_cores=1,
123+
resource_dict={"cores": 1},
124+
flux_executor=self.executor,
125+
block_allocation=False,
126+
pmi_mode=pmi,
127+
) as p:
128+
f1 = p.submit(delayed_calc, 1, resource_dict={"run_time_limit": 1})
129+
f2 = p.submit(delayed_calc, 2, resource_dict={"run_time_limit": 5})
130+
self.assertFalse(f1.done())
131+
self.assertFalse(f2.done())
132+
self.assertEqual(f2.result(), 2)
133+
self.assertTrue(f1.done())
134+
self.assertTrue(f2.done())
135+
with self.assertRaises(ExecutorlibSocketError):
136+
f1.result()
137+
113138
def test_output_files_cwd(self):
114139
dirname = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
115140
os.makedirs(dirname, exist_ok=True)

0 commit comments

Comments
 (0)