Commit 1055cd1

Harden worker disconnect (TraceMachina#1972)
Worker disconnect had several issues that could leave jobs unschedulable or stuck permanently in the Executing state:

1. Killing a process did not wait for the process to die, so cleanup of the work tree and the associated filesystem store could fail when the system was heavily loaded or the filesystem was slow.
2. Cleanup is required even if prepare_action has not been called yet. Cleanup is therefore split into two concerns, filesystem cleanup and manager cleanup, tracked by two separate AtomicBools and handled independently.
3. apply_filter_predicate did not handle update_awaited_action failing with a version mismatch (e.g. from a client keep-alive), which could cause the worker state to be skipped.
4. Most importantly, if the worker did not exist when worker_notify_run_action was called, the action stayed in Executing forever because nothing updated it. This is trivially resolved by calling update_operation in that case; immediate_evict_worker would have made that call had the worker existed.

Finally, a couple of places logged errors that are just noise: updating an already-completed action on worker disconnect or keep-alive, and warning about killing a process that is already dead.
1 parent 9353508 commit 1055cd1
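The first fix turns on a real distinction in tokio's process API: `Child::start_kill` only delivers the kill signal and returns immediately, while the async `Child::kill` sends the signal and then waits for the process to exit. A minimal sketch of the difference (assuming tokio with the `process` feature and a Unix `sleep` binary; this is illustrative, not NativeLink code):

```rust
use tokio::process::Command;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut child = Command::new("sleep").arg("60").spawn()?;

    // Before the fix: the signal is sent, but the process may still be
    // exiting while the caller tears down its working directory.
    // child.start_kill()?;

    // After the fix: resolves only once the child has actually
    // terminated, so deleting the work tree afterwards is safe.
    child.kill().await?;
    Ok(())
}
```

Waiting for the exit before removing the work tree is what prevents the cleanup failures described above.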

3 files changed

Lines changed: 135 additions & 38 deletions


nativelink-scheduler/src/api_worker_scheduler.rs

Lines changed: 9 additions & 1 deletion
```diff
@@ -340,15 +340,23 @@ impl ApiWorkerSchedulerImpl {
                     .await,
                 );
             }
+            Ok(())
         } else {
             warn!(
                 ?worker_id,
                 ?operation_id,
                 ?action_info,
                 "Worker not found in worker map in worker_notify_run_action"
             );
+            // Ensure the operation is put back to queued state.
+            self.worker_state_manager
+                .update_operation(
+                    &operation_id,
+                    &worker_id,
+                    UpdateOperationType::UpdateWithDisconnect,
+                )
+                .await
         }
-        Ok(())
     }
 
     /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
```
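To see why the new else-branch matters, here is a self-contained toy sketch with stand-in types (not the real NativeLink scheduler): without the branch, an operation dispatched to a vanished worker kept its Executing state forever; with it, the operation is moved back toward the queue, mirroring what `update_operation` with `UpdateOperationType::UpdateWithDisconnect` does.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum Stage {
    Executing,
    Queued,
}

struct Scheduler {
    workers: HashMap<u32, &'static str>,
    operations: HashMap<u32, Stage>,
}

impl Scheduler {
    fn notify_run_action(&mut self, worker_id: u32, operation_id: u32) {
        if self.workers.contains_key(&worker_id) {
            // Happy path: dispatch the action to the worker.
        } else {
            // The worker vanished between scheduling and dispatch.
            // Requeue the operation; previously this branch only logged
            // a warning and the operation stayed Executing forever.
            self.operations.insert(operation_id, Stage::Queued);
        }
    }
}

fn main() {
    let mut scheduler = Scheduler {
        workers: HashMap::new(),
        operations: HashMap::from([(7, Stage::Executing)]),
    };
    scheduler.notify_run_action(1, 7);
    assert_eq!(scheduler.operations[&7], Stage::Queued);
}
```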

nativelink-scheduler/src/simple_scheduler_state_manager.rs

Lines changed: 101 additions & 31 deletions
```diff
@@ -314,13 +314,17 @@ where
     async fn apply_filter_predicate(
         &self,
         awaited_action: &AwaitedAction,
+        subscriber: &T::Subscriber,
         filter: &OperationFilter,
     ) -> bool {
         // Note: The caller must filter `client_operation_id`.
 
+        let mut maybe_reloaded_awaited_action: Option<AwaitedAction> = None;
         if awaited_action.last_client_keepalive_timestamp() + self.client_action_timeout
             < (self.now_fn)().now()
         {
+            // This may change if the version is out of date.
+            let mut timed_out = true;
             if !awaited_action.state().stage.is_finished() {
                 let mut state = awaited_action.state().as_ref().clone();
                 state.stage = ActionStage::Completed(ActionResult {
@@ -331,20 +335,66 @@ where
                     )),
                     ..ActionResult::default()
                 });
-                let mut new_awaited_action = awaited_action.clone();
-                new_awaited_action.worker_set_state(Arc::new(state), (self.now_fn)().now());
-                if let Err(err) = self
-                    .action_db
-                    .update_awaited_action(new_awaited_action)
-                    .await
-                {
-                    warn!(
-                        "Failed to update action to timed out state after client keepalive timeout. This is ok if multiple schedulers tried to set the state at the same time: {err}",
-                    );
+                let state = Arc::new(state);
+                // We may be competing with a client timestamp update, so try
+                // this a few times.
+                for attempt in 1..=MAX_UPDATE_RETRIES {
+                    let mut new_awaited_action = match &maybe_reloaded_awaited_action {
+                        None => awaited_action.clone(),
+                        Some(reloaded_awaited_action) => reloaded_awaited_action.clone(),
+                    };
+                    new_awaited_action.worker_set_state(state.clone(), (self.now_fn)().now());
+                    let err = match self
+                        .action_db
+                        .update_awaited_action(new_awaited_action)
+                        .await
+                    {
+                        Ok(()) => break,
+                        Err(err) => err,
+                    };
+                    // Reload from the database if the action was outdated.
+                    let maybe_awaited_action =
+                        if attempt == MAX_UPDATE_RETRIES || err.code != Code::Aborted {
+                            None
+                        } else {
+                            subscriber.borrow().await.ok()
+                        };
+                    if let Some(reloaded_awaited_action) = maybe_awaited_action {
+                        maybe_reloaded_awaited_action = Some(reloaded_awaited_action);
+                    } else {
+                        warn!(
+                            "Failed to update action to timed out state after client keepalive timeout. This is ok if multiple schedulers tried to set the state at the same time: {err}",
+                        );
+                        break;
+                    }
+                    // Re-check the predicate after reload.
+                    if maybe_reloaded_awaited_action
+                        .as_ref()
+                        .is_some_and(|awaited_action| {
+                            awaited_action.last_client_keepalive_timestamp()
+                                + self.client_action_timeout
+                                >= (self.now_fn)().now()
+                        })
+                    {
+                        timed_out = false;
+                        break;
+                    } else if maybe_reloaded_awaited_action
+                        .as_ref()
+                        .is_some_and(|awaited_action| awaited_action.state().stage.is_finished())
+                    {
+                        break;
+                    }
                 }
             }
-            return false;
+            if timed_out {
+                return false;
+            }
         }
+        // If the action was reloaded, then use that for the rest of the checks
+        // instead of the input parameter.
+        let awaited_action = maybe_reloaded_awaited_action
+            .as_ref()
+            .unwrap_or(awaited_action);
 
         if let Some(operation_id) = &filter.operation_id {
             if operation_id != awaited_action.operation_id() {
```
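The retry loop above is an instance of optimistic concurrency: attempt a versioned write, and on a version-mismatch error (`Code::Aborted`) reload the latest record and retry, up to `MAX_UPDATE_RETRIES` times. A generic, self-contained sketch of the pattern, with `VersionedStore` and `Versioned` as hypothetical stand-ins for the action database and subscriber machinery:

```rust
const MAX_UPDATE_RETRIES: usize = 5;

#[derive(Clone)]
struct Versioned {
    version: u64,
    value: String,
}

struct VersionedStore {
    current: Versioned,
}

impl VersionedStore {
    /// Fails if `new.version` does not match the stored version (the
    /// analogue of update_awaited_action returning Code::Aborted),
    /// handing back the latest record so the caller can retry.
    fn compare_and_update(&mut self, mut new: Versioned) -> Result<(), Versioned> {
        if new.version == self.current.version {
            new.version += 1;
            self.current = new;
            Ok(())
        } else {
            Err(self.current.clone())
        }
    }
}

fn update_with_retries(
    store: &mut VersionedStore,
    make_value: impl Fn(&Versioned) -> String,
) -> bool {
    let mut latest = store.current.clone();
    for _attempt in 1..=MAX_UPDATE_RETRIES {
        let mut new = latest.clone();
        new.value = make_value(&latest);
        match store.compare_and_update(new) {
            Ok(()) => return true,
            // Version mismatch: reload the latest record and try again.
            Err(reloaded) => latest = reloaded,
        }
    }
    false // Give up; another writer is winning every race.
}

fn main() {
    let mut store = VersionedStore {
        current: Versioned { version: 0, value: "queued".into() },
    };
    assert!(update_with_retries(&mut store, |_| "timed_out".into()));
    assert_eq!(store.current.value, "timed_out");
}
```

The re-check after each reload matters: if the reloaded record shows a fresh keep-alive or an already-finished stage, the loop stops instead of stomping the newer state.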
```diff
@@ -518,22 +568,38 @@ where
 
             // Make sure we don't update an action that is already completed.
             if awaited_action.state().stage.is_finished() {
-                return Err(make_err!(
-                    Code::Internal,
-                    "Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}",
-                    awaited_action.state().stage,
-                    maybe_worker_id,
-                ));
+                match &update {
+                    UpdateOperationType::UpdateWithDisconnect | UpdateOperationType::KeepAlive => {
+                        // No need to error a keep-alive when it's completed, it's just
+                        // unnecessary log noise.
+                        return Ok(());
+                    }
+                    _ => {
+                        return Err(make_err!(
+                            Code::Internal,
+                            "Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}",
+                            awaited_action.state().stage,
+                            maybe_worker_id,
+                        ));
+                    }
+                }
             }
 
             let stage = match &update {
                 UpdateOperationType::KeepAlive => {
                     awaited_action.worker_keep_alive((self.now_fn)().now());
-                    return self
+                    match self
                         .action_db
                         .update_awaited_action(awaited_action)
                         .await
-                        .err_tip(|| "Failed to send KeepAlive in SimpleSchedulerStateManager::update_operation");
+                        .err_tip(|| "Failed to send KeepAlive in SimpleSchedulerStateManager::update_operation") {
+                        // Try again if there was a version mismatch.
+                        Err(err) if err.code == Code::Aborted => {
+                            last_err = Some(err);
+                            continue;
+                        }
+                        result => return result,
+                    }
                 }
                 UpdateOperationType::UpdateWithActionStage(stage) => stage.clone(),
                 UpdateOperationType::UpdateWithError(err) => {
```
```diff
@@ -658,7 +724,10 @@ where
                     .borrow()
                     .await
                     .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
-                if !self.apply_filter_predicate(&awaited_action, &filter).await {
+                if !self
+                    .apply_filter_predicate(&awaited_action, &subscriber, &filter)
+                    .await
+                {
                     return Ok(Box::pin(stream::empty()));
                 }
                 return Ok(Box::pin(stream::once(async move {
@@ -678,7 +747,10 @@ where
                 .borrow()
                 .await
                 .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
-            if !self.apply_filter_predicate(&awaited_action, &filter).await {
+            if !self
+                .apply_filter_predicate(&awaited_action, &subscriber, &filter)
+                .await
+            {
                 return Ok(Box::pin(stream::empty()));
             }
             return Ok(Box::pin(stream::once(async move {
@@ -704,11 +776,10 @@ where
             .try_filter_map(|(subscriber, awaited_action)| {
                 let filter = filter.clone();
                 async move {
-                    if self.apply_filter_predicate(&awaited_action, &filter).await {
-                        Ok(Some((subscriber, awaited_action.sort_key())))
-                    } else {
-                        Ok(None)
-                    }
+                    Ok(self
+                        .apply_filter_predicate(&awaited_action, &subscriber, &filter)
+                        .await
+                        .then_some((subscriber, awaited_action.sort_key())))
                 }
             })
             .try_collect()
@@ -750,11 +821,10 @@ where
             .try_filter_map(move |(subscriber, awaited_action)| {
                 let filter = filter.clone();
                 async move {
-                    if self.apply_filter_predicate(&awaited_action, &filter).await {
-                        Ok(Some(subscriber))
-                    } else {
-                        Ok(None)
-                    }
+                    Ok(self
+                        .apply_filter_predicate(&awaited_action, &subscriber, &filter)
+                        .await
+                        .then_some(subscriber))
                 }
             })
             .map(move |result| -> Box<dyn ActionStateResult> {
```
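The refactored filter closures rely on `bool::then_some`, which collapses the previous if/else into a single expression:

```rust
fn main() {
    assert_eq!(true.then_some("subscriber"), Some("subscriber"));
    assert_eq!(false.then_some("subscriber"), None);
}
```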

nativelink-worker/src/running_actions_manager.rs

Lines changed: 25 additions & 6 deletions
```diff
@@ -661,6 +661,7 @@ pub struct RunningActionImpl {
     timeout: Duration,
     running_actions_manager: Arc<RunningActionsManagerImpl>,
     state: Mutex<RunningActionImplState>,
+    has_manager_entry: AtomicBool,
     did_cleanup: AtomicBool,
 }
 
@@ -691,7 +692,10 @@ impl RunningActionImpl {
                 execution_metadata,
                 error: None,
             }),
-            did_cleanup: AtomicBool::new(false),
+            // Always need to ensure that we're removed from the manager on Drop.
+            has_manager_entry: AtomicBool::new(true),
+            // Only needs to be cleaned up after a prepare_action call, set there.
+            did_cleanup: AtomicBool::new(true),
         }
     }
 
@@ -732,6 +736,8 @@ impl RunningActionImpl {
         fs::create_dir(&self.work_directory)
             .await
            .err_tip(|| format!("Error creating work directory {}", self.work_directory))?;
+        // Now the work directory has been created, we have to clean up.
+        self.did_cleanup.store(false, Ordering::Release);
         // Download the input files/folder and place them into the temp directory.
         self.metrics()
             .download_to_directory
@@ -923,6 +929,10 @@ impl RunningActionImpl {
             .err_tip(|| "Expected stderr to exist on command this should never happen")?;
 
         let mut child_process_guard = guard(child_process, |mut child_process| {
+            if child_process.try_wait().is_ok_and(|res| res.is_some()) {
+                // The child already exited, probably a timeout or kill operation.
+                return;
+            }
             error!(
                 "Child process was not cleaned up before dropping the call to execute(), killing in background spawn."
             );
```
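The `guard` here comes from the scopeguard crate: it runs a closure on drop. The new `try_wait` check makes that closure a no-op when the child has already exited, silencing the spurious "killing in background spawn" error after a deliberate timeout or kill. A minimal sketch of the pattern (assuming the scopeguard and tokio crates and a Unix `sleep` binary):

```rust
use scopeguard::guard;
use tokio::process::Command;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let child = Command::new("sleep").arg("30").spawn()?;
    let mut child_guard = guard(child, |mut child| {
        // Runs on drop. Skip the noisy kill path if the child is gone.
        if child.try_wait().is_ok_and(|status| status.is_some()) {
            return;
        }
        // Drop cannot be async, so kill in a background task.
        tokio::spawn(async move {
            let _ = child.kill().await;
        });
    });
    // ... use &mut *child_guard to run the action ...
    child_guard.kill().await?; // Explicit kill waits for the exit.
    Ok(()) // Guard drops here; try_wait sees the exit and returns early.
}
```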
```diff
@@ -966,7 +976,7 @@ impl RunningActionImpl {
             () = &mut sleep_fut => {
                 self.running_actions_manager.metrics.task_timeouts.inc();
                 killed_action = true;
-                if let Err(err) = child_process_guard.start_kill() {
+                if let Err(err) = child_process_guard.kill().await {
                     error!(
                         ?err,
                         "Could not kill process in RunningActionsManager for action timeout",
@@ -1035,7 +1045,7 @@ impl RunningActionImpl {
             },
             _ = &mut kill_channel_rx => {
                 killed_action = true;
-                if let Err(err) = child_process_guard.start_kill() {
+                if let Err(err) = child_process_guard.kill().await {
                     error!(
                         operation_id = ?self.operation_id,
                         ?err,
@@ -1305,6 +1315,12 @@ impl RunningActionImpl {
 impl Drop for RunningActionImpl {
     fn drop(&mut self) {
         if self.did_cleanup.load(Ordering::Acquire) {
+            if self.has_manager_entry.load(Ordering::Acquire) {
+                drop(
+                    self.running_actions_manager
+                        .cleanup_action(&self.operation_id),
+                );
+            }
             return;
         }
         let operation_id = self.operation_id.clone();
@@ -1370,6 +1386,7 @@ impl RunningAction for RunningActionImpl {
                 &self.action_directory,
             )
             .await;
+            self.has_manager_entry.store(false, Ordering::Release);
             self.did_cleanup.store(true, Ordering::Release);
             result.map(move |()| self)
         })
@@ -2104,9 +2121,11 @@ impl RunningActionsManager for RunningActionsManagerImpl {
                     .filter_map(|(_operation_id, action)| action.upgrade())
                     .collect()
             };
-            for action in kill_operations {
-                Self::kill_operation(action).await;
-            }
+            let mut kill_futures: FuturesUnordered<_> = kill_operations
+                .into_iter()
+                .map(Self::kill_operation)
+                .collect();
+            while kill_futures.next().await.is_some() {}
         })
         .await;
         // Ignore error. If error happens it means there's no sender, which is not a problem.
```
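The loop-to-FuturesUnordered change makes the shutdown kills run concurrently instead of one at a time, which matters now that each kill awaits process exit. A minimal sketch of draining a `FuturesUnordered` (assuming the futures crate; `kill_op` is a stand-in for `Self::kill_operation`):

```rust
use futures::stream::{FuturesUnordered, StreamExt};

async fn kill_op(id: u32) {
    // Stand-in for Self::kill_operation(action).
    println!("killing operation {id}");
}

#[tokio::main]
async fn main() {
    let mut kill_futures: FuturesUnordered<_> = (1..=3).map(kill_op).collect();
    // Polls every kill concurrently; completes when all have finished.
    while kill_futures.next().await.is_some() {}
}
```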
