Skip to content

Commit 813d6b5

Browse files
[Fix] Rename resource_dict to executor_kwargs (#936)
* [Fix] Rename resource_dict to executor_kwargs

* [pre-commit.ci] auto fixes from pre-commit.com hooks

  for more information, see https://pre-commit.ci

* fixes

* fix test

* fix execute_tasks_h5() function

* fix tests

* more fixes

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent e38a2d5 commit 813d6b5

10 files changed

Lines changed: 113 additions & 111 deletions

File tree

src/executorlib/executor/flux.py

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ def __init__(
194194
max_workers=max_workers,
195195
cache_directory=cache_directory,
196196
max_cores=max_cores,
197-
resource_dict=resource_dict,
197+
executor_kwargs=resource_dict,
198198
pmi_mode=pmi_mode,
199199
flux_executor=flux_executor,
200200
flux_executor_nesting=flux_executor_nesting,
@@ -221,7 +221,7 @@ def __init__(
221221
max_workers=max_workers,
222222
cache_directory=cache_directory,
223223
max_cores=max_cores,
224-
resource_dict=resource_dict,
224+
executor_kwargs=resource_dict,
225225
pmi_mode=pmi_mode,
226226
flux_executor=flux_executor,
227227
flux_executor_nesting=flux_executor_nesting,
@@ -404,7 +404,7 @@ def __init__(
404404
pmi_mode=pmi_mode,
405405
init_function=init_function,
406406
max_workers=max_workers,
407-
resource_dict=resource_dict,
407+
executor_kwargs=resource_dict,
408408
pysqa_config_directory=pysqa_config_directory,
409409
backend="flux",
410410
)
@@ -420,7 +420,7 @@ def __init__(
420420
backend="flux",
421421
max_cores=max_cores,
422422
cache_directory=cache_directory,
423-
resource_dict=resource_dict,
423+
executor_kwargs=resource_dict,
424424
flux_executor=None,
425425
pmi_mode=pmi_mode,
426426
flux_executor_nesting=False,
@@ -441,7 +441,7 @@ def __init__(
441441
max_workers=max_workers,
442442
cache_directory=cache_directory,
443443
max_cores=max_cores,
444-
resource_dict=resource_dict,
444+
executor_kwargs=resource_dict,
445445
pmi_mode=None,
446446
flux_executor=None,
447447
flux_executor_nesting=False,
@@ -463,7 +463,7 @@ def create_flux_executor(
463463
max_workers: Optional[int] = None,
464464
max_cores: Optional[int] = None,
465465
cache_directory: Optional[str] = None,
466-
resource_dict: Optional[dict] = None,
466+
executor_kwargs: Optional[dict] = None,
467467
pmi_mode: Optional[str] = None,
468468
flux_executor=None,
469469
flux_executor_nesting: bool = False,
@@ -484,7 +484,7 @@ def create_flux_executor(
484484
max_cores is recommended, as computers have a limited number of compute cores.
485485
max_cores (int): defines the number cores which can be used in parallel
486486
cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
487-
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
487+
executor_kwargs (dict): A dictionary of arguments required by the executor. With the following keys:
488488
- cores (int): number of MPI cores to be used for each function call
489489
- threads_per_core (int): number of OpenMP threads to be used for each function call
490490
- gpus_per_core (int): number of GPUs per worker - defaults to 0
@@ -524,29 +524,31 @@ def create_flux_executor(
524524
validate_max_workers,
525525
)
526526

527-
if resource_dict is None:
528-
resource_dict = {}
529-
cores_per_worker = resource_dict.get("cores", 1)
530-
resource_dict["cache_directory"] = cache_directory
531-
resource_dict["hostname_localhost"] = hostname_localhost
532-
resource_dict["log_obj_size"] = log_obj_size
527+
if executor_kwargs is None:
528+
executor_kwargs = {}
529+
cores_per_worker = executor_kwargs.get("cores", 1)
530+
executor_kwargs["cache_directory"] = cache_directory
531+
executor_kwargs["hostname_localhost"] = hostname_localhost
532+
executor_kwargs["log_obj_size"] = log_obj_size
533533
check_init_function(block_allocation=block_allocation, init_function=init_function)
534534
check_pmi(backend="flux_allocation", pmi=pmi_mode)
535-
check_oversubscribe(oversubscribe=resource_dict.get("openmpi_oversubscribe", False))
535+
check_oversubscribe(
536+
oversubscribe=executor_kwargs.get("openmpi_oversubscribe", False)
537+
)
536538
check_command_line_argument_lst(
537-
command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
539+
command_line_argument_lst=executor_kwargs.get("slurm_cmd_args", [])
538540
)
539541
check_wait_on_shutdown(wait_on_shutdown=wait)
540-
if "openmpi_oversubscribe" in resource_dict:
541-
del resource_dict["openmpi_oversubscribe"]
542-
if "slurm_cmd_args" in resource_dict:
543-
del resource_dict["slurm_cmd_args"]
544-
resource_dict["pmi_mode"] = pmi_mode
545-
resource_dict["flux_executor"] = flux_executor
546-
resource_dict["flux_executor_nesting"] = flux_executor_nesting
547-
resource_dict["flux_log_files"] = flux_log_files
542+
if "openmpi_oversubscribe" in executor_kwargs:
543+
del executor_kwargs["openmpi_oversubscribe"]
544+
if "slurm_cmd_args" in executor_kwargs:
545+
del executor_kwargs["slurm_cmd_args"]
546+
executor_kwargs["pmi_mode"] = pmi_mode
547+
executor_kwargs["flux_executor"] = flux_executor
548+
executor_kwargs["flux_executor_nesting"] = flux_executor_nesting
549+
executor_kwargs["flux_log_files"] = flux_log_files
548550
if block_allocation:
549-
resource_dict["init_function"] = init_function
551+
executor_kwargs["init_function"] = init_function
550552
max_workers = validate_number_of_cores(
551553
max_cores=max_cores,
552554
max_workers=max_workers,
@@ -556,18 +558,18 @@ def create_flux_executor(
556558
validate_max_workers(
557559
max_workers=max_workers,
558560
cores=cores_per_worker,
559-
threads_per_core=resource_dict.get("threads_per_core", 1),
561+
threads_per_core=executor_kwargs.get("threads_per_core", 1),
560562
)
561563
return BlockAllocationTaskScheduler(
562564
max_workers=max_workers,
563-
executor_kwargs=resource_dict,
565+
executor_kwargs=executor_kwargs,
564566
spawner=FluxPythonSpawner,
565567
restart_limit=restart_limit,
566568
)
567569
else:
568570
return OneProcessTaskScheduler(
569571
max_cores=max_cores,
570572
max_workers=max_workers,
571-
executor_kwargs=resource_dict,
573+
executor_kwargs=executor_kwargs,
572574
spawner=FluxPythonSpawner,
573575
)

src/executorlib/executor/single.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ def __init__(
176176
max_workers=max_workers,
177177
cache_directory=cache_directory,
178178
max_cores=max_cores,
179-
resource_dict=resource_dict,
179+
executor_kwargs=resource_dict,
180180
hostname_localhost=hostname_localhost,
181181
block_allocation=block_allocation,
182182
init_function=init_function,
@@ -199,7 +199,7 @@ def __init__(
199199
max_workers=max_workers,
200200
cache_directory=cache_directory,
201201
max_cores=max_cores,
202-
resource_dict=resource_dict,
202+
executor_kwargs=resource_dict,
203203
hostname_localhost=hostname_localhost,
204204
block_allocation=block_allocation,
205205
init_function=init_function,
@@ -359,7 +359,7 @@ def __init__(
359359
backend=None,
360360
max_cores=max_cores,
361361
cache_directory=cache_directory,
362-
resource_dict=resource_dict,
362+
executor_kwargs=resource_dict,
363363
flux_executor=None,
364364
pmi_mode=None,
365365
flux_executor_nesting=False,
@@ -381,7 +381,7 @@ def __init__(
381381
max_workers=max_workers,
382382
cache_directory=cache_directory,
383383
max_cores=max_cores,
384-
resource_dict=resource_dict,
384+
executor_kwargs=resource_dict,
385385
hostname_localhost=hostname_localhost,
386386
block_allocation=block_allocation,
387387
init_function=init_function,
@@ -400,7 +400,7 @@ def create_single_node_executor(
400400
max_workers: Optional[int] = None,
401401
max_cores: Optional[int] = None,
402402
cache_directory: Optional[str] = None,
403-
resource_dict: Optional[dict] = None,
403+
executor_kwargs: Optional[dict] = None,
404404
hostname_localhost: Optional[bool] = None,
405405
block_allocation: bool = False,
406406
init_function: Optional[Callable] = None,
@@ -417,7 +417,7 @@ def create_single_node_executor(
417417
max_cores is recommended, as computers have a limited number of compute cores.
418418
max_cores (int): defines the number cores which can be used in parallel
419419
cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
420-
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
420+
executor_kwargs (dict): A dictionary of arguments required by the executor. With the following keys:
421421
- cores (int): number of MPI cores to be used for each function call
422422
- threads_per_core (int): number of OpenMP threads to be used for each function call
423423
- gpus_per_core (int): number of GPUs per worker - defaults to 0
@@ -447,42 +447,42 @@ def create_single_node_executor(
447447
Returns:
448448
InteractiveStepExecutor/ InteractiveExecutor
449449
"""
450-
if resource_dict is None:
451-
resource_dict = {}
452-
cores_per_worker = resource_dict.get("cores", 1)
453-
resource_dict["cache_directory"] = cache_directory
454-
resource_dict["hostname_localhost"] = hostname_localhost
455-
resource_dict["log_obj_size"] = log_obj_size
450+
if executor_kwargs is None:
451+
executor_kwargs = {}
452+
cores_per_worker = executor_kwargs.get("cores", 1)
453+
executor_kwargs["cache_directory"] = cache_directory
454+
executor_kwargs["hostname_localhost"] = hostname_localhost
455+
executor_kwargs["log_obj_size"] = log_obj_size
456456

457457
check_init_function(block_allocation=block_allocation, init_function=init_function)
458-
check_gpus_per_worker(gpus_per_worker=resource_dict.get("gpus_per_core", 0))
458+
check_gpus_per_worker(gpus_per_worker=executor_kwargs.get("gpus_per_core", 0))
459459
check_command_line_argument_lst(
460-
command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
460+
command_line_argument_lst=executor_kwargs.get("slurm_cmd_args", [])
461461
)
462462
check_wait_on_shutdown(wait_on_shutdown=wait)
463-
if "threads_per_core" in resource_dict:
464-
del resource_dict["threads_per_core"]
465-
if "gpus_per_core" in resource_dict:
466-
del resource_dict["gpus_per_core"]
467-
if "slurm_cmd_args" in resource_dict:
468-
del resource_dict["slurm_cmd_args"]
463+
if "threads_per_core" in executor_kwargs:
464+
del executor_kwargs["threads_per_core"]
465+
if "gpus_per_core" in executor_kwargs:
466+
del executor_kwargs["gpus_per_core"]
467+
if "slurm_cmd_args" in executor_kwargs:
468+
del executor_kwargs["slurm_cmd_args"]
469469
if block_allocation:
470-
resource_dict["init_function"] = init_function
470+
executor_kwargs["init_function"] = init_function
471471
return BlockAllocationTaskScheduler(
472472
max_workers=validate_number_of_cores(
473473
max_cores=max_cores,
474474
max_workers=max_workers,
475475
cores_per_worker=cores_per_worker,
476476
set_local_cores=True,
477477
),
478-
executor_kwargs=resource_dict,
478+
executor_kwargs=executor_kwargs,
479479
spawner=MpiExecSpawner,
480480
restart_limit=restart_limit,
481481
)
482482
else:
483483
return OneProcessTaskScheduler(
484484
max_cores=max_cores,
485485
max_workers=max_workers,
486-
executor_kwargs=resource_dict,
486+
executor_kwargs=executor_kwargs,
487487
spawner=MpiExecSpawner,
488488
)

src/executorlib/executor/slurm.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ def __init__(
189189
pmi_mode=pmi_mode,
190190
init_function=init_function,
191191
max_workers=max_workers,
192-
resource_dict=resource_dict,
192+
executor_kwargs=resource_dict,
193193
pysqa_config_directory=pysqa_config_directory,
194194
backend="slurm",
195195
),
@@ -206,7 +206,7 @@ def __init__(
206206
backend="slurm",
207207
max_cores=max_cores,
208208
cache_directory=cache_directory,
209-
resource_dict=resource_dict,
209+
executor_kwargs=resource_dict,
210210
pmi_mode=pmi_mode,
211211
flux_executor=None,
212212
flux_executor_nesting=False,
@@ -227,7 +227,7 @@ def __init__(
227227
max_workers=max_workers,
228228
cache_directory=cache_directory,
229229
max_cores=max_cores,
230-
resource_dict=resource_dict,
230+
executor_kwargs=resource_dict,
231231
hostname_localhost=hostname_localhost,
232232
block_allocation=block_allocation,
233233
init_function=init_function,
@@ -408,7 +408,7 @@ def __init__(
408408
max_workers=max_workers,
409409
cache_directory=cache_directory,
410410
max_cores=max_cores,
411-
resource_dict=resource_dict,
411+
executor_kwargs=resource_dict,
412412
pmi_mode=pmi_mode,
413413
hostname_localhost=hostname_localhost,
414414
block_allocation=block_allocation,
@@ -432,7 +432,7 @@ def __init__(
432432
max_workers=max_workers,
433433
cache_directory=cache_directory,
434434
max_cores=max_cores,
435-
resource_dict=resource_dict,
435+
executor_kwargs=resource_dict,
436436
pmi_mode=pmi_mode,
437437
hostname_localhost=hostname_localhost,
438438
block_allocation=block_allocation,
@@ -448,7 +448,7 @@ def create_slurm_executor(
448448
max_workers: Optional[int] = None,
449449
max_cores: Optional[int] = None,
450450
cache_directory: Optional[str] = None,
451-
resource_dict: Optional[dict] = None,
451+
executor_kwargs: Optional[dict] = None,
452452
pmi_mode: Optional[str] = None,
453453
hostname_localhost: Optional[bool] = None,
454454
block_allocation: bool = False,
@@ -466,7 +466,7 @@ def create_slurm_executor(
466466
max_cores is recommended, as computers have a limited number of compute cores.
467467
max_cores (int): defines the number cores which can be used in parallel
468468
cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
469-
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
469+
executor_kwargs (dict): A dictionary of arguments required by the executor. With the following keys:
470470
- cores (int): number of MPI cores to be used for each function call
471471
- threads_per_core (int): number of OpenMP threads to be used for each function call
472472
- gpus_per_core (int): number of GPUs per worker - defaults to 0
@@ -502,17 +502,17 @@ def create_slurm_executor(
502502
Returns:
503503
InteractiveStepExecutor/ InteractiveExecutor
504504
"""
505-
if resource_dict is None:
506-
resource_dict = {}
507-
cores_per_worker = resource_dict.get("cores", 1)
508-
resource_dict["cache_directory"] = cache_directory
509-
resource_dict["hostname_localhost"] = hostname_localhost
510-
resource_dict["log_obj_size"] = log_obj_size
511-
resource_dict["pmi_mode"] = pmi_mode
505+
if executor_kwargs is None:
506+
executor_kwargs = {}
507+
cores_per_worker = executor_kwargs.get("cores", 1)
508+
executor_kwargs["cache_directory"] = cache_directory
509+
executor_kwargs["hostname_localhost"] = hostname_localhost
510+
executor_kwargs["log_obj_size"] = log_obj_size
511+
executor_kwargs["pmi_mode"] = pmi_mode
512512
check_init_function(block_allocation=block_allocation, init_function=init_function)
513513
check_wait_on_shutdown(wait_on_shutdown=wait)
514514
if block_allocation:
515-
resource_dict["init_function"] = init_function
515+
executor_kwargs["init_function"] = init_function
516516
max_workers = validate_number_of_cores(
517517
max_cores=max_cores,
518518
max_workers=max_workers,
@@ -522,18 +522,18 @@ def create_slurm_executor(
522522
validate_max_workers(
523523
max_workers=max_workers,
524524
cores=cores_per_worker,
525-
threads_per_core=resource_dict.get("threads_per_core", 1),
525+
threads_per_core=executor_kwargs.get("threads_per_core", 1),
526526
)
527527
return BlockAllocationTaskScheduler(
528528
max_workers=max_workers,
529-
executor_kwargs=resource_dict,
529+
executor_kwargs=executor_kwargs,
530530
spawner=SrunSpawner,
531531
restart_limit=restart_limit,
532532
)
533533
else:
534534
return OneProcessTaskScheduler(
535535
max_cores=max_cores,
536536
max_workers=max_workers,
537-
executor_kwargs=resource_dict,
537+
executor_kwargs=executor_kwargs,
538538
spawner=SrunSpawner,
539539
)

src/executorlib/task_scheduler/file/shared.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def done(self) -> bool:
5757
def execute_tasks_h5(
5858
future_queue: queue.Queue,
5959
execute_function: Callable,
60-
resource_dict: dict,
60+
executor_kwargs: dict,
6161
terminate_function: Optional[Callable] = None,
6262
pysqa_config_directory: Optional[str] = None,
6363
backend: Optional[str] = None,
@@ -71,7 +71,7 @@ def execute_tasks_h5(
7171
7272
Args:
7373
future_queue (queue.Queue): The queue containing the tasks.
74-
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
74+
executor_kwargs (dict): A dictionary of executor arguments required by the task. With the following keys:
7575
- cores (int): number of MPI cores to be used for each function call
7676
- cwd (str/None): current working directory where the parallel python task is executed
7777
execute_function (Callable): The function to execute the tasks.
@@ -119,7 +119,7 @@ def execute_tasks_h5(
119119
task_resource_dict, cache_key, cache_directory, error_log_file = (
120120
_get_task_input(
121121
task_resource_dict=task_dict["resource_dict"].copy(),
122-
resource_dict=resource_dict,
122+
executor_kwargs=executor_kwargs,
123123
)
124124
)
125125
task_key, data_dict = serialize_funct(
@@ -352,10 +352,10 @@ def _cancel_processes(
352352

353353

354354
def _get_task_input(
355-
task_resource_dict: dict, resource_dict: dict
355+
task_resource_dict: dict, executor_kwargs: dict
356356
) -> tuple[dict, Optional[str], str, Optional[str]]:
357357
task_resource_dict.update(
358-
{k: v for k, v in resource_dict.items() if k not in task_resource_dict}
358+
{k: v for k, v in executor_kwargs.items() if k not in task_resource_dict}
359359
)
360360
cache_key = task_resource_dict.pop("cache_key", None)
361361
cache_directory = os.path.abspath(task_resource_dict.pop("cache_directory"))

0 commit comments

Comments
 (0)