Skip to content

Commit d450829

Browse files
authored
[Feature] Add refresh rate for ClusterExecutors (#919)
1 parent 6f3cc53 commit d450829

5 files changed

Lines changed: 21 additions & 1 deletion

File tree

src/executorlib/executor/flux.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,7 @@ def __init__(
415415
init_function=init_function,
416416
disable_dependencies=disable_dependencies,
417417
wait=wait,
418+
refresh_rate=refresh_rate,
418419
)
419420
)
420421
else:

src/executorlib/executor/single.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ def __init__(
356356
disable_dependencies=disable_dependencies,
357357
execute_function=execute_in_subprocess,
358358
wait=wait,
359+
refresh_rate=refresh_rate,
359360
)
360361
)
361362
else:

src/executorlib/executor/slurm.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ def __init__(
215215
init_function=init_function,
216216
disable_dependencies=disable_dependencies,
217217
wait=wait,
218+
refresh_rate=refresh_rate,
218219
)
219220
)
220221
else:

src/executorlib/task_scheduler/file/shared.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import os
33
import queue
44
from concurrent.futures import Future
5+
from time import sleep
56
from typing import Any, Callable, Optional
67

78
from executorlib.standalone.command import get_cache_execute_command
@@ -17,6 +18,7 @@ def __init__(self, file_name: str, selector: Optional[int | str] = None):
1718
1819
Args:
1920
file_name (str): The name of the file.
21+
selector (int | str, optional): The selector to select a specific part of the result. Defaults to None.
2022
2123
"""
2224
self._file_name = file_name
@@ -62,6 +64,7 @@ def execute_tasks_h5(
6264
disable_dependencies: bool = False,
6365
pmi_mode: Optional[str] = None,
6466
wait: bool = True,
67+
refresh_rate: float = 0.01,
6568
) -> None:
6669
"""
6770
Execute tasks stored in a queue using HDF5 files.
@@ -78,6 +81,7 @@ def execute_tasks_h5(
7881
disable_dependencies (boolean): Disable resolving future objects during the submission.
7982
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
8083
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
84+
refresh_rate (float): The rate at which to refresh the result. Defaults to 0.01.
8185
8286
Returns:
8387
None
@@ -101,6 +105,7 @@ def execute_tasks_h5(
101105
terminate_function=terminate_function,
102106
pysqa_config_directory=pysqa_config_directory,
103107
backend=backend,
108+
refresh_rate=refresh_rate,
104109
)
105110
if not task_dict["cancel_futures"] and wait:
106111
_cancel_processes(
@@ -117,6 +122,7 @@ def execute_tasks_h5(
117122
terminate_function=terminate_function,
118123
pysqa_config_directory=pysqa_config_directory,
119124
backend=backend,
125+
refresh_rate=refresh_rate,
120126
)
121127
for value in memory_dict.values():
122128
if not value.done():
@@ -198,6 +204,7 @@ def execute_tasks_h5(
198204
terminate_function=terminate_function,
199205
pysqa_config_directory=pysqa_config_directory,
200206
backend=backend,
207+
refresh_rate=refresh_rate,
201208
)
202209

203210

@@ -297,6 +304,7 @@ def _refresh_memory_dict(
297304
terminate_function: Optional[Callable] = None,
298305
pysqa_config_directory: Optional[str] = None,
299306
backend: Optional[str] = None,
307+
refresh_rate: float = 0.01,
300308
) -> dict:
301309
"""
302310
Refresh memory dictionary
@@ -308,6 +316,7 @@ def _refresh_memory_dict(
308316
terminate_function (callable): The function to terminate the tasks.
309317
pysqa_config_directory (str): path to the pysqa config directory (only for pysqa based backend).
310318
backend (str): name of the backend used to spawn tasks.
319+
refresh_rate (float): The rate at which to refresh the result. Defaults to 0.01.
311320
312321
Returns:
313322
dict: Updated memory dictionary
@@ -321,7 +330,7 @@ def _refresh_memory_dict(
321330
pysqa_config_directory=pysqa_config_directory,
322331
backend=backend,
323332
)
324-
return {
333+
memory_updated_dict = {
325334
key: _check_task_output(
326335
task_key=key,
327336
future_obj=value,
@@ -330,6 +339,9 @@ def _refresh_memory_dict(
330339
for key, value in memory_dict.items()
331340
if not value.done()
332341
}
342+
if len(memory_updated_dict) == len(memory_dict):
343+
sleep(refresh_rate)
344+
return memory_updated_dict
333345

334346

335347
def _cancel_processes(

src/executorlib/task_scheduler/file/task_scheduler.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ def __init__(
3636
disable_dependencies: bool = False,
3737
pmi_mode: Optional[str] = None,
3838
wait: bool = True,
39+
refresh_rate: float = 0.01,
3940
):
4041
"""
4142
Initialize the FileExecutor.
@@ -52,6 +53,7 @@ def __init__(
5253
disable_dependencies (boolean): Disable resolving future objects during the submission.
5354
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
5455
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
56+
refresh_rate (float): The rate at which to refresh the result. Defaults to 0.01.
5557
"""
5658
super().__init__(max_cores=None)
5759
default_resource_dict = {
@@ -75,6 +77,7 @@ def __init__(
7577
"backend": backend,
7678
"disable_dependencies": disable_dependencies,
7779
"pmi_mode": pmi_mode,
80+
"refresh_rate": refresh_rate,
7881
"wait": wait,
7982
}
8083
self._set_process(
@@ -102,6 +105,7 @@ def create_file_executor(
102105
disable_dependencies: bool = False,
103106
execute_function: Callable = execute_with_pysqa,
104107
wait: bool = True,
108+
refresh_rate: float = 0.01,
105109
):
106110
if block_allocation:
107111
raise ValueError(
@@ -133,4 +137,5 @@ def create_file_executor(
133137
terminate_function=terminate_function,
134138
pmi_mode=pmi_mode,
135139
wait=wait,
140+
refresh_rate=refresh_rate,
136141
)

0 commit comments

Comments
 (0)