Skip to content

Commit 3bd6463

Browse files
jan-janssenpyiron-runnerpre-commit-ci[bot]
authored
[Feature] support future objects in map() function (#878)
* [Feature] support future objects in map() function * Format black * fix type hints * more fixes * convert to tuple * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add unit test * Add anothr unit test --------- Co-authored-by: pyiron-runner <pyiron@mpie.de> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent d055130 commit 3bd6463

4 files changed

Lines changed: 91 additions & 0 deletions

File tree

src/executorlib/executor/base.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,43 @@ def submit( # type: ignore
107107
else:
108108
raise RuntimeError("cannot schedule new futures after shutdown")
109109

110+
def map(
111+
self,
112+
fn: Callable,
113+
*iterables,
114+
timeout: Optional[float] = None,
115+
chunksize: int = 1,
116+
):
117+
"""Returns an iterator equivalent to map(fn, iter).
118+
119+
Args:
120+
fn: A callable that will take as many arguments as there are
121+
passed iterables.
122+
timeout: The maximum number of seconds to wait. If None, then there
123+
is no limit on the wait time.
124+
chunksize: The size of the chunks the iterable will be broken into
125+
before being passed to a child process. This argument is only
126+
used by ProcessPoolExecutor; it is ignored by
127+
ThreadPoolExecutor.
128+
129+
Returns:
130+
An iterator equivalent to: map(func, *iterables) but the calls may
131+
be evaluated out-of-order.
132+
133+
Raises:
134+
TimeoutError: If the entire result iterator could not be generated
135+
before the given timeout.
136+
Exception: If fn(*args) raises for any values.
137+
"""
138+
if self._is_active:
139+
return self._task_scheduler.map(
140+
*([fn] + list(iterables)),
141+
timeout=timeout,
142+
chunksize=chunksize,
143+
)
144+
else:
145+
raise RuntimeError("cannot schedule new futures after shutdown")
146+
110147
def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
111148
"""
112149
Clean-up the resources associated with the Executor.

src/executorlib/task_scheduler/base.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,43 @@ def submit( # type: ignore
143143
)
144144
return f
145145

146+
def map(
147+
self,
148+
fn: Callable,
149+
*iterables,
150+
timeout: Optional[float] = None,
151+
chunksize: int = 1,
152+
):
153+
"""Returns an iterator equivalent to map(fn, iter).
154+
155+
Args:
156+
fn: A callable that will take as many arguments as there are
157+
passed iterables.
158+
timeout: The maximum number of seconds to wait. If None, then there
159+
is no limit on the wait time.
160+
chunksize: The size of the chunks the iterable will be broken into
161+
before being passed to a child process. This argument is only
162+
used by ProcessPoolExecutor; it is ignored by
163+
ThreadPoolExecutor.
164+
165+
Returns:
166+
An iterator equivalent to: map(func, *iterables) but the calls may
167+
be evaluated out-of-order.
168+
169+
Raises:
170+
TimeoutError: If the entire result iterator could not be generated
171+
before the given timeout.
172+
Exception: If fn(*args) raises for any values.
173+
"""
174+
if isinstance(iterables, (list, tuple)) and any(
175+
isinstance(i, Future) for i in iterables
176+
):
177+
iterables = tuple(
178+
i.result() if isinstance(i, Future) else i for i in iterables
179+
)
180+
181+
return super().map(fn, *iterables, timeout=timeout, chunksize=chunksize)
182+
146183
def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
147184
"""
148185
Clean-up the resources associated with the Executor.

tests/test_singlenodeexecutor_mpi.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,21 @@ def test_output_files_cwd(self):
125125
[1, 2, 3],
126126
)
127127

128+
def test_map_futures(self):
129+
dirname = os.path.abspath(os.path.dirname(__file__))
130+
os.makedirs(dirname, exist_ok=True)
131+
with SingleNodeExecutor(
132+
max_cores=1,
133+
resource_dict={"cores": 1, "cwd": dirname},
134+
block_allocation=True,
135+
) as p:
136+
calc_lst = p.submit(calc, [1, 2, 3])
137+
output = list(p.map(calc, calc_lst))
138+
self.assertEqual(
139+
output,
140+
[1, 2, 3],
141+
)
142+
128143

129144
class TestSLURMExecutor(unittest.TestCase):
130145
def test_validate_max_workers(self):

tests/test_singlenodeexecutor_noblock.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,3 +173,5 @@ def test_single_node_executor_exit(self):
173173
exe.shutdown()
174174
with self.assertRaises(RuntimeError):
175175
exe.submit(sum, [1, 2, 3])
176+
with self.assertRaises(RuntimeError):
177+
exe.map(calc, [1, 2, 3])

0 commit comments

Comments
 (0)