diff --git a/apps/backend/prisma.config.ts b/apps/backend/prisma.config.ts
index 45622baa85..c80c218642 100644
--- a/apps/backend/prisma.config.ts
+++ b/apps/backend/prisma.config.ts
@@ -16,6 +16,11 @@ export default defineConfig({
   tables: {
     external: [
       "public.BulldozerStorageEngine",
+      // PK on JSONB[] (tableStoragePath) — not expressible via Prisma's @id
+      // (list types are treated as non-required). Managed entirely by
+      // bulldozer code via raw SQL. See schema.prisma note next to the
+      // BulldozerTimeFoldMetadata model.
+      "public.BulldozerTimeFoldDownstreamCascade",
     ],
   },
 })
diff --git a/apps/backend/prisma/migrations/20260417000000_bulldozer_timefold_downstream_cascade/migration.sql b/apps/backend/prisma/migrations/20260417000000_bulldozer_timefold_downstream_cascade/migration.sql
new file mode 100644
index 0000000000..06afec335d
--- /dev/null
+++ b/apps/backend/prisma/migrations/20260417000000_bulldozer_timefold_downstream_cascade/migration.sql
@@ -0,0 +1,341 @@
+-- Propagate TimeFold queue-drained emissions through the registered
+-- downstream trigger cascade (filter/map/LFold/etc.), mirroring what the
+-- inline setRow path does via collectRowChangeTriggerStatements. Before
+-- this migration, `bulldozer_timefold_process_queue` updated the
+-- TimeFold's own state and materialized its output rows but never touched
+-- any downstream materialized table, so any event scheduled for the
+-- future (subscription-end on cancel-at-period-end, monthly
+-- item-grant-repeat ticks) silently stopped after updating the TimeFold.
+
+-- Strategy: per-timefold cascade templates are precomputed in TypeScript
+-- (declareTimeFoldTable.init() → toCascadeSqlBlock) and persisted here as
+-- data. The rewritten process_queue:
+--   1. Drains due queue rows per-timefold (existing per-row logic, moved
+--      inside an outer per-timefold loop).
+--   2. After each queue row's reducer emits new rows, populates
+--      __bulldozer_seq with them under the timefold's cascade input name.
+--   3. Once per timefold per tick, EXECUTEs the stored cascadeTemplate,
+--      which reads from __bulldozer_seq and writes through filter/map/
+--      LFold/etc. to their materialized outputs.
+
+-- The shipped BulldozerTimeFoldQueue & BulldozerTimeFoldMetadata tables
+-- from 20260323150000_add_bulldozer_timefold_queue are unchanged. pg_cron
+-- keeps calling public.bulldozer_timefold_process_queue() as before.
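+--
+-- For orientation, a stored cascadeTemplate is, roughly, a plpgsql DO block
+-- that reads the seeded rows out of __bulldozer_seq and writes them into the
+-- downstream tables' storage paths. Illustrative sketch only; real templates
+-- come from toCascadeSqlBlock, use randomized dollar-quote tags, and contain
+-- one statement per downstream filter/map/LFold:
+--
+--   DO $tf_cascade_<random>$
+--   BEGIN
+--     INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value")
+--     SELECT gen_random_uuid(), <downstream keyPath>, "__output_row"->'newRowData'
+--     FROM "__bulldozer_seq"
+--     WHERE "__output_name" = '<cascadeInputName>';
+--   END;
+--   $tf_cascade_<random>$ LANGUAGE plpgsql;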
+ +-- CreateTable +CREATE TABLE "BulldozerTimeFoldDownstreamCascade" ( + "tableStoragePath" JSONB[] NOT NULL, + "cascadeInputName" TEXT NOT NULL, + "cascadeTemplate" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath") +); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue() +RETURNS void +LANGUAGE plpgsql +AS $function$ +DECLARE + cutoff_timestamp timestamptz; + current_timefold_path jsonb[]; + current_cascade_input_name text; + current_cascade_template text; + queued_row "BulldozerTimeFoldQueue"%ROWTYPE; + group_path jsonb[]; + rows_path jsonb[]; + states_path jsonb[]; + state_row_path jsonb[]; + existing_state jsonb; + old_emitted_rows jsonb; + newly_emitted_rows jsonb; + accumulated_emitted_rows jsonb; + current_state jsonb; + current_timestamp_value timestamptz; + next_state jsonb; + next_rows_data jsonb; + normalized_next_rows_data jsonb; + next_timestamp timestamptz; + previous_emitted_row_count int; + reducer_iterations int; + new_row_record record; +BEGIN + PERFORM pg_advisory_xact_lock(7857391); + + INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt") + VALUES ('singleton', now()) + ON CONFLICT ("key") DO NOTHING; + + cutoff_timestamp := now(); + + UPDATE "BulldozerTimeFoldMetadata" + SET + "lastProcessedAt" = cutoff_timestamp, + "updatedAt" = CURRENT_TIMESTAMP + WHERE "key" = 'singleton'; + + -- The cascade templates generated by toExecutableSqlTransaction/ + -- toCascadeSqlBlock read and write via this per-transaction temp table, + -- keyed by string output names. It's created idempotently here so the + -- cascade never fails on a cold queue-drain invocation. + CREATE TEMP TABLE IF NOT EXISTS "__bulldozer_seq" ( + "__output_name" text NOT NULL, + "__output_row" jsonb NOT NULL + ) ON COMMIT DROP; + + -- Outer loop: each distinct timefold that has queue rows due at this tick. + FOR current_timefold_path IN + SELECT DISTINCT "tableStoragePath" + FROM "BulldozerTimeFoldQueue" + WHERE "scheduledAt" <= cutoff_timestamp + LOOP + -- Look up the cascade template for this timefold. + current_cascade_input_name := NULL; + current_cascade_template := NULL; + SELECT "cascadeInputName", "cascadeTemplate" + INTO current_cascade_input_name, current_cascade_template + FROM "BulldozerTimeFoldDownstreamCascade" + WHERE "tableStoragePath" = current_timefold_path; + + -- If no registry row exists, this timefold hasn't been init()'d + -- yet — most commonly the deploy-window gap between this + -- migration applying (creating an empty registry table) and the + -- backend running declareTimeFoldTable.init() (which upserts one + -- row per timefold). Skipping just the cascade EXECUTE but still + -- draining the queue rows would let the timefold's own state + -- advance while the downstream filters/maps/LFolds silently miss + -- the update — a permanent desync with no retry path, since the + -- queue row is gone. Defer the entire timefold for this tick so + -- its queue rows stay queued; the next tick (after init finishes) + -- drains them with cascade intact. + -- + -- See `timefold-queue-downstream.test.ts`'s "process_queue defers + -- a timefold whose cascade registry row is missing" for the + -- regression guard. 
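+    -- (plpgsql note: FOUND here reflects the SELECT ... INTO just above,
+    -- i.e. it is false exactly when no registry row matched
+    -- current_timefold_path.)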
+ IF NOT FOUND THEN + CONTINUE; + END IF; + + -- Defensive clean slate: clear ALL rows before populating this + -- timefold's cascade input. Cascade templates emit intermediate + -- rows (row_change_diag_*, mapped_changes_*, etc.) under unique + -- random names, so cross-timefold contamination isn't possible + -- today, but keeping __bulldozer_seq empty between iterations is + -- simpler to reason about ("between iterations, __bulldozer_seq is + -- empty") and prevents memory growth across many timefolds in one + -- tick. ON COMMIT DROP handles cleanup if the function throws. + DELETE FROM "__bulldozer_seq"; + + -- Inner loop: per-queue-row drain for this timefold, mirroring the + -- pre-fix function body (semantics preserved so the existing + -- process-queue.ts migration test keeps passing). + LOOP + SELECT * + INTO queued_row + FROM "BulldozerTimeFoldQueue" + WHERE "tableStoragePath" = current_timefold_path + AND "scheduledAt" <= cutoff_timestamp + ORDER BY "scheduledAt" ASC, "id" ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED; + + EXIT WHEN NOT FOUND; + + DELETE FROM "BulldozerTimeFoldQueue" + WHERE "id" = queued_row."id"; + + group_path := queued_row."tableStoragePath" || ARRAY[to_jsonb('groups'::text), queued_row."groupKey"]::jsonb[]; + rows_path := group_path || ARRAY[to_jsonb('rows'::text)]::jsonb[]; + states_path := group_path || ARRAY[to_jsonb('states'::text)]::jsonb[]; + state_row_path := states_path || ARRAY[to_jsonb(queued_row."rowIdentifier")]::jsonb[]; + + SELECT "value" + INTO existing_state + FROM "BulldozerStorageEngine" + WHERE "keyPath" = state_row_path; + + IF existing_state IS NULL THEN + CONTINUE; + END IF; + + IF existing_state->'rowData' IS DISTINCT FROM queued_row."rowData" THEN + CONTINUE; + END IF; + + old_emitted_rows := CASE + WHEN jsonb_typeof(existing_state->'emittedRowsData') = 'array' THEN existing_state->'emittedRowsData' + ELSE '[]'::jsonb + END; + newly_emitted_rows := '[]'::jsonb; + accumulated_emitted_rows := old_emitted_rows; + previous_emitted_row_count := jsonb_array_length(old_emitted_rows); + + current_state := queued_row."stateAfter"; + current_timestamp_value := queued_row."scheduledAt"; + reducer_iterations := 0; + + LOOP + reducer_iterations := reducer_iterations + 1; + IF reducer_iterations > 10000 THEN + RAISE EXCEPTION 'bulldozer timefold reducer exceeded 10k iterations for row %', queued_row."rowIdentifier"; + END IF; + + EXECUTE format( + $reducer$ + SELECT + to_jsonb("reducerRows"."newState") AS "newState", + to_jsonb("reducerRows"."newRowsData") AS "newRowsData", + CASE + WHEN "reducerRows"."nextTimestamp" IS NULL THEN NULL::timestamptz + ELSE ("reducerRows"."nextTimestamp")::timestamptz + END AS "nextTimestamp" + FROM ( + SELECT %s + FROM ( + SELECT + $1::jsonb AS "oldState", + $2::jsonb AS "oldRowData", + $3::timestamptz AS "timestamp" + ) AS "reducerInput" + ) AS "reducerRows" + $reducer$, + queued_row."reducerSql" + ) + INTO next_state, next_rows_data, next_timestamp + USING current_state, queued_row."rowData", current_timestamp_value; + + normalized_next_rows_data := CASE + WHEN jsonb_typeof(next_rows_data) = 'array' THEN next_rows_data + ELSE '[]'::jsonb + END; + newly_emitted_rows := newly_emitted_rows || normalized_next_rows_data; + accumulated_emitted_rows := accumulated_emitted_rows || normalized_next_rows_data; + current_state := next_state; + + EXIT WHEN next_timestamp IS NULL OR next_timestamp > cutoff_timestamp; + current_timestamp_value := next_timestamp; + END LOOP; + + INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", 
"value") + VALUES + (gen_random_uuid(), group_path, 'null'::jsonb), + (gen_random_uuid(), rows_path, 'null'::jsonb), + (gen_random_uuid(), states_path, 'null'::jsonb) + ON CONFLICT ("keyPath") DO NOTHING; + + INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value") + VALUES ( + gen_random_uuid(), + state_row_path, + jsonb_build_object( + 'rowData', queued_row."rowData", + 'stateAfter', current_state, + 'emittedRowsData', accumulated_emitted_rows, + 'nextTimestamp', + CASE + WHEN next_timestamp IS NULL THEN 'null'::jsonb + ELSE to_jsonb(next_timestamp) + END + ) + ) + ON CONFLICT ("keyPath") DO UPDATE + SET "value" = EXCLUDED."value"; + + FOR new_row_record IN + SELECT + "rows"."rowData" AS "rowData", + "rows"."rowIndex" AS "rowIndex" + FROM jsonb_array_elements(newly_emitted_rows) WITH ORDINALITY AS "rows"("rowData", "rowIndex") + LOOP + INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value") + VALUES ( + gen_random_uuid(), + rows_path || ARRAY[to_jsonb((queued_row."rowIdentifier" || ':' || (previous_emitted_row_count + new_row_record."rowIndex")::text)::text)]::jsonb[], + jsonb_build_object('rowData', new_row_record."rowData") + ) + ON CONFLICT ("keyPath") DO UPDATE + SET "value" = EXCLUDED."value"; + END LOOP; + + -- Accumulate the newly-emitted rows into __bulldozer_seq under this + -- timefold's cascade input name, shaped like timeFoldChangesTableName + -- (oldRowData/newRowData diff). Queue-drained emissions are always + -- new rows, so oldRowData/oldRowSortKey are null. + IF current_cascade_input_name IS NOT NULL THEN + INSERT INTO "__bulldozer_seq" ("__output_name", "__output_row") + SELECT + current_cascade_input_name, + jsonb_build_object( + 'groupKey', queued_row."groupKey", + 'rowIdentifier', queued_row."rowIdentifier" || ':' || (previous_emitted_row_count + "emitted"."rowIndex")::text, + 'oldRowSortKey', 'null'::jsonb, + 'newRowSortKey', 'null'::jsonb, + 'oldRowData', 'null'::jsonb, + 'newRowData', "emitted"."rowData" + ) + FROM jsonb_array_elements(newly_emitted_rows) WITH ORDINALITY AS "emitted"("rowData", "rowIndex"); + END IF; + + IF next_timestamp IS NOT NULL AND next_timestamp > cutoff_timestamp THEN + INSERT INTO "BulldozerTimeFoldQueue" ( + "id", + "tableStoragePath", + "groupKey", + "rowIdentifier", + "scheduledAt", + "stateAfter", + "rowData", + "reducerSql" + ) + VALUES ( + gen_random_uuid(), + queued_row."tableStoragePath", + queued_row."groupKey", + queued_row."rowIdentifier", + next_timestamp, + current_state, + queued_row."rowData", + queued_row."reducerSql" + ) + ON CONFLICT ("tableStoragePath", "groupKey", "rowIdentifier") DO UPDATE + SET + "scheduledAt" = EXCLUDED."scheduledAt", + "stateAfter" = EXCLUDED."stateAfter", + "rowData" = EXCLUDED."rowData", + "reducerSql" = EXCLUDED."reducerSql", + "updatedAt" = CURRENT_TIMESTAMP; + END IF; + + -- Note: the sibling migration 20260323150000 carried an "orphan + -- group cleanup" IF-NOT-EXISTS block here that deleted + -- rows_path / states_path / group_path when both had no children. + -- That block was unreachable in that function body and is still + -- unreachable here: every path that reaches this point has just + -- UPSERTed `state_row_path` (a child of `states_path`) a few + -- statements up, so the NOT EXISTS on states_path is always + -- false. Dropped here rather than inherited verbatim. + END LOOP; + + -- After draining all due queue rows for this timefold, run the + -- downstream trigger cascade exactly once. 
The template reads its input + -- from __bulldozer_seq (populated above) and writes through every + -- registered downstream filter/map/LFold/etc. to their materialized + -- outputs. Skipped when no template is registered \u2014 matches the + -- inline path's no-op behavior for timefolds without downstream + -- triggers. + IF current_cascade_template IS NOT NULL THEN + EXECUTE current_cascade_template; + END IF; + + -- Clean slate before the next timefold iteration. Clears the + -- cascade input rows AND every intermediate-stage row the cascade + -- template's EXECUTE just emitted into __bulldozer_seq. See the + -- identical DELETE at the top of this loop for the rationale. + DELETE FROM "__bulldozer_seq"; + END LOOP; +END; +$function$; +-- SPLIT_STATEMENT_SENTINEL diff --git a/apps/backend/prisma/migrations/20260417000000_bulldozer_timefold_downstream_cascade/tests/downstream-cascade.ts b/apps/backend/prisma/migrations/20260417000000_bulldozer_timefold_downstream_cascade/tests/downstream-cascade.ts new file mode 100644 index 0000000000..3f48c29bad --- /dev/null +++ b/apps/backend/prisma/migrations/20260417000000_bulldozer_timefold_downstream_cascade/tests/downstream-cascade.ts @@ -0,0 +1,263 @@ +import type { Sql } from "postgres"; +import { expect } from "vitest"; + +/** + * Migration-level test for `20260417000000_bulldozer_timefold_downstream_cascade`. + * + * Exercises the shape the migration is responsible for: + * - `BulldozerTimeFoldDownstreamCascade` exists with the right columns, + * - `public.bulldozer_timefold_process_queue()` consults that registry, + * - when a timefold has a registered `cascadeTemplate`, process_queue + * populates `__bulldozer_seq` with newly-emitted rows and EXECUTEs the + * template (i.e. the downstream cascade actually fires on the + * queue-drain path — the regression this migration fixes), + * - re-draining with nothing due is a no-op (idempotency). + * + * The cascade template here is constructed by hand (not via + * `toCascadeSqlBlock` in TypeScript) so the test stays purely at the + * migration-SQL layer, matching the other migration tests under + * `apps/backend/prisma/migrations/.../tests/`. + */ +export const postMigration = async (sql: Sql) => { + // 1) Migration shape: the registry table exists with the expected columns. + const registryColumnRows = await sql>` + SELECT column_name + FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = 'BulldozerTimeFoldDownstreamCascade' + ORDER BY ordinal_position + `; + expect(registryColumnRows.map((r) => r.column_name)).toEqual([ + "tableStoragePath", + "cascadeInputName", + "cascadeTemplate", + "createdAt", + "updatedAt", + ]); + + // 2) Set up a minimal timefold-shaped storage hierarchy. Each + // BulldozerStorageEngine insert must have its parent keyPath already + // present (FK: keyPathParent → keyPath). 
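+  //
+  // Informal sketch of the keyPath hierarchy assembled below (each level is
+  // one BulldozerStorageEngine row whose keyPath is a jsonb[] prefix of its
+  // children):
+  //
+  //   table / external:cascade-migration-test / storage
+  //     groups / alpha
+  //       rows
+  //       states / u1        (the timefold state row seeded further down)
+  //   cascade-out            (parallel tree the cascade template writes into)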
+ const tablePathSql = `ARRAY[to_jsonb('table'::text), to_jsonb('external:cascade-migration-test'::text)]::jsonb[]`; + const storagePathSql = `${tablePathSql} || ARRAY[to_jsonb('storage'::text)]::jsonb[]`; + const groupsPathSql = `${storagePathSql} || ARRAY[to_jsonb('groups'::text)]::jsonb[]`; + const groupPathSql = `${groupsPathSql} || ARRAY[to_jsonb('alpha'::text)]::jsonb[]`; + const rowsPathSql = `${groupPathSql} || ARRAY[to_jsonb('rows'::text)]::jsonb[]`; + const statesPathSql = `${groupPathSql} || ARRAY[to_jsonb('states'::text)]::jsonb[]`; + const stateRowPathSql = `${statesPathSql} || ARRAY[to_jsonb('u1'::text)]::jsonb[]`; + + // Parallel downstream tree the cascade template writes into. Its root + // (`ARRAY['cascade-out']`) is a direct child of the already-seeded + // `ARRAY[]` root, and only the root is needed as a parent for the + // cascade's row inserts below (rows hang directly off of it). + const cascadeOutRootSql = `ARRAY[to_jsonb('cascade-out'::text)]::jsonb[]`; + + await sql.unsafe(` + INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value") + VALUES + (gen_random_uuid(), ${tablePathSql}, 'null'::jsonb), + (gen_random_uuid(), ${storagePathSql}, 'null'::jsonb), + (gen_random_uuid(), ${groupsPathSql}, 'null'::jsonb), + (gen_random_uuid(), ${groupPathSql}, 'null'::jsonb), + (gen_random_uuid(), ${rowsPathSql}, 'null'::jsonb), + (gen_random_uuid(), ${statesPathSql}, 'null'::jsonb), + (gen_random_uuid(), ${cascadeOutRootSql}, 'null'::jsonb) + ON CONFLICT ("keyPath") DO NOTHING + `); + + await sql.unsafe(` + INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value") + VALUES ( + gen_random_uuid(), + ${stateRowPathSql}, + jsonb_build_object( + 'rowData', '{"value": 2}'::jsonb, + 'stateAfter', '{"counter": 1}'::jsonb, + 'emittedRowsData', '[]'::jsonb, + 'nextTimestamp', 'null'::jsonb + ) + ) + ON CONFLICT ("keyPath") DO UPDATE + SET "value" = EXCLUDED."value" + `); + + // 3) Register this timefold's cascade. The template reads the rows + // that process_queue pushes into `__bulldozer_seq` under the input + // name and writes them under `ARRAY['cascade-out', ]`. + // EXECUTEing this DO block is exactly what process_queue does with + // the stored cascadeTemplate once per timefold per tick. + const cascadeInputName = "migration_test_cascade_input"; + const cascadeTemplateSql = ` + DO $tf_cascade$ + BEGIN + INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value") + SELECT + gen_random_uuid(), + ARRAY[to_jsonb('cascade-out'::text), "__bulldozer_seq"."__output_row"->'rowIdentifier']::jsonb[], + "__bulldozer_seq"."__output_row"->'newRowData' + FROM "__bulldozer_seq" + WHERE "__output_name" = '${cascadeInputName}' + ON CONFLICT ("keyPath") DO UPDATE + SET "value" = EXCLUDED."value"; + END; + $tf_cascade$ LANGUAGE plpgsql; + `; + + await sql.unsafe(` + INSERT INTO "BulldozerTimeFoldDownstreamCascade" + ("tableStoragePath", "cascadeInputName", "cascadeTemplate") + VALUES ( + ${storagePathSql}, + '${cascadeInputName}', + $cascade_template$${cascadeTemplateSql}$cascade_template$ + ) + `); + + // 4) Queue a reducer row that emits one output row (value=100). Reducer + // SQL is the same shape the real timefold table emits: + // newState / newRowsData / nextTimestamp. 
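+  // For readability, the reducerSql string below (with its doubled SQL
+  // quotes undone) selects:
+  //   jsonb_build_object('counter', COALESCE(("oldState"->>'counter')::int, 0) + 1) AS "newState",
+  //   jsonb_build_array(jsonb_build_object('value', 100)) AS "newRowsData",
+  //   NULL::timestamptz AS "nextTimestamp"
+  // i.e. bump the counter, emit a single {value: 100} row, and schedule no
+  // further tick.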
+ await sql.unsafe(` + INSERT INTO "BulldozerTimeFoldQueue" ( + "id", + "tableStoragePath", + "groupKey", + "rowIdentifier", + "scheduledAt", + "stateAfter", + "rowData", + "reducerSql" + ) + VALUES ( + gen_random_uuid(), + ${storagePathSql}, + to_jsonb('alpha'::text), + 'u1', + now() - interval '1 minute', + '{"counter": 1}'::jsonb, + '{"value": 2}'::jsonb, + 'jsonb_build_object(''counter'', COALESCE(("oldState"->>''counter'')::int, 0) + 1) AS "newState", jsonb_build_array(jsonb_build_object(''value'', 100)) AS "newRowsData", NULL::timestamptz AS "nextTimestamp"' + ) + `); + + // 5) Drain the queue. This is the real prod entry point — pg_cron calls + // exactly this function. + await sql.unsafe(`SELECT public.bulldozer_timefold_process_queue()`); + + // The queue row must be consumed. Scope by tableStoragePath — the + // sibling `20260323150000_add_bulldozer_timefold_queue/tests/process-queue.ts` + // ran on this same shared DB and left a future-dated queue row behind + // under its own tableStoragePath, which we must not match here. + const remainingQueueRows = await sql.unsafe(` + SELECT 1 FROM "BulldozerTimeFoldQueue" + WHERE "tableStoragePath" = ${storagePathSql} + AND "rowIdentifier" = 'u1' + `); + expect(remainingQueueRows).toHaveLength(0); + + // The timefold's own state row must be updated (baseline the prior + // migration already covered). + const stateRows = await sql.unsafe(` + SELECT "value" FROM "BulldozerStorageEngine" WHERE "keyPath" = ${stateRowPathSql} + `); + expect(stateRows).toHaveLength(1); + expect(stateRows[0].value).toMatchObject({ + rowData: { value: 2 }, + stateAfter: { counter: 2 }, + }); + + // The regression guard: the cascade template must have run. Without + // the migration's rewrite, `__bulldozer_seq` is never populated and + // the template is never EXECUTEd, so `cascade-out/u1:1` would not + // exist. + const cascadeOutRows = await sql.unsafe(` + SELECT "keyPath", "value" + FROM "BulldozerStorageEngine" + WHERE "keyPathParent" = ${cascadeOutRootSql} + `); + expect(cascadeOutRows).toHaveLength(1); + expect(cascadeOutRows[0].value).toEqual({ value: 100 }); + + // 6) Idempotency: re-draining with nothing new in the queue must not + // re-run the cascade (no duplicate rows, no FK errors). + await sql.unsafe(`SELECT public.bulldozer_timefold_process_queue()`); + const cascadeOutAfterRedrain = await sql.unsafe(` + SELECT 1 FROM "BulldozerStorageEngine" WHERE "keyPathParent" = ${cascadeOutRootSql} + `); + expect(cascadeOutAfterRedrain).toHaveLength(1); + + // 7) No-template path: a timefold with cascadeTemplate = NULL must + // drain queued rows without error. Use a second, independent + // tableStoragePath so the FK'd storage engine rows for this + // branch don't collide with the one above. 
+ const nullTableSql = `ARRAY[to_jsonb('table'::text), to_jsonb('external:cascade-null-template'::text)]::jsonb[]`; + const nullStorageSql = `${nullTableSql} || ARRAY[to_jsonb('storage'::text)]::jsonb[]`; + const nullGroupsSql = `${nullStorageSql} || ARRAY[to_jsonb('groups'::text)]::jsonb[]`; + const nullGroupSql = `${nullGroupsSql} || ARRAY[to_jsonb('alpha'::text)]::jsonb[]`; + const nullRowsSql = `${nullGroupSql} || ARRAY[to_jsonb('rows'::text)]::jsonb[]`; + const nullStatesSql = `${nullGroupSql} || ARRAY[to_jsonb('states'::text)]::jsonb[]`; + const nullStateRowSql = `${nullStatesSql} || ARRAY[to_jsonb('u1'::text)]::jsonb[]`; + + await sql.unsafe(` + INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value") + VALUES + (gen_random_uuid(), ${nullTableSql}, 'null'::jsonb), + (gen_random_uuid(), ${nullStorageSql}, 'null'::jsonb), + (gen_random_uuid(), ${nullGroupsSql}, 'null'::jsonb), + (gen_random_uuid(), ${nullGroupSql}, 'null'::jsonb), + (gen_random_uuid(), ${nullRowsSql}, 'null'::jsonb), + (gen_random_uuid(), ${nullStatesSql}, 'null'::jsonb) + ON CONFLICT ("keyPath") DO NOTHING + `); + await sql.unsafe(` + INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value") + VALUES ( + gen_random_uuid(), + ${nullStateRowSql}, + jsonb_build_object( + 'rowData', '{"value": 2}'::jsonb, + 'stateAfter', '{"counter": 1}'::jsonb, + 'emittedRowsData', '[]'::jsonb, + 'nextTimestamp', 'null'::jsonb + ) + ) + `); + await sql.unsafe(` + INSERT INTO "BulldozerTimeFoldDownstreamCascade" + ("tableStoragePath", "cascadeInputName", "cascadeTemplate") + VALUES ( + ${nullStorageSql}, + 'null_template_input', + NULL + ) + `); + await sql.unsafe(` + INSERT INTO "BulldozerTimeFoldQueue" ( + "id", "tableStoragePath", "groupKey", "rowIdentifier", + "scheduledAt", "stateAfter", "rowData", "reducerSql" + ) + VALUES ( + gen_random_uuid(), + ${nullStorageSql}, + to_jsonb('alpha'::text), + 'u1', + now() - interval '1 minute', + '{"counter": 1}'::jsonb, + '{"value": 2}'::jsonb, + 'jsonb_build_object(''counter'', 2) AS "newState", jsonb_build_array(jsonb_build_object(''value'', 100)) AS "newRowsData", NULL::timestamptz AS "nextTimestamp"' + ) + `); + + await sql.unsafe(`SELECT public.bulldozer_timefold_process_queue()`); + + const nullRemainingQueue = await sql.unsafe(` + SELECT 1 FROM "BulldozerTimeFoldQueue" + WHERE "tableStoragePath" = ${nullStorageSql} + `); + expect(nullRemainingQueue).toHaveLength(0); + + const nullStateAfterDrain = await sql.unsafe(` + SELECT "value" FROM "BulldozerStorageEngine" WHERE "keyPath" = ${nullStateRowSql} + `); + expect(nullStateAfterDrain).toHaveLength(1); + expect(nullStateAfterDrain[0].value).toMatchObject({ stateAfter: { counter: 2 } }); +}; diff --git a/apps/backend/prisma/schema.prisma b/apps/backend/prisma/schema.prisma index fafaf24cda..1b20c77b17 100644 --- a/apps/backend/prisma/schema.prisma +++ b/apps/backend/prisma/schema.prisma @@ -1356,6 +1356,14 @@ model BulldozerTimeFoldMetadata { lastProcessedAt DateTime @db.Timestamptz } +// BulldozerTimeFoldDownstreamCascade is managed externally (see +// prisma.config.ts `tables.external`). Same reason as BulldozerStorageEngine +// above: its primary key is on a JSONB[] column, which Prisma's @id +// attribute rejects ("fields that are marked as id must be required"; list +// types are treated as non-required). It's written only by bulldozer +// internals (declareTimeFoldTable.init()/.delete()) and read by +// public.bulldozer_timefold_process_queue() — never via the Prisma client. 
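+//
+// For reference only, the model would look roughly like this if Prisma
+// accepted an @id on a list column (it does not; this is a sketch, not
+// valid schema):
+//
+//   model BulldozerTimeFoldDownstreamCascade {
+//     tableStoragePath Json[]   @id        // rejected by Prisma for list types
+//     cascadeInputName String
+//     cascadeTemplate  String?
+//     createdAt        DateTime @default(now())
+//     updatedAt        DateTime @default(now())
+//   }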
+ model DeletedRow { id String @id @default(uuid()) @db.Uuid tenancyId String @db.Uuid diff --git a/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts b/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts index 03ff3538b1..bec0d4c0a0 100644 --- a/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts +++ b/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts @@ -714,6 +714,17 @@ describe.sequential("bulldozer db fuzz composition (real postgres)", () => { INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt") VALUES ('singleton', now()) `; + await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldDownstreamCascade"`; + await sql` + CREATE TABLE "BulldozerTimeFoldDownstreamCascade" ( + "tableStoragePath" JSONB[] NOT NULL, + "cascadeInputName" TEXT NOT NULL, + "cascadeTemplate" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath") + ) + `; }); afterEach(async () => { diff --git a/apps/backend/src/lib/bulldozer/db/index.perf.test.ts b/apps/backend/src/lib/bulldozer/db/index.perf.test.ts index c47c5311ed..d05cbd4acd 100644 --- a/apps/backend/src/lib/bulldozer/db/index.perf.test.ts +++ b/apps/backend/src/lib/bulldozer/db/index.perf.test.ts @@ -354,6 +354,17 @@ describe.sequential("bulldozer db performance (real postgres)", () => { INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt") VALUES ('singleton', now()) `; + await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldDownstreamCascade"`; + await sql` + CREATE TABLE "BulldozerTimeFoldDownstreamCascade" ( + "tableStoragePath" JSONB[] NOT NULL, + "cascadeInputName" TEXT NOT NULL, + "cascadeTemplate" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath") + ) + `; }); afterEach(async () => { diff --git a/apps/backend/src/lib/bulldozer/db/index.test.ts b/apps/backend/src/lib/bulldozer/db/index.test.ts index d0e39c6c45..8468e2a2be 100644 --- a/apps/backend/src/lib/bulldozer/db/index.test.ts +++ b/apps/backend/src/lib/bulldozer/db/index.test.ts @@ -140,6 +140,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { beforeEach(async () => { await sql`CREATE EXTENSION IF NOT EXISTS pgcrypto`; + await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldDownstreamCascade"`; await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldQueue"`; await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldMetadata"`; await sql`DROP TABLE IF EXISTS "BulldozerMapTriggerAudit"`; @@ -233,6 +234,16 @@ describe.sequential("declareStoredTable (real postgres)", () => { INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt") VALUES ('singleton', now()) `; + await sql` + CREATE TABLE "BulldozerTimeFoldDownstreamCascade" ( + "tableStoragePath" JSONB[] NOT NULL, + "cascadeInputName" TEXT NOT NULL, + "cascadeTemplate" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath") + ) + `; }); // any is used here because the verifier works with heterogeneous table types diff --git a/apps/backend/src/lib/bulldozer/db/index.ts b/apps/backend/src/lib/bulldozer/db/index.ts index bc01643a15..d53e8a3749 100644 --- a/apps/backend/src/lib/bulldozer/db/index.ts +++ 
b/apps/backend/src/lib/bulldozer/db/index.ts @@ -1,3 +1,5 @@ +import { generateSecureRandomString } from "@stackframe/stack-shared/dist/utils/crypto"; +import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors"; import { deindent } from "@stackframe/stack-shared/dist/utils/strings"; import { BULLDOZER_SORT_HELPERS_SQL } from "./bulldozer-sort-helpers-sql"; @@ -66,10 +68,23 @@ export function toQueryableSqlQuery(query: SqlQuery): string { return query.sql; } -export function toExecutableSqlTransaction(statements: SqlStatement[], options: { statementTimeout?: string } = {}): string { - const requiresSortHelpers = statements.some((statement) => statement.sql.includes("pg_temp.bulldozer_sort_")); - const seqOutputs = new Map(); - const executableStatementsInDoBlock = statements.map((statement) => { +/** + * Core body-builder shared by `toExecutableSqlTransaction` and + * `toCascadeSqlBlock`. Serializes a list of `SqlStatement`s into a single + * string suitable to drop into a plpgsql DO block, rewriting references to + * named outputs into `__bulldozer_seq` subqueries. + * + * `seededSeqOutputs` lets callers pretend a given `__output_name` has + * already been produced by an upstream statement. Used by the timefold + * queue-drain cascade, which pre-populates `__bulldozer_seq` in plpgsql + * BEFORE executing the stored cascade template. + */ +function buildExecutableStatementsBlock( + statements: SqlStatement[], + seededSeqOutputs: Map, +): string { + const seqOutputs = new Map(seededSeqOutputs); + return statements.map((statement) => { let sql = statement.sql; for (const [outputName, outputColumns] of seqOutputs) { const quotedOutputName = `"${outputName}"`; @@ -112,12 +127,22 @@ export function toExecutableSqlTransaction(statements: SqlStatement[], options: `; })(); - // Keep the outer DO block delimiter stable even when statements define $$ functions. - const normalizedSql = executableSql.replaceAll("$$", "$__bulldozer_do_inline$").trimEnd(); + const normalizedSql = executableSql.trimEnd(); return normalizedSql.endsWith(";") ? normalizedSql : `${normalizedSql};`; }).join("\n\n"); +} + +export function toExecutableSqlTransaction(statements: SqlStatement[], options: { statementTimeout?: string } = {}): string { + const requiresSortHelpers = statements.some((statement) => statement.sql.includes("pg_temp.bulldozer_sort_")); + const executableStatementsInDoBlock = buildExecutableStatementsBlock(statements, new Map()); + // Randomize the outer DO-block delimiter so nested `$$` that individual + // statements legitimately use (e.g. `CREATE FUNCTION ... AS $$ ... $$` + // in `reduce-table.ts`) don't collide with it, and so user-provided SQL + // containing a literal `'$$'` string doesn't need to be rewritten. Same + // approach as `toCascadeSqlBlock` below. + const outerTag = chooseSafeDollarQuoteTag(executableStatementsInDoBlock, "bulldozer_tx"); return deindent` BEGIN; @@ -131,12 +156,84 @@ export function toExecutableSqlTransaction(statements: SqlStatement[], options: ${BULLDOZER_SEQ_TABLE_SQL} - DO $$ + DO $${outerTag}$ BEGIN ${executableStatementsInDoBlock} END; - $$ LANGUAGE plpgsql; + $${outerTag}$ LANGUAGE plpgsql; COMMIT; `; } + +/** + * Picks a plpgsql dollar-quote tag that is guaranteed not to appear + * verbatim inside `bodyContents`. + * + * We need this for any `DO $tag$ ... 
$tag$` block whose body is + * concatenated from caller-supplied SQL: a naive fixed `$tag$` would + * close the outer block prematurely if any embedded statement happened + * to include the same literal `$tag$` (e.g. a user filter predicate + * that references the string, or a comment, or a CASE branch). + * + * A cryptographically-random 224-bit suffix makes accidental collision + * astronomically unlikely. We still assert (rather than silently + * retrying) because a collision here would almost certainly mean the + * caller is constructing the body adversarially, and we'd rather fail + * loud. + */ +function chooseSafeDollarQuoteTag(bodyContents: string, tagPrefix: string): string { + const tag = `${tagPrefix}_${generateSecureRandomString()}`; + if (bodyContents.includes(`$${tag}$`)) { + throw new StackAssertionError( + "Randomly generated dollar-quote tag collided with body contents; this is astronomically unlikely with a 224-bit suffix and almost certainly indicates adversarial input", + { tag, tagPrefix }, + ); + } + return tag; +} + +/** + * Compiles a downstream-trigger cascade into a plpgsql `DO` block body that + * can be stored in `BulldozerTimeFoldDownstreamCascade.cascadeTemplate` and + * EXECUTEd by `bulldozer_timefold_process_queue()` at runtime. + * + * The generated body: + * - Assumes the caller has already populated `__bulldozer_seq` under + * `cascadeInputName` with rows matching `cascadeInputColumns`. + * - Does NOT acquire the advisory lock or SET LOCAL settings — that + * responsibility belongs to the function wrapping the cascade. + * - Wraps the statement sequence in a `DO $$ ... $$` + * block so `EXECUTE` in plpgsql can run it as a single dispatch. The + * tag is randomized per-call to ensure user-supplied SQL can never + * contain a literal copy of it and close the outer DO block early + * (see `chooseSafeDollarQuoteTag`). + * + * If the downstream trigger graph is empty (no filters/maps/etc. registered), + * returns `null`. Callers should skip the EXECUTE in that case. + */ +export function toCascadeSqlBlock(options: { + cascadeInputName: string, + cascadeInputColumns: string, + statements: SqlStatement[], +}): string | null { + if (options.statements.length === 0) return null; + const seeded = new Map([[options.cascadeInputName, options.cascadeInputColumns]]); + const body = buildExecutableStatementsBlock(options.statements, seeded); + const requiresSortHelpers = options.statements.some((statement) => statement.sql.includes("pg_temp.bulldozer_sort_")); + // Sort helpers use their own $$ dollar quoting inside `CREATE OR REPLACE + // FUNCTION` bodies. They live inside the outer DO so they share the + // enclosing transaction's pg_temp scope with the cascade statements. The + // outer tag is randomized below so nested $$ (or any other fixed tag in + // user SQL) can't close it. + const prelude = requiresSortHelpers ? 
BULLDOZER_SORT_HELPERS_SQL : ""; + const outerTag = chooseSafeDollarQuoteTag(`${prelude}\n${body}`, "tf_cascade"); + return deindent` + DO $${outerTag}$ + BEGIN + ${prelude} + ${body} + END; + $${outerTag}$ LANGUAGE plpgsql; + `; +} diff --git a/apps/backend/src/lib/bulldozer/db/row-change-trigger-dispatch.ts b/apps/backend/src/lib/bulldozer/db/row-change-trigger-dispatch.ts index d4c45f1279..21e9686ba6 100644 --- a/apps/backend/src/lib/bulldozer/db/row-change-trigger-dispatch.ts +++ b/apps/backend/src/lib/bulldozer/db/row-change-trigger-dispatch.ts @@ -4,7 +4,15 @@ import { stringCompare } from "@stackframe/stack-shared/dist/utils/strings"; import type { SqlExpression, SqlStatement } from "./utilities"; import { quoteSqlIdentifier, quoteSqlStringLiteral, sqlQuery } from "./utilities"; -const CHANGE_OUTPUT_COLUMNS = '"groupKey" jsonb, "rowIdentifier" text, "oldRowSortKey" jsonb, "newRowSortKey" jsonb, "oldRowData" jsonb, "newRowData" jsonb'; +/** + * Column shape of every row-change changes-table flowing between tables + * in the bulldozer graph. One canonical source of truth for both the + * inline trigger dispatch (here), the queue-drain cascade (whose input + * is seeded to this same shape in `declareTimeFoldTable.init()`), and + * any downstream consumer that needs to describe a changes-table's + * columns for `jsonb_to_record(...)` etc. + */ +export const CHANGE_OUTPUT_COLUMNS = '"groupKey" jsonb, "rowIdentifier" text, "oldRowSortKey" jsonb, "newRowSortKey" jsonb, "oldRowData" jsonb, "newRowData" jsonb'; const ROW_CHANGE_DIAGNOSTIC_COLUMN_NAME = "__row_change_table_id"; export type ChangesTableExpression = SqlExpression<{ __brand: "$SQL_Table" }>; export type RowChangeTriggerDiagnostics = { diff --git a/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts b/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts index e5c9f6e93f..7d56a12675 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts @@ -1,7 +1,12 @@ import { generateSecureRandomString } from "@stackframe/stack-shared/dist/utils/crypto"; -import type { Table } from ".."; -import type { RegisteredRowChangeTrigger } from "../row-change-trigger-dispatch"; -import { attachRowChangeTriggerMetadata, normalizeRowChangeTrigger } from "../row-change-trigger-dispatch"; +import { toCascadeSqlBlock, type Table } from ".."; +import { + attachRowChangeTriggerMetadata, + CHANGE_OUTPUT_COLUMNS, + collectRowChangeTriggerStatements, + normalizeRowChangeTrigger, + type RegisteredRowChangeTrigger, +} from "../row-change-trigger-dispatch"; import type { Json, RowData, RowIdentifier, SqlExpression, SqlMapper, TableId, Timestamp } from "../utilities"; import { getStorageEnginePath, @@ -410,7 +415,7 @@ export function declareTimeFoldTable< ON "oldRows"."groupKey" IS NOT DISTINCT FROM "newRows"."groupKey" AND "oldRows"."rowIdentifier" = "newRows"."rowIdentifier" WHERE "oldRows"."rowData" IS DISTINCT FROM "newRows"."rowData" - `.toStatement(timeFoldChangesTableName, '"groupKey" jsonb, "rowIdentifier" text, "oldRowSortKey" jsonb, "newRowSortKey" jsonb, "oldRowData" jsonb, "newRowData" jsonb'), + `.toStatement(timeFoldChangesTableName, CHANGE_OUTPUT_COLUMNS), ]; }; const createFromTableTriggerStatements = (fromChangesTable: SqlExpression<{ __brand: "$SQL_Table" }>) => { @@ -467,6 +472,30 @@ export function declareTimeFoldTable< const fromGroupsTableName = `from_groups_${generateSecureRandomString()}`; const fromRowsTableName = 
`from_rows_${generateSecureRandomString()}`; const initChangesTableName = `init_changes_${generateSecureRandomString()}`; + + // Compile the downstream trigger cascade into a plpgsql DO block and + // persist it in BulldozerTimeFoldDownstreamCascade, keyed by this + // timefold's tableStoragePath. bulldozer_timefold_process_queue() + // reads and EXECUTEs it after each batch of queue-drained emissions. + // + // This mirrors, on the queue-drain path, what collectRowChangeTriggerStatements + // does on the inline setRow path (see row-change-trigger-dispatch.ts's + // use by the outer runStatements pipeline). Without this, pg_cron- + // drained emissions update the timefold's own rows but never propagate + // to filters/maps/LFolds above — see apps/backend/src/lib/bulldozer/db/ + // timefold-queue-downstream.test.ts. + const cascadeInputName = `tf_cascade_input_${generateSecureRandomString()}`; + const cascadeCollected = collectRowChangeTriggerStatements({ + sourceTableId: tableIdToDebugString(options.tableId), + sourceChangesTable: quoteSqlIdentifier(cascadeInputName), + sourceTableTriggers: triggers, + }); + const cascadeTemplate = toCascadeSqlBlock({ + cascadeInputName, + cascadeInputColumns: CHANGE_OUTPUT_COLUMNS, + statements: cascadeCollected.statements, + }); + return [ sqlStatement` INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value") @@ -477,6 +506,24 @@ export function declareTimeFoldTable< (gen_random_uuid(), ${getStorageEnginePath(options.tableId, ["metadata"])}, '{ "version": 1 }'::jsonb) ON CONFLICT ("keyPath") DO NOTHING `, + // Upsert the cascade registry row. A null template means "no + // downstream triggers registered" — process_queue will skip the + // EXECUTE in that case, matching the no-op semantics of the inline + // path when no triggers are attached. + sqlStatement` + INSERT INTO "BulldozerTimeFoldDownstreamCascade" + ("tableStoragePath", "cascadeInputName", "cascadeTemplate") + VALUES ( + ${tableStoragePath}::jsonb[], + ${quoteSqlStringLiteral(cascadeInputName)}, + ${cascadeTemplate == null ? sqlExpression`NULL::text` : quoteSqlStringLiteral(cascadeTemplate)} + ) + ON CONFLICT ("tableStoragePath") DO UPDATE + SET + "cascadeInputName" = EXCLUDED."cascadeInputName", + "cascadeTemplate" = EXCLUDED."cascadeTemplate", + "updatedAt" = now() + `, options.fromTable.listGroups({ start: "start", end: "end", @@ -518,6 +565,10 @@ export function declareTimeFoldTable< DELETE FROM "BulldozerTimeFoldQueue" WHERE "tableStoragePath" = ${tableStoragePath}::jsonb[] `, + sqlStatement` + DELETE FROM "BulldozerTimeFoldDownstreamCascade" + WHERE "tableStoragePath" = ${tableStoragePath}::jsonb[] + `, sqlStatement` WITH RECURSIVE "pathsToDelete" AS ( SELECT ${getTablePath(options.tableId)}::jsonb[] AS "path" diff --git a/apps/backend/src/lib/bulldozer/db/test-sql-loaders.ts b/apps/backend/src/lib/bulldozer/db/test-sql-loaders.ts new file mode 100644 index 0000000000..969b8f386a --- /dev/null +++ b/apps/backend/src/lib/bulldozer/db/test-sql-loaders.ts @@ -0,0 +1,51 @@ +/** + * Test-only SQL loaders for bulldozer migration artifacts. + * + * Lives next to the bulldozer db code because several test files (both in + * `apps/backend/src/lib/bulldozer` and in `apps/backend/src/lib/payments`) + * need to install the `public.bulldozer_timefold_process_queue()` function + * body from its migration file to exercise the queue-drain path. Keeping + * one canonical loader here avoids drift between copies when the + * migration's comment sentinels or function name change. 
+ * + * Not intended for production code paths — only imported from `*.test.ts` + * files and the payments test-helpers (which is itself only used from + * tests). + */ + +import { readFileSync } from "node:fs"; +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; + +const HERE = dirname(fileURLToPath(import.meta.url)); + +// apps/backend/src/lib/bulldozer/db/ → apps/backend/prisma/migrations +const MIGRATIONS_DIR = join(HERE, "..", "..", "..", "..", "prisma", "migrations"); + +const DOWNSTREAM_CASCADE_MIGRATION = "20260417000000_bulldozer_timefold_downstream_cascade"; + +/** + * Extracts the `CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue` + * block from the downstream-cascade migration file. The function body is + * what `pg_cron` invokes in prod; installing it into a test database lets + * tests exercise the real drain behaviour end-to-end. + * + * The migration file is split into statements via the + * `-- SPLIT_STATEMENT_SENTINEL` markers already used by the bulldozer + * migration tooling; this loader just picks the block that starts with + * the function definition. + */ +export function loadProcessQueueFunctionSql(): string { + const migrationPath = join(MIGRATIONS_DIR, DOWNSTREAM_CASCADE_MIGRATION, "migration.sql"); + const raw = readFileSync(migrationPath, "utf8"); + const block = raw + .split("-- SPLIT_STATEMENT_SENTINEL") + .map((chunk) => chunk.replaceAll("-- SINGLE_STATEMENT_SENTINEL", "").trim()) + .find((chunk) => chunk.startsWith("CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue")); + if (block == null) { + throw new Error( + `Could not locate bulldozer_timefold_process_queue function body in ${DOWNSTREAM_CASCADE_MIGRATION}/migration.sql`, + ); + } + return block.replace(/;$/, ""); +} diff --git a/apps/backend/src/lib/bulldozer/db/timefold-queue-downstream.test.ts b/apps/backend/src/lib/bulldozer/db/timefold-queue-downstream.test.ts new file mode 100644 index 0000000000..7ebc729711 --- /dev/null +++ b/apps/backend/src/lib/bulldozer/db/timefold-queue-downstream.test.ts @@ -0,0 +1,784 @@ +/** + * `bulldozer_timefold_process_queue()` must propagate emitted rows through + * the downstream trigger cascade (filter/map/LFold/...) the same way the + * inline `setRow` path does via `collectRowChangeTriggerStatements`. + * + * Each timefold's cascade template is precomputed at `init()` time and + * stored in BulldozerTimeFoldDownstreamCascade; the rewritten process_queue + * populates `__bulldozer_seq` with newly-emitted rows under the timefold's + * input name and EXECUTEs the template. 
+ */ + +import postgres from "postgres"; +import { afterAll, beforeAll, beforeEach, describe, expect, test } from "vitest"; +import { + declareFilterTable, + declareGroupByTable, + declareMapTable, + declareStoredTable, + declareTimeFoldTable, + toExecutableSqlTransaction, + toQueryableSqlQuery, +} from "./index"; +import { loadProcessQueueFunctionSql } from "./test-sql-loaders"; + +type SqlExpression = { type: "expression", sql: string }; +type SqlStatement = { type: "statement", sql: string, outputName?: string }; +type SqlQuery = { type: "query", sql: string, toStatement(outputName?: string): SqlStatement }; +type SqlMapper = { type: "mapper", sql: string }; +type SqlPredicate = { type: "predicate", sql: string }; + +function expr(sql: string): SqlExpression { + return { type: "expression", sql }; +} +function mapper(sql: string): SqlMapper { + return { type: "mapper", sql }; +} +function predicate(sql: string): SqlPredicate { + return { type: "predicate", sql }; +} + +const TEST_DB_PREFIX = "stack_bulldozer_queue_downstream_test"; + +function getTestDbUrls() { + const env = Reflect.get(import.meta, "env"); + const connectionString = Reflect.get(env, "STACK_DATABASE_CONNECTION_STRING"); + if (typeof connectionString !== "string" || connectionString.length === 0) { + throw new Error("Missing STACK_DATABASE_CONNECTION_STRING"); + } + const base = connectionString.replace(/\/[^/]*(\?.*)?$/, ""); + const query = connectionString.split("?")[1] ?? ""; + const dbName = `${TEST_DB_PREFIX}_${Math.random().toString(16).slice(2, 12)}`; + return { + full: query.length === 0 ? `${base}/${dbName}` : `${base}/${dbName}?${query}`, + base, + }; +} + +const PROCESS_QUEUE_FN_SQL = loadProcessQueueFunctionSql(); + +describe.sequential("timefold queue downstream cascade (real postgres)", () => { + const dbUrls = getTestDbUrls(); + const dbName = dbUrls.full.replace(/^.*\//, "").replace(/\?.*$/, ""); + const adminSql = postgres(dbUrls.base, { onnotice: () => undefined }); + const sql = postgres(dbUrls.full, { onnotice: () => undefined, max: 1 }); + + async function runStatements(statements: SqlStatement[]) { + await sql.unsafe(toExecutableSqlTransaction(statements)); + } + async function readRows(query: SqlQuery) { + return await sql.unsafe(toQueryableSqlQuery(query)); + } + async function setLastProcessedAt(isoOrExpression: string) { + await sql.unsafe(` + UPDATE "BulldozerTimeFoldMetadata" + SET "lastProcessedAt" = (${isoOrExpression})::timestamptz, + "updatedAt" = now() + WHERE "key" = 'singleton' + `); + } + async function processQueue() { + await sql.unsafe(`SELECT public.bulldozer_timefold_process_queue()`); + } + async function countQueueRows() { + const rows = await sql>` + SELECT COUNT(*)::int AS "count" FROM "BulldozerTimeFoldQueue" + `; + return rows[0].count; + } + + beforeAll(async () => { + await adminSql.unsafe(`CREATE DATABASE ${dbName}`); + }, 60_000); + + // beforeEach does a lot (drop/recreate all bulldozer tables + install the + // ~250-line process_queue plpgsql function); the default 10s vitest hook + // timeout can be tight especially under parallel test files. 
+ const HOOK_TIMEOUT_MS = 60_000; + + beforeEach(async () => { + await sql`CREATE EXTENSION IF NOT EXISTS pgcrypto`; + await sql`DROP FUNCTION IF EXISTS public.bulldozer_timefold_process_queue()`; + await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldDownstreamCascade"`; + await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldQueue"`; + await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldMetadata"`; + await sql`DROP TABLE IF EXISTS "BulldozerStorageEngine"`; + await sql` + CREATE TABLE "BulldozerStorageEngine" ( + "id" UUID NOT NULL DEFAULT gen_random_uuid(), + "keyPath" JSONB[] NOT NULL, + "keyPathParent" JSONB[] GENERATED ALWAYS AS ( + CASE + WHEN cardinality("keyPath") = 0 THEN NULL + ELSE "keyPath"[1:cardinality("keyPath") - 1] + END + ) STORED, + "value" JSONB NOT NULL, + CONSTRAINT "BulldozerStorageEngine_pkey" PRIMARY KEY ("id"), + CONSTRAINT "BulldozerStorageEngine_keyPath_key" UNIQUE ("keyPath"), + CONSTRAINT "BulldozerStorageEngine_keyPathParent_fkey" + FOREIGN KEY ("keyPathParent") + REFERENCES "BulldozerStorageEngine"("keyPath") + ON DELETE CASCADE + ) + `; + await sql`CREATE INDEX "BulldozerStorageEngine_keyPathParent_idx" ON "BulldozerStorageEngine"("keyPathParent")`; + await sql` + INSERT INTO "BulldozerStorageEngine" ("keyPath", "value") + VALUES + (ARRAY[]::jsonb[], 'null'::jsonb), + (ARRAY[to_jsonb('table'::text)]::jsonb[], 'null'::jsonb) + `; + await sql` + CREATE TABLE "BulldozerTimeFoldQueue" ( + "id" UUID NOT NULL DEFAULT gen_random_uuid(), + "tableStoragePath" JSONB[] NOT NULL, + "groupKey" JSONB NOT NULL, + "rowIdentifier" TEXT NOT NULL, + "scheduledAt" TIMESTAMPTZ NOT NULL, + "stateAfter" JSONB NOT NULL, + "rowData" JSONB NOT NULL, + "reducerSql" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "BulldozerTimeFoldQueue_pkey" PRIMARY KEY ("id"), + CONSTRAINT "BulldozerTimeFoldQueue_table_group_row_key" UNIQUE ("tableStoragePath", "groupKey", "rowIdentifier") + ) + `; + await sql` + CREATE INDEX "BulldozerTimeFoldQueue_scheduledAt_idx" + ON "BulldozerTimeFoldQueue"("scheduledAt") + `; + await sql` + CREATE TABLE "BulldozerTimeFoldMetadata" ( + "key" TEXT PRIMARY KEY, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "lastProcessedAt" TIMESTAMPTZ NOT NULL + ) + `; + await sql` + INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt") + VALUES ('singleton', now()) + `; + await sql` + CREATE TABLE "BulldozerTimeFoldDownstreamCascade" ( + "tableStoragePath" JSONB[] NOT NULL, + "cascadeInputName" TEXT NOT NULL, + "cascadeTemplate" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath") + ) + `; + // Install the rewritten process_queue function body from the cascade + // migration. + await sql.unsafe(PROCESS_QUEUE_FN_SQL); + }, HOOK_TIMEOUT_MS); + + afterAll(async () => { + await sql.end(); + await adminSql.unsafe(` + SELECT pg_terminate_backend(pg_stat_activity.pid) + FROM pg_stat_activity + WHERE pg_stat_activity.datname = '${dbName}' + AND pid <> pg_backend_pid() + `); + await adminSql.unsafe(`DROP DATABASE IF EXISTS ${dbName}`); + await adminSql.end(); + }); + + // Reducer that emits {phase:'initial'} inline and schedules a past-due tick + // that emits {phase:'scheduled'}. 
The inline recursion stops because + // nextTimestamp > lastProcessedAt (we back lastProcessedAt off below). + // After process_queue runs, the scheduled-tick emission must propagate. + const splitPhaseReducerSql = ` + CASE WHEN "timestamp" IS NULL THEN 1 ELSE 2 END AS "newState", + jsonb_build_array( + jsonb_build_object( + 'phase', CASE WHEN "timestamp" IS NULL THEN 'initial' ELSE 'scheduled' END, + 'team', "oldRowData"->'team', + 'value', (("oldRowData"->>'value')::int) + ) + ) AS "newRowsData", + CASE + WHEN "timestamp" IS NULL THEN (now() - interval '1 second') + ELSE NULL::timestamptz + END AS "nextTimestamp" + `; + + /** + * Reads the `phase` string out of a row's `rowdata` with runtime type + * checks. Used by the delete-before-drain and dollar-quote tests to + * assert which phases made it through the pipeline. Fails loud rather + * than silently returning `undefined` if the row shape is unexpected. + * + * Takes `unknown` (rather than a narrower row type) because the + * `postgres.js` driver's `Row` type doesn't statically guarantee a + * `rowdata` column. + */ + function rowPhase(row: unknown): string { + if (row == null || typeof row !== "object") { + throw new Error(`Expected row object, got ${typeof row}`); + } + const rowData = Reflect.get(row, "rowdata"); + if (rowData == null || typeof rowData !== "object") { + throw new Error(`Expected object rowdata, got ${typeof rowData}`); + } + const phase = Reflect.get(rowData, "phase"); + if (typeof phase !== "string") { + throw new Error(`Expected string 'phase' field in rowdata, got ${typeof phase}: ${JSON.stringify(rowData)}`); + } + return phase; + } + + // ──────────────────────────────────────────────────────────────────── + // Test 1: single filter downstream + // ──────────────────────────────────────────────────────────────────── + test("process_queue propagates emissions to a single downstream filter", async () => { + const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-filter-u" }); + const groupedTable = declareGroupByTable({ + tableId: "queue-cascade-filter-u-by-team", + fromTable, + groupBy: mapper(`"rowData"->'team' AS "groupKey"`), + }); + const timeFoldTable = declareTimeFoldTable({ + tableId: "queue-cascade-filter-u-folded", + fromTable: groupedTable, + initialState: expr(`'0'::jsonb`), + reducer: mapper(splitPhaseReducerSql), + }); + const filteredTable = declareFilterTable({ + tableId: "queue-cascade-filter-u-scheduled-only", + fromTable: timeFoldTable, + filter: predicate(`"rowData"->>'phase' = 'scheduled'`), + }); + + await runStatements(fromTable.init()); + await runStatements(groupedTable.init()); + // Back the clock up before any setRow so the inline recursion bails. + // Use pre-epoch time so the reducer's scheduled tick (now()-1s) stays + // ahead of lastProcessedAt regardless of wall-clock drift. 
+ await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + await runStatements(timeFoldTable.init()); + await runStatements(filteredTable.init()); + + await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":7}'::jsonb`))); + + expect(await countQueueRows()).toBe(1); + const filteredBeforeDrain = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(filteredBeforeDrain).toEqual([]); + + await setLastProcessedAt(`now()`); + await processQueue(); + + const filteredAfterDrain = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + // Filter wraps rows through an internal flat-map that appends a + // flatIndex to the rowIdentifier (":1" for the single-element array). + // That's an implementation detail, not something to assert on. + expect(filteredAfterDrain.map((row) => row.rowdata)).toEqual([ + { phase: "scheduled", team: "alpha", value: 7 }, + ]); + expect(await countQueueRows()).toBe(0); + }); + + // ──────────────────────────────────────────────────────────────────── + // Test 2: multi-stage cascade (timefold → filter → map → map) + // Exercises transitive propagation across three downstream stages. + // ──────────────────────────────────────────────────────────────────── + test("process_queue propagates emissions through filter → map → map", async () => { + const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-multistage-u" }); + const groupedTable = declareGroupByTable({ + tableId: "queue-cascade-multistage-u-by-team", + fromTable, + groupBy: mapper(`"rowData"->'team' AS "groupKey"`), + }); + const timeFoldTable = declareTimeFoldTable({ + tableId: "queue-cascade-multistage-u-folded", + fromTable: groupedTable, + initialState: expr(`'0'::jsonb`), + reducer: mapper(splitPhaseReducerSql), + }); + const filteredTable = declareFilterTable({ + tableId: "queue-cascade-multistage-u-scheduled-only", + fromTable: timeFoldTable, + filter: predicate(`"rowData"->>'phase' = 'scheduled'`), + }); + const mappedTable = declareMapTable({ + tableId: "queue-cascade-multistage-u-mapped", + fromTable: filteredTable, + mapper: mapper(` + ("rowData"->'team') AS "team", + (("rowData"->>'value')::int * 10) AS "valueTimesTen" + `), + }); + const reMappedTable = declareMapTable({ + tableId: "queue-cascade-multistage-u-remapped", + fromTable: mappedTable, + mapper: mapper(` + ("rowData"->'team') AS "team", + (("rowData"->>'valueTimesTen')::int + 1) AS "valueTimesTenPlusOne" + `), + }); + + await runStatements(fromTable.init()); + await runStatements(groupedTable.init()); + await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + await runStatements(timeFoldTable.init()); + await runStatements(filteredTable.init()); + await runStatements(mappedTable.init()); + await runStatements(reMappedTable.init()); + + await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":3}'::jsonb`))); + await runStatements(fromTable.setRow("u2", expr(`'{"team":"alpha","value":5}'::jsonb`))); + + expect(await countQueueRows()).toBe(2); + for (const table of [filteredTable, mappedTable, reMappedTable]) { + const rows = await readRows(table.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(rows).toEqual([]); + } + + await 
setLastProcessedAt(`now()`); + await processQueue(); + + const filtered = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(filtered.map((r) => r.rowdata).sort((a, b) => + Number(Reflect.get(a as object, "value")) - Number(Reflect.get(b as object, "value")) + )).toEqual([ + { phase: "scheduled", team: "alpha", value: 3 }, + { phase: "scheduled", team: "alpha", value: 5 }, + ]); + + const mapped = await readRows(mappedTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(mapped.map((r) => r.rowdata).sort((a, b) => + Number(Reflect.get(a as object, "valueTimesTen")) - Number(Reflect.get(b as object, "valueTimesTen")) + )).toEqual([ + { team: "alpha", valueTimesTen: 30 }, + { team: "alpha", valueTimesTen: 50 }, + ]); + + const reMapped = await readRows(reMappedTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(reMapped.map((r) => r.rowdata).sort((a, b) => + Number(Reflect.get(a as object, "valueTimesTenPlusOne")) - Number(Reflect.get(b as object, "valueTimesTenPlusOne")) + )).toEqual([ + { team: "alpha", valueTimesTenPlusOne: 31 }, + { team: "alpha", valueTimesTenPlusOne: 51 }, + ]); + }); + + // ──────────────────────────────────────────────────────────────────── + // Test 3: inline path and queue path produce identical downstream state + // ──────────────────────────────────────────────────────────────────── + test("inline-drain and queue-drain produce identical downstream state", async () => { + const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-parity-u" }); + const groupedTable = declareGroupByTable({ + tableId: "queue-cascade-parity-u-by-team", + fromTable, + groupBy: mapper(`"rowData"->'team' AS "groupKey"`), + }); + const timeFoldTable = declareTimeFoldTable({ + tableId: "queue-cascade-parity-u-folded", + fromTable: groupedTable, + initialState: expr(`'0'::jsonb`), + reducer: mapper(splitPhaseReducerSql), + }); + const filteredTable = declareFilterTable({ + tableId: "queue-cascade-parity-u-scheduled-only", + fromTable: timeFoldTable, + filter: predicate(`"rowData"->>'phase' = 'scheduled'`), + }); + + await runStatements(fromTable.init()); + await runStatements(groupedTable.init()); + + // Customer "inline": lastProcessedAt way ahead → tick fires inline. + await setLastProcessedAt(`'2099-01-01T00:00:00Z'`); + await runStatements(timeFoldTable.init()); + await runStatements(filteredTable.init()); + await runStatements(fromTable.setRow("inline-u1", expr(`'{"team":"inline","value":9}'::jsonb`))); + expect(await countQueueRows()).toBe(0); + + // Customer "queue": lastProcessedAt behind → tick queued. 
+ await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + await runStatements(fromTable.setRow("queue-u1", expr(`'{"team":"queue","value":9}'::jsonb`))); + expect(await countQueueRows()).toBe(1); +
+ await setLastProcessedAt(`now()`); + await processQueue(); + expect(await countQueueRows()).toBe(0); +
+ const inlineRows = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('inline'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + const queueRows = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('queue'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); +
+ const normalize = (rows: ReadonlyArray<Record<string, unknown>>) => + rows.map((row) => { + const rowIdentifier = row.rowidentifier; + if (typeof rowIdentifier !== "string") throw new Error("expected string rowidentifier"); + const rowData = row.rowdata; + if (rowData == null || typeof rowData !== "object") throw new Error("expected object rowdata"); + return { + rowIdentifierSuffix: rowIdentifier.replace(/^(inline|queue)-u1:/, "u1:"), + rowData: { + ...(rowData as Record<string, unknown>), + team: "", + }, + }; + }); +
+ expect(normalize([...queueRows])).toEqual(normalize([...inlineRows])); + expect([...inlineRows]).not.toEqual([]); + }); +
+ // ──────────────────────────────────────────────────────────────────── + // Test 4: idempotency — redraining with no new rows is a no-op + // ──────────────────────────────────────────────────────────────────── + test("process_queue is idempotent when there is nothing new to drain", async () => { + const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-idempotency-u" }); + const groupedTable = declareGroupByTable({ + tableId: "queue-cascade-idempotency-u-by-team", + fromTable, + groupBy: mapper(`"rowData"->'team' AS "groupKey"`), + }); + const timeFoldTable = declareTimeFoldTable({ + tableId: "queue-cascade-idempotency-u-folded", + fromTable: groupedTable, + initialState: expr(`'0'::jsonb`), + reducer: mapper(splitPhaseReducerSql), + }); + const filteredTable = declareFilterTable({ + tableId: "queue-cascade-idempotency-u-scheduled-only", + fromTable: timeFoldTable, + filter: predicate(`"rowData"->>'phase' = 'scheduled'`), + }); +
+ await runStatements(fromTable.init()); + await runStatements(groupedTable.init()); + await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + await runStatements(timeFoldTable.init()); + await runStatements(filteredTable.init()); +
+ await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":4}'::jsonb`))); + await setLastProcessedAt(`now()`); +
+ await processQueue(); + const afterFirstDrain = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(afterFirstDrain).toHaveLength(1); +
+ // Second drain: no due queue rows. No-op at every layer. + await processQueue(); + const afterSecondDrain = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(afterSecondDrain).toEqual(afterFirstDrain); + expect(await countQueueRows()).toBe(0); + }); +
+ // ──────────────────────────────────────────────────────────────────── + // Test 5: deleting a downstream table must NOT wedge process_queue.
+ // + // The cascade template is compiled at upstream-init() time and + // references the downstream's storage paths. If the downstream is + // .delete()d while the upstream still has queue rows pending, the + // drain must still succeed without a FK violation. The safety comes + // from every trigger's first statement carrying a + // `WHERE isInitializedExpression` clause that short-circuits the rest + // of the pipeline when the downstream's metadata row is absent. The + // queue-drain cascade inherits the same statements verbatim, so it + // inherits the same safety — this test pins that invariant down. + // ──────────────────────────────────────────────────────────────────── + test("process_queue does not wedge when a downstream table is deleted", async () => { + const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-deleted-downstream-u" }); + const groupedTable = declareGroupByTable({ + tableId: "queue-cascade-deleted-downstream-u-by-team", + fromTable, + groupBy: mapper(`"rowData"->'team' AS "groupKey"`), + }); + const timeFoldTable = declareTimeFoldTable({ + tableId: "queue-cascade-deleted-downstream-u-folded", + fromTable: groupedTable, + initialState: expr(`'0'::jsonb`), + reducer: mapper(splitPhaseReducerSql), + }); + const filteredTable = declareFilterTable({ + tableId: "queue-cascade-deleted-downstream-u-scheduled-only", + fromTable: timeFoldTable, + filter: predicate(`"rowData"->>'phase' = 'scheduled'`), + }); + + await runStatements(fromTable.init()); + await runStatements(groupedTable.init()); + await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + await runStatements(timeFoldTable.init()); + await runStatements(filteredTable.init()); + + await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":7}'::jsonb`))); + expect(await countQueueRows()).toBe(1); + + // At this point the inline setRow path has already propagated the + // {phase:initial} row all the way through to the filter's storage. + // Blow the filter's storage away while the {phase:scheduled} tick + // is still queued — the upstream's cascade template was compiled + // referencing the filter's paths and is NOT updated by .delete(), + // so this is the delete-before-drain scenario. + await runStatements(filteredTable.delete()); + + await setLastProcessedAt(`now()`); + // If the cascade template's WHERE-gated statements didn't no-op on + // missing downstream storage, this would throw an FK violation and + // the queue would stay wedged forever. + await processQueue(); + + expect(await countQueueRows()).toBe(0); + + // The timefold's own state must reflect both emissions (the inline + // {initial} from before the delete and the queue-drained {scheduled} + // from after). A wedged drain would roll everything back and leave + // the timefold with only the initial row. + const timefoldRows = await readRows(timeFoldTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(timefoldRows.map(rowPhase).sort()).toEqual(["initial", "scheduled"]); + }); + + // ──────────────────────────────────────────────────────────────────── + // Test 6: statements whose SQL contains `$tf_cascade$` as a literal + // substring must not prematurely close the outer cascade DO-block. + // + // The cascade template is wrapped in `DO $tf_cascade$ ... $tf_cascade$`. + // If any embedded statement happens to include that literal — e.g. 
in + // a SQL comment or a user-provided expression — the outer dollar quote + // closes mid-body and EXECUTE fails at parse time. + // ──────────────────────────────────────────────────────────────────── + test("process_queue tolerates downstream SQL containing the cascade dollar-quote delimiter", async () => { + const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-dollar-collision-u" }); + const groupedTable = declareGroupByTable({ + tableId: "queue-cascade-dollar-collision-u-by-team", + fromTable, + groupBy: mapper(`"rowData"->'team' AS "groupKey"`), + }); + const timeFoldTable = declareTimeFoldTable({ + tableId: "queue-cascade-dollar-collision-u-folded", + fromTable: groupedTable, + initialState: expr(`'0'::jsonb`), + reducer: mapper(splitPhaseReducerSql), + }); + // The filter predicate embeds the literal `$tf_cascade$` in a way + // that always evaluates true. This text flows through + // `collectRowChangeTriggerStatements` into the stored cascade + // template verbatim, so if the outer dollar-quoting is not robust, + // parse time EXECUTE inside process_queue will fail. + const filteredTable = declareFilterTable({ + tableId: "queue-cascade-dollar-collision-u-all", + fromTable: timeFoldTable, + filter: predicate(`('$tf_cascade$' IS NOT NULL) OR ("rowData"->>'phase' = 'scheduled')`), + }); + + await runStatements(fromTable.init()); + await runStatements(groupedTable.init()); + await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + await runStatements(timeFoldTable.init()); + await runStatements(filteredTable.init()); + + await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":11}'::jsonb`))); + expect(await countQueueRows()).toBe(1); + + await setLastProcessedAt(`now()`); + // If the outer dollar-quote delimiter collided with the embedded + // `$tf_cascade$` string, EXECUTE raises `syntax error at or near ...` + // and the entire drain rolls back (queue stays at 1). The fix + // (`chooseSafeDollarQuoteTag`) randomizes the outer tag per call, so + // the embedded literal is just another string in the body. + await processQueue(); + + expect(await countQueueRows()).toBe(0); + + // Predicate evaluates to always-true thanks to the first disjunct, + // so both the inline-emitted {initial} row and the queue-drained + // {scheduled} row make it through. The point of the assertion is + // that the cascade ran and wrote the scheduled row at all — if the + // delimiter collision had broken the DO block, we'd see only + // {initial} (written synchronously at setRow time, before the queue + // drain). + const filteredRows = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(filteredRows.map(rowPhase).sort()).toEqual(["initial", "scheduled"]); + }); + + // ──────────────────────────────────────────────────────────────────── + // when process_queue() can't find a cascade + // registry row for a timefold, it must DEFER that timefold's queue + // rows (leave them queued) instead of draining them and silently + // skipping the downstream cascade. + // + // Why this matters — a concrete example of the failure mode: + // + // 1. This migration (20260417000000) creates the registry table + // BulldozerTimeFoldDownstreamCascade. It's empty at first. + // + // 2. The backend starts up. 
`declareTimeFoldTable.init()` runs for + // every timefold and upserts one row per timefold into the + // registry, storing the pre-compiled cascade SQL template. + // + // 3. There is a short gap between (1) and (2). During that gap, + // pg_cron keeps calling process_queue() every second. + // + // 4. If a due queue row exists for a timefold whose registry row + // hasn't been upserted yet, and process_queue() drains it + // anyway without a cascade to run, then: + // + // - the queue row is gone, + // - the timefold's own state is updated, + // - but NONE of the downstream filters/maps/LFolds hear + // about it, and there's no queue row left for a future + // tick to retry. + // + // → downstream tables are permanently desynchronized from + // the timefold. + // + // The right behavior: no registry row → do nothing for this + // timefold this tick. The queue row stays queued. Once init() + // finishes and the registry row appears, the next pg_cron tick + // drains with cascade intact. + // + // This test simulates the deploy-window gap by deleting the + // registry row after init() has run (so we know the rest of the + // pipeline is set up), then calling process_queue() and asserting + // that nothing silently advanced. + // ──────────────────────────────────────────────────────────────────── + test("process_queue defers a timefold whose cascade registry row is missing", async () => { + // ---- Setup: build a small timefold pipeline ---- + // + // The chain is: source data → group by team → timefold (recurses + // through time) → filter (keeps only phase=scheduled rows). + // + // Calling each table's init() wires up its storage. The + // timeFoldTable's init() is the one that inserts the registry + // row we care about — it stores the cascade template that + // process_queue() looks up at drain time. + const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-missing-registry-u" }); + const groupedTable = declareGroupByTable({ + tableId: "queue-cascade-missing-registry-u-by-team", + fromTable, + groupBy: mapper(`"rowData"->'team' AS "groupKey"`), + }); + const timeFoldTable = declareTimeFoldTable({ + tableId: "queue-cascade-missing-registry-u-folded", + fromTable: groupedTable, + initialState: expr(`'0'::jsonb`), + reducer: mapper(splitPhaseReducerSql), + }); + const filteredTable = declareFilterTable({ + tableId: "queue-cascade-missing-registry-u-scheduled-only", + fromTable: timeFoldTable, + filter: predicate(`"rowData"->>'phase' = 'scheduled'`), + }); + + await runStatements(fromTable.init()); + await runStatements(groupedTable.init()); + // Backdate the "last processed" clock so any future tick the + // reducer schedules looks like it's still in the future when + // setRow fires. That way the {scheduled} emission gets QUEUED + // instead of running inline. + await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + await runStatements(timeFoldTable.init()); + await runStatements(filteredTable.init()); + + // ---- Generate one inline emission + one queued emission ---- + // + // splitPhaseReducerSql is written so: + // - First call (timestamp=NULL) emits {phase:'initial'} and + // schedules a future tick. + // - The future tick (when drained) emits {phase:'scheduled'}. + // + // setRow fires the inline path: {initial} flows through the + // whole chain right now. The filter predicate + // `"rowData"->>'phase' = 'scheduled'` rejects {initial}, so the + // filter stays empty. 
Meanwhile the scheduled tick lands in + // BulldozerTimeFoldQueue for later. + await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":7}'::jsonb`))); + expect(await countQueueRows()).toBe(1); + + // ---- Simulate the deploy-window gap ---- + // + // We just saw init() upsert a registry row above. To mimic the + // case where init() hasn't run yet (migration applied but + // backend hasn't reached init() for this timefold), delete + // every row in the registry table by hand. Now it looks + // identical to the fresh-after-migration state. + await sql.unsafe(`DELETE FROM "BulldozerTimeFoldDownstreamCascade"`); + + // ---- Run process_queue() as pg_cron would ---- + // + // Advance the clock past the queued tick's scheduledAt so it's + // due, then drain. + await setLastProcessedAt(`now()`); + await processQueue(); + + // ---- Assert we deferred instead of silently losing state ---- + // + // (1) The queue row is still there. process_queue() saw that + // the registry had no row for this timefold and said "I + // don't know which cascade to run, so I'll leave this for + // the next tick." Nothing was drained, nothing was + // skipped. + expect(await countQueueRows()).toBe(1); + + // (2) The timefold's own state wasn't advanced by the drain. + // Only the inline-emitted {initial} row is visible; the + // {scheduled} row is still sitting in the queue waiting + // for a tick with a registered cascade to process it. + // + // Contrast with the buggy (pre-fix) behavior: the timefold + // would have had BOTH "initial" and "scheduled" here, with + // the filter permanently missing "scheduled". + const timefoldRows = await readRows(timeFoldTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(timefoldRows.map(rowPhase).sort()).toEqual(["initial"]); + + // (3) The filter is empty, which is the correct steady state: + // the inline setRow's {initial} was filtered out, and the + // {scheduled} row hasn't been drained yet. No partial + // writes, no orphan rows. Once init() runs and the + // registry is populated, the next pg_cron tick will drain + // the queue row and propagate {scheduled} into this + // filter. + const filterRows = await readRows(filteredTable.listRowsInGroup({ + groupKey: expr(`to_jsonb('alpha'::text)`), + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + expect(filterRows).toEqual([]); + }); +}); diff --git a/apps/backend/src/lib/payments/schema/__tests__/integration-1-3-queue-drained.test.ts b/apps/backend/src/lib/payments/schema/__tests__/integration-1-3-queue-drained.test.ts new file mode 100644 index 0000000000..73bf11559f --- /dev/null +++ b/apps/backend/src/lib/payments/schema/__tests__/integration-1-3-queue-drained.test.ts @@ -0,0 +1,292 @@ +/** + * Queue-drained variant of the phase 1→3 integration tests: subscription + * lifecycle events scheduled in the future (sub-end on + * `cancelAtPeriodEnd`, monthly item-grant-repeat ticks) defer to the + * BulldozerTimeFoldQueue and must propagate through the downstream cascade + * — events → transactions → itemQuantities / ownedProducts — when drained + * by `public.bulldozer_timefold_process_queue()` (the pg_cron path). + * + * The sibling `integration-1-3.test.ts` seeds lastProcessedAt = 2099 so + * every scheduled tick fires inline at setRow time and the queue is never + * exercised. 
These tests instead keep lastProcessedAt at the present, let + * future ticks stay queued, then advance the clock and invoke the drain + * function — mirroring real pg_cron behaviour. + */ + +import { afterAll, beforeAll, describe, expect, it } from "vitest"; +import { createPaymentsSchema } from "../index"; +import { createTestDb, jsonbExpr } from "./test-helpers"; + +const DAY_MS = 86400000; +const MONTH_MS = 2592000000; + +describe.sequential("payments schema integration phase 1→3, queue-drained path (real postgres)", () => { + // Clock starts at now() so that future scheduledAt values stay queued + // (vs the default `createTestDb` behaviour of lastProcessedAt = 2099, + // which would fire every tick inline). `installProcessQueueFn` installs + // the rewritten process_queue function body from the cascade migration + // so `processQueue()` exercises the real prod function. + const db = createTestDb({ + lastProcessedAt: "now()", + installProcessQueueFn: true, + }); + const { runStatements, readRows, setLastProcessedAt, processQueue, countQueueRows } = db; + const schema = createPaymentsSchema(); + + const getRowDatas = async (table: { listRowsInGroup: (opts: any) => any }) => { + const rows = await readRows(table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + })); + return rows.map((r: any) => r.rowdata); + }; + + beforeAll(async () => { + await db.setup(); + for (const table of schema._allTables) { + await runStatements(table.init()); + } + }, 120_000); + + afterAll(async () => { + await db.teardown(); + }); + + + // ============================================================ + // Test 6: mid-period upgrade with pg_cron-drained end event + // + // Free sub ending mid-period + team sub starting the next day. + // With lastProcessedAt in the recent past, the free sub's end event + // gets QUEUED (not fired inline). Once drained, downstream ledgers + // must reflect: only team's 500 emails, not 100 + 500 = 600. + // ============================================================ + describe("mid-period upgrade with queue-drained end event", () => { + it("queues the subscription-end event instead of firing inline", async () => { + // subscription-timefold-algo derives nextTimestamp from millis fields + // like endedAtMillis / repeat intervals. These are raw epoch millis, + // so they map to ~1970. To make the inline recursion's + // `nextTimestamp > lastProcessedAt` check hold (= "defer to queue"), + // we set lastProcessedAt to pre-epoch. + await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + + await runStatements(schema.subscriptions.setRow("sub-q-free", jsonbExpr({ + id: "sub-q-free", + tenancyId: "t1", + customerId: "u-q-upgrade", + customerType: "user", + productId: "prod-q-free", + priceId: "p-free", + product: { + displayName: "Free (queued)", + customerType: "user", + productLineId: "line-q-upgrade", + prices: { "p-free": { USD: "0" } }, + includedItems: { + emails: { quantity: 100, repeat: [1, "month"], expires: "when-repeated" }, + }, + }, + quantity: 1, + stripeSubscriptionId: null, + status: "canceled", + currentPeriodStartMillis: 0, + currentPeriodEndMillis: MONTH_MS, + cancelAtPeriodEnd: false, + canceledAtMillis: 10 * DAY_MS, + endedAtMillis: 10 * DAY_MS, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 0, + }))); + + // The subscription-end tick is scheduled at `endedAtMillis` which is + // > lastProcessedAt (10 days from epoch > "one hour ago"). It must + // therefore be queued, not emitted inline. 
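+ // For reference (arithmetic only, nothing new asserted): endedAtMillis is
+ // 10 * DAY_MS = 864,000,000 ms, i.e. 1970-01-11T00:00:00Z, which is well
+ // after the pre-epoch lastProcessedAt seeded above, so the end tick defers
+ // to the queue instead of firing inline.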
+ const queued = await countQueueRows(); + expect(queued).toBeGreaterThan(0); + + const endEventsBeforeDrain = (await getRowDatas(schema.subscriptionEndEvents)) + .filter((e: any) => e.subscriptionId === "sub-q-free"); + expect(endEventsBeforeDrain).toEqual([]); + }); + + it("after drain, end event fires AND downstream cascade runs; upgrade does not stack", async () => { + // Now bring the team sub online. + await runStatements(schema.subscriptions.setRow("sub-q-team", jsonbExpr({ + id: "sub-q-team", + tenancyId: "t1", + customerId: "u-q-upgrade", + customerType: "user", + productId: "prod-q-team", + priceId: "p-team", + product: { + displayName: "Team (queued)", + customerType: "user", + productLineId: "line-q-upgrade", + prices: { "p-team": { USD: "30" } }, + includedItems: { + emails: { quantity: 500, repeat: [1, "month"], expires: "when-repeated" }, + }, + }, + quantity: 1, + stripeSubscriptionId: null, + status: "canceled", + currentPeriodStartMillis: 11 * DAY_MS, + currentPeriodEndMillis: 11 * DAY_MS + MONTH_MS, + cancelAtPeriodEnd: true, + canceledAtMillis: 20 * DAY_MS, + endedAtMillis: 20 * DAY_MS, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 11 * DAY_MS, + }))); + + // Bump clock far enough forward to make every queued tick due, then drain. + await setLastProcessedAt(`'2099-01-01T00:00:00Z'`); + await processQueue(); + expect(await countQueueRows()).toBe(0); + + const endEvents = (await getRowDatas(schema.subscriptionEndEvents)) + .filter((e: any) => e.subscriptionId === "sub-q-free"); + expect(endEvents).toHaveLength(1); + + const transactions = (await getRowDatas(schema.transactions)) + .filter((t: any) => t.customerId === "u-q-upgrade"); + const endTxns = transactions.filter((t: any) => t.txnId === "sub-end:sub-q-free"); + expect(endTxns).toHaveLength(1); + + const itemQuantityRows = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId === "u-q-upgrade") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + + const atTeamStart = itemQuantityRows.find((r: any) => r.txnId === "sub-start:sub-q-team"); + expect(atTeamStart).toBeDefined(); + // Stacking regression: without downstream-cascade propagation on the + // queue path, sub-free's end event never fires the cascade, so + // atTeamStart.emails accumulates free.100 + team.500 = 600 instead of + // the team-only 500. + expect(atTeamStart.itemQuantities.emails).toBe(500); + }); + }); + + + // ============================================================ + // Test 7: monthly repeat reset via pg_cron + // + // Sub with a monthly-repeating quota item. Between sub-start and the + // first repeat, balance must reflect the initial grant. After pg_cron + // drains the repeat tick, balance must reflect the REFRESHED grant + // (not doubled, not zero). 
+ // ============================================================ + describe("monthly repeat reset via queue drain", () => { + it("repeat tick is queued until the clock is advanced past it", async () => { + await setLastProcessedAt(`'1969-01-01T00:00:00Z'`); + + await runStatements(schema.subscriptions.setRow("sub-q-repeat", jsonbExpr({ + id: "sub-q-repeat", + tenancyId: "t1", + customerId: "u-q-repeat", + customerType: "user", + productId: "prod-q-repeat", + priceId: "p1", + product: { + displayName: "Repeat (queued)", + customerType: "user", + productLineId: "line-q-repeat", + prices: { p1: { USD: "10" } }, + includedItems: { + quota: { quantity: 200, repeat: [1, "month"], expires: "when-repeated" }, + }, + }, + quantity: 1, + stripeSubscriptionId: null, + status: "active", + currentPeriodStartMillis: 0, + currentPeriodEndMillis: MONTH_MS, + cancelAtPeriodEnd: false, + canceledAtMillis: null, + endedAtMillis: 45 * DAY_MS, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 0, + }))); + + expect(await countQueueRows()).toBeGreaterThan(0); + + const initialRows = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId === "u-q-repeat") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + const atStart = initialRows.find((r: any) => r.txnId === "sub-start:sub-q-repeat"); + expect(atStart).toBeDefined(); + expect(atStart.itemQuantities.quota).toBe(200); + }); + + it("after drain, quota is refreshed (not doubled, not stuck)", async () => { + // Advance clock past all repeats (and past endedAt). + await setLastProcessedAt(`'2099-01-01T00:00:00Z'`); + await processQueue(); + expect(await countQueueRows()).toBe(0); + + const rows = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId === "u-q-repeat") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + + // After every repeat and the final subscription-end, quota must drop + // to 0 — both repeat grants and the final one expire. Without the + // downstream cascade on the queue path, queued events never reach + // itemQuantities and this stays at the initial 200 (or whatever the + // inline path wrote at setRow time). + const latest = rows[rows.length - 1]; + expect(latest.itemQuantities.quota).toBe(0); + + // Between repeats, there should be exactly one active repeat grant at + // a time — never doubled. The queue drain is expected to emit at + // least one `igr:sub-q-repeat:*` row into itemQuantities (the + // 30-day repeat fires once between sub-start at t=0 and sub-end at + // t=45d). Asserting its presence unconditionally is the whole point + // of this test — the original regression was that the cascade + // dropped these emissions entirely, so a `find() == null` must fail + // loud, not silently skip. + const midPeriodRow = rows.find((r: any) => + r.txnId?.startsWith("igr:sub-q-repeat:") && r.itemQuantities?.quota != null + ); + expect( + midPeriodRow, + "expected at least one igr:sub-q-repeat:* row in itemQuantities; if this is null the queue-drain cascade dropped the repeat emission entirely", + ).toBeDefined(); + expect(midPeriodRow.itemQuantities.quota).toBe(200); + }); + }); + + + // ============================================================ + // Test 8: re-drain idempotency at the payments layer + // + // Draining twice with nothing new in between must not duplicate + // subscription-end events, transactions, or item-quantity changes. 
+ // ============================================================ + describe("re-drain idempotency at the payments layer", () => { + it("second process_queue call with no new queue rows is a no-op", async () => { + // Snapshot counts after prior tests' drains. + const endEventsBefore = (await getRowDatas(schema.subscriptionEndEvents)) + .filter((e: any) => e.subscriptionId?.startsWith("sub-q-")); + const endTxnsBefore = (await getRowDatas(schema.transactions)) + .filter((t: any) => t.txnId?.startsWith("sub-end:sub-q-")); + const itemQuantitiesBefore = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId?.startsWith("u-q-")); + + expect(await countQueueRows()).toBe(0); + await processQueue(); + + const endEventsAfter = (await getRowDatas(schema.subscriptionEndEvents)) + .filter((e: any) => e.subscriptionId?.startsWith("sub-q-")); + const endTxnsAfter = (await getRowDatas(schema.transactions)) + .filter((t: any) => t.txnId?.startsWith("sub-end:sub-q-")); + const itemQuantitiesAfter = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId?.startsWith("u-q-")); + + expect(endEventsAfter).toHaveLength(endEventsBefore.length); + expect(endTxnsAfter).toHaveLength(endTxnsBefore.length); + expect(itemQuantitiesAfter).toHaveLength(itemQuantitiesBefore.length); + }); + }); +}); diff --git a/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts b/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts index 5535480154..32a7824a98 100644 --- a/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts +++ b/apps/backend/src/lib/payments/schema/__tests__/integration-1-3.test.ts @@ -893,6 +893,371 @@ describe.sequential("payments schema integration phase 1→3 (real postgres)", ( }); + // ============================================================ + // Full when-repeated lifecycle: sub-start → item-grant-repeat → sub-end. + // item-quantity-expire entries in the sub-end transaction reference the + // preceding item-grant-repeat by txn id. Both the id text and the + // reference text must match byte-for-byte or the expire silently fails + // to resolve the grant and the `when-repeated` balance stays at the + // last-granted quantity instead of dropping to 0. + // ============================================================ + + describe("item-quantity-expire resolves across item-grant-repeat → sub-end", () => { + const DAY_MS = 86400000; + + beforeAll(async () => { + await runStatements(schema.subscriptions.setRow("sub-repeat-to-end", jsonbExpr({ + id: "sub-repeat-to-end", + tenancyId: "t1", + customerId: "u-repeat-to-end", + customerType: "user", + productId: "prod-repeat-to-end", + priceId: "p1", + product: { + displayName: "Repeat Then End Plan", + customerType: "user", + productLineId: "line-repeat-to-end", + prices: { p1: { USD: "10" } }, + includedItems: { + // 7-day repeat interval is an exact whole-second epoch offset, + // which is where subtle NUMERIC-vs-bigint mismatches around + // `->>effectiveAtMillis` tend to surface. + quota: { quantity: 100, repeat: [7, "day"], expires: "when-repeated" }, + }, + }, + quantity: 1, + stripeSubscriptionId: null, + status: "canceled", + currentPeriodStartMillis: 0, + currentPeriodEndMillis: MONTH_MS, + cancelAtPeriodEnd: true, + // Ends at 14d: fires one repeat at 7d, then sub-end at 14d. 
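+ // (For reference, derived from the reducer's igrTxnId construction rather
+ // than asserted verbatim below: the single repeat at 7 * DAY_MS should get
+ // the txn id 'igr:sub-repeat-to-end:604800000', and the sub-end's
+ // item-quantity-expire entry must reference exactly that text.)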
+ canceledAtMillis: 14 * DAY_MS, + endedAtMillis: 14 * DAY_MS, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 0, + }))); + }); + + it("item-grant-repeat transaction id has no trailing decimals", async () => { + const txns = (await getRowDatas(schema.transactions)) + .filter((t: any) => t.customerId === "u-repeat-to-end"); + const igr = txns.find((t: any) => + typeof t.txnId === "string" && t.txnId.startsWith("igr:sub-repeat-to-end:") + ); + expect(igr).toBeDefined(); + // transactions.ts derives this id from the event's effectiveAtMillis + // via `->>`. If that value was stored in JSONB as a NUMERIC with + // fractional scale (e.g. "604800000.000000") the id text picks up + // the trailing zeros and no longer matches references built via + // `::bigint::text` elsewhere in the reducer. + expect(igr.txnId).toMatch(/^igr:sub-repeat-to-end:\d+$/); + expect(igr.txnId).not.toContain("."); + }); + + it("sub-end's item-quantity-expire adjustedTransactionId matches the igr txn id", async () => { + const txns = (await getRowDatas(schema.transactions)) + .filter((t: any) => t.customerId === "u-repeat-to-end"); + + const igr = txns.find((t: any) => + typeof t.txnId === "string" && t.txnId.startsWith("igr:sub-repeat-to-end:") + ); + const subEnd = txns.find((t: any) => t.txnId === "sub-end:sub-repeat-to-end"); + expect(igr).toBeDefined(); + expect(subEnd).toBeDefined(); + + const expireEntry = (subEnd.entries as any[]).find((e: any) => + e.type === "item-quantity-expire" && e.itemId === "quota" + ); + expect(expireEntry).toBeDefined(); + // The two texts must be byte-identical for the expire to resolve + // the grant. Same value in different representations (e.g. + // "604800000" vs "604800000.000000") is the failure mode this + // guards against. + expect(expireEntry.adjustedTransactionId).toBe(igr.txnId); + }); + + it("quota balance drops to 0 after sub-end resolves the igr's grant", async () => { + const rows = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId === "u-repeat-to-end") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + expect(rows.length).toBeGreaterThan(0); + + const latest = rows[rows.length - 1]; + // If the expire ref mismatches the igr txn id, the expire silently + // becomes a no-op and quota stays at the last igr-granted quantity + // (100). When the ids match, sub-end's expire resolves and the + // ledger drops to 0. + expect(latest.itemQuantities.quota).toBe(0); + }); + }); + + + // ============================================================ + // Same bigint-vs-NUMERIC txn-id fingerprint as the subscription test + // above, but on the one-time-purchase algo (no sub-end analog). A + // one-time purchase with a repeating `when-repeated` item relies on + // each IGR expiring the previous IGR's grant by referencing its + // `txnId` inside `item-quantity-expire.adjustedTransactionId`. If the + // reducer-internal reference (decimal-free via `::text`) doesn't + // match the downstream-materialized txn id (built from + // `->>effectiveAtMillis` stored in JSONB), consecutive IGRs stack + // instead of refreshing and `quota` climbs without bound. 
+ // ============================================================ + + describe("item-quantity-expire resolves across consecutive one-time-purchase item-grant-repeats", () => { + const DAY_MS = 86400000; + + beforeAll(async () => { + // OTP has no terminating event (unlike subscriptions with endedAt), + // so the inline reducer would otherwise try to fire ~6700 repeats + // between createdAtMillis=0 and the test-helpers default + // lastProcessedAt=2099 and time out the hook. Cap the clock to 16d + // post-epoch for this describe so exactly 2 IGRs fire inline (at + // +7d and +14d) — the minimum needed to exercise the IGR-N expiring + // IGR-(N-1)'s grant path. Restored to the default in afterAll so + // later tests in this file still run under inline recursion. + await db.setLastProcessedAt(`'1970-01-17T00:00:00Z'`); + await runStatements(schema.oneTimePurchases.setRow("otp-repeat-bigint", jsonbExpr({ + id: "otp-repeat-bigint", + tenancyId: "t1", + customerId: "u-otp-repeat-bigint", + customerType: "user", + productId: "prod-otp-repeat-bigint", + priceId: "p1", + product: { + displayName: "Repeating OTP (bigint regression)", + customerType: "user", + productLineId: "line-otp-repeat-bigint", + prices: { p1: { USD: "5" } }, + includedItems: { + // 7-day repeat: same whole-second epoch offset the subscription + // test uses to surface NUMERIC-vs-bigint mismatches. Two IGRs + // fire inline under the clock-cap above — enough to exercise + // the second IGR's `previousGrantsToExpire` referencing the + // first's outstandingGrants[].txnId. + quota: { quantity: 100, repeat: [7, "day"], expires: "when-repeated" }, + }, + }, + quantity: 1, + stripePaymentIntentId: null, + revokedAtMillis: null, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 0, + }))); + }); + + afterAll(async () => { + // Restore the default so subsequent describes in this file still + // fire their reducer recursion inline (the payments suite relies + // on this). + await db.setLastProcessedAt(`'2099-01-01T00:00:00Z'`); + }); + + it("every item-grant-repeat txn id is decimal-free", async () => { + const txns = (await getRowDatas(schema.transactions)) + .filter((t: any) => t.customerId === "u-otp-repeat-bigint"); + const igrs = txns.filter((t: any) => + typeof t.txnId === "string" && t.txnId.startsWith("igr:otp-repeat-bigint:") + ); + expect(igrs.length).toBeGreaterThan(0); + for (const igr of igrs) { + expect(igr.txnId).toMatch(/^igr:otp-repeat-bigint:\d+$/); + expect(igr.txnId).not.toContain("."); + } + }); + + it("every IGR after the first carries an item-quantity-expire whose adjustedTransactionId matches the prior IGR's txnId", async () => { + const txns = (await getRowDatas(schema.transactions)) + .filter((t: any) => t.customerId === "u-otp-repeat-bigint") + .sort((a: any, b: any) => a.effectiveAtMillis - b.effectiveAtMillis); + const igrs = txns.filter((t: any) => + typeof t.txnId === "string" && t.txnId.startsWith("igr:otp-repeat-bigint:") + ); + // Need ≥ 2 IGRs for the expire reference path to fire. Any fewer + // means the test setup isn't exercising the bug's surface area — + // fail loud rather than silently passing. 
+ expect(igrs.length).toBeGreaterThanOrEqual(2); + + for (let i = 1; i < igrs.length; i++) { + const priorIgr = igrs[i - 1]; + const currentIgr = igrs[i]; + const expireEntry = (currentIgr.entries as any[]).find((e: any) => + e.type === "item-quantity-expire" && e.itemId === "quota" + ); + expect(expireEntry, `IGR ${currentIgr.txnId} missing item-quantity-expire for quota`).toBeDefined(); + // Byte-identical match is the invariant: if the prior IGR's + // downstream txnId picked up a `.000000` tail and this expire + // built its adjustedTransactionId via `::text` on a bigint, the + // two strings diverge and the expire silently no-ops. + expect(expireEntry.adjustedTransactionId).toBe(priorIgr.txnId); + } + }); + + it("quota balance stays at exactly the per-repeat grant, never stacks", async () => { + const rows = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId === "u-otp-repeat-bigint") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + expect(rows.length).toBeGreaterThan(0); + + // Without the fix, each IGR adds 100 without expiring the previous + // grant, so quota climbs monotonically (100, 200, 300, …). With + // the fix, each IGR expires the prior grant before adding its own, + // and quota stays at exactly 100. + const latest = rows[rows.length - 1]; + expect(latest.itemQuantities.quota).toBe(100); + }); + }); + + + // ============================================================ + // when-repeated grants must expire at subscription-end + // (regression: they were previously left stacked in the ledger) + // ============================================================ + + describe("when-repeated grants expire at subscription-end", () => { + const DAY_MS = 86400000; + + beforeAll(async () => { + await runStatements(schema.subscriptions.setRow("sub-repeat-end", jsonbExpr({ + id: "sub-repeat-end", + tenancyId: "t1", + customerId: "u-repeat-end", + customerType: "user", + productId: "prod-repeat-end", + priceId: "p1", + product: { + displayName: "Repeat End Plan", + customerType: "user", + productLineId: "line-repeat-end", + prices: { p1: { USD: "10" } }, + includedItems: { + quota: { quantity: 100, repeat: [7, "day"], expires: "when-repeated" }, + permanent: { quantity: 25, expires: "never" }, + }, + }, + quantity: 1, + stripeSubscriptionId: null, + status: "canceled", + currentPeriodStartMillis: 0, + currentPeriodEndMillis: MONTH_MS, + cancelAtPeriodEnd: true, + canceledAtMillis: 2 * DAY_MS, + endedAtMillis: 5 * DAY_MS, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 0, + }))); + }); + + it("should drop when-repeated item balance to 0 after subscription-end", async () => { + const rows = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId === "u-repeat-end") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + + const latest = rows[rows.length - 1]; + expect(latest.itemQuantities.quota).toBe(0); + // Permanent grants must not be touched. 
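+ // (Why `permanent` survives: grants without a recognized `expires` value
+ // carry a NULL expiresWhen in outstandingGrants, and in SQL
+ // `NULL IN ('when-purchase-expires', 'when-repeated')` is not true, so the
+ // sub-end expire list (see endItemQuantityChangesToExpire in
+ // subscription-timefold-algo.ts) never includes them.)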
+ expect(latest.itemQuantities.permanent).toBe(25); + }); + + it("should revoke owned product at subscription-end", async () => { + const rows = (await getRowDatas(schema.ownedProducts)) + .filter((r: any) => r.customerId === "u-repeat-end") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + + const afterEnd = rows.find((r: any) => r.txnId === "sub-end:sub-repeat-end"); + expect(afterEnd).toBeDefined(); + expect(afterEnd.ownedProducts["prod-repeat-end"].quantity).toBe(0); + }); + }); + + + // ============================================================ + // Upgrade stacking regression: free → team mid-period must not + // leave the outgoing sub's monthly allowance stacked on top of + // the incoming sub's allowance. + // ============================================================ + + describe("mid-period upgrade does not stack when-repeated balances", () => { + const DAY_MS = 86400000; + + beforeAll(async () => { + await runStatements(schema.subscriptions.setRow("sub-upgrade-free", jsonbExpr({ + id: "sub-upgrade-free", + tenancyId: "t1", + customerId: "u-upgrade", + customerType: "user", + productId: "prod-upgrade-free", + priceId: "p-free", + product: { + displayName: "Free", + customerType: "user", + productLineId: "line-upgrade", + prices: { "p-free": { USD: "0" } }, + includedItems: { + emails: { quantity: 100, repeat: [1, "month"], expires: "when-repeated" }, + }, + }, + quantity: 1, + stripeSubscriptionId: null, + status: "canceled", + currentPeriodStartMillis: 0, + currentPeriodEndMillis: MONTH_MS, + cancelAtPeriodEnd: false, + canceledAtMillis: 10 * DAY_MS, + endedAtMillis: 10 * DAY_MS, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 0, + }))); + + await runStatements(schema.subscriptions.setRow("sub-upgrade-team", jsonbExpr({ + id: "sub-upgrade-team", + tenancyId: "t1", + customerId: "u-upgrade", + customerType: "user", + productId: "prod-upgrade-team", + priceId: "p-team", + product: { + displayName: "Team", + customerType: "user", + productLineId: "line-upgrade", + prices: { "p-team": { USD: "30" } }, + includedItems: { + emails: { quantity: 500, repeat: [1, "month"], expires: "when-repeated" }, + }, + }, + quantity: 1, + stripeSubscriptionId: null, + status: "canceled", + currentPeriodStartMillis: 11 * DAY_MS, + currentPeriodEndMillis: 11 * DAY_MS + MONTH_MS, + cancelAtPeriodEnd: true, + canceledAtMillis: 20 * DAY_MS, + endedAtMillis: 20 * DAY_MS, + refundedAtMillis: null, + creationSource: "TEST_MODE", + createdAtMillis: 11 * DAY_MS, + }))); + }); + + it("should show only the incoming sub's allowance right after the upgrade", async () => { + const rows = (await getRowDatas(schema.itemQuantities)) + .filter((r: any) => r.customerId === "u-upgrade") + .sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis); + + const atUpgrade = rows.find((r: any) => r.txnId === "sub-start:sub-upgrade-team"); + expect(atUpgrade).toBeDefined(); + // Before the fix this was 100 (free) + 500 (team) = 600 because the + // free sub's when-repeated grant was not expired at subscription-end. 
+ expect(atUpgrade.itemQuantities.emails).toBe(500); + }); + }); + + // ============================================================ // Subscription map LFold // ============================================================ diff --git a/apps/backend/src/lib/payments/schema/__tests__/phase-1.test.ts b/apps/backend/src/lib/payments/schema/__tests__/phase-1.test.ts index 52c4feec54..c0d316c272 100644 --- a/apps/backend/src/lib/payments/schema/__tests__/phase-1.test.ts +++ b/apps/backend/src/lib/payments/schema/__tests__/phase-1.test.ts @@ -345,6 +345,107 @@ describe.sequential("payments schema phase 1 (real postgres)", () => { .filter((e: any) => e.subscriptionId === "sub-tf-start"); expect(endEvents).toHaveLength(0); }); + + it("should expire when-repeated grants alongside when-purchase-expires when ending before any repeat", async () => { + await runStatements(schema.subscriptions.setRow("sub-tf-end-mix-pre", jsonbExpr(makeSubscription("sub-tf-end-mix-pre", { + product: { + displayName: "Mix Pre-Repeat Plan", + customerType: "user", + productLineId: "line-tf-end-mix-pre", + prices: { p1: { USD: "10" } }, + includedItems: { + storage: { quantity: 100, expires: "when-purchase-expires" }, + quota: { quantity: 500, repeat: [7, "day"], expires: "when-repeated" }, + }, + }, + endedAtMillis: 3 * DAY_MS, + createdAtMillis: 0, + })))); + + const endEvents = (await getRowDatas(schema.subscriptionEndEvents)) + .filter((e: any) => e.subscriptionId === "sub-tf-end-mix-pre"); + expect(endEvents).toHaveLength(1); + + const expiredByItem = new Map(); + for (const expiry of endEvents[0].itemQuantityChangesToExpire) { + expiredByItem.set(expiry.itemId, expiry); + } + expect([...expiredByItem.keys()].sort()).toEqual(["quota", "storage"]); + expect(expiredByItem.get("storage").transactionId).toBe("sub-start:sub-tf-end-mix-pre"); + expect(expiredByItem.get("quota").transactionId).toBe("sub-start:sub-tf-end-mix-pre"); + expect(expiredByItem.get("quota").quantity).toBe(500); + + const repeats = (await getRowDatas(schema.itemGrantRepeatEvents)) + .filter((e: any) => e.sourceId === "sub-tf-end-mix-pre"); + expect(repeats).toHaveLength(0); + }); + + it("should reference the latest igr txnId for when-repeated grants when ending after repeats", async () => { + await runStatements(schema.subscriptions.setRow("sub-tf-end-mix-post", jsonbExpr(makeSubscription("sub-tf-end-mix-post", { + product: { + displayName: "Mix Post-Repeat Plan", + customerType: "user", + productLineId: "line-tf-end-mix-post", + prices: { p1: { USD: "10" } }, + includedItems: { + storage: { quantity: 100, expires: "when-purchase-expires" }, + quota: { quantity: 500, repeat: [7, "day"], expires: "when-repeated" }, + }, + }, + endedAtMillis: 17 * DAY_MS, + createdAtMillis: 0, + })))); + + const endEvents = (await getRowDatas(schema.subscriptionEndEvents)) + .filter((e: any) => e.subscriptionId === "sub-tf-end-mix-post"); + expect(endEvents).toHaveLength(1); + + const expiredByItem = new Map(); + for (const expiry of endEvents[0].itemQuantityChangesToExpire) { + expiredByItem.set(expiry.itemId, expiry); + } + expect([...expiredByItem.keys()].sort()).toEqual(["quota", "storage"]); + + // storage never repeated; still points at sub-start + expect(expiredByItem.get("storage").transactionId).toBe("sub-start:sub-tf-end-mix-post"); + + // quota last repeated at 14d; should point at that igr txn + const latestRepeatMillis = 14 * DAY_MS; + expect(expiredByItem.get("quota").transactionId).toBe(`igr:sub-tf-end-mix-post:${latestRepeatMillis}`); + 
expect(expiredByItem.get("quota").quantity).toBe(500); + + const repeats = (await getRowDatas(schema.itemGrantRepeatEvents)) + .filter((e: any) => e.sourceId === "sub-tf-end-mix-post") + .sort((a: any, b: any) => a.effectiveAtMillis - b.effectiveAtMillis); + expect(repeats.map((r: any) => r.effectiveAtMillis)).toEqual([7 * DAY_MS, 14 * DAY_MS]); + }); + + it("should NOT expire permanent grants (expires=never, absent, or invalid)", async () => { + await runStatements(schema.subscriptions.setRow("sub-tf-end-permanent", jsonbExpr(makeSubscription("sub-tf-end-permanent", { + product: { + displayName: "Permanent Grants Plan", + customerType: "user", + productLineId: "line-tf-end-permanent", + prices: { p1: { USD: "10" } }, + includedItems: { + expiring: { quantity: 50, expires: "when-purchase-expires" }, + repeating: { quantity: 10, repeat: [1, "day"], expires: "when-repeated" }, + permanent_never: { quantity: 20, expires: "never" }, + permanent_absent: { quantity: 30 }, + permanent_invalid: { quantity: 40, expires: "not-a-real-value" }, + }, + }, + endedAtMillis: 2 * MONTH_MS, + createdAtMillis: 0, + })))); + + const endEvents = (await getRowDatas(schema.subscriptionEndEvents)) + .filter((e: any) => e.subscriptionId === "sub-tf-end-permanent"); + expect(endEvents).toHaveLength(1); + + const expiredItemIds = endEvents[0].itemQuantityChangesToExpire.map((e: any) => e.itemId).sort(); + expect(expiredItemIds).toEqual(["expiring", "repeating"]); + }); }); diff --git a/apps/backend/src/lib/payments/schema/__tests__/test-helpers.ts b/apps/backend/src/lib/payments/schema/__tests__/test-helpers.ts index 6a55938e0d..98bf69ee0f 100644 --- a/apps/backend/src/lib/payments/schema/__tests__/test-helpers.ts +++ b/apps/backend/src/lib/payments/schema/__tests__/test-helpers.ts @@ -8,6 +8,7 @@ import postgres from "postgres"; import { toExecutableSqlTransaction, toQueryableSqlQuery } from "@/lib/bulldozer/db/index"; +import { loadProcessQueueFunctionSql } from "@/lib/bulldozer/db/test-sql-loaders"; type SqlStatement = { type: "statement", sql: string, outputName?: string }; type SqlQuery = { type: "query", sql: string, toStatement(outputName?: string): SqlStatement }; @@ -21,13 +22,36 @@ function getConnectionString(): string { return connectionString; } +export type CreateTestDbOptions = { + /** + * SQL expression used to seed `BulldozerTimeFoldMetadata.lastProcessedAt` + * at setup time. The default (`'2099-01-01T00:00:00Z'::timestamptz`) puts + * the metadata clock far in the future so every timefold tick fires + * inline at `setRow` time — this is what most payments tests rely on. + * + * Set this to `now()` (or a pre-epoch timestamp) to exercise the queued + * path where future ticks defer to `bulldozer_timefold_process_queue()`. + */ + lastProcessedAt?: string, + /** + * When true, also installs `public.bulldozer_timefold_process_queue()` + * from the cascade migration so callers can invoke `processQueue()` + * against a real prod-shape function body. Default: false (inline-path + * tests don't need it). + */ + installProcessQueueFn?: boolean, +}; + /** * Creates an isolated test database. Call `setup()` in beforeAll and * `teardown()` in afterAll. Access `runStatements` / `readRows` after setup. * * Follows the same pattern as apps/backend/src/lib/bulldozer/db/index.test.ts. */ -export function createTestDb() { +export function createTestDb(options: CreateTestDbOptions = {}) { + const lastProcessedAtExpression = options.lastProcessedAt ?? 
`'2099-01-01T00:00:00Z'::timestamptz`; + const installProcessQueueFn = options.installProcessQueueFn ?? false; + const connectionString = getConnectionString(); const base = connectionString.replace(/\/[^/]*(\?.*)?$/, ""); const queryString = connectionString.split("?")[1] ?? ""; @@ -53,6 +77,38 @@ export function createTestDb() { return await getSql().unsafe(toQueryableSqlQuery(query)); },
+ /** + * Sets `BulldozerTimeFoldMetadata.lastProcessedAt` to the given + * SQL expression. Useful for tests that need to bump the clock forward + * (to make queued ticks due) or backward (to force future ticks into + * the queue). + */ + setLastProcessedAt: async (isoOrExpression: string) => { + await getSql().unsafe(` + UPDATE "BulldozerTimeFoldMetadata" + SET "lastProcessedAt" = (${isoOrExpression})::timestamptz, + "updatedAt" = now() + WHERE "key" = 'singleton' + `); + }, +
+ /** Invokes the real pg_cron drain entry point. Requires `installProcessQueueFn`. */ + processQueue: async () => { + if (!installProcessQueueFn) { + throw new Error( + "processQueue() requires createTestDb({ installProcessQueueFn: true })", + ); + } + await getSql().unsafe(`SELECT public.bulldozer_timefold_process_queue()`); + }, +
+ countQueueRows: async (): Promise<number> => { + const rows = await getSql()<Array<{ count: number }>>` + SELECT COUNT(*)::int AS "count" FROM "BulldozerTimeFoldQueue" + `; + return rows[0].count; + }, + setup: async () => { await adminSql.unsafe(`CREATE DATABASE ${dbName}`); _sql = postgres(dbUrl, { onnotice: () => undefined, max: 1 }); @@ -115,8 +171,25 @@ `); await _sql.unsafe(` INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt") - VALUES ('singleton', '2099-01-01T00:00:00Z'::timestamptz) + VALUES ('singleton', ${lastProcessedAtExpression}) + `); + // declareTimeFoldTable.init() upserts a cascade template here (see + // 20260417000000_bulldozer_timefold_downstream_cascade). The + // table must exist even when tests don't exercise the queue path, + // because init() runs the upsert unconditionally. + await _sql.unsafe(` + CREATE TABLE "BulldozerTimeFoldDownstreamCascade" ( + "tableStoragePath" JSONB[] NOT NULL, + "cascadeInputName" TEXT NOT NULL, + "cascadeTemplate" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath") + ) `); + if (installProcessQueueFn) { + await _sql.unsafe(loadProcessQueueFunctionSql()); + } }, teardown: async () => { diff --git a/apps/backend/src/lib/payments/schema/phase-1/otp-timefold-algo.ts b/apps/backend/src/lib/payments/schema/phase-1/otp-timefold-algo.ts index 274dd834b8..83bffca37b 100644 --- a/apps/backend/src/lib/payments/schema/phase-1/otp-timefold-algo.ts +++ b/apps/backend/src/lib/payments/schema/phase-1/otp-timefold-algo.ts @@ -143,7 +143,14 @@ export function getOtpTimeFoldReducerSql(): string { )`; // ── item-grant-repeat event (same logic as subscription but with sourceType=one_time_purchase) ── - const currentMillis = `(EXTRACT(EPOCH FROM ${T}) * 1000)::numeric`; + // Keep currentMillis as bigint at the root (not NUMERIC) for the same + // reason as `subscription-timefold-algo.ts` — see the comment there for + // the full failure mode.
Explicit ROUND before the cast is defensive: + // NUMERIC::bigint already rounds, but explicit rounding makes + // intent clear at the callsite and stays stable if `EXTRACT(EPOCH ...)` + // ever comes back typed as DOUBLE PRECISION (pre-14 PG, or a future + // regression) where implicit casts use half-to-even. + const currentMillis = `(ROUND(EXTRACT(EPOCH FROM ${T}) * 1000)::bigint)`; const dueItems = `( SELECT jsonb_agg(jsonb_build_object('itemId', "sched"."key", 'schedule', "sched"."value")) @@ -153,7 +160,8 @@ AND ("sched"."value"->>'nextRepeatMillis')::numeric <= ${currentMillis} )`; - const igrTxnId = `('igr:' || (${S}->>'purchaseId') || ':' || ${currentMillis}::bigint::text)`; + // currentMillis is already ::bigint (see above) so plain ::text is enough. + const igrTxnId = `('igr:' || (${S}->>'purchaseId') || ':' || ${currentMillis}::text)`; const previousGrantsToExpire = `( SELECT COALESCE(jsonb_agg( diff --git a/apps/backend/src/lib/payments/schema/phase-1/subscription-timefold-algo.ts b/apps/backend/src/lib/payments/schema/phase-1/subscription-timefold-algo.ts index 59d9ac367e..febec76039 100644 --- a/apps/backend/src/lib/payments/schema/phase-1/subscription-timefold-algo.ts +++ b/apps/backend/src/lib/payments/schema/phase-1/subscription-timefold-algo.ts @@ -261,8 +261,26 @@ export function getSubscriptionTimeFoldReducerSql(): string { )`; // ── item-grant-repeat event ── - // Emitted when timestamp matches an item's nextRepeatMillis - const currentMillis = `(EXTRACT(EPOCH FROM ${T}) * 1000)::numeric`; + // Emitted when timestamp matches an item's nextRepeatMillis. + // + // PG 14+ returns EXTRACT(EPOCH ...) as NUMERIC with scale 6 (microsecond + // precision), so if we left currentMillis as NUMERIC it would serialize + // into JSONB with trailing ".000000". That round-trips fine for our own + // comparisons but leaks into txn IDs built by downstream tables via + // `->>effectiveAtMillis`, producing e.g. `igr:<subscriptionId>:2592000000.000000`, + // while references built inline in this algo via `::text` would produce + // the decimal-free `igr:<subscriptionId>:2592000000`. The two wouldn't match → + // `item-quantity-expire` entries would fail to resolve the grant they're + // meant to expire, leaving `when-repeated` balances stuck after a + // subscription-end that follows an item-grant-repeat. + // + // Explicit ROUND before the bigint cast: NUMERIC::bigint rounds + // half-away-from-zero, which happens to match what we want, but if `T` + // ever comes from a path that returns DOUBLE PRECISION (pre-14 PG, or a + // future regression) the implicit cast rounds half-to-even and + // could disagree on midpoint values. Being explicit about the rounding + // intent is both self-documenting and stable across numeric types. + const currentMillis = `(ROUND(EXTRACT(EPOCH FROM ${T}) * 1000)::bigint)`; // Items due at current timestamp const dueItems = `( @@ -279,8 +297,10 @@ AND (${S}->>'endedAtMillis')::numeric <= ${currentMillis} )`; - // item-grant-repeat: txnId uses sourceId + effectiveAtMillis - const igrTxnId = `('igr:' || (${S}->>'subscriptionId') || ':' || ${currentMillis}::bigint::text)`; + // item-grant-repeat: txnId uses sourceId + effectiveAtMillis. currentMillis + // is already ::bigint (see above) so plain ::text is enough — no decimal + // tail, no redundant double-cast.
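+ // Illustrative comparison (not part of this reducer; values assume the
+ // scale-6 NUMERIC behaviour described above). For a tick 30 days after
+ // epoch:
+ //   SELECT (to_jsonb(EXTRACT(EPOCH FROM timestamptz '1970-01-31 00:00:00+00') * 1000) #>> '{}');
+ //   --> '2592000000.000000'   (what a downstream ->> sees)
+ //   SELECT (ROUND(EXTRACT(EPOCH FROM timestamptz '1970-01-31 00:00:00+00') * 1000)::bigint)::text;
+ //   --> '2592000000'          (what this algo's ::text produces)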
+ const igrTxnId = `('igr:' || (${S}->>'subscriptionId') || ':' || ${currentMillis}::text)`; const repeatCount = `(${S}->>'repeatCount')::int`; // Build previousGrantsToExpire: outstanding grants with expiresWhen="when-repeated" that match due items @@ -391,7 +411,22 @@ export function getSubscriptionTimeFoldReducerSql(): string { )`; // ── subscription-end event ── - // Expire all outstanding grants with expiresWhen="when-purchase-expires" + // Expire all outstanding grants that are tied to the subscription's + // lifetime — both 'when-purchase-expires' and 'when-repeated'. The latter + // must be expired here too: otherwise the last-granted monthly quota + // (emails_per_month, analytics_events, …) persists in the item-quantity + // ledger after the subscription is gone and stacks on top of any + // replacement subscription for the remainder of the period. + // + // Permanent grants (item has no `expires` configured, or an unrecognized + // value) were normalized to JSONB null at subscription-start time, so + // `"g"->>'expiresWhen'` returns SQL NULL for them and the IN predicate + // correctly excludes them. + // + // outstandingGrants always carries the *current* grant ref for each item: + // initially { txnId: 'sub-start:' }, and each item-grant-repeat tick + // replaces the matching when-repeated entries with fresh ones keyed by + // the igr txnId, so iterating here works identically pre- and post-repeat. const endItemQuantityChangesToExpire = (stateSql: string) => `( SELECT COALESCE(jsonb_agg( jsonb_build_object( @@ -402,7 +437,7 @@ export function getSubscriptionTimeFoldReducerSql(): string { ) ), '[]'::jsonb) FROM jsonb_array_elements(${stateSql}->'outstandingGrants') AS "g" - WHERE "g"->>'expiresWhen' = 'when-purchase-expires' + WHERE "g"->>'expiresWhen' IN ('when-purchase-expires', 'when-repeated') )`; const endEventRowFromState = (stateSql: string) => `jsonb_build_object(