Skip to content

Commit 2ce0c12

Browse files
authored
Merge branch 'main' into Py314u
2 parents f0f82a7 + aa0f602 commit 2ce0c12

9 files changed

Lines changed: 142 additions & 48 deletions

File tree

src/executorlib/standalone/interactive/spawner.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ def bootup(
112112
"""
113113
if self._cwd is not None:
114114
os.makedirs(self._cwd, exist_ok=True)
115+
set_current_directory_in_environment()
115116
self._process = subprocess.Popen(
116117
args=self.generate_command(command_lst=command_lst),
117118
cwd=self._cwd,
@@ -195,3 +196,15 @@ def generate_mpiexec_command(
195196
if openmpi_oversubscribe:
196197
command_prepend_lst += ["--oversubscribe"]
197198
return command_prepend_lst
199+
200+
201+
def set_current_directory_in_environment():
202+
"""
203+
Add the current directory to the PYTHONPATH to be able to access local Python modules.
204+
"""
205+
environment = os.environ
206+
current_path = os.getcwd()
207+
if "PYTHONPATH" in environment and current_path not in environment["PYTHONPATH"]:
208+
environment["PYTHONPATH"] = os.getcwd() + ":" + environment["PYTHONPATH"]
209+
elif "PYTHONPATH" not in environment:
210+
environment["PYTHONPATH"] = os.getcwd()

src/executorlib/task_scheduler/file/shared.py

Lines changed: 67 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -90,39 +90,22 @@ def execute_tasks_h5(
9090
if task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"]:
9191
if task_dict["wait"] and wait:
9292
while len(memory_dict) > 0:
93-
memory_dict = {
94-
key: _check_task_output(
95-
task_key=key,
96-
future_obj=value,
97-
cache_directory=cache_dir_dict[key],
98-
)
99-
for key, value in memory_dict.items()
100-
if not value.done()
101-
}
93+
memory_dict = _refresh_memory_dict(
94+
memory_dict=memory_dict,
95+
cache_dir_dict=cache_dir_dict,
96+
)
10297
if not task_dict["cancel_futures"] and wait:
103-
if (
104-
terminate_function is not None
105-
and terminate_function == terminate_subprocess
106-
):
107-
for task in process_dict.values():
108-
terminate_function(task=task)
109-
elif terminate_function is not None:
110-
for queue_id in process_dict.values():
111-
terminate_function(
112-
queue_id=queue_id,
113-
config_directory=pysqa_config_directory,
114-
backend=backend,
115-
)
98+
_cancel_processes(
99+
terminate_function=terminate_function,
100+
process_dict=process_dict,
101+
pysqa_config_directory=pysqa_config_directory,
102+
backend=backend,
103+
)
116104
else:
117-
memory_dict = {
118-
key: _check_task_output(
119-
task_key=key,
120-
future_obj=value,
121-
cache_directory=cache_dir_dict[key],
122-
)
123-
for key, value in memory_dict.items()
124-
if not value.done()
125-
}
105+
memory_dict = _refresh_memory_dict(
106+
memory_dict=memory_dict,
107+
cache_dir_dict=cache_dir_dict,
108+
)
126109
for value in memory_dict.values():
127110
if not value.done():
128111
value.cancel()
@@ -193,15 +176,10 @@ def execute_tasks_h5(
193176
cache_dir_dict[task_key] = cache_directory
194177
future_queue.task_done()
195178
else:
196-
memory_dict = {
197-
key: _check_task_output(
198-
task_key=key,
199-
future_obj=value,
200-
cache_directory=cache_dir_dict[key],
201-
)
202-
for key, value in memory_dict.items()
203-
if not value.done()
204-
}
179+
memory_dict = _refresh_memory_dict(
180+
memory_dict=memory_dict,
181+
cache_dir_dict=cache_dir_dict,
182+
)
205183

206184

207185
def _check_task_output(
@@ -275,3 +253,52 @@ def _convert_args_and_kwargs(
275253
else:
276254
task_kwargs[key] = arg
277255
return task_args, task_kwargs, future_wait_key_lst
256+
257+
258+
def _refresh_memory_dict(memory_dict: dict, cache_dir_dict: dict) -> dict:
259+
"""
260+
Refresh memory dictionary
261+
262+
Args:
263+
memory_dict (dict): dictionary with task keys and future objects
264+
cache_dir_dict (dict): dictionary with task keys and cache directories
265+
266+
Returns:
267+
dict: Updated memory dictionary
268+
"""
269+
return {
270+
key: _check_task_output(
271+
task_key=key,
272+
future_obj=value,
273+
cache_directory=cache_dir_dict[key],
274+
)
275+
for key, value in memory_dict.items()
276+
if not value.done()
277+
}
278+
279+
280+
def _cancel_processes(
281+
process_dict: dict,
282+
terminate_function: Optional[Callable] = None,
283+
pysqa_config_directory: Optional[str] = None,
284+
backend: Optional[str] = None,
285+
):
286+
"""
287+
Cancel processes
288+
289+
Args:
290+
process_dict (dict): dictionary with task keys and process reference.
291+
terminate_function (callable): The function to terminate the tasks.
292+
pysqa_config_directory (str): path to the pysqa config directory (only for pysqa based backend).
293+
backend (str): name of the backend used to spawn tasks.
294+
"""
295+
if terminate_function is not None and terminate_function == terminate_subprocess:
296+
for task in process_dict.values():
297+
terminate_function(task=task)
298+
elif terminate_function is not None and backend is not None:
299+
for queue_id in process_dict.values():
300+
terminate_function(
301+
queue_id=queue_id,
302+
config_directory=pysqa_config_directory,
303+
backend=backend,
304+
)

src/executorlib/task_scheduler/file/spawner_pysqa.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55

66
from executorlib.standalone.hdf import dump, get_queue_id
77
from executorlib.standalone.inputcheck import check_file_exists
8+
from executorlib.standalone.interactive.spawner import (
9+
set_current_directory_in_environment,
10+
)
811
from executorlib.standalone.scheduler import pysqa_execute_command, terminate_with_pysqa
912

1013

@@ -85,6 +88,7 @@ def execute_with_pysqa(
8588
os.path.dirname(os.path.abspath(cwd))
8689
)
8790
submit_kwargs.update(resource_dict)
91+
set_current_directory_in_environment()
8892
queue_id = qa.submit_job(**submit_kwargs)
8993
dump(file_name=file_name, data_dict={"queue_id": queue_id})
9094
return queue_id

src/executorlib/task_scheduler/file/spawner_subprocess.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55

66
from executorlib.standalone.hdf import dump
77
from executorlib.standalone.inputcheck import check_file_exists
8+
from executorlib.standalone.interactive.spawner import (
9+
set_current_directory_in_environment,
10+
)
811

912

1013
def execute_in_subprocess(
@@ -53,11 +56,12 @@ def execute_in_subprocess(
5356
)
5457
if backend is not None:
5558
raise ValueError("backend parameter is not supported for subprocess spawner.")
56-
if resource_dict is None:
57-
resource_dict = {}
58-
cwd = resource_dict.get("cwd", cache_directory)
59+
cwd = _get_working_directory(
60+
cache_directory=cache_directory, resource_dict=resource_dict
61+
)
5962
if cwd is not None:
6063
os.makedirs(cwd, exist_ok=True)
64+
set_current_directory_in_environment()
6165
return subprocess.Popen(command, universal_newlines=True, cwd=cwd)
6266

6367

@@ -71,3 +75,14 @@ def terminate_subprocess(task):
7175
task.terminate()
7276
while task.poll() is None:
7377
time.sleep(0.1)
78+
79+
80+
def _get_working_directory(
81+
cache_directory: Optional[str] = None, resource_dict: Optional[dict] = None
82+
):
83+
if resource_dict is None:
84+
resource_dict = {}
85+
if "cwd" in resource_dict and resource_dict["cwd"] is not None:
86+
return resource_dict["cwd"]
87+
else:
88+
return cache_directory

src/executorlib/task_scheduler/interactive/spawner_flux.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
import flux
66
import flux.job
77

8-
from executorlib.standalone.interactive.spawner import BaseSpawner
8+
from executorlib.standalone.interactive.spawner import (
9+
BaseSpawner,
10+
set_current_directory_in_environment,
11+
)
912

1013

1114
def validate_max_workers(max_workers: int, cores: int, threads_per_core: int):
@@ -118,6 +121,7 @@ def bootup(
118121
num_nodes=self._num_nodes,
119122
exclusive=self._exclusive,
120123
)
124+
set_current_directory_in_environment()
121125
jobspec.environment = dict(os.environ)
122126
if self._pmi_mode is not None:
123127
jobspec.setattr_shell_option("pmi", self._pmi_mode)

src/executorlib/task_scheduler/interactive/spawner_pysqa.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66
from pysqa import QueueAdapter
77

88
from executorlib.standalone.inputcheck import validate_number_of_cores
9-
from executorlib.standalone.interactive.spawner import BaseSpawner
9+
from executorlib.standalone.interactive.spawner import (
10+
BaseSpawner,
11+
set_current_directory_in_environment,
12+
)
1013
from executorlib.standalone.scheduler import pysqa_execute_command, terminate_with_pysqa
1114
from executorlib.task_scheduler.interactive.blockallocation import (
1215
BlockAllocationTaskScheduler,
@@ -183,6 +186,7 @@ def _start_process_helper(
183186
working_directory = os.path.join(self._cwd, hash)
184187
else:
185188
working_directory = os.path.abspath(hash)
189+
set_current_directory_in_environment()
186190
return queue_adapter.submit_job(
187191
command=" ".join(self.generate_command(command_lst=command_lst)),
188192
working_directory=working_directory,

tests/unit/executor/test_flux_cluster.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ def echo(i):
3030
return i
3131

3232

33+
def long_running_function(i):
34+
sleep(10)
35+
return i
36+
37+
3338
def mpi_funct(i):
3439
from mpi4py import MPI
3540

tests/unit/standalone/interactive/test_spawner.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@
33
from queue import Queue
44
from time import sleep
55
import shutil
6+
import os
67
from sys import platform
78
import unittest
89

910
import numpy as np
1011

1112
from executorlib.task_scheduler.base import TaskSchedulerBase
12-
from executorlib.standalone.interactive.spawner import MpiExecSpawner
13+
from executorlib.standalone.interactive.spawner import MpiExecSpawner, set_current_directory_in_environment
1314
from executorlib.task_scheduler.interactive.blockallocation import BlockAllocationTaskScheduler, _execute_multiple_tasks
1415
from executorlib.task_scheduler.interactive.onetoone import OneProcessTaskScheduler
1516
from executorlib.standalone.interactive.backend import call_funct
@@ -544,3 +545,22 @@ def test_execute_task_cache_failed_no_argument(self):
544545
with self.assertRaises(TypeError):
545546
f.result()
546547
q.join()
548+
549+
550+
class TestEnvManipulation(unittest.TestCase):
551+
def test_set_current_directory_in_environment(self):
552+
env = os.environ
553+
if "PYTHONPATH" in env:
554+
python_path = env["PYTHONPATH"]
555+
del env["PYTHONPATH"]
556+
else:
557+
python_path = None
558+
self.assertFalse("PYTHONPATH" in env)
559+
set_current_directory_in_environment()
560+
self.assertTrue("PYTHONPATH" in env)
561+
self.assertEqual(env["PYTHONPATH"], os.getcwd())
562+
env["PYTHONPATH"] = "/my/special/path"
563+
set_current_directory_in_environment()
564+
self.assertEqual(env["PYTHONPATH"], os.getcwd() + ":/my/special/path")
565+
if python_path is not None:
566+
env["PYTHONPATH"] = python_path

tests/unit/standalone/test_command.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
import importlib.util
33
import sys
44
import unittest
5-
from executorlib.standalone.command import get_cache_execute_command, get_interactive_execute_command
6-
5+
from executorlib.standalone.command import (
6+
get_cache_execute_command,
7+
get_interactive_execute_command,
8+
)
79

810
skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None
911

0 commit comments

Comments
 (0)