From f779e9a30368172cb71fbdaae663fb4f859764da Mon Sep 17 00:00:00 2001 From: mantrakp04 Date: Mon, 13 Apr 2026 19:23:57 -0700 Subject: [PATCH 1/3] feat(bulldozer): add verifyDataIntegrity to all Table types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a `verifyDataIntegrity()` method to the Bulldozer `Table` interface that returns a SQL query producing error rows when materialized data diverges from what the input tables imply. Empty result = healthy. Each table operator implements verification appropriate to its semantics: - Full re-derivation (flat-map, sort, group-by, left-join, compact, limit) - Delegation to internal table (filter, map → nested flat-map) - Structural group-correspondence checks (reduce, l-fold, time-fold) - No-op for leaf/virtual tables (stored, concat) All queries are gated on isInitialized so uninitialized tables are silently skipped. A `verifyAllTablesIntegrity(tables)` helper UNION ALLs individual queries with a tableid column for easy debugging. The test file now runs verification after every test via an afterEach hook and a trackTable() wrapper around all 68 declare*Table() calls. 
Made-with: Cursor --- .../src/lib/bulldozer/db/index.test.ts | 292 +++++++++--------- apps/backend/src/lib/bulldozer/db/index.ts | 19 +- .../lib/bulldozer/db/tables/compact-table.ts | 53 +++- .../lib/bulldozer/db/tables/concat-table.ts | 4 + .../lib/bulldozer/db/tables/filter-table.ts | 1 + .../lib/bulldozer/db/tables/flat-map-table.ts | 57 +++- .../lib/bulldozer/db/tables/group-by-table.ts | 51 ++- .../lib/bulldozer/db/tables/l-fold-table.ts | 37 ++- .../bulldozer/db/tables/left-join-table.ts | 91 +++++- .../lib/bulldozer/db/tables/limit-table.ts | 43 ++- .../src/lib/bulldozer/db/tables/map-table.ts | 1 + .../lib/bulldozer/db/tables/reduce-table.ts | 49 ++- .../src/lib/bulldozer/db/tables/sort-table.ts | 39 ++- .../lib/bulldozer/db/tables/stored-table.ts | 4 + .../bulldozer/db/tables/time-fold-table.ts | 28 +- claude/CLAUDE-KNOWLEDGE.md | 6 + 16 files changed, 627 insertions(+), 148 deletions(-) diff --git a/apps/backend/src/lib/bulldozer/db/index.test.ts b/apps/backend/src/lib/bulldozer/db/index.test.ts index eea14e667f..333f6a1336 100644 --- a/apps/backend/src/lib/bulldozer/db/index.test.ts +++ b/apps/backend/src/lib/bulldozer/db/index.test.ts @@ -1,7 +1,8 @@ import { stringCompare, templateIdentity } from "@stackframe/stack-shared/dist/utils/strings"; import postgres from "postgres"; -import { afterAll, beforeAll, beforeEach, describe, expect, it, test } from "vitest"; -import { declareCompactTable, declareConcatTable, declareFilterTable, declareFlatMapTable, declareGroupByTable, declareLeftJoinTable, declareLFoldTable, declareLimitTable, declareMapTable, declareReduceTable, declareSortTable, declareStoredTable, declareTimeFoldTable, toExecutableSqlTransaction, toQueryableSqlQuery } from "./index"; +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, test } from "vitest"; +import type { Table } from "./index"; +import { declareCompactTable, declareConcatTable, declareFilterTable, declareFlatMapTable, declareGroupByTable, 
declareLeftJoinTable, declareLFoldTable, declareLimitTable, declareMapTable, declareReduceTable, declareSortTable, declareStoredTable, declareTimeFoldTable, toExecutableSqlTransaction, toQueryableSqlQuery, verifyAllTablesIntegrity } from "./index"; type TestDb = { full: string, base: string }; @@ -234,6 +235,21 @@ describe.sequential("declareStoredTable (real postgres)", () => { `; }); + // any is used here because the verifier works with heterogeneous table types + const allInitializedTables: Table<any, any>[] = []; + function trackTable<T extends Table<any, any>>(t: T): T { + allInitializedTables.push(t); + return t; + } + + afterEach(async () => { + if (allInitializedTables.length > 0) { + const errors = await readRows(verifyAllTablesIntegrity(allInitializedTables)); + expect(errors).toEqual([]); + } + allInitializedTables.length = 0; + }); + afterAll(async () => { await sql.end(); await adminSql.unsafe(` @@ -269,28 +285,28 @@ describe.sequential("declareStoredTable (real postgres)", () => { } function createGroupedTable() { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users" }); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); return { fromTable, groupedTable }; } function createMappedTable() { const { fromTable, groupedTable } = createGroupedTable(); - const mappedTable = declareMapTable({ + const mappedTable = trackTable(declareMapTable({ tableId: "users-by-team-mapped", fromTable: groupedTable, mapper: mapper(` ("rowData"->'team') AS "team", (("rowData"->>'value')::int + 100) AS "mappedValue" `), - }); + })); return { fromTable, groupedTable, mappedTable }; } function createFlatMappedTable() { const { fromTable, groupedTable } = createGroupedTable(); - const flatMappedTable = declareFlatMapTable({ + const flatMappedTable = trackTable(declareFlatMapTable({ tableId: "users-by-team-flat-mapped", fromTable:
groupedTable, mapper: mapper(` @@ -310,78 +326,78 @@ describe.sequential("declareStoredTable (real postgres)", () => { ) END AS "rows" `), - }); + })); return { fromTable, groupedTable, flatMappedTable }; } function createFilteredTable() { const { fromTable, groupedTable } = createGroupedTable(); - const filteredTable = declareFilterTable({ + const filteredTable = trackTable(declareFilterTable({ tableId: "users-by-team-filtered", fromTable: groupedTable, filter: predicate(`(("rowData"->>'value')::int) >= 2`), - }); + })); return { fromTable, groupedTable, filteredTable }; } function createLimitedTable() { const { fromTable, groupedTable } = createGroupedTable(); - const limitedTable = declareLimitTable({ + const limitedTable = trackTable(declareLimitTable({ tableId: "users-by-team-limited", fromTable: groupedTable, limit: expr(`2`), - }); + })); return { fromTable, groupedTable, limitedTable }; } function createConcatenatedTable() { const fromTableA = declareStoredTable<{ value: number, team: string }>({ tableId: "users-a" }); const fromTableB = declareStoredTable<{ value: number, team: string }>({ tableId: "users-b" }); - const groupedTableA = declareGroupByTable({ + const groupedTableA = trackTable(declareGroupByTable({ tableId: "users-a-by-team", fromTable: fromTableA, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const groupedTableB = declareGroupByTable({ + })); + const groupedTableB = trackTable(declareGroupByTable({ tableId: "users-b-by-team", fromTable: fromTableB, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const concatenatedTable = declareConcatTable({ + })); + const concatenatedTable = trackTable(declareConcatTable({ tableId: "users-by-team-concat", tables: [groupedTableA, groupedTableB], - }); + })); return { fromTableA, fromTableB, groupedTableA, groupedTableB, concatenatedTable }; } function createSortedTable() { const { fromTable, groupedTable } = createGroupedTable(); - const sortedTable = declareSortTable({ + const 
sortedTable = trackTable(declareSortTable({ tableId: "users-by-team-sorted", fromTable: groupedTable, getSortKey: mapper(`(("rowData"->>'value')::int) AS "newSortKey"`), compareSortKeys: (a, b) => expr(`(((${a.sql}) #>> '{}')::int) - (((${b.sql}) #>> '{}')::int)`), - }); + })); return { fromTable, groupedTable, sortedTable }; } function createDescendingSortedTable() { const { fromTable, groupedTable } = createGroupedTable(); - const sortedTable = declareSortTable({ + const sortedTable = trackTable(declareSortTable({ tableId: "users-by-team-sorted-desc", fromTable: groupedTable, getSortKey: mapper(`(("rowData"->>'value')::int) AS "newSortKey"`), compareSortKeys: (a, b) => expr(`(((${b.sql}) #>> '{}')::int) - (((${a.sql}) #>> '{}')::int)`), - }); + })); return { fromTable, groupedTable, sortedTable }; } function createDescendingLimitedTable() { const { fromTable, groupedTable, sortedTable } = createDescendingSortedTable(); - const limitedTable = declareLimitTable({ + const limitedTable = trackTable(declareLimitTable({ tableId: "users-by-team-limit-desc", fromTable: sortedTable, limit: expr(`2`), - }); + })); return { fromTable, groupedTable, sortedTable, limitedTable }; } function createDescendingLFoldTable() { const { fromTable, groupedTable, sortedTable } = createDescendingSortedTable(); - const lFoldTable = declareLFoldTable({ + const lFoldTable = trackTable(declareLFoldTable({ tableId: "users-by-team-lfold-desc", fromTable: sortedTable, initialState: expr(`'0'::jsonb`), @@ -393,12 +409,12 @@ describe.sequential("declareStoredTable (real postgres)", () => { ) ) AS "newRowsData" `), - }); + })); return { fromTable, groupedTable, sortedTable, lFoldTable }; } function createLFoldTable() { const { fromTable, groupedTable, sortedTable } = createSortedTable(); - const lFoldTable = declareLFoldTable({ + const lFoldTable = trackTable(declareLFoldTable({ tableId: "users-by-team-lfold", fromTable: sortedTable, initialState: expr(`'0'::jsonb`), @@ -430,12 +446,12 @@ 
describe.sequential("declareStoredTable (real postgres)", () => { END ) AS "newRowsData" `), - }); + })); return { fromTable, groupedTable, sortedTable, lFoldTable }; } function createTimeFoldTable() { const { fromTable, groupedTable } = createGroupedTable(); - const timeFoldTable = declareTimeFoldTable({ + const timeFoldTable = trackTable(declareTimeFoldTable({ tableId: "users-by-team-timefold", fromTable: groupedTable, initialState: expr(`'0'::jsonb`), @@ -459,34 +475,34 @@ describe.sequential("declareStoredTable (real postgres)", () => { ELSE NULL::timestamptz END AS "nextTimestamp" `), - }); + })); return { fromTable, groupedTable, timeFoldTable }; } function createLeftJoinedTable() { const fromTable = declareStoredTable<{ value: number, team: string | null }>({ tableId: "left-join-users" }); const joinTable = declareStoredTable<{ team: string | null, threshold: number, label: string }>({ tableId: "left-join-rules" }); - const groupedFromTable = declareGroupByTable({ + const groupedFromTable = trackTable(declareGroupByTable({ tableId: "left-join-users-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const groupedJoinTable = declareGroupByTable({ + })); + const groupedJoinTable = trackTable(declareGroupByTable({ tableId: "left-join-rules-by-team", fromTable: joinTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const leftJoinedTable = declareLeftJoinTable({ + })); + const leftJoinedTable = trackTable(declareLeftJoinTable({ tableId: "left-join-users-rules", leftTable: groupedFromTable, rightTable: groupedJoinTable, leftJoinKey: mapper(`(("rowData"->>'value')::int) AS "joinKey"`), rightJoinKey: mapper(`(("rowData"->>'threshold')::int) AS "joinKey"`), - }); + })); return { fromTable, joinTable, groupedFromTable, groupedJoinTable, leftJoinedTable }; } function createFlatMapMapGroupPipeline() { const { fromTable, groupedTable, flatMappedTable } = createFlatMappedTable(); - const mappedAfterFlatMap = declareMapTable({ + 
const mappedAfterFlatMap = trackTable(declareMapTable({ tableId: "users-by-team-flat-map-then-map", fromTable: flatMappedTable, mapper: mapper(` @@ -494,25 +510,25 @@ describe.sequential("declareStoredTable (real postgres)", () => { ("rowData"->'kind') AS "kind", (("rowData"->>'mappedValue')::int + 1) AS "mappedValuePlusOne" `), - }); - const groupedByKind = declareGroupByTable({ + })); + const groupedByKind = trackTable(declareGroupByTable({ tableId: "users-by-kind", fromTable: mappedAfterFlatMap, groupBy: mapper(`"rowData"->'kind' AS "groupKey"`), - }); + })); return { fromTable, groupedTable, flatMappedTable, mappedAfterFlatMap, groupedByKind }; } function createStackedMappedTables() { const { fromTable, groupedTable } = createGroupedTable(); - const mappedTableLevel1 = declareMapTable({ + const mappedTableLevel1 = trackTable(declareMapTable({ tableId: "users-by-team-map-level-1", fromTable: groupedTable, mapper: mapper(` ("rowData"->'team') AS "team", (("rowData"->>'value')::int + 10) AS "valuePlusTen" `), - }); - const mappedTableLevel2 = declareMapTable({ + })); + const mappedTableLevel2 = trackTable(declareMapTable({ tableId: "users-by-team-map-level-2", fromTable: mappedTableLevel1, mapper: mapper(` @@ -525,16 +541,16 @@ describe.sequential("declareStoredTable (real postgres)", () => { END ) AS "bucket" `), - }); + })); return { fromTable, groupedTable, mappedTableLevel1, mappedTableLevel2 }; } function createGroupMapGroupPipeline() { const { fromTable, groupedTable, mappedTableLevel1, mappedTableLevel2 } = createStackedMappedTables(); - const groupedByBucketTable = declareGroupByTable({ + const groupedByBucketTable = trackTable(declareGroupByTable({ tableId: "users-by-bucket", fromTable: mappedTableLevel2, groupBy: mapper(`"rowData"->'bucket' AS "groupKey"`), - }); + })); return { fromTable, groupedTable, mappedTableLevel1, mappedTableLevel2, groupedByBucketTable }; } function registerGroupAuditTrigger( @@ -829,11 +845,11 @@ 
describe.sequential("declareStoredTable (real postgres)", () => { test("groupBy registers upstream trigger in init and deregisters in delete", () => { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users-groupby-lifecycle" }); const fromTableInstrumentation = instrumentTriggerLifecycle(fromTable); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-groupby-lifecycle-by-team", fromTable: fromTableInstrumentation.table, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); expect(fromTableInstrumentation.getStats()).toEqual({ registerCalls: 0, deregisterCalls: 0, activeRegistrations: 0 }); groupedTable.init(); @@ -852,17 +868,17 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("flatMap registers upstream trigger in init and deregisters in delete", () => { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users-flatmap-lifecycle" }); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-flatmap-lifecycle-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); const groupedTableInstrumentation = instrumentTriggerLifecycle(groupedTable); - const flatMappedTable = declareFlatMapTable({ + const flatMappedTable = trackTable(declareFlatMapTable({ tableId: "users-flatmap-lifecycle-expanded", fromTable: groupedTableInstrumentation.table, mapper: mapper(`jsonb_build_array("rowData") AS "rows"`), - }); + })); expect(groupedTableInstrumentation.getStats()).toEqual({ registerCalls: 0, deregisterCalls: 0, activeRegistrations: 0 }); flatMappedTable.init(); @@ -877,18 +893,18 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("sort registers upstream trigger in init and deregisters in delete", () => { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users-sort-lifecycle" 
}); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-sort-lifecycle-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); const groupedTableInstrumentation = instrumentTriggerLifecycle(groupedTable); - const sortedTable = declareSortTable({ + const sortedTable = trackTable(declareSortTable({ tableId: "users-sort-lifecycle-sorted", fromTable: groupedTableInstrumentation.table, getSortKey: mapper(`(("rowData"->>'value')::int) AS "newSortKey"`), compareSortKeys: (a, b) => expr(`(((${a.sql}) #>> '{}')::int) - (((${b.sql}) #>> '{}')::int)`), - }); + })); expect(groupedTableInstrumentation.getStats()).toEqual({ registerCalls: 0, deregisterCalls: 0, activeRegistrations: 0 }); sortedTable.init(); @@ -903,17 +919,17 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("limit registers upstream trigger in init and deregisters in delete", () => { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users-limit-lifecycle" }); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-limit-lifecycle-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); const groupedTableInstrumentation = instrumentTriggerLifecycle(groupedTable); - const limitedTable = declareLimitTable({ + const limitedTable = trackTable(declareLimitTable({ tableId: "users-limit-lifecycle-limited", fromTable: groupedTableInstrumentation.table, limit: expr(`2`), - }); + })); expect(groupedTableInstrumentation.getStats()).toEqual({ registerCalls: 0, deregisterCalls: 0, activeRegistrations: 0 }); limitedTable.init(); @@ -929,22 +945,22 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("concat registers all upstream triggers in init and deregisters in delete", () => { const fromTableA = declareStoredTable<{ value: number, team: string }>({ tableId: 
"users-concat-lifecycle-a" }); const fromTableB = declareStoredTable<{ value: number, team: string }>({ tableId: "users-concat-lifecycle-b" }); - const groupedTableA = declareGroupByTable({ + const groupedTableA = trackTable(declareGroupByTable({ tableId: "users-concat-lifecycle-a-by-team", fromTable: fromTableA, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const groupedTableB = declareGroupByTable({ + })); + const groupedTableB = trackTable(declareGroupByTable({ tableId: "users-concat-lifecycle-b-by-team", fromTable: fromTableB, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); const groupedTableAInstrumentation = instrumentTriggerLifecycle(groupedTableA); const groupedTableBInstrumentation = instrumentTriggerLifecycle(groupedTableB); - const concatenatedTable = declareConcatTable({ + const concatenatedTable = trackTable(declareConcatTable({ tableId: "users-concat-lifecycle", tables: [groupedTableAInstrumentation.table, groupedTableBInstrumentation.table], - }); + })); expect(groupedTableAInstrumentation.getStats()).toEqual({ registerCalls: 0, deregisterCalls: 0, activeRegistrations: 0 }); expect(groupedTableBInstrumentation.getStats()).toEqual({ registerCalls: 0, deregisterCalls: 0, activeRegistrations: 0 }); @@ -964,19 +980,19 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("lfold registers upstream trigger in init and deregisters in delete", () => { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users-lfold-lifecycle" }); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-lfold-lifecycle-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const sortedTable = declareSortTable({ + })); + const sortedTable = trackTable(declareSortTable({ tableId: "users-lfold-lifecycle-sorted", fromTable: groupedTable, getSortKey: mapper(`(("rowData"->>'value')::int) AS "newSortKey"`), 
compareSortKeys: (a, b) => expr(`(((${a.sql}) #>> '{}')::int) - (((${b.sql}) #>> '{}')::int)`), - }); + })); const sortedTableInstrumentation = instrumentTriggerLifecycle(sortedTable); - const lFoldTable = declareLFoldTable({ + const lFoldTable = trackTable(declareLFoldTable({ tableId: "users-lfold-lifecycle-folded", fromTable: sortedTableInstrumentation.table, initialState: expr(`'0'::jsonb`), @@ -984,7 +1000,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { "oldState" AS "newState", jsonb_build_array("oldRowData") AS "newRowsData" `), - }); + })); expect(sortedTableInstrumentation.getStats()).toEqual({ registerCalls: 0, deregisterCalls: 0, activeRegistrations: 0 }); lFoldTable.init(); @@ -999,13 +1015,13 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("timefold registers upstream trigger in init and deregisters in delete", () => { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users-timefold-lifecycle" }); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-timefold-lifecycle-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); const groupedTableInstrumentation = instrumentTriggerLifecycle(groupedTable); - const timeFoldTable = declareTimeFoldTable({ + const timeFoldTable = trackTable(declareTimeFoldTable({ tableId: "users-timefold-lifecycle-folded", fromTable: groupedTableInstrumentation.table, initialState: expr(`'0'::jsonb`), @@ -1014,7 +1030,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { jsonb_build_array("oldRowData") AS "newRowsData", NULL::timestamptz AS "nextTimestamp" `), - }); + })); expect(groupedTableInstrumentation.getStats()).toEqual({ registerCalls: 0, deregisterCalls: 0, activeRegistrations: 0 }); timeFoldTable.init(); @@ -1030,25 +1046,25 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("leftJoin registers all upstream 
triggers in init and deregisters in delete", () => { const fromTable = declareStoredTable<{ value: number, team: string | null }>({ tableId: "users-left-join-lifecycle" }); const joinTable = declareStoredTable<{ team: string | null, threshold: number, label: string }>({ tableId: "rules-left-join-lifecycle" }); - const groupedFromTable = declareGroupByTable({ + const groupedFromTable = trackTable(declareGroupByTable({ tableId: "users-left-join-lifecycle-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const groupedJoinTable = declareGroupByTable({ + })); + const groupedJoinTable = trackTable(declareGroupByTable({ tableId: "rules-left-join-lifecycle-by-team", fromTable: joinTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); const groupedFromTableInstrumentation = instrumentTriggerLifecycle(groupedFromTable); const groupedJoinTableInstrumentation = instrumentTriggerLifecycle(groupedJoinTable); - const leftJoinedTable = declareLeftJoinTable({ + const leftJoinedTable = trackTable(declareLeftJoinTable({ tableId: "users-rules-left-join-lifecycle", leftTable: groupedFromTableInstrumentation.table, rightTable: groupedJoinTableInstrumentation.table, leftJoinKey: mapper(`(("rowData"->>'value')::int) AS "joinKey"`), rightJoinKey: mapper(`(("rowData"->>'threshold')::int) AS "joinKey"`), - }); + })); expect(groupedFromTableInstrumentation.getStats()).toEqual({ registerCalls: 0, deregisterCalls: 0, activeRegistrations: 0 }); expect(groupedJoinTableInstrumentation.getStats()).toEqual({ registerCalls: 0, deregisterCalls: 0, activeRegistrations: 0 }); @@ -2015,7 +2031,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("mapTable matches equivalent single-row flatMap for rows, groups, and trigger payloads", async () => { const { fromTable, groupedTable, mappedTable } = createMappedTable(); - const equivalentFlatMapTable = declareFlatMapTable({ + const equivalentFlatMapTable = trackTable(declareFlatMapTable({ 
tableId: "users-by-team-mapped-equivalent-flatmap", fromTable: groupedTable, mapper: mapper(` @@ -2033,7 +2049,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { ) ) AS "rows" `), - }); + })); await runStatements(fromTable.init()); await runStatements(groupedTable.init()); @@ -2836,35 +2852,35 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("concatTable allows input tables with different sort comparators", async () => { const fromTableAsc = declareStoredTable<{ value: number, team: string }>({ tableId: "users-concat-sort-asc" }); - const groupedTableAsc = declareGroupByTable({ + const groupedTableAsc = trackTable(declareGroupByTable({ tableId: "users-concat-sort-asc-by-team", fromTable: fromTableAsc, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const sortedTableAsc = declareSortTable({ + })); + const sortedTableAsc = trackTable(declareSortTable({ tableId: "users-concat-sort-asc-sorted", fromTable: groupedTableAsc, getSortKey: mapper(`(("rowData"->>'value')::int) AS "newSortKey"`), compareSortKeys: (a, b) => expr(`(((${a.sql}) #>> '{}')::int) - (((${b.sql}) #>> '{}')::int)`), - }); + })); const fromTableDesc = declareStoredTable<{ value: number, team: string }>({ tableId: "users-concat-sort-desc" }); - const groupedTableDesc = declareGroupByTable({ + const groupedTableDesc = trackTable(declareGroupByTable({ tableId: "users-concat-sort-desc-by-team", fromTable: fromTableDesc, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const sortedTableDesc = declareSortTable({ + })); + const sortedTableDesc = trackTable(declareSortTable({ tableId: "users-concat-sort-desc-sorted", fromTable: groupedTableDesc, getSortKey: mapper(`(("rowData"->>'value')::int) AS "newSortKey"`), compareSortKeys: (a, b) => expr(`(((${b.sql}) #>> '{}')::int) - (((${a.sql}) #>> '{}')::int)`), - }); + })); - const concatenatedTable = declareConcatTable({ + const concatenatedTable = trackTable(declareConcatTable({ tableId: 
"users-by-team-concat-sort-mismatch", tables: [sortedTableAsc, sortedTableDesc], - }); + })); await runStatements(fromTableAsc.init()); await runStatements(groupedTableAsc.init()); @@ -3318,12 +3334,12 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("timeFoldTable reruns immediately when reducer timestamp is already due", async () => { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users-timefold-immediate" }); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-timefold-immediate-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const timeFoldTable = declareTimeFoldTable({ + })); + const timeFoldTable = trackTable(declareTimeFoldTable({ tableId: "users-timefold-immediate-folded", fromTable: groupedTable, initialState: expr(`'0'::jsonb`), @@ -3352,7 +3368,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { ELSE NULL::timestamptz END AS "nextTimestamp" `), - }); + })); await runStatements(fromTable.init()); await runStatements(groupedTable.init()); @@ -3387,12 +3403,12 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("timeFoldTable does not enqueue when reducer returns null nextTimestamp", async () => { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users-timefold-no-queue" }); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-timefold-no-queue-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const timeFoldTable = declareTimeFoldTable({ + })); + const timeFoldTable = trackTable(declareTimeFoldTable({ tableId: "users-timefold-no-queue-folded", fromTable: groupedTable, initialState: expr(`'0'::jsonb`), @@ -3406,7 +3422,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { ) AS "newRowsData", NULL::timestamptz AS 
"nextTimestamp" `), - }); + })); await runStatements(fromTable.init()); await runStatements(groupedTable.init()); @@ -3540,23 +3556,23 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("leftJoinTable matches null join keys with IS NOT DISTINCT FROM semantics", async () => { const fromTable = declareStoredTable<{ value: number | null, team: string | null }>({ tableId: "left-join-null-users" }); const joinTable = declareStoredTable<{ threshold: number | null, team: string | null, label: string }>({ tableId: "left-join-null-rules" }); - const groupedFromTable = declareGroupByTable({ + const groupedFromTable = trackTable(declareGroupByTable({ tableId: "left-join-null-users-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const groupedJoinTable = declareGroupByTable({ + })); + const groupedJoinTable = trackTable(declareGroupByTable({ tableId: "left-join-null-rules-by-team", fromTable: joinTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const leftJoinedTable = declareLeftJoinTable({ + })); + const leftJoinedTable = trackTable(declareLeftJoinTable({ tableId: "left-join-null-users-rules", leftTable: groupedFromTable, rightTable: groupedJoinTable, leftJoinKey: mapper(`"rowData"->'value' AS "joinKey"`), rightJoinKey: mapper(`"rowData"->'threshold' AS "joinKey"`), - }); + })); await runStatements(fromTable.init()); await runStatements(joinTable.init()); @@ -4152,22 +4168,22 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("parallel map tables on the same grouped source stay isolated", async () => { const { fromTable, groupedTable } = createGroupedTable(); - const mapTableA = declareMapTable({ + const mapTableA = trackTable(declareMapTable({ tableId: "users-map-a", fromTable: groupedTable, mapper: mapper(` ("rowData"->'team') AS "team", (("rowData"->>'value')::int + 100) AS "mappedValueA" `), - }); - const mapTableB = declareMapTable({ + })); + const mapTableB = 
trackTable(declareMapTable({ tableId: "users-map-b", fromTable: groupedTable, mapper: mapper(` ("rowData"->'team') AS "team", ((("rowData"->>'value')::int) * -1) AS "mappedValueB" `), - }); + })); await runStatements(fromTable.init()); await runStatements(groupedTable.init()); @@ -4224,26 +4240,26 @@ describe.sequential("declareStoredTable (real postgres)", () => { const boundaries = declareStoredTable<{ t: number }>({ tableId: "compact-test-boundaries", }); - const entriesSorted = declareSortTable({ + const entriesSorted = trackTable(declareSortTable({ tableId: "compact-test-entries-sorted", fromTable: entries, getSortKey: mapper(`(("rowData"->>'t')::numeric) AS "newSortKey"`), compareSortKeys: (a, b) => ({ type: "expression", sql: `(((${a.sql}) #>> '{}')::numeric > ((${b.sql}) #>> '{}')::numeric)::int - (((${a.sql}) #>> '{}')::numeric < ((${b.sql}) #>> '{}')::numeric)::int` }), - }); - const boundariesSorted = declareSortTable({ + })); + const boundariesSorted = trackTable(declareSortTable({ tableId: "compact-test-boundaries-sorted", fromTable: boundaries, getSortKey: mapper(`(("rowData"->>'t')::numeric) AS "newSortKey"`), compareSortKeys: (a, b) => ({ type: "expression", sql: `(((${a.sql}) #>> '{}')::numeric > ((${b.sql}) #>> '{}')::numeric)::int - (((${a.sql}) #>> '{}')::numeric < ((${b.sql}) #>> '{}')::numeric)::int` }), - }); - const compacted = declareCompactTable({ + })); + const compacted = trackTable(declareCompactTable({ tableId: "compact-test-compacted", toBeCompactedTable: entriesSorted, boundaryTable: boundariesSorted, orderingKey: "t", compactKey: "quantity", partitionKey: "itemId", - }); + })); return { entries, boundaries, entriesSorted, boundariesSorted, compacted }; } @@ -4513,12 +4529,12 @@ describe.sequential("declareStoredTable (real postgres)", () => { const source = declareStoredTable<{ team: string, value: number }>({ tableId: "reduce-test-source", }); - const grouped = declareGroupByTable({ + const grouped = 
trackTable(declareGroupByTable({ tableId: "reduce-test-grouped", fromTable: source, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const reduced = declareReduceTable({ + })); + const reduced = trackTable(declareReduceTable({ tableId: "reduce-test-sum", fromTable: grouped, initialState: expr(`'0'::jsonb`), @@ -4532,7 +4548,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { "groupKey" AS "team", ("state" #>> '{}')::numeric AS "total" `), - }); + })); return { source, grouped, reduced }; } @@ -4541,18 +4557,18 @@ describe.sequential("declareStoredTable (real postgres)", () => { const source = declareStoredTable<{ category: string, label: string, t: number }>({ tableId: "reduce-test-arr-source", }); - const grouped = declareGroupByTable({ + const grouped = trackTable(declareGroupByTable({ tableId: "reduce-test-arr-grouped", fromTable: source, groupBy: mapper(`"rowData"->'category' AS "groupKey"`), - }); - const sorted = declareSortTable({ + })); + const sorted = trackTable(declareSortTable({ tableId: "reduce-test-arr-sorted", fromTable: grouped, getSortKey: mapper(`(("rowData"->>'t')::numeric) AS "newSortKey"`), compareSortKeys: (a, b) => ({ type: "expression", sql: `(((${a.sql}) #>> '{}')::numeric > ((${b.sql}) #>> '{}')::numeric)::int - (((${a.sql}) #>> '{}')::numeric < ((${b.sql}) #>> '{}')::numeric)::int` }), - }); - const reduced = declareReduceTable({ + })); + const reduced = trackTable(declareReduceTable({ tableId: "reduce-test-arr", fromTable: sorted, initialState: expr(`'[]'::jsonb`), @@ -4563,7 +4579,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { "groupKey" AS "category", "state" AS "labels" `), - }); + })); return { source, grouped, sorted, reduced }; } @@ -4637,7 +4653,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { const source = declareStoredTable<{ value: number }>({ tableId: "reduce-test-ungrouped-source", }); - const reduced = declareReduceTable({ + const reduced = 
trackTable(declareReduceTable({ tableId: "reduce-test-ungrouped", fromTable: source, initialState: expr(`'0'::jsonb`), @@ -4650,7 +4666,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { finalize: mapper(` ("state" #>> '{}')::numeric AS "total" `), - }); + })); await runStatements(source.init()); await runStatements(reduced.init()); @@ -4800,12 +4816,12 @@ describe.sequential("declareStoredTable (real postgres)", () => { const source = declareStoredTable<{ team: string | null, value: number }>({ tableId: "reduce-test-null-gk-source", }); - const grouped = declareGroupByTable({ + const grouped = trackTable(declareGroupByTable({ tableId: "reduce-test-null-gk-grouped", fromTable: source, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const reduced = declareReduceTable({ + })); + const reduced = trackTable(declareReduceTable({ tableId: "reduce-test-null-gk", fromTable: grouped, initialState: expr(`'0'::jsonb`), @@ -4819,7 +4835,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { "groupKey" AS "team", ("state" #>> '{}')::numeric AS "total" `), - }); + })); await runStatements(source.init()); await runStatements(grouped.init()); await runStatements(reduced.init()); @@ -4841,7 +4857,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { const source = declareStoredTable<{ tenancyId: string, customerId: string, value: number }>({ tableId: "reduce-test-complex-gk-source", }); - const grouped = declareGroupByTable({ + const grouped = trackTable(declareGroupByTable({ tableId: "reduce-test-complex-gk-grouped", fromTable: source, groupBy: mapper(` @@ -4850,8 +4866,8 @@ describe.sequential("declareStoredTable (real postgres)", () => { 'customerId', "rowData"->'customerId' ) AS "groupKey" `), - }); - const reduced = declareReduceTable({ + })); + const reduced = trackTable(declareReduceTable({ tableId: "reduce-test-complex-gk", fromTable: grouped, initialState: expr(`'0'::jsonb`), @@ -4866,7 +4882,7 @@ 
describe.sequential("declareStoredTable (real postgres)", () => { "groupKey"->'customerId' AS "customerId", ("state" #>> '{}')::numeric AS "total" `), - }); + })); await runStatements(source.init()); await runStatements(grouped.init()); await runStatements(reduced.init()); diff --git a/apps/backend/src/lib/bulldozer/db/index.ts b/apps/backend/src/lib/bulldozer/db/index.ts index e426bfbcf2..b704cf6815 100644 --- a/apps/backend/src/lib/bulldozer/db/index.ts +++ b/apps/backend/src/lib/bulldozer/db/index.ts @@ -3,7 +3,7 @@ import { deindent } from "@stackframe/stack-shared/dist/utils/strings"; import { BULLDOZER_SORT_HELPERS_SQL } from "./bulldozer-sort-helpers-sql"; import type { Json, RowData, RowIdentifier, SqlExpression, SqlQuery, SqlStatement, TableId } from "./utilities"; -import { quoteSqlIdentifier } from "./utilities"; +import { quoteSqlIdentifier, sqlQuery, tableIdToDebugString } from "./utilities"; // ====== Table implementations ====== // IMPORTANT NOTE: For every new table implementation, we should also add tests (unit, fuzzing, & perf; including an entry in the "hundreds of thousands" perf test), an example in the example schema, and support in Bulldozer Studio. @@ -37,6 +37,9 @@ export type Table = { * @param trigger A SQL statement that can reference the changes table with columns `groupKey: GK`, `rowIdentifier: RowIdentifier`, `oldRowSortKey: SK | null`, `newRowSortKey: SK | null`, `oldRowData: RowData | null`, `newRowData: RowData | null`. Note that this trigger should be a no-op if the table that created this trigger is not initialized. */ registerRowChangeTrigger(trigger: (changesTable: SqlExpression<{ __brand: "$SQL_Table" }>) => SqlStatement[]): { deregister: () => void }, + + /** Returns a query producing error rows if materialized data differs from re-derivation from inputs. Empty result = healthy. 
*/ + verifyDataIntegrity(): SqlQuery>, }; export { declareCompactTable } from "./tables/compact-table"; @@ -138,3 +141,17 @@ export function toExecutableSqlTransaction(statements: SqlStatement[], options: COMMIT; `; } + +// any is used here because the verifier works with heterogeneous table types +export function verifyAllTablesIntegrity(tables: Table[]): SqlQuery> { + if (tables.length === 0) { + return sqlQuery`SELECT NULL::text AS tableid, NULL::text AS errortype, NULL::jsonb AS groupkey, NULL::text AS rowidentifier, NULL::jsonb AS expected, NULL::jsonb AS actual WHERE false`; + } + const combined: { sql: string } = { + sql: tables.map(t => { + const label = tableIdToDebugString(t.tableId).replaceAll("'", "''"); + return `SELECT '${label}' AS tableid, "v".* FROM (${t.verifyDataIntegrity().sql}) AS "v"`; + }).join("\nUNION ALL\n"), + }; + return sqlQuery`${combined}`; +} diff --git a/apps/backend/src/lib/bulldozer/db/tables/compact-table.ts b/apps/backend/src/lib/bulldozer/db/tables/compact-table.ts index 7812ef7bd0..f032a62d3f 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/compact-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/compact-table.ts @@ -335,7 +335,7 @@ export function declareCompactTable< inputTriggerRegistrations = []; }; - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.toBeCompactedTable, options.boundaryTable], debugArgs: { @@ -490,5 +490,56 @@ export function declareCompactTable< triggers.set(id, trigger); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allCompactedGroups = options.toBeCompactedTable.listGroups({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allBoundaryGroups = options.boundaryTable.listGroups({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualRows = table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + 
}); + return sqlQuery` + WITH "allGroups" AS ( + SELECT "g"."groupkey" AS "groupKey" FROM (${allCompactedGroups}) AS "g" + UNION + SELECT "g"."groupkey" AS "groupKey" FROM (${allBoundaryGroups}) AS "g" + ), + "expected" AS ( + SELECT + "groups"."groupKey" AS "groupKey", + "rows"."rowIdentifier" AS "rowIdentifier", + "rows"."rowData" AS "rowData" + FROM "allGroups" AS "groups" + CROSS JOIN LATERAL ( + ${computeCompactedRowsSql(sqlExpression`"groups"."groupKey"`)} + ) AS "rows" + ), + "actual" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allActualRows}) AS "r" + ) + SELECT + CASE + WHEN "expected"."rowIdentifier" IS NULL THEN 'extra_row' + WHEN "actual"."rowIdentifier" IS NULL THEN 'missing_row' + ELSE 'data_mismatch' + END AS errortype, + COALESCE("expected"."groupKey", "actual"."groupKey") AS groupkey, + COALESCE("expected"."rowIdentifier", "actual"."rowIdentifier") AS rowidentifier, + "expected"."rowData" AS expected, + "actual"."rowData" AS actual + FROM "expected" + FULL OUTER JOIN "actual" + ON "expected"."groupKey" IS NOT DISTINCT FROM "actual"."groupKey" + AND "expected"."rowIdentifier" = "actual"."rowIdentifier" + WHERE ("expected"."rowIdentifier" IS NULL + OR "actual"."rowIdentifier" IS NULL + OR "expected"."rowData" IS DISTINCT FROM "actual"."rowData") + AND ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/concat-table.ts b/apps/backend/src/lib/bulldozer/db/tables/concat-table.ts index 6e7d7bf82e..3cc5e51b55 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/concat-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/concat-table.ts @@ -211,5 +211,9 @@ export function declareConcatTable< triggers.set(id, trigger); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => sqlQuery` + SELECT NULL::text AS errortype, NULL::jsonb AS groupkey, NULL::text AS rowidentifier, NULL::jsonb AS 
expected, NULL::jsonb AS actual + WHERE false + `, }; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/filter-table.ts b/apps/backend/src/lib/bulldozer/db/tables/filter-table.ts index 0c29bab72a..48a99d2f8d 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/filter-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/filter-table.ts @@ -77,6 +77,7 @@ export function declareFilterTable< "listGroups", "listRowsInGroup", "registerRowChangeTrigger", + "verifyDataIntegrity", ]), }; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/flat-map-table.ts b/apps/backend/src/lib/bulldozer/db/tables/flat-map-table.ts index 7217d443da..8769a66855 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/flat-map-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/flat-map-table.ts @@ -216,7 +216,7 @@ export function declareFlatMapTable< fromTableTriggerRegistration = null; }; - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.fromTable], debugArgs: { @@ -405,5 +405,60 @@ export function declareFlatMapTable< triggers.set(id, trigger); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allInputRows = options.fromTable.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualRows = table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "expected" AS ( + SELECT + "source"."groupkey" AS "groupKey", + ${createExpandedRowIdentifier( + sqlExpression`"source"."rowidentifier"`, + sqlExpression`"flatRow"."flatIndex"`, + )} AS "rowIdentifier", + "flatRow"."rowData" AS "rowData" + FROM (${allInputRows}) AS "source" + LEFT JOIN LATERAL ( + SELECT "mapped"."rows" AS "mappedRows" + FROM ( + SELECT ${options.mapper} + FROM ( + SELECT "source"."rowidentifier" AS "rowIdentifier", "source"."rowdata" AS "rowData" + ) AS "mapperInput" + ) AS "mapped" + ) AS "mapped" ON true + 
CROSS JOIN LATERAL jsonb_array_elements( + CASE WHEN jsonb_typeof("mapped"."mappedRows") = 'array' THEN "mapped"."mappedRows" ELSE '[]'::jsonb END + ) WITH ORDINALITY AS "flatRow"("rowData", "flatIndex") + ), + "actual" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allActualRows}) AS "r" + ) + SELECT + CASE + WHEN "expected"."rowIdentifier" IS NULL THEN 'extra_row' + WHEN "actual"."rowIdentifier" IS NULL THEN 'missing_row' + ELSE 'data_mismatch' + END AS errortype, + COALESCE("expected"."groupKey", "actual"."groupKey") AS groupkey, + COALESCE("expected"."rowIdentifier", "actual"."rowIdentifier") AS rowidentifier, + "expected"."rowData" AS expected, + "actual"."rowData" AS actual + FROM "expected" + FULL OUTER JOIN "actual" + ON "expected"."groupKey" IS NOT DISTINCT FROM "actual"."groupKey" + AND "expected"."rowIdentifier" = "actual"."rowIdentifier" + WHERE ("expected"."rowIdentifier" IS NULL + OR "actual"."rowIdentifier" IS NULL + OR "expected"."rowData" IS DISTINCT FROM "actual"."rowData") + AND ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/group-by-table.ts b/apps/backend/src/lib/bulldozer/db/tables/group-by-table.ts index 910f13828e..b83ab2a4c5 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/group-by-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/group-by-table.ts @@ -196,7 +196,7 @@ export function declareGroupByTable< fromTableTriggerRegistration = null; }; - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.fromTable], debugArgs: { @@ -350,5 +350,54 @@ export function declareGroupByTable< triggers.set(id, trigger); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allInputRows = options.fromTable.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualRows = 
table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "expected" AS ( + SELECT + "mapped"."groupKey" AS "groupKey", + "source"."rowidentifier" AS "rowIdentifier", + "source"."rowdata" AS "rowData" + FROM (${allInputRows}) AS "source" + LEFT JOIN LATERAL ( + SELECT "mapped"."groupKey" + FROM ( + SELECT ${options.groupBy} + FROM ( + SELECT "source"."rowidentifier" AS "rowIdentifier", "source"."rowdata" AS "rowData" + ) AS "groupByInput" + ) AS "mapped" + ) AS "mapped" ON true + ), + "actual" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allActualRows}) AS "r" + ) + SELECT + CASE + WHEN "expected"."rowIdentifier" IS NULL THEN 'extra_row' + WHEN "actual"."rowIdentifier" IS NULL THEN 'missing_row' + ELSE 'data_mismatch' + END AS errortype, + COALESCE("expected"."groupKey", "actual"."groupKey") AS groupkey, + COALESCE("expected"."rowIdentifier", "actual"."rowIdentifier") AS rowidentifier, + "expected"."rowData" AS expected, + "actual"."rowData" AS actual + FROM "expected" + FULL OUTER JOIN "actual" + ON "expected"."groupKey" IS NOT DISTINCT FROM "actual"."groupKey" + AND "expected"."rowIdentifier" = "actual"."rowIdentifier" + WHERE ("expected"."rowIdentifier" IS NULL + OR "actual"."rowIdentifier" IS NULL + OR "expected"."rowData" IS DISTINCT FROM "actual"."rowData") + AND ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/l-fold-table.ts b/apps/backend/src/lib/bulldozer/db/tables/l-fold-table.ts index fce35f6992..457b95c794 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/l-fold-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/l-fold-table.ts @@ -503,7 +503,7 @@ export function declareLFoldTable< sourceSortTriggerRegistration = null; }; - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.fromTable], debugArgs: { @@ 
-799,5 +799,40 @@ export function declareLFoldTable< triggers.set(id, trigger); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allInputGroups = options.fromTable.listGroups({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualGroups = table.listGroups({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "inputGroups" AS ( + SELECT "g"."groupkey" AS "groupKey" FROM (${allInputGroups}) AS "g" + ), + "actualGroups" AS ( + SELECT "g"."groupkey" AS "groupKey" FROM (${allActualGroups}) AS "g" + ), + "missingGroups" AS ( + SELECT 'missing_group' AS errortype, + "inputGroups"."groupKey" AS groupkey, NULL::text AS rowidentifier, + NULL::jsonb AS expected, NULL::jsonb AS actual + FROM "inputGroups" + LEFT JOIN "actualGroups" ON "actualGroups"."groupKey" IS NOT DISTINCT FROM "inputGroups"."groupKey" + WHERE "actualGroups"."groupKey" IS NULL + ), + "extraGroups" AS ( + SELECT 'extra_group' AS errortype, + "actualGroups"."groupKey" AS groupkey, NULL::text AS rowidentifier, + NULL::jsonb AS expected, NULL::jsonb AS actual + FROM "actualGroups" + LEFT JOIN "inputGroups" ON "inputGroups"."groupKey" IS NOT DISTINCT FROM "actualGroups"."groupKey" + WHERE "inputGroups"."groupKey" IS NULL + ) + SELECT * FROM "missingGroups" WHERE ${isInitializedExpression} + UNION ALL SELECT * FROM "extraGroups" WHERE ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/left-join-table.ts b/apps/backend/src/lib/bulldozer/db/tables/left-join-table.ts index ec7ac59252..48a16a9c07 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/left-join-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/left-join-table.ts @@ -369,7 +369,7 @@ export function declareLeftJoinTable< inputTriggerRegistrations = []; }; - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.leftTable, 
options.rightTable], debugArgs: { @@ -519,5 +519,94 @@ export function declareLeftJoinTable< triggers.set(id, trigger); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allLeftRows = options.leftTable.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allRightRows = options.rightTable.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualRows = table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "leftRows" AS ( + SELECT + "source"."groupkey" AS "groupKey", + "source"."rowidentifier" AS "leftRowIdentifier", + "source"."rowdata" AS "leftRowData", + "joinKey"."joinKey" AS "leftJoinKey" + FROM (${allLeftRows}) AS "source" + LEFT JOIN LATERAL ( + SELECT "mapped"."joinKey" + FROM ( + SELECT ${options.leftJoinKey} + FROM (SELECT "source"."rowidentifier" AS "rowIdentifier", "source"."rowdata" AS "rowData") AS "joinKeyInput" + ) AS "mapped" + ) AS "joinKey" ON true + ), + "rightRows" AS ( + SELECT + "source"."groupkey" AS "groupKey", + "source"."rowidentifier" AS "rightRowIdentifier", + "source"."rowdata" AS "rightRowData", + "joinKey"."joinKey" AS "rightJoinKey" + FROM (${allRightRows}) AS "source" + LEFT JOIN LATERAL ( + SELECT "mapped"."joinKey" + FROM ( + SELECT ${options.rightJoinKey} + FROM (SELECT "source"."rowidentifier" AS "rowIdentifier", "source"."rowdata" AS "rowData") AS "joinKeyInput" + ) AS "mapped" + ) AS "joinKey" ON true + ), + "expected" AS ( + SELECT DISTINCT ON ("joined"."groupKey", "joined"."rowIdentifier") + "joined"."groupKey" AS "groupKey", + "joined"."rowIdentifier" AS "rowIdentifier", + "joined"."rowData" AS "rowData" + FROM ( + SELECT + "leftRows"."groupKey" AS "groupKey", + ${createJoinedRowIdentifier( + sqlExpression`"leftRows"."leftRowIdentifier"`, + sqlExpression`"rightRows"."rightRowIdentifier"`, + )} AS 
"rowIdentifier", + jsonb_build_object( + 'leftRowData', "leftRows"."leftRowData", + 'rightRowData', "rightRows"."rightRowData" + ) AS "rowData" + FROM "leftRows" + LEFT JOIN "rightRows" + ON "rightRows"."groupKey" IS NOT DISTINCT FROM "leftRows"."groupKey" + AND "rightRows"."rightJoinKey" IS NOT DISTINCT FROM "leftRows"."leftJoinKey" + ) AS "joined" + ORDER BY "joined"."groupKey", "joined"."rowIdentifier" + ), + "actual" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allActualRows}) AS "r" + ) + SELECT + CASE + WHEN "expected"."rowIdentifier" IS NULL THEN 'extra_row' + WHEN "actual"."rowIdentifier" IS NULL THEN 'missing_row' + ELSE 'data_mismatch' + END AS errortype, + COALESCE("expected"."groupKey", "actual"."groupKey") AS groupkey, + COALESCE("expected"."rowIdentifier", "actual"."rowIdentifier") AS rowidentifier, + "expected"."rowData" AS expected, + "actual"."rowData" AS actual + FROM "expected" + FULL OUTER JOIN "actual" + ON "expected"."groupKey" IS NOT DISTINCT FROM "actual"."groupKey" + AND "expected"."rowIdentifier" = "actual"."rowIdentifier" + WHERE ("expected"."rowIdentifier" IS NULL + OR "actual"."rowIdentifier" IS NULL + OR "expected"."rowData" IS DISTINCT FROM "actual"."rowData") + AND ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/limit-table.ts b/apps/backend/src/lib/bulldozer/db/tables/limit-table.ts index 01eed32169..8e125eb513 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/limit-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/limit-table.ts @@ -219,7 +219,7 @@ export function declareLimitTable< fromTableTriggerRegistration = null; }; - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.fromTable], debugArgs: { @@ -493,5 +493,46 @@ export function declareLimitTable< triggers.set(id, trigger); return { deregister: () => triggers.delete(id) }; }, + 
verifyDataIntegrity: () => { + const allInputRows = options.fromTable.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualRows = table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "inputRows" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allInputRows}) AS "r" + ), + "actual" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allActualRows}) AS "r" + ), + "extraRows" AS ( + SELECT 'extra_row' AS errortype, + "actual"."groupKey" AS groupkey, "actual"."rowIdentifier" AS rowidentifier, + NULL::jsonb AS expected, "actual"."rowData" AS actual + FROM "actual" + LEFT JOIN "inputRows" + ON "inputRows"."groupKey" IS NOT DISTINCT FROM "actual"."groupKey" + AND "inputRows"."rowIdentifier" = "actual"."rowIdentifier" + WHERE "inputRows"."rowIdentifier" IS NULL + ), + "overLimit" AS ( + SELECT 'over_limit' AS errortype, + "counts"."groupKey" AS groupkey, NULL::text AS rowidentifier, + to_jsonb("counts"."cnt") AS expected, to_jsonb(${normalizedLimit}) AS actual + FROM ( + SELECT "groupKey", COUNT(*)::int AS "cnt" FROM "actual" GROUP BY "groupKey" + ) AS "counts" + WHERE "counts"."cnt" > ${normalizedLimit} + ) + SELECT * FROM "extraRows" WHERE ${isInitializedExpression} + UNION ALL + SELECT * FROM "overLimit" WHERE ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/map-table.ts b/apps/backend/src/lib/bulldozer/db/tables/map-table.ts index 87071a1295..69b34c1ea3 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/map-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/map-table.ts @@ -84,6 +84,7 @@ export function declareMapTable< "listGroups", "listRowsInGroup", "registerRowChangeTrigger", + "verifyDataIntegrity", ]), }; } diff --git 
a/apps/backend/src/lib/bulldozer/db/tables/reduce-table.ts b/apps/backend/src/lib/bulldozer/db/tables/reduce-table.ts index e6c28b33aa..72d9abfc3d 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/reduce-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/reduce-table.ts @@ -286,7 +286,7 @@ export function declareReduceTable< fromTableTriggerRegistration = null; }; - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.fromTable], debugArgs: { @@ -432,5 +432,52 @@ export function declareReduceTable< triggers.set(id, trigger); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allInputGroups = options.fromTable.listGroups({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualRows = table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "inputGroups" AS ( + SELECT "g"."groupkey" AS "groupKey" FROM (${allInputGroups}) AS "g" + ), + "actual" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allActualRows}) AS "r" + ), + "actualGroupCounts" AS ( + SELECT "groupKey", COUNT(*)::int AS "cnt" FROM "actual" GROUP BY "groupKey" + ), + "missingGroups" AS ( + SELECT 'missing_group' AS errortype, + "inputGroups"."groupKey" AS groupkey, NULL::text AS rowidentifier, + NULL::jsonb AS expected, NULL::jsonb AS actual + FROM "inputGroups" + LEFT JOIN "actualGroupCounts" ON "actualGroupCounts"."groupKey" IS NOT DISTINCT FROM "inputGroups"."groupKey" + WHERE "actualGroupCounts"."groupKey" IS NULL + ), + "extraGroups" AS ( + SELECT 'extra_group' AS errortype, + "actualGroupCounts"."groupKey" AS groupkey, NULL::text AS rowidentifier, + NULL::jsonb AS expected, NULL::jsonb AS actual + FROM "actualGroupCounts" + LEFT JOIN "inputGroups" ON "inputGroups"."groupKey" IS NOT DISTINCT FROM "actualGroupCounts"."groupKey" + WHERE 
"inputGroups"."groupKey" IS NULL + ), + "wrongRowCount" AS ( + SELECT 'wrong_row_count' AS errortype, + "actualGroupCounts"."groupKey" AS groupkey, NULL::text AS rowidentifier, + '1'::jsonb AS expected, to_jsonb("actualGroupCounts"."cnt") AS actual + FROM "actualGroupCounts" + WHERE "actualGroupCounts"."cnt" <> 1 + ) + SELECT * FROM "missingGroups" WHERE ${isInitializedExpression} + UNION ALL SELECT * FROM "extraGroups" WHERE ${isInitializedExpression} + UNION ALL SELECT * FROM "wrongRowCount" WHERE ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/sort-table.ts b/apps/backend/src/lib/bulldozer/db/tables/sort-table.ts index eb916b4bf3..822fba262a 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/sort-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/sort-table.ts @@ -175,7 +175,7 @@ export function declareSortTable< fromTableTriggerRegistration = null; }; - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.fromTable], debugArgs: { @@ -414,5 +414,42 @@ export function declareSortTable< triggers.set(id, trigger); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allInputRows = options.fromTable.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualRows = table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "expected" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allInputRows}) AS "r" + ), + "actual" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allActualRows}) AS "r" + ) + SELECT + CASE + WHEN "expected"."rowIdentifier" IS NULL THEN 'extra_row' + WHEN "actual"."rowIdentifier" IS NULL THEN 'missing_row' + ELSE 'data_mismatch' + END AS 
errortype, + COALESCE("expected"."groupKey", "actual"."groupKey") AS groupkey, + COALESCE("expected"."rowIdentifier", "actual"."rowIdentifier") AS rowidentifier, + "expected"."rowData" AS expected, + "actual"."rowData" AS actual + FROM "expected" + FULL OUTER JOIN "actual" + ON "expected"."groupKey" IS NOT DISTINCT FROM "actual"."groupKey" + AND "expected"."rowIdentifier" = "actual"."rowIdentifier" + WHERE ("expected"."rowIdentifier" IS NULL + OR "actual"."rowIdentifier" IS NULL + OR "expected"."rowData" IS DISTINCT FROM "actual"."rowData") + AND ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/stored-table.ts b/apps/backend/src/lib/bulldozer/db/tables/stored-table.ts index f17a737a0b..3d3dc26465 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/stored-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/stored-table.ts @@ -77,6 +77,10 @@ export function declareStoredTable(options: { triggers.set(id, trigger); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => sqlQuery` + SELECT NULL::text AS errortype, NULL::jsonb AS groupkey, NULL::text AS rowidentifier, NULL::jsonb AS expected, NULL::jsonb AS actual + WHERE false + `, setRow: (rowIdentifier, rowData) => { const oldRowsTableName = `old_rows_${generateSecureRandomString()}`; const upsertedRowsTableName = `upserted_rows_${generateSecureRandomString()}`; diff --git a/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts b/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts index e83c5b2710..ce708629a3 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts @@ -422,7 +422,7 @@ export function declareTimeFoldTable< fromTableTriggerRegistration = null; }; - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.fromTable], debugArgs: { @@ -564,5 +564,31 @@ export function 
declareTimeFoldTable< triggers.set(id, trigger); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allInputGroups = options.fromTable.listGroups({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualGroups = table.listGroups({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "inputGroups" AS ( + SELECT "g"."groupkey" AS "groupKey" FROM (${allInputGroups}) AS "g" + ), + "actualGroups" AS ( + SELECT "g"."groupkey" AS "groupKey" FROM (${allActualGroups}) AS "g" + ), + "extraGroups" AS ( + SELECT 'extra_group' AS errortype, + "actualGroups"."groupKey" AS groupkey, NULL::text AS rowidentifier, + NULL::jsonb AS expected, NULL::jsonb AS actual + FROM "actualGroups" + LEFT JOIN "inputGroups" ON "inputGroups"."groupKey" IS NOT DISTINCT FROM "actualGroups"."groupKey" + WHERE "inputGroups"."groupKey" IS NULL + ) + SELECT * FROM "extraGroups" WHERE ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/claude/CLAUDE-KNOWLEDGE.md b/claude/CLAUDE-KNOWLEDGE.md index a4ea4ec286..0d601d9e91 100644 --- a/claude/CLAUDE-KNOWLEDGE.md +++ b/claude/CLAUDE-KNOWLEDGE.md @@ -237,3 +237,9 @@ A: No. It was removed because item quantity changes have nothing to do with paym Q: Are Bulldozer table `init()` calls idempotent? A: No. They use plain `INSERT INTO "BulldozerStorageEngine"` without `ON CONFLICT DO NOTHING`, so calling `init()` twice crashes with a unique constraint violation. The ingress script (`bulldozer-payments-init.ts`) checks `table.isInitialized()` per-table before calling `init()` to handle this safely. + +Q: How does the Bulldozer `verifyDataIntegrity()` method work? +A: Each `Table` has a `verifyDataIntegrity()` method returning a `SqlQuery` that produces error rows (empty = healthy). 
For derived tables (flat-map, sort, group-by, left-join, compact), it re-derives expected rows from input tables and does a FULL OUTER JOIN with actual materialized rows. For stored/concat tables (leaf/virtual), it returns empty. For reduce/l-fold/time-fold, it does structural group-correspondence checks. All derived-table queries are gated on `isInitialized` so uninitialized tables are silently skipped. The helper `verifyAllTablesIntegrity(tables)` UNION ALLs all tables' queries with a `tableid` column. Filter and map tables delegate to their internal nested flat-map table's verifyDataIntegrity via the `pick()` spread. + +Q: How are Bulldozer table implementations structured when they need self-referencing methods? +A: When a table method (like `verifyDataIntegrity`) needs to call another method on the same table (like `listRowsInGroup`), the `declare*Table` function assigns the result object to a `const table` variable first, then returns it: `const table: ReturnType> = { ... }; return table;`. This lets closures reference `table` by the time they execute. From 5ecb7523259cff8d787e4b2c8792d5afaca96ac1 Mon Sep 17 00:00:00 2001 From: mantrakp04 Date: Tue, 14 Apr 2026 10:06:26 -0700 Subject: [PATCH 2/3] feat(bulldozer): enhance data integrity verification across payments tables Added a recursive verification process for all payments tables in the Bulldozer system. This includes the implementation of a `verifyDataIntegrity()` method for each table, which checks for data consistency and throws errors if discrepancies are found. The verification is integrated into the main execution flow, ensuring that data integrity is maintained across all operations. Additionally, updated tests to validate the integrity of initialized tables after each test execution. 
--- .../scripts/verify-data-integrity/index.ts | 18 +++++++ .../src/lib/bulldozer/db/index.fuzz.test.ts | 51 +++++++++++++++++- .../src/lib/bulldozer/db/index.perf.test.ts | 53 ++++++++++++++++++- .../src/lib/bulldozer/db/index.test.ts | 6 +-- apps/backend/src/lib/bulldozer/db/index.ts | 16 +----- .../backend/src/lib/bulldozer/db/utilities.ts | 2 +- apps/backend/src/lib/tokens.tsx | 1 + 7 files changed, 124 insertions(+), 23 deletions(-) diff --git a/apps/backend/scripts/verify-data-integrity/index.ts b/apps/backend/scripts/verify-data-integrity/index.ts index ea1f01d703..965966b7b8 100644 --- a/apps/backend/scripts/verify-data-integrity/index.ts +++ b/apps/backend/scripts/verify-data-integrity/index.ts @@ -1,3 +1,6 @@ +import { toQueryableSqlQuery } from "@/lib/bulldozer/db/index"; +import { tableIdToDebugString } from "@/lib/bulldozer/db/utilities"; +import { createPaymentsSchema } from "@/lib/payments/schema/index"; import { DEFAULT_BRANCH_ID, getSoleTenancyFromProjectBranch } from "@/lib/tenancies"; import { getPrismaClientForTenancy, globalPrismaClient } from "@/prisma-client"; import type { OrganizationRenderedConfig } from "@stackframe/stack-shared/dist/config/schema"; @@ -158,6 +161,21 @@ async function main() { console.log(`Will check at most ${maxUsersPerProject} users per project.`); } + await recurse(`[bulldozer] verifying data integrity across all payments tables`, async () => { + const schema = createPaymentsSchema(); + for (const table of schema._allTables) { + const label = tableIdToDebugString(table.tableId); + await recurse(`[bulldozer table] ${label}`, async () => { + const errors = await prismaClient.$queryRawUnsafe(toQueryableSqlQuery(table.verifyDataIntegrity())); + if (errors.length > 0) { + throw new StackAssertionError(deindent` + Bulldozer data integrity violation in table ${label}: found ${errors.length} error row(s). 
+ `, { errors }); + } + }); + } + }); + const endAt = Math.min(startAt + count, projects.length); for (let i = startAt; i < endAt; i++) { const projectId = projects[i].id; diff --git a/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts b/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts index b9e73d5f3b..03ff3538b1 100644 --- a/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts +++ b/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts @@ -1,7 +1,48 @@ import { stringCompare } from "@stackframe/stack-shared/dist/utils/strings"; import postgres from "postgres"; import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test } from "vitest"; -import { declareCompactTable, declareConcatTable, declareFilterTable, declareFlatMapTable, declareGroupByTable, declareLeftJoinTable, declareLFoldTable, declareLimitTable, declareMapTable, declareReduceTable, declareSortTable, declareStoredTable, declareTimeFoldTable, toExecutableSqlTransaction, toQueryableSqlQuery } from "./index"; +import type { Table } from "./index"; +import { + declareCompactTable as _declareCompactTable, + declareConcatTable as _declareConcatTable, + declareFilterTable as _declareFilterTable, + declareFlatMapTable as _declareFlatMapTable, + declareGroupByTable as _declareGroupByTable, + declareLeftJoinTable as _declareLeftJoinTable, + declareLFoldTable as _declareLFoldTable, + declareLimitTable as _declareLimitTable, + declareMapTable as _declareMapTable, + declareReduceTable as _declareReduceTable, + declareSortTable as _declareSortTable, + declareStoredTable as _declareStoredTable, + declareTimeFoldTable as _declareTimeFoldTable, + toExecutableSqlTransaction, + toQueryableSqlQuery, +} from "./index"; + +// any is used here because the verifier works with heterogeneous table types +const allInitializedTables: Table[] = []; +function trackTable>(table: T): T { + allInitializedTables.push(table); + return table; +} +function tracked Table>(fn: Fn): Fn { + return ((...args: unknown[]) => 
trackTable(fn(...args))) as Fn; +} + +const declareCompactTable = tracked(_declareCompactTable); +const declareConcatTable = tracked(_declareConcatTable); +const declareFilterTable = tracked(_declareFilterTable); +const declareFlatMapTable = tracked(_declareFlatMapTable); +const declareGroupByTable = tracked(_declareGroupByTable); +const declareLeftJoinTable = tracked(_declareLeftJoinTable); +const declareLFoldTable = tracked(_declareLFoldTable); +const declareLimitTable = tracked(_declareLimitTable); +const declareMapTable = tracked(_declareMapTable); +const declareReduceTable = tracked(_declareReduceTable); +const declareSortTable = tracked(_declareSortTable); +const declareStoredTable = tracked(_declareStoredTable); +const declareTimeFoldTable = tracked(_declareTimeFoldTable); type TestDb = { full: string, base: string }; @@ -675,7 +716,13 @@ describe.sequential("bulldozer db fuzz composition (real postgres)", () => { `; }); - afterEach(() => { + afterEach(async () => { + for (const table of allInitializedTables) { + const errors = await readRows(table.verifyDataIntegrity(), "afterEach.verifyDataIntegrity"); + expect(errors).toEqual([]); + } + allInitializedTables.length = 0; + if (!FUZZ_TRACE_ENABLED) return; const testName = getCurrentTestNameForTrace(); const bucket = tracesByTest.get(testName); diff --git a/apps/backend/src/lib/bulldozer/db/index.perf.test.ts b/apps/backend/src/lib/bulldozer/db/index.perf.test.ts index de3bab1232..eccfdbab51 100644 --- a/apps/backend/src/lib/bulldozer/db/index.perf.test.ts +++ b/apps/backend/src/lib/bulldozer/db/index.perf.test.ts @@ -1,7 +1,48 @@ import { stringCompare } from "@stackframe/stack-shared/dist/utils/strings"; import postgres from "postgres"; -import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; -import { declareCompactTable, declareConcatTable, declareFilterTable, declareFlatMapTable, declareGroupByTable, declareLeftJoinTable, declareLFoldTable, declareLimitTable, declareMapTable, 
declareReduceTable, declareSortTable, declareStoredTable, declareTimeFoldTable, toExecutableSqlTransaction, toQueryableSqlQuery } from "./index"; +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import type { Table } from "./index"; +import { + declareCompactTable as _declareCompactTable, + declareConcatTable as _declareConcatTable, + declareFilterTable as _declareFilterTable, + declareFlatMapTable as _declareFlatMapTable, + declareGroupByTable as _declareGroupByTable, + declareLeftJoinTable as _declareLeftJoinTable, + declareLFoldTable as _declareLFoldTable, + declareLimitTable as _declareLimitTable, + declareMapTable as _declareMapTable, + declareReduceTable as _declareReduceTable, + declareSortTable as _declareSortTable, + declareStoredTable as _declareStoredTable, + declareTimeFoldTable as _declareTimeFoldTable, + toExecutableSqlTransaction, + toQueryableSqlQuery, +} from "./index"; + +// any is used here because the verifier works with heterogeneous table types +const allInitializedTables: Table[] = []; +function trackTable>(table: T): T { + allInitializedTables.push(table); + return table; +} +function tracked Table>(fn: Fn): Fn { + return ((...args: unknown[]) => trackTable(fn(...args))) as Fn; +} + +const declareCompactTable = tracked(_declareCompactTable); +const declareConcatTable = tracked(_declareConcatTable); +const declareFilterTable = tracked(_declareFilterTable); +const declareFlatMapTable = tracked(_declareFlatMapTable); +const declareGroupByTable = tracked(_declareGroupByTable); +const declareLeftJoinTable = tracked(_declareLeftJoinTable); +const declareLFoldTable = tracked(_declareLFoldTable); +const declareLimitTable = tracked(_declareLimitTable); +const declareMapTable = tracked(_declareMapTable); +const declareReduceTable = tracked(_declareReduceTable); +const declareSortTable = tracked(_declareSortTable); +const declareStoredTable = tracked(_declareStoredTable); +const declareTimeFoldTable = 
tracked(_declareTimeFoldTable); type TestDb = { full: string, base: string }; type SqlExpression = { type: "expression", sql: string }; @@ -315,6 +356,14 @@ describe.sequential("bulldozer db performance (real postgres)", () => { `; }); + afterEach(async () => { + for (const table of allInitializedTables) { + const errors = await readRows(table.verifyDataIntegrity()); + expect(errors).toEqual([]); + } + allInitializedTables.length = 0; + }); + afterAll(async () => { await sql.end(); await adminSql.unsafe(` diff --git a/apps/backend/src/lib/bulldozer/db/index.test.ts b/apps/backend/src/lib/bulldozer/db/index.test.ts index 333f6a1336..7d627e7536 100644 --- a/apps/backend/src/lib/bulldozer/db/index.test.ts +++ b/apps/backend/src/lib/bulldozer/db/index.test.ts @@ -2,7 +2,7 @@ import { stringCompare, templateIdentity } from "@stackframe/stack-shared/dist/u import postgres from "postgres"; import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, test } from "vitest"; import type { Table } from "./index"; -import { declareCompactTable, declareConcatTable, declareFilterTable, declareFlatMapTable, declareGroupByTable, declareLeftJoinTable, declareLFoldTable, declareLimitTable, declareMapTable, declareReduceTable, declareSortTable, declareStoredTable, declareTimeFoldTable, toExecutableSqlTransaction, toQueryableSqlQuery, verifyAllTablesIntegrity } from "./index"; +import { declareCompactTable, declareConcatTable, declareFilterTable, declareFlatMapTable, declareGroupByTable, declareLeftJoinTable, declareLFoldTable, declareLimitTable, declareMapTable, declareReduceTable, declareSortTable, declareStoredTable, declareTimeFoldTable, toExecutableSqlTransaction, toQueryableSqlQuery } from "./index"; type TestDb = { full: string, base: string }; @@ -243,8 +243,8 @@ describe.sequential("declareStoredTable (real postgres)", () => { } afterEach(async () => { - if (allInitializedTables.length > 0) { - const errors = await 
readRows(verifyAllTablesIntegrity(allInitializedTables)); + for (const table of allInitializedTables) { + const errors = await readRows(table.verifyDataIntegrity()); expect(errors).toEqual([]); } allInitializedTables.length = 0; diff --git a/apps/backend/src/lib/bulldozer/db/index.ts b/apps/backend/src/lib/bulldozer/db/index.ts index b704cf6815..039c2fc9e5 100644 --- a/apps/backend/src/lib/bulldozer/db/index.ts +++ b/apps/backend/src/lib/bulldozer/db/index.ts @@ -3,7 +3,7 @@ import { deindent } from "@stackframe/stack-shared/dist/utils/strings"; import { BULLDOZER_SORT_HELPERS_SQL } from "./bulldozer-sort-helpers-sql"; import type { Json, RowData, RowIdentifier, SqlExpression, SqlQuery, SqlStatement, TableId } from "./utilities"; -import { quoteSqlIdentifier, sqlQuery, tableIdToDebugString } from "./utilities"; +import { quoteSqlIdentifier } from "./utilities"; // ====== Table implementations ====== // IMPORTANT NOTE: For every new table implementation, we should also add tests (unit, fuzzing, & perf; including an entry in the "hundreds of thousands" perf test), an example in the example schema, and support in Bulldozer Studio. 
@@ -141,17 +141,3 @@ export function toExecutableSqlTransaction(statements: SqlStatement[], options: COMMIT; `; } - -// any is used here because the verifier works with heterogeneous table types -export function verifyAllTablesIntegrity(tables: Table[]): SqlQuery> { - if (tables.length === 0) { - return sqlQuery`SELECT NULL::text AS tableid, NULL::text AS errortype, NULL::jsonb AS groupkey, NULL::text AS rowidentifier, NULL::jsonb AS expected, NULL::jsonb AS actual WHERE false`; - } - const combined: { sql: string } = { - sql: tables.map(t => { - const label = tableIdToDebugString(t.tableId).replaceAll("'", "''"); - return `SELECT '${label}' AS tableid, "v".* FROM (${t.verifyDataIntegrity().sql}) AS "v"`; - }).join("\nUNION ALL\n"), - }; - return sqlQuery`${combined}`; -} diff --git a/apps/backend/src/lib/bulldozer/db/utilities.ts b/apps/backend/src/lib/bulldozer/db/utilities.ts index 9d580a8452..6856777094 100644 --- a/apps/backend/src/lib/bulldozer/db/utilities.ts +++ b/apps/backend/src/lib/bulldozer/db/utilities.ts @@ -6,7 +6,7 @@ const sqlTemplateLiteral = (type: T) => (strings: TemplateStringsArray, ...va export type SqlStatement = { type: "statement", outputName?: string, outputColumns?: string, sql: string, requiresSequentialExecution?: boolean }; export const sqlStatement = sqlTemplateLiteral<"statement">("statement"); -export type SqlQuery = void> = { type: "query", sql: string, toStatement(outputName?: string): SqlStatement }; +export type SqlQuery = void> = { type: "query", sql: string, toStatement(outputName?: string, outputColumns?: string): SqlStatement }; export const sqlQuery = (...args: Parameters>>) => { return { ...sqlTemplateLiteral<"query">("query")(...args), diff --git a/apps/backend/src/lib/tokens.tsx b/apps/backend/src/lib/tokens.tsx index 82bb8f4afe..fe93b40fd9 100644 --- a/apps/backend/src/lib/tokens.tsx +++ b/apps/backend/src/lib/tokens.tsx @@ -318,6 +318,7 @@ export async function generateAccessTokenFromRefreshTokenIfValid(options: 
Refres is_restricted: user.is_restricted, restricted_reason: user.restricted_reason, requires_totp_mfa: user.requires_totp_mfa, + signed_up_at: user.signed_up_at_millis, }; // Validate the payload matches the accessTokenSchema before signing, to catch inconsistencies early From 3375c2896e5613eec5a736b9d3eae213a12999cc Mon Sep 17 00:00:00 2001 From: Konsti Wohlwend Date: Tue, 14 Apr 2026 21:38:08 -0700 Subject: [PATCH 3/3] Update apps/backend/src/lib/tokens.tsx Co-authored-by: vercel[bot] <35613825+vercel[bot]@users.noreply.github.com> --- apps/backend/src/lib/tokens.tsx | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/backend/src/lib/tokens.tsx b/apps/backend/src/lib/tokens.tsx index 374088d35b..a7dcfd4101 100644 --- a/apps/backend/src/lib/tokens.tsx +++ b/apps/backend/src/lib/tokens.tsx @@ -319,7 +319,6 @@ export async function generateAccessTokenFromRefreshTokenIfValid(options: Refres is_restricted: user.is_restricted, restricted_reason: user.restricted_reason, requires_totp_mfa: user.requires_totp_mfa, - signed_up_at: user.signed_up_at_millis, }; // Validate the payload matches the accessTokenSchema before signing, to catch inconsistencies early