Skip to content

Commit 255a197

Browse files
committed
fix: recover stale external db requests
Failures between claiming and the deletion of outgoing requests from the handler can leave requests stale and never clean them up. Some of these requests may also have duplicates that are fresh in the outgoing queue. These requests need to be deleted or retried.
1 parent e0c1cc5 commit 255a197

2 files changed

Lines changed: 132 additions & 20 deletions

File tree

apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { OutgoingRequest } from "@/generated/prisma/client";
22
import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata";
3+
import { recoverStaleOutgoingRequests, type RecoverStaleResult } from "@/lib/external-db-sync-queue";
34
import { upstash } from "@/lib/upstash";
45
import { globalPrismaClient, retryTransaction } from "@/prisma-client";
56
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
@@ -85,13 +86,11 @@ export const GET = createSmartRouteHandler({
8586
const startTime = performance.now();
8687
const maxDurationMs = parseMaxDurationMs(query.maxDurationMs);
8788
const pollIntervalMs = 50;
88-
const staleClaimIntervalMinutes = 5;
8989
const pollerClaimLimit = getPollerClaimLimit();
9090

9191
span.setAttribute("stack.external-db-sync.max-duration-ms", maxDurationMs);
9292
span.setAttribute("stack.external-db-sync.poll-interval-ms", pollIntervalMs);
9393
span.setAttribute("stack.external-db-sync.poller-claim-limit", pollerClaimLimit);
94-
span.setAttribute("stack.external-db-sync.stale-claim-minutes", staleClaimIntervalMinutes);
9594

9695
let totalRequestsProcessed = 0;
9796
let iterationCount = 0;
@@ -115,6 +114,45 @@ export const GET = createSmartRouteHandler({
115114
});
116115
}
117116

117+
async function handleStaleRequests(): Promise<RecoverStaleResult> {
118+
return await traceSpan("external-db-sync.poller.handleStaleRequests", async (staleSpan) => {
119+
const { resetIds, deletedIds } = await recoverStaleOutgoingRequests(STALE_REQUEST_THRESHOLD_MS);
120+
const total = resetIds.length + deletedIds.length;
121+
122+
staleSpan.setAttribute("stack.external-db-sync.stale-reset-count", resetIds.length);
123+
staleSpan.setAttribute("stack.external-db-sync.stale-deleted-count", deletedIds.length);
124+
125+
if (total > 0) {
126+
const ID_SAMPLE_LIMIT = 10;
127+
captureError(
128+
"poller-stale-outgoing-requests",
129+
new StackAssertionError(
130+
[
131+
`Recovered ${total} stale outgoing request(s) (reset=${resetIds.length}, deleted=${deletedIds.length}) older than ${STALE_REQUEST_THRESHOLD_MS}ms.`,
132+
`Stale rows are claims that never got cleared after publishing — the most likely cause is a poller lambda dying between the UPDATE that set startedFulfillingAt and the DELETE that should have removed the row.`,
133+
`Recovery deletes the stale row if any active sibling (pending OR fresh-in-flight) already represents the work; among multiple stale rows for the same deduplicationKey it resets the oldest and deletes the rest; otherwise it resets startedFulfillingAt to NULL so the row can be re-claimed.`,
134+
`If this fires repeatedly, look for nearby unhandled-promise-rejection events (which trigger process.exit(1) via the polyfill) or function-timeout signals on the external-db-sync poller route.`,
135+
].join(" "),
136+
{
137+
totalRecovered: total,
138+
staleResetCount: resetIds.length,
139+
staleDeletedCount: deletedIds.length,
140+
staleResetIdsSample: resetIds.slice(0, ID_SAMPLE_LIMIT),
141+
staleDeletedIdsSample: deletedIds.slice(0, ID_SAMPLE_LIMIT),
142+
staleResetIdsSampleNote: resetIds.length > ID_SAMPLE_LIMIT
143+
? `Showing first ${ID_SAMPLE_LIMIT} of ${resetIds.length} reset ids; remainder omitted to bound payload size.`
144+
: `All ${resetIds.length} reset ids included.`,
145+
staleDeletedIdsSampleNote: deletedIds.length > ID_SAMPLE_LIMIT
146+
? `Showing first ${ID_SAMPLE_LIMIT} of ${deletedIds.length} deleted ids; remainder omitted to bound payload size.`
147+
: `All ${deletedIds.length} deleted ids included.`,
148+
},
149+
),
150+
);
151+
}
152+
return { resetIds, deletedIds };
153+
});
154+
}
155+
118156
async function deleteOutgoingRequest(id: string): Promise<void> {
119157
await retryTransaction(globalPrismaClient, async (tx) => {
120158
await tx.outgoingRequest.delete({ where: { id } });
@@ -205,6 +243,9 @@ export const GET = createSmartRouteHandler({
205243
}
206244

207245
if (requests.length === 0) {
246+
// Performance optimization: skip the upstash batch call when the
247+
// caller passed no claimed rows (i.e. claimPendingRequests returned
248+
// an empty array).
208249
processSpan.setAttribute("stack.external-db-sync.processed-count", 0);
209250
return 0;
210251
}
@@ -243,23 +284,9 @@ export const GET = createSmartRouteHandler({
243284
return { stopReason: "disabled", processed: 0 };
244285
}
245286

246-
const staleRequests = await globalPrismaClient.$queryRaw<{ id: string, startedFulfillingAt: Date }[]>`
247-
SELECT "id", "startedFulfillingAt"
248-
FROM "OutgoingRequest"
249-
WHERE "startedFulfillingAt" IS NOT NULL
250-
AND "startedFulfillingAt" < NOW() - ${STALE_REQUEST_THRESHOLD_MS} * INTERVAL '1 millisecond'
251-
LIMIT 10
252-
`;
253-
iterationSpan.setAttribute("stack.external-db-sync.stale-count", staleRequests.length);
254-
if (staleRequests.length > 0) {
255-
captureError(
256-
"poller-stale-outgoing-requests",
257-
new StackAssertionError(
258-
`Found ${staleRequests.length} outgoing request(s) with startedFulfillingAt older than ${STALE_REQUEST_THRESHOLD_MS}ms`,
259-
{ staleRequestIds: staleRequests.map(r => r.id) },
260-
),
261-
);
262-
}
287+
const stale = await handleStaleRequests();
288+
iterationSpan.setAttribute("stack.external-db-sync.stale-reset-count", stale.resetIds.length);
289+
iterationSpan.setAttribute("stack.external-db-sync.stale-deleted-count", stale.deletedIds.length);
263290

264291
const pendingRequests = await claimPendingRequests();
265292
iterationSpan.setAttribute("stack.external-db-sync.pending-count", pendingRequests.length);

apps/backend/src/lib/external-db-sync-queue.ts

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { globalPrismaClient } from "@/prisma-client";
1+
import { globalPrismaClient, retryTransaction } from "@/prisma-client";
22
import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors";
33

44
const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
@@ -41,3 +41,88 @@ export async function enqueueExternalDbSyncBatch(tenancyIds: string[]): Promise<
4141
ON CONFLICT ("deduplicationKey") WHERE "startedFulfillingAt" IS NULL DO NOTHING
4242
`;
4343
}
44+
45+
export type RecoverStaleResult = { resetIds: string[], deletedIds: string[] };
46+
47+
// Recovers OutgoingRequest rows that were claimed (startedFulfillingAt set)
48+
// but never deleted — typically because the poller died mid-iteration. We
49+
// can't naively reset every stale row because the partial unique index
50+
// `OutgoingRequest_deduplicationKey_pending_key` would reject any reset that
51+
// produces a duplicate among rows where startedFulfillingAt IS NULL.
52+
//
53+
// Per stale row:
54+
// - dedup key is NULL -> reset (NULLs don't enter the index)
55+
// - any active sibling exists for the -> delete (the active sibling already
56+
// same key (pending OR fresh-in-flight) represents the work; resetting would
57+
// create concurrent duplicate work)
58+
// - shares key with other stale rows -> reset the oldest, delete the rest
59+
// - otherwise -> reset
60+
//
61+
// "Active sibling" includes both pending rows and rows currently being processed
62+
// by another poller invocation — we don't want recovery to spawn a parallel
63+
// sync alongside an already-in-flight one for the same tenancy.
64+
//
65+
// Note on rare partial-unique races: the sequencer can INSERT a fresh pending
66+
// row for the same key between our EXISTS check and our UPDATE. That manifests
67+
// as a P2010 raw-query error (SQLSTATE 23505). We deliberately don't catch it
68+
// here — the next cron tick re-runs recovery on a fresh snapshot. If it ever
69+
// becomes a sustained problem in production, we'll add targeted retry then.
70+
export async function recoverStaleOutgoingRequests(staleThresholdMs: number): Promise<RecoverStaleResult> {
71+
type Row = { action: "reset" | "delete", id: string };
72+
const rows = await retryTransaction(globalPrismaClient, async (tx) => {
73+
return await tx.$queryRaw<Row[]>`
74+
WITH stale AS (
75+
SELECT
76+
o."id",
77+
CASE
78+
WHEN o."deduplicationKey" IS NULL THEN 'reset'::text
79+
WHEN EXISTS (
80+
SELECT 1 FROM "OutgoingRequest" p
81+
WHERE p."deduplicationKey" = o."deduplicationKey"
82+
AND (
83+
p."startedFulfillingAt" IS NULL
84+
OR p."startedFulfillingAt" >= NOW() - ${staleThresholdMs} * INTERVAL '1 millisecond'
85+
)
86+
) THEN 'delete'::text
87+
WHEN ROW_NUMBER() OVER (
88+
PARTITION BY o."deduplicationKey"
89+
ORDER BY o."createdAt" ASC, o."id" ASC
90+
) = 1 THEN 'reset'::text
91+
ELSE 'delete'::text
92+
END AS action
93+
FROM "OutgoingRequest" o
94+
WHERE o."startedFulfillingAt" IS NOT NULL
95+
AND o."startedFulfillingAt" < NOW() - ${staleThresholdMs} * INTERVAL '1 millisecond'
96+
-- Drain oldest first; LIMIT caps each call so a backlog can't blow
97+
-- up one transaction. Subsequent poll iterations mop up the rest.
98+
ORDER BY o."startedFulfillingAt" ASC, o."id" ASC
99+
LIMIT 100
100+
),
101+
deleted AS (
102+
DELETE FROM "OutgoingRequest" o
103+
USING stale s
104+
WHERE o."id" = s."id" AND s.action = 'delete'
105+
RETURNING o."id"
106+
),
107+
reset AS (
108+
UPDATE "OutgoingRequest" o
109+
SET "startedFulfillingAt" = NULL
110+
FROM stale s
111+
WHERE o."id" = s."id" AND s.action = 'reset'
112+
RETURNING o."id"
113+
)
114+
-- Read from the mutation CTEs (not from the planning CTE) so the counts
115+
-- reflect rows that actually changed. Under concurrent recovery this
116+
-- matters: a row that was deleted/reset by another transaction between
117+
-- snapshot and execution would still appear in the planning CTE but
118+
-- not in the mutation CTEs.
119+
SELECT 'reset'::text AS action, "id" FROM reset
120+
UNION ALL
121+
SELECT 'delete'::text AS action, "id" FROM deleted
122+
`;
123+
});
124+
return {
125+
resetIds: rows.filter(r => r.action === "reset").map(r => r.id),
126+
deletedIds: rows.filter(r => r.action === "delete").map(r => r.id),
127+
};
128+
}

0 commit comments

Comments
 (0)