Skip to content

Commit 56861b1

Browse files
authored
Merge branch 'main' into Py314u
2 parents 2ce0c12 + 7e9bf9b commit 56861b1

18 files changed

Lines changed: 523 additions & 70 deletions

File tree

src/executorlib/__init__.py

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
pandas.DataFrame.
1313
"""
1414

15+
from concurrent.futures import Future
1516
from typing import Optional
1617

1718
import executorlib._version
@@ -43,30 +44,82 @@ def get_cache_data(cache_directory: str) -> list[dict]:
4344
return get_cache_data(cache_directory=cache_directory)
4445

4546

47+
def get_future_from_cache(
48+
cache_directory: str,
49+
cache_key: str,
50+
) -> Future:
51+
"""
52+
Reload future from HDF5 file in cache directory with the given cache key. The function checks if the output file
53+
exists, if not it checks for the input file. If neither of them exist, it raises a FileNotFoundError. If the output
54+
file exists, it loads the output and sets it as the result of the future. If only the input file exists, it checks
55+
if the execution is finished and if there was an error. If there was no error, it sets the output as the result of
56+
the future, otherwise it raises the error.
57+
58+
Args:
59+
cache_directory (str): The directory to store cache files.
60+
cache_key (str): The key of the cache file to be reloaded.
61+
62+
Returns:
63+
Future: Future object containing the result of the execution of the python function.
64+
"""
65+
from executorlib.standalone.hdf import get_future_from_cache
66+
67+
return get_future_from_cache(
68+
cache_directory=cache_directory,
69+
cache_key=cache_key,
70+
)
71+
72+
4673
def terminate_tasks_in_cache(
4774
cache_directory: str,
48-
config_directory: Optional[str] = None,
75+
pysqa_config_directory: Optional[str] = None,
4976
backend: Optional[str] = None,
5077
):
5178
"""
5279
Delete all jobs stored in the cache directory from the queuing system
5380
5481
Args:
5582
cache_directory (str): The directory to store cache files.
56-
config_directory (str, optional): path to the config directory.
83+
pysqa_config_directory (str, optional): path to the pysqa config directory.
5784
backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
5885
"""
5986
from executorlib.task_scheduler.file.spawner_pysqa import terminate_tasks_in_cache
6087

6188
return terminate_tasks_in_cache(
6289
cache_directory=cache_directory,
63-
config_directory=config_directory,
90+
pysqa_config_directory=pysqa_config_directory,
91+
backend=backend,
92+
)
93+
94+
95+
def terminate_task_in_cache(
96+
cache_directory: str,
97+
cache_key: str,
98+
pysqa_config_directory: Optional[str] = None,
99+
backend: Optional[str] = None,
100+
):
101+
"""
102+
Delete a specific job stored in the cache directory from the queuing system
103+
104+
Args:
105+
cache_directory (str): The directory to store cache files.
106+
cache_key (str): The key of the cache file to be deleted.
107+
pysqa_config_directory (str, optional): path to the pysqa config directory.
108+
backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
109+
"""
110+
from executorlib.task_scheduler.file.spawner_pysqa import terminate_task_in_cache
111+
112+
return terminate_task_in_cache(
113+
cache_directory=cache_directory,
114+
cache_key=cache_key,
115+
pysqa_config_directory=pysqa_config_directory,
64116
backend=backend,
65117
)
66118

67119

68120
__all__: list[str] = [
69121
"get_cache_data",
122+
"get_future_from_cache",
70123
"get_item_from_future",
71124
"split_future",
72125
"terminate_tasks_in_cache",

src/executorlib/backend/cache_parallel.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ def main() -> None:
3939

4040
time_start = time.time()
4141
apply_dict = {}
42-
if mpi_rank_zero:
43-
apply_dict = backend_load_file(file_name=file_name)
44-
apply_dict = MPI.COMM_WORLD.bcast(apply_dict, root=0)
45-
output = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
4642
try:
43+
if mpi_rank_zero:
44+
apply_dict = backend_load_file(file_name=file_name)
45+
apply_dict = MPI.COMM_WORLD.bcast(apply_dict, root=0)
46+
output = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
4747
result = (
4848
MPI.COMM_WORLD.gather(output, root=0) if mpi_size_larger_one else output
4949
)

src/executorlib/executor/flux.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,7 @@ def __init__(
415415
init_function=init_function,
416416
disable_dependencies=disable_dependencies,
417417
wait=wait,
418+
refresh_rate=refresh_rate,
418419
)
419420
)
420421
else:

src/executorlib/executor/single.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ def __init__(
356356
disable_dependencies=disable_dependencies,
357357
execute_function=execute_in_subprocess,
358358
wait=wait,
359+
refresh_rate=refresh_rate,
359360
)
360361
)
361362
else:

src/executorlib/executor/slurm.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ def __init__(
215215
init_function=init_function,
216216
disable_dependencies=disable_dependencies,
217217
wait=wait,
218+
refresh_rate=refresh_rate,
218219
)
219220
)
220221
else:

src/executorlib/standalone/hdf.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
from concurrent.futures import Future
23
from typing import Any, Optional
34

45
import cloudpickle
@@ -161,6 +162,43 @@ def get_cache_files(cache_directory: str) -> list[str]:
161162
return file_lst
162163

163164

165+
def get_future_from_cache(
166+
cache_directory: str,
167+
cache_key: str,
168+
) -> Future:
169+
"""
170+
Reload future from HDF5 file in cache directory with the given cache key. The function checks if the output file
171+
exists, if not it checks for the input file. If neither of them exist, it raises a FileNotFoundError. If the output
172+
file exists, it loads the output and sets it as the result of the future. If only the input file exists, it checks
173+
if the execution is finished and if there was an error. If there was no error, it sets the output as the result of
174+
the future, otherwise it raises the error.
175+
176+
Args:
177+
cache_directory (str): The directory to store cache files.
178+
cache_key (str): The key of the cache file to be reloaded.
179+
180+
Returns:
181+
Future: Future object containing the result of the execution of the python function.
182+
"""
183+
file_name_in = os.path.join(cache_directory, cache_key + "_i.h5")
184+
file_name_out = os.path.join(cache_directory, cache_key + "_o.h5")
185+
future: Future = Future()
186+
if os.path.exists(file_name_out):
187+
file_name = file_name_out
188+
elif os.path.exists(file_name_in):
189+
file_name = file_name_in
190+
else:
191+
raise FileNotFoundError(
192+
f"Neither input nor output file for cache key {cache_key} found in cache directory {cache_directory}."
193+
)
194+
exec_flag, no_error_flag, result = get_output(file_name=file_name)
195+
if exec_flag and no_error_flag:
196+
future.set_result(result)
197+
elif exec_flag:
198+
raise result
199+
return future
200+
201+
164202
def _get_content_of_file(file_name: str) -> dict:
165203
"""
166204
Get content of an HDF5 file

src/executorlib/task_scheduler/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
201201
self._future_queue.put(
202202
{"shutdown": True, "wait": wait, "cancel_futures": cancel_futures}
203203
)
204-
if wait and isinstance(self._process, Thread):
204+
if isinstance(self._process, Thread):
205205
self._process.join()
206206
self._future_queue.join()
207207
self._process = None

src/executorlib/task_scheduler/file/backend.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ def backend_execute_task_in_file(file_name: str) -> None:
6868
Returns:
6969
None
7070
"""
71-
apply_dict = backend_load_file(file_name=file_name)
72-
time_start = time.time()
7371
try:
72+
apply_dict = backend_load_file(file_name=file_name)
73+
time_start = time.time()
7474
result = {
7575
"result": apply_dict["fn"].__call__(
7676
*apply_dict["args"], **apply_dict["kwargs"]

0 commit comments

Comments
 (0)