Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apps/backend/prisma.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ export default defineConfig({
tables: {
external: [
"public.BulldozerStorageEngine",
// PK on JSONB[] (tableStoragePath) — not expressible via Prisma's @id
// (list types are treated as non-required). Managed entirely by
// bulldozer code via raw SQL. See schema.prisma note next to the
// BulldozerTimeFoldMetadata model.
"public.BulldozerTimeFoldDownstreamCascade",
],
},
})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,341 @@
-- Propagate TimeFold queue-drained emissions through the registered
-- downstream trigger cascade (filter/map/LFold/etc.), mirroring what the
-- inline setRow path does via collectRowChangeTriggerStatements. Before
-- this migration, `bulldozer_timefold_process_queue` updated the
-- TimeFold's own state and materialized its output rows but never touched
-- any downstream materialized table, so any event scheduled for the
-- future (subscription-end on cancel-at-period-end, monthly
-- item-grant-repeat ticks) silently stopped after updating the TimeFold.
--
-- Strategy: per-timefold cascade templates are precomputed in TypeScript
-- (declareTimeFoldTable.init() → toCascadeSqlBlock) and persisted here as
-- data. The rewritten process_queue:
-- 1. Drains due queue rows per-timefold (existing per-row logic, moved
-- inside an outer per-timefold loop).
-- 2. After each queue row's reducer emits new rows, populates
-- __bulldozer_seq with them under the timefold's cascade input name.
-- 3. Once per timefold per tick, EXECUTEs the stored cascadeTemplate,
-- which reads from __bulldozer_seq and writes through filter/map/
-- LFold/etc. to their materialized outputs.
--
-- The shipped BulldozerTimeFoldQueue & BulldozerTimeFoldMetadata tables
-- from 20260323150000_add_bulldozer_timefold_queue are unchanged. pg_cron
-- keeps calling public.bulldozer_timefold_process_queue() as before.

-- CreateTable
-- Registry of per-timefold downstream-cascade SQL, consumed by
-- bulldozer_timefold_process_queue() below. One row per timefold, keyed
-- by its storage path. Rows are upserted from application code (per the
-- migration header, declareTimeFoldTable.init()), not by Prisma — hence
-- the external-table entry in prisma.config.ts.
CREATE TABLE "BulldozerTimeFoldDownstreamCascade" (
-- Storage-engine key path of the timefold table; compared for equality
-- against BulldozerTimeFoldQueue."tableStoragePath" in process_queue.
"tableStoragePath" JSONB[] NOT NULL,
-- Output name under which queue-drained emissions are inserted into the
-- __bulldozer_seq temp table before the cascade template runs.
"cascadeInputName" TEXT NOT NULL,
-- Precomputed SQL block EXECUTEd once per timefold per tick; NULL means
-- the timefold has no downstream triggers and the cascade step is a no-op.
"cascadeTemplate" TEXT,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- NOTE(review): no ON UPDATE trigger here — presumably "updatedAt" is
-- maintained by the application-side upsert; confirm against init().
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,

CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath")
);

-- SPLIT_STATEMENT_SENTINEL
-- SINGLE_STATEMENT_SENTINEL
CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue()
RETURNS void
LANGUAGE plpgsql
AS $function$
-- Drains all due BulldozerTimeFoldQueue rows (scheduledAt <= now()): for
-- each, re-applies the stored reducer SQL until it stops scheduling or
-- schedules past the cutoff, materializes emitted rows into
-- BulldozerStorageEngine, stages the emissions into __bulldozer_seq, and
-- finally EXECUTEs the timefold's registered cascade template once per
-- timefold. Called by pg_cron; see the migration header comment above for
-- the full design rationale.
DECLARE
-- Due-row cutoff for this tick; also recorded on the metadata singleton.
cutoff_timestamp timestamptz;
-- Outer-loop state: the timefold being drained plus its cascade registry
-- row (input name + template; a NULL template means no downstream work).
current_timefold_path jsonb[];
current_cascade_input_name text;
current_cascade_template text;
-- Inner-loop state: the queue row being drained and the storage-engine
-- key paths derived from it (group / rows / states / per-row state).
queued_row "BulldozerTimeFoldQueue"%ROWTYPE;
group_path jsonb[];
rows_path jsonb[];
states_path jsonb[];
state_row_path jsonb[];
-- Persisted per-row timefold state and emitted-rows bookkeeping.
existing_state jsonb;
old_emitted_rows jsonb;
newly_emitted_rows jsonb;
accumulated_emitted_rows jsonb;
-- Reducer-loop state: advances on each application of the reducer.
current_state jsonb;
current_timestamp_value timestamptz;
next_state jsonb;
next_rows_data jsonb;
normalized_next_rows_data jsonb;
next_timestamp timestamptz;
previous_emitted_row_count int;
reducer_iterations int;
new_row_record record;
BEGIN
-- Serialize concurrent invocations (e.g. overlapping pg_cron ticks) for
-- the rest of this transaction; released automatically at commit/abort.
PERFORM pg_advisory_xact_lock(7857391);

-- Ensure the metadata singleton exists before the UPDATE below.
INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt")
VALUES ('singleton', now())
ON CONFLICT ("key") DO NOTHING;

cutoff_timestamp := now();

UPDATE "BulldozerTimeFoldMetadata"
SET
"lastProcessedAt" = cutoff_timestamp,
"updatedAt" = CURRENT_TIMESTAMP
WHERE "key" = 'singleton';

-- The cascade templates generated by toExecutableSqlTransaction/
-- toCascadeSqlBlock read and write via this per-transaction temp table,
-- keyed by string output names. It's created idempotently here so the
-- cascade never fails on a cold queue-drain invocation.
CREATE TEMP TABLE IF NOT EXISTS "__bulldozer_seq" (
"__output_name" text NOT NULL,
"__output_row" jsonb NOT NULL
) ON COMMIT DROP;

-- Outer loop: each distinct timefold that has queue rows due at this tick.
FOR current_timefold_path IN
SELECT DISTINCT "tableStoragePath"
FROM "BulldozerTimeFoldQueue"
WHERE "scheduledAt" <= cutoff_timestamp
LOOP
-- Look up the cascade template for this timefold.
current_cascade_input_name := NULL;
current_cascade_template := NULL;
SELECT "cascadeInputName", "cascadeTemplate"
INTO current_cascade_input_name, current_cascade_template
FROM "BulldozerTimeFoldDownstreamCascade"
WHERE "tableStoragePath" = current_timefold_path;

-- If no registry row exists, this timefold hasn't been init()'d
-- yet — most commonly the deploy-window gap between this
-- migration applying (creating an empty registry table) and the
-- backend running declareTimeFoldTable.init() (which upserts one
-- row per timefold). Skipping just the cascade EXECUTE but still
-- draining the queue rows would let the timefold's own state
-- advance while the downstream filters/maps/LFolds silently miss
-- the update — a permanent desync with no retry path, since the
-- queue row is gone. Defer the entire timefold for this tick so
-- its queue rows stay queued; the next tick (after init finishes)
-- drains them with cascade intact.
--
-- See `timefold-queue-downstream.test.ts`'s "process_queue defers
-- a timefold whose cascade registry row is missing" for the
-- regression guard.
IF NOT FOUND THEN
CONTINUE;
END IF;

-- Defensive clean slate: clear ALL rows before populating this
-- timefold's cascade input. Cascade templates emit intermediate
-- rows (row_change_diag_*, mapped_changes_*, etc.) under unique
-- random names, so cross-timefold contamination isn't possible
-- today, but keeping __bulldozer_seq empty between iterations is
-- simpler to reason about ("between iterations, __bulldozer_seq is
-- empty") and prevents memory growth across many timefolds in one
-- tick. ON COMMIT DROP handles cleanup if the function throws.
DELETE FROM "__bulldozer_seq";

-- Inner loop: per-queue-row drain for this timefold, mirroring the
-- pre-fix function body (semantics preserved so the existing
-- process-queue.ts migration test keeps passing).
LOOP
SELECT *
INTO queued_row
FROM "BulldozerTimeFoldQueue"
WHERE "tableStoragePath" = current_timefold_path
AND "scheduledAt" <= cutoff_timestamp
ORDER BY "scheduledAt" ASC, "id" ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;

EXIT WHEN NOT FOUND;

-- Consume the queue row up front; the stale-guards below may
-- CONTINUE past it, intentionally discarding it.
DELETE FROM "BulldozerTimeFoldQueue"
WHERE "id" = queued_row."id";

-- Derive the storage-engine key paths for this row's group:
-- <tableStoragePath>/groups/<groupKey>/{rows,states[/<rowIdentifier>]}.
group_path := queued_row."tableStoragePath" || ARRAY[to_jsonb('groups'::text), queued_row."groupKey"]::jsonb[];
rows_path := group_path || ARRAY[to_jsonb('rows'::text)]::jsonb[];
states_path := group_path || ARRAY[to_jsonb('states'::text)]::jsonb[];
state_row_path := states_path || ARRAY[to_jsonb(queued_row."rowIdentifier")]::jsonb[];

SELECT "value"
INTO existing_state
FROM "BulldozerStorageEngine"
WHERE "keyPath" = state_row_path;

-- Stale-guard: the per-row state no longer exists, so the source
-- row was removed after this queue entry was scheduled; drop it.
IF existing_state IS NULL THEN
CONTINUE;
END IF;

-- Stale-guard: the queue entry snapshots the rowData it was
-- scheduled against; a mismatch means the source row changed after
-- scheduling, so this entry is obsolete (presumably superseded by
-- a fresher schedule — TODO confirm against the inline setRow path).
IF existing_state->'rowData' IS DISTINCT FROM queued_row."rowData" THEN
CONTINUE;
END IF;

-- Normalize persisted emission history; anything non-array counts
-- as "no rows emitted yet".
old_emitted_rows := CASE
WHEN jsonb_typeof(existing_state->'emittedRowsData') = 'array' THEN existing_state->'emittedRowsData'
ELSE '[]'::jsonb
END;
newly_emitted_rows := '[]'::jsonb;
accumulated_emitted_rows := old_emitted_rows;
previous_emitted_row_count := jsonb_array_length(old_emitted_rows);

current_state := queued_row."stateAfter";
current_timestamp_value := queued_row."scheduledAt";
reducer_iterations := 0;

-- Re-apply the stored reducer until it either stops scheduling
-- (nextTimestamp NULL) or schedules past the cutoff. The 10k cap
-- guards against reducers that keep scheduling at/before cutoff.
LOOP
reducer_iterations := reducer_iterations + 1;
IF reducer_iterations > 10000 THEN
RAISE EXCEPTION 'bulldozer timefold reducer exceeded 10k iterations for row %', queued_row."rowIdentifier";
END IF;

-- Bind params: $1 = state before this application, $2 = the
-- snapshotted rowData, $3 = the timestamp being processed
-- (scheduledAt first, then each successive nextTimestamp).
EXECUTE format(
$reducer$
SELECT
to_jsonb("reducerRows"."newState") AS "newState",
to_jsonb("reducerRows"."newRowsData") AS "newRowsData",
CASE
WHEN "reducerRows"."nextTimestamp" IS NULL THEN NULL::timestamptz
ELSE ("reducerRows"."nextTimestamp")::timestamptz
END AS "nextTimestamp"
FROM (
SELECT %s
FROM (
SELECT
$1::jsonb AS "oldState",
$2::jsonb AS "oldRowData",
$3::timestamptz AS "timestamp"
) AS "reducerInput"
) AS "reducerRows"
$reducer$,
queued_row."reducerSql"
)
INTO next_state, next_rows_data, next_timestamp
USING current_state, queued_row."rowData", current_timestamp_value;

normalized_next_rows_data := CASE
WHEN jsonb_typeof(next_rows_data) = 'array' THEN next_rows_data
ELSE '[]'::jsonb
END;
newly_emitted_rows := newly_emitted_rows || normalized_next_rows_data;
accumulated_emitted_rows := accumulated_emitted_rows || normalized_next_rows_data;
current_state := next_state;

EXIT WHEN next_timestamp IS NULL OR next_timestamp > cutoff_timestamp;
current_timestamp_value := next_timestamp;
END LOOP;

-- Ensure the parent group/rows/states key paths exist (with
-- placeholder 'null' values) before writing children under them.
INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value")
VALUES
(gen_random_uuid(), group_path, 'null'::jsonb),
(gen_random_uuid(), rows_path, 'null'::jsonb),
(gen_random_uuid(), states_path, 'null'::jsonb)
ON CONFLICT ("keyPath") DO NOTHING;

-- Upsert the per-row state: latest reducer state, full emission
-- history, and the next scheduled timestamp (or JSON null).
INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value")
VALUES (
gen_random_uuid(),
state_row_path,
jsonb_build_object(
'rowData', queued_row."rowData",
'stateAfter', current_state,
'emittedRowsData', accumulated_emitted_rows,
'nextTimestamp',
CASE
WHEN next_timestamp IS NULL THEN 'null'::jsonb
ELSE to_jsonb(next_timestamp)
END
)
)
ON CONFLICT ("keyPath") DO UPDATE
SET "value" = EXCLUDED."value";

-- Materialize each newly-emitted row under rows_path, keyed as
-- '<rowIdentifier>:<index>' with indices continuing from the
-- previously-persisted emission count.
FOR new_row_record IN
SELECT
"rows"."rowData" AS "rowData",
"rows"."rowIndex" AS "rowIndex"
FROM jsonb_array_elements(newly_emitted_rows) WITH ORDINALITY AS "rows"("rowData", "rowIndex")
LOOP
INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value")
VALUES (
gen_random_uuid(),
rows_path || ARRAY[to_jsonb((queued_row."rowIdentifier" || ':' || (previous_emitted_row_count + new_row_record."rowIndex")::text)::text)]::jsonb[],
jsonb_build_object('rowData', new_row_record."rowData")
)
ON CONFLICT ("keyPath") DO UPDATE
SET "value" = EXCLUDED."value";
END LOOP;

-- Accumulate the newly-emitted rows into __bulldozer_seq under this
-- timefold's cascade input name, shaped like timeFoldChangesTableName
-- (oldRowData/newRowData diff). Queue-drained emissions are always
-- new rows, so oldRowData/oldRowSortKey are null.
IF current_cascade_input_name IS NOT NULL THEN
INSERT INTO "__bulldozer_seq" ("__output_name", "__output_row")
SELECT
current_cascade_input_name,
jsonb_build_object(
'groupKey', queued_row."groupKey",
'rowIdentifier', queued_row."rowIdentifier" || ':' || (previous_emitted_row_count + "emitted"."rowIndex")::text,
'oldRowSortKey', 'null'::jsonb,
'newRowSortKey', 'null'::jsonb,
'oldRowData', 'null'::jsonb,
'newRowData', "emitted"."rowData"
)
FROM jsonb_array_elements(newly_emitted_rows) WITH ORDINALITY AS "emitted"("rowData", "rowIndex");
END IF;

-- Reducer scheduled a future tick: re-queue with the advanced
-- state. The conflict target keeps at most one pending entry per
-- (timefold, group, row).
IF next_timestamp IS NOT NULL AND next_timestamp > cutoff_timestamp THEN
INSERT INTO "BulldozerTimeFoldQueue" (
"id",
"tableStoragePath",
"groupKey",
"rowIdentifier",
"scheduledAt",
"stateAfter",
"rowData",
"reducerSql"
)
VALUES (
gen_random_uuid(),
queued_row."tableStoragePath",
queued_row."groupKey",
queued_row."rowIdentifier",
next_timestamp,
current_state,
queued_row."rowData",
queued_row."reducerSql"
)
ON CONFLICT ("tableStoragePath", "groupKey", "rowIdentifier") DO UPDATE
SET
"scheduledAt" = EXCLUDED."scheduledAt",
"stateAfter" = EXCLUDED."stateAfter",
"rowData" = EXCLUDED."rowData",
"reducerSql" = EXCLUDED."reducerSql",
"updatedAt" = CURRENT_TIMESTAMP;
END IF;

-- Note: the sibling migration 20260323150000 carried an "orphan
-- group cleanup" IF-NOT-EXISTS block here that deleted
-- rows_path / states_path / group_path when both had no children.
-- That block was unreachable in that function body and is still
-- unreachable here: every path that reaches this point has just
-- UPSERTed `state_row_path` (a child of `states_path`) a few
-- statements up, so the NOT EXISTS on states_path is always
-- false. Dropped here rather than inherited verbatim.
END LOOP;

-- After draining all due queue rows for this timefold, run the
-- downstream trigger cascade exactly once. The template reads its input
-- from __bulldozer_seq (populated above) and writes through every
-- registered downstream filter/map/LFold/etc. to their materialized
-- outputs. Skipped when no template is registered — matches the
-- inline path's no-op behavior for timefolds without downstream
-- triggers.
IF current_cascade_template IS NOT NULL THEN
EXECUTE current_cascade_template;
END IF;

-- Clean slate before the next timefold iteration. Clears the
-- cascade input rows AND every intermediate-stage row the cascade
-- template's EXECUTE just emitted into __bulldozer_seq. See the
-- identical DELETE at the top of this loop for the rationale.
DELETE FROM "__bulldozer_seq";
END LOOP;
END;
$function$;
-- SPLIT_STATEMENT_SENTINEL
Loading
Loading