Skip to content

Commit c054731

Browse files
committed
fix: prevent setCheckpoint from overwriting concurrent job state updates
Replace savePlan(staleSnapshot) calls in setCheckpoint, clearCheckpoint, _doReconcile, and resumePlan with atomic updatePlanFields that only modifies plan-level fields (status, checkpoint, completedAt, prUrl) without touching job states. Add reconciliation safety net that cross-references jobs.json to detect plan jobs stuck as 'running' when they have already completed or failed. Closes #63
1 parent a991d64 commit c054731

5 files changed

Lines changed: 373 additions & 33 deletions

File tree

src/lib/orchestrator.ts

Lines changed: 63 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { randomUUID } from 'crypto';
22
import type { MCConfig } from './config';
33
import type { PlanSpec, JobSpec, PlanStatus, CheckpointType, CheckpointContext } from './plan-types';
4-
import { loadPlan, savePlan, updatePlanJob, clearPlan, validateGhAuth } from './plan-state';
4+
import { loadPlan, savePlan, updatePlanJob, updatePlanFields, clearPlan, validateGhAuth } from './plan-state';
55
import { getDefaultBranch } from './git';
66
import { createIntegrationBranch, deleteIntegrationBranch } from './integration';
77
import { MergeTrain, checkMergeability, type MergeTestReport, validateTouchSet } from './merge-train';
@@ -291,7 +291,6 @@ export class Orchestrator {
291291

292292
const plan = await loadPlan();
293293
if (plan && plan.status === 'paused') {
294-
// Track jobs approved for merge so reconciler doesn't re-checkpoint them
295294
if (wasPreMerge) {
296295
for (const job of plan.jobs) {
297296
if (job.status === 'ready_to_merge') {
@@ -300,10 +299,11 @@ export class Orchestrator {
300299
}
301300
}
302301

303-
plan.status = 'running';
304-
plan.checkpoint = null;
305-
plan.checkpointContext = null;
306-
await savePlan(plan);
302+
await updatePlanFields(plan.id, {
303+
status: 'running',
304+
checkpoint: null,
305+
checkpointContext: null,
306+
});
307307
this.showToast('Mission Control', 'Checkpoint cleared, resuming execution.', 'info');
308308

309309
if (!this.isRunning) {
@@ -391,10 +391,11 @@ export class Orchestrator {
391391

392392
private async setCheckpoint(type: CheckpointType, plan: PlanSpec, context?: CheckpointContext): Promise<void> {
393393
this.checkpoint = type;
394-
plan.status = 'paused';
395-
plan.checkpoint = type;
396-
plan.checkpointContext = context ?? null;
397-
await savePlan(plan);
394+
await updatePlanFields(plan.id, {
395+
status: 'paused',
396+
checkpoint: type,
397+
checkpointContext: context ?? null,
398+
});
398399
this.stopReconciler();
399400
this.showToast(
400401
'Mission Control',
@@ -447,6 +448,32 @@ export class Orchestrator {
447448
const runningJobs = (await getRunningJobs()).filter((job) => job.planId === plan.id);
448449
let runningCount = runningJobs.length;
449450

451+
// Safety net: detect plan jobs stuck as 'running' when jobs.json already
452+
// shows them as completed/failed (can happen if a prior savePlan race
453+
// overwrote their status, or if the 'complete' event was missed).
454+
const jobState = await loadJobState();
455+
const runningJobNames = new Set(runningJobs.map((j) => j.name));
456+
for (const planJob of plan.jobs) {
457+
if (planJob.status !== 'running') continue;
458+
if (runningJobNames.has(planJob.name)) continue;
459+
460+
const stateJob = jobState.jobs.find(
461+
(j) => j.name === planJob.name && j.planId === plan.id,
462+
);
463+
if (!stateJob) continue;
464+
465+
if (stateJob.status === 'completed') {
466+
await updatePlanJob(plan.id, planJob.name, { status: 'completed' });
467+
planJob.status = 'completed';
468+
} else if (stateJob.status === 'failed') {
469+
await updatePlanJob(plan.id, planJob.name, {
470+
status: 'failed',
471+
error: 'recovered from missed completion event',
472+
});
473+
planJob.status = 'failed';
474+
}
475+
}
476+
450477
const mergeOrder = [...plan.jobs].sort(
451478
(a, b) => (a.mergeOrder ?? Number.MAX_SAFE_INTEGER) - (b.mergeOrder ?? Number.MAX_SAFE_INTEGER),
452479
);
@@ -654,23 +681,26 @@ export class Orchestrator {
654681
return;
655682
}
656683

657-
latestPlan.status = 'creating_pr';
658-
await savePlan(latestPlan);
684+
await updatePlanFields(latestPlan.id, { status: 'creating_pr' });
659685

660686
try {
661687
const prUrl = await this.createPR();
662-
latestPlan.prUrl = prUrl;
663-
latestPlan.status = 'completed';
664-
latestPlan.completedAt = new Date().toISOString();
665-
await savePlan(latestPlan);
688+
const completedAt = new Date().toISOString();
689+
await updatePlanFields(latestPlan.id, {
690+
status: 'completed',
691+
prUrl,
692+
completedAt,
693+
});
666694
this.stopReconciler();
667695
this.unsubscribeFromMonitorEvents();
668696
this.showToast('Mission Control', `Plan completed! PR: ${prUrl}`, 'success');
669697
this.notify(`🎉 Plan "${latestPlan.name}" completed! PR created: ${prUrl}`);
670698
} catch (prError) {
671-
latestPlan.status = 'failed';
672-
latestPlan.completedAt = new Date().toISOString();
673-
await savePlan(latestPlan);
699+
const completedAt = new Date().toISOString();
700+
await updatePlanFields(latestPlan.id, {
701+
status: 'failed',
702+
completedAt,
703+
});
674704
this.stopReconciler();
675705
this.unsubscribeFromMonitorEvents();
676706
const errMsg = prError instanceof Error ? prError.message : String(prError);
@@ -681,15 +711,17 @@ export class Orchestrator {
681711
}
682712

683713
if (latestPlan.status !== plan.status) {
684-
latestPlan.status = plan.status;
714+
const updates: { status: typeof plan.status; completedAt?: string } = {
715+
status: plan.status,
716+
};
685717
if (plan.status === 'failed') {
686-
latestPlan.completedAt = new Date().toISOString();
718+
updates.completedAt = new Date().toISOString();
687719
this.stopReconciler();
688720
this.unsubscribeFromMonitorEvents();
689721
this.showToast('Mission Control', `Plan "${latestPlan.name}" failed.`, 'error');
690722
this.notify(`❌ Plan "${latestPlan.name}" failed.`);
691723
}
692-
await savePlan(latestPlan);
724+
await updatePlanFields(latestPlan.id, updates);
693725
}
694726
}
695727

@@ -1154,10 +1186,11 @@ If your work needs human review before it can proceed: mc_report(status: "needs_
11541186
}
11551187

11561188
if (plan.status === 'paused') {
1157-
plan.status = 'running';
1158-
plan.checkpoint = null;
1159-
plan.checkpointContext = null;
1160-
await savePlan(plan);
1189+
await updatePlanFields(plan.id, {
1190+
status: 'running',
1191+
checkpoint: null,
1192+
checkpointContext: null,
1193+
});
11611194
}
11621195
this.checkpoint = null;
11631196

@@ -1196,12 +1229,10 @@ If your work needs human review before it can proceed: mc_report(status: "needs_
11961229
}
11971230

11981231
if (hasDeadRunningJob) {
1199-
const currentPlan = await loadPlan();
1200-
if (currentPlan) {
1201-
currentPlan.status = 'failed';
1202-
currentPlan.completedAt = new Date().toISOString();
1203-
await savePlan(currentPlan);
1204-
}
1232+
await updatePlanFields(plan.id, {
1233+
status: 'failed',
1234+
completedAt: new Date().toISOString(),
1235+
});
12051236
return;
12061237
}
12071238

src/lib/plan-state.ts

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { join } from 'path';
22
import { z } from 'zod';
33
import { getDataDir } from './paths';
44
import { GitMutex } from './git-mutex';
5-
import type { PlanSpec, JobSpec } from './plan-types';
5+
import type { PlanSpec, JobSpec, CheckpointContext } from './plan-types';
66
import { isValidPlanTransition, isValidJobTransition } from './plan-types';
77
import { PlanSpecSchema } from './schemas';
88
import { atomicWrite } from './utils';
@@ -114,6 +114,71 @@ export async function updatePlanJob(
114114
});
115115
}
116116

117+
export interface PlanFieldUpdates {
118+
status?: PlanSpec['status'];
119+
checkpoint?: PlanSpec['checkpoint'];
120+
checkpointContext?: CheckpointContext | null;
121+
completedAt?: string;
122+
prUrl?: string;
123+
}
124+
125+
/**
126+
* Atomically update plan-level fields without overwriting job states.
127+
*
128+
* Unlike savePlan(), this reads the current plan inside the mutex and merges
129+
* only the specified fields. This prevents a stale plan snapshot from clobbering
130+
* concurrent updatePlanJob() writes — the root cause of completed jobs appearing
131+
* as "running" after a sibling job failed (see #63).
132+
*/
133+
export async function updatePlanFields(
134+
planId: string,
135+
updates: PlanFieldUpdates,
136+
): Promise<void> {
137+
await planMutex.withLock(async () => {
138+
const plan = await loadPlan();
139+
140+
if (!plan) {
141+
throw new Error('No active plan exists');
142+
}
143+
144+
if (plan.id !== planId) {
145+
throw new Error(
146+
`Plan ID mismatch: expected ${planId}, got ${plan.id}`,
147+
);
148+
}
149+
150+
if (updates.status !== undefined && updates.status !== plan.status) {
151+
if (!isValidPlanTransition(plan.status, updates.status)) {
152+
console.warn(`[MC] Invalid plan transition: ${plan.status}${updates.status} (plan: ${plan.name})`);
153+
}
154+
plan.status = updates.status;
155+
}
156+
if (updates.checkpoint !== undefined) {
157+
plan.checkpoint = updates.checkpoint;
158+
}
159+
if (updates.checkpointContext !== undefined) {
160+
plan.checkpointContext = updates.checkpointContext;
161+
}
162+
if (updates.completedAt !== undefined) {
163+
plan.completedAt = updates.completedAt;
164+
}
165+
if (updates.prUrl !== undefined) {
166+
plan.prUrl = updates.prUrl;
167+
}
168+
169+
const ghAuthenticated = await validateGhAuth();
170+
const planToSave = { ...plan, ghAuthenticated };
171+
172+
const filePath = await getPlanFilePath();
173+
try {
174+
const data = JSON.stringify(planToSave, null, 2);
175+
await atomicWrite(filePath, data);
176+
} catch (error) {
177+
throw new Error(`Failed to save plan state to ${filePath}: ${error}`);
178+
}
179+
});
180+
}
181+
117182
export async function clearPlan(): Promise<void> {
118183
await planMutex.withLock(async () => {
119184
const filePath = await getPlanFilePath();

tests/lib/orchestrator-modes.test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,16 @@ describe('orchestrator modes', () => {
8080
);
8181
},
8282
);
83+
spyOn(planStateMod, 'updatePlanFields').mockImplementation(
84+
async (planId: string, updates: Partial<PlanSpec>) => {
85+
if (!planState || planState.id !== planId) return;
86+
if (updates.status !== undefined) planState.status = updates.status;
87+
if (updates.checkpoint !== undefined) planState.checkpoint = updates.checkpoint;
88+
if (updates.checkpointContext !== undefined) planState.checkpointContext = updates.checkpointContext;
89+
if (updates.completedAt !== undefined) planState.completedAt = updates.completedAt;
90+
if (updates.prUrl !== undefined) planState.prUrl = updates.prUrl;
91+
},
92+
);
8393
spyOn(planStateMod, 'clearPlan').mockImplementation(async () => {
8494
planState = null;
8595
});

0 commit comments

Comments
 (0)