Skip to content

Commit 4a96b1c

Browse files
jan-janssenpyiron-runnerpre-commit-ci[bot]
authored
Feature: Cancel processes of cancelled futures (#903)
* Fix: Implement one _refresh_memory_dict() function * Add _cancel_processes() * Format black * mypy fixes * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * more fixes * Feature: Cancel processes of cancelled futures * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * add more tests * fix * long running task * sleep * use context * use long running function * fix file count * cancel * everything as it should * do not cancel * not cancelled * no wait * sleep again * fixes * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pyiron-runner <pyiron@mpie.de> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent aa0f602 commit 4a96b1c

1 file changed

Lines changed: 34 additions & 2 deletions

File tree

src/executorlib/task_scheduler/file/shared.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,18 +93,26 @@ def execute_tasks_h5(
9393
memory_dict = _refresh_memory_dict(
9494
memory_dict=memory_dict,
9595
cache_dir_dict=cache_dir_dict,
96+
process_dict=process_dict,
97+
terminate_function=terminate_function,
98+
pysqa_config_directory=pysqa_config_directory,
99+
backend=backend,
96100
)
97101
if not task_dict["cancel_futures"] and wait:
98102
_cancel_processes(
99-
terminate_function=terminate_function,
100103
process_dict=process_dict,
104+
terminate_function=terminate_function,
101105
pysqa_config_directory=pysqa_config_directory,
102106
backend=backend,
103107
)
104108
else:
105109
memory_dict = _refresh_memory_dict(
106110
memory_dict=memory_dict,
107111
cache_dir_dict=cache_dir_dict,
112+
process_dict=process_dict,
113+
terminate_function=terminate_function,
114+
pysqa_config_directory=pysqa_config_directory,
115+
backend=backend,
108116
)
109117
for value in memory_dict.values():
110118
if not value.done():
@@ -179,6 +187,10 @@ def execute_tasks_h5(
179187
memory_dict = _refresh_memory_dict(
180188
memory_dict=memory_dict,
181189
cache_dir_dict=cache_dir_dict,
190+
process_dict=process_dict,
191+
terminate_function=terminate_function,
192+
pysqa_config_directory=pysqa_config_directory,
193+
backend=backend,
182194
)
183195

184196

@@ -255,17 +267,37 @@ def _convert_args_and_kwargs(
255267
return task_args, task_kwargs, future_wait_key_lst
256268

257269

258-
def _refresh_memory_dict(memory_dict: dict, cache_dir_dict: dict) -> dict:
270+
def _refresh_memory_dict(
271+
memory_dict: dict,
272+
cache_dir_dict: dict,
273+
process_dict: dict,
274+
terminate_function: Optional[Callable] = None,
275+
pysqa_config_directory: Optional[str] = None,
276+
backend: Optional[str] = None,
277+
) -> dict:
259278
"""
260279
Refresh memory dictionary
261280
262281
Args:
263282
memory_dict (dict): dictionary with task keys and future objects
264283
cache_dir_dict (dict): dictionary with task keys and cache directories
284+
process_dict (dict): dictionary with task keys and process reference.
285+
terminate_function (callable): The function to terminate the tasks.
286+
pysqa_config_directory (str): path to the pysqa config directory (only for pysqa based backend).
287+
backend (str): name of the backend used to spawn tasks.
265288
266289
Returns:
267290
dict: Updated memory dictionary
268291
"""
292+
cancelled_lst = [
293+
key for key, value in memory_dict.items() if value.done() and value.cancelled()
294+
]
295+
_cancel_processes(
296+
process_dict={k: v for k, v in process_dict.items() if k in cancelled_lst},
297+
terminate_function=terminate_function,
298+
pysqa_config_directory=pysqa_config_directory,
299+
backend=backend,
300+
)
269301
return {
270302
key: _check_task_output(
271303
task_key=key,

0 commit comments

Comments
 (0)