Skip to content

Commit 571ce19

Browse files
committed
feat: add dynamic orchestration with replanning, inter-job comms, session forking, and fix-before-rollback (#68)
1 parent 998472c commit 571ce19

8 files changed

Lines changed: 493 additions & 2 deletions

File tree

src/lib/config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const DEFAULT_CONFIG: MCConfig = {
2525
useServeMode: true,
2626
portRangeStart: 14100,
2727
portRangeEnd: 14199,
28+
fixBeforeRollbackTimeout: 120000,
2829
omo: {
2930
enabled: false,
3031
defaultMode: 'vanilla',

src/lib/job-comms.ts

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import type { JobSpec } from './plan-types';
2+
import { sendPrompt, waitForServer } from './sdk-client';
3+
4+
export interface RelayContext {
5+
finding: string;
6+
filePath?: string;
7+
lineNumber?: number;
8+
severity?: 'info' | 'warning' | 'error';
9+
}
10+
11+
export interface RelayMessage {
12+
from: string;
13+
to: string;
14+
context: RelayContext;
15+
timestamp: string;
16+
}
17+
18+
export class JobComms {
19+
private messageBus: Map<string, RelayMessage[]> = new Map();
20+
private relayPatterns: Map<string, Bun.Glob[]> = new Map();
21+
private relayPatternSources: Map<string, string[]> = new Map();
22+
23+
registerJob(job: JobSpec): void {
24+
if (job.relayPatterns && job.relayPatterns.length > 0) {
25+
const patterns: Bun.Glob[] = [];
26+
const sources: string[] = [];
27+
for (const pattern of job.relayPatterns) {
28+
const normalized = pattern.endsWith('/') ? `${pattern}**` : pattern;
29+
patterns.push(new Bun.Glob(normalized));
30+
sources.push(pattern);
31+
}
32+
this.relayPatterns.set(job.name, patterns);
33+
this.relayPatternSources.set(job.name, sources);
34+
}
35+
if (!this.messageBus.has(job.name)) {
36+
this.messageBus.set(job.name, []);
37+
}
38+
}
39+
40+
unregisterJob(jobName: string): void {
41+
this.relayPatterns.delete(jobName);
42+
this.relayPatternSources.delete(jobName);
43+
this.messageBus.delete(jobName);
44+
}
45+
46+
relayFinding(from: string, to: string, context: RelayContext): void {
47+
const message: RelayMessage = {
48+
from,
49+
to,
50+
context,
51+
timestamp: new Date().toISOString(),
52+
};
53+
54+
const messages = this.messageBus.get(to) ?? [];
55+
messages.push(message);
56+
this.messageBus.set(to, messages);
57+
}
58+
59+
getMessagesForJob(jobName: string): RelayMessage[] {
60+
return this.messageBus.get(jobName) ?? [];
61+
}
62+
63+
clearMessagesForJob(jobName: string): void {
64+
this.messageBus.set(jobName, []);
65+
}
66+
67+
shouldRelayForFile(jobName: string, filePath: string): boolean {
68+
const patterns = this.relayPatterns.get(jobName);
69+
if (!patterns || patterns.length === 0) {
70+
return false;
71+
}
72+
return patterns.some((pattern) => pattern.match(filePath));
73+
}
74+
75+
async deliverMessages(
76+
job: JobSpec,
77+
options?: { filterFrom?: string[] },
78+
): Promise<number> {
79+
const messages = this.getMessagesForJob(job.name);
80+
if (messages.length === 0) {
81+
return 0;
82+
}
83+
84+
const filtered = options?.filterFrom
85+
? messages.filter((m) => options.filterFrom!.includes(m.from))
86+
: messages;
87+
88+
if (filtered.length === 0) {
89+
return 0;
90+
}
91+
92+
if (!job.port) {
93+
return 0;
94+
}
95+
96+
try {
97+
const client = await waitForServer(job.port, { timeoutMs: 5000 });
98+
99+
for (const message of filtered) {
100+
const prompt = this.formatRelayPrompt(message);
101+
await sendPrompt(client, job.launchSessionID ?? '', prompt);
102+
}
103+
104+
this.clearMessagesForJob(job.name);
105+
return filtered.length;
106+
} catch {
107+
return 0;
108+
}
109+
}
110+
111+
private formatRelayPrompt(message: RelayMessage): string {
112+
const { from, context } = message;
113+
const { finding, filePath, lineNumber, severity } = context;
114+
115+
const parts: string[] = [`[Inter-Job Communication from ${from}]`];
116+
117+
if (severity) {
118+
parts.push(`Severity: ${severity.toUpperCase()}`);
119+
}
120+
121+
parts.push(`Finding: ${finding}`);
122+
123+
if (filePath) {
124+
parts.push(`File: ${filePath}`);
125+
}
126+
127+
if (lineNumber) {
128+
parts.push(`Line: ${lineNumber}`);
129+
}
130+
131+
parts.push('\nConsider how this finding may affect your current work.');
132+
133+
return parts.join('\n');
134+
}
135+
136+
getAllRegisteredJobs(): string[] {
137+
return Array.from(this.messageBus.keys());
138+
}
139+
140+
getRelayPatternsForJob(jobName: string): string[] | undefined {
141+
return this.relayPatternSources.get(jobName);
142+
}
143+
}

src/lib/merge-train.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import type { JobSpec } from './plan-types';
44
import { gitCommand } from './git';
55
import { getIntegrationWorktree } from './integration';
66
import { extractConflicts } from './utils';
7+
import { createJobClient, sendPrompt, waitForServer } from './sdk-client';
78

89
export type MergeResult =
910
| { success: true; mergedAt: string; testReport: MergeTestReport }
@@ -35,9 +36,11 @@ type MergeTrainConfig = {
3536
testTimeout?: number;
3637
mergeStrategy?: 'squash' | 'ff-only' | 'merge';
3738
setupCommands?: string[];
39+
fixBeforeRollbackTimeout?: number;
3840
};
3941

4042
const DEFAULT_TEST_TIMEOUT_MS = 600000;
43+
const DEFAULT_FIX_BEFORE_ROLLBACK_TIMEOUT_MS = 120000;
4144

4245
const INSTALL_COMMAND_BY_LOCKFILE = [
4346
{ file: 'bun.lockb', command: 'bun install --frozen-lockfile' },
@@ -564,6 +567,37 @@ export class MergeTrain {
564567
}
565568

566569
if (!testResult.success) {
570+
const fixTimeout = this.config?.fixBeforeRollbackTimeout ?? DEFAULT_FIX_BEFORE_ROLLBACK_TIMEOUT_MS;
571+
const isServeMode = job.port !== undefined;
572+
573+
if (isServeMode && fixTimeout > 0) {
574+
const fixPrompt = this.buildFixPrompt(job.name, testCommand, testResult.output);
575+
const fixPrompted = await this.promptAgentForFix(job, fixPrompt);
576+
577+
if (fixPrompted) {
578+
await this.sleep(fixTimeout);
579+
580+
const retestResult = await runTestCommand(this.integrationWorktree, testCommand, timeoutMs);
581+
582+
if (retestResult.success) {
583+
return {
584+
success: true,
585+
mergedAt: new Date().toISOString(),
586+
testReport: {
587+
status: 'passed',
588+
command: testCommand,
589+
output: retestResult.output || undefined,
590+
setup: {
591+
status: dependencySetupResult.status,
592+
commands: dependencySetupResult.commands,
593+
output: dependencySetupResult.output || undefined,
594+
},
595+
},
596+
};
597+
}
598+
}
599+
}
600+
567601
await rollbackMergeToHead(this.integrationWorktree, headBeforeStr);
568602
return {
569603
success: false,
@@ -609,4 +643,35 @@ export class MergeTrain {
609643

610644
return results;
611645
}
646+
647+
private buildFixPrompt(jobName: string, testCommand: string, testOutput: string): string {
648+
return `[AUTO-GENERATED] Test Failure in Merge Train
649+
650+
Job "${jobName}" failed during merge train testing.
651+
652+
Test Command: ${testCommand}
653+
654+
Test Output:
655+
${testOutput.slice(0, 2000)}
656+
657+
Please fix the failing tests. You have a limited time window to apply fixes before the merge is rolled back. Focus on the most critical failures first.`;
658+
}
659+
660+
private async promptAgentForFix(job: JobSpec, prompt: string): Promise<boolean> {
661+
if (!job.port || !job.launchSessionID) {
662+
return false;
663+
}
664+
665+
try {
666+
const client = await waitForServer(job.port, { timeoutMs: 10000 });
667+
await sendPrompt(client, job.launchSessionID, prompt);
668+
return true;
669+
} catch {
670+
return false;
671+
}
672+
}
673+
674+
private sleep(ms: number): Promise<void> {
675+
return new Promise((resolve) => setTimeout(resolve, ms));
676+
}
612677
}

0 commit comments

Comments
 (0)