Skip to content

Commit 693ca9e

Browse files
IlgarBaghishovIlgar Baghishovjan-janssen
authored
[Fix] serialize worker job submissions to preserve worker_id ordering (#935)
* fix: serialize worker job submissions to preserve worker_id ordering Worker threads were submitting Flux jobs concurrently, causing executorlib_worker_id to not correspond to the Flux scheduling order. This made worker_id unreliable for resource mapping (e.g. GPU assignment). Two changes: - Add threading.Event chain in BlockAllocationTaskScheduler so each worker waits for the previous worker to finish submitting before starting its own submission. - Call self._future.jobid() after FluxExecutor.submit() to block until the job is actually registered with the Flux broker, not just queued in the async FluxExecutor. * Update blockallocation.py * Update spawner_flux.py * Update spawner_flux.py --------- Co-authored-by: Ilgar Baghishov <ilgar@login15.chn.perlmutter.nersc.gov> Co-authored-by: Jan Janssen <jan-janssen@users.noreply.github.com>
1 parent 2fa06c8 commit 693ca9e

2 files changed

Lines changed: 20 additions & 1 deletion

File tree

src/executorlib/task_scheduler/interactive/blockallocation.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import queue
22
import random
33
from concurrent.futures import Future
4-
from threading import Thread
4+
from threading import Event, Thread
55
from typing import Callable, Optional
66

77
from executorlib.standalone.command import get_interactive_execute_command
@@ -83,6 +83,8 @@ def __init__(
8383
self_id = random.getrandbits(128)
8484
self._self_id = self_id
8585
_interrupt_bootup_dict[self._self_id] = False
86+
bootup_events = [Event() for _ in range(self._max_workers)]
87+
bootup_events[0].set()
8688
self._set_process(
8789
process=[
8890
Thread(
@@ -91,6 +93,12 @@ def __init__(
9193
| {
9294
"worker_id": worker_id,
9395
"stop_function": lambda: _interrupt_bootup_dict[self_id],
96+
"bootup_event": bootup_events[worker_id],
97+
"next_bootup_event": (
98+
bootup_events[worker_id + 1]
99+
if worker_id + 1 < self._max_workers
100+
else None
101+
),
94102
},
95103
)
96104
for worker_id in range(self._max_workers)
@@ -217,6 +225,8 @@ def _execute_multiple_tasks(
217225
worker_id: int = 0,
218226
stop_function: Optional[Callable] = None,
219227
restart_limit: int = 0,
228+
bootup_event: Optional[Event] = None,
229+
next_bootup_event: Optional[Event] = None,
220230
**kwargs,
221231
) -> None:
222232
"""
@@ -245,7 +255,12 @@ def _execute_multiple_tasks(
245255
distribution.
246256
stop_function (Callable): Function to stop the interface.
247257
restart_limit (int): The maximum number of restarting worker processes.
258+
bootup_event (Event): Event to wait on before submitting the job to the scheduler, ensuring workers are
259+
submitted in worker_id order.
260+
next_bootup_event (Event): Event to signal after job submission, unblocking the next worker.
248261
"""
262+
if bootup_event is not None:
263+
bootup_event.wait()
249264
interface = interface_bootup(
250265
command_lst=get_interactive_execute_command(
251266
cores=cores,
@@ -256,6 +271,8 @@ def _execute_multiple_tasks(
256271
worker_id=worker_id,
257272
stop_function=stop_function,
258273
)
274+
if next_bootup_event is not None:
275+
next_bootup_event.set()
259276
interface_initialization_exception = _set_init_function(
260277
interface=interface,
261278
init_function=init_function,

src/executorlib/task_scheduler/interactive/spawner_flux.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ def bootup(
146146
)
147147
else:
148148
self._future = self._flux_executor.submit(jobspec=jobspec)
149+
if self._future is not None:
150+
self._future.jobid()
149151
return self.poll()
150152

151153
def shutdown(self, wait: bool = True):

0 commit comments

Comments
 (0)