From 6f2cf4cd80995d783fab87d27745fa5159700290 Mon Sep 17 00:00:00 2001 From: nams1570 Date: Fri, 17 Apr 2026 23:20:24 -0700 Subject: [PATCH 1/6] fix: sub-end now expires all expirable items Before we only expired the when-purchase-expires items, and left it to the item-grant-repeat events to expire the rest. But, when a subscription ends all of them should be expired immediately. --- .../schema/__tests__/integration-1-3.test.ts | 146 ++++++++++++++++++ .../payments/schema/__tests__/phase-1.test.ts | 101 ++++++++++++ .../phase-1/subscription-timefold-algo.ts | 19 ++- 3 files changed, 264 insertions(+), 2 deletions(-) diff --git a/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts b/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts index 5535480154..90214682b3 100644 --- a/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts +++ b/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts @@ -893,6 +893,152 @@ describe.sequential("payments schema integration phase 1→3 (real postgres)", ( }); + // ============================================================ + // when-repeated grants must expire at subscription-end + // (regression: they were previously left stacked in the ledger) + // ============================================================ + + describe("when-repeated grants expire at subscription-end", () => { + const DAY_MS = 86400000; + + beforeAll(async () => { + await runStatements(schema.subscriptions.setRow("sub-repeat-end", jsonbExpr({ + id: "sub-repeat-end", + tenancyId: "t1", + customerId: "u-repeat-end", + customerType: "user", + productId: "prod-repeat-end", + priceId: "p1", + product: { + displayName: "Repeat End Plan", + customerType: "user", + productLineId: "line-repeat-end", + prices: { p1: { USD: "10" } }, + includedItems: { + quota: { quantity: 100, repeat: [7, "day"], expires: "when-repeated" }, + permanent: { quantity: 25, expires: "never" }, + }, + }, + quantity: 1, + 
stripeSubscriptionId: null, + status: "canceled", + currentPeriodStartMillis: 0, + currentPeriodEndMillis: MONTH_MS, + cancelAtPeriodEnd: true, + canceledAtMillis: 2 * DAY_MS, + endedAtMillis: 5 * DAY_MS, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 0, + }))); + }); + + it("should drop when-repeated item balance to 0 after subscription-end", async () => { + const rows = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId === "u-repeat-end") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + + const latest = rows[rows.length - 1]; + expect(latest.itemQuantities.quota).toBe(0); + // Permanent grants must not be touched. + expect(latest.itemQuantities.permanent).toBe(25); + }); + + it("should revoke owned product at subscription-end", async () => { + const rows = (await getRowDatas(schema.ownedProducts)) + .filter((r: any) => r.customerId === "u-repeat-end") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + + const afterEnd = rows.find((r: any) => r.txnId === "sub-end:sub-repeat-end"); + expect(afterEnd).toBeDefined(); + expect(afterEnd.ownedProducts["prod-repeat-end"].quantity).toBe(0); + }); + }); + + + // ============================================================ + // Upgrade stacking regression: free → team mid-period must not + // leave the outgoing sub's monthly allowance stacked on top of + // the incoming sub's allowance. 
+ // ============================================================ + + describe("mid-period upgrade does not stack when-repeated balances", () => { + const DAY_MS = 86400000; + + beforeAll(async () => { + await runStatements(schema.subscriptions.setRow("sub-upgrade-free", jsonbExpr({ + id: "sub-upgrade-free", + tenancyId: "t1", + customerId: "u-upgrade", + customerType: "user", + productId: "prod-upgrade-free", + priceId: "p-free", + product: { + displayName: "Free", + customerType: "user", + productLineId: "line-upgrade", + prices: { "p-free": { USD: "0" } }, + includedItems: { + emails: { quantity: 100, repeat: [1, "month"], expires: "when-repeated" }, + }, + }, + quantity: 1, + stripeSubscriptionId: null, + status: "canceled", + currentPeriodStartMillis: 0, + currentPeriodEndMillis: MONTH_MS, + cancelAtPeriodEnd: false, + canceledAtMillis: 10 * DAY_MS, + endedAtMillis: 10 * DAY_MS, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 0, + }))); + + await runStatements(schema.subscriptions.setRow("sub-upgrade-team", jsonbExpr({ + id: "sub-upgrade-team", + tenancyId: "t1", + customerId: "u-upgrade", + customerType: "user", + productId: "prod-upgrade-team", + priceId: "p-team", + product: { + displayName: "Team", + customerType: "user", + productLineId: "line-upgrade", + prices: { "p-team": { USD: "30" } }, + includedItems: { + emails: { quantity: 500, repeat: [1, "month"], expires: "when-repeated" }, + }, + }, + quantity: 1, + stripeSubscriptionId: null, + status: "canceled", + currentPeriodStartMillis: 11 * DAY_MS, + currentPeriodEndMillis: 11 * DAY_MS + MONTH_MS, + cancelAtPeriodEnd: true, + canceledAtMillis: 20 * DAY_MS, + endedAtMillis: 20 * DAY_MS, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 11 * DAY_MS, + }))); + }); + + it("should show only the incoming sub's allowance right after the upgrade", async () => { + const rows = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId === 
"u-upgrade") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + + const atUpgrade = rows.find((r: any) => r.txnId === "sub-start:sub-upgrade-team"); + expect(atUpgrade).toBeDefined(); + // Before the fix this was 100 (free) + 500 (team) = 600 because the + // free sub's when-repeated grant was not expired at subscription-end. + expect(atUpgrade.itemQuantities.emails).toBe(500); + }); + }); + + // ============================================================ // Subscription map LFold // ============================================================ diff --git a/apps/backend/src/lib/payments/schema/__tests__/phase-1.test.ts b/apps/backend/src/lib/payments/schema/__tests__/phase-1.test.ts index 52c4feec54..c0d316c272 100644 --- a/apps/backend/src/lib/payments/schema/__tests__/phase-1.test.ts +++ b/apps/backend/src/lib/payments/schema/__tests__/phase-1.test.ts @@ -345,6 +345,107 @@ describe.sequential("payments schema phase 1 (real postgres)", () => { .filter((e: any) => e.subscriptionId === "sub-tf-start"); expect(endEvents).toHaveLength(0); }); + + it("should expire when-repeated grants alongside when-purchase-expires when ending before any repeat", async () => { + await runStatements(schema.subscriptions.setRow("sub-tf-end-mix-pre", jsonbExpr(makeSubscription("sub-tf-end-mix-pre", { + product: { + displayName: "Mix Pre-Repeat Plan", + customerType: "user", + productLineId: "line-tf-end-mix-pre", + prices: { p1: { USD: "10" } }, + includedItems: { + storage: { quantity: 100, expires: "when-purchase-expires" }, + quota: { quantity: 500, repeat: [7, "day"], expires: "when-repeated" }, + }, + }, + endedAtMillis: 3 * DAY_MS, + createdAtMillis: 0, + })))); + + const endEvents = (await getRowDatas(schema.subscriptionEndEvents)) + .filter((e: any) => e.subscriptionId === "sub-tf-end-mix-pre"); + expect(endEvents).toHaveLength(1); + + const expiredByItem = new Map(); + for (const expiry of endEvents[0].itemQuantityChangesToExpire) { + 
expiredByItem.set(expiry.itemId, expiry); + } + expect([...expiredByItem.keys()].sort()).toEqual(["quota", "storage"]); + expect(expiredByItem.get("storage").transactionId).toBe("sub-start:sub-tf-end-mix-pre"); + expect(expiredByItem.get("quota").transactionId).toBe("sub-start:sub-tf-end-mix-pre"); + expect(expiredByItem.get("quota").quantity).toBe(500); + + const repeats = (await getRowDatas(schema.itemGrantRepeatEvents)) + .filter((e: any) => e.sourceId === "sub-tf-end-mix-pre"); + expect(repeats).toHaveLength(0); + }); + + it("should reference the latest igr txnId for when-repeated grants when ending after repeats", async () => { + await runStatements(schema.subscriptions.setRow("sub-tf-end-mix-post", jsonbExpr(makeSubscription("sub-tf-end-mix-post", { + product: { + displayName: "Mix Post-Repeat Plan", + customerType: "user", + productLineId: "line-tf-end-mix-post", + prices: { p1: { USD: "10" } }, + includedItems: { + storage: { quantity: 100, expires: "when-purchase-expires" }, + quota: { quantity: 500, repeat: [7, "day"], expires: "when-repeated" }, + }, + }, + endedAtMillis: 17 * DAY_MS, + createdAtMillis: 0, + })))); + + const endEvents = (await getRowDatas(schema.subscriptionEndEvents)) + .filter((e: any) => e.subscriptionId === "sub-tf-end-mix-post"); + expect(endEvents).toHaveLength(1); + + const expiredByItem = new Map(); + for (const expiry of endEvents[0].itemQuantityChangesToExpire) { + expiredByItem.set(expiry.itemId, expiry); + } + expect([...expiredByItem.keys()].sort()).toEqual(["quota", "storage"]); + + // storage never repeated; still points at sub-start + expect(expiredByItem.get("storage").transactionId).toBe("sub-start:sub-tf-end-mix-post"); + + // quota last repeated at 14d; should point at that igr txn + const latestRepeatMillis = 14 * DAY_MS; + expect(expiredByItem.get("quota").transactionId).toBe(`igr:sub-tf-end-mix-post:${latestRepeatMillis}`); + expect(expiredByItem.get("quota").quantity).toBe(500); + + const repeats = (await 
getRowDatas(schema.itemGrantRepeatEvents)) + .filter((e: any) => e.sourceId === "sub-tf-end-mix-post") + .sort((a: any, b: any) => a.effectiveAtMillis - b.effectiveAtMillis); + expect(repeats.map((r: any) => r.effectiveAtMillis)).toEqual([7 * DAY_MS, 14 * DAY_MS]); + }); + + it("should NOT expire permanent grants (expires=never, absent, or invalid)", async () => { + await runStatements(schema.subscriptions.setRow("sub-tf-end-permanent", jsonbExpr(makeSubscription("sub-tf-end-permanent", { + product: { + displayName: "Permanent Grants Plan", + customerType: "user", + productLineId: "line-tf-end-permanent", + prices: { p1: { USD: "10" } }, + includedItems: { + expiring: { quantity: 50, expires: "when-purchase-expires" }, + repeating: { quantity: 10, repeat: [1, "day"], expires: "when-repeated" }, + permanent_never: { quantity: 20, expires: "never" }, + permanent_absent: { quantity: 30 }, + permanent_invalid: { quantity: 40, expires: "not-a-real-value" }, + }, + }, + endedAtMillis: 2 * MONTH_MS, + createdAtMillis: 0, + })))); + + const endEvents = (await getRowDatas(schema.subscriptionEndEvents)) + .filter((e: any) => e.subscriptionId === "sub-tf-end-permanent"); + expect(endEvents).toHaveLength(1); + + const expiredItemIds = endEvents[0].itemQuantityChangesToExpire.map((e: any) => e.itemId).sort(); + expect(expiredItemIds).toEqual(["expiring", "repeating"]); + }); }); diff --git a/apps/backend/src/lib/payments/schema/phase-1/subscription-timefold-algo.ts b/apps/backend/src/lib/payments/schema/phase-1/subscription-timefold-algo.ts index 59d9ac367e..33caef1b4b 100644 --- a/apps/backend/src/lib/payments/schema/phase-1/subscription-timefold-algo.ts +++ b/apps/backend/src/lib/payments/schema/phase-1/subscription-timefold-algo.ts @@ -391,7 +391,22 @@ export function getSubscriptionTimeFoldReducerSql(): string { )`; // ── subscription-end event ── - // Expire all outstanding grants with expiresWhen="when-purchase-expires" + // Expire all outstanding grants that are tied to 
the subscription's + // lifetime — both 'when-purchase-expires' and 'when-repeated'. The latter + // must be expired here too: otherwise the last-granted monthly quota + // (emails_per_month, analytics_events, …) persists in the item-quantity + // ledger after the subscription is gone and stacks on top of any + // replacement subscription for the remainder of the period. + // + // Permanent grants (item has no `expires` configured, or an unrecognized + // value) were normalized to JSONB null at subscription-start time, so + // `"g"->>'expiresWhen'` returns SQL NULL for them and the IN predicate + // correctly excludes them. + // + // outstandingGrants always carries the *current* grant ref for each item: + // initially { txnId: 'sub-start:' }, and each item-grant-repeat tick + // replaces the matching when-repeated entries with fresh ones keyed by + // the igr txnId, so iterating here works identically pre- and post-repeat. const endItemQuantityChangesToExpire = (stateSql: string) => `( SELECT COALESCE(jsonb_agg( jsonb_build_object( @@ -402,7 +417,7 @@ export function getSubscriptionTimeFoldReducerSql(): string { ) ), '[]'::jsonb) FROM jsonb_array_elements(${stateSql}->'outstandingGrants') AS "g" - WHERE "g"->>'expiresWhen' = 'when-purchase-expires' + WHERE "g"->>'expiresWhen' IN ('when-purchase-expires', 'when-repeated') )`; const endEventRowFromState = (stateSql: string) => `jsonb_build_object( From 6d1aff3c4c8f002313ba3b454d3478fae26f4bd0 Mon Sep 17 00:00:00 2001 From: nams1570 Date: Sat, 18 Apr 2026 00:47:59 -0700 Subject: [PATCH 2/6] fix: cast currentMillis to bigint so txn IDs match across reducer refs and downstream transactions PG 12+ returns EXTRACT(EPOCH ...) as NUMERIC with scale 6, so the reducer's currentMillis serialized into JSONB as "604800000.000000". transactions.ts built igr txn IDs via ->>effectiveAtMillis (decimal-tailed) while the same reducer emitted item-quantity-expire.adjustedTransactionId via ::bigint::text (decimal-free). 
The mismatch meant a subscription-end following an item-grant-repeat silently failed to expire the igr's grant, leaving when-repeated balances stuck at the last-granted quantity. --- .../schema/__tests__/integration-1-3.test.ts | 108 ++++++++++++++++++ .../phase-1/subscription-timefold-algo.ts | 28 ++++- 2 files changed, 132 insertions(+), 4 deletions(-) diff --git a/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts b/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts index 90214682b3..9e95f4730c 100644 --- a/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts +++ b/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts @@ -893,6 +893,114 @@ describe.sequential("payments schema integration phase 1→3 (real postgres)", ( }); + // ============================================================ + // Regression: the subscription-timefold reducer built `currentMillis` as + // NUMERIC. PG 12+ `EXTRACT(EPOCH ...)` returns NUMERIC with scale 6, so + // the value serialized into JSONB as e.g. "604800000.000000". Downstream + // transactions.ts builds igr txn IDs via `->>effectiveAtMillis` → the + // decimal-tailed form. The same reducer, when producing + // `item-quantity-expire.adjustedTransactionId`, used `::bigint::text` → + // the decimal-free form. Mismatch → a subscription-end that follows an + // item-grant-repeat could not find the igr's grant to expire, so the + // customer's `when-repeated` balance silently stayed at the last-granted + // quantity instead of dropping to 0. + // + // Fix: cast `currentMillis` to bigint at the root so both paths produce + // byte-identical references. This test exercises the full inline + // lifecycle: sub-start → item-grant-repeat → subscription-end, and + // asserts the final itemQuantities row drops to 0 and the txn-ID formats + // match on both sides of the reference. 
+ // ============================================================ + + describe("item-quantity-expire resolves across item-grant-repeat → sub-end", () => { + const DAY_MS = 86400000; + + beforeAll(async () => { + await runStatements(schema.subscriptions.setRow("sub-bigint-repeat", jsonbExpr({ + id: "sub-bigint-repeat", + tenancyId: "t1", + customerId: "u-bigint", + customerType: "user", + productId: "prod-bigint-repeat", + priceId: "p1", + product: { + displayName: "Bigint Repeat Plan", + customerType: "user", + productLineId: "line-bigint-repeat", + prices: { p1: { USD: "10" } }, + includedItems: { + // Repeat at exactly 7 days — a whole-second epoch offset that + // amplifies the NUMERIC scale-6 artifact (604800000 vs + // 604800000.000000 in JSONB). + quota: { quantity: 100, repeat: [7, "day"], expires: "when-repeated" }, + }, + }, + quantity: 1, + stripeSubscriptionId: null, + status: "canceled", + currentPeriodStartMillis: 0, + currentPeriodEndMillis: MONTH_MS, + cancelAtPeriodEnd: true, + // Ends at 14d: fires one repeat at 7d, then sub-end at 14d. + canceledAtMillis: 14 * DAY_MS, + endedAtMillis: 14 * DAY_MS, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 0, + }))); + }); + + it("item-grant-repeat transaction id has no trailing decimals", async () => { + const txns = (await getRowDatas(schema.transactions)) + .filter((t: any) => t.customerId === "u-bigint"); + const igr = txns.find((t: any) => + typeof t.txnId === "string" && t.txnId.startsWith("igr:sub-bigint-repeat:") + ); + expect(igr).toBeDefined(); + // The txn id is built by transactions.ts from the event's + // effectiveAtMillis (which flows through JSONB as the NUMERIC value + // from the reducer's currentMillis). Any trailing ".000000" here is + // the bug 3 fingerprint. 
+ expect(igr.txnId).toMatch(/^igr:sub-bigint-repeat:\d+$/); + expect(igr.txnId).not.toContain("."); + }); + + it("sub-end's item-quantity-expire adjustedTransactionId matches the igr txn id", async () => { + const txns = (await getRowDatas(schema.transactions)) + .filter((t: any) => t.customerId === "u-bigint"); + + const igr = txns.find((t: any) => + typeof t.txnId === "string" && t.txnId.startsWith("igr:sub-bigint-repeat:") + ); + const subEnd = txns.find((t: any) => t.txnId === "sub-end:sub-bigint-repeat"); + expect(igr).toBeDefined(); + expect(subEnd).toBeDefined(); + + const expireEntry = (subEnd.entries as any[]).find((e: any) => + e.type === "item-quantity-expire" && e.itemId === "quota" + ); + expect(expireEntry).toBeDefined(); + // Before the fix the expire referenced `igr:sub-bigint-repeat:604800000` + // while the actual igr txn was `igr:sub-bigint-repeat:604800000.000000` + // — same value, different text → no match → quota stuck. + expect(expireEntry.adjustedTransactionId).toBe(igr.txnId); + }); + + it("quota balance drops to 0 after sub-end resolves the igr's grant", async () => { + const rows = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId === "u-bigint") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + expect(rows.length).toBeGreaterThan(0); + + const latest = rows[rows.length - 1]; + // With bug 3 present, expire can't find the grant and quota stays at + // 100 (the last igr-granted quantity). With the fix, the expire + // resolves and quota drops to 0. 
+ expect(latest.itemQuantities.quota).toBe(0); + }); + }); + + // ============================================================ // when-repeated grants must expire at subscription-end // (regression: they were previously left stacked in the ledger) diff --git a/apps/backend/src/lib/payments/schema/phase-1/subscription-timefold-algo.ts b/apps/backend/src/lib/payments/schema/phase-1/subscription-timefold-algo.ts index 33caef1b4b..febec76039 100644 --- a/apps/backend/src/lib/payments/schema/phase-1/subscription-timefold-algo.ts +++ b/apps/backend/src/lib/payments/schema/phase-1/subscription-timefold-algo.ts @@ -261,8 +261,26 @@ export function getSubscriptionTimeFoldReducerSql(): string { )`; // ── item-grant-repeat event ── - // Emitted when timestamp matches an item's nextRepeatMillis - const currentMillis = `(EXTRACT(EPOCH FROM ${T}) * 1000)::numeric`; + // Emitted when timestamp matches an item's nextRepeatMillis. + // + // PG 12+ returns EXTRACT(EPOCH ...) as NUMERIC with scale 6 (microsecond + // precision), so if we left currentMillis as NUMERIC it would serialize + // into JSONB with trailing ".000000". That round-trips fine for our own + // comparisons but leaks into txn IDs built by downstream tables via + // `->>effectiveAtMillis`, producing e.g. `igr::2592000000.000000`, + // while references built inline in this algo via `::text` would produce + // the decimal-free `igr::2592000000`. The two wouldn't match → + // `item-quantity-expire` entries would fail to resolve the grant they're + // meant to expire, leaving `when-repeated` balances stuck after a + // subscription-end that follows an item-grant-repeat. + // + // Explicit ROUND before the bigint cast: NUMERIC::bigint rounds + // half-away-from-zero on PG 12+, which happens to match what we want, + // but if `T` ever comes from a path that returns DOUBLE PRECISION (older + // PG, or a future regression) the implicit cast rounds half-to-even and + // could disagree on midpoint values. 
Being explicit about the rounding
+  // intent is both self-documenting and stable across numeric types.
+  const currentMillis = `(ROUND(EXTRACT(EPOCH FROM ${T}) * 1000)::bigint)`;
 
   // Items due at current timestamp
   const dueItems = `(
@@ -279,8 +297,10 @@ export function getSubscriptionTimeFoldReducerSql(): string {
      AND (${S}->>'endedAtMillis')::numeric <= ${currentMillis}
   )`;
 
-  // item-grant-repeat: txnId uses sourceId + effectiveAtMillis
-  const igrTxnId = `('igr:' || (${S}->>'subscriptionId') || ':' || ${currentMillis}::bigint::text)`;
+  // item-grant-repeat: txnId uses sourceId + effectiveAtMillis. currentMillis
+  // is already ::bigint (see above) so plain ::text is enough — no decimal
+  // tail, no redundant double-cast.
+  const igrTxnId = `('igr:' || (${S}->>'subscriptionId') || ':' || ${currentMillis}::text)`;
   const repeatCount = `(${S}->>'repeatCount')::int`;
 
   // Build previousGrantsToExpire: outstanding grants with expiresWhen="when-repeated" that match due items

From c57cbec78a113343a7446a8d36f9f16d274a0050 Mon Sep 17 00:00:00 2001
From: nams1570
Date: Sat, 18 Apr 2026 02:04:08 -0700
Subject: [PATCH 3/6] fix: pg_cron to update timefold timestamp didn't hit triggers

This meant downstream tables of the timefold weren't updating with the new rows. This wasn't caught in our old tests since they just advanced the timestamp.
--- .../migration.sql | 341 ++++++++ .../tests/downstream-cascade.ts | 263 ++++++ .../src/lib/bulldozer/db/index.fuzz.test.ts | 11 + .../src/lib/bulldozer/db/index.perf.test.ts | 11 + .../src/lib/bulldozer/db/index.test.ts | 11 + apps/backend/src/lib/bulldozer/db/index.ts | 113 ++- .../bulldozer/db/tables/time-fold-table.ts | 58 +- .../db/timefold-queue-downstream.test.ts | 795 ++++++++++++++++++ .../integration-1-3-queue-drained.test.ts | 292 +++++++ .../schema/__tests__/integration-1-3.test.ts | 77 +- .../payments/schema/__tests__/test-helpers.ts | 107 ++- 11 files changed, 2026 insertions(+), 53 deletions(-) create mode 100644 apps/backend/prisma/migrations/20260417000000_bulldozer_timefold_downstream_cascade/migration.sql create mode 100644 apps/backend/prisma/migrations/20260417000000_bulldozer_timefold_downstream_cascade/tests/downstream-cascade.ts create mode 100644 apps/backend/src/lib/bulldozer/db/timefold-queue-downstream.test.ts create mode 100644 apps/backend/src/lib/payments/schema/__tests__/integration-1-3-queue-drained.test.ts diff --git a/apps/backend/prisma/migrations/20260417000000_bulldozer_timefold_downstream_cascade/migration.sql b/apps/backend/prisma/migrations/20260417000000_bulldozer_timefold_downstream_cascade/migration.sql new file mode 100644 index 0000000000..06afec335d --- /dev/null +++ b/apps/backend/prisma/migrations/20260417000000_bulldozer_timefold_downstream_cascade/migration.sql @@ -0,0 +1,341 @@ +-- Propagate TimeFold queue-drained emissions through the registered +-- downstream trigger cascade (filter/map/LFold/etc.), mirroring what the +-- inline setRow path does via collectRowChangeTriggerStatements. 
Before
+-- this migration, `bulldozer_timefold_process_queue` updated the
+-- TimeFold's own state and materialized its output rows but never touched
+-- any downstream materialized table, so any event scheduled for the
+-- future (subscription-end on cancel-at-period-end, monthly
+-- item-grant-repeat ticks) silently stopped after updating the TimeFold.
+--
+-- Strategy: per-timefold cascade templates are precomputed in TypeScript
+-- (declareTimeFoldTable.init() → toCascadeSqlBlock) and persisted here as
+-- data. The rewritten process_queue:
+--   1. Drains due queue rows per-timefold (existing per-row logic, moved
+--      inside an outer per-timefold loop).
+--   2. After each queue row's reducer emits new rows, populates
+--      __bulldozer_seq with them under the timefold's cascade input name.
+--   3. Once per timefold per tick, EXECUTEs the stored cascadeTemplate,
+--      which reads from __bulldozer_seq and writes through filter/map/
+--      LFold/etc. to their materialized outputs.
+--
+-- The shipped BulldozerTimeFoldQueue & BulldozerTimeFoldMetadata tables
+-- from 20260323150000_add_bulldozer_timefold_queue are unchanged. pg_cron
+-- keeps calling public.bulldozer_timefold_process_queue() as before.
+ +-- CreateTable +CREATE TABLE "BulldozerTimeFoldDownstreamCascade" ( + "tableStoragePath" JSONB[] NOT NULL, + "cascadeInputName" TEXT NOT NULL, + "cascadeTemplate" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath") +); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue() +RETURNS void +LANGUAGE plpgsql +AS $function$ +DECLARE + cutoff_timestamp timestamptz; + current_timefold_path jsonb[]; + current_cascade_input_name text; + current_cascade_template text; + queued_row "BulldozerTimeFoldQueue"%ROWTYPE; + group_path jsonb[]; + rows_path jsonb[]; + states_path jsonb[]; + state_row_path jsonb[]; + existing_state jsonb; + old_emitted_rows jsonb; + newly_emitted_rows jsonb; + accumulated_emitted_rows jsonb; + current_state jsonb; + current_timestamp_value timestamptz; + next_state jsonb; + next_rows_data jsonb; + normalized_next_rows_data jsonb; + next_timestamp timestamptz; + previous_emitted_row_count int; + reducer_iterations int; + new_row_record record; +BEGIN + PERFORM pg_advisory_xact_lock(7857391); + + INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt") + VALUES ('singleton', now()) + ON CONFLICT ("key") DO NOTHING; + + cutoff_timestamp := now(); + + UPDATE "BulldozerTimeFoldMetadata" + SET + "lastProcessedAt" = cutoff_timestamp, + "updatedAt" = CURRENT_TIMESTAMP + WHERE "key" = 'singleton'; + + -- The cascade templates generated by toExecutableSqlTransaction/ + -- toCascadeSqlBlock read and write via this per-transaction temp table, + -- keyed by string output names. It's created idempotently here so the + -- cascade never fails on a cold queue-drain invocation. 
+ CREATE TEMP TABLE IF NOT EXISTS "__bulldozer_seq" ( + "__output_name" text NOT NULL, + "__output_row" jsonb NOT NULL + ) ON COMMIT DROP; + + -- Outer loop: each distinct timefold that has queue rows due at this tick. + FOR current_timefold_path IN + SELECT DISTINCT "tableStoragePath" + FROM "BulldozerTimeFoldQueue" + WHERE "scheduledAt" <= cutoff_timestamp + LOOP + -- Look up the cascade template for this timefold. + current_cascade_input_name := NULL; + current_cascade_template := NULL; + SELECT "cascadeInputName", "cascadeTemplate" + INTO current_cascade_input_name, current_cascade_template + FROM "BulldozerTimeFoldDownstreamCascade" + WHERE "tableStoragePath" = current_timefold_path; + + -- If no registry row exists, this timefold hasn't been init()'d + -- yet — most commonly the deploy-window gap between this + -- migration applying (creating an empty registry table) and the + -- backend running declareTimeFoldTable.init() (which upserts one + -- row per timefold). Skipping just the cascade EXECUTE but still + -- draining the queue rows would let the timefold's own state + -- advance while the downstream filters/maps/LFolds silently miss + -- the update — a permanent desync with no retry path, since the + -- queue row is gone. Defer the entire timefold for this tick so + -- its queue rows stay queued; the next tick (after init finishes) + -- drains them with cascade intact. + -- + -- See `timefold-queue-downstream.test.ts`'s "process_queue defers + -- a timefold whose cascade registry row is missing" for the + -- regression guard. + IF NOT FOUND THEN + CONTINUE; + END IF; + + -- Defensive clean slate: clear ALL rows before populating this + -- timefold's cascade input. Cascade templates emit intermediate + -- rows (row_change_diag_*, mapped_changes_*, etc.) 
under unique + -- random names, so cross-timefold contamination isn't possible + -- today, but keeping __bulldozer_seq empty between iterations is + -- simpler to reason about ("between iterations, __bulldozer_seq is + -- empty") and prevents memory growth across many timefolds in one + -- tick. ON COMMIT DROP handles cleanup if the function throws. + DELETE FROM "__bulldozer_seq"; + + -- Inner loop: per-queue-row drain for this timefold, mirroring the + -- pre-fix function body (semantics preserved so the existing + -- process-queue.ts migration test keeps passing). + LOOP + SELECT * + INTO queued_row + FROM "BulldozerTimeFoldQueue" + WHERE "tableStoragePath" = current_timefold_path + AND "scheduledAt" <= cutoff_timestamp + ORDER BY "scheduledAt" ASC, "id" ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED; + + EXIT WHEN NOT FOUND; + + DELETE FROM "BulldozerTimeFoldQueue" + WHERE "id" = queued_row."id"; + + group_path := queued_row."tableStoragePath" || ARRAY[to_jsonb('groups'::text), queued_row."groupKey"]::jsonb[]; + rows_path := group_path || ARRAY[to_jsonb('rows'::text)]::jsonb[]; + states_path := group_path || ARRAY[to_jsonb('states'::text)]::jsonb[]; + state_row_path := states_path || ARRAY[to_jsonb(queued_row."rowIdentifier")]::jsonb[]; + + SELECT "value" + INTO existing_state + FROM "BulldozerStorageEngine" + WHERE "keyPath" = state_row_path; + + IF existing_state IS NULL THEN + CONTINUE; + END IF; + + IF existing_state->'rowData' IS DISTINCT FROM queued_row."rowData" THEN + CONTINUE; + END IF; + + old_emitted_rows := CASE + WHEN jsonb_typeof(existing_state->'emittedRowsData') = 'array' THEN existing_state->'emittedRowsData' + ELSE '[]'::jsonb + END; + newly_emitted_rows := '[]'::jsonb; + accumulated_emitted_rows := old_emitted_rows; + previous_emitted_row_count := jsonb_array_length(old_emitted_rows); + + current_state := queued_row."stateAfter"; + current_timestamp_value := queued_row."scheduledAt"; + reducer_iterations := 0; + + LOOP + reducer_iterations := 
reducer_iterations + 1; + IF reducer_iterations > 10000 THEN + RAISE EXCEPTION 'bulldozer timefold reducer exceeded 10k iterations for row %', queued_row."rowIdentifier"; + END IF; + + EXECUTE format( + $reducer$ + SELECT + to_jsonb("reducerRows"."newState") AS "newState", + to_jsonb("reducerRows"."newRowsData") AS "newRowsData", + CASE + WHEN "reducerRows"."nextTimestamp" IS NULL THEN NULL::timestamptz + ELSE ("reducerRows"."nextTimestamp")::timestamptz + END AS "nextTimestamp" + FROM ( + SELECT %s + FROM ( + SELECT + $1::jsonb AS "oldState", + $2::jsonb AS "oldRowData", + $3::timestamptz AS "timestamp" + ) AS "reducerInput" + ) AS "reducerRows" + $reducer$, + queued_row."reducerSql" + ) + INTO next_state, next_rows_data, next_timestamp + USING current_state, queued_row."rowData", current_timestamp_value; + + normalized_next_rows_data := CASE + WHEN jsonb_typeof(next_rows_data) = 'array' THEN next_rows_data + ELSE '[]'::jsonb + END; + newly_emitted_rows := newly_emitted_rows || normalized_next_rows_data; + accumulated_emitted_rows := accumulated_emitted_rows || normalized_next_rows_data; + current_state := next_state; + + EXIT WHEN next_timestamp IS NULL OR next_timestamp > cutoff_timestamp; + current_timestamp_value := next_timestamp; + END LOOP; + + INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value") + VALUES + (gen_random_uuid(), group_path, 'null'::jsonb), + (gen_random_uuid(), rows_path, 'null'::jsonb), + (gen_random_uuid(), states_path, 'null'::jsonb) + ON CONFLICT ("keyPath") DO NOTHING; + + INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value") + VALUES ( + gen_random_uuid(), + state_row_path, + jsonb_build_object( + 'rowData', queued_row."rowData", + 'stateAfter', current_state, + 'emittedRowsData', accumulated_emitted_rows, + 'nextTimestamp', + CASE + WHEN next_timestamp IS NULL THEN 'null'::jsonb + ELSE to_jsonb(next_timestamp) + END + ) + ) + ON CONFLICT ("keyPath") DO UPDATE + SET "value" = EXCLUDED."value"; + + FOR new_row_record 
IN + SELECT + "rows"."rowData" AS "rowData", + "rows"."rowIndex" AS "rowIndex" + FROM jsonb_array_elements(newly_emitted_rows) WITH ORDINALITY AS "rows"("rowData", "rowIndex") + LOOP + INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value") + VALUES ( + gen_random_uuid(), + rows_path || ARRAY[to_jsonb((queued_row."rowIdentifier" || ':' || (previous_emitted_row_count + new_row_record."rowIndex")::text)::text)]::jsonb[], + jsonb_build_object('rowData', new_row_record."rowData") + ) + ON CONFLICT ("keyPath") DO UPDATE + SET "value" = EXCLUDED."value"; + END LOOP; + + -- Accumulate the newly-emitted rows into __bulldozer_seq under this + -- timefold's cascade input name, shaped like timeFoldChangesTableName + -- (oldRowData/newRowData diff). Queue-drained emissions are always + -- new rows, so oldRowData/oldRowSortKey are null. + IF current_cascade_input_name IS NOT NULL THEN + INSERT INTO "__bulldozer_seq" ("__output_name", "__output_row") + SELECT + current_cascade_input_name, + jsonb_build_object( + 'groupKey', queued_row."groupKey", + 'rowIdentifier', queued_row."rowIdentifier" || ':' || (previous_emitted_row_count + "emitted"."rowIndex")::text, + 'oldRowSortKey', 'null'::jsonb, + 'newRowSortKey', 'null'::jsonb, + 'oldRowData', 'null'::jsonb, + 'newRowData', "emitted"."rowData" + ) + FROM jsonb_array_elements(newly_emitted_rows) WITH ORDINALITY AS "emitted"("rowData", "rowIndex"); + END IF; + + IF next_timestamp IS NOT NULL AND next_timestamp > cutoff_timestamp THEN + INSERT INTO "BulldozerTimeFoldQueue" ( + "id", + "tableStoragePath", + "groupKey", + "rowIdentifier", + "scheduledAt", + "stateAfter", + "rowData", + "reducerSql" + ) + VALUES ( + gen_random_uuid(), + queued_row."tableStoragePath", + queued_row."groupKey", + queued_row."rowIdentifier", + next_timestamp, + current_state, + queued_row."rowData", + queued_row."reducerSql" + ) + ON CONFLICT ("tableStoragePath", "groupKey", "rowIdentifier") DO UPDATE + SET + "scheduledAt" = EXCLUDED."scheduledAt", 
+          "stateAfter" = EXCLUDED."stateAfter",
+          "rowData" = EXCLUDED."rowData",
+          "reducerSql" = EXCLUDED."reducerSql",
+          "updatedAt" = CURRENT_TIMESTAMP;
+    END IF;
+
+    -- Note: the sibling migration 20260323150000 carried an "orphan
+    -- group cleanup" IF-NOT-EXISTS block here that deleted
+    -- rows_path / states_path / group_path when both had no children.
+    -- That block was unreachable in that function body and is still
+    -- unreachable here: every path that reaches this point has just
+    -- UPSERTed `state_row_path` (a child of `states_path`) a few
+    -- statements up, so the NOT EXISTS on states_path is always
+    -- false. Dropped here rather than inherited verbatim.
+  END LOOP;
+
+  -- After draining all due queue rows for this timefold, run the
+  -- downstream trigger cascade exactly once. The template reads its input
+  -- from __bulldozer_seq (populated above) and writes through every
+  -- registered downstream filter/map/LFold/etc. to their materialized
+  -- outputs. Skipped when no template is registered — matches the
+  -- inline path's no-op behavior for timefolds without downstream
+  -- triggers.
+  IF current_cascade_template IS NOT NULL THEN
+    EXECUTE current_cascade_template;
+  END IF;
+
+  -- Clean slate before the next timefold iteration. Clears the
+  -- cascade input rows AND every intermediate-stage row the cascade
+  -- template's EXECUTE just emitted into __bulldozer_seq. See the
+  -- identical DELETE at the top of this loop for the rationale.
+ DELETE FROM "__bulldozer_seq"; + END LOOP; +END; +$function$; +-- SPLIT_STATEMENT_SENTINEL diff --git a/apps/backend/prisma/migrations/20260417000000_bulldozer_timefold_downstream_cascade/tests/downstream-cascade.ts b/apps/backend/prisma/migrations/20260417000000_bulldozer_timefold_downstream_cascade/tests/downstream-cascade.ts new file mode 100644 index 0000000000..3f48c29bad --- /dev/null +++ b/apps/backend/prisma/migrations/20260417000000_bulldozer_timefold_downstream_cascade/tests/downstream-cascade.ts @@ -0,0 +1,263 @@ +import type { Sql } from "postgres"; +import { expect } from "vitest"; + +/** + * Migration-level test for `20260417000000_bulldozer_timefold_downstream_cascade`. + * + * Exercises the shape the migration is responsible for: + * - `BulldozerTimeFoldDownstreamCascade` exists with the right columns, + * - `public.bulldozer_timefold_process_queue()` consults that registry, + * - when a timefold has a registered `cascadeTemplate`, process_queue + * populates `__bulldozer_seq` with newly-emitted rows and EXECUTEs the + * template (i.e. the downstream cascade actually fires on the + * queue-drain path — the regression this migration fixes), + * - re-draining with nothing due is a no-op (idempotency). + * + * The cascade template here is constructed by hand (not via + * `toCascadeSqlBlock` in TypeScript) so the test stays purely at the + * migration-SQL layer, matching the other migration tests under + * `apps/backend/prisma/migrations/.../tests/`. + */ +export const postMigration = async (sql: Sql) => { + // 1) Migration shape: the registry table exists with the expected columns. 
+  const registryColumnRows = await sql<Array<{ column_name: string }>>`
+    SELECT column_name
+    FROM information_schema.columns
+    WHERE table_schema = 'public'
+      AND table_name = 'BulldozerTimeFoldDownstreamCascade'
+    ORDER BY ordinal_position
+  `;
+  expect(registryColumnRows.map((r) => r.column_name)).toEqual([
+    "tableStoragePath",
+    "cascadeInputName",
+    "cascadeTemplate",
+    "createdAt",
+    "updatedAt",
+  ]);
+
+  // 2) Set up a minimal timefold-shaped storage hierarchy. Each
+  // BulldozerStorageEngine insert must have its parent keyPath already
+  // present (FK: keyPathParent → keyPath).
+  const tablePathSql = `ARRAY[to_jsonb('table'::text), to_jsonb('external:cascade-migration-test'::text)]::jsonb[]`;
+  const storagePathSql = `${tablePathSql} || ARRAY[to_jsonb('storage'::text)]::jsonb[]`;
+  const groupsPathSql = `${storagePathSql} || ARRAY[to_jsonb('groups'::text)]::jsonb[]`;
+  const groupPathSql = `${groupsPathSql} || ARRAY[to_jsonb('alpha'::text)]::jsonb[]`;
+  const rowsPathSql = `${groupPathSql} || ARRAY[to_jsonb('rows'::text)]::jsonb[]`;
+  const statesPathSql = `${groupPathSql} || ARRAY[to_jsonb('states'::text)]::jsonb[]`;
+  const stateRowPathSql = `${statesPathSql} || ARRAY[to_jsonb('u1'::text)]::jsonb[]`;
+
+  // Parallel downstream tree the cascade template writes into. Its root
+  // (`ARRAY['cascade-out']`) is a direct child of the already-seeded
+  // `ARRAY[]` root, and only the root is needed as a parent for the
+  // cascade's row inserts below (rows hang directly off of it).
+  const cascadeOutRootSql = `ARRAY[to_jsonb('cascade-out'::text)]::jsonb[]`;
+
+  await sql.unsafe(`
+    INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value")
+    VALUES
+      (gen_random_uuid(), ${tablePathSql}, 'null'::jsonb),
+      (gen_random_uuid(), ${storagePathSql}, 'null'::jsonb),
+      (gen_random_uuid(), ${groupsPathSql}, 'null'::jsonb),
+      (gen_random_uuid(), ${groupPathSql}, 'null'::jsonb),
+      (gen_random_uuid(), ${rowsPathSql}, 'null'::jsonb),
+      (gen_random_uuid(), ${statesPathSql}, 'null'::jsonb),
+      (gen_random_uuid(), ${cascadeOutRootSql}, 'null'::jsonb)
+    ON CONFLICT ("keyPath") DO NOTHING
+  `);
+
+  await sql.unsafe(`
+    INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value")
+    VALUES (
+      gen_random_uuid(),
+      ${stateRowPathSql},
+      jsonb_build_object(
+        'rowData', '{"value": 2}'::jsonb,
+        'stateAfter', '{"counter": 1}'::jsonb,
+        'emittedRowsData', '[]'::jsonb,
+        'nextTimestamp', 'null'::jsonb
+      )
+    )
+    ON CONFLICT ("keyPath") DO UPDATE
+      SET "value" = EXCLUDED."value"
+  `);
+
+  // 3) Register this timefold's cascade. The template reads the rows
+  // that process_queue pushes into `__bulldozer_seq` under the input
+  // name and writes them under `ARRAY['cascade-out', <rowIdentifier>]`.
+  // EXECUTEing this DO block is exactly what process_queue does with
+  // the stored cascadeTemplate once per timefold per tick.
+ const cascadeInputName = "migration_test_cascade_input"; + const cascadeTemplateSql = ` + DO $tf_cascade$ + BEGIN + INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value") + SELECT + gen_random_uuid(), + ARRAY[to_jsonb('cascade-out'::text), "__bulldozer_seq"."__output_row"->'rowIdentifier']::jsonb[], + "__bulldozer_seq"."__output_row"->'newRowData' + FROM "__bulldozer_seq" + WHERE "__output_name" = '${cascadeInputName}' + ON CONFLICT ("keyPath") DO UPDATE + SET "value" = EXCLUDED."value"; + END; + $tf_cascade$ LANGUAGE plpgsql; + `; + + await sql.unsafe(` + INSERT INTO "BulldozerTimeFoldDownstreamCascade" + ("tableStoragePath", "cascadeInputName", "cascadeTemplate") + VALUES ( + ${storagePathSql}, + '${cascadeInputName}', + $cascade_template$${cascadeTemplateSql}$cascade_template$ + ) + `); + + // 4) Queue a reducer row that emits one output row (value=100). Reducer + // SQL is the same shape the real timefold table emits: + // newState / newRowsData / nextTimestamp. + await sql.unsafe(` + INSERT INTO "BulldozerTimeFoldQueue" ( + "id", + "tableStoragePath", + "groupKey", + "rowIdentifier", + "scheduledAt", + "stateAfter", + "rowData", + "reducerSql" + ) + VALUES ( + gen_random_uuid(), + ${storagePathSql}, + to_jsonb('alpha'::text), + 'u1', + now() - interval '1 minute', + '{"counter": 1}'::jsonb, + '{"value": 2}'::jsonb, + 'jsonb_build_object(''counter'', COALESCE(("oldState"->>''counter'')::int, 0) + 1) AS "newState", jsonb_build_array(jsonb_build_object(''value'', 100)) AS "newRowsData", NULL::timestamptz AS "nextTimestamp"' + ) + `); + + // 5) Drain the queue. This is the real prod entry point — pg_cron calls + // exactly this function. + await sql.unsafe(`SELECT public.bulldozer_timefold_process_queue()`); + + // The queue row must be consumed. 
Scope by tableStoragePath — the + // sibling `20260323150000_add_bulldozer_timefold_queue/tests/process-queue.ts` + // ran on this same shared DB and left a future-dated queue row behind + // under its own tableStoragePath, which we must not match here. + const remainingQueueRows = await sql.unsafe(` + SELECT 1 FROM "BulldozerTimeFoldQueue" + WHERE "tableStoragePath" = ${storagePathSql} + AND "rowIdentifier" = 'u1' + `); + expect(remainingQueueRows).toHaveLength(0); + + // The timefold's own state row must be updated (baseline the prior + // migration already covered). + const stateRows = await sql.unsafe(` + SELECT "value" FROM "BulldozerStorageEngine" WHERE "keyPath" = ${stateRowPathSql} + `); + expect(stateRows).toHaveLength(1); + expect(stateRows[0].value).toMatchObject({ + rowData: { value: 2 }, + stateAfter: { counter: 2 }, + }); + + // The regression guard: the cascade template must have run. Without + // the migration's rewrite, `__bulldozer_seq` is never populated and + // the template is never EXECUTEd, so `cascade-out/u1:1` would not + // exist. + const cascadeOutRows = await sql.unsafe(` + SELECT "keyPath", "value" + FROM "BulldozerStorageEngine" + WHERE "keyPathParent" = ${cascadeOutRootSql} + `); + expect(cascadeOutRows).toHaveLength(1); + expect(cascadeOutRows[0].value).toEqual({ value: 100 }); + + // 6) Idempotency: re-draining with nothing new in the queue must not + // re-run the cascade (no duplicate rows, no FK errors). + await sql.unsafe(`SELECT public.bulldozer_timefold_process_queue()`); + const cascadeOutAfterRedrain = await sql.unsafe(` + SELECT 1 FROM "BulldozerStorageEngine" WHERE "keyPathParent" = ${cascadeOutRootSql} + `); + expect(cascadeOutAfterRedrain).toHaveLength(1); + + // 7) No-template path: a timefold with cascadeTemplate = NULL must + // drain queued rows without error. Use a second, independent + // tableStoragePath so the FK'd storage engine rows for this + // branch don't collide with the one above. 
+ const nullTableSql = `ARRAY[to_jsonb('table'::text), to_jsonb('external:cascade-null-template'::text)]::jsonb[]`; + const nullStorageSql = `${nullTableSql} || ARRAY[to_jsonb('storage'::text)]::jsonb[]`; + const nullGroupsSql = `${nullStorageSql} || ARRAY[to_jsonb('groups'::text)]::jsonb[]`; + const nullGroupSql = `${nullGroupsSql} || ARRAY[to_jsonb('alpha'::text)]::jsonb[]`; + const nullRowsSql = `${nullGroupSql} || ARRAY[to_jsonb('rows'::text)]::jsonb[]`; + const nullStatesSql = `${nullGroupSql} || ARRAY[to_jsonb('states'::text)]::jsonb[]`; + const nullStateRowSql = `${nullStatesSql} || ARRAY[to_jsonb('u1'::text)]::jsonb[]`; + + await sql.unsafe(` + INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value") + VALUES + (gen_random_uuid(), ${nullTableSql}, 'null'::jsonb), + (gen_random_uuid(), ${nullStorageSql}, 'null'::jsonb), + (gen_random_uuid(), ${nullGroupsSql}, 'null'::jsonb), + (gen_random_uuid(), ${nullGroupSql}, 'null'::jsonb), + (gen_random_uuid(), ${nullRowsSql}, 'null'::jsonb), + (gen_random_uuid(), ${nullStatesSql}, 'null'::jsonb) + ON CONFLICT ("keyPath") DO NOTHING + `); + await sql.unsafe(` + INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value") + VALUES ( + gen_random_uuid(), + ${nullStateRowSql}, + jsonb_build_object( + 'rowData', '{"value": 2}'::jsonb, + 'stateAfter', '{"counter": 1}'::jsonb, + 'emittedRowsData', '[]'::jsonb, + 'nextTimestamp', 'null'::jsonb + ) + ) + `); + await sql.unsafe(` + INSERT INTO "BulldozerTimeFoldDownstreamCascade" + ("tableStoragePath", "cascadeInputName", "cascadeTemplate") + VALUES ( + ${nullStorageSql}, + 'null_template_input', + NULL + ) + `); + await sql.unsafe(` + INSERT INTO "BulldozerTimeFoldQueue" ( + "id", "tableStoragePath", "groupKey", "rowIdentifier", + "scheduledAt", "stateAfter", "rowData", "reducerSql" + ) + VALUES ( + gen_random_uuid(), + ${nullStorageSql}, + to_jsonb('alpha'::text), + 'u1', + now() - interval '1 minute', + '{"counter": 1}'::jsonb, + '{"value": 2}'::jsonb, + 
'jsonb_build_object(''counter'', 2) AS "newState", jsonb_build_array(jsonb_build_object(''value'', 100)) AS "newRowsData", NULL::timestamptz AS "nextTimestamp"' + ) + `); + + await sql.unsafe(`SELECT public.bulldozer_timefold_process_queue()`); + + const nullRemainingQueue = await sql.unsafe(` + SELECT 1 FROM "BulldozerTimeFoldQueue" + WHERE "tableStoragePath" = ${nullStorageSql} + `); + expect(nullRemainingQueue).toHaveLength(0); + + const nullStateAfterDrain = await sql.unsafe(` + SELECT "value" FROM "BulldozerStorageEngine" WHERE "keyPath" = ${nullStateRowSql} + `); + expect(nullStateAfterDrain).toHaveLength(1); + expect(nullStateAfterDrain[0].value).toMatchObject({ stateAfter: { counter: 2 } }); +}; diff --git a/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts b/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts index 03ff3538b1..bec0d4c0a0 100644 --- a/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts +++ b/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts @@ -714,6 +714,17 @@ describe.sequential("bulldozer db fuzz composition (real postgres)", () => { INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt") VALUES ('singleton', now()) `; + await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldDownstreamCascade"`; + await sql` + CREATE TABLE "BulldozerTimeFoldDownstreamCascade" ( + "tableStoragePath" JSONB[] NOT NULL, + "cascadeInputName" TEXT NOT NULL, + "cascadeTemplate" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath") + ) + `; }); afterEach(async () => { diff --git a/apps/backend/src/lib/bulldozer/db/index.perf.test.ts b/apps/backend/src/lib/bulldozer/db/index.perf.test.ts index c47c5311ed..d05cbd4acd 100644 --- a/apps/backend/src/lib/bulldozer/db/index.perf.test.ts +++ b/apps/backend/src/lib/bulldozer/db/index.perf.test.ts @@ -354,6 +354,17 @@ 
describe.sequential("bulldozer db performance (real postgres)", () => { INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt") VALUES ('singleton', now()) `; + await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldDownstreamCascade"`; + await sql` + CREATE TABLE "BulldozerTimeFoldDownstreamCascade" ( + "tableStoragePath" JSONB[] NOT NULL, + "cascadeInputName" TEXT NOT NULL, + "cascadeTemplate" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath") + ) + `; }); afterEach(async () => { diff --git a/apps/backend/src/lib/bulldozer/db/index.test.ts b/apps/backend/src/lib/bulldozer/db/index.test.ts index d0e39c6c45..8468e2a2be 100644 --- a/apps/backend/src/lib/bulldozer/db/index.test.ts +++ b/apps/backend/src/lib/bulldozer/db/index.test.ts @@ -140,6 +140,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { beforeEach(async () => { await sql`CREATE EXTENSION IF NOT EXISTS pgcrypto`; + await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldDownstreamCascade"`; await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldQueue"`; await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldMetadata"`; await sql`DROP TABLE IF EXISTS "BulldozerMapTriggerAudit"`; @@ -233,6 +234,16 @@ describe.sequential("declareStoredTable (real postgres)", () => { INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt") VALUES ('singleton', now()) `; + await sql` + CREATE TABLE "BulldozerTimeFoldDownstreamCascade" ( + "tableStoragePath" JSONB[] NOT NULL, + "cascadeInputName" TEXT NOT NULL, + "cascadeTemplate" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath") + ) + `; }); // any is used here because the verifier works with heterogeneous table types diff --git 
a/apps/backend/src/lib/bulldozer/db/index.ts b/apps/backend/src/lib/bulldozer/db/index.ts index bc01643a15..d53e8a3749 100644 --- a/apps/backend/src/lib/bulldozer/db/index.ts +++ b/apps/backend/src/lib/bulldozer/db/index.ts @@ -1,3 +1,5 @@ +import { generateSecureRandomString } from "@stackframe/stack-shared/dist/utils/crypto"; +import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors"; import { deindent } from "@stackframe/stack-shared/dist/utils/strings"; import { BULLDOZER_SORT_HELPERS_SQL } from "./bulldozer-sort-helpers-sql"; @@ -66,10 +68,23 @@ export function toQueryableSqlQuery(query: SqlQuery): string { return query.sql; } -export function toExecutableSqlTransaction(statements: SqlStatement[], options: { statementTimeout?: string } = {}): string { - const requiresSortHelpers = statements.some((statement) => statement.sql.includes("pg_temp.bulldozer_sort_")); - const seqOutputs = new Map(); - const executableStatementsInDoBlock = statements.map((statement) => { +/** + * Core body-builder shared by `toExecutableSqlTransaction` and + * `toCascadeSqlBlock`. Serializes a list of `SqlStatement`s into a single + * string suitable to drop into a plpgsql DO block, rewriting references to + * named outputs into `__bulldozer_seq` subqueries. + * + * `seededSeqOutputs` lets callers pretend a given `__output_name` has + * already been produced by an upstream statement. Used by the timefold + * queue-drain cascade, which pre-populates `__bulldozer_seq` in plpgsql + * BEFORE executing the stored cascade template. 
+ */
+function buildExecutableStatementsBlock(
+  statements: SqlStatement[],
+  seededSeqOutputs: Map<string, string>,
+): string {
+  const seqOutputs = new Map(seededSeqOutputs);
+  return statements.map((statement) => {
     let sql = statement.sql;
     for (const [outputName, outputColumns] of seqOutputs) {
       const quotedOutputName = `"${outputName}"`;
@@ -112,12 +127,22 @@ export function toExecutableSqlTransaction(statements: SqlStatement[], options:
       `;
     })();
 
-    // Keep the outer DO block delimiter stable even when statements define $$ functions.
-    const normalizedSql = executableSql.replaceAll("$$", "$__bulldozer_do_inline$").trimEnd();
+    const normalizedSql = executableSql.trimEnd();
     return normalizedSql.endsWith(";") ? normalizedSql : `${normalizedSql};`;
   }).join("\n\n");
+}
+
+export function toExecutableSqlTransaction(statements: SqlStatement[], options: { statementTimeout?: string } = {}): string {
+  const requiresSortHelpers = statements.some((statement) => statement.sql.includes("pg_temp.bulldozer_sort_"));
+  const executableStatementsInDoBlock = buildExecutableStatementsBlock(statements, new Map());
+  // Randomize the outer DO-block delimiter so nested `$$` that individual
+  // statements legitimately use (e.g. `CREATE FUNCTION ... AS $$ ... $$`
+  // in `reduce-table.ts`) don't collide with it, and so user-provided SQL
+  // containing a literal `'$$'` string doesn't need to be rewritten. Same
+  // approach as `toCascadeSqlBlock` below.
+  const outerTag = chooseSafeDollarQuoteTag(executableStatementsInDoBlock, "bulldozer_tx");
 
   return deindent`
     BEGIN;
@@ -131,12 +156,84 @@ export function toExecutableSqlTransaction(statements: SqlStatement[], options:
 
     ${BULLDOZER_SEQ_TABLE_SQL}
 
-    DO $$
+    DO $${outerTag}$
     BEGIN
       ${executableStatementsInDoBlock}
     END;
-    $$ LANGUAGE plpgsql;
+    $${outerTag}$ LANGUAGE plpgsql;
 
     COMMIT;
   `;
 }
+
+/**
+ * Picks a plpgsql dollar-quote tag that is guaranteed not to appear
+ * verbatim inside `bodyContents`.
+ *
+ * We need this for any `DO $tag$ ... 
$tag$` block whose body is + * concatenated from caller-supplied SQL: a naive fixed `$tag$` would + * close the outer block prematurely if any embedded statement happened + * to include the same literal `$tag$` (e.g. a user filter predicate + * that references the string, or a comment, or a CASE branch). + * + * A cryptographically-random 224-bit suffix makes accidental collision + * astronomically unlikely. We still assert (rather than silently + * retrying) because a collision here would almost certainly mean the + * caller is constructing the body adversarially, and we'd rather fail + * loud. + */ +function chooseSafeDollarQuoteTag(bodyContents: string, tagPrefix: string): string { + const tag = `${tagPrefix}_${generateSecureRandomString()}`; + if (bodyContents.includes(`$${tag}$`)) { + throw new StackAssertionError( + "Randomly generated dollar-quote tag collided with body contents; this is astronomically unlikely with a 224-bit suffix and almost certainly indicates adversarial input", + { tag, tagPrefix }, + ); + } + return tag; +} + +/** + * Compiles a downstream-trigger cascade into a plpgsql `DO` block body that + * can be stored in `BulldozerTimeFoldDownstreamCascade.cascadeTemplate` and + * EXECUTEd by `bulldozer_timefold_process_queue()` at runtime. + * + * The generated body: + * - Assumes the caller has already populated `__bulldozer_seq` under + * `cascadeInputName` with rows matching `cascadeInputColumns`. + * - Does NOT acquire the advisory lock or SET LOCAL settings — that + * responsibility belongs to the function wrapping the cascade. + * - Wraps the statement sequence in a `DO $$ ... $$` + * block so `EXECUTE` in plpgsql can run it as a single dispatch. The + * tag is randomized per-call to ensure user-supplied SQL can never + * contain a literal copy of it and close the outer DO block early + * (see `chooseSafeDollarQuoteTag`). + * + * If the downstream trigger graph is empty (no filters/maps/etc. registered), + * returns `null`. 
Callers should skip the EXECUTE in that case. + */ +export function toCascadeSqlBlock(options: { + cascadeInputName: string, + cascadeInputColumns: string, + statements: SqlStatement[], +}): string | null { + if (options.statements.length === 0) return null; + const seeded = new Map([[options.cascadeInputName, options.cascadeInputColumns]]); + const body = buildExecutableStatementsBlock(options.statements, seeded); + const requiresSortHelpers = options.statements.some((statement) => statement.sql.includes("pg_temp.bulldozer_sort_")); + // Sort helpers use their own $$ dollar quoting inside `CREATE OR REPLACE + // FUNCTION` bodies. They live inside the outer DO so they share the + // enclosing transaction's pg_temp scope with the cascade statements. The + // outer tag is randomized below so nested $$ (or any other fixed tag in + // user SQL) can't close it. + const prelude = requiresSortHelpers ? BULLDOZER_SORT_HELPERS_SQL : ""; + const outerTag = chooseSafeDollarQuoteTag(`${prelude}\n${body}`, "tf_cascade"); + return deindent` + DO $${outerTag}$ + BEGIN + ${prelude} + ${body} + END; + $${outerTag}$ LANGUAGE plpgsql; + `; +} diff --git a/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts b/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts index e5c9f6e93f..6576162ace 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts @@ -1,7 +1,8 @@ import { generateSecureRandomString } from "@stackframe/stack-shared/dist/utils/crypto"; import type { Table } from ".."; +import { toCascadeSqlBlock } from ".."; import type { RegisteredRowChangeTrigger } from "../row-change-trigger-dispatch"; -import { attachRowChangeTriggerMetadata, normalizeRowChangeTrigger } from "../row-change-trigger-dispatch"; +import { attachRowChangeTriggerMetadata, collectRowChangeTriggerStatements, normalizeRowChangeTrigger } from "../row-change-trigger-dispatch"; import type { Json, RowData, 
RowIdentifier, SqlExpression, SqlMapper, TableId, Timestamp } from "../utilities"; import { getStorageEnginePath, @@ -15,6 +16,15 @@ import { tableIdToDebugString, } from "../utilities"; +/** + * Column shape matching `timeFoldChangesTableName` in + * `createApplyChangesStatements` below. Used both by the inline + * trigger (which produces this shape directly) and by the queue-drain + * cascade (which synthesizes this shape in plpgsql from newly-emitted + * rows). + */ +const TIMEFOLD_OUTPUT_CHANGE_COLUMNS = '"groupKey" jsonb, "rowIdentifier" text, "oldRowSortKey" jsonb, "newRowSortKey" jsonb, "oldRowData" jsonb, "newRowData" jsonb'; + /** * Materialized time-aware fold with queue-backed future reprocessing. * @@ -467,6 +477,30 @@ export function declareTimeFoldTable< const fromGroupsTableName = `from_groups_${generateSecureRandomString()}`; const fromRowsTableName = `from_rows_${generateSecureRandomString()}`; const initChangesTableName = `init_changes_${generateSecureRandomString()}`; + + // Compile the downstream trigger cascade into a plpgsql DO block and + // persist it in BulldozerTimeFoldDownstreamCascade, keyed by this + // timefold's tableStoragePath. bulldozer_timefold_process_queue() + // reads and EXECUTEs it after each batch of queue-drained emissions. + // + // This mirrors, on the queue-drain path, what collectRowChangeTriggerStatements + // does on the inline setRow path (see row-change-trigger-dispatch.ts's + // use by the outer runStatements pipeline). Without this, pg_cron- + // drained emissions update the timefold's own rows but never propagate + // to filters/maps/LFolds above — see apps/backend/src/lib/bulldozer/db/ + // timefold-queue-downstream.test.ts. 
+ const cascadeInputName = `tf_cascade_input_${generateSecureRandomString()}`; + const cascadeCollected = collectRowChangeTriggerStatements({ + sourceTableId: tableIdToDebugString(options.tableId), + sourceChangesTable: quoteSqlIdentifier(cascadeInputName), + sourceTableTriggers: triggers, + }); + const cascadeTemplate = toCascadeSqlBlock({ + cascadeInputName, + cascadeInputColumns: TIMEFOLD_OUTPUT_CHANGE_COLUMNS, + statements: cascadeCollected.statements, + }); + return [ sqlStatement` INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value") @@ -477,6 +511,24 @@ export function declareTimeFoldTable< (gen_random_uuid(), ${getStorageEnginePath(options.tableId, ["metadata"])}, '{ "version": 1 }'::jsonb) ON CONFLICT ("keyPath") DO NOTHING `, + // Upsert the cascade registry row. A null template means "no + // downstream triggers registered" — process_queue will skip the + // EXECUTE in that case, matching the no-op semantics of the inline + // path when no triggers are attached. + sqlStatement` + INSERT INTO "BulldozerTimeFoldDownstreamCascade" + ("tableStoragePath", "cascadeInputName", "cascadeTemplate") + VALUES ( + ${tableStoragePath}::jsonb[], + ${quoteSqlStringLiteral(cascadeInputName)}, + ${cascadeTemplate == null ? 
sqlExpression`NULL::text` : quoteSqlStringLiteral(cascadeTemplate)} + ) + ON CONFLICT ("tableStoragePath") DO UPDATE + SET + "cascadeInputName" = EXCLUDED."cascadeInputName", + "cascadeTemplate" = EXCLUDED."cascadeTemplate", + "updatedAt" = now() + `, options.fromTable.listGroups({ start: "start", end: "end", @@ -518,6 +570,10 @@ export function declareTimeFoldTable< DELETE FROM "BulldozerTimeFoldQueue" WHERE "tableStoragePath" = ${tableStoragePath}::jsonb[] `, + sqlStatement` + DELETE FROM "BulldozerTimeFoldDownstreamCascade" + WHERE "tableStoragePath" = ${tableStoragePath}::jsonb[] + `, sqlStatement` WITH RECURSIVE "pathsToDelete" AS ( SELECT ${getTablePath(options.tableId)}::jsonb[] AS "path" diff --git a/apps/backend/src/lib/bulldozer/db/timefold-queue-downstream.test.ts b/apps/backend/src/lib/bulldozer/db/timefold-queue-downstream.test.ts new file mode 100644 index 0000000000..d4f069bdba --- /dev/null +++ b/apps/backend/src/lib/bulldozer/db/timefold-queue-downstream.test.ts @@ -0,0 +1,795 @@ +/** + * `bulldozer_timefold_process_queue()` must propagate emitted rows through + * the downstream trigger cascade (filter/map/LFold/...) the same way the + * inline `setRow` path does via `collectRowChangeTriggerStatements`. + * + * Each timefold's cascade template is precomputed at `init()` time and + * stored in BulldozerTimeFoldDownstreamCascade; the rewritten process_queue + * populates `__bulldozer_seq` with newly-emitted rows under the timefold's + * input name and EXECUTEs the template. 
+ */ + +import { readFileSync } from "node:fs"; +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; +import postgres from "postgres"; +import { afterAll, beforeAll, beforeEach, describe, expect, test } from "vitest"; +import { + declareFilterTable, + declareGroupByTable, + declareMapTable, + declareStoredTable, + declareTimeFoldTable, + toExecutableSqlTransaction, + toQueryableSqlQuery, +} from "./index"; + +type SqlExpression = { type: "expression", sql: string }; +type SqlStatement = { type: "statement", sql: string, outputName?: string }; +type SqlQuery = { type: "query", sql: string, toStatement(outputName?: string): SqlStatement }; +type SqlMapper = { type: "mapper", sql: string }; +type SqlPredicate = { type: "predicate", sql: string }; + +function expr(sql: string): SqlExpression { + return { type: "expression", sql }; +} +function mapper(sql: string): SqlMapper { + return { type: "mapper", sql }; +} +function predicate(sql: string): SqlPredicate { + return { type: "predicate", sql }; +} + +const TEST_DB_PREFIX = "stack_bulldozer_queue_downstream_test"; + +function getTestDbUrls() { + const env = Reflect.get(import.meta, "env"); + const connectionString = Reflect.get(env, "STACK_DATABASE_CONNECTION_STRING"); + if (typeof connectionString !== "string" || connectionString.length === 0) { + throw new Error("Missing STACK_DATABASE_CONNECTION_STRING"); + } + const base = connectionString.replace(/\/[^/]*(\?.*)?$/, ""); + const query = connectionString.split("?")[1] ?? ""; + const dbName = `${TEST_DB_PREFIX}_${Math.random().toString(16).slice(2, 12)}`; + return { + full: query.length === 0 ? `${base}/${dbName}` : `${base}/${dbName}?${query}`, + base, + }; +} + +/** + * Extracts the CREATE OR REPLACE FUNCTION body for + * public.bulldozer_timefold_process_queue from the downstream-cascade + * migration. 
The 20260323150000 migration creates the queue infrastructure; + * the later 20260417000000 migration rewrites the process_queue function + * body to run downstream cascades. We install the latter so tests exercise + * the cascade behaviour. + */ +function loadProcessQueueFunctionSql(): string { + const here = dirname(fileURLToPath(import.meta.url)); + const migrationPath = join( + here, + "..", + "..", + "..", + "..", + "prisma", + "migrations", + "20260417000000_bulldozer_timefold_downstream_cascade", + "migration.sql", + ); + const raw = readFileSync(migrationPath, "utf8"); + const block = raw + .split("-- SPLIT_STATEMENT_SENTINEL") + .map((s) => s.replaceAll("-- SINGLE_STATEMENT_SENTINEL", "").trim()) + .find((s) => s.startsWith("CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue")); + if (block == null) { + throw new Error("could not locate bulldozer_timefold_process_queue function body in cascade migration"); + } + return block.replace(/;$/, ""); +} + +const PROCESS_QUEUE_FN_SQL = loadProcessQueueFunctionSql(); + +describe.sequential("timefold queue downstream cascade (real postgres)", () => { + const dbUrls = getTestDbUrls(); + const dbName = dbUrls.full.replace(/^.*\//, "").replace(/\?.*$/, ""); + const adminSql = postgres(dbUrls.base, { onnotice: () => undefined }); + const sql = postgres(dbUrls.full, { onnotice: () => undefined, max: 1 }); + + async function runStatements(statements: SqlStatement[]) { + await sql.unsafe(toExecutableSqlTransaction(statements)); + } + async function readRows(query: SqlQuery) { + return await sql.unsafe(toQueryableSqlQuery(query)); + } + async function setLastProcessedAt(isoOrExpression: string) { + await sql.unsafe(` + UPDATE "BulldozerTimeFoldMetadata" + SET "lastProcessedAt" = (${isoOrExpression})::timestamptz, + "updatedAt" = now() + WHERE "key" = 'singleton' + `); + } + async function processQueue() { + await sql.unsafe(`SELECT public.bulldozer_timefold_process_queue()`); + } + async function 
countQueueRows() {
+    const rows = await sql<{ count: number }[]>`
+      SELECT COUNT(*)::int AS "count" FROM "BulldozerTimeFoldQueue"
+    `;
+    return rows[0].count;
+  }
+
+  beforeAll(async () => {
+    await adminSql.unsafe(`CREATE DATABASE ${dbName}`);
+  }, 60_000);
+
+  // beforeEach does a lot (drop/recreate all bulldozer tables + install the
+  // ~250-line process_queue plpgsql function); the default 10s vitest hook
+  // timeout can be tight especially under parallel test files.
+  const HOOK_TIMEOUT_MS = 60_000;
+
+  beforeEach(async () => {
+    await sql`CREATE EXTENSION IF NOT EXISTS pgcrypto`;
+    await sql`DROP FUNCTION IF EXISTS public.bulldozer_timefold_process_queue()`;
+    await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldDownstreamCascade"`;
+    await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldQueue"`;
+    await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldMetadata"`;
+    await sql`DROP TABLE IF EXISTS "BulldozerStorageEngine"`;
+    await sql`
+      CREATE TABLE "BulldozerStorageEngine" (
+        "id" UUID NOT NULL DEFAULT gen_random_uuid(),
+        "keyPath" JSONB[] NOT NULL,
+        "keyPathParent" JSONB[] GENERATED ALWAYS AS (
+          CASE
+            WHEN cardinality("keyPath") = 0 THEN NULL
+            ELSE "keyPath"[1:cardinality("keyPath") - 1]
+          END
+        ) STORED,
+        "value" JSONB NOT NULL,
+        CONSTRAINT "BulldozerStorageEngine_pkey" PRIMARY KEY ("id"),
+        CONSTRAINT "BulldozerStorageEngine_keyPath_key" UNIQUE ("keyPath"),
+        CONSTRAINT "BulldozerStorageEngine_keyPathParent_fkey"
+          FOREIGN KEY ("keyPathParent")
+          REFERENCES "BulldozerStorageEngine"("keyPath")
+          ON DELETE CASCADE
+      )
+    `;
+    await sql`CREATE INDEX "BulldozerStorageEngine_keyPathParent_idx" ON "BulldozerStorageEngine"("keyPathParent")`;
+    await sql`
+      INSERT INTO "BulldozerStorageEngine" ("keyPath", "value")
+      VALUES
+        (ARRAY[]::jsonb[], 'null'::jsonb),
+        (ARRAY[to_jsonb('table'::text)]::jsonb[], 'null'::jsonb)
+    `;
+    await sql`
+      CREATE TABLE "BulldozerTimeFoldQueue" (
+        "id" UUID NOT NULL DEFAULT gen_random_uuid(),
+        "tableStoragePath" JSONB[] NOT NULL,
+        "groupKey" JSONB NOT
NULL, + "rowIdentifier" TEXT NOT NULL, + "scheduledAt" TIMESTAMPTZ NOT NULL, + "stateAfter" JSONB NOT NULL, + "rowData" JSONB NOT NULL, + "reducerSql" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "BulldozerTimeFoldQueue_pkey" PRIMARY KEY ("id"), + CONSTRAINT "BulldozerTimeFoldQueue_table_group_row_key" UNIQUE ("tableStoragePath", "groupKey", "rowIdentifier") + ) + `; + await sql` + CREATE INDEX "BulldozerTimeFoldQueue_scheduledAt_idx" + ON "BulldozerTimeFoldQueue"("scheduledAt") + `; + await sql` + CREATE TABLE "BulldozerTimeFoldMetadata" ( + "key" TEXT PRIMARY KEY, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "lastProcessedAt" TIMESTAMPTZ NOT NULL + ) + `; + await sql` + INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt") + VALUES ('singleton', now()) + `; + await sql` + CREATE TABLE "BulldozerTimeFoldDownstreamCascade" ( + "tableStoragePath" JSONB[] NOT NULL, + "cascadeInputName" TEXT NOT NULL, + "cascadeTemplate" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath") + ) + `; + // Install the rewritten process_queue function body from the cascade + // migration. + await sql.unsafe(PROCESS_QUEUE_FN_SQL); + }, HOOK_TIMEOUT_MS); + + afterAll(async () => { + await sql.end(); + await adminSql.unsafe(` + SELECT pg_terminate_backend(pg_stat_activity.pid) + FROM pg_stat_activity + WHERE pg_stat_activity.datname = '${dbName}' + AND pid <> pg_backend_pid() + `); + await adminSql.unsafe(`DROP DATABASE IF EXISTS ${dbName}`); + await adminSql.end(); + }); + + // Reducer that emits {phase:'initial'} inline and schedules a past-due tick + // that emits {phase:'scheduled'}. 
The inline recursion stops because + // nextTimestamp > lastProcessedAt (we back lastProcessedAt off below). + // After process_queue runs, the scheduled-tick emission must propagate. + const splitPhaseReducerSql = ` + CASE WHEN "timestamp" IS NULL THEN 1 ELSE 2 END AS "newState", + jsonb_build_array( + jsonb_build_object( + 'phase', CASE WHEN "timestamp" IS NULL THEN 'initial' ELSE 'scheduled' END, + 'team', "oldRowData"->'team', + 'value', (("oldRowData"->>'value')::int) + ) + ) AS "newRowsData", + CASE + WHEN "timestamp" IS NULL THEN (now() - interval '1 second') + ELSE NULL::timestamptz + END AS "nextTimestamp" + `; + + // ──────────────────────────────────────────────────────────────────── + // Test 1: single filter downstream + // ──────────────────────────────────────────────────────────────────── + test("process_queue propagates emissions to a single downstream filter", async () => { + const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-filter-u" }); + const groupedTable = declareGroupByTable({ + tableId: "queue-cascade-filter-u-by-team", + fromTable, + groupBy: mapper(`"rowData"->'team' AS "groupKey"`), + }); + const timeFoldTable = declareTimeFoldTable({ + tableId: "queue-cascade-filter-u-folded", + fromTable: groupedTable, + initialState: expr(`'0'::jsonb`), + reducer: mapper(splitPhaseReducerSql), + }); + const filteredTable = declareFilterTable({ + tableId: "queue-cascade-filter-u-scheduled-only", + fromTable: timeFoldTable, + filter: predicate(`"rowData"->>'phase' = 'scheduled'`), + }); + + await runStatements(fromTable.init()); + await runStatements(groupedTable.init()); + // Back the clock up before any setRow so the inline recursion bails. + // Use pre-epoch time so the reducer's scheduled tick (now()-1s) stays + // ahead of lastProcessedAt regardless of wall-clock drift. 
+ await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + await runStatements(timeFoldTable.init()); + await runStatements(filteredTable.init()); + + await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":7}'::jsonb`))); + + expect(await countQueueRows()).toBe(1); + const filteredBeforeDrain = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(filteredBeforeDrain).toEqual([]); + + await setLastProcessedAt(`now()`); + await processQueue(); + + const filteredAfterDrain = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + // Filter wraps rows through an internal flat-map that appends a + // flatIndex to the rowIdentifier (":1" for the single-element array). + // That's an implementation detail, not something to assert on. + expect(filteredAfterDrain.map((row) => row.rowdata)).toEqual([ + { phase: "scheduled", team: "alpha", value: 7 }, + ]); + expect(await countQueueRows()).toBe(0); + }); + + // ──────────────────────────────────────────────────────────────────── + // Test 2: multi-stage cascade (timefold → filter → map → map) + // Exercises transitive propagation across three downstream stages. 
+ // ──────────────────────────────────────────────────────────────────── + test("process_queue propagates emissions through filter → map → map", async () => { + const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-multistage-u" }); + const groupedTable = declareGroupByTable({ + tableId: "queue-cascade-multistage-u-by-team", + fromTable, + groupBy: mapper(`"rowData"->'team' AS "groupKey"`), + }); + const timeFoldTable = declareTimeFoldTable({ + tableId: "queue-cascade-multistage-u-folded", + fromTable: groupedTable, + initialState: expr(`'0'::jsonb`), + reducer: mapper(splitPhaseReducerSql), + }); + const filteredTable = declareFilterTable({ + tableId: "queue-cascade-multistage-u-scheduled-only", + fromTable: timeFoldTable, + filter: predicate(`"rowData"->>'phase' = 'scheduled'`), + }); + const mappedTable = declareMapTable({ + tableId: "queue-cascade-multistage-u-mapped", + fromTable: filteredTable, + mapper: mapper(` + ("rowData"->'team') AS "team", + (("rowData"->>'value')::int * 10) AS "valueTimesTen" + `), + }); + const reMappedTable = declareMapTable({ + tableId: "queue-cascade-multistage-u-remapped", + fromTable: mappedTable, + mapper: mapper(` + ("rowData"->'team') AS "team", + (("rowData"->>'valueTimesTen')::int + 1) AS "valueTimesTenPlusOne" + `), + }); + + await runStatements(fromTable.init()); + await runStatements(groupedTable.init()); + await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + await runStatements(timeFoldTable.init()); + await runStatements(filteredTable.init()); + await runStatements(mappedTable.init()); + await runStatements(reMappedTable.init()); + + await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":3}'::jsonb`))); + await runStatements(fromTable.setRow("u2", expr(`'{"team":"alpha","value":5}'::jsonb`))); + + expect(await countQueueRows()).toBe(2); + for (const table of [filteredTable, mappedTable, reMappedTable]) { + const rows = await readRows(table.listRowsInGroup({ 
+ groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(rows).toEqual([]); + } + + await setLastProcessedAt(`now()`); + await processQueue(); + + const filtered = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(filtered.map((r) => r.rowdata).sort((a, b) => + Number(Reflect.get(a as object, "value")) - Number(Reflect.get(b as object, "value")) + )).toEqual([ + { phase: "scheduled", team: "alpha", value: 3 }, + { phase: "scheduled", team: "alpha", value: 5 }, + ]); + + const mapped = await readRows(mappedTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(mapped.map((r) => r.rowdata).sort((a, b) => + Number(Reflect.get(a as object, "valueTimesTen")) - Number(Reflect.get(b as object, "valueTimesTen")) + )).toEqual([ + { team: "alpha", valueTimesTen: 30 }, + { team: "alpha", valueTimesTen: 50 }, + ]); + + const reMapped = await readRows(reMappedTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(reMapped.map((r) => r.rowdata).sort((a, b) => + Number(Reflect.get(a as object, "valueTimesTenPlusOne")) - Number(Reflect.get(b as object, "valueTimesTenPlusOne")) + )).toEqual([ + { team: "alpha", valueTimesTenPlusOne: 31 }, + { team: "alpha", valueTimesTenPlusOne: 51 }, + ]); + }); + + // ──────────────────────────────────────────────────────────────────── + // Test 3: inline path and queue path produce identical downstream state + // ──────────────────────────────────────────────────────────────────── + test("inline-drain and queue-drain produce identical downstream state", async () => { + const fromTable = declareStoredTable<{ value: number, team: string }>({ 
tableId: "queue-cascade-parity-u" }); + const groupedTable = declareGroupByTable({ + tableId: "queue-cascade-parity-u-by-team", + fromTable, + groupBy: mapper(`"rowData"->'team' AS "groupKey"`), + }); + const timeFoldTable = declareTimeFoldTable({ + tableId: "queue-cascade-parity-u-folded", + fromTable: groupedTable, + initialState: expr(`'0'::jsonb`), + reducer: mapper(splitPhaseReducerSql), + }); + const filteredTable = declareFilterTable({ + tableId: "queue-cascade-parity-u-scheduled-only", + fromTable: timeFoldTable, + filter: predicate(`"rowData"->>'phase' = 'scheduled'`), + }); + + await runStatements(fromTable.init()); + await runStatements(groupedTable.init()); + + // Customer "inline": lastProcessedAt way ahead → tick fires inline. + await setLastProcessedAt(`'2099-01-01T00:00:00Z'`); + await runStatements(timeFoldTable.init()); + await runStatements(filteredTable.init()); + await runStatements(fromTable.setRow("inline-u1", expr(`'{"team":"inline","value":9}'::jsonb`))); + expect(await countQueueRows()).toBe(0); + + // Customer "queue": lastProcessedAt behind → tick queued. 
+    await setLastProcessedAt(`'1969-01-01T00:00:00Z'`);
+    await runStatements(fromTable.setRow("queue-u1", expr(`'{"team":"queue","value":9}'::jsonb`)));
+    expect(await countQueueRows()).toBe(1);
+
+    await setLastProcessedAt(`now()`);
+    await processQueue();
+    expect(await countQueueRows()).toBe(0);
+
+    const inlineRows = await readRows(filteredTable.listRowsInGroup({
+      groupKey: expr(`to_jsonb('inline'::text)`),
+      start: "start", end: "end", startInclusive: true, endInclusive: true,
+    }));
+    const queueRows = await readRows(filteredTable.listRowsInGroup({
+      groupKey: expr(`to_jsonb('queue'::text)`),
+      start: "start", end: "end", startInclusive: true, endInclusive: true,
+    }));
+
+    const normalize = (rows: ReadonlyArray<Record<string, unknown>>) =>
+      rows.map((row) => {
+        const rowIdentifier = row.rowidentifier;
+        if (typeof rowIdentifier !== "string") throw new Error("expected string rowidentifier");
+        const rowData = row.rowdata;
+        if (rowData == null || typeof rowData !== "object") throw new Error("expected object rowdata");
+        return {
+          rowIdentifierSuffix: rowIdentifier.replace(/^(inline|queue)-u1:/, "u1:"),
+          rowData: {
+            ...(rowData as Record<string, unknown>),
+            team: "",
+          },
+        };
+      });
+
+    expect(normalize([...queueRows])).toEqual(normalize([...inlineRows]));
+    expect([...inlineRows]).not.toEqual([]);
+  });
+
+  // ────────────────────────────────────────────────────────────────────
+  // Test 4: idempotency — redraining with no new rows is a no-op
+  // ────────────────────────────────────────────────────────────────────
+  test("process_queue is idempotent when there is nothing new to drain", async () => {
+    const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-idempotency-u" });
+    const groupedTable = declareGroupByTable({
+      tableId: "queue-cascade-idempotency-u-by-team",
+      fromTable,
+      groupBy: mapper(`"rowData"->'team' AS "groupKey"`),
+    });
+    const timeFoldTable = declareTimeFoldTable({
+      tableId: "queue-cascade-idempotency-u-folded",
+      fromTable:
groupedTable, + initialState: expr(`'0'::jsonb`), + reducer: mapper(splitPhaseReducerSql), + }); + const filteredTable = declareFilterTable({ + tableId: "queue-cascade-idempotency-u-scheduled-only", + fromTable: timeFoldTable, + filter: predicate(`"rowData"->>'phase' = 'scheduled'`), + }); + + await runStatements(fromTable.init()); + await runStatements(groupedTable.init()); + await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + await runStatements(timeFoldTable.init()); + await runStatements(filteredTable.init()); + + await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":4}'::jsonb`))); + await setLastProcessedAt(`now()`); + + await processQueue(); + const afterFirstDrain = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(afterFirstDrain).toHaveLength(1); + + // Second drain: no due queue rows. No-op at every layer. + await processQueue(); + const afterSecondDrain = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(afterSecondDrain).toEqual(afterFirstDrain); + expect(await countQueueRows()).toBe(0); + }); + + // ──────────────────────────────────────────────────────────────────── + // Test 5: deleting a downstream table must NOT wedge process_queue. + // + // The cascade template is compiled at upstream-init() time and + // references the downstream's storage paths. If the downstream is + // .delete()d while the upstream still has queue rows pending, the + // drain must still succeed without a FK violation. The safety comes + // from every trigger's first statement carrying a + // `WHERE isInitializedExpression` clause that short-circuits the rest + // of the pipeline when the downstream's metadata row is absent. 
The + // queue-drain cascade inherits the same statements verbatim, so it + // inherits the same safety — this test pins that invariant down. + // ──────────────────────────────────────────────────────────────────── + test("process_queue does not wedge when a downstream table is deleted", async () => { + const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-deleted-downstream-u" }); + const groupedTable = declareGroupByTable({ + tableId: "queue-cascade-deleted-downstream-u-by-team", + fromTable, + groupBy: mapper(`"rowData"->'team' AS "groupKey"`), + }); + const timeFoldTable = declareTimeFoldTable({ + tableId: "queue-cascade-deleted-downstream-u-folded", + fromTable: groupedTable, + initialState: expr(`'0'::jsonb`), + reducer: mapper(splitPhaseReducerSql), + }); + const filteredTable = declareFilterTable({ + tableId: "queue-cascade-deleted-downstream-u-scheduled-only", + fromTable: timeFoldTable, + filter: predicate(`"rowData"->>'phase' = 'scheduled'`), + }); + + await runStatements(fromTable.init()); + await runStatements(groupedTable.init()); + await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + await runStatements(timeFoldTable.init()); + await runStatements(filteredTable.init()); + + await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":7}'::jsonb`))); + expect(await countQueueRows()).toBe(1); + + // At this point the inline setRow path has already propagated the + // {phase:initial} row all the way through to the filter's storage. + // Blow the filter's storage away while the {phase:scheduled} tick + // is still queued — the upstream's cascade template was compiled + // referencing the filter's paths and is NOT updated by .delete(), + // so this is the delete-before-drain scenario. 
+ await runStatements(filteredTable.delete()); + + await setLastProcessedAt(`now()`); + // If the cascade template's WHERE-gated statements didn't no-op on + // missing downstream storage, this would throw an FK violation and + // the queue would stay wedged forever. + await processQueue(); + + expect(await countQueueRows()).toBe(0); + + // The timefold's own state must reflect both emissions (the inline + // {initial} from before the delete and the queue-drained {scheduled} + // from after). A wedged drain would roll everything back and leave + // the timefold with only the initial row. + const timefoldRows = await readRows(timeFoldTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + const phases = timefoldRows.map((row) => (row.rowdata as { phase: string }).phase).sort(); + expect(phases).toEqual(["initial", "scheduled"]); + }); + + // ──────────────────────────────────────────────────────────────────── + // Test 6: statements whose SQL contains `$tf_cascade$` as a literal + // substring must not prematurely close the outer cascade DO-block. + // + // The cascade template is wrapped in `DO $tf_cascade$ ... $tf_cascade$`. + // If any embedded statement happens to include that literal — e.g. in + // a SQL comment or a user-provided expression — the outer dollar quote + // closes mid-body and EXECUTE fails at parse time. 
+ // ──────────────────────────────────────────────────────────────────── + test("process_queue tolerates downstream SQL containing the cascade dollar-quote delimiter", async () => { + const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-dollar-collision-u" }); + const groupedTable = declareGroupByTable({ + tableId: "queue-cascade-dollar-collision-u-by-team", + fromTable, + groupBy: mapper(`"rowData"->'team' AS "groupKey"`), + }); + const timeFoldTable = declareTimeFoldTable({ + tableId: "queue-cascade-dollar-collision-u-folded", + fromTable: groupedTable, + initialState: expr(`'0'::jsonb`), + reducer: mapper(splitPhaseReducerSql), + }); + // The filter predicate embeds the literal `$tf_cascade$` in a way + // that always evaluates true. This text flows through + // `collectRowChangeTriggerStatements` into the stored cascade + // template verbatim, so if the outer dollar-quoting is not robust, + // parse time EXECUTE inside process_queue will fail. + const filteredTable = declareFilterTable({ + tableId: "queue-cascade-dollar-collision-u-all", + fromTable: timeFoldTable, + filter: predicate(`('$tf_cascade$' IS NOT NULL) OR ("rowData"->>'phase' = 'scheduled')`), + }); + + await runStatements(fromTable.init()); + await runStatements(groupedTable.init()); + await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + await runStatements(timeFoldTable.init()); + await runStatements(filteredTable.init()); + + await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":11}'::jsonb`))); + expect(await countQueueRows()).toBe(1); + + await setLastProcessedAt(`now()`); + // If the outer dollar-quote delimiter collided with the embedded + // `$tf_cascade$` string, EXECUTE raises `syntax error at or near ...` + // and the entire drain rolls back (queue stays at 1). The fix + // (`chooseSafeDollarQuoteTag`) randomizes the outer tag per call, so + // the embedded literal is just another string in the body. 
+ await processQueue(); + + expect(await countQueueRows()).toBe(0); + + // Predicate evaluates to always-true thanks to the first disjunct, + // so both the inline-emitted {initial} row and the queue-drained + // {scheduled} row make it through. The point of the assertion is + // that the cascade ran and wrote the scheduled row at all — if the + // delimiter collision had broken the DO block, we'd see only + // {initial} (written synchronously at setRow time, before the queue + // drain). + const filteredRows = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + const phases = filteredRows.map((row) => (row.rowdata as { phase: string }).phase).sort(); + expect(phases).toEqual(["initial", "scheduled"]); + }); + + // ──────────────────────────────────────────────────────────────────── + // when process_queue() can't find a cascade + // registry row for a timefold, it must DEFER that timefold's queue + // rows (leave them queued) instead of draining them and silently + // skipping the downstream cascade. + // + // Why this matters — a concrete example of the failure mode: + // + // 1. This migration (20260417000000) creates the registry table + // BulldozerTimeFoldDownstreamCascade. It's empty at first. + // + // 2. The backend starts up. `declareTimeFoldTable.init()` runs for + // every timefold and upserts one row per timefold into the + // registry, storing the pre-compiled cascade SQL template. + // + // 3. There is a short gap between (1) and (2). During that gap, + // pg_cron keeps calling process_queue() every second. + // + // 4. 
If a due queue row exists for a timefold whose registry row + // hasn't been upserted yet, and process_queue() drains it + // anyway without a cascade to run, then: + // + // - the queue row is gone, + // - the timefold's own state is updated, + // - but NONE of the downstream filters/maps/LFolds hear + // about it, and there's no queue row left for a future + // tick to retry. + // + // → downstream tables are permanently desynchronized from + // the timefold. + // + // The right behavior: no registry row → do nothing for this + // timefold this tick. The queue row stays queued. Once init() + // finishes and the registry row appears, the next pg_cron tick + // drains with cascade intact. + // + // This test simulates the deploy-window gap by deleting the + // registry row after init() has run (so we know the rest of the + // pipeline is set up), then calling process_queue() and asserting + // that nothing silently advanced. + // ──────────────────────────────────────────────────────────────────── + test("process_queue defers a timefold whose cascade registry row is missing", async () => { + // ---- Setup: build a small timefold pipeline ---- + // + // The chain is: source data → group by team → timefold (recurses + // through time) → filter (keeps only phase=scheduled rows). + // + // Calling each table's init() wires up its storage. The + // timeFoldTable's init() is the one that inserts the registry + // row we care about — it stores the cascade template that + // process_queue() looks up at drain time. 
+ const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-missing-registry-u" }); + const groupedTable = declareGroupByTable({ + tableId: "queue-cascade-missing-registry-u-by-team", + fromTable, + groupBy: mapper(`"rowData"->'team' AS "groupKey"`), + }); + const timeFoldTable = declareTimeFoldTable({ + tableId: "queue-cascade-missing-registry-u-folded", + fromTable: groupedTable, + initialState: expr(`'0'::jsonb`), + reducer: mapper(splitPhaseReducerSql), + }); + const filteredTable = declareFilterTable({ + tableId: "queue-cascade-missing-registry-u-scheduled-only", + fromTable: timeFoldTable, + filter: predicate(`"rowData"->>'phase' = 'scheduled'`), + }); + + await runStatements(fromTable.init()); + await runStatements(groupedTable.init()); + // Backdate the "last processed" clock so any future tick the + // reducer schedules looks like it's still in the future when + // setRow fires. That way the {scheduled} emission gets QUEUED + // instead of running inline. + await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + await runStatements(timeFoldTable.init()); + await runStatements(filteredTable.init()); + + // ---- Generate one inline emission + one queued emission ---- + // + // splitPhaseReducerSql is written so: + // - First call (timestamp=NULL) emits {phase:'initial'} and + // schedules a future tick. + // - The future tick (when drained) emits {phase:'scheduled'}. + // + // setRow fires the inline path: {initial} flows through the + // whole chain right now. The filter predicate + // `"rowData"->>'phase' = 'scheduled'` rejects {initial}, so the + // filter stays empty. Meanwhile the scheduled tick lands in + // BulldozerTimeFoldQueue for later. + await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":7}'::jsonb`))); + expect(await countQueueRows()).toBe(1); + + // ---- Simulate the deploy-window gap ---- + // + // We just saw init() upsert a registry row above. 
To mimic the + // case where init() hasn't run yet (migration applied but + // backend hasn't reached init() for this timefold), delete + // every row in the registry table by hand. Now it looks + // identical to the fresh-after-migration state. + await sql.unsafe(`DELETE FROM "BulldozerTimeFoldDownstreamCascade"`); + + // ---- Run process_queue() as pg_cron would ---- + // + // Advance the clock past the queued tick's scheduledAt so it's + // due, then drain. + await setLastProcessedAt(`now()`); + await processQueue(); + + // ---- Assert we deferred instead of silently losing state ---- + // + // (1) The queue row is still there. process_queue() saw that + // the registry had no row for this timefold and said "I + // don't know which cascade to run, so I'll leave this for + // the next tick." Nothing was drained, nothing was + // skipped. + expect(await countQueueRows()).toBe(1); + + // (2) The timefold's own state wasn't advanced by the drain. + // Only the inline-emitted {initial} row is visible; the + // {scheduled} row is still sitting in the queue waiting + // for a tick with a registered cascade to process it. + // + // Contrast with the buggy (pre-fix) behavior: the timefold + // would have had BOTH "initial" and "scheduled" here, with + // the filter permanently missing "scheduled". + const timefoldRows = await readRows(timeFoldTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(timefoldRows.map(rowPhase).sort()).toEqual(["initial"]); + + // (3) The filter is empty, which is the correct steady state: + // the inline setRow's {initial} was filtered out, and the + // {scheduled} row hasn't been drained yet. No partial + // writes, no orphan rows. Once init() runs and the + // registry is populated, the next pg_cron tick will drain + // the queue row and propagate {scheduled} into this + // filter. 
+ const filterRows = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(filterRows).toEqual([]); + }); +}); diff --git a/apps/backend/src/lib/payments/schema/__tests__/integration-1-3-queue-drained.test.ts b/apps/backend/src/lib/payments/schema/__tests__/integration-1-3-queue-drained.test.ts new file mode 100644 index 0000000000..73bf11559f --- /dev/null +++ b/apps/backend/src/lib/payments/schema/__tests__/integration-1-3-queue-drained.test.ts @@ -0,0 +1,292 @@ +/** + * Queue-drained variant of the phase 1→3 integration tests: subscription + * lifecycle events scheduled in the future (sub-end on + * `cancelAtPeriodEnd`, monthly item-grant-repeat ticks) defer to the + * BulldozerTimeFoldQueue and must propagate through the downstream cascade + * — events → transactions → itemQuantities / ownedProducts — when drained + * by `public.bulldozer_timefold_process_queue()` (the pg_cron path). + * + * The sibling `integration-1-3.test.ts` seeds lastProcessedAt = 2099 so + * every scheduled tick fires inline at setRow time and the queue is never + * exercised. These tests instead keep lastProcessedAt at the present, let + * future ticks stay queued, then advance the clock and invoke the drain + * function — mirroring real pg_cron behaviour. + */ + +import { afterAll, beforeAll, describe, expect, it } from "vitest"; +import { createPaymentsSchema } from "../index"; +import { createTestDb, jsonbExpr } from "./test-helpers"; + +const DAY_MS = 86400000; +const MONTH_MS = 2592000000; + +describe.sequential("payments schema integration phase 1→3, queue-drained path (real postgres)", () => { + // Clock starts at now() so that future scheduledAt values stay queued + // (vs the default `createTestDb` behaviour of lastProcessedAt = 2099, + // which would fire every tick inline). 
`installProcessQueueFn` installs + // the rewritten process_queue function body from the cascade migration + // so `processQueue()` exercises the real prod function. + const db = createTestDb({ + lastProcessedAt: "now()", + installProcessQueueFn: true, + }); + const { runStatements, readRows, setLastProcessedAt, processQueue, countQueueRows } = db; + const schema = createPaymentsSchema(); + + const getRowDatas = async (table: { listRowsInGroup: (opts: any) => any }) => { + const rows = await readRows(table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + return rows.map((r: any) => r.rowdata); + }; + + beforeAll(async () => { + await db.setup(); + for (const table of schema._allTables) { + await runStatements(table.init()); + } + }, 120_000); + + afterAll(async () => { + await db.teardown(); + }); + + + // ============================================================ + // Test 6: mid-period upgrade with pg_cron-drained end event + // + // Free sub ending mid-period + team sub starting the next day. + // With lastProcessedAt in the recent past, the free sub's end event + // gets QUEUED (not fired inline). Once drained, downstream ledgers + // must reflect: only team's 500 emails, not 100 + 500 = 600. + // ============================================================ + describe("mid-period upgrade with queue-drained end event", () => { + it("queues the subscription-end event instead of firing inline", async () => { + // subscription-timefold-algo derives nextTimestamp from millis fields + // like endedAtMillis / repeat intervals. These are raw epoch millis, + // so they map to ~1970. To make the inline recursion's + // `nextTimestamp > lastProcessedAt` check hold (= "defer to queue"), + // we set lastProcessedAt to pre-epoch. 
+ await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + + await runStatements(schema.subscriptions.setRow("sub-q-free", jsonbExpr({ + id: "sub-q-free", + tenancyId: "t1", + customerId: "u-q-upgrade", + customerType: "user", + productId: "prod-q-free", + priceId: "p-free", + product: { + displayName: "Free (queued)", + customerType: "user", + productLineId: "line-q-upgrade", + prices: { "p-free": { USD: "0" } }, + includedItems: { + emails: { quantity: 100, repeat: [1, "month"], expires: "when-repeated" }, + }, + }, + quantity: 1, + stripeSubscriptionId: null, + status: "canceled", + currentPeriodStartMillis: 0, + currentPeriodEndMillis: MONTH_MS, + cancelAtPeriodEnd: false, + canceledAtMillis: 10 * DAY_MS, + endedAtMillis: 10 * DAY_MS, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 0, + }))); + + // The subscription-end tick is scheduled at `endedAtMillis` which is + // > lastProcessedAt (10 days from epoch > "one hour ago"). It must + // therefore be queued, not emitted inline. + const queued = await countQueueRows(); + expect(queued).toBeGreaterThan(0); + + const endEventsBeforeDrain = (await getRowDatas(schema.subscriptionEndEvents)) + .filter((e: any) => e.subscriptionId === "sub-q-free"); + expect(endEventsBeforeDrain).toEqual([]); + }); + + it("after drain, end event fires AND downstream cascade runs; upgrade does not stack", async () => { + // Now bring the team sub online. 
+ await runStatements(schema.subscriptions.setRow("sub-q-team", jsonbExpr({ + id: "sub-q-team", + tenancyId: "t1", + customerId: "u-q-upgrade", + customerType: "user", + productId: "prod-q-team", + priceId: "p-team", + product: { + displayName: "Team (queued)", + customerType: "user", + productLineId: "line-q-upgrade", + prices: { "p-team": { USD: "30" } }, + includedItems: { + emails: { quantity: 500, repeat: [1, "month"], expires: "when-repeated" }, + }, + }, + quantity: 1, + stripeSubscriptionId: null, + status: "canceled", + currentPeriodStartMillis: 11 * DAY_MS, + currentPeriodEndMillis: 11 * DAY_MS + MONTH_MS, + cancelAtPeriodEnd: true, + canceledAtMillis: 20 * DAY_MS, + endedAtMillis: 20 * DAY_MS, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 11 * DAY_MS, + }))); + + // Bump clock far enough forward to make every queued tick due, then drain. + await setLastProcessedAt(`'2099-01-01T00:00:00Z'`); + await processQueue(); + expect(await countQueueRows()).toBe(0); + + const endEvents = (await getRowDatas(schema.subscriptionEndEvents)) + .filter((e: any) => e.subscriptionId === "sub-q-free"); + expect(endEvents).toHaveLength(1); + + const transactions = (await getRowDatas(schema.transactions)) + .filter((t: any) => t.customerId === "u-q-upgrade"); + const endTxns = transactions.filter((t: any) => t.txnId === "sub-end:sub-q-free"); + expect(endTxns).toHaveLength(1); + + const itemQuantityRows = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId === "u-q-upgrade") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + + const atTeamStart = itemQuantityRows.find((r: any) => r.txnId === "sub-start:sub-q-team"); + expect(atTeamStart).toBeDefined(); + // Stacking regression: without downstream-cascade propagation on the + // queue path, sub-free's end event never fires the cascade, so + // atTeamStart.emails accumulates free.100 + team.500 = 600 instead of + // the team-only 500. 
+ expect(atTeamStart.itemQuantities.emails).toBe(500); + }); + }); + + + // ============================================================ + // Test 7: monthly repeat reset via pg_cron + // + // Sub with a monthly-repeating quota item. Between sub-start and the + // first repeat, balance must reflect the initial grant. After pg_cron + // drains the repeat tick, balance must reflect the REFRESHED grant + // (not doubled, not zero). + // ============================================================ + describe("monthly repeat reset via queue drain", () => { + it("repeat tick is queued until the clock is advanced past it", async () => { + await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + + await runStatements(schema.subscriptions.setRow("sub-q-repeat", jsonbExpr({ + id: "sub-q-repeat", + tenancyId: "t1", + customerId: "u-q-repeat", + customerType: "user", + productId: "prod-q-repeat", + priceId: "p1", + product: { + displayName: "Repeat (queued)", + customerType: "user", + productLineId: "line-q-repeat", + prices: { p1: { USD: "10" } }, + includedItems: { + quota: { quantity: 200, repeat: [1, "month"], expires: "when-repeated" }, + }, + }, + quantity: 1, + stripeSubscriptionId: null, + status: "active", + currentPeriodStartMillis: 0, + currentPeriodEndMillis: MONTH_MS, + cancelAtPeriodEnd: false, + canceledAtMillis: null, + endedAtMillis: 45 * DAY_MS, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 0, + }))); + + expect(await countQueueRows()).toBeGreaterThan(0); + + const initialRows = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId === "u-q-repeat") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + const atStart = initialRows.find((r: any) => r.txnId === "sub-start:sub-q-repeat"); + expect(atStart).toBeDefined(); + expect(atStart.itemQuantities.quota).toBe(200); + }); + + it("after drain, quota is refreshed (not doubled, not stuck)", async () => { + // Advance clock past all 
repeats (and past endedAt). + await setLastProcessedAt(`'2099-01-01T00:00:00Z'`); + await processQueue(); + expect(await countQueueRows()).toBe(0); + + const rows = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId === "u-q-repeat") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + + // After every repeat and the final subscription-end, quota must drop + // to 0 — both repeat grants and the final one expire. Without the + // downstream cascade on the queue path, queued events never reach + // itemQuantities and this stays at the initial 200 (or whatever the + // inline path wrote at setRow time). + const latest = rows[rows.length - 1]; + expect(latest.itemQuantities.quota).toBe(0); + + // Between repeats, there should be exactly one active repeat grant at + // a time — never doubled. The queue drain is expected to emit at + // least one `igr:sub-q-repeat:*` row into itemQuantities (the + // 30-day repeat fires once between sub-start at t=0 and sub-end at + // t=45d). Asserting its presence unconditionally is the whole point + // of this test — the original regression was that the cascade + // dropped these emissions entirely, so a `find() == null` must fail + // loud, not silently skip. + const midPeriodRow = rows.find((r: any) => + r.txnId?.startsWith("igr:sub-q-repeat:") && r.itemQuantities?.quota != null + ); + expect( + midPeriodRow, + "expected at least one igr:sub-q-repeat:* row in itemQuantities; if this is null the queue-drain cascade dropped the repeat emission entirely", + ).toBeDefined(); + expect(midPeriodRow.itemQuantities.quota).toBe(200); + }); + }); + + + // ============================================================ + // Test 8: re-drain idempotency at the payments layer + // + // Draining twice with nothing new in between must not duplicate + // subscription-end events, transactions, or item-quantity changes. 
+ // ============================================================ + describe("re-drain idempotency at the payments layer", () => { + it("second process_queue call with no new queue rows is a no-op", async () => { + // Snapshot counts after prior tests' drains. + const endEventsBefore = (await getRowDatas(schema.subscriptionEndEvents)) + .filter((e: any) => e.subscriptionId?.startsWith("sub-q-")); + const endTxnsBefore = (await getRowDatas(schema.transactions)) + .filter((t: any) => t.txnId?.startsWith("sub-end:sub-q-")); + const itemQuantitiesBefore = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId?.startsWith("u-q-")); + + expect(await countQueueRows()).toBe(0); + await processQueue(); + + const endEventsAfter = (await getRowDatas(schema.subscriptionEndEvents)) + .filter((e: any) => e.subscriptionId?.startsWith("sub-q-")); + const endTxnsAfter = (await getRowDatas(schema.transactions)) + .filter((t: any) => t.txnId?.startsWith("sub-end:sub-q-")); + const itemQuantitiesAfter = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId?.startsWith("u-q-")); + + expect(endEventsAfter).toHaveLength(endEventsBefore.length); + expect(endTxnsAfter).toHaveLength(endTxnsBefore.length); + expect(itemQuantitiesAfter).toHaveLength(itemQuantitiesBefore.length); + }); + }); +}); diff --git a/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts b/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts index 9e95f4730c..784b287b09 100644 --- a/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts +++ b/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts @@ -894,44 +894,34 @@ describe.sequential("payments schema integration phase 1→3 (real postgres)", ( // ============================================================ - // Regression: the subscription-timefold reducer built `currentMillis` as - // NUMERIC. 
PG 12+ `EXTRACT(EPOCH ...)` returns NUMERIC with scale 6, so - // the value serialized into JSONB as e.g. "604800000.000000". Downstream - // transactions.ts builds igr txn IDs via `->>effectiveAtMillis` → the - // decimal-tailed form. The same reducer, when producing - // `item-quantity-expire.adjustedTransactionId`, used `::bigint::text` → - // the decimal-free form. Mismatch → a subscription-end that follows an - // item-grant-repeat could not find the igr's grant to expire, so the - // customer's `when-repeated` balance silently stayed at the last-granted - // quantity instead of dropping to 0. - // - // Fix: cast `currentMillis` to bigint at the root so both paths produce - // byte-identical references. This test exercises the full inline - // lifecycle: sub-start → item-grant-repeat → subscription-end, and - // asserts the final itemQuantities row drops to 0 and the txn-ID formats - // match on both sides of the reference. + // Full when-repeated lifecycle: sub-start → item-grant-repeat → sub-end. + // item-quantity-expire entries in the sub-end transaction reference the + // preceding item-grant-repeat by txn id. Both the id text and the + // reference text must match byte-for-byte or the expire silently fails + // to resolve the grant and the `when-repeated` balance stays at the + // last-granted quantity instead of dropping to 0. 
// ============================================================ describe("item-quantity-expire resolves across item-grant-repeat → sub-end", () => { const DAY_MS = 86400000; beforeAll(async () => { - await runStatements(schema.subscriptions.setRow("sub-bigint-repeat", jsonbExpr({ - id: "sub-bigint-repeat", + await runStatements(schema.subscriptions.setRow("sub-repeat-to-end", jsonbExpr({ + id: "sub-repeat-to-end", tenancyId: "t1", - customerId: "u-bigint", + customerId: "u-repeat-to-end", customerType: "user", - productId: "prod-bigint-repeat", + productId: "prod-repeat-to-end", priceId: "p1", product: { - displayName: "Bigint Repeat Plan", + displayName: "Repeat Then End Plan", customerType: "user", - productLineId: "line-bigint-repeat", + productLineId: "line-repeat-to-end", prices: { p1: { USD: "10" } }, includedItems: { - // Repeat at exactly 7 days — a whole-second epoch offset that - // amplifies the NUMERIC scale-6 artifact (604800000 vs - // 604800000.000000 in JSONB). + // 7-day repeat interval is an exact whole-second epoch offset, + // which is where subtle NUMERIC-vs-bigint mismatches around + // `->>effectiveAtMillis` tend to surface. quota: { quantity: 100, repeat: [7, "day"], expires: "when-repeated" }, }, }, @@ -952,27 +942,28 @@ describe.sequential("payments schema integration phase 1→3 (real postgres)", ( it("item-grant-repeat transaction id has no trailing decimals", async () => { const txns = (await getRowDatas(schema.transactions)) - .filter((t: any) => t.customerId === "u-bigint"); + .filter((t: any) => t.customerId === "u-repeat-to-end"); const igr = txns.find((t: any) => - typeof t.txnId === "string" && t.txnId.startsWith("igr:sub-bigint-repeat:") + typeof t.txnId === "string" && t.txnId.startsWith("igr:sub-repeat-to-end:") ); expect(igr).toBeDefined(); - // The txn id is built by transactions.ts from the event's - // effectiveAtMillis (which flows through JSONB as the NUMERIC value - // from the reducer's currentMillis). 
Any trailing ".000000" here is - // the bug 3 fingerprint. - expect(igr.txnId).toMatch(/^igr:sub-bigint-repeat:\d+$/); + // transactions.ts derives this id from the event's effectiveAtMillis + // via `->>`. If that value was stored in JSONB as a NUMERIC with + // fractional scale (e.g. "604800000.000000") the id text picks up + // the trailing zeros and no longer matches references built via + // `::bigint::text` elsewhere in the reducer. + expect(igr.txnId).toMatch(/^igr:sub-repeat-to-end:\d+$/); expect(igr.txnId).not.toContain("."); }); it("sub-end's item-quantity-expire adjustedTransactionId matches the igr txn id", async () => { const txns = (await getRowDatas(schema.transactions)) - .filter((t: any) => t.customerId === "u-bigint"); + .filter((t: any) => t.customerId === "u-repeat-to-end"); const igr = txns.find((t: any) => - typeof t.txnId === "string" && t.txnId.startsWith("igr:sub-bigint-repeat:") + typeof t.txnId === "string" && t.txnId.startsWith("igr:sub-repeat-to-end:") ); - const subEnd = txns.find((t: any) => t.txnId === "sub-end:sub-bigint-repeat"); + const subEnd = txns.find((t: any) => t.txnId === "sub-end:sub-repeat-to-end"); expect(igr).toBeDefined(); expect(subEnd).toBeDefined(); @@ -980,22 +971,24 @@ describe.sequential("payments schema integration phase 1→3 (real postgres)", ( e.type === "item-quantity-expire" && e.itemId === "quota" ); expect(expireEntry).toBeDefined(); - // Before the fix the expire referenced `igr:sub-bigint-repeat:604800000` - // while the actual igr txn was `igr:sub-bigint-repeat:604800000.000000` - // — same value, different text → no match → quota stuck. + // The two texts must be byte-identical for the expire to resolve + // the grant. Same value in different representations (e.g. + // "604800000" vs "604800000.000000") is the failure mode this + // guards against. 
expect(expireEntry.adjustedTransactionId).toBe(igr.txnId); }); it("quota balance drops to 0 after sub-end resolves the igr's grant", async () => { const rows = (await getRowDatas(schema.itemQuantities)) - .filter((r: any) => r.customerId === "u-bigint") + .filter((r: any) => r.customerId === "u-repeat-to-end") .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); expect(rows.length).toBeGreaterThan(0); const latest = rows[rows.length - 1]; - // With bug 3 present, expire can't find the grant and quota stays at - // 100 (the last igr-granted quantity). With the fix, the expire - // resolves and quota drops to 0. + // If the expire ref mismatches the igr txn id, the expire silently + // becomes a no-op and quota stays at the last igr-granted quantity + // (100). When the ids match, sub-end's expire resolves and the + // ledger drops to 0. expect(latest.itemQuantities.quota).toBe(0); }); }); diff --git a/apps/backend/src/lib/payments/schema/__tests__/test-helpers.ts b/apps/backend/src/lib/payments/schema/__tests__/test-helpers.ts index 6a55938e0d..d8d88c1235 100644 --- a/apps/backend/src/lib/payments/schema/__tests__/test-helpers.ts +++ b/apps/backend/src/lib/payments/schema/__tests__/test-helpers.ts @@ -6,6 +6,9 @@ * with no leftover state. */ +import { readFileSync } from "node:fs"; +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; import postgres from "postgres"; import { toExecutableSqlTransaction, toQueryableSqlQuery } from "@/lib/bulldozer/db/index"; @@ -21,13 +24,64 @@ function getConnectionString(): string { return connectionString; } +/** + * Extracts `CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue` + * from the cascade migration file so tests can install the real prod + * function body. Scoped here (rather than duplicated across test files) + * so there's one place to update if the migration's comment markers + * change. 
+ */ +function loadProcessQueueFunctionSql(): string { + const here = dirname(fileURLToPath(import.meta.url)); + const migrationPath = join( + here, + "..", "..", "..", "..", "..", + "prisma", + "migrations", + "20260417000000_bulldozer_timefold_downstream_cascade", + "migration.sql", + ); + const raw = readFileSync(migrationPath, "utf8"); + const block = raw + .split("-- SPLIT_STATEMENT_SENTINEL") + .map((s) => s.replaceAll("-- SINGLE_STATEMENT_SENTINEL", "").trim()) + .find((s) => s.startsWith("CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue")); + if (block == null) { + throw new Error("could not locate bulldozer_timefold_process_queue function body in cascade migration"); + } + return block.replace(/;$/, ""); +} + +export type CreateTestDbOptions = { + /** + * SQL expression used to seed `BulldozerTimeFoldMetadata.lastProcessedAt` + * at setup time. The default (`'2099-01-01T00:00:00Z'::timestamptz`) puts + * the metadata clock far in the future so every timefold tick fires + * inline at `setRow` time — this is what most payments tests rely on. + * + * Set this to `now()` (or a pre-epoch timestamp) to exercise the queued + * path where future ticks defer to `bulldozer_timefold_process_queue()`. + */ + lastProcessedAt?: string, + /** + * When true, also installs `public.bulldozer_timefold_process_queue()` + * from the cascade migration so callers can invoke `processQueue()` + * against a real prod-shape function body. Default: false (inline-path + * tests don't need it). + */ + installProcessQueueFn?: boolean, +}; + /** * Creates an isolated test database. Call `setup()` in beforeAll and * `teardown()` in afterAll. Access `runStatements` / `readRows` after setup. * * Follows the same pattern as apps/backend/src/lib/bulldozer/db/index.test.ts. */ -export function createTestDb() { +export function createTestDb(options: CreateTestDbOptions = {}) { + const lastProcessedAtExpression = options.lastProcessedAt ?? 
`'2099-01-01T00:00:00Z'::timestamptz`;
+  const installProcessQueueFn = options.installProcessQueueFn ?? false;
+
   const connectionString = getConnectionString();
   const base = connectionString.replace(/\/[^/]*(\?.*)?$/, "");
   const queryString = connectionString.split("?")[1] ?? "";
@@ -53,6 +107,38 @@ export function createTestDb() {
       return await getSql().unsafe(toQueryableSqlQuery(query));
     },
 
+    /**
+     * Overwrites `BulldozerTimeFoldMetadata.lastProcessedAt` to the given
+     * SQL expression. Useful for tests that need to bump the clock forward
+     * (to make queued ticks due) or backward (to force future ticks into
+     * the queue).
+     */
+    setLastProcessedAt: async (isoOrExpression: string) => {
+      await getSql().unsafe(`
+        UPDATE "BulldozerTimeFoldMetadata"
+        SET "lastProcessedAt" = (${isoOrExpression})::timestamptz,
+            "updatedAt" = now()
+        WHERE "key" = 'singleton'
+      `);
+    },
+
+    /** Invokes the real pg_cron drain entry point. Requires `installProcessQueueFn`. */
+    processQueue: async () => {
+      if (!installProcessQueueFn) {
+        throw new Error(
+          "processQueue() requires createTestDb({ installProcessQueueFn: true })",
+        );
+      }
+      await getSql().unsafe(`SELECT public.bulldozer_timefold_process_queue()`);
+    },
+
+    countQueueRows: async (): Promise<number> => {
+      const rows = await getSql()<Array<{ count: number }>>`
+        SELECT COUNT(*)::int AS "count" FROM "BulldozerTimeFoldQueue"
+      `;
+      return rows[0].count;
+    },
+
     setup: async () => {
       await adminSql.unsafe(`CREATE DATABASE ${dbName}`);
       _sql = postgres(dbUrl, { onnotice: () => undefined, max: 1 });
@@ -115,8 +201,25 @@ export function createTestDb() {
       `);
       await _sql.unsafe(`
         INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt")
-        VALUES ('singleton', '2099-01-01T00:00:00Z'::timestamptz)
+        VALUES ('singleton', ${lastProcessedAtExpression})
       `);
+      // declareTimeFoldTable.init() upserts a cascade template here (see
+      // 20260417000000_bulldozer_timefold_downstream_cascade).
The + // table must exist even when tests don't exercise the queue path, + // because init() runs the upsert unconditionally. + await _sql.unsafe(` + CREATE TABLE "BulldozerTimeFoldDownstreamCascade" ( + "tableStoragePath" JSONB[] NOT NULL, + "cascadeInputName" TEXT NOT NULL, + "cascadeTemplate" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath") + ) + `); + if (installProcessQueueFn) { + await _sql.unsafe(loadProcessQueueFunctionSql()); + } }, teardown: async () => { From b844eae233704defa2890da8504623973bcb6589 Mon Sep 17 00:00:00 2001 From: nams1570 Date: Sat, 18 Apr 2026 02:15:57 -0700 Subject: [PATCH 4/6] refactor: deduplication of code --- .../db/row-change-trigger-dispatch.ts | 10 ++- .../bulldozer/db/tables/time-fold-table.ts | 25 +++---- .../src/lib/bulldozer/db/test-sql-loaders.ts | 51 ++++++++++++++ .../db/timefold-queue-downstream.test.ts | 67 ++++++++----------- .../payments/schema/__tests__/test-helpers.ts | 32 +-------- 5 files changed, 99 insertions(+), 86 deletions(-) create mode 100644 apps/backend/src/lib/bulldozer/db/test-sql-loaders.ts diff --git a/apps/backend/src/lib/bulldozer/db/row-change-trigger-dispatch.ts b/apps/backend/src/lib/bulldozer/db/row-change-trigger-dispatch.ts index d4c45f1279..21e9686ba6 100644 --- a/apps/backend/src/lib/bulldozer/db/row-change-trigger-dispatch.ts +++ b/apps/backend/src/lib/bulldozer/db/row-change-trigger-dispatch.ts @@ -4,7 +4,15 @@ import { stringCompare } from "@stackframe/stack-shared/dist/utils/strings"; import type { SqlExpression, SqlStatement } from "./utilities"; import { quoteSqlIdentifier, quoteSqlStringLiteral, sqlQuery } from "./utilities"; -const CHANGE_OUTPUT_COLUMNS = '"groupKey" jsonb, "rowIdentifier" text, "oldRowSortKey" jsonb, "newRowSortKey" jsonb, "oldRowData" jsonb, "newRowData" jsonb'; +/** + * Column shape of every 
row-change changes-table flowing between tables + * in the bulldozer graph. One canonical source of truth for both the + * inline trigger dispatch (here), the queue-drain cascade (whose input + * is seeded to this same shape in `declareTimeFoldTable.init()`), and + * any downstream consumer that needs to describe a changes-table's + * columns for `jsonb_to_record(...)` etc. + */ +export const CHANGE_OUTPUT_COLUMNS = '"groupKey" jsonb, "rowIdentifier" text, "oldRowSortKey" jsonb, "newRowSortKey" jsonb, "oldRowData" jsonb, "newRowData" jsonb'; const ROW_CHANGE_DIAGNOSTIC_COLUMN_NAME = "__row_change_table_id"; export type ChangesTableExpression = SqlExpression<{ __brand: "$SQL_Table" }>; export type RowChangeTriggerDiagnostics = { diff --git a/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts b/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts index 6576162ace..7d56a12675 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts @@ -1,8 +1,12 @@ import { generateSecureRandomString } from "@stackframe/stack-shared/dist/utils/crypto"; -import type { Table } from ".."; -import { toCascadeSqlBlock } from ".."; -import type { RegisteredRowChangeTrigger } from "../row-change-trigger-dispatch"; -import { attachRowChangeTriggerMetadata, collectRowChangeTriggerStatements, normalizeRowChangeTrigger } from "../row-change-trigger-dispatch"; +import { toCascadeSqlBlock, type Table } from ".."; +import { + attachRowChangeTriggerMetadata, + CHANGE_OUTPUT_COLUMNS, + collectRowChangeTriggerStatements, + normalizeRowChangeTrigger, + type RegisteredRowChangeTrigger, +} from "../row-change-trigger-dispatch"; import type { Json, RowData, RowIdentifier, SqlExpression, SqlMapper, TableId, Timestamp } from "../utilities"; import { getStorageEnginePath, @@ -16,15 +20,6 @@ import { tableIdToDebugString, } from "../utilities"; -/** - * Column shape matching `timeFoldChangesTableName` in - * 
`createApplyChangesStatements` below. Used both by the inline - * trigger (which produces this shape directly) and by the queue-drain - * cascade (which synthesizes this shape in plpgsql from newly-emitted - * rows). - */ -const TIMEFOLD_OUTPUT_CHANGE_COLUMNS = '"groupKey" jsonb, "rowIdentifier" text, "oldRowSortKey" jsonb, "newRowSortKey" jsonb, "oldRowData" jsonb, "newRowData" jsonb'; - /** * Materialized time-aware fold with queue-backed future reprocessing. * @@ -420,7 +415,7 @@ export function declareTimeFoldTable< ON "oldRows"."groupKey" IS NOT DISTINCT FROM "newRows"."groupKey" AND "oldRows"."rowIdentifier" = "newRows"."rowIdentifier" WHERE "oldRows"."rowData" IS DISTINCT FROM "newRows"."rowData" - `.toStatement(timeFoldChangesTableName, '"groupKey" jsonb, "rowIdentifier" text, "oldRowSortKey" jsonb, "newRowSortKey" jsonb, "oldRowData" jsonb, "newRowData" jsonb'), + `.toStatement(timeFoldChangesTableName, CHANGE_OUTPUT_COLUMNS), ]; }; const createFromTableTriggerStatements = (fromChangesTable: SqlExpression<{ __brand: "$SQL_Table" }>) => { @@ -497,7 +492,7 @@ export function declareTimeFoldTable< }); const cascadeTemplate = toCascadeSqlBlock({ cascadeInputName, - cascadeInputColumns: TIMEFOLD_OUTPUT_CHANGE_COLUMNS, + cascadeInputColumns: CHANGE_OUTPUT_COLUMNS, statements: cascadeCollected.statements, }); diff --git a/apps/backend/src/lib/bulldozer/db/test-sql-loaders.ts b/apps/backend/src/lib/bulldozer/db/test-sql-loaders.ts new file mode 100644 index 0000000000..969b8f386a --- /dev/null +++ b/apps/backend/src/lib/bulldozer/db/test-sql-loaders.ts @@ -0,0 +1,51 @@ +/** + * Test-only SQL loaders for bulldozer migration artifacts. + * + * Lives next to the bulldozer db code because several test files (both in + * `apps/backend/src/lib/bulldozer` and in `apps/backend/src/lib/payments`) + * need to install the `public.bulldozer_timefold_process_queue()` function + * body from its migration file to exercise the queue-drain path. 
Keeping + * one canonical loader here avoids drift between copies when the + * migration's comment sentinels or function name change. + * + * Not intended for production code paths — only imported from `*.test.ts` + * files and the payments test-helpers (which is itself only used from + * tests). + */ + +import { readFileSync } from "node:fs"; +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; + +const HERE = dirname(fileURLToPath(import.meta.url)); + +// apps/backend/src/lib/bulldozer/db/ → apps/backend/prisma/migrations +const MIGRATIONS_DIR = join(HERE, "..", "..", "..", "..", "prisma", "migrations"); + +const DOWNSTREAM_CASCADE_MIGRATION = "20260417000000_bulldozer_timefold_downstream_cascade"; + +/** + * Extracts the `CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue` + * block from the downstream-cascade migration file. The function body is + * what `pg_cron` invokes in prod; installing it into a test database lets + * tests exercise the real drain behaviour end-to-end. + * + * The migration file is split into statements via the + * `-- SPLIT_STATEMENT_SENTINEL` markers already used by the bulldozer + * migration tooling; this loader just picks the block that starts with + * the function definition. 
+ */ +export function loadProcessQueueFunctionSql(): string { + const migrationPath = join(MIGRATIONS_DIR, DOWNSTREAM_CASCADE_MIGRATION, "migration.sql"); + const raw = readFileSync(migrationPath, "utf8"); + const block = raw + .split("-- SPLIT_STATEMENT_SENTINEL") + .map((chunk) => chunk.replaceAll("-- SINGLE_STATEMENT_SENTINEL", "").trim()) + .find((chunk) => chunk.startsWith("CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue")); + if (block == null) { + throw new Error( + `Could not locate bulldozer_timefold_process_queue function body in ${DOWNSTREAM_CASCADE_MIGRATION}/migration.sql`, + ); + } + return block.replace(/;$/, ""); +} diff --git a/apps/backend/src/lib/bulldozer/db/timefold-queue-downstream.test.ts b/apps/backend/src/lib/bulldozer/db/timefold-queue-downstream.test.ts index d4f069bdba..7ebc729711 100644 --- a/apps/backend/src/lib/bulldozer/db/timefold-queue-downstream.test.ts +++ b/apps/backend/src/lib/bulldozer/db/timefold-queue-downstream.test.ts @@ -9,9 +9,6 @@ * input name and EXECUTEs the template. */ -import { readFileSync } from "node:fs"; -import { dirname, join } from "node:path"; -import { fileURLToPath } from "node:url"; import postgres from "postgres"; import { afterAll, beforeAll, beforeEach, describe, expect, test } from "vitest"; import { @@ -23,6 +20,7 @@ import { toExecutableSqlTransaction, toQueryableSqlQuery, } from "./index"; +import { loadProcessQueueFunctionSql } from "./test-sql-loaders"; type SqlExpression = { type: "expression", sql: string }; type SqlStatement = { type: "statement", sql: string, outputName?: string }; @@ -57,38 +55,6 @@ function getTestDbUrls() { }; } -/** - * Extracts the CREATE OR REPLACE FUNCTION body for - * public.bulldozer_timefold_process_queue from the downstream-cascade - * migration. The 20260323150000 migration creates the queue infrastructure; - * the later 20260417000000 migration rewrites the process_queue function - * body to run downstream cascades. 
We install the latter so tests exercise - * the cascade behaviour. - */ -function loadProcessQueueFunctionSql(): string { - const here = dirname(fileURLToPath(import.meta.url)); - const migrationPath = join( - here, - "..", - "..", - "..", - "..", - "prisma", - "migrations", - "20260417000000_bulldozer_timefold_downstream_cascade", - "migration.sql", - ); - const raw = readFileSync(migrationPath, "utf8"); - const block = raw - .split("-- SPLIT_STATEMENT_SENTINEL") - .map((s) => s.replaceAll("-- SINGLE_STATEMENT_SENTINEL", "").trim()) - .find((s) => s.startsWith("CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue")); - if (block == null) { - throw new Error("could not locate bulldozer_timefold_process_queue function body in cascade migration"); - } - return block.replace(/;$/, ""); -} - const PROCESS_QUEUE_FN_SQL = loadProcessQueueFunctionSql(); describe.sequential("timefold queue downstream cascade (real postgres)", () => { @@ -241,6 +207,31 @@ describe.sequential("timefold queue downstream cascade (real postgres)", () => { END AS "nextTimestamp" `; + /** + * Reads the `phase` string out of a row's `rowdata` with runtime type + * checks. Used by the delete-before-drain and dollar-quote tests to + * assert which phases made it through the pipeline. Fails loud rather + * than silently returning `undefined` if the row shape is unexpected. + * + * Takes `unknown` (rather than a narrower row type) because the + * `postgres.js` driver's `Row` type doesn't statically guarantee a + * `rowdata` column. 
+ */ + function rowPhase(row: unknown): string { + if (row == null || typeof row !== "object") { + throw new Error(`Expected row object, got ${typeof row}`); + } + const rowData = Reflect.get(row, "rowdata"); + if (rowData == null || typeof rowData !== "object") { + throw new Error(`Expected object rowdata, got ${typeof rowData}`); + } + const phase = Reflect.get(rowData, "phase"); + if (typeof phase !== "string") { + throw new Error(`Expected string 'phase' field in rowdata, got ${typeof phase}: ${JSON.stringify(rowData)}`); + } + return phase; + } + // ──────────────────────────────────────────────────────────────────── // Test 1: single filter downstream // ──────────────────────────────────────────────────────────────────── @@ -575,8 +566,7 @@ describe.sequential("timefold queue downstream cascade (real postgres)", () => { groupKey: expr(`to_jsonb('alpha'::text)`), start: "start", end: "end", startInclusive: true, endInclusive: true, })); - const phases = timefoldRows.map((row) => (row.rowdata as { phase: string }).phase).sort(); - expect(phases).toEqual(["initial", "scheduled"]); + expect(timefoldRows.map(rowPhase).sort()).toEqual(["initial", "scheduled"]); }); // ──────────────────────────────────────────────────────────────────── @@ -642,8 +632,7 @@ describe.sequential("timefold queue downstream cascade (real postgres)", () => { groupKey: expr(`to_jsonb('alpha'::text)`), start: "start", end: "end", startInclusive: true, endInclusive: true, })); - const phases = filteredRows.map((row) => (row.rowdata as { phase: string }).phase).sort(); - expect(phases).toEqual(["initial", "scheduled"]); + expect(filteredRows.map(rowPhase).sort()).toEqual(["initial", "scheduled"]); }); // ──────────────────────────────────────────────────────────────────── diff --git a/apps/backend/src/lib/payments/schema/__tests__/test-helpers.ts b/apps/backend/src/lib/payments/schema/__tests__/test-helpers.ts index d8d88c1235..98bf69ee0f 100644 --- 
a/apps/backend/src/lib/payments/schema/__tests__/test-helpers.ts +++ b/apps/backend/src/lib/payments/schema/__tests__/test-helpers.ts @@ -6,11 +6,9 @@ * with no leftover state. */ -import { readFileSync } from "node:fs"; -import { dirname, join } from "node:path"; -import { fileURLToPath } from "node:url"; import postgres from "postgres"; import { toExecutableSqlTransaction, toQueryableSqlQuery } from "@/lib/bulldozer/db/index"; +import { loadProcessQueueFunctionSql } from "@/lib/bulldozer/db/test-sql-loaders"; type SqlStatement = { type: "statement", sql: string, outputName?: string }; type SqlQuery = { type: "query", sql: string, toStatement(outputName?: string): SqlStatement }; @@ -24,34 +22,6 @@ function getConnectionString(): string { return connectionString; } -/** - * Extracts `CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue` - * from the cascade migration file so tests can install the real prod - * function body. Scoped here (rather than duplicated across test files) - * so there's one place to update if the migration's comment markers - * change. 
- */ -function loadProcessQueueFunctionSql(): string { - const here = dirname(fileURLToPath(import.meta.url)); - const migrationPath = join( - here, - "..", "..", "..", "..", "..", - "prisma", - "migrations", - "20260417000000_bulldozer_timefold_downstream_cascade", - "migration.sql", - ); - const raw = readFileSync(migrationPath, "utf8"); - const block = raw - .split("-- SPLIT_STATEMENT_SENTINEL") - .map((s) => s.replaceAll("-- SINGLE_STATEMENT_SENTINEL", "").trim()) - .find((s) => s.startsWith("CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue")); - if (block == null) { - throw new Error("could not locate bulldozer_timefold_process_queue function body in cascade migration"); - } - return block.replace(/;$/, ""); -} - export type CreateTestDbOptions = { /** * SQL expression used to seed `BulldozerTimeFoldMetadata.lastProcessedAt` From 24d67d20f392f8b25e948b58787102866bacc6f3 Mon Sep 17 00:00:00 2001 From: nams1570 Date: Sat, 18 Apr 2026 02:49:54 -0700 Subject: [PATCH 5/6] fix: apply the same bigint currentMillis fix to the OTP timefold algo --- .../schema/__tests__/integration-1-3.test.ts | 118 ++++++++++++++++++ .../schema/phase-1/otp-timefold-algo.ts | 12 +- 2 files changed, 128 insertions(+), 2 deletions(-) diff --git a/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts b/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts index 784b287b09..32a7824a98 100644 --- a/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts +++ b/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts @@ -994,6 +994,124 @@ describe.sequential("payments schema integration phase 1→3 (real postgres)", ( }); + // ============================================================ + // Same bigint-vs-NUMERIC txn-id fingerprint as the subscription test + // above, but on the one-time-purchase algo (no sub-end analog). 
A + // one-time purchase with a repeating `when-repeated` item relies on + // each IGR expiring the previous IGR's grant by referencing its + // `txnId` inside `item-quantity-expire.adjustedTransactionId`. If the + // reducer-internal reference (decimal-free via `::text`) doesn't + // match the downstream-materialized txn id (built from + // `->>effectiveAtMillis` stored in JSONB), consecutive IGRs stack + // instead of refreshing and `quota` climbs without bound. + // ============================================================ + + describe("item-quantity-expire resolves across consecutive one-time-purchase item-grant-repeats", () => { + const DAY_MS = 86400000; + + beforeAll(async () => { + // OTP has no terminating event (unlike subscriptions with endedAt), + // so the inline reducer would otherwise try to fire ~6700 repeats + // between createdAtMillis=0 and the test-helpers default + // lastProcessedAt=2099 and time out the hook. Cap the clock to 16d + // post-epoch for this describe so exactly 2 IGRs fire inline (at + // +7d and +14d) — the minimum needed to exercise the IGR-N expiring + // IGR-(N-1)'s grant path. Restored to the default in afterAll so + // later tests in this file still run under inline recursion. + await db.setLastProcessedAt(`'1970-01-17T00:00:00Z'`); + await runStatements(schema.oneTimePurchases.setRow("otp-repeat-bigint", jsonbExpr({ + id: "otp-repeat-bigint", + tenancyId: "t1", + customerId: "u-otp-repeat-bigint", + customerType: "user", + productId: "prod-otp-repeat-bigint", + priceId: "p1", + product: { + displayName: "Repeating OTP (bigint regression)", + customerType: "user", + productLineId: "line-otp-repeat-bigint", + prices: { p1: { USD: "5" } }, + includedItems: { + // 7-day repeat: same whole-second epoch offset the subscription + // test uses to surface NUMERIC-vs-bigint mismatches. 
Two IGRs + // fire inline under the clock-cap above — enough to exercise + // the second IGR's `previousGrantsToExpire` referencing the + // first's outstandingGrants[].txnId. + quota: { quantity: 100, repeat: [7, "day"], expires: "when-repeated" }, + }, + }, + quantity: 1, + stripePaymentIntentId: null, + revokedAtMillis: null, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 0, + }))); + }); + + afterAll(async () => { + // Restore the default so subsequent describes in this file still + // fire their reducer recursion inline (the payments suite relies + // on this). + await db.setLastProcessedAt(`'2099-01-01T00:00:00Z'`); + }); + + it("every item-grant-repeat txn id is decimal-free", async () => { + const txns = (await getRowDatas(schema.transactions)) + .filter((t: any) => t.customerId === "u-otp-repeat-bigint"); + const igrs = txns.filter((t: any) => + typeof t.txnId === "string" && t.txnId.startsWith("igr:otp-repeat-bigint:") + ); + expect(igrs.length).toBeGreaterThan(0); + for (const igr of igrs) { + expect(igr.txnId).toMatch(/^igr:otp-repeat-bigint:\d+$/); + expect(igr.txnId).not.toContain("."); + } + }); + + it("every IGR after the first carries an item-quantity-expire whose adjustedTransactionId matches the prior IGR's txnId", async () => { + const txns = (await getRowDatas(schema.transactions)) + .filter((t: any) => t.customerId === "u-otp-repeat-bigint") + .sort((a: any, b: any) => a.effectiveAtMillis - b.effectiveAtMillis); + const igrs = txns.filter((t: any) => + typeof t.txnId === "string" && t.txnId.startsWith("igr:otp-repeat-bigint:") + ); + // Need ≥ 2 IGRs for the expire reference path to fire. Any fewer + // means the test setup isn't exercising the bug's surface area — + // fail loud rather than silently passing. 
+ expect(igrs.length).toBeGreaterThanOrEqual(2); + + for (let i = 1; i < igrs.length; i++) { + const priorIgr = igrs[i - 1]; + const currentIgr = igrs[i]; + const expireEntry = (currentIgr.entries as any[]).find((e: any) => + e.type === "item-quantity-expire" && e.itemId === "quota" + ); + expect(expireEntry, `IGR ${currentIgr.txnId} missing item-quantity-expire for quota`).toBeDefined(); + // Byte-identical match is the invariant: if the prior IGR's + // downstream txnId picked up a `.000000` tail and this expire + // built its adjustedTransactionId via `::text` on a bigint, the + // two strings diverge and the expire silently no-ops. + expect(expireEntry.adjustedTransactionId).toBe(priorIgr.txnId); + } + }); + + it("quota balance stays at exactly the per-repeat grant, never stacks", async () => { + const rows = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId === "u-otp-repeat-bigint") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + expect(rows.length).toBeGreaterThan(0); + + // Without the fix, each IGR adds 100 without expiring the previous + // grant, so quota climbs monotonically (100, 200, 300, …). With + // the fix, each IGR expires the prior grant before adding its own, + // and quota stays at exactly 100. 
+ const latest = rows[rows.length - 1]; + expect(latest.itemQuantities.quota).toBe(100); + }); + }); + + // ============================================================ // when-repeated grants must expire at subscription-end // (regression: they were previously left stacked in the ledger) diff --git a/apps/backend/src/lib/payments/schema/phase-1/otp-timefold-algo.ts b/apps/backend/src/lib/payments/schema/phase-1/otp-timefold-algo.ts index 274dd834b8..83bffca37b 100644 --- a/apps/backend/src/lib/payments/schema/phase-1/otp-timefold-algo.ts +++ b/apps/backend/src/lib/payments/schema/phase-1/otp-timefold-algo.ts @@ -143,7 +143,14 @@ export function getOtpTimeFoldReducerSql(): string { )`; // ── item-grant-repeat event (same logic as subscription but with sourceType=one_time_purchase) ── - const currentMillis = `(EXTRACT(EPOCH FROM ${T}) * 1000)::numeric`; + // Keep currentMillis as bigint at the root (not NUMERIC) for the same + // reason as `subscription-timefold-algo.ts` — see the comment there for + // the full failure mode. Explicit ROUND before the cast is defensive: + // NUMERIC::bigint already rounds on PG 12+, but explicit rounding makes + // intent clear at the callsite and stays stable if `EXTRACT(EPOCH ...)` + // ever comes back typed as DOUBLE PRECISION (older PG, or a future + // regression) where implicit casts use half-to-even. + const currentMillis = `(ROUND(EXTRACT(EPOCH FROM ${T}) * 1000)::bigint)`; const dueItems = `( SELECT jsonb_agg(jsonb_build_object('itemId', "sched"."key", 'schedule', "sched"."value")) @@ -153,7 +160,8 @@ export function getOtpTimeFoldReducerSql(): string { AND ("sched"."value"->>'nextRepeatMillis')::numeric <= ${currentMillis} )`; - const igrTxnId = `('igr:' || (${S}->>'purchaseId') || ':' || ${currentMillis}::bigint::text)`; + // currentMillis is already ::bigint (see above) so plain ::text is enough. 
+ const igrTxnId = `('igr:' || (${S}->>'purchaseId') || ':' || ${currentMillis}::text)`; const previousGrantsToExpire = `( SELECT COALESCE(jsonb_agg( From 70a261a084d59540d0925752a738c21b8b2e4141 Mon Sep 17 00:00:00 2001 From: nams1570 Date: Sat, 18 Apr 2026 02:59:40 -0700 Subject: [PATCH 6/6] chore: schema model mismatch via external MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PK on JSONB[] (tableStoragePath) — not expressible via Prisma's @id list types are treated as non-required). Managed entirely by bulldozer code via raw SQL. See schema.prisma note next to the BulldozerTimeFoldMetadata model. --- apps/backend/prisma.config.ts | 5 +++++ apps/backend/prisma/schema.prisma | 8 ++++++++ 2 files changed, 13 insertions(+) diff --git a/apps/backend/prisma.config.ts b/apps/backend/prisma.config.ts index 45622baa85..c80c218642 100644 --- a/apps/backend/prisma.config.ts +++ b/apps/backend/prisma.config.ts @@ -16,6 +16,11 @@ export default defineConfig({ tables: { external: [ "public.BulldozerStorageEngine", + // PK on JSONB[] (tableStoragePath) — not expressible via Prisma's @id + // (list types are treated as non-required). Managed entirely by + // bulldozer code via raw SQL. See schema.prisma note next to the + // BulldozerTimeFoldMetadata model. + "public.BulldozerTimeFoldDownstreamCascade", ], }, }) diff --git a/apps/backend/prisma/schema.prisma b/apps/backend/prisma/schema.prisma index fafaf24cda..1b20c77b17 100644 --- a/apps/backend/prisma/schema.prisma +++ b/apps/backend/prisma/schema.prisma @@ -1356,6 +1356,14 @@ model BulldozerTimeFoldMetadata { lastProcessedAt DateTime @db.Timestamptz } +// BulldozerTimeFoldDownstreamCascade is managed externally (see +// prisma.config.ts `tables.external`). Same reason as BulldozerStorageEngine +// above: its primary key is on a JSONB[] column, which Prisma's @id +// attribute rejects ("fields that are marked as id must be required"; list +// types are treated as non-required). 
It's written only by bulldozer +// internals (declareTimeFoldTable.init()/.delete()) and read by +// public.bulldozer_timefold_process_queue() — never via the Prisma client. + model DeletedRow { id String @id @default(uuid()) @db.Uuid tenancyId String @db.Uuid