diff --git a/apps/backend/scripts/verify-data-integrity/index.ts b/apps/backend/scripts/verify-data-integrity/index.ts index f63ae32a7f..5fc8ad900e 100644 --- a/apps/backend/scripts/verify-data-integrity/index.ts +++ b/apps/backend/scripts/verify-data-integrity/index.ts @@ -1,4 +1,7 @@ +import { toQueryableSqlQuery } from "@/lib/bulldozer/db/index"; +import { tableIdToDebugString } from "@/lib/bulldozer/db/utilities"; import { syncExternalDatabases } from "@/lib/external-db-sync"; +import { createPaymentsSchema } from "@/lib/payments/schema/index"; import { DEFAULT_BRANCH_ID, getSoleTenancyFromProjectBranch } from "@/lib/tenancies"; import { getPrismaClientForTenancy, globalPrismaClient } from "@/prisma-client"; import type { OrganizationRenderedConfig } from "@stackframe/stack-shared/dist/config/schema"; @@ -168,6 +171,21 @@ async function main() { console.log(`Will check at most ${maxUsersPerProject} users per project.`); } + await recurse(`[bulldozer] verifying data integrity across all payments tables`, async () => { + const schema = createPaymentsSchema(); + for (const table of schema._allTables) { + const label = tableIdToDebugString(table.tableId); + await recurse(`[bulldozer table] ${label}`, async () => { + const errors = await prismaClient.$queryRawUnsafe(toQueryableSqlQuery(table.verifyDataIntegrity())); + if (errors.length > 0) { + throw new StackAssertionError(deindent` + Bulldozer data integrity violation in table ${label}: found ${errors.length} error row(s). 
+ `, { errors }); + } + }); + } + }); + const endAt = Math.min(startAt + count, projects.length); for (let i = startAt; i < endAt; i++) { const projectId = projects[i].id; diff --git a/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts b/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts index b9e73d5f3b..03ff3538b1 100644 --- a/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts +++ b/apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts @@ -1,7 +1,48 @@ import { stringCompare } from "@stackframe/stack-shared/dist/utils/strings"; import postgres from "postgres"; import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test } from "vitest"; -import { declareCompactTable, declareConcatTable, declareFilterTable, declareFlatMapTable, declareGroupByTable, declareLeftJoinTable, declareLFoldTable, declareLimitTable, declareMapTable, declareReduceTable, declareSortTable, declareStoredTable, declareTimeFoldTable, toExecutableSqlTransaction, toQueryableSqlQuery } from "./index"; +import type { Table } from "./index"; +import { + declareCompactTable as _declareCompactTable, + declareConcatTable as _declareConcatTable, + declareFilterTable as _declareFilterTable, + declareFlatMapTable as _declareFlatMapTable, + declareGroupByTable as _declareGroupByTable, + declareLeftJoinTable as _declareLeftJoinTable, + declareLFoldTable as _declareLFoldTable, + declareLimitTable as _declareLimitTable, + declareMapTable as _declareMapTable, + declareReduceTable as _declareReduceTable, + declareSortTable as _declareSortTable, + declareStoredTable as _declareStoredTable, + declareTimeFoldTable as _declareTimeFoldTable, + toExecutableSqlTransaction, + toQueryableSqlQuery, +} from "./index"; + +// any is used here because the verifier works with heterogeneous table types +const allInitializedTables: Table[] = []; +function trackTable>(table: T): T { + allInitializedTables.push(table); + return table; +} +function tracked Table>(fn: Fn): Fn { + return ((...args: unknown[]) => 
trackTable(fn(...args))) as Fn; +} + +const declareCompactTable = tracked(_declareCompactTable); +const declareConcatTable = tracked(_declareConcatTable); +const declareFilterTable = tracked(_declareFilterTable); +const declareFlatMapTable = tracked(_declareFlatMapTable); +const declareGroupByTable = tracked(_declareGroupByTable); +const declareLeftJoinTable = tracked(_declareLeftJoinTable); +const declareLFoldTable = tracked(_declareLFoldTable); +const declareLimitTable = tracked(_declareLimitTable); +const declareMapTable = tracked(_declareMapTable); +const declareReduceTable = tracked(_declareReduceTable); +const declareSortTable = tracked(_declareSortTable); +const declareStoredTable = tracked(_declareStoredTable); +const declareTimeFoldTable = tracked(_declareTimeFoldTable); type TestDb = { full: string, base: string }; @@ -675,7 +716,13 @@ describe.sequential("bulldozer db fuzz composition (real postgres)", () => { `; }); - afterEach(() => { + afterEach(async () => { + for (const table of allInitializedTables) { + const errors = await readRows(table.verifyDataIntegrity(), "afterEach.verifyDataIntegrity"); + expect(errors).toEqual([]); + } + allInitializedTables.length = 0; + if (!FUZZ_TRACE_ENABLED) return; const testName = getCurrentTestNameForTrace(); const bucket = tracesByTest.get(testName); diff --git a/apps/backend/src/lib/bulldozer/db/index.perf.test.ts b/apps/backend/src/lib/bulldozer/db/index.perf.test.ts index 4ca0e41d16..54f2f73978 100644 --- a/apps/backend/src/lib/bulldozer/db/index.perf.test.ts +++ b/apps/backend/src/lib/bulldozer/db/index.perf.test.ts @@ -1,7 +1,48 @@ import { stringCompare } from "@stackframe/stack-shared/dist/utils/strings"; import postgres from "postgres"; -import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; -import { declareCompactTable, declareConcatTable, declareFilterTable, declareFlatMapTable, declareGroupByTable, declareLeftJoinTable, declareLFoldTable, declareLimitTable, declareMapTable, 
declareReduceTable, declareSortTable, declareStoredTable, declareTimeFoldTable, toExecutableSqlTransaction, toQueryableSqlQuery } from "./index"; +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import type { Table } from "./index"; +import { + declareCompactTable as _declareCompactTable, + declareConcatTable as _declareConcatTable, + declareFilterTable as _declareFilterTable, + declareFlatMapTable as _declareFlatMapTable, + declareGroupByTable as _declareGroupByTable, + declareLeftJoinTable as _declareLeftJoinTable, + declareLFoldTable as _declareLFoldTable, + declareLimitTable as _declareLimitTable, + declareMapTable as _declareMapTable, + declareReduceTable as _declareReduceTable, + declareSortTable as _declareSortTable, + declareStoredTable as _declareStoredTable, + declareTimeFoldTable as _declareTimeFoldTable, + toExecutableSqlTransaction, + toQueryableSqlQuery, +} from "./index"; + +// any is used here because the verifier works with heterogeneous table types +const allInitializedTables: Table[] = []; +function trackTable>(table: T): T { + allInitializedTables.push(table); + return table; +} +function tracked Table>(fn: Fn): Fn { + return ((...args: unknown[]) => trackTable(fn(...args))) as Fn; +} + +const declareCompactTable = tracked(_declareCompactTable); +const declareConcatTable = tracked(_declareConcatTable); +const declareFilterTable = tracked(_declareFilterTable); +const declareFlatMapTable = tracked(_declareFlatMapTable); +const declareGroupByTable = tracked(_declareGroupByTable); +const declareLeftJoinTable = tracked(_declareLeftJoinTable); +const declareLFoldTable = tracked(_declareLFoldTable); +const declareLimitTable = tracked(_declareLimitTable); +const declareMapTable = tracked(_declareMapTable); +const declareReduceTable = tracked(_declareReduceTable); +const declareSortTable = tracked(_declareSortTable); +const declareStoredTable = tracked(_declareStoredTable); +const declareTimeFoldTable = 
tracked(_declareTimeFoldTable); type TestDb = { full: string, base: string }; type SqlExpression = { type: "expression", sql: string }; @@ -315,6 +356,14 @@ describe.sequential("bulldozer db performance (real postgres)", () => { `; }); + afterEach(async () => { + for (const table of allInitializedTables) { + const errors = await readRows(table.verifyDataIntegrity()); + expect(errors).toEqual([]); + } + allInitializedTables.length = 0; + }); + afterAll(async () => { await sql.end(); await adminSql.unsafe(` diff --git a/apps/backend/src/lib/bulldozer/db/index.test.ts b/apps/backend/src/lib/bulldozer/db/index.test.ts index 04f67c1cfa..9282dcfd00 100644 --- a/apps/backend/src/lib/bulldozer/db/index.test.ts +++ b/apps/backend/src/lib/bulldozer/db/index.test.ts @@ -1,6 +1,7 @@ import { stringCompare, templateIdentity } from "@stackframe/stack-shared/dist/utils/strings"; import postgres from "postgres"; -import { afterAll, beforeAll, beforeEach, describe, expect, it, test } from "vitest"; +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, test } from "vitest"; +import type { Table } from "./index"; import { declareCompactTable, declareConcatTable, declareFilterTable, declareFlatMapTable, declareGroupByTable, declareLeftJoinTable, declareLFoldTable, declareLimitTable, declareMapTable, declareReduceTable, declareSortTable, declareStoredTable, declareTimeFoldTable, toExecutableSqlTransaction, toQueryableSqlQuery } from "./index"; type TestDb = { full: string, base: string }; @@ -234,6 +235,21 @@ describe.sequential("declareStoredTable (real postgres)", () => { `; }); + // any is used here because the verifier works with heterogeneous table types + const allInitializedTables: Table[] = []; + function trackTable>(t: T): T { + allInitializedTables.push(t); + return t; + } + + afterEach(async () => { + for (const table of allInitializedTables) { + const errors = await readRows(table.verifyDataIntegrity()); + expect(errors).toEqual([]); + } + 
allInitializedTables.length = 0; + }); + afterAll(async () => { await sql.end(); await adminSql.unsafe(` @@ -269,28 +285,28 @@ describe.sequential("declareStoredTable (real postgres)", () => { } function createGroupedTable() { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users" }); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); return { fromTable, groupedTable }; } function createMappedTable() { const { fromTable, groupedTable } = createGroupedTable(); - const mappedTable = declareMapTable({ + const mappedTable = trackTable(declareMapTable({ tableId: "users-by-team-mapped", fromTable: groupedTable, mapper: mapper(` ("rowData"->'team') AS "team", (("rowData"->>'value')::int + 100) AS "mappedValue" `), - }); + })); return { fromTable, groupedTable, mappedTable }; } function createFlatMappedTable() { const { fromTable, groupedTable } = createGroupedTable(); - const flatMappedTable = declareFlatMapTable({ + const flatMappedTable = trackTable(declareFlatMapTable({ tableId: "users-by-team-flat-mapped", fromTable: groupedTable, mapper: mapper(` @@ -310,78 +326,78 @@ describe.sequential("declareStoredTable (real postgres)", () => { ) END AS "rows" `), - }); + })); return { fromTable, groupedTable, flatMappedTable }; } function createFilteredTable() { const { fromTable, groupedTable } = createGroupedTable(); - const filteredTable = declareFilterTable({ + const filteredTable = trackTable(declareFilterTable({ tableId: "users-by-team-filtered", fromTable: groupedTable, filter: predicate(`(("rowData"->>'value')::int) >= 2`), - }); + })); return { fromTable, groupedTable, filteredTable }; } function createLimitedTable() { const { fromTable, groupedTable } = createGroupedTable(); - const limitedTable = declareLimitTable({ + const limitedTable = trackTable(declareLimitTable({ tableId: 
"users-by-team-limited", fromTable: groupedTable, limit: expr(`2`), - }); + })); return { fromTable, groupedTable, limitedTable }; } function createConcatenatedTable() { const fromTableA = declareStoredTable<{ value: number, team: string }>({ tableId: "users-a" }); const fromTableB = declareStoredTable<{ value: number, team: string }>({ tableId: "users-b" }); - const groupedTableA = declareGroupByTable({ + const groupedTableA = trackTable(declareGroupByTable({ tableId: "users-a-by-team", fromTable: fromTableA, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const groupedTableB = declareGroupByTable({ + })); + const groupedTableB = trackTable(declareGroupByTable({ tableId: "users-b-by-team", fromTable: fromTableB, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const concatenatedTable = declareConcatTable({ + })); + const concatenatedTable = trackTable(declareConcatTable({ tableId: "users-by-team-concat", tables: [groupedTableA, groupedTableB], - }); + })); return { fromTableA, fromTableB, groupedTableA, groupedTableB, concatenatedTable }; } function createSortedTable() { const { fromTable, groupedTable } = createGroupedTable(); - const sortedTable = declareSortTable({ + const sortedTable = trackTable(declareSortTable({ tableId: "users-by-team-sorted", fromTable: groupedTable, getSortKey: mapper(`(("rowData"->>'value')::int) AS "newSortKey"`), compareSortKeys: (a, b) => expr(`(((${a.sql}) #>> '{}')::int) - (((${b.sql}) #>> '{}')::int)`), - }); + })); return { fromTable, groupedTable, sortedTable }; } function createDescendingSortedTable() { const { fromTable, groupedTable } = createGroupedTable(); - const sortedTable = declareSortTable({ + const sortedTable = trackTable(declareSortTable({ tableId: "users-by-team-sorted-desc", fromTable: groupedTable, getSortKey: mapper(`(("rowData"->>'value')::int) AS "newSortKey"`), compareSortKeys: (a, b) => expr(`(((${b.sql}) #>> '{}')::int) - (((${a.sql}) #>> '{}')::int)`), - }); + })); return { 
fromTable, groupedTable, sortedTable }; } function createDescendingLimitedTable() { const { fromTable, groupedTable, sortedTable } = createDescendingSortedTable(); - const limitedTable = declareLimitTable({ + const limitedTable = trackTable(declareLimitTable({ tableId: "users-by-team-limit-desc", fromTable: sortedTable, limit: expr(`2`), - }); + })); return { fromTable, groupedTable, sortedTable, limitedTable }; } function createDescendingLFoldTable() { const { fromTable, groupedTable, sortedTable } = createDescendingSortedTable(); - const lFoldTable = declareLFoldTable({ + const lFoldTable = trackTable(declareLFoldTable({ tableId: "users-by-team-lfold-desc", fromTable: sortedTable, initialState: expr(`'0'::jsonb`), @@ -393,12 +409,12 @@ describe.sequential("declareStoredTable (real postgres)", () => { ) ) AS "newRowsData" `), - }); + })); return { fromTable, groupedTable, sortedTable, lFoldTable }; } function createLFoldTable() { const { fromTable, groupedTable, sortedTable } = createSortedTable(); - const lFoldTable = declareLFoldTable({ + const lFoldTable = trackTable(declareLFoldTable({ tableId: "users-by-team-lfold", fromTable: sortedTable, initialState: expr(`'0'::jsonb`), @@ -430,12 +446,12 @@ describe.sequential("declareStoredTable (real postgres)", () => { END ) AS "newRowsData" `), - }); + })); return { fromTable, groupedTable, sortedTable, lFoldTable }; } function createTimeFoldTable() { const { fromTable, groupedTable } = createGroupedTable(); - const timeFoldTable = declareTimeFoldTable({ + const timeFoldTable = trackTable(declareTimeFoldTable({ tableId: "users-by-team-timefold", fromTable: groupedTable, initialState: expr(`'0'::jsonb`), @@ -459,34 +475,34 @@ describe.sequential("declareStoredTable (real postgres)", () => { ELSE NULL::timestamptz END AS "nextTimestamp" `), - }); + })); return { fromTable, groupedTable, timeFoldTable }; } function createLeftJoinedTable() { const fromTable = declareStoredTable<{ value: number, team: string | null }>({ 
tableId: "left-join-users" }); const joinTable = declareStoredTable<{ team: string | null, threshold: number, label: string }>({ tableId: "left-join-rules" }); - const groupedFromTable = declareGroupByTable({ + const groupedFromTable = trackTable(declareGroupByTable({ tableId: "left-join-users-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const groupedJoinTable = declareGroupByTable({ + })); + const groupedJoinTable = trackTable(declareGroupByTable({ tableId: "left-join-rules-by-team", fromTable: joinTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const leftJoinedTable = declareLeftJoinTable({ + })); + const leftJoinedTable = trackTable(declareLeftJoinTable({ tableId: "left-join-users-rules", leftTable: groupedFromTable, rightTable: groupedJoinTable, leftJoinKey: mapper(`(("rowData"->>'value')::int) AS "joinKey"`), rightJoinKey: mapper(`(("rowData"->>'threshold')::int) AS "joinKey"`), - }); + })); return { fromTable, joinTable, groupedFromTable, groupedJoinTable, leftJoinedTable }; } function createFlatMapMapGroupPipeline() { const { fromTable, groupedTable, flatMappedTable } = createFlatMappedTable(); - const mappedAfterFlatMap = declareMapTable({ + const mappedAfterFlatMap = trackTable(declareMapTable({ tableId: "users-by-team-flat-map-then-map", fromTable: flatMappedTable, mapper: mapper(` @@ -494,25 +510,25 @@ describe.sequential("declareStoredTable (real postgres)", () => { ("rowData"->'kind') AS "kind", (("rowData"->>'mappedValue')::int + 1) AS "mappedValuePlusOne" `), - }); - const groupedByKind = declareGroupByTable({ + })); + const groupedByKind = trackTable(declareGroupByTable({ tableId: "users-by-kind", fromTable: mappedAfterFlatMap, groupBy: mapper(`"rowData"->'kind' AS "groupKey"`), - }); + })); return { fromTable, groupedTable, flatMappedTable, mappedAfterFlatMap, groupedByKind }; } function createStackedMappedTables() { const { fromTable, groupedTable } = createGroupedTable(); - const 
mappedTableLevel1 = declareMapTable({ + const mappedTableLevel1 = trackTable(declareMapTable({ tableId: "users-by-team-map-level-1", fromTable: groupedTable, mapper: mapper(` ("rowData"->'team') AS "team", (("rowData"->>'value')::int + 10) AS "valuePlusTen" `), - }); - const mappedTableLevel2 = declareMapTable({ + })); + const mappedTableLevel2 = trackTable(declareMapTable({ tableId: "users-by-team-map-level-2", fromTable: mappedTableLevel1, mapper: mapper(` @@ -525,16 +541,16 @@ describe.sequential("declareStoredTable (real postgres)", () => { END ) AS "bucket" `), - }); + })); return { fromTable, groupedTable, mappedTableLevel1, mappedTableLevel2 }; } function createGroupMapGroupPipeline() { const { fromTable, groupedTable, mappedTableLevel1, mappedTableLevel2 } = createStackedMappedTables(); - const groupedByBucketTable = declareGroupByTable({ + const groupedByBucketTable = trackTable(declareGroupByTable({ tableId: "users-by-bucket", fromTable: mappedTableLevel2, groupBy: mapper(`"rowData"->'bucket' AS "groupKey"`), - }); + })); return { fromTable, groupedTable, mappedTableLevel1, mappedTableLevel2, groupedByBucketTable }; } function registerGroupAuditTrigger( @@ -829,11 +845,11 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("groupBy registers upstream trigger in init and deregisters in delete", () => { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users-groupby-lifecycle" }); const fromTableInstrumentation = instrumentTriggerLifecycle(fromTable); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-groupby-lifecycle-by-team", fromTable: fromTableInstrumentation.table, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); expect(fromTableInstrumentation.getStats()).toEqual({ registerCalls: 1, deregisterCalls: 0, activeRegistrations: 1 }); groupedTable.init(); @@ -852,17 +868,17 @@ describe.sequential("declareStoredTable (real 
postgres)", () => { test("flatMap registers upstream trigger in init and deregisters in delete", () => { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users-flatmap-lifecycle" }); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-flatmap-lifecycle-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); const groupedTableInstrumentation = instrumentTriggerLifecycle(groupedTable); - const flatMappedTable = declareFlatMapTable({ + const flatMappedTable = trackTable(declareFlatMapTable({ tableId: "users-flatmap-lifecycle-expanded", fromTable: groupedTableInstrumentation.table, mapper: mapper(`jsonb_build_array("rowData") AS "rows"`), - }); + })); expect(groupedTableInstrumentation.getStats()).toEqual({ registerCalls: 1, deregisterCalls: 0, activeRegistrations: 1 }); flatMappedTable.init(); @@ -877,18 +893,18 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("sort registers upstream trigger in init and deregisters in delete", () => { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users-sort-lifecycle" }); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-sort-lifecycle-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); const groupedTableInstrumentation = instrumentTriggerLifecycle(groupedTable); - const sortedTable = declareSortTable({ + const sortedTable = trackTable(declareSortTable({ tableId: "users-sort-lifecycle-sorted", fromTable: groupedTableInstrumentation.table, getSortKey: mapper(`(("rowData"->>'value')::int) AS "newSortKey"`), compareSortKeys: (a, b) => expr(`(((${a.sql}) #>> '{}')::int) - (((${b.sql}) #>> '{}')::int)`), - }); + })); expect(groupedTableInstrumentation.getStats()).toEqual({ registerCalls: 1, deregisterCalls: 0, activeRegistrations: 1 }); 
sortedTable.init(); @@ -903,17 +919,17 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("limit registers upstream trigger in init and deregisters in delete", () => { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users-limit-lifecycle" }); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-limit-lifecycle-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); const groupedTableInstrumentation = instrumentTriggerLifecycle(groupedTable); - const limitedTable = declareLimitTable({ + const limitedTable = trackTable(declareLimitTable({ tableId: "users-limit-lifecycle-limited", fromTable: groupedTableInstrumentation.table, limit: expr(`2`), - }); + })); expect(groupedTableInstrumentation.getStats()).toEqual({ registerCalls: 1, deregisterCalls: 0, activeRegistrations: 1 }); limitedTable.init(); @@ -929,22 +945,22 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("concat registers all upstream triggers in init and deregisters in delete", () => { const fromTableA = declareStoredTable<{ value: number, team: string }>({ tableId: "users-concat-lifecycle-a" }); const fromTableB = declareStoredTable<{ value: number, team: string }>({ tableId: "users-concat-lifecycle-b" }); - const groupedTableA = declareGroupByTable({ + const groupedTableA = trackTable(declareGroupByTable({ tableId: "users-concat-lifecycle-a-by-team", fromTable: fromTableA, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const groupedTableB = declareGroupByTable({ + })); + const groupedTableB = trackTable(declareGroupByTable({ tableId: "users-concat-lifecycle-b-by-team", fromTable: fromTableB, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); const groupedTableAInstrumentation = instrumentTriggerLifecycle(groupedTableA); const groupedTableBInstrumentation = instrumentTriggerLifecycle(groupedTableB); - const 
concatenatedTable = declareConcatTable({ + const concatenatedTable = trackTable(declareConcatTable({ tableId: "users-concat-lifecycle", tables: [groupedTableAInstrumentation.table, groupedTableBInstrumentation.table], - }); + })); expect(groupedTableAInstrumentation.getStats()).toEqual({ registerCalls: 1, deregisterCalls: 0, activeRegistrations: 1 }); expect(groupedTableBInstrumentation.getStats()).toEqual({ registerCalls: 1, deregisterCalls: 0, activeRegistrations: 1 }); @@ -968,13 +984,13 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("timefold registers upstream trigger in init and deregisters in delete", () => { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users-timefold-lifecycle" }); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-timefold-lifecycle-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); const groupedTableInstrumentation = instrumentTriggerLifecycle(groupedTable); - const timeFoldTable = declareTimeFoldTable({ + const timeFoldTable = trackTable(declareTimeFoldTable({ tableId: "users-timefold-lifecycle-folded", fromTable: groupedTableInstrumentation.table, initialState: expr(`'0'::jsonb`), @@ -983,7 +999,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { jsonb_build_array("oldRowData") AS "newRowsData", NULL::timestamptz AS "nextTimestamp" `), - }); + })); expect(groupedTableInstrumentation.getStats()).toEqual({ registerCalls: 1, deregisterCalls: 0, activeRegistrations: 1 }); timeFoldTable.init(); @@ -999,25 +1015,25 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("leftJoin registers all upstream triggers in init and deregisters in delete", () => { const fromTable = declareStoredTable<{ value: number, team: string | null }>({ tableId: "users-left-join-lifecycle" }); const joinTable = declareStoredTable<{ team: string | null, threshold: 
number, label: string }>({ tableId: "rules-left-join-lifecycle" }); - const groupedFromTable = declareGroupByTable({ + const groupedFromTable = trackTable(declareGroupByTable({ tableId: "users-left-join-lifecycle-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const groupedJoinTable = declareGroupByTable({ + })); + const groupedJoinTable = trackTable(declareGroupByTable({ tableId: "rules-left-join-lifecycle-by-team", fromTable: joinTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); + })); const groupedFromTableInstrumentation = instrumentTriggerLifecycle(groupedFromTable); const groupedJoinTableInstrumentation = instrumentTriggerLifecycle(groupedJoinTable); - const leftJoinedTable = declareLeftJoinTable({ + const leftJoinedTable = trackTable(declareLeftJoinTable({ tableId: "users-rules-left-join-lifecycle", leftTable: groupedFromTableInstrumentation.table, rightTable: groupedJoinTableInstrumentation.table, leftJoinKey: mapper(`(("rowData"->>'value')::int) AS "joinKey"`), rightJoinKey: mapper(`(("rowData"->>'threshold')::int) AS "joinKey"`), - }); + })); expect(groupedFromTableInstrumentation.getStats()).toEqual({ registerCalls: 1, deregisterCalls: 0, activeRegistrations: 1 }); expect(groupedJoinTableInstrumentation.getStats()).toEqual({ registerCalls: 1, deregisterCalls: 0, activeRegistrations: 1 }); @@ -1984,7 +2000,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("mapTable matches equivalent single-row flatMap for rows, groups, and trigger payloads", async () => { const { fromTable, groupedTable, mappedTable } = createMappedTable(); - const equivalentFlatMapTable = declareFlatMapTable({ + const equivalentFlatMapTable = trackTable(declareFlatMapTable({ tableId: "users-by-team-mapped-equivalent-flatmap", fromTable: groupedTable, mapper: mapper(` @@ -2002,7 +2018,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { ) ) AS "rows" `), - }); + })); await 
runStatements(fromTable.init()); await runStatements(groupedTable.init()); @@ -2805,35 +2821,35 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("concatTable allows input tables with different sort comparators", async () => { const fromTableAsc = declareStoredTable<{ value: number, team: string }>({ tableId: "users-concat-sort-asc" }); - const groupedTableAsc = declareGroupByTable({ + const groupedTableAsc = trackTable(declareGroupByTable({ tableId: "users-concat-sort-asc-by-team", fromTable: fromTableAsc, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const sortedTableAsc = declareSortTable({ + })); + const sortedTableAsc = trackTable(declareSortTable({ tableId: "users-concat-sort-asc-sorted", fromTable: groupedTableAsc, getSortKey: mapper(`(("rowData"->>'value')::int) AS "newSortKey"`), compareSortKeys: (a, b) => expr(`(((${a.sql}) #>> '{}')::int) - (((${b.sql}) #>> '{}')::int)`), - }); + })); const fromTableDesc = declareStoredTable<{ value: number, team: string }>({ tableId: "users-concat-sort-desc" }); - const groupedTableDesc = declareGroupByTable({ + const groupedTableDesc = trackTable(declareGroupByTable({ tableId: "users-concat-sort-desc-by-team", fromTable: fromTableDesc, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const sortedTableDesc = declareSortTable({ + })); + const sortedTableDesc = trackTable(declareSortTable({ tableId: "users-concat-sort-desc-sorted", fromTable: groupedTableDesc, getSortKey: mapper(`(("rowData"->>'value')::int) AS "newSortKey"`), compareSortKeys: (a, b) => expr(`(((${b.sql}) #>> '{}')::int) - (((${a.sql}) #>> '{}')::int)`), - }); + })); - const concatenatedTable = declareConcatTable({ + const concatenatedTable = trackTable(declareConcatTable({ tableId: "users-by-team-concat-sort-mismatch", tables: [sortedTableAsc, sortedTableDesc], - }); + })); await runStatements(fromTableAsc.init()); await runStatements(groupedTableAsc.init()); @@ -3287,12 +3303,12 @@ 
describe.sequential("declareStoredTable (real postgres)", () => { test("timeFoldTable reruns immediately when reducer timestamp is already due", async () => { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users-timefold-immediate" }); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-timefold-immediate-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const timeFoldTable = declareTimeFoldTable({ + })); + const timeFoldTable = trackTable(declareTimeFoldTable({ tableId: "users-timefold-immediate-folded", fromTable: groupedTable, initialState: expr(`'0'::jsonb`), @@ -3321,7 +3337,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { ELSE NULL::timestamptz END AS "nextTimestamp" `), - }); + })); await runStatements(fromTable.init()); await runStatements(groupedTable.init()); @@ -3356,12 +3372,12 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("timeFoldTable does not enqueue when reducer returns null nextTimestamp", async () => { const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "users-timefold-no-queue" }); - const groupedTable = declareGroupByTable({ + const groupedTable = trackTable(declareGroupByTable({ tableId: "users-timefold-no-queue-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const timeFoldTable = declareTimeFoldTable({ + })); + const timeFoldTable = trackTable(declareTimeFoldTable({ tableId: "users-timefold-no-queue-folded", fromTable: groupedTable, initialState: expr(`'0'::jsonb`), @@ -3375,7 +3391,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { ) AS "newRowsData", NULL::timestamptz AS "nextTimestamp" `), - }); + })); await runStatements(fromTable.init()); await runStatements(groupedTable.init()); @@ -3509,23 +3525,23 @@ describe.sequential("declareStoredTable (real postgres)", () => { 
test("leftJoinTable matches null join keys with IS NOT DISTINCT FROM semantics", async () => { const fromTable = declareStoredTable<{ value: number | null, team: string | null }>({ tableId: "left-join-null-users" }); const joinTable = declareStoredTable<{ threshold: number | null, team: string | null, label: string }>({ tableId: "left-join-null-rules" }); - const groupedFromTable = declareGroupByTable({ + const groupedFromTable = trackTable(declareGroupByTable({ tableId: "left-join-null-users-by-team", fromTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const groupedJoinTable = declareGroupByTable({ + })); + const groupedJoinTable = trackTable(declareGroupByTable({ tableId: "left-join-null-rules-by-team", fromTable: joinTable, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const leftJoinedTable = declareLeftJoinTable({ + })); + const leftJoinedTable = trackTable(declareLeftJoinTable({ tableId: "left-join-null-users-rules", leftTable: groupedFromTable, rightTable: groupedJoinTable, leftJoinKey: mapper(`"rowData"->'value' AS "joinKey"`), rightJoinKey: mapper(`"rowData"->'threshold' AS "joinKey"`), - }); + })); await runStatements(fromTable.init()); await runStatements(joinTable.init()); @@ -4121,22 +4137,22 @@ describe.sequential("declareStoredTable (real postgres)", () => { test("parallel map tables on the same grouped source stay isolated", async () => { const { fromTable, groupedTable } = createGroupedTable(); - const mapTableA = declareMapTable({ + const mapTableA = trackTable(declareMapTable({ tableId: "users-map-a", fromTable: groupedTable, mapper: mapper(` ("rowData"->'team') AS "team", (("rowData"->>'value')::int + 100) AS "mappedValueA" `), - }); - const mapTableB = declareMapTable({ + })); + const mapTableB = trackTable(declareMapTable({ tableId: "users-map-b", fromTable: groupedTable, mapper: mapper(` ("rowData"->'team') AS "team", ((("rowData"->>'value')::int) * -1) AS "mappedValueB" `), - }); + })); await 
runStatements(fromTable.init()); await runStatements(groupedTable.init()); @@ -4193,26 +4209,26 @@ describe.sequential("declareStoredTable (real postgres)", () => { const boundaries = declareStoredTable<{ t: number }>({ tableId: "compact-test-boundaries", }); - const entriesSorted = declareSortTable({ + const entriesSorted = trackTable(declareSortTable({ tableId: "compact-test-entries-sorted", fromTable: entries, getSortKey: mapper(`(("rowData"->>'t')::numeric) AS "newSortKey"`), compareSortKeys: (a, b) => ({ type: "expression", sql: `(((${a.sql}) #>> '{}')::numeric > ((${b.sql}) #>> '{}')::numeric)::int - (((${a.sql}) #>> '{}')::numeric < ((${b.sql}) #>> '{}')::numeric)::int` }), - }); - const boundariesSorted = declareSortTable({ + })); + const boundariesSorted = trackTable(declareSortTable({ tableId: "compact-test-boundaries-sorted", fromTable: boundaries, getSortKey: mapper(`(("rowData"->>'t')::numeric) AS "newSortKey"`), compareSortKeys: (a, b) => ({ type: "expression", sql: `(((${a.sql}) #>> '{}')::numeric > ((${b.sql}) #>> '{}')::numeric)::int - (((${a.sql}) #>> '{}')::numeric < ((${b.sql}) #>> '{}')::numeric)::int` }), - }); - const compacted = declareCompactTable({ + })); + const compacted = trackTable(declareCompactTable({ tableId: "compact-test-compacted", toBeCompactedTable: entriesSorted, boundaryTable: boundariesSorted, orderingKey: "t", compactKey: "quantity", partitionKey: "itemId", - }); + })); return { entries, boundaries, entriesSorted, boundariesSorted, compacted }; } @@ -4482,12 +4498,12 @@ describe.sequential("declareStoredTable (real postgres)", () => { const source = declareStoredTable<{ team: string, value: number }>({ tableId: "reduce-test-source", }); - const grouped = declareGroupByTable({ + const grouped = trackTable(declareGroupByTable({ tableId: "reduce-test-grouped", fromTable: source, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const reduced = declareReduceTable({ + })); + const reduced = 
trackTable(declareReduceTable({ tableId: "reduce-test-sum", fromTable: grouped, initialState: expr(`'0'::jsonb`), @@ -4501,7 +4517,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { "groupKey" AS "team", ("state" #>> '{}')::numeric AS "total" `), - }); + })); return { source, grouped, reduced }; } @@ -4510,18 +4526,18 @@ describe.sequential("declareStoredTable (real postgres)", () => { const source = declareStoredTable<{ category: string, label: string, t: number }>({ tableId: "reduce-test-arr-source", }); - const grouped = declareGroupByTable({ + const grouped = trackTable(declareGroupByTable({ tableId: "reduce-test-arr-grouped", fromTable: source, groupBy: mapper(`"rowData"->'category' AS "groupKey"`), - }); - const sorted = declareSortTable({ + })); + const sorted = trackTable(declareSortTable({ tableId: "reduce-test-arr-sorted", fromTable: grouped, getSortKey: mapper(`(("rowData"->>'t')::numeric) AS "newSortKey"`), compareSortKeys: (a, b) => ({ type: "expression", sql: `(((${a.sql}) #>> '{}')::numeric > ((${b.sql}) #>> '{}')::numeric)::int - (((${a.sql}) #>> '{}')::numeric < ((${b.sql}) #>> '{}')::numeric)::int` }), - }); - const reduced = declareReduceTable({ + })); + const reduced = trackTable(declareReduceTable({ tableId: "reduce-test-arr", fromTable: sorted, initialState: expr(`'[]'::jsonb`), @@ -4532,7 +4548,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { "groupKey" AS "category", "state" AS "labels" `), - }); + })); return { source, grouped, sorted, reduced }; } @@ -4606,7 +4622,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { const source = declareStoredTable<{ value: number }>({ tableId: "reduce-test-ungrouped-source", }); - const reduced = declareReduceTable({ + const reduced = trackTable(declareReduceTable({ tableId: "reduce-test-ungrouped", fromTable: source, initialState: expr(`'0'::jsonb`), @@ -4619,7 +4635,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { 
finalize: mapper(` ("state" #>> '{}')::numeric AS "total" `), - }); + })); await runStatements(source.init()); await runStatements(reduced.init()); @@ -4881,12 +4897,12 @@ describe.sequential("declareStoredTable (real postgres)", () => { const source = declareStoredTable<{ team: string | null, value: number }>({ tableId: "reduce-test-null-gk-source", }); - const grouped = declareGroupByTable({ + const grouped = trackTable(declareGroupByTable({ tableId: "reduce-test-null-gk-grouped", fromTable: source, groupBy: mapper(`"rowData"->'team' AS "groupKey"`), - }); - const reduced = declareReduceTable({ + })); + const reduced = trackTable(declareReduceTable({ tableId: "reduce-test-null-gk", fromTable: grouped, initialState: expr(`'0'::jsonb`), @@ -4900,7 +4916,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { "groupKey" AS "team", ("state" #>> '{}')::numeric AS "total" `), - }); + })); await runStatements(source.init()); await runStatements(grouped.init()); await runStatements(reduced.init()); @@ -4922,7 +4938,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { const source = declareStoredTable<{ tenancyId: string, customerId: string, value: number }>({ tableId: "reduce-test-complex-gk-source", }); - const grouped = declareGroupByTable({ + const grouped = trackTable(declareGroupByTable({ tableId: "reduce-test-complex-gk-grouped", fromTable: source, groupBy: mapper(` @@ -4931,8 +4947,8 @@ describe.sequential("declareStoredTable (real postgres)", () => { 'customerId', "rowData"->'customerId' ) AS "groupKey" `), - }); - const reduced = declareReduceTable({ + })); + const reduced = trackTable(declareReduceTable({ tableId: "reduce-test-complex-gk", fromTable: grouped, initialState: expr(`'0'::jsonb`), @@ -4947,7 +4963,7 @@ describe.sequential("declareStoredTable (real postgres)", () => { "groupKey"->'customerId' AS "customerId", ("state" #>> '{}')::numeric AS "total" `), - }); + })); await runStatements(source.init()); await 
runStatements(grouped.init()); await runStatements(reduced.init()); diff --git a/apps/backend/src/lib/bulldozer/db/index.ts b/apps/backend/src/lib/bulldozer/db/index.ts index 05600e2781..b29d2b71a4 100644 --- a/apps/backend/src/lib/bulldozer/db/index.ts +++ b/apps/backend/src/lib/bulldozer/db/index.ts @@ -38,6 +38,9 @@ export type Table = { * @param trigger A SQL statement that can reference the changes table with columns `groupKey: GK`, `rowIdentifier: RowIdentifier`, `oldRowSortKey: SK | null`, `newRowSortKey: SK | null`, `oldRowData: RowData | null`, `newRowData: RowData | null`. Note that this trigger should be a no-op if the table that created this trigger is not initialized. */ registerRowChangeTrigger(trigger: RowChangeTriggerInput): { deregister: () => void }, + + /** Returns a query producing error rows if materialized data differs from re-derivation from inputs. Empty result = healthy. */ + verifyDataIntegrity(): SqlQuery>, }; export type { RegisteredRowChangeTrigger }; diff --git a/apps/backend/src/lib/bulldozer/db/tables/compact-table.ts b/apps/backend/src/lib/bulldozer/db/tables/compact-table.ts index ae6be8749b..479af6096b 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/compact-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/compact-table.ts @@ -334,7 +334,7 @@ export function declareCompactTable< ); options.boundaryTable.registerRowChangeTrigger(boundaryTrigger); - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.toBeCompactedTable, options.boundaryTable], debugArgs: { @@ -487,5 +487,56 @@ export function declareCompactTable< triggers.set(id, normalizeRowChangeTrigger(trigger)); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allCompactedGroups = options.toBeCompactedTable.listGroups({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allBoundaryGroups = options.boundaryTable.listGroups({ + start: "start", end: "end", 
startInclusive: true, endInclusive: true, + }); + const allActualRows = table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "allGroups" AS ( + SELECT "g"."groupkey" AS "groupKey" FROM (${allCompactedGroups}) AS "g" + UNION + SELECT "g"."groupkey" AS "groupKey" FROM (${allBoundaryGroups}) AS "g" + ), + "expected" AS ( + SELECT + "groups"."groupKey" AS "groupKey", + "rows"."rowIdentifier" AS "rowIdentifier", + "rows"."rowData" AS "rowData" + FROM "allGroups" AS "groups" + CROSS JOIN LATERAL ( + ${computeCompactedRowsSql(sqlExpression`"groups"."groupKey"`)} + ) AS "rows" + ), + "actual" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allActualRows}) AS "r" + ) + SELECT + CASE + WHEN "expected"."rowIdentifier" IS NULL THEN 'extra_row' + WHEN "actual"."rowIdentifier" IS NULL THEN 'missing_row' + ELSE 'data_mismatch' + END AS errortype, + COALESCE("expected"."groupKey", "actual"."groupKey") AS groupkey, + COALESCE("expected"."rowIdentifier", "actual"."rowIdentifier") AS rowidentifier, + "expected"."rowData" AS expected, + "actual"."rowData" AS actual + FROM "expected" + FULL OUTER JOIN "actual" + ON "expected"."groupKey" IS NOT DISTINCT FROM "actual"."groupKey" + AND "expected"."rowIdentifier" = "actual"."rowIdentifier" + WHERE ("expected"."rowIdentifier" IS NULL + OR "actual"."rowIdentifier" IS NULL + OR "expected"."rowData" IS DISTINCT FROM "actual"."rowData") + AND ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/concat-table.ts b/apps/backend/src/lib/bulldozer/db/tables/concat-table.ts index 51b35195a0..a7d79ca053 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/concat-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/concat-table.ts @@ -205,5 +205,9 @@ export function declareConcatTable< triggers.set(id, normalizeRowChangeTrigger(trigger)); return 
{ deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => sqlQuery` + SELECT NULL::text AS errortype, NULL::jsonb AS groupkey, NULL::text AS rowidentifier, NULL::jsonb AS expected, NULL::jsonb AS actual + WHERE false + `, }; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/filter-table.ts b/apps/backend/src/lib/bulldozer/db/tables/filter-table.ts index 0c29bab72a..48a99d2f8d 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/filter-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/filter-table.ts @@ -77,6 +77,7 @@ export function declareFilterTable< "listGroups", "listRowsInGroup", "registerRowChangeTrigger", + "verifyDataIntegrity", ]), }; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/flat-map-table.ts b/apps/backend/src/lib/bulldozer/db/tables/flat-map-table.ts index 3ada1cdb04..4742cf4567 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/flat-map-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/flat-map-table.ts @@ -215,7 +215,7 @@ export function declareFlatMapTable< }); options.fromTable.registerRowChangeTrigger(fromTableTrigger); - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.fromTable], debugArgs: { @@ -402,5 +402,60 @@ export function declareFlatMapTable< triggers.set(id, normalizeRowChangeTrigger(trigger)); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allInputRows = options.fromTable.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualRows = table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "expected" AS ( + SELECT + "source"."groupkey" AS "groupKey", + ${createExpandedRowIdentifier( + sqlExpression`"source"."rowidentifier"`, + sqlExpression`"flatRow"."flatIndex"`, + )} AS "rowIdentifier", + "flatRow"."rowData" AS "rowData" + FROM (${allInputRows}) AS "source" + LEFT JOIN 
LATERAL ( + SELECT "mapped"."rows" AS "mappedRows" + FROM ( + SELECT ${options.mapper} + FROM ( + SELECT "source"."rowidentifier" AS "rowIdentifier", "source"."rowdata" AS "rowData" + ) AS "mapperInput" + ) AS "mapped" + ) AS "mapped" ON true + CROSS JOIN LATERAL jsonb_array_elements( + CASE WHEN jsonb_typeof("mapped"."mappedRows") = 'array' THEN "mapped"."mappedRows" ELSE '[]'::jsonb END + ) WITH ORDINALITY AS "flatRow"("rowData", "flatIndex") + ), + "actual" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allActualRows}) AS "r" + ) + SELECT + CASE + WHEN "expected"."rowIdentifier" IS NULL THEN 'extra_row' + WHEN "actual"."rowIdentifier" IS NULL THEN 'missing_row' + ELSE 'data_mismatch' + END AS errortype, + COALESCE("expected"."groupKey", "actual"."groupKey") AS groupkey, + COALESCE("expected"."rowIdentifier", "actual"."rowIdentifier") AS rowidentifier, + "expected"."rowData" AS expected, + "actual"."rowData" AS actual + FROM "expected" + FULL OUTER JOIN "actual" + ON "expected"."groupKey" IS NOT DISTINCT FROM "actual"."groupKey" + AND "expected"."rowIdentifier" = "actual"."rowIdentifier" + WHERE ("expected"."rowIdentifier" IS NULL + OR "actual"."rowIdentifier" IS NULL + OR "expected"."rowData" IS DISTINCT FROM "actual"."rowData") + AND ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/group-by-table.ts b/apps/backend/src/lib/bulldozer/db/tables/group-by-table.ts index 4c993aeac7..973b5c40e0 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/group-by-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/group-by-table.ts @@ -194,7 +194,7 @@ export function declareGroupByTable< ); options.fromTable.registerRowChangeTrigger(fromTableTrigger); - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.fromTable], debugArgs: { @@ -346,5 +346,54 @@ export function declareGroupByTable< 
triggers.set(id, normalizeRowChangeTrigger(trigger)); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allInputRows = options.fromTable.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualRows = table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "expected" AS ( + SELECT + "mapped"."groupKey" AS "groupKey", + "source"."rowidentifier" AS "rowIdentifier", + "source"."rowdata" AS "rowData" + FROM (${allInputRows}) AS "source" + LEFT JOIN LATERAL ( + SELECT "mapped"."groupKey" + FROM ( + SELECT ${options.groupBy} + FROM ( + SELECT "source"."rowidentifier" AS "rowIdentifier", "source"."rowdata" AS "rowData" + ) AS "groupByInput" + ) AS "mapped" + ) AS "mapped" ON true + ), + "actual" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allActualRows}) AS "r" + ) + SELECT + CASE + WHEN "expected"."rowIdentifier" IS NULL THEN 'extra_row' + WHEN "actual"."rowIdentifier" IS NULL THEN 'missing_row' + ELSE 'data_mismatch' + END AS errortype, + COALESCE("expected"."groupKey", "actual"."groupKey") AS groupkey, + COALESCE("expected"."rowIdentifier", "actual"."rowIdentifier") AS rowidentifier, + "expected"."rowData" AS expected, + "actual"."rowData" AS actual + FROM "expected" + FULL OUTER JOIN "actual" + ON "expected"."groupKey" IS NOT DISTINCT FROM "actual"."groupKey" + AND "expected"."rowIdentifier" = "actual"."rowIdentifier" + WHERE ("expected"."rowIdentifier" IS NULL + OR "actual"."rowIdentifier" IS NULL + OR "expected"."rowData" IS DISTINCT FROM "actual"."rowData") + AND ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/l-fold-table.ts b/apps/backend/src/lib/bulldozer/db/tables/l-fold-table.ts index 4eb36856c8..8aff5f75a3 100644 --- 
a/apps/backend/src/lib/bulldozer/db/tables/l-fold-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/l-fold-table.ts @@ -501,7 +501,7 @@ export function declareLFoldTable< ); sourceSortTable.registerRowChangeTrigger(sourceSortTrigger); - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.fromTable], debugArgs: { @@ -795,5 +795,40 @@ export function declareLFoldTable< triggers.set(id, normalizeRowChangeTrigger(trigger)); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allInputGroups = options.fromTable.listGroups({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualGroups = table.listGroups({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "inputGroups" AS ( + SELECT "g"."groupkey" AS "groupKey" FROM (${allInputGroups}) AS "g" + ), + "actualGroups" AS ( + SELECT "g"."groupkey" AS "groupKey" FROM (${allActualGroups}) AS "g" + ), + "missingGroups" AS ( + SELECT 'missing_group' AS errortype, + "inputGroups"."groupKey" AS groupkey, NULL::text AS rowidentifier, + NULL::jsonb AS expected, NULL::jsonb AS actual + FROM "inputGroups" + LEFT JOIN "actualGroups" ON "actualGroups"."groupKey" IS NOT DISTINCT FROM "inputGroups"."groupKey" + WHERE "actualGroups"."groupKey" IS NULL + ), + "extraGroups" AS ( + SELECT 'extra_group' AS errortype, + "actualGroups"."groupKey" AS groupkey, NULL::text AS rowidentifier, + NULL::jsonb AS expected, NULL::jsonb AS actual + FROM "actualGroups" + LEFT JOIN "inputGroups" ON "inputGroups"."groupKey" IS NOT DISTINCT FROM "actualGroups"."groupKey" + WHERE "inputGroups"."groupKey" IS NULL + ) + SELECT * FROM "missingGroups" WHERE ${isInitializedExpression} + UNION ALL SELECT * FROM "extraGroups" WHERE ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/left-join-table.ts 
b/apps/backend/src/lib/bulldozer/db/tables/left-join-table.ts index 737d55d0bc..71f94df837 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/left-join-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/left-join-table.ts @@ -363,7 +363,7 @@ export function declareLeftJoinTable< changedSide: "right", }); - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.leftTable, options.rightTable], debugArgs: { @@ -511,5 +511,94 @@ export function declareLeftJoinTable< triggers.set(id, normalizeRowChangeTrigger(trigger)); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allLeftRows = options.leftTable.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allRightRows = options.rightTable.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualRows = table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "leftRows" AS ( + SELECT + "source"."groupkey" AS "groupKey", + "source"."rowidentifier" AS "leftRowIdentifier", + "source"."rowdata" AS "leftRowData", + "joinKey"."joinKey" AS "leftJoinKey" + FROM (${allLeftRows}) AS "source" + LEFT JOIN LATERAL ( + SELECT "mapped"."joinKey" + FROM ( + SELECT ${options.leftJoinKey} + FROM (SELECT "source"."rowidentifier" AS "rowIdentifier", "source"."rowdata" AS "rowData") AS "joinKeyInput" + ) AS "mapped" + ) AS "joinKey" ON true + ), + "rightRows" AS ( + SELECT + "source"."groupkey" AS "groupKey", + "source"."rowidentifier" AS "rightRowIdentifier", + "source"."rowdata" AS "rightRowData", + "joinKey"."joinKey" AS "rightJoinKey" + FROM (${allRightRows}) AS "source" + LEFT JOIN LATERAL ( + SELECT "mapped"."joinKey" + FROM ( + SELECT ${options.rightJoinKey} + FROM (SELECT "source"."rowidentifier" AS "rowIdentifier", "source"."rowdata" AS "rowData") AS "joinKeyInput" + ) AS "mapped" 
+ ) AS "joinKey" ON true + ), + "expected" AS ( + SELECT DISTINCT ON ("joined"."groupKey", "joined"."rowIdentifier") + "joined"."groupKey" AS "groupKey", + "joined"."rowIdentifier" AS "rowIdentifier", + "joined"."rowData" AS "rowData" + FROM ( + SELECT + "leftRows"."groupKey" AS "groupKey", + ${createJoinedRowIdentifier( + sqlExpression`"leftRows"."leftRowIdentifier"`, + sqlExpression`"rightRows"."rightRowIdentifier"`, + )} AS "rowIdentifier", + jsonb_build_object( + 'leftRowData', "leftRows"."leftRowData", + 'rightRowData', "rightRows"."rightRowData" + ) AS "rowData" + FROM "leftRows" + LEFT JOIN "rightRows" + ON "rightRows"."groupKey" IS NOT DISTINCT FROM "leftRows"."groupKey" + AND "rightRows"."rightJoinKey" IS NOT DISTINCT FROM "leftRows"."leftJoinKey" + ) AS "joined" + ORDER BY "joined"."groupKey", "joined"."rowIdentifier" + ), + "actual" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allActualRows}) AS "r" + ) + SELECT + CASE + WHEN "expected"."rowIdentifier" IS NULL THEN 'extra_row' + WHEN "actual"."rowIdentifier" IS NULL THEN 'missing_row' + ELSE 'data_mismatch' + END AS errortype, + COALESCE("expected"."groupKey", "actual"."groupKey") AS groupkey, + COALESCE("expected"."rowIdentifier", "actual"."rowIdentifier") AS rowidentifier, + "expected"."rowData" AS expected, + "actual"."rowData" AS actual + FROM "expected" + FULL OUTER JOIN "actual" + ON "expected"."groupKey" IS NOT DISTINCT FROM "actual"."groupKey" + AND "expected"."rowIdentifier" = "actual"."rowIdentifier" + WHERE ("expected"."rowIdentifier" IS NULL + OR "actual"."rowIdentifier" IS NULL + OR "expected"."rowData" IS DISTINCT FROM "actual"."rowData") + AND ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/limit-table.ts b/apps/backend/src/lib/bulldozer/db/tables/limit-table.ts index 52e02c7acf..b035ebf562 100644 --- 
a/apps/backend/src/lib/bulldozer/db/tables/limit-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/limit-table.ts @@ -217,7 +217,7 @@ export function declareLimitTable< ); options.fromTable.registerRowChangeTrigger(fromTableTrigger); - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.fromTable], debugArgs: { @@ -489,5 +489,47 @@ export function declareLimitTable< triggers.set(id, normalizeRowChangeTrigger(trigger)); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allInputRows = options.fromTable.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualRows = table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + /* Error rows: extra_row = materialized row with no matching input row; over_limit = a group holding more rows than normalizedLimit, reported with the limit as "expected" and the observed per-group row count as "actual". */ + return sqlQuery` + WITH "inputRows" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allInputRows}) AS "r" + ), + "actual" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allActualRows}) AS "r" + ), + "extraRows" AS ( + SELECT 'extra_row' AS errortype, + "actual"."groupKey" AS groupkey, "actual"."rowIdentifier" AS rowidentifier, + NULL::jsonb AS expected, "actual"."rowData" AS actual + FROM "actual" + LEFT JOIN "inputRows" + ON "inputRows"."groupKey" IS NOT DISTINCT FROM "actual"."groupKey" + AND "inputRows"."rowIdentifier" = "actual"."rowIdentifier" + WHERE "inputRows"."rowIdentifier" IS NULL + ), + "overLimit" AS ( + SELECT 'over_limit' AS errortype, + "counts"."groupKey" AS groupkey, NULL::text AS rowidentifier, + to_jsonb(${normalizedLimit}) AS expected, to_jsonb("counts"."cnt") AS actual + FROM ( + SELECT "groupKey", COUNT(*)::int AS "cnt" FROM "actual" GROUP BY "groupKey" + ) AS "counts" + WHERE "counts"."cnt" > ${normalizedLimit} + ) + SELECT * FROM "extraRows" WHERE ${isInitializedExpression} + UNION ALL + SELECT * 
FROM "overLimit" WHERE ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/map-table.ts b/apps/backend/src/lib/bulldozer/db/tables/map-table.ts index 87071a1295..69b34c1ea3 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/map-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/map-table.ts @@ -84,6 +84,7 @@ export function declareMapTable< "listGroups", "listRowsInGroup", "registerRowChangeTrigger", + "verifyDataIntegrity", ]), }; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/reduce-table.ts b/apps/backend/src/lib/bulldozer/db/tables/reduce-table.ts index 865e87b655..9e0310c2e1 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/reduce-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/reduce-table.ts @@ -284,7 +284,7 @@ export function declareReduceTable< ); options.fromTable.registerRowChangeTrigger(fromTableTrigger); - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.fromTable], debugArgs: { @@ -428,5 +428,52 @@ export function declareReduceTable< triggers.set(id, normalizeRowChangeTrigger(trigger)); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allInputGroups = options.fromTable.listGroups({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualRows = table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "inputGroups" AS ( + SELECT "g"."groupkey" AS "groupKey" FROM (${allInputGroups}) AS "g" + ), + "actual" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allActualRows}) AS "r" + ), + "actualGroupCounts" AS ( + SELECT "groupKey", COUNT(*)::int AS "cnt" FROM "actual" GROUP BY "groupKey" + ), + "missingGroups" AS ( + SELECT 'missing_group' AS errortype, + "inputGroups"."groupKey" AS groupkey, NULL::text AS 
rowidentifier, + NULL::jsonb AS expected, NULL::jsonb AS actual + FROM "inputGroups" + LEFT JOIN "actualGroupCounts" ON "actualGroupCounts"."groupKey" IS NOT DISTINCT FROM "inputGroups"."groupKey" + WHERE "actualGroupCounts"."groupKey" IS NULL + ), + "extraGroups" AS ( + SELECT 'extra_group' AS errortype, + "actualGroupCounts"."groupKey" AS groupkey, NULL::text AS rowidentifier, + NULL::jsonb AS expected, NULL::jsonb AS actual + FROM "actualGroupCounts" + LEFT JOIN "inputGroups" ON "inputGroups"."groupKey" IS NOT DISTINCT FROM "actualGroupCounts"."groupKey" + WHERE "inputGroups"."groupKey" IS NULL + ), + "wrongRowCount" AS ( + SELECT 'wrong_row_count' AS errortype, + "actualGroupCounts"."groupKey" AS groupkey, NULL::text AS rowidentifier, + '1'::jsonb AS expected, to_jsonb("actualGroupCounts"."cnt") AS actual + FROM "actualGroupCounts" + WHERE "actualGroupCounts"."cnt" <> 1 + ) + SELECT * FROM "missingGroups" WHERE ${isInitializedExpression} + UNION ALL SELECT * FROM "extraGroups" WHERE ${isInitializedExpression} + UNION ALL SELECT * FROM "wrongRowCount" WHERE ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/sort-table.ts b/apps/backend/src/lib/bulldozer/db/tables/sort-table.ts index 92c8d5bdee..3ca20494e5 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/sort-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/sort-table.ts @@ -173,7 +173,7 @@ export function declareSortTable< ); options.fromTable.registerRowChangeTrigger(fromTableTrigger); - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.fromTable], debugArgs: { @@ -410,5 +410,42 @@ export function declareSortTable< triggers.set(id, normalizeRowChangeTrigger(trigger)); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allInputRows = options.fromTable.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const 
allActualRows = table.listRowsInGroup({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "expected" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allInputRows}) AS "r" + ), + "actual" AS ( + SELECT "r"."groupkey" AS "groupKey", "r"."rowidentifier" AS "rowIdentifier", "r"."rowdata" AS "rowData" + FROM (${allActualRows}) AS "r" + ) + SELECT + CASE + WHEN "expected"."rowIdentifier" IS NULL THEN 'extra_row' + WHEN "actual"."rowIdentifier" IS NULL THEN 'missing_row' + ELSE 'data_mismatch' + END AS errortype, + COALESCE("expected"."groupKey", "actual"."groupKey") AS groupkey, + COALESCE("expected"."rowIdentifier", "actual"."rowIdentifier") AS rowidentifier, + "expected"."rowData" AS expected, + "actual"."rowData" AS actual + FROM "expected" + FULL OUTER JOIN "actual" + ON "expected"."groupKey" IS NOT DISTINCT FROM "actual"."groupKey" + AND "expected"."rowIdentifier" = "actual"."rowIdentifier" + WHERE ("expected"."rowIdentifier" IS NULL + OR "actual"."rowIdentifier" IS NULL + OR "expected"."rowData" IS DISTINCT FROM "actual"."rowData") + AND ${isInitializedExpression} + `; + }, }; + return table; } diff --git a/apps/backend/src/lib/bulldozer/db/tables/stored-table.ts b/apps/backend/src/lib/bulldozer/db/tables/stored-table.ts index 268982ae6e..858d498576 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/stored-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/stored-table.ts @@ -79,6 +79,10 @@ export function declareStoredTable(options: { triggers.set(id, normalizeRowChangeTrigger(trigger)); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => sqlQuery` + SELECT NULL::text AS errortype, NULL::jsonb AS groupkey, NULL::text AS rowidentifier, NULL::jsonb AS expected, NULL::jsonb AS actual + WHERE false + `, setRow: (rowIdentifier, rowData) => { const oldRowsTableName = 
`old_rows_${generateSecureRandomString()}`; const upsertedRowsTableName = `upserted_rows_${generateSecureRandomString()}`; diff --git a/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts b/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts index 80838add47..f93a32898b 100644 --- a/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts +++ b/apps/backend/src/lib/bulldozer/db/tables/time-fold-table.ts @@ -420,7 +420,7 @@ export function declareTimeFoldTable< ); options.fromTable.registerRowChangeTrigger(fromTableTrigger); - return { + const table: ReturnType> = { tableId: options.tableId, inputTables: [options.fromTable], debugArgs: { @@ -560,5 +560,31 @@ export function declareTimeFoldTable< triggers.set(id, normalizeRowChangeTrigger(trigger)); return { deregister: () => triggers.delete(id) }; }, + verifyDataIntegrity: () => { + const allInputGroups = options.fromTable.listGroups({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + const allActualGroups = table.listGroups({ + start: "start", end: "end", startInclusive: true, endInclusive: true, + }); + return sqlQuery` + WITH "inputGroups" AS ( + SELECT "g"."groupkey" AS "groupKey" FROM (${allInputGroups}) AS "g" + ), + "actualGroups" AS ( + SELECT "g"."groupkey" AS "groupKey" FROM (${allActualGroups}) AS "g" + ), + "extraGroups" AS ( + SELECT 'extra_group' AS errortype, + "actualGroups"."groupKey" AS groupkey, NULL::text AS rowidentifier, + NULL::jsonb AS expected, NULL::jsonb AS actual + FROM "actualGroups" + LEFT JOIN "inputGroups" ON "inputGroups"."groupKey" IS NOT DISTINCT FROM "actualGroups"."groupKey" + WHERE "inputGroups"."groupKey" IS NULL + ) + SELECT * FROM "extraGroups" WHERE ${isInitializedExpression} + `; + }, }; + return table; }