Skip to content

Commit cd9431a

Browse files
committed
fix: recover stale external db requests
A failure between claiming an outgoing request and deleting it from the queue in the handler can leave the request permanently stale, with nothing to clean it up. Some of these stale requests may also have fresh duplicates in the outgoing queue. Such requests need to be deleted or retried.
1 parent e0c1cc5 commit cd9431a

2 files changed

Lines changed: 156 additions & 20 deletions

File tree

apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts

Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { OutgoingRequest } from "@/generated/prisma/client";
22
import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata";
3+
import { recoverStaleOutgoingRequests, type RecoverStaleResult } from "@/lib/external-db-sync-queue";
34
import { upstash } from "@/lib/upstash";
45
import { globalPrismaClient, retryTransaction } from "@/prisma-client";
56
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
@@ -85,13 +86,11 @@ export const GET = createSmartRouteHandler({
8586
const startTime = performance.now();
8687
const maxDurationMs = parseMaxDurationMs(query.maxDurationMs);
8788
const pollIntervalMs = 50;
88-
const staleClaimIntervalMinutes = 5;
8989
const pollerClaimLimit = getPollerClaimLimit();
9090

9191
span.setAttribute("stack.external-db-sync.max-duration-ms", maxDurationMs);
9292
span.setAttribute("stack.external-db-sync.poll-interval-ms", pollIntervalMs);
9393
span.setAttribute("stack.external-db-sync.poller-claim-limit", pollerClaimLimit);
94-
span.setAttribute("stack.external-db-sync.stale-claim-minutes", staleClaimIntervalMinutes);
9594

9695
let totalRequestsProcessed = 0;
9796
let iterationCount = 0;
@@ -115,6 +114,55 @@ export const GET = createSmartRouteHandler({
115114
});
116115
}
117116

117+
async function handleStaleRequests(): Promise<RecoverStaleResult> {
118+
return await traceSpan("external-db-sync.poller.handleStaleRequests", async (staleSpan) => {
119+
// Recovery is best-effort: any failure here must not abort the rest of the poller iteration,
120+
// because the loop still owns processing the pending queue.
121+
let result: RecoverStaleResult;
122+
try {
123+
result = await recoverStaleOutgoingRequests(STALE_REQUEST_THRESHOLD_MS);
124+
} catch (error) {
125+
staleSpan.setAttribute("stack.external-db-sync.stale-recovery-error", true);
126+
captureError("poller-stale-recovery-error", error);
127+
return { resetIds: [], deletedIds: [] };
128+
}
129+
const { resetIds, deletedIds } = result;
130+
const total = resetIds.length + deletedIds.length;
131+
132+
staleSpan.setAttribute("stack.external-db-sync.stale-reset-count", resetIds.length);
133+
staleSpan.setAttribute("stack.external-db-sync.stale-deleted-count", deletedIds.length);
134+
135+
if (total > 0) {
136+
const ID_SAMPLE_LIMIT = 10;
137+
captureError(
138+
"poller-stale-outgoing-requests",
139+
new StackAssertionError(
140+
[
141+
`Recovered ${total} stale outgoing request(s) (reset=${resetIds.length}, deleted=${deletedIds.length}) older than ${STALE_REQUEST_THRESHOLD_MS}ms.`,
142+
`Stale rows are claims that never got cleared after publishing — the most likely cause is a poller lambda dying between the UPDATE that set startedFulfillingAt and the DELETE that should have removed the row.`,
143+
`Recovery deletes the stale row if any active sibling (pending OR fresh-in-flight) already represents the work; among multiple stale rows for the same deduplicationKey it resets the oldest and deletes the rest; otherwise it resets startedFulfillingAt to NULL so the row can be re-claimed.`,
144+
`If this fires repeatedly, look for nearby unhandled-promise-rejection events (which trigger process.exit(1) via the polyfill) or function-timeout signals on the external-db-sync poller route.`,
145+
].join(" "),
146+
{
147+
totalRecovered: total,
148+
staleResetCount: resetIds.length,
149+
staleDeletedCount: deletedIds.length,
150+
staleResetIdsSample: resetIds.slice(0, ID_SAMPLE_LIMIT),
151+
staleDeletedIdsSample: deletedIds.slice(0, ID_SAMPLE_LIMIT),
152+
staleResetIdsSampleNote: resetIds.length > ID_SAMPLE_LIMIT
153+
? `Showing first ${ID_SAMPLE_LIMIT} of ${resetIds.length} reset ids; remainder omitted to bound payload size.`
154+
: `All ${resetIds.length} reset ids included.`,
155+
staleDeletedIdsSampleNote: deletedIds.length > ID_SAMPLE_LIMIT
156+
? `Showing first ${ID_SAMPLE_LIMIT} of ${deletedIds.length} deleted ids; remainder omitted to bound payload size.`
157+
: `All ${deletedIds.length} deleted ids included.`,
158+
},
159+
),
160+
);
161+
}
162+
return { resetIds, deletedIds };
163+
});
164+
}
165+
118166
async function deleteOutgoingRequest(id: string): Promise<void> {
119167
await retryTransaction(globalPrismaClient, async (tx) => {
120168
await tx.outgoingRequest.delete({ where: { id } });
@@ -205,6 +253,9 @@ export const GET = createSmartRouteHandler({
205253
}
206254

207255
if (requests.length === 0) {
256+
// Performance optimization: skip the upstash batch call when the
257+
// caller passed no claimed rows (i.e. claimPendingRequests returned
258+
// an empty array).
208259
processSpan.setAttribute("stack.external-db-sync.processed-count", 0);
209260
return 0;
210261
}
@@ -243,23 +294,9 @@ export const GET = createSmartRouteHandler({
243294
return { stopReason: "disabled", processed: 0 };
244295
}
245296

246-
const staleRequests = await globalPrismaClient.$queryRaw<{ id: string, startedFulfillingAt: Date }[]>`
247-
SELECT "id", "startedFulfillingAt"
248-
FROM "OutgoingRequest"
249-
WHERE "startedFulfillingAt" IS NOT NULL
250-
AND "startedFulfillingAt" < NOW() - ${STALE_REQUEST_THRESHOLD_MS} * INTERVAL '1 millisecond'
251-
LIMIT 10
252-
`;
253-
iterationSpan.setAttribute("stack.external-db-sync.stale-count", staleRequests.length);
254-
if (staleRequests.length > 0) {
255-
captureError(
256-
"poller-stale-outgoing-requests",
257-
new StackAssertionError(
258-
`Found ${staleRequests.length} outgoing request(s) with startedFulfillingAt older than ${STALE_REQUEST_THRESHOLD_MS}ms`,
259-
{ staleRequestIds: staleRequests.map(r => r.id) },
260-
),
261-
);
262-
}
297+
const stale = await handleStaleRequests();
298+
iterationSpan.setAttribute("stack.external-db-sync.stale-reset-count", stale.resetIds.length);
299+
iterationSpan.setAttribute("stack.external-db-sync.stale-deleted-count", stale.deletedIds.length);
263300

264301
const pendingRequests = await claimPendingRequests();
265302
iterationSpan.setAttribute("stack.external-db-sync.pending-count", pendingRequests.length);

apps/backend/src/lib/external-db-sync-queue.ts

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { globalPrismaClient } from "@/prisma-client";
1+
import { globalPrismaClient, retryTransaction } from "@/prisma-client";
22
import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors";
33

44
const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
@@ -41,3 +41,102 @@ export async function enqueueExternalDbSyncBatch(tenancyIds: string[]): Promise<
4141
ON CONFLICT ("deduplicationKey") WHERE "startedFulfillingAt" IS NULL DO NOTHING
4242
`;
4343
}
44+
45+
export type RecoverStaleResult = { resetIds: string[], deletedIds: string[] };

/**
 * Recovers OutgoingRequest rows that were claimed (startedFulfillingAt set)
 * but never deleted — typically because the poller died mid-iteration. We
 * can't naively reset every stale row because the partial unique index
 * `OutgoingRequest_deduplicationKey_pending_key` would reject any reset that
 * produces a duplicate among rows where startedFulfillingAt IS NULL.
 *
 * Per stale row:
 * - dedup key is NULL                      -> reset (NULLs don't enter the index)
 * - any active sibling exists for the      -> delete (the active sibling already
 *   same key (pending OR fresh-in-flight)     represents the work; resetting would
 *                                             create concurrent duplicate work)
 * - shares key with other stale rows       -> reset the oldest, delete the rest
 * - otherwise                              -> reset
 *
 * "Active sibling" includes both pending rows and rows currently being processed
 * by another poller invocation — we don't want recovery to spawn a parallel
 * sync alongside an already-in-flight one for the same tenancy.
 *
 * Concurrency notes (READ COMMITTED):
 * - Mutation CTEs repeat the staleness predicate so EvalPlanQual skips rows
 *   another transaction reset/deleted/re-claimed during the lock wait —
 *   otherwise we could clobber a freshly-claimed row back to pending.
 * - A concurrent sequencer INSERT for the same key between our EXISTS check
 *   and UPDATE raises P2010 (SQLSTATE 23505). The poller call site catches
 *   it so the rest of the iteration keeps processing; the next cron tick
 *   re-runs recovery on a fresh snapshot.
 *
 * @param staleThresholdMs rows whose startedFulfillingAt is older than this
 *   many milliseconds count as stale and become eligible for recovery.
 * @returns ids of rows reset back to pending (`resetIds`) and ids of rows
 *   deleted (`deletedIds`); at most 100 rows are handled per call (LIMIT in
 *   the planning CTE), so a large backlog drains over multiple poll ticks.
 */
export async function recoverStaleOutgoingRequests(staleThresholdMs: number): Promise<RecoverStaleResult> {
  // Discriminated shape of each row returned by the recovery query below.
  type Row = { action: "reset" | "delete", id: string };
  // NOTE(review): assumes retryTransaction retries transient/serialization
  // failures — confirm against prisma-client.
  const rows = await retryTransaction(globalPrismaClient, async (tx) => {
    return await tx.$queryRaw<Row[]>`
      WITH stale AS (
        SELECT
          o."id",
          CASE
            WHEN o."deduplicationKey" IS NULL THEN 'reset'::text
            WHEN EXISTS (
              SELECT 1 FROM "OutgoingRequest" p
              WHERE p."deduplicationKey" = o."deduplicationKey"
              AND (
                p."startedFulfillingAt" IS NULL
                OR p."startedFulfillingAt" >= NOW() - ${staleThresholdMs} * INTERVAL '1 millisecond'
              )
            ) THEN 'delete'::text
            WHEN ROW_NUMBER() OVER (
              PARTITION BY o."deduplicationKey"
              ORDER BY o."createdAt" ASC, o."id" ASC
            ) = 1 THEN 'reset'::text
            ELSE 'delete'::text
          END AS action
        FROM "OutgoingRequest" o
        WHERE o."startedFulfillingAt" IS NOT NULL
        AND o."startedFulfillingAt" < NOW() - ${staleThresholdMs} * INTERVAL '1 millisecond'
        -- Drain oldest first; LIMIT caps each call so a backlog can't blow
        -- up one transaction. Subsequent poll iterations mop up the rest.
        ORDER BY o."startedFulfillingAt" ASC, o."id" ASC
        LIMIT 100
      ),
      -- Both mutation CTEs repeat the staleness predicate so that under
      -- READ COMMITTED, EvalPlanQual re-evaluates against the latest row
      -- version after any lock wait and skips rows that are no longer stale
      -- (because a concurrent recovery reset/deleted them or a poller
      -- re-claimed the row in the gap).
      deleted AS (
        DELETE FROM "OutgoingRequest" o
        USING stale s
        WHERE o."id" = s."id"
        AND s.action = 'delete'
        AND o."startedFulfillingAt" IS NOT NULL
        AND o."startedFulfillingAt" < NOW() - ${staleThresholdMs} * INTERVAL '1 millisecond'
        RETURNING o."id"
      ),
      reset AS (
        UPDATE "OutgoingRequest" o
        SET "startedFulfillingAt" = NULL
        FROM stale s
        WHERE o."id" = s."id"
        AND s.action = 'reset'
        AND o."startedFulfillingAt" IS NOT NULL
        AND o."startedFulfillingAt" < NOW() - ${staleThresholdMs} * INTERVAL '1 millisecond'
        RETURNING o."id"
      )
      -- Read from the mutation CTEs (not from the planning CTE) so the counts
      -- reflect rows that actually changed. Under concurrent recovery this
      -- matters: a row that was deleted/reset by another transaction between
      -- snapshot and execution would still appear in the planning CTE but
      -- not in the mutation CTEs.
      SELECT 'reset'::text AS action, "id" FROM reset
      UNION ALL
      SELECT 'delete'::text AS action, "id" FROM deleted
    `;
  });
  // Split the unioned action/id rows back into the two id lists callers use.
  return {
    resetIds: rows.filter(r => r.action === "reset").map(r => r.id),
    deletedIds: rows.filter(r => r.action === "delete").map(r => r.id),
  };
}

0 commit comments

Comments
 (0)