Skip to content

Commit 50e8c06

Browse files
authored
feat(cloud-agent): improved sse keepalive handling (#1614)
1 parent 194c0b6 commit 50e8c06

4 files changed

Lines changed: 166 additions & 1 deletion

File tree

apps/code/src/main/services/cloud-task/service.test.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,78 @@ describe("CloudTaskService", () => {
322322
);
323323
});
324324

325+
it("ignores keepalive SSE events while keeping the stream open", async () => {
326+
const updates: unknown[] = [];
327+
service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));
328+
329+
mockNetFetch
330+
.mockResolvedValueOnce(
331+
createJsonResponse({
332+
id: "run-1",
333+
status: "in_progress",
334+
stage: "build",
335+
output: null,
336+
error_message: null,
337+
branch: "main",
338+
updated_at: "2026-01-01T00:00:00Z",
339+
}),
340+
)
341+
.mockResolvedValueOnce(
342+
createJsonResponse([], 200, { "X-Has-More": "false" }),
343+
);
344+
345+
mockStreamFetch.mockResolvedValueOnce(
346+
createOpenSseResponse(
347+
'event: keepalive\ndata: {"type":"keepalive"}\n\nid: 2\ndata: {"type":"notification","timestamp":"2026-01-01T00:00:02Z","notification":{"jsonrpc":"2.0","method":"_posthog/console","params":{"sessionId":"run-1","level":"info","message":"live tail"}}}\n\n',
348+
),
349+
);
350+
351+
service.watch({
352+
taskId: "task-1",
353+
runId: "run-1",
354+
apiHost: "https://app.example.com",
355+
teamId: 2,
356+
});
357+
358+
await waitFor(() => updates.length >= 2);
359+
360+
expect(updates).toEqual([
361+
{
362+
taskId: "task-1",
363+
runId: "run-1",
364+
kind: "snapshot",
365+
newEntries: [],
366+
totalEntryCount: 0,
367+
status: "in_progress",
368+
stage: "build",
369+
output: null,
370+
errorMessage: null,
371+
branch: "main",
372+
},
373+
{
374+
taskId: "task-1",
375+
runId: "run-1",
376+
kind: "logs",
377+
newEntries: [
378+
{
379+
type: "notification",
380+
timestamp: "2026-01-01T00:00:02Z",
381+
notification: {
382+
jsonrpc: "2.0",
383+
method: "_posthog/console",
384+
params: {
385+
sessionId: "run-1",
386+
level: "info",
387+
message: "live tail",
388+
},
389+
},
390+
},
391+
],
392+
totalEntryCount: 1,
393+
},
394+
]);
395+
});
396+
325397
it("emits a retryable cloud error after repeated stream failures", async () => {
326398
vi.useFakeTimers();
327399

apps/code/src/main/services/cloud-task/service.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,16 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
654654

655655
watcher.reconnectAttempts = 0;
656656

657+
if (
658+
event.event === "keepalive" ||
659+
(typeof event.data === "object" &&
660+
event.data !== null &&
661+
"type" in event.data &&
662+
event.data.type === "keepalive")
663+
) {
664+
return;
665+
}
666+
657667
if (isTaskRunStateEvent(event.data)) {
658668
if (this.applyTaskRunState(watcher, event.data)) {
659669
if (!watcher.isBootstrapping && !isTerminalStatus(watcher.lastStatus)) {

apps/code/src/renderer/features/sessions/service/service.test.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,35 @@ describe("SessionService", () => {
298298
});
299299

300300
describe("connectToTask", () => {
301+
it("skips local connection for cloud runs", async () => {
302+
const service = getSessionService();
303+
304+
await service.connectToTask({
305+
task: createMockTask({
306+
latest_run: {
307+
id: "run-123",
308+
task: "task-123",
309+
team: 123,
310+
environment: "cloud",
311+
status: "in_progress",
312+
log_url: "https://logs.example.com/run-123",
313+
error_message: null,
314+
output: null,
315+
state: {},
316+
branch: "main",
317+
created_at: "2024-01-01T00:00:00Z",
318+
updated_at: "2024-01-01T00:00:00Z",
319+
completed_at: null,
320+
},
321+
}),
322+
repoPath: "/repo",
323+
});
324+
325+
expect(mockAuth.fetchAuthState).not.toHaveBeenCalled();
326+
expect(mockTrpcAgent.reconnect.mutate).not.toHaveBeenCalled();
327+
expect(mockSessionStoreSetters.setSession).not.toHaveBeenCalled();
328+
});
329+
301330
it("skips connection if already connected", async () => {
302331
const service = getSessionService();
303332
const mockSession = createMockSession({ status: "connected" });
@@ -457,6 +486,40 @@ describe("SessionService", () => {
457486
});
458487

459488
describe("watchCloudTask", () => {
489+
it("resets a same-run preloaded session before the first cloud snapshot", () => {
490+
const service = getSessionService();
491+
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
492+
createMockSession({
493+
taskRunId: "run-123",
494+
taskId: "task-123",
495+
taskTitle: "Cloud Task",
496+
events: [{ type: "acp_message", ts: 1, message: { method: "test" } }],
497+
}),
498+
);
499+
500+
service.watchCloudTask(
501+
"task-123",
502+
"run-123",
503+
"https://app.example.com",
504+
2,
505+
);
506+
507+
expect(mockSessionStoreSetters.setSession).toHaveBeenCalledWith(
508+
expect.objectContaining({
509+
taskRunId: "run-123",
510+
taskId: "task-123",
511+
taskTitle: "Cloud Task",
512+
isCloud: true,
513+
status: "disconnected",
514+
events: [],
515+
}),
516+
);
517+
expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith(
518+
"run-123",
519+
expect.objectContaining({ isCloud: true }),
520+
);
521+
});
522+
460523
it("subscribes to cloud updates before starting the watcher", async () => {
461524
const service = getSessionService();
462525

apps/code/src/renderer/features/sessions/service/service.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,14 @@ export class SessionService {
200200
const { id: taskId, latest_run: latestRun } = task;
201201
const taskTitle = task.title || task.description || "Task";
202202

203+
if (latestRun?.environment === "cloud") {
204+
log.info("Skipping local session connect for cloud run", {
205+
taskId,
206+
taskRunId: latestRun.id,
207+
});
208+
return;
209+
}
210+
203211
try {
204212
const auth = await this.getAuthCredentials();
205213
if (!auth) {
@@ -1931,7 +1939,19 @@ export class SessionService {
19311939

19321940
// Create session in the store
19331941
const existing = sessionStoreSetters.getSessionByTaskId(taskId);
1934-
if (!existing || existing.taskRunId !== taskRunId) {
1942+
// A same-run session with history but no processedLineCount came from a
1943+
// non-cloud hydration path. Reset it so the cloud snapshot becomes the
1944+
// single source of truth instead of being appended on top.
1945+
const shouldResetExistingSession =
1946+
existing?.taskRunId === taskRunId &&
1947+
existing.events.length > 0 &&
1948+
existing.processedLineCount === undefined;
1949+
1950+
if (
1951+
!existing ||
1952+
existing.taskRunId !== taskRunId ||
1953+
shouldResetExistingSession
1954+
) {
19351955
const taskTitle = existing?.taskTitle ?? "Cloud Task";
19361956
const session = this.createBaseSession(taskRunId, taskId, taskTitle);
19371957
session.status = "disconnected";

0 commit comments

Comments
 (0)