Skip to content

Commit ea0d1a2

Browse files
committed
feat: relay mcp_question from serve-mode jobs back to launching session
Add two-way question relay so when a background job's agent asks a question via mcp_question, it is detected over SSE, formatted with task context, and forwarded to the launching session as an interactive picker. The user can answer inline via mc_answer or attach directly to the job. - New mc_answer tool for responding to pending questions - SSE detection of question tool calls (running/completed states) - Backfill on SSE connect for questions fired before subscription - Toast notifications for questions, completions, and failures - Session label lookup before title annotation to show correct name
1 parent 16692aa commit ea0d1a2

6 files changed

Lines changed: 346 additions & 20 deletions

File tree

src/hooks/notifications.ts

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ import type { PluginInput } from '@opencode-ai/plugin';
22
import type { Job } from '../lib/job-state';
33
import { readReport } from '../lib/reports';
44
import { formatElapsed } from '../lib/utils';
5+
import { type PendingQuestion, buildQuestionRelayMessage } from '../lib/question-relay';
56

67
type Client = PluginInput['client'];
7-
type NotificationEvent = 'complete' | 'failed' | 'blocked' | 'needs_review' | 'awaiting_input';
8+
type NotificationEvent = 'complete' | 'failed' | 'blocked' | 'needs_review' | 'awaiting_input' | 'question';
89

910
interface JobMonitorLike {
10-
on(event: NotificationEvent, handler: (job: Job) => void): void;
11+
on(event: NotificationEvent, handler: (job: Job, ...extra: unknown[]) => void): void;
1112
}
1213

1314
interface SetupNotificationsOptions {
@@ -105,12 +106,12 @@ export function _getTitleStateForTesting(): Map<string, SessionTitleState> {
105106
return titleState;
106107
}
107108

108-
async function sendMessage(client: Client, sessionID: string, text: string): Promise<void> {
109+
async function sendMessage(client: Client, sessionID: string, text: string, expectReply = false): Promise<void> {
109110
await client.session.prompt({
110111
path: { id: sessionID },
111112
body: {
112-
noReply: true,
113-
parts: [{ type: 'text' as const, text, ignored: true }],
113+
noReply: !expectReply,
114+
parts: [{ type: 'text' as const, text, ...(!expectReply && { ignored: true }) }],
114115
},
115116
});
116117
}
@@ -130,9 +131,12 @@ export function setupNotifications(options: SetupNotificationsOptions): void {
130131
const sent = new Set<string>();
131132
let pending: Promise<void> = Promise.resolve();
132133

133-
const notify = async (event: NotificationEvent, job: Job): Promise<void> => {
134+
const notify = async (event: NotificationEvent, job: Job, extra?: unknown): Promise<void> => {
134135
const report = event === 'blocked' || event === 'needs_review' ? await readReport(job.id) : null;
135-
const dedupKey = getDedupKey(event, job, report?.timestamp);
136+
const questionData = event === 'question' ? extra as PendingQuestion | undefined : undefined;
137+
const dedupKey = event === 'question' && questionData
138+
? `question:${job.id}:${questionData.partId}`
139+
: getDedupKey(event, job, report?.timestamp);
136140
if (sent.has(dedupKey)) {
137141
return;
138142
}
@@ -152,7 +156,9 @@ export function setupNotifications(options: SetupNotificationsOptions): void {
152156
const duration = formatElapsed(job.createdAt);
153157
let message = '';
154158

155-
if (event === 'complete') {
159+
if (event === 'question' && questionData) {
160+
message = buildQuestionRelayMessage(questionData);
161+
} else if (event === 'complete') {
156162
message = `🟢 Job '${job.name}' completed in ${duration}. Branch: ${job.branch}. Next: run mc_diff(name: '${job.name}') to review changes, then mc_pr or mc_merge.`;
157163
} else if (event === 'failed') {
158164
message = `🔴 Job '${job.name}' failed after ${duration}. Branch: ${job.branch}. Next: run mc_capture(name: '${job.name}') for logs, then mc_attach(name: '${job.name}') to investigate.`;
@@ -166,27 +172,62 @@ export function setupNotifications(options: SetupNotificationsOptions): void {
166172
message = `👀 Job '${job.name}' needs review (${duration} elapsed). Branch: ${job.branch}.${detail} Next: run mc_diff(name: '${job.name}') and mc_capture(name: '${job.name}') before approving next steps.`;
167173
}
168174

169-
await sendMessage(client, sessionID, message);
170-
sent.add(dedupKey);
175+
let sessionLabel = sessionID;
176+
try {
177+
const sessionInfo = await client.session.get({ path: { id: sessionID } });
178+
const data = (sessionInfo as any)?.data ?? sessionInfo;
179+
sessionLabel = data?.title ?? data?.slug ?? sessionID;
180+
} catch {
181+
// Fall back to raw session ID
182+
}
171183

172184
const titleAnnotationMap: Partial<Record<NotificationEvent, string>> = {
173185
complete: 'done',
174186
failed: 'failed',
175187
awaiting_input: 'needs input',
188+
question: 'has question',
176189
};
177190
const statusText = titleAnnotationMap[event];
178191
if (statusText) {
179192
await annotateSessionTitle(client, sessionID, job.name, statusText);
180193
}
194+
195+
const toastMap: Partial<Record<NotificationEvent, { variant: 'info' | 'success' | 'warning' | 'error'; title: string }>> = {
196+
complete: { variant: 'success', title: `Job "${job.name}" completed` },
197+
failed: { variant: 'error', title: `Job "${job.name}" failed` },
198+
blocked: { variant: 'warning', title: `Job "${job.name}" is blocked` },
199+
awaiting_input: { variant: 'warning', title: `Job "${job.name}" needs input` },
200+
needs_review: { variant: 'info', title: `Job "${job.name}" needs review` },
201+
question: { variant: 'warning', title: `Job "${job.name}" has a question` },
202+
};
203+
const toast = toastMap[event];
204+
if (toast) {
205+
try {
206+
await client.tui.showToast({
207+
body: {
208+
title: toast.title,
209+
message: `Switch to session "${sessionLabel}" to respond`,
210+
variant: toast.variant,
211+
duration: event === 'question' ? 10000 : 5000,
212+
},
213+
});
214+
} catch {
215+
// Toast may not be available in all environments
216+
}
217+
}
218+
219+
await sendMessage(client, sessionID, message, event === 'question');
220+
sent.add(dedupKey);
181221
};
182222

183-
const enqueue = (event: NotificationEvent, job: Job): void => {
184-
pending = pending.then(() => notify(event, job)).catch(() => {});
223+
const enqueue = (event: NotificationEvent, job: Job, extra?: unknown): void => {
224+
pending = pending.then(() => notify(event, job, extra)).catch(() => {});
185225
};
186226

187227
monitor.on('complete', (job) => enqueue('complete', job));
188228
monitor.on('failed', (job) => enqueue('failed', job));
189229
monitor.on('blocked', (job) => enqueue('blocked', job));
190230
monitor.on('needs_review', (job) => enqueue('needs_review', job));
191231
monitor.on('awaiting_input', (job) => enqueue('awaiting_input', job));
232+
monitor.on('question', (job, questionData) => enqueue('question', job, questionData));
192233
}

src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import { mc_plan_cancel } from './tools/plan-cancel';
2727
import { mc_plan_approve } from './tools/plan-approve';
2828
import { mc_report } from './tools/report';
2929
import { mc_overview } from './tools/overview';
30+
import { mc_answer } from './tools/answer';
3031

3132
interface SessionWithID {
3233
id?: string;
@@ -252,7 +253,7 @@ export const MissionControl: Plugin = async ({ client }) => {
252253
},
253254
tool: isJobAgent
254255
? { mc_report, mc_status } as any
255-
: { mc_launch, mc_jobs, mc_status, mc_diff, mc_pr, mc_merge, mc_sync, mc_cleanup, mc_kill, mc_attach, mc_capture, mc_plan, mc_plan_status, mc_plan_cancel, mc_plan_approve, mc_report, mc_overview },
256+
: { mc_launch, mc_jobs, mc_status, mc_diff, mc_pr, mc_merge, mc_sync, mc_cleanup, mc_kill, mc_attach, mc_capture, mc_plan, mc_plan_status, mc_plan_cancel, mc_plan_approve, mc_report, mc_overview, mc_answer },
256257
event: async ({ event }) => {
257258
const sessionID = extractSessionIDFromEvent(event);
258259
if (sessionID) {

src/lib/monitor.ts

Lines changed: 83 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,21 @@ import { isPaneRunning, capturePane, captureExitStatus } from './tmux.js';
44
import { loadConfig } from './config.js';
55
import { readReport } from './reports.js';
66
import { createJobClient } from './sdk-client.js';
7-
import { QuestionRelay, type PermissionRequest } from './question-relay.js';
7+
import {
8+
QuestionRelay,
9+
type PermissionRequest,
10+
type PendingQuestion,
11+
addPendingQuestion,
12+
removePendingQuestion,
13+
buildQuestionRelayMessage,
14+
clearPendingQuestionsForJob,
15+
buildTaskSummary,
16+
} from './question-relay.js';
817
import { loadPlan } from './plan-state.js';
918
import { PermissionPolicy, type PermissionPolicyConfig } from './permission-policy.js';
1019
import type { OpencodeClient } from '@opencode-ai/sdk';
1120

12-
type JobEventType = 'complete' | 'failed' | 'blocked' | 'needs_review' | 'awaiting_input' | 'agent_report';
13-
type JobEventHandler = (job: Job) => void;
21+
type JobEventType = 'complete' | 'failed' | 'blocked' | 'needs_review' | 'awaiting_input' | 'agent_report' | 'question';
1422

1523
interface IdleTracker {
1624
lastOutputHash: string;
@@ -144,9 +152,10 @@ export class JobMonitor extends EventEmitter {
144152
this.sseSubscriptions.delete(jobId);
145153
this.questionRelay.cleanup(jobId);
146154
}
155+
clearPendingQuestionsForJob(jobId);
147156
}
148157

149-
on(event: JobEventType, handler: JobEventHandler): this {
158+
on(event: JobEventType, handler: (job: Job, ...extra: unknown[]) => void): this {
150159
return super.on(event, handler);
151160
}
152161

@@ -212,11 +221,36 @@ export class JobMonitor extends EventEmitter {
212221
reconnectAttempts: 0,
213222
});
214223

224+
this.backfillPendingQuestions(job, client).catch(() => {});
225+
215226
this.processSSEStream(job, client, abortController).catch((error) => {
216227
console.error(`[Monitor] SSE stream error for job ${job.name}:`, error);
217228
});
218229
}
219230

231+
private async backfillPendingQuestions(job: Job, client: OpencodeClient): Promise<void> {
232+
if (!job.remoteSessionID) return;
233+
234+
try {
235+
const result = await client.session.messages({ path: { id: job.remoteSessionID } });
236+
const messages = (result.data ?? result) as Array<{ parts?: Array<Record<string, unknown>> }>;
237+
if (!Array.isArray(messages)) return;
238+
239+
for (const msg of messages) {
240+
for (const part of msg.parts ?? []) {
241+
if (part.type === 'tool' && part.tool === 'question') {
242+
const state = part.state as { status?: string } | undefined;
243+
if (state?.status === 'running') {
244+
this.handleQuestionToolUpdate(job, part);
245+
}
246+
}
247+
}
248+
}
249+
} catch {
250+
// Non-fatal: backfill failure shouldn't block SSE subscription
251+
}
252+
}
253+
220254
private async processSSEStream(
221255
job: Job,
222256
client: OpencodeClient,
@@ -295,6 +329,10 @@ export class JobMonitor extends EventEmitter {
295329
}
296330

297331
case 'message.part.updated': {
332+
const part = event.properties?.part;
333+
if (part?.type === 'tool' && part.tool === 'question') {
334+
this.handleQuestionToolUpdate(job, part);
335+
}
298336
this.updateEventAccumulator(job.id, { currentTool: 'streaming' });
299337
break;
300338
}
@@ -317,6 +355,47 @@ export class JobMonitor extends EventEmitter {
317355
}
318356
}
319357

358+
private handleQuestionToolUpdate(job: Job, part: any): void {
359+
const state = part.state;
360+
if (!state) return;
361+
362+
if (state.status === 'running') {
363+
const input = state.input ?? {};
364+
const questions = (input.questions ?? []) as Array<{
365+
question?: string;
366+
header?: string;
367+
options?: Array<{ label: string; description?: string }>;
368+
multiple?: boolean;
369+
}>;
370+
371+
if (questions.length === 0) return;
372+
if (!job.remoteSessionID || !job.port) return;
373+
374+
const first = questions[0];
375+
const pending: PendingQuestion = {
376+
jobId: job.id,
377+
jobName: job.name,
378+
taskSummary: buildTaskSummary(job.prompt),
379+
partId: part.id,
380+
callID: part.callID,
381+
remoteSessionID: job.remoteSessionID,
382+
port: job.port,
383+
question: first.question ?? '',
384+
options: first.options ?? [],
385+
header: first.header,
386+
multiple: first.multiple,
387+
detectedAt: Date.now(),
388+
};
389+
390+
const isNew = addPendingQuestion(pending);
391+
if (isNew) {
392+
this.emit('question', job, pending);
393+
}
394+
} else if (state.status === 'completed' || state.status === 'error') {
395+
removePendingQuestion(job.id, part.id);
396+
}
397+
}
398+
320399
private inferPermissionType(event: any): PermissionRequest['type'] {
321400
const eventData = event.properties || event;
322401
const metadata = eventData.metadata ?? {};

0 commit comments

Comments
 (0)