diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts index 688b9d25c1..831e93e42b 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts @@ -1,5 +1,6 @@ import type { OutgoingRequest } from "@/generated/prisma/client"; import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata"; +import { recoverStaleOutgoingRequests, type RecoverStaleResult } from "@/lib/external-db-sync-queue"; import { upstash } from "@/lib/upstash"; import { globalPrismaClient, retryTransaction } from "@/prisma-client"; import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler"; @@ -85,13 +86,11 @@ export const GET = createSmartRouteHandler({ const startTime = performance.now(); const maxDurationMs = parseMaxDurationMs(query.maxDurationMs); const pollIntervalMs = 50; - const staleClaimIntervalMinutes = 5; const pollerClaimLimit = getPollerClaimLimit(); span.setAttribute("stack.external-db-sync.max-duration-ms", maxDurationMs); span.setAttribute("stack.external-db-sync.poll-interval-ms", pollIntervalMs); span.setAttribute("stack.external-db-sync.poller-claim-limit", pollerClaimLimit); - span.setAttribute("stack.external-db-sync.stale-claim-minutes", staleClaimIntervalMinutes); let totalRequestsProcessed = 0; let iterationCount = 0; @@ -115,6 +114,55 @@ export const GET = createSmartRouteHandler({ }); } + async function handleStaleRequests(): Promise<RecoverStaleResult> { + return await traceSpan("external-db-sync.poller.handleStaleRequests", async (staleSpan) => { + // Recovery is best-effort: any failure here must not abort the rest of the poller iteration, + // because the loop still owns processing the pending queue. 
+ let result: RecoverStaleResult; + try { + result = await recoverStaleOutgoingRequests(STALE_REQUEST_THRESHOLD_MS); + } catch (error) { + staleSpan.setAttribute("stack.external-db-sync.stale-recovery-error", true); + captureError("poller-stale-recovery-error", error); + return { resetIds: [], deletedIds: [] }; + } + const { resetIds, deletedIds } = result; + const total = resetIds.length + deletedIds.length; + + staleSpan.setAttribute("stack.external-db-sync.stale-reset-count", resetIds.length); + staleSpan.setAttribute("stack.external-db-sync.stale-deleted-count", deletedIds.length); + + if (total > 0) { + const ID_SAMPLE_LIMIT = 10; + captureError( + "poller-stale-outgoing-requests", + new StackAssertionError( + [ + `Recovered ${total} stale outgoing request(s) (reset=${resetIds.length}, deleted=${deletedIds.length}) older than ${STALE_REQUEST_THRESHOLD_MS}ms.`, + `Stale rows are claims that never got cleared after publishing — the most likely cause is a poller lambda dying between the UPDATE that set startedFulfillingAt and the DELETE that should have removed the row.`, + `Recovery deletes the stale row if any active sibling (pending OR fresh-in-flight) already represents the work; among multiple stale rows for the same deduplicationKey it resets the oldest and deletes the rest; otherwise it resets startedFulfillingAt to NULL so the row can be re-claimed.`, + `If this fires repeatedly, look for nearby unhandled-promise-rejection events (which trigger process.exit(1) via the polyfill) or function-timeout signals on the external-db-sync poller route.`, + ].join(" "), + { + totalRecovered: total, + staleResetCount: resetIds.length, + staleDeletedCount: deletedIds.length, + staleResetIdsSample: resetIds.slice(0, ID_SAMPLE_LIMIT), + staleDeletedIdsSample: deletedIds.slice(0, ID_SAMPLE_LIMIT), + staleResetIdsSampleNote: resetIds.length > ID_SAMPLE_LIMIT + ? 
`Showing first ${ID_SAMPLE_LIMIT} of ${resetIds.length} reset ids; remainder omitted to bound payload size.` + : `All ${resetIds.length} reset ids included.`, + staleDeletedIdsSampleNote: deletedIds.length > ID_SAMPLE_LIMIT + ? `Showing first ${ID_SAMPLE_LIMIT} of ${deletedIds.length} deleted ids; remainder omitted to bound payload size.` + : `All ${deletedIds.length} deleted ids included.`, + }, + ), + ); + } + return { resetIds, deletedIds }; + }); + } + async function deleteOutgoingRequest(id: string): Promise<void> { await retryTransaction(globalPrismaClient, async (tx) => { await tx.outgoingRequest.delete({ where: { id } }); }); } @@ -205,6 +253,9 @@ export const GET = createSmartRouteHandler({ } if (requests.length === 0) { + // Performance optimization: skip the upstash batch call when the + // caller passed no claimed rows (i.e. claimPendingRequests returned + // an empty array). processSpan.setAttribute("stack.external-db-sync.processed-count", 0); return 0; }
iterationSpan.setAttribute("stack.external-db-sync.stale-deleted-count", stale.deletedIds.length); const pendingRequests = await claimPendingRequests(); iterationSpan.setAttribute("stack.external-db-sync.pending-count", pendingRequests.length); diff --git a/apps/backend/src/lib/external-db-sync-queue.ts b/apps/backend/src/lib/external-db-sync-queue.ts index 93593acedf..0e7fcc51b8 100644 --- a/apps/backend/src/lib/external-db-sync-queue.ts +++ b/apps/backend/src/lib/external-db-sync-queue.ts @@ -41,3 +41,100 @@ export async function enqueueExternalDbSyncBatch(tenancyIds: string[]): Promise< ON CONFLICT ("deduplicationKey") WHERE "startedFulfillingAt" IS NULL DO NOTHING `; } + +export type RecoverStaleResult = { resetIds: string[], deletedIds: string[] }; + +// Recovers OutgoingRequest rows that were claimed (startedFulfillingAt set) +// but never deleted — typically because the poller died mid-iteration. We +// can't naively reset every stale row because the partial unique index +// `OutgoingRequest_deduplicationKey_pending_key` would reject any reset that +// produces a duplicate among rows where startedFulfillingAt IS NULL. +// +// Per stale row: +// - dedup key is NULL -> reset (NULLs don't enter the index) +// - any active sibling exists for the -> delete (the active sibling already +// same key (pending OR fresh-in-flight) represents the work; resetting would +// create concurrent duplicate work) +// - shares key with other stale rows -> reset the oldest, delete the rest +// - otherwise -> reset +// +// "Active sibling" includes both pending rows and rows currently being processed +// by another poller invocation — we don't want recovery to spawn a parallel +// sync alongside an already-in-flight one for the same tenancy. 
+// +// Concurrency notes (READ COMMITTED): +// - Mutation CTEs repeat the staleness predicate so EvalPlanQual skips rows +// another transaction reset/deleted/re-claimed during the lock wait — +// otherwise we could clobber a freshly-claimed row back to pending. +// - A concurrent sequencer INSERT for the same key between our EXISTS check +// and UPDATE raises P2010 (SQLSTATE 23505). The poller call site catches +// it so the rest of the iteration keeps processing; the next cron tick +// re-runs recovery on a fresh snapshot. +export async function recoverStaleOutgoingRequests(staleThresholdMs: number): Promise<RecoverStaleResult> { + type Row = { action: "reset" | "delete", id: string }; + const rows = await globalPrismaClient.$queryRaw<Row[]>` + WITH stale AS ( + SELECT + o."id", + CASE + WHEN o."deduplicationKey" IS NULL THEN 'reset'::text + WHEN EXISTS ( + SELECT 1 FROM "OutgoingRequest" p + WHERE p."deduplicationKey" = o."deduplicationKey" + AND ( + p."startedFulfillingAt" IS NULL + OR p."startedFulfillingAt" >= NOW() - ${staleThresholdMs} * INTERVAL '1 millisecond' + ) + ) THEN 'delete'::text + WHEN ROW_NUMBER() OVER ( + PARTITION BY o."deduplicationKey" + ORDER BY o."createdAt" ASC, o."id" ASC + ) = 1 THEN 'reset'::text + ELSE 'delete'::text + END AS action + FROM "OutgoingRequest" o + WHERE o."startedFulfillingAt" IS NOT NULL + AND o."startedFulfillingAt" < NOW() - ${staleThresholdMs} * INTERVAL '1 millisecond' + -- Drain oldest first; LIMIT caps each call so a backlog can't blow + -- up one transaction. Subsequent poll iterations mop up the rest. + ORDER BY o."startedFulfillingAt" ASC, o."id" ASC + LIMIT 100 + ), + -- Both mutation CTEs repeat the staleness predicate so that under + -- READ COMMITTED, EvalPlanQual re-evaluates against the latest row + -- version after any lock wait and skips rows that are no longer stale + -- (because a concurrent recovery reset/deleted them or a poller + -- re-claimed the row in the gap). 
+ deleted AS ( + DELETE FROM "OutgoingRequest" o + USING stale s + WHERE o."id" = s."id" + AND s.action = 'delete' + AND o."startedFulfillingAt" IS NOT NULL + AND o."startedFulfillingAt" < NOW() - ${staleThresholdMs} * INTERVAL '1 millisecond' + RETURNING o."id" + ), + reset AS ( + UPDATE "OutgoingRequest" o + SET "startedFulfillingAt" = NULL + FROM stale s + WHERE o."id" = s."id" + AND s.action = 'reset' + AND o."startedFulfillingAt" IS NOT NULL + AND o."startedFulfillingAt" < NOW() - ${staleThresholdMs} * INTERVAL '1 millisecond' + RETURNING o."id" + ) + -- Read from the mutation CTEs (not from the planning CTE) so the counts + -- reflect rows that actually changed. Under concurrent recovery this + -- matters: a row that was deleted/reset by another transaction between + -- snapshot and execution would still appear in the planning CTE but + -- not in the mutation CTEs. + SELECT 'reset'::text AS action, "id" FROM reset + UNION ALL + SELECT 'delete'::text AS action, "id" FROM deleted + `; + return { + resetIds: rows.filter(r => r.action === "reset").map(r => r.id), + deletedIds: rows.filter(r => r.action === "delete").map(r => r.id), + }; +}