From 3c889504191eba9038beccfee1a9cd12dd21fa07 Mon Sep 17 00:00:00 2001 From: Bilal Godil Date: Mon, 16 Mar 2026 10:53:20 -0700 Subject: [PATCH 1/3] clickhouse contact channel, team, and team member tables --- .../migration.sql | 25 + apps/backend/prisma/schema.prisma | 10 + apps/backend/scripts/clickhouse-migrations.ts | 133 +++++ .../external-db-sync/sequencer/route.ts | 59 +- .../app/api/latest/team-memberships/crud.tsx | 12 +- .../backend/src/app/api/latest/teams/crud.tsx | 24 +- .../backend/src/app/api/latest/users/crud.tsx | 7 +- apps/backend/src/lib/external-db-sync.ts | 257 +++++++- .../api/v1/external-db-sync-basics.test.ts | 260 ++++++++ .../api/v1/external-db-sync-utils.ts | 109 +++- .../src/config/db-sync-mappings.ts | 557 ++++++++++++++++++ 11 files changed, 1382 insertions(+), 71 deletions(-) create mode 100644 apps/backend/prisma/migrations/20260316000000_add_team_team_member_sequence_columns/migration.sql diff --git a/apps/backend/prisma/migrations/20260316000000_add_team_team_member_sequence_columns/migration.sql b/apps/backend/prisma/migrations/20260316000000_add_team_team_member_sequence_columns/migration.sql new file mode 100644 index 0000000000..358cf0514c --- /dev/null +++ b/apps/backend/prisma/migrations/20260316000000_add_team_team_member_sequence_columns/migration.sql @@ -0,0 +1,25 @@ +-- AlterTable +ALTER TABLE "Team" ADD COLUMN "sequenceId" BIGINT, +ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; + +-- CreateIndex +CREATE UNIQUE INDEX "Team_sequenceId_key" ON "Team"("sequenceId"); + +-- CreateIndex +CREATE INDEX "Team_tenancyId_sequenceId_idx" ON "Team"("tenancyId", "sequenceId"); + +-- CreateIndex +CREATE INDEX "Team_shouldUpdateSequenceId_idx" ON "Team"("shouldUpdateSequenceId", "tenancyId"); + +-- AlterTable +ALTER TABLE "TeamMember" ADD COLUMN "sequenceId" BIGINT, +ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; + +-- CreateIndex +CREATE UNIQUE INDEX "TeamMember_sequenceId_key" ON "TeamMember"("sequenceId"); + +-- CreateIndex +CREATE INDEX "TeamMember_tenancyId_sequenceId_idx" ON "TeamMember"("tenancyId", "sequenceId"); + +-- CreateIndex +CREATE INDEX "TeamMember_shouldUpdateSequenceId_idx" ON "TeamMember"("shouldUpdateSequenceId", "tenancyId"); diff --git a/apps/backend/prisma/schema.prisma b/apps/backend/prisma/schema.prisma index 0d408ff0ba..d6077f3039 100644 --- a/apps/backend/prisma/schema.prisma +++ b/apps/backend/prisma/schema.prisma @@ -179,11 +179,16 @@ model Team { serverMetadata Json? profileImageUrl String? + sequenceId BigInt? @unique + shouldUpdateSequenceId Boolean @default(true) + teamMembers TeamMember[] projectApiKey ProjectApiKey[] @@id([tenancyId, teamId]) @@unique([mirroredProjectId, mirroredBranchId, teamId]) + @@index([tenancyId, sequenceId], name: "Team_tenancyId_sequenceId_idx") + @@index([shouldUpdateSequenceId, tenancyId], name: "Team_shouldUpdateSequenceId_idx") } // This is used for fields that are boolean but only the true value is part of a unique constraint. @@ -205,6 +210,9 @@ model TeamMember { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt + sequenceId BigInt? @unique + shouldUpdateSequenceId Boolean @default(true) + projectUser ProjectUser @relation(fields: [tenancyId, projectUserId], references: [tenancyId, projectUserId], onDelete: Cascade) team Team @relation(fields: [tenancyId, teamId], references: [tenancyId, teamId], onDelete: Cascade) isSelected BooleanTrue? @@ -213,6 +221,8 @@ model TeamMember { @@id([tenancyId, projectUserId, teamId]) @@unique([tenancyId, projectUserId, isSelected]) @@index([tenancyId, projectUserId, isSelected], map: "TeamMember_projectUserId_isSelected_idx") + @@index([tenancyId, sequenceId], name: "TeamMember_tenancyId_sequenceId_idx") + @@index([shouldUpdateSequenceId, tenancyId], name: "TeamMember_shouldUpdateSequenceId_idx") } model ProjectUserDirectPermission { diff --git a/apps/backend/scripts/clickhouse-migrations.ts b/apps/backend/scripts/clickhouse-migrations.ts index d94f84baa0..f0b533c034 100644 --- a/apps/backend/scripts/clickhouse-migrations.ts +++ b/apps/backend/scripts/clickhouse-migrations.ts @@ -16,6 +16,12 @@ export async function runClickhouseMigrations() { await client.exec({ query: EVENTS_VIEW_SQL }); await client.exec({ query: USERS_TABLE_BASE_SQL }); await client.exec({ query: USERS_VIEW_SQL }); + await client.exec({ query: CONTACT_CHANNELS_TABLE_BASE_SQL }); + await client.exec({ query: CONTACT_CHANNELS_VIEW_SQL }); + await client.exec({ query: TEAMS_TABLE_BASE_SQL }); + await client.exec({ query: TEAMS_VIEW_SQL }); + await client.exec({ query: TEAM_MEMBERS_TABLE_BASE_SQL }); + await client.exec({ query: TEAM_MEMBERS_VIEW_SQL }); await client.exec({ query: EVENTS_ADD_REPLAY_COLUMNS_SQL }); await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL }); await client.exec({ query: BACKFILL_REFRESH_TOKEN_ID_COLUMN_SQL }); @@ -27,6 +33,9 @@ export async function runClickhouseMigrations() { "REVOKE ALL FROM limited_user;", "GRANT SELECT ON default.events TO limited_user;", "GRANT SELECT ON default.users TO limited_user;", + "GRANT SELECT ON default.contact_channels TO limited_user;", + "GRANT SELECT ON default.teams TO limited_user;", + "GRANT SELECT ON default.team_members TO limited_user;", ]; await client.exec({ query: "CREATE ROW POLICY IF NOT EXISTS events_project_isolation ON default.events FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", @@ -34,6 +43,15 @@ export async function runClickhouseMigrations() { await client.exec({ query: "CREATE ROW POLICY IF NOT EXISTS users_project_isolation ON default.users FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", }); + await client.exec({ + query: "CREATE ROW POLICY IF NOT EXISTS contact_channels_project_isolation ON default.contact_channels FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", + }); + await client.exec({ + query: "CREATE ROW POLICY IF NOT EXISTS teams_project_isolation ON default.teams FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", + }); + await client.exec({ + query: "CREATE ROW POLICY IF NOT EXISTS team_members_project_isolation ON default.team_members FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", + }); for (const query of queries) { await client.exec({ query }); } @@ -197,6 +215,121 @@ WHERE event_type = '$token-refresh' AND data.refresh_token_id::Nullable(String) IS NOT NULL; `; +const CONTACT_CHANNELS_TABLE_BASE_SQL = ` +CREATE TABLE IF NOT EXISTS analytics_internal.contact_channels ( + project_id String, + branch_id String, + id UUID, + user_id UUID, + type LowCardinality(String), + value String, + is_primary UInt8, + is_verified UInt8, + used_for_auth UInt8, + created_at DateTime64(3, 'UTC'), + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) +) +ENGINE ReplacingMergeTree(sync_sequence_id) +PARTITION BY toYYYYMM(created_at) +ORDER BY (project_id, branch_id, id); +`; + +const CONTACT_CHANNELS_VIEW_SQL = ` +CREATE OR REPLACE VIEW default.contact_channels +SQL SECURITY DEFINER +AS +SELECT + project_id, + branch_id, + id, + user_id, + type, + value, + is_primary, + is_verified, + used_for_auth, + created_at +FROM analytics_internal.contact_channels +FINAL +WHERE sync_is_deleted = 0; +`; + +const TEAMS_TABLE_BASE_SQL = ` +CREATE TABLE IF NOT EXISTS analytics_internal.teams ( + project_id String, + branch_id String, + id UUID, + display_name String, + profile_image_url Nullable(String), + created_at DateTime64(3, 'UTC'), + client_metadata String, + client_read_only_metadata String, + server_metadata String, + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) +) +ENGINE ReplacingMergeTree(sync_sequence_id) +PARTITION BY toYYYYMM(created_at) +ORDER BY (project_id, branch_id, id); +`; + +const TEAMS_VIEW_SQL = ` +CREATE OR REPLACE VIEW default.teams +SQL SECURITY DEFINER +AS +SELECT + project_id, + branch_id, + id, + display_name, + profile_image_url, + created_at, + client_metadata, + client_read_only_metadata, + server_metadata +FROM analytics_internal.teams +FINAL +WHERE sync_is_deleted = 0; +`; + +const TEAM_MEMBERS_TABLE_BASE_SQL = ` +CREATE TABLE IF NOT EXISTS analytics_internal.team_members ( + project_id String, + branch_id String, + team_id UUID, + user_id UUID, + display_name Nullable(String), + profile_image_url Nullable(String), + created_at DateTime64(3, 'UTC'), + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) +) +ENGINE ReplacingMergeTree(sync_sequence_id) +PARTITION BY toYYYYMM(created_at) +ORDER BY (project_id, branch_id, team_id, user_id); +`; + +const TEAM_MEMBERS_VIEW_SQL = ` +CREATE OR REPLACE VIEW default.team_members +SQL SECURITY DEFINER +AS +SELECT + project_id, + branch_id, + team_id, + user_id, + display_name, + profile_image_url, + created_at +FROM analytics_internal.team_members +FINAL +WHERE sync_is_deleted = 0; +`; + const EXTERNAL_ANALYTICS_DB_SQL = ` CREATE DATABASE IF NOT EXISTS analytics_internal; `; diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts index c7808fb53b..78bbb99ffd 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts @@ -109,6 +109,63 @@ async function backfillSequenceIds(batchSize: number): Promise { didUpdate = true; } + const teamTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` + WITH rows_to_update AS ( + SELECT "tenancyId", "teamId" + FROM "Team" + WHERE "shouldUpdateSequenceId" = TRUE + ORDER BY "tenancyId" + LIMIT ${batchSize} + FOR UPDATE SKIP LOCKED + ), + updated_rows AS ( + UPDATE "Team" t + SET "sequenceId" = nextval('global_seq_id'), + "shouldUpdateSequenceId" = FALSE + FROM rows_to_update r + WHERE t."tenancyId" = r."tenancyId" + AND t."teamId" = r."teamId" + RETURNING t."tenancyId" + ) + SELECT DISTINCT "tenancyId" FROM updated_rows + `; + + span.setAttribute("stack.external-db-sync.team-tenants", teamTenants.length); + + if (teamTenants.length > 0) { + await enqueueExternalDbSyncBatch(teamTenants.map(t => t.tenancyId)); + didUpdate = true; + } + + const teamMemberTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` + WITH rows_to_update AS ( + SELECT "tenancyId", "projectUserId", "teamId" + FROM "TeamMember" + WHERE "shouldUpdateSequenceId" = TRUE + ORDER BY "tenancyId" + LIMIT ${batchSize} + FOR UPDATE SKIP LOCKED + ), + updated_rows AS ( + UPDATE "TeamMember" tm + SET "sequenceId" = nextval('global_seq_id'), + "shouldUpdateSequenceId" = FALSE + FROM rows_to_update r + WHERE tm."tenancyId" = r."tenancyId" + AND tm."projectUserId" = r."projectUserId" + AND tm."teamId" = r."teamId" + RETURNING tm."tenancyId" + ) + SELECT DISTINCT "tenancyId" FROM updated_rows + `; + + span.setAttribute("stack.external-db-sync.team-member-tenants", teamMemberTenants.length); + + if (teamMemberTenants.length > 0) { + await enqueueExternalDbSyncBatch(teamMemberTenants.map(t => t.tenancyId)); + didUpdate = true; + } + const deletedRowTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` WITH rows_to_update AS ( SELECT "id", "tenancyId" @@ -138,7 +195,7 @@ async function backfillSequenceIds(batchSize: number): Promise { span.setAttribute("stack.external-db-sync.did-update", didUpdate); if (didUpdate) { - console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, DR=${deletedRowTenants.length}`); + console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, DR=${deletedRowTenants.length}`); } return didUpdate; diff --git a/apps/backend/src/app/api/latest/team-memberships/crud.tsx b/apps/backend/src/app/api/latest/team-memberships/crud.tsx index ae11b32ef5..ae5dbfd43a 100644 --- a/apps/backend/src/app/api/latest/team-memberships/crud.tsx +++ b/apps/backend/src/app/api/latest/team-memberships/crud.tsx @@ -1,3 +1,4 @@ +import { recordExternalDbSyncDeletion, withExternalDbSyncUpdate } from "@/lib/external-db-sync"; import { grantDefaultTeamPermissions } from "@/lib/permissions"; import { ensureTeamExists, ensureTeamMembershipDoesNotExist, ensureTeamMembershipExists, ensureUserExists, ensureUserTeamPermissionExists } from "@/lib/request-checks"; import { Tenancy } from "@/lib/tenancies"; @@ -20,11 +21,11 @@ export async function addUserToTeam(tx: PrismaTransaction, options: { type: 'member' | 'creator', }) { await tx.teamMember.create({ - data: { + data: withExternalDbSyncUpdate({ projectUserId: options.userId, teamId: options.teamId, tenancyId: options.tenancy.id, - }, + }), }); const result = await grantDefaultTeamPermissions(tx, { @@ -138,6 +139,13 @@ export const teamMembershipsCrudHandlers = createLazyProxy(() => createCrudHandl userId: params.user_id, }); + await recordExternalDbSyncDeletion(tx, { + tableName: "TeamMember", + tenancyId: auth.tenancy.id, + projectUserId: params.user_id, + teamId: params.team_id, + }); + await tx.teamMember.delete({ where: { tenancyId_projectUserId_teamId: { diff --git a/apps/backend/src/app/api/latest/teams/crud.tsx b/apps/backend/src/app/api/latest/teams/crud.tsx index 05c7ca1772..aab670506e 100644 --- a/apps/backend/src/app/api/latest/teams/crud.tsx +++ b/apps/backend/src/app/api/latest/teams/crud.tsx @@ -1,3 +1,4 @@ +import { recordExternalDbSyncDeletion, recordExternalDbSyncTeamMemberDeletionsForTeam, withExternalDbSyncUpdate } from "@/lib/external-db-sync"; import { ensureTeamExists, ensureTeamMembershipExists, ensureUserExists, ensureUserTeamPermissionExists } from "@/lib/request-checks"; import { sendTeamCreatedWebhook, sendTeamDeletedWebhook, sendTeamUpdatedWebhook } from "@/lib/webhooks"; import { getPrismaClientForTenancy, retryTransaction } from "@/prisma-client"; @@ -73,7 +74,7 @@ export const teamsCrudHandlers = createLazyProxy(() => createCrudHandlers(teamsC const db = await retryTransaction(prisma, async (tx) => { const db = await tx.team.create({ - data: { + data: withExternalDbSyncUpdate({ displayName: data.display_name, mirroredProjectId: auth.project.id, mirroredBranchId: auth.branchId, @@ -81,8 +82,8 @@ export const teamsCrudHandlers = createLazyProxy(() => createCrudHandlers(teamsC clientMetadata: data.client_metadata === null ? Prisma.JsonNull : data.client_metadata, clientReadOnlyMetadata: data.client_read_only_metadata === null ? Prisma.JsonNull : data.client_read_only_metadata, serverMetadata: data.server_metadata === null ? Prisma.JsonNull : data.server_metadata, - profileImageUrl: await uploadAndGetUrl(data.profile_image_url, "team-profile-images") - }, + profileImageUrl: await uploadAndGetUrl(data.profile_image_url, "team-profile-images"), + }), }); if (addUserId) { @@ -160,13 +161,13 @@ export const teamsCrudHandlers = createLazyProxy(() => createCrudHandlers(teamsC teamId: params.team_id, }, }, - data: { + data: withExternalDbSyncUpdate({ displayName: data.display_name, clientMetadata: data.client_metadata === null ? Prisma.JsonNull : data.client_metadata, clientReadOnlyMetadata: data.client_read_only_metadata === null ? Prisma.JsonNull : data.client_read_only_metadata, serverMetadata: data.server_metadata === null ? Prisma.JsonNull : data.server_metadata, - profileImageUrl: await uploadAndGetUrl(data.profile_image_url, "team-profile-images") - }, + profileImageUrl: await uploadAndGetUrl(data.profile_image_url, "team-profile-images"), + }), }); }); @@ -194,6 +195,17 @@ export const teamsCrudHandlers = createLazyProxy(() => createCrudHandlers(teamsC } await ensureTeamExists(tx, { tenancyId: auth.tenancy.id, teamId: params.team_id }); + await recordExternalDbSyncTeamMemberDeletionsForTeam(tx, { + tenancyId: auth.tenancy.id, + teamId: params.team_id, + }); + + await recordExternalDbSyncDeletion(tx, { + tableName: "Team", + tenancyId: auth.tenancy.id, + teamId: params.team_id, + }); + await tx.team.delete({ where: { tenancyId_teamId: { diff --git a/apps/backend/src/app/api/latest/users/crud.tsx b/apps/backend/src/app/api/latest/users/crud.tsx index 3a16996d3d..fc7e56006c 100644 --- a/apps/backend/src/app/api/latest/users/crud.tsx +++ b/apps/backend/src/app/api/latest/users/crud.tsx @@ -2,7 +2,7 @@ import { BooleanTrue, Prisma } from "@/generated/prisma/client"; import { getRenderedOrganizationConfigQuery, getRenderedProjectConfigQuery } from "@/lib/config"; import { demoteAllContactChannelsToNonPrimary, setContactChannelAsPrimaryByValue } from "@/lib/contact-channel"; import { normalizeEmail } from "@/lib/emails"; -import { recordExternalDbSyncContactChannelDeletionsForUser, recordExternalDbSyncDeletion, withExternalDbSyncUpdate } from "@/lib/external-db-sync"; +import { recordExternalDbSyncContactChannelDeletionsForUser, recordExternalDbSyncDeletion, recordExternalDbSyncTeamMemberDeletionsForUser, withExternalDbSyncUpdate } from "@/lib/external-db-sync"; import { grantDefaultProjectPermissions } from "@/lib/permissions"; import { ensureTeamMembershipExists, ensureUserExists } from "@/lib/request-checks"; import { Tenancy } from "@/lib/tenancies"; @@ -1202,6 +1202,11 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC projectUserId: params.user_id, }); + await recordExternalDbSyncTeamMemberDeletionsForUser(tx, { + tenancyId: auth.tenancy.id, + projectUserId: params.user_id, + }); + await tx.projectUser.delete({ where: { tenancyId_projectUserId: { diff --git a/apps/backend/src/lib/external-db-sync.ts b/apps/backend/src/lib/external-db-sync.ts index 62ac6536bd..bd9b0b1cab 100644 --- a/apps/backend/src/lib/external-db-sync.ts +++ b/apps/backend/src/lib/external-db-sync.ts @@ -41,6 +41,17 @@ type ExternalDbSyncTarget = tenancyId: string, projectUserId: string, contactChannelId: string, + } + | { + tableName: "Team", + tenancyId: string, + teamId: string, + } + | { + tableName: "TeamMember", + tenancyId: string, + projectUserId: string, + teamId: string, }; type ExternalDbType = NonNullable["type"]>; @@ -80,9 +91,9 @@ export async function recordExternalDbSyncDeletion( target: ExternalDbSyncTarget, ): Promise { assertUuid(target.tenancyId, "tenancyId"); - assertUuid(target.projectUserId, "projectUserId"); if (target.tableName === "ProjectUser") { + assertUuid(target.projectUserId, "projectUserId"); const insertedCount = await tx.$executeRaw(Prisma.sql` INSERT INTO "DeletedRow" ( "id", @@ -115,8 +126,132 @@ export async function recordExternalDbSyncDeletion( return; } - assertUuid(target.contactChannelId, "contactChannelId"); - const insertedCount = await tx.$executeRaw(Prisma.sql` + if (target.tableName === "ContactChannel") { + assertUuid(target.projectUserId, "projectUserId"); + assertUuid(target.contactChannelId, "contactChannelId"); + const insertedCount = await tx.$executeRaw(Prisma.sql` + INSERT INTO "DeletedRow" ( + "id", + "tenancyId", + "tableName", + "primaryKey", + "data", + "deletedAt", + "shouldUpdateSequenceId" + ) + SELECT + gen_random_uuid(), + "tenancyId", + 'ContactChannel', + jsonb_build_object( + 'tenancyId', + "tenancyId", + 'projectUserId', + "projectUserId", + 'id', + "id" + ), + to_jsonb("ContactChannel".*), + NOW(), + TRUE + FROM "ContactChannel" + WHERE "tenancyId" = ${target.tenancyId}::uuid + AND "projectUserId" = ${target.projectUserId}::uuid + AND "id" = ${target.contactChannelId}::uuid + FOR UPDATE + `); + + if (insertedCount !== 1) { + throw new StackAssertionError( + `Expected to insert 1 DeletedRow entry for ContactChannel, got ${insertedCount}.` + ); + } + return; + } + + if (target.tableName === "Team") { + assertUuid(target.teamId, "teamId"); + const insertedCount = await tx.$executeRaw(Prisma.sql` + INSERT INTO "DeletedRow" ( + "id", + "tenancyId", + "tableName", + "primaryKey", + "data", + "deletedAt", + "shouldUpdateSequenceId" + ) + SELECT + gen_random_uuid(), + "tenancyId", + 'Team', + jsonb_build_object('tenancyId', "tenancyId", 'teamId', "teamId"), + to_jsonb("Team".*), + NOW(), + TRUE + FROM "Team" + WHERE "tenancyId" = ${target.tenancyId}::uuid + AND "teamId" = ${target.teamId}::uuid + FOR UPDATE + `); + + if (insertedCount !== 1) { + throw new StackAssertionError( + `Expected to insert 1 DeletedRow entry for Team, got ${insertedCount}.` + ); + } + return; + } + + { + const _teamMemberTarget: { tableName: "TeamMember" } = target; + assertUuid(target.projectUserId, "projectUserId"); + assertUuid(target.teamId, "teamId"); + const insertedCount = await tx.$executeRaw(Prisma.sql` + INSERT INTO "DeletedRow" ( + "id", + "tenancyId", + "tableName", + "primaryKey", + "data", + "deletedAt", + "shouldUpdateSequenceId" + ) + SELECT + gen_random_uuid(), + "tenancyId", + 'TeamMember', + jsonb_build_object('tenancyId', "tenancyId", 'projectUserId', "projectUserId", 'teamId', "teamId"), + to_jsonb("TeamMember".*), + NOW(), + TRUE + FROM "TeamMember" + WHERE "tenancyId" = ${target.tenancyId}::uuid + AND "projectUserId" = ${target.projectUserId}::uuid + AND "teamId" = ${target.teamId}::uuid + FOR UPDATE + `); + + if (insertedCount !== 1) { + throw new StackAssertionError( + `Expected to insert 1 DeletedRow entry for TeamMember, got ${insertedCount}.` + ); + } + return; + } +} + +export async function recordExternalDbSyncContactChannelDeletionsForUser( + tx: ExternalDbSyncClient, + options: { + tenancyId: string, + projectUserId: string, + }, +): Promise { + assertUuid(options.tenancyId, "tenancyId"); + assertUuid(options.projectUserId, "projectUserId"); + + await tx.$executeRaw(Prisma.sql` INSERT INTO "DeletedRow" ( "id", "tenancyId", @@ -142,20 +277,48 @@ export async function recordExternalDbSyncDeletion( NOW(), TRUE FROM "ContactChannel" - WHERE "tenancyId" = ${target.tenancyId}::uuid - AND "projectUserId" = ${target.projectUserId}::uuid - AND "id" = ${target.contactChannelId}::uuid + WHERE "tenancyId" = ${options.tenancyId}::uuid + AND "projectUserId" = ${options.projectUserId}::uuid FOR UPDATE `); +} - if (insertedCount !== 1) { - throw new StackAssertionError( - `Expected to insert 1 DeletedRow entry for ContactChannel, got ${insertedCount}.` - ); - } +export async function recordExternalDbSyncTeamMemberDeletionsForTeam( + tx: ExternalDbSyncClient, + options: { + tenancyId: string, + teamId: string, + }, +): Promise { + assertUuid(options.tenancyId, "tenancyId"); + assertUuid(options.teamId, "teamId"); + + await tx.$executeRaw(Prisma.sql` + INSERT INTO "DeletedRow" ( + "id", + "tenancyId", + "tableName", + "primaryKey", + "data", + "deletedAt", + "shouldUpdateSequenceId" + ) + SELECT + gen_random_uuid(), + "tenancyId", + 'TeamMember', + jsonb_build_object('tenancyId', "tenancyId", 'projectUserId', "projectUserId", 'teamId', "teamId"), + to_jsonb("TeamMember".*), + NOW(), + TRUE + FROM "TeamMember" + WHERE "tenancyId" = ${options.tenancyId}::uuid + AND "teamId" = ${options.teamId}::uuid + FOR UPDATE + `); } -export async function recordExternalDbSyncContactChannelDeletionsForUser( +export async function recordExternalDbSyncTeamMemberDeletionsForUser( tx: ExternalDbSyncClient, options: { tenancyId: string, @@ -178,19 +341,12 @@ export async function recordExternalDbSyncContactChannelDeletionsForUser( SELECT gen_random_uuid(), "tenancyId", - 'ContactChannel', - jsonb_build_object( - 'tenancyId', - "tenancyId", - 'projectUserId', - "projectUserId", - 'id', - "id" - ), - to_jsonb("ContactChannel".*), + 'TeamMember', + jsonb_build_object('tenancyId', "tenancyId", 'projectUserId', "projectUserId", 'teamId', "teamId"), + to_jsonb("TeamMember".*), NOW(), TRUE - FROM "ContactChannel" + FROM "TeamMember" WHERE "tenancyId" = ${options.tenancyId}::uuid AND "projectUserId" = ${options.projectUserId}::uuid FOR UPDATE @@ -372,6 +528,35 @@ async function ensureClickhouseSchema( } } +// Map of target table name -> column normalizers for ClickHouse +// 'json' columns get JSON.stringify, 'boolean' columns get normalizeClickhouseBoolean +const CLICKHOUSE_COLUMN_NORMALIZERS: Record> = { + users: { + client_metadata: 'json', + client_read_only_metadata: 'json', + server_metadata: 'json', + primary_email_verified: 'boolean', + is_anonymous: 'boolean', + restricted_by_admin: 'boolean', + sync_is_deleted: 'boolean', + }, + contact_channels: { + is_primary: 'boolean', + is_verified: 'boolean', + used_for_auth: 'boolean', + sync_is_deleted: 'boolean', + }, + teams: { + client_metadata: 'json', + client_read_only_metadata: 'json', + server_metadata: 'json', + sync_is_deleted: 'boolean', + }, + team_members: { + sync_is_deleted: 'boolean', + }, +}; + async function pushRowsToClickhouse( client: ClickHouseClient, tableName: string, @@ -390,6 +575,10 @@ async function pushRowsToClickhouse( const sampleRow = newRows[0] ?? throwErr("Expected at least one row for ClickHouse sync."); const orderedKeys = Object.keys(omit(sampleRow, ["tenancyId"])); + // Derive the target table name from the full tableName (e.g. "analytics_internal.users" -> "users") + const targetTable = tableName.includes('.') ? tableName.split('.').pop()! : tableName; + const normalizers = CLICKHOUSE_COLUMN_NORMALIZERS[targetTable] ?? {}; + const normalizedRows = newRows.map((row) => { const tenancyIdValue = row.tenancyId; if (typeof tenancyIdValue !== "string") { @@ -427,17 +616,23 @@ async function pushRowsToClickhouse( `sync_sequence_id must be defined for ClickHouse sync. Mapping: ${mappingId}` ); } - return { + + const normalized: Record = { ...rest, sync_sequence_id: sequenceId, - client_metadata: JSON.stringify(rest.client_metadata), - client_read_only_metadata: JSON.stringify(rest.client_read_only_metadata), - server_metadata: JSON.stringify(rest.server_metadata), - primary_email_verified: normalizeClickhouseBoolean(rest.primary_email_verified, "primary_email_verified"), - is_anonymous: normalizeClickhouseBoolean(rest.is_anonymous, "is_anonymous"), - restricted_by_admin: normalizeClickhouseBoolean(rest.restricted_by_admin, "restricted_by_admin"), - sync_is_deleted: normalizeClickhouseBoolean(rest.sync_is_deleted, "sync_is_deleted"), }; + + for (const [col, type] of Object.entries(normalizers)) { + if (col in normalized) { + if (type === 'json') { + normalized[col] = JSON.stringify(normalized[col]); + } else { + normalized[col] = normalizeClickhouseBoolean(normalized[col], col); + } + } + } + + return normalized; }); await client.insert({ diff --git a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts index 31aaf597a1..4c7bceee8d 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts @@ -9,8 +9,14 @@ import { createProjectWithExternalDb as createProjectWithExternalDbRaw, verifyInExternalDb, verifyNotInExternalDb, + waitForSyncedContactChannel, + waitForSyncedContactChannelDeletion, waitForSyncedData, waitForSyncedDeletion, + waitForSyncedTeam, + waitForSyncedTeamDeletion, + waitForSyncedTeamMember, + waitForSyncedTeamMemberDeletion, waitForTable } from './external-db-sync-utils'; @@ -577,6 +583,260 @@ describe.sequential('External DB Sync - Basic Tests', () => { }, TEST_TIMEOUT); + /** + * What it does: + * - Creates a team, verifies it in the external DB, updates it, verifies the update, + * deletes it, and verifies the removal. + */ + test('Team CRUD sync (Postgres)', async () => { + const dbName = 'team_crud_test'; + const connectionString = await dbManager.createDatabase(dbName); + + await createProjectWithExternalDb({ + main: { + type: 'postgres', + connectionString, + } + }); + + const client = dbManager.getClient(dbName); + + // Create a team + const createResponse = await niceBackendFetch('/api/v1/teams', { + accessType: 'admin', + method: 'POST', + body: { display_name: 'Sync Test Team' }, + }); + expect(createResponse.status).toBe(201); + const teamId = createResponse.body.id; + + await waitForSyncedTeam(client, 'Sync Test Team'); + + const res1 = await client.query(`SELECT * FROM "teams" WHERE "id" = $1`, [teamId]); + expect(res1.rows.length).toBe(1); + expect(res1.rows[0].display_name).toBe('Sync Test Team'); + + // Update the team + await niceBackendFetch(`/api/v1/teams/${teamId}`, { + accessType: 'admin', + method: 'PATCH', + body: { display_name: 'Updated Team Name' }, + }); + + await waitForSyncedTeam(client, 'Updated Team Name'); + + const res2 = await client.query(`SELECT * FROM "teams" WHERE "id" = $1`, [teamId]); + expect(res2.rows[0].display_name).toBe('Updated Team Name'); + + // Delete the team + await niceBackendFetch(`/api/v1/teams/${teamId}`, { + accessType: 'admin', + method: 'DELETE', + }); + + await waitForSyncedTeamDeletion(client, teamId); + }, TEST_TIMEOUT); + + /** + * What it does: + * - Creates a team and verifies it appears via the ClickHouse analytics query API. + */ + test('Team sync (ClickHouse)', async ({ expect }) => { + await Project.createAndSwitch({ config: { magic_link_enabled: true } }); + + const createResponse = await niceBackendFetch('/api/v1/teams', { + accessType: 'admin', + method: 'POST', + body: { display_name: 'CH Team Test' }, + }); + expect(createResponse.status).toBe(201); + + await InternalApiKey.createAndSetProjectKeys(); + + const timeoutMs = 180_000; + const intervalMs = 2_000; + const start = performance.now(); + + let response; + while (performance.now() - start < timeoutMs) { + response = await runQueryForCurrentProject({ + query: "SELECT display_name FROM teams WHERE display_name = {name:String}", + params: { name: 'CH Team Test' }, + }); + expect(response.status).toBe(200); + if (response.body.result.length === 1) { + break; + } + await wait(intervalMs); + } + + expect(response!.body.result.length).toBe(1); + expect(response!.body.result[0].display_name).toBe('CH Team Test'); + }, TEST_TIMEOUT); + + /** + * What it does: + * - Creates a user and team, adds the user as a member, verifies in external DB, + * removes the member, and verifies removal. + */ + test('TeamMember CRUD sync (Postgres)', async () => { + const dbName = 'team_member_crud_test'; + const connectionString = await dbManager.createDatabase(dbName); + + await createProjectWithExternalDb({ + main: { + type: 'postgres', + connectionString, + } + }); + + const client = dbManager.getClient(dbName); + + const user = await User.create({ primary_email: 'tm-crud@example.com' }); + const createTeamResponse = await niceBackendFetch('/api/v1/teams', { + accessType: 'admin', + method: 'POST', + body: { display_name: 'TM CRUD Team' }, + }); + expect(createTeamResponse.status).toBe(201); + const teamId = createTeamResponse.body.id; + + // Add user as team member + const addMemberResponse = await niceBackendFetch(`/api/v1/team-memberships/${teamId}/${user.userId}`, { + accessType: 'admin', + method: 'POST', + body: {}, + }); + expect(addMemberResponse.status).toBe(201); + + await waitForSyncedTeamMember(client, teamId, user.userId); + + const res1 = await client.query(`SELECT * FROM "team_members" WHERE "team_id" = $1 AND "user_id" = $2`, [teamId, user.userId]); + expect(res1.rows.length).toBe(1); + + // Remove member + await niceBackendFetch(`/api/v1/team-memberships/${teamId}/${user.userId}`, { + accessType: 'admin', + method: 'DELETE', + }); + + await waitForSyncedTeamMemberDeletion(client, teamId, user.userId); + }, TEST_TIMEOUT); + + /** + * What it does: + * - Creates a user with a primary email and verifies the contact channel appears + * in the external DB contact_channels table. + */ + test('ContactChannel sync (Postgres)', async () => { + const dbName = 'contact_channel_test'; + const connectionString = await dbManager.createDatabase(dbName); + + await createProjectWithExternalDb({ + main: { + type: 'postgres', + connectionString, + } + }); + + const client = dbManager.getClient(dbName); + + const user = await User.create({ primary_email: 'cc-sync@example.com' }); + + await waitForSyncedContactChannel(client, 'cc-sync@example.com'); + + const res = await client.query(`SELECT * FROM "contact_channels" WHERE "value" = $1`, ['cc-sync@example.com']); + expect(res.rows.length).toBe(1); + expect(res.rows[0].user_id).toBe(user.userId); + expect(res.rows[0].is_primary).toBe(true); + }, TEST_TIMEOUT); + + /** + * What it does: + * - Creates a user in a team, deletes the user, and verifies the team_member is gone. + */ + test('Cascade: User delete removes team members from external DB', async () => { + const dbName = 'cascade_user_delete_test'; + const connectionString = await dbManager.createDatabase(dbName); + + await createProjectWithExternalDb({ + main: { + type: 'postgres', + connectionString, + } + }); + + const client = dbManager.getClient(dbName); + + const user = await User.create({ primary_email: 'cascade-user-del@example.com' }); + const createTeamResponse = await niceBackendFetch('/api/v1/teams', { + accessType: 'admin', + method: 'POST', + body: { display_name: 'Cascade User Team' }, + }); + const teamId = createTeamResponse.body.id; + + await niceBackendFetch(`/api/v1/team-memberships/${teamId}/${user.userId}`, { + accessType: 'admin', + method: 'POST', + body: {}, + }); + + await waitForSyncedTeamMember(client, teamId, user.userId); + + // Delete the user — should cascade-delete the team member + await niceBackendFetch(`/api/v1/users/${user.userId}`, { + accessType: 'admin', + method: 'DELETE', + }); + + await waitForSyncedTeamMemberDeletion(client, teamId, user.userId); + }, TEST_TIMEOUT); + + /** + * What it does: + * - Creates a team with a member, deletes the team, and verifies both team and member are gone. + */ + test('Cascade: Team delete removes team and members from external DB', async () => { + const dbName = 'cascade_team_delete_test'; + const connectionString = await dbManager.createDatabase(dbName); + + await createProjectWithExternalDb({ + main: { + type: 'postgres', + connectionString, + } + }); + + const client = dbManager.getClient(dbName); + + const user = await User.create({ primary_email: 'cascade-team-del@example.com' }); + const createTeamResponse = await niceBackendFetch('/api/v1/teams', { + accessType: 'admin', + method: 'POST', + body: { display_name: 'Cascade Team' }, + }); + const teamId = createTeamResponse.body.id; + + await niceBackendFetch(`/api/v1/team-memberships/${teamId}/${user.userId}`, { + accessType: 'admin', + method: 'POST', + body: {}, + }); + + await waitForSyncedTeamMember(client, teamId, user.userId); + await waitForSyncedTeam(client, 'Cascade Team'); + + // Delete the team — should cascade-delete the member too + await niceBackendFetch(`/api/v1/teams/${teamId}`, { + accessType: 'admin', + method: 'DELETE', + }); + + await waitForSyncedTeamDeletion(client, teamId); + await waitForSyncedTeamMemberDeletion(client, teamId, user.userId); + }, TEST_TIMEOUT); + /** * What it does: * - Reads the external DB sync fusebox settings. diff --git a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts index 45281add2c..ae830bbe67 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts @@ -150,59 +150,66 @@ export async function waitForCondition( } /** - * Wait for data to appear in external DB (relies on automatic cron job) + * Generic helper to wait for a row to appear or disappear in the external DB. + * Handles the common pattern of catching "table does not exist" (42P01) errors. */ -export async function waitForSyncedData(client: Client, email: string, expectedName?: string) { - +async function waitForExternalDbRow( + client: Client, + query: string, + params: unknown[], + opts: { shouldExist: boolean, description: string, checkRow?: (row: Record) => boolean }, +) { await waitForCondition( async () => { let res; try { - res = await client.query(`SELECT * FROM "users" WHERE "primary_email" = $1`, [email]); + res = await client.query(query, params); } catch (err: any) { if (err && err.code === '42P01') { return false; } throw err; } - if (res.rows.length === 0) { - return false; + if (opts.shouldExist) { + if (res.rows.length === 0) return false; + if (opts.checkRow && !opts.checkRow(res.rows[0])) return false; + return true; } - if (expectedName && res.rows[0].display_name !== expectedName) { - return false; - } - return true; + return res.rows.length === 0; }, { - description: `data for ${email} to appear in external DB`, + description: opts.description, timeoutMs: 180000, intervalMs: 500, } ); } +/** + * Wait for data to appear in external DB (relies on automatic cron job) + */ +export async function waitForSyncedData(client: Client, email: string, expectedName?: string) { + await waitForExternalDbRow( + client, + `SELECT * FROM "users" WHERE "primary_email" = $1`, + [email], + { + shouldExist: true, + description: `data for ${email} to appear in external DB`, + checkRow: expectedName ? (row) => row.display_name === expectedName : undefined, + }, + ); +} + /** * Wait for data to be removed from external DB (relies on automatic cron job) */ export async function waitForSyncedDeletion(client: Client, email: string) { - await waitForCondition( - async () => { - let res; - try { - res = await client.query(`SELECT * FROM "users" WHERE "primary_email" = $1`, [email]); - } catch (err: any) { - if (err && err.code === '42P01') { - return false; - } - throw err; - } - return res.rows.length === 0; - }, - { - description: `data for ${email} to be removed from external DB`, - timeoutMs: 180000, - intervalMs: 500, - } + await waitForExternalDbRow( + client, + `SELECT * FROM "users" WHERE "primary_email" = $1`, + [email], + { shouldExist: false, description: `data for ${email} to be removed from external DB` }, ); } @@ -214,7 +221,7 @@ export async function waitForTable(client: Client, tableName: string) { async () => { const res = await client.query(` SELECT EXISTS ( - SELECT FROM information_schema.tables + SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = $1 ); @@ -265,6 +272,48 @@ export async function countUsersInExternalDb(client: Client): Promise { } } +export async function waitForSyncedTeam(client: Client, displayName: string) { + await waitForExternalDbRow(client, `SELECT * FROM "teams" WHERE "display_name" = $1`, [displayName], { + shouldExist: true, + description: `team "${displayName}" to appear in external DB`, + }); +} + +export async function waitForSyncedTeamDeletion(client: Client, teamId: string) { + await waitForExternalDbRow(client, `SELECT * FROM "teams" WHERE "id" = $1`, [teamId], { + shouldExist: false, + description: `team ${teamId} to be removed from external DB`, + }); +} + +export async function waitForSyncedTeamMember(client: Client, teamId: string, userId: string) { + await waitForExternalDbRow(client, `SELECT * FROM "team_members" WHERE "team_id" = $1 AND "user_id" = $2`, [teamId, userId], { + shouldExist: true, + description: `team member (team=${teamId}, user=${userId}) to appear in external DB`, + }); +} + +export async function waitForSyncedTeamMemberDeletion(client: Client, teamId: string, userId: string) { + await waitForExternalDbRow(client, `SELECT * FROM "team_members" WHERE "team_id" = $1 AND "user_id" = $2`, [teamId, userId], { + shouldExist: false, + description: `team member (team=${teamId}, user=${userId}) to be removed from external DB`, + }); +} + +export async function waitForSyncedContactChannel(client: Client, value: string) { + await waitForExternalDbRow(client, `SELECT * FROM "contact_channels" WHERE "value" = $1`, [value], { + shouldExist: true, + description: `contact channel "${value}" to appear in external DB`, + }); +} + +export async function waitForSyncedContactChannelDeletion(client: Client, value: string) { + await waitForExternalDbRow(client, `SELECT * FROM "contact_channels" WHERE "value" = $1`, [value], { + shouldExist: false, + description: `contact channel "${value}" to be removed from external DB`, + }); +} + /** * Helper to create a project and update its config with external DB settings. * Tracks the project for cleanup later. diff --git a/packages/stack-shared/src/config/db-sync-mappings.ts b/packages/stack-shared/src/config/db-sync-mappings.ts index 65e839446c..92d6399970 100644 --- a/packages/stack-shared/src/config/db-sync-mappings.ts +++ b/packages/stack-shared/src/config/db-sync-mappings.ts @@ -275,4 +275,561 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { `.trim(), }, }, + "contact_channels": { + sourceTables: { "ContactChannel": "ContactChannel" }, + targetTable: "contact_channels", + targetTableSchemas: { + postgres: ` + CREATE TABLE IF NOT EXISTS "contact_channels" ( + "id" uuid PRIMARY KEY NOT NULL, + "user_id" uuid NOT NULL, + "type" text NOT NULL, + "value" text NOT NULL, + "is_primary" boolean NOT NULL DEFAULT false, + "is_verified" boolean NOT NULL DEFAULT false, + "used_for_auth" boolean NOT NULL DEFAULT false, + "created_at" timestamp without time zone NOT NULL + ); + REVOKE ALL ON "contact_channels" FROM PUBLIC; + GRANT SELECT ON "contact_channels" TO PUBLIC; + + CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" ( + "mapping_name" text PRIMARY KEY NOT NULL, + "last_synced_sequence_id" bigint NOT NULL DEFAULT -1, + "updated_at" timestamp without time zone NOT NULL DEFAULT now() + ); + `.trim(), + clickhouse: ` + CREATE TABLE IF NOT EXISTS analytics_internal.contact_channels ( + project_id String, + branch_id String, + id UUID, + user_id UUID, + type LowCardinality(String), + value String, + is_primary UInt8, + is_verified UInt8, + used_for_auth UInt8, + created_at DateTime64(3, 'UTC'), + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) + ) + ENGINE ReplacingMergeTree(sync_sequence_id) + PARTITION BY toYYYYMM(created_at) + ORDER BY (project_id, branch_id, id); + `.trim(), + }, + internalDbFetchQueries: { + clickhouse: ` + SELECT * + FROM ( + SELECT + "Tenancy"."projectId" AS "project_id", + "Tenancy"."branchId" AS "branch_id", + "ContactChannel"."id" AS "id", + "ContactChannel"."projectUserId" AS "user_id", + "ContactChannel"."type"::text AS "type", + "ContactChannel"."value" AS "value", + CASE WHEN "ContactChannel"."isPrimary" = 'TRUE' THEN true ELSE false END AS "is_primary", + "ContactChannel"."isVerified" AS "is_verified", + CASE WHEN "ContactChannel"."usedForAuth" = 'TRUE' THEN true ELSE false END AS "used_for_auth", + "ContactChannel"."createdAt" AS "created_at", + "ContactChannel"."sequenceId" AS "sync_sequence_id", + "ContactChannel"."tenancyId" AS "tenancyId", + false AS "sync_is_deleted" + FROM "ContactChannel" + JOIN "Tenancy" ON "Tenancy"."id" = "ContactChannel"."tenancyId" + WHERE "ContactChannel"."tenancyId" = $1::uuid + + UNION ALL + + SELECT + "Tenancy"."projectId" AS "project_id", + "Tenancy"."branchId" AS "branch_id", + ("DeletedRow"."primaryKey"->>'id')::uuid AS "id", + ("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id", + NULL::text AS "type", + NULL::text AS "value", + false AS "is_primary", + false AS "is_verified", + false AS "used_for_auth", + "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", + "DeletedRow"."sequenceId" AS "sync_sequence_id", + "DeletedRow"."tenancyId" AS "tenancyId", + true AS "sync_is_deleted" + FROM "DeletedRow" + JOIN "Tenancy" ON "Tenancy"."id" = "DeletedRow"."tenancyId" + WHERE + "DeletedRow"."tenancyId" = $1::uuid + AND "DeletedRow"."tableName" = 'ContactChannel' + ) AS "_src" + WHERE "sync_sequence_id" IS NOT NULL + AND "sync_sequence_id" > $2::bigint + ORDER BY "sync_sequence_id" ASC + LIMIT 1000 + `.trim(), + }, + internalDbFetchQuery: ` + SELECT * + FROM ( + SELECT + "ContactChannel"."id" AS "id", + "ContactChannel"."projectUserId" AS "user_id", + "ContactChannel"."type"::text AS "type", + "ContactChannel"."value" AS "value", + CASE WHEN "ContactChannel"."isPrimary" = 'TRUE' THEN true ELSE false END AS "is_primary", + "ContactChannel"."isVerified" AS "is_verified", + CASE WHEN "ContactChannel"."usedForAuth" = 'TRUE' THEN true ELSE false END AS "used_for_auth", + "ContactChannel"."createdAt" AS "created_at", + "ContactChannel"."sequenceId" AS "sequence_id", + "ContactChannel"."tenancyId", + false AS "is_deleted" + FROM "ContactChannel" + WHERE "ContactChannel"."tenancyId" = $1::uuid + + UNION ALL + + SELECT + ("DeletedRow"."primaryKey"->>'id')::uuid AS "id", + ("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id", + NULL::text AS "type", + NULL::text AS "value", + false AS "is_primary", + false AS "is_verified", + false AS "used_for_auth", + "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", + "DeletedRow"."sequenceId" AS "sequence_id", + "DeletedRow"."tenancyId", + true AS "is_deleted" + FROM "DeletedRow" + WHERE + "DeletedRow"."tenancyId" = $1::uuid + AND "DeletedRow"."tableName" = 'ContactChannel' + ) AS "_src" + WHERE "sequence_id" IS NOT NULL + AND "sequence_id" > $2::bigint + ORDER BY "sequence_id" ASC + LIMIT 1000 + `.trim(), + externalDbUpdateQueries: { + postgres: ` + WITH params AS ( + SELECT + $1::uuid AS "id", + $2::uuid AS "user_id", + $3::text AS "type", + $4::text AS "value", + $5::boolean AS "is_primary", + $6::boolean AS "is_verified", + $7::boolean AS "used_for_auth", + $8::timestamp without time zone AS "created_at", + $9::bigint AS "sequence_id", + $10::boolean AS "is_deleted", + $11::text AS "mapping_name" + ), + deleted AS ( + DELETE FROM "contact_channels" c + USING params p + WHERE p."is_deleted" = true AND c."id" = p."id" + RETURNING 1 + ), + upserted AS ( + INSERT INTO "contact_channels" ( + "id", + "user_id", + "type", + "value", + "is_primary", + "is_verified", + "used_for_auth", + "created_at" + ) + SELECT + p."id", + p."user_id", + p."type", + p."value", + p."is_primary", + p."is_verified", + p."used_for_auth", + p."created_at" + FROM params p + WHERE p."is_deleted" = false + ON CONFLICT ("id") DO UPDATE SET + "user_id" = EXCLUDED."user_id", + "type" = EXCLUDED."type", + "value" = EXCLUDED."value", + "is_primary" = EXCLUDED."is_primary", + "is_verified" = EXCLUDED."is_verified", + "used_for_auth" = EXCLUDED."used_for_auth", + "created_at" = EXCLUDED."created_at" + RETURNING 1 + ) + INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at") + SELECT p."mapping_name", p."sequence_id", now() FROM params p + ON CONFLICT ("mapping_name") DO UPDATE SET + "last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"), + "updated_at" = now(); + `.trim(), + }, + }, + "teams": { + sourceTables: { "Team": "Team" }, + targetTable: "teams", + targetTableSchemas: { + postgres: ` + CREATE TABLE IF NOT EXISTS "teams" ( + "id" uuid PRIMARY KEY NOT NULL, + "display_name" text NOT NULL, + "profile_image_url" text, + "created_at" timestamp without time zone NOT NULL, + "client_metadata" jsonb NOT NULL DEFAULT '{}'::jsonb, + "client_read_only_metadata" jsonb NOT NULL DEFAULT '{}'::jsonb, + "server_metadata" jsonb NOT NULL DEFAULT '{}'::jsonb + ); + REVOKE ALL ON "teams" FROM PUBLIC; + GRANT SELECT ON "teams" TO PUBLIC; + + CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" ( + "mapping_name" text PRIMARY KEY NOT NULL, + "last_synced_sequence_id" bigint NOT NULL DEFAULT -1, + "updated_at" timestamp without time zone NOT NULL DEFAULT now() + ); + `.trim(), + clickhouse: ` + CREATE TABLE IF NOT EXISTS analytics_internal.teams ( + project_id String, + branch_id String, + id UUID, + display_name String, + profile_image_url Nullable(String), + created_at DateTime64(3, 'UTC'), + client_metadata String, + client_read_only_metadata String, + server_metadata String, + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) + ) + ENGINE ReplacingMergeTree(sync_sequence_id) + PARTITION BY toYYYYMM(created_at) + ORDER BY (project_id, branch_id, id); + `.trim(), + }, + internalDbFetchQueries: { + clickhouse: ` + SELECT * + FROM ( + SELECT + "Tenancy"."projectId" AS "project_id", + "Tenancy"."branchId" AS "branch_id", + "Team"."teamId" AS "id", + "Team"."displayName" AS "display_name", + "Team"."profileImageUrl" AS "profile_image_url", + "Team"."createdAt" AS "created_at", + COALESCE("Team"."clientMetadata", '{}'::jsonb) AS "client_metadata", + COALESCE("Team"."clientReadOnlyMetadata", '{}'::jsonb) AS "client_read_only_metadata", + COALESCE("Team"."serverMetadata", '{}'::jsonb) AS "server_metadata", + "Team"."sequenceId" AS "sync_sequence_id", + "Team"."tenancyId" AS "tenancyId", + false AS "sync_is_deleted" + FROM "Team" + JOIN "Tenancy" ON "Tenancy"."id" = "Team"."tenancyId" + WHERE "Team"."tenancyId" = $1::uuid + + UNION ALL + + SELECT + "Tenancy"."projectId" AS "project_id", + "Tenancy"."branchId" AS "branch_id", + ("DeletedRow"."primaryKey"->>'teamId')::uuid AS "id", + NULL::text AS "display_name", + NULL::text AS "profile_image_url", + "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", + '{}'::jsonb AS "client_metadata", + '{}'::jsonb AS "client_read_only_metadata", + '{}'::jsonb AS "server_metadata", + "DeletedRow"."sequenceId" AS "sync_sequence_id", + "DeletedRow"."tenancyId" AS "tenancyId", + true AS "sync_is_deleted" + FROM "DeletedRow" + JOIN "Tenancy" ON "Tenancy"."id" = "DeletedRow"."tenancyId" + WHERE + "DeletedRow"."tenancyId" = $1::uuid + AND "DeletedRow"."tableName" = 'Team' + ) AS "_src" + WHERE "sync_sequence_id" IS NOT NULL + AND "sync_sequence_id" > $2::bigint + ORDER BY "sync_sequence_id" ASC + LIMIT 1000 + `.trim(), + }, + internalDbFetchQuery: ` + SELECT * + FROM ( + SELECT + "Team"."teamId" AS "id", + "Team"."displayName" AS "display_name", + "Team"."profileImageUrl" AS "profile_image_url", + "Team"."createdAt" AS "created_at", + COALESCE("Team"."clientMetadata", '{}'::jsonb) AS "client_metadata", + COALESCE("Team"."clientReadOnlyMetadata", '{}'::jsonb) AS "client_read_only_metadata", + COALESCE("Team"."serverMetadata", '{}'::jsonb) AS "server_metadata", + "Team"."sequenceId" AS "sequence_id", + "Team"."tenancyId", + false AS "is_deleted" + FROM "Team" + WHERE "Team"."tenancyId" = $1::uuid + + UNION ALL + + SELECT + ("DeletedRow"."primaryKey"->>'teamId')::uuid AS "id", + NULL::text AS "display_name", + NULL::text AS "profile_image_url", + "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", + '{}'::jsonb AS "client_metadata", + '{}'::jsonb AS "client_read_only_metadata", + '{}'::jsonb AS "server_metadata", + "DeletedRow"."sequenceId" AS "sequence_id", + "DeletedRow"."tenancyId", + true AS "is_deleted" + FROM "DeletedRow" + WHERE + "DeletedRow"."tenancyId" = $1::uuid + AND "DeletedRow"."tableName" = 'Team' + ) AS "_src" + WHERE "sequence_id" IS NOT NULL + AND "sequence_id" > $2::bigint + ORDER BY "sequence_id" ASC + LIMIT 1000 + `.trim(), + externalDbUpdateQueries: { + postgres: ` + WITH params AS ( + SELECT + $1::uuid AS "id", + $2::text AS "display_name", + $3::text AS "profile_image_url", + $4::timestamp without time zone AS "created_at", + $5::jsonb AS "client_metadata", + $6::jsonb AS "client_read_only_metadata", + $7::jsonb AS "server_metadata", + $8::bigint AS "sequence_id", + $9::boolean AS "is_deleted", + $10::text AS "mapping_name" + ), + deleted AS ( + DELETE FROM "teams" t + USING params p + WHERE p."is_deleted" = true AND t."id" = p."id" + RETURNING 1 + ), + upserted AS ( + INSERT INTO "teams" ( + "id", + "display_name", + "profile_image_url", + "created_at", + "client_metadata", + "client_read_only_metadata", + "server_metadata" + ) + SELECT + p."id", + p."display_name", + p."profile_image_url", + p."created_at", + p."client_metadata", + p."client_read_only_metadata", + p."server_metadata" + FROM params p + WHERE p."is_deleted" = false + ON CONFLICT ("id") DO UPDATE SET + "display_name" = EXCLUDED."display_name", + "profile_image_url" = EXCLUDED."profile_image_url", + "created_at" = EXCLUDED."created_at", + "client_metadata" = EXCLUDED."client_metadata", + "client_read_only_metadata" = EXCLUDED."client_read_only_metadata", + "server_metadata" = EXCLUDED."server_metadata" + RETURNING 1 + ) + INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at") + SELECT p."mapping_name", p."sequence_id", now() FROM params p + ON CONFLICT ("mapping_name") DO UPDATE SET + "last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"), + "updated_at" = now(); + `.trim(), + }, + }, + "team_members": { + sourceTables: { "TeamMember": "TeamMember" }, + targetTable: "team_members", + targetTableSchemas: { + postgres: ` + CREATE TABLE IF NOT EXISTS "team_members" ( + "team_id" uuid NOT NULL, + "user_id" uuid NOT NULL, + "display_name" text, + "profile_image_url" text, + "created_at" timestamp without time zone NOT NULL, + PRIMARY KEY ("team_id", "user_id") + ); + REVOKE ALL ON "team_members" FROM PUBLIC; + GRANT SELECT ON "team_members" TO PUBLIC; + + CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" ( + "mapping_name" text PRIMARY KEY NOT NULL, + "last_synced_sequence_id" bigint NOT NULL DEFAULT -1, + "updated_at" timestamp without time zone NOT NULL DEFAULT now() + ); + `.trim(), + clickhouse: ` + CREATE TABLE IF NOT EXISTS analytics_internal.team_members ( + project_id String, + branch_id String, + team_id UUID, + user_id UUID, + display_name Nullable(String), + profile_image_url Nullable(String), + created_at DateTime64(3, 'UTC'), + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) + ) + ENGINE ReplacingMergeTree(sync_sequence_id) + PARTITION BY toYYYYMM(created_at) + ORDER BY (project_id, branch_id, team_id, user_id); + `.trim(), + }, + internalDbFetchQueries: { + clickhouse: ` + SELECT * + FROM ( + SELECT + "Tenancy"."projectId" AS "project_id", + "Tenancy"."branchId" AS "branch_id", + "TeamMember"."teamId" AS "team_id", + "TeamMember"."projectUserId" AS "user_id", + "TeamMember"."displayName" AS "display_name", + "TeamMember"."profileImageUrl" AS "profile_image_url", + "TeamMember"."createdAt" AS "created_at", + "TeamMember"."sequenceId" AS "sync_sequence_id", + "TeamMember"."tenancyId" AS "tenancyId", + false AS "sync_is_deleted" + FROM "TeamMember" + JOIN "Tenancy" ON "Tenancy"."id" = "TeamMember"."tenancyId" + WHERE "TeamMember"."tenancyId" = $1::uuid + + UNION ALL + + SELECT + "Tenancy"."projectId" AS "project_id", + "Tenancy"."branchId" AS "branch_id", + ("DeletedRow"."primaryKey"->>'teamId')::uuid AS "team_id", + ("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id", + NULL::text AS "display_name", + NULL::text AS "profile_image_url", + "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", + "DeletedRow"."sequenceId" AS "sync_sequence_id", + "DeletedRow"."tenancyId" AS "tenancyId", + true AS "sync_is_deleted" + FROM "DeletedRow" + JOIN "Tenancy" ON "Tenancy"."id" = "DeletedRow"."tenancyId" + WHERE + "DeletedRow"."tenancyId" = $1::uuid + AND "DeletedRow"."tableName" = 'TeamMember' + ) AS "_src" + WHERE "sync_sequence_id" IS NOT NULL + AND "sync_sequence_id" > $2::bigint + ORDER BY "sync_sequence_id" ASC + LIMIT 1000 + `.trim(), + }, + internalDbFetchQuery: ` + SELECT * + FROM ( + SELECT + "TeamMember"."teamId" AS "team_id", + "TeamMember"."projectUserId" AS "user_id", + "TeamMember"."displayName" AS "display_name", + "TeamMember"."profileImageUrl" AS "profile_image_url", + "TeamMember"."createdAt" AS "created_at", + "TeamMember"."sequenceId" AS "sequence_id", + "TeamMember"."tenancyId", + false AS "is_deleted" + FROM "TeamMember" + WHERE "TeamMember"."tenancyId" = $1::uuid + + UNION ALL + + SELECT + ("DeletedRow"."primaryKey"->>'teamId')::uuid AS "team_id", + ("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id", + NULL::text AS "display_name", + NULL::text AS "profile_image_url", + "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", + "DeletedRow"."sequenceId" AS "sequence_id", + "DeletedRow"."tenancyId", + true AS "is_deleted" + FROM "DeletedRow" + WHERE + "DeletedRow"."tenancyId" = $1::uuid + AND "DeletedRow"."tableName" = 'TeamMember' + ) AS "_src" + WHERE "sequence_id" IS NOT NULL + AND "sequence_id" > $2::bigint + ORDER BY "sequence_id" ASC + LIMIT 1000 + `.trim(), + externalDbUpdateQueries: { + postgres: ` + WITH params AS ( + SELECT + $1::uuid AS "team_id", + $2::uuid AS "user_id", + $3::text AS "display_name", + $4::text AS "profile_image_url", + $5::timestamp without time zone AS "created_at", + $6::bigint AS "sequence_id", + $7::boolean AS "is_deleted", + $8::text AS "mapping_name" + ), + deleted AS ( + DELETE FROM "team_members" tm + USING params p + WHERE p."is_deleted" = true AND tm."team_id" = p."team_id" AND tm."user_id" = p."user_id" + RETURNING 1 + ), + upserted AS ( + INSERT INTO "team_members" ( + "team_id", + "user_id", + "display_name", + "profile_image_url", + "created_at" + ) + SELECT + p."team_id", + p."user_id", + p."display_name", + p."profile_image_url", + p."created_at" + FROM params p + WHERE p."is_deleted" = false + ON CONFLICT ("team_id", "user_id") DO UPDATE SET + "display_name" = EXCLUDED."display_name", + "profile_image_url" = EXCLUDED."profile_image_url", + "created_at" = EXCLUDED."created_at" + RETURNING 1 + ) + INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at") + SELECT p."mapping_name", p."sequence_id", now() FROM params p + ON CONFLICT ("mapping_name") DO UPDATE SET + "last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"), + "updated_at" = now(); + `.trim(), + }, + }, } as const; From e23c673d66e94e4e9c1fd0c0d04d09b030e471b6 Mon Sep 17 00:00:00 2001 From: Bilal Godil Date: Tue, 17 Mar 2026 11:09:50 -0700 Subject: [PATCH 2/3] team member profile clickhouse table --- apps/backend/scripts/clickhouse-migrations.ts | 20 ++-- .../external-db-sync/sequencer/route.ts | 18 ++++ apps/backend/src/lib/external-db-sync.ts | 2 +- .../endpoints/api/v1/analytics-query.test.ts | 21 ++++ .../api/v1/external-db-sync-basics.test.ts | 2 +- .../api/v1/external-db-sync-utils.ts | 4 +- .../src/config/db-sync-mappings.ts | 102 +++++++++++++++--- 7 files changed, 143 insertions(+), 26 deletions(-) diff --git a/apps/backend/scripts/clickhouse-migrations.ts b/apps/backend/scripts/clickhouse-migrations.ts index f0b533c034..55e1f86082 100644 --- a/apps/backend/scripts/clickhouse-migrations.ts +++ b/apps/backend/scripts/clickhouse-migrations.ts @@ -20,8 +20,8 @@ export async function runClickhouseMigrations() { await client.exec({ query: CONTACT_CHANNELS_VIEW_SQL }); await client.exec({ query: TEAMS_TABLE_BASE_SQL }); await client.exec({ query: TEAMS_VIEW_SQL }); - await client.exec({ query: TEAM_MEMBERS_TABLE_BASE_SQL }); - await client.exec({ query: TEAM_MEMBERS_VIEW_SQL }); + await client.exec({ query: TEAM_MEMBER_PROFILES_TABLE_BASE_SQL }); + await client.exec({ query: TEAM_MEMBER_PROFILES_VIEW_SQL }); await client.exec({ query: EVENTS_ADD_REPLAY_COLUMNS_SQL }); await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL }); await client.exec({ query: BACKFILL_REFRESH_TOKEN_ID_COLUMN_SQL }); @@ -35,7 +35,7 @@ export async function runClickhouseMigrations() { "GRANT SELECT ON default.users TO limited_user;", "GRANT SELECT ON default.contact_channels TO limited_user;", "GRANT SELECT ON default.teams TO limited_user;", - "GRANT SELECT ON default.team_members TO limited_user;", + "GRANT SELECT ON default.team_member_profiles TO limited_user;", ]; await client.exec({ query: "CREATE ROW POLICY IF NOT EXISTS events_project_isolation ON default.events FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", @@ -50,7 +50,7 @@ export async function runClickhouseMigrations() { query: "CREATE ROW POLICY IF NOT EXISTS teams_project_isolation ON default.teams FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", }); await client.exec({ - query: "CREATE ROW POLICY IF NOT EXISTS team_members_project_isolation ON default.team_members FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", + query: "CREATE ROW POLICY IF NOT EXISTS team_member_profiles_project_isolation ON default.team_member_profiles FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", }); for (const query of queries) { await client.exec({ query }); @@ -295,14 +295,15 @@ FINAL WHERE sync_is_deleted = 0; `; -const TEAM_MEMBERS_TABLE_BASE_SQL = ` -CREATE TABLE IF NOT EXISTS analytics_internal.team_members ( +const TEAM_MEMBER_PROFILES_TABLE_BASE_SQL = ` +CREATE TABLE IF NOT EXISTS analytics_internal.team_member_profiles ( project_id String, branch_id String, team_id UUID, user_id UUID, display_name Nullable(String), profile_image_url Nullable(String), + user JSON, created_at DateTime64(3, 'UTC'), sync_sequence_id Int64, sync_is_deleted UInt8, @@ -313,8 +314,8 @@ PARTITION BY toYYYYMM(created_at) ORDER BY (project_id, branch_id, team_id, user_id); `; -const TEAM_MEMBERS_VIEW_SQL = ` -CREATE OR REPLACE VIEW default.team_members +const TEAM_MEMBER_PROFILES_VIEW_SQL = ` +CREATE OR REPLACE VIEW default.team_member_profiles SQL SECURITY DEFINER AS SELECT @@ -324,8 +325,9 @@ SELECT user_id, display_name, profile_image_url, + user, created_at -FROM analytics_internal.team_members +FROM analytics_internal.team_member_profiles FINAL WHERE sync_is_deleted = 0; `; diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts index 78bbb99ffd..c5b190f94d 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts @@ -1,5 +1,6 @@ import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata"; import { enqueueExternalDbSyncBatch } from "@/lib/external-db-sync-queue"; +import { Prisma } from "@/generated/prisma/client"; import { globalPrismaClient } from "@/prisma-client"; import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler"; import { traceSpan } from "@/utils/telemetry"; @@ -78,6 +79,23 @@ async function backfillSequenceIds(batchSize: number): Promise { if (projectUserTenants.length > 0) { await enqueueExternalDbSyncBatch(projectUserTenants.map(t => t.tenancyId)); didUpdate = true; + + // Cascade: when a user changes, mark their TeamMember rows for re-sync + // so the embedded user JSON in team_member_profiles stays fresh + await globalPrismaClient.$executeRaw` + UPDATE "TeamMember" + SET "shouldUpdateSequenceId" = TRUE + FROM ( + SELECT DISTINCT "tenancyId", "projectUserId" + FROM "ProjectUser" + WHERE "tenancyId" IN (${Prisma.join(projectUserTenants.map(t => t.tenancyId))}) + AND "shouldUpdateSequenceId" = FALSE + AND "sequenceId" IS NOT NULL + ) AS changed_users + WHERE "TeamMember"."tenancyId" = changed_users."tenancyId" + AND "TeamMember"."projectUserId" = changed_users."projectUserId" + AND "TeamMember"."shouldUpdateSequenceId" = FALSE + `; } const contactChannelTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` diff --git a/apps/backend/src/lib/external-db-sync.ts b/apps/backend/src/lib/external-db-sync.ts index bd9b0b1cab..966f822aaf 100644 --- a/apps/backend/src/lib/external-db-sync.ts +++ b/apps/backend/src/lib/external-db-sync.ts @@ -552,7 +552,7 @@ const CLICKHOUSE_COLUMN_NORMALIZERS: Record { { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "REVOKE TABLE ENGINE ON SQLite FROM limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "REVOKE TABLE ENGINE ON URL FROM limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW DATABASES ON default.* TO limited_user" }, + { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.contact_channels TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.events TO limited_user" }, + { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.team_member_profiles TO limited_user" }, + { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.teams TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.users TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SELECT ON system.aggregate_function_combinators TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SELECT ON system.collations TO limited_user" }, @@ -561,10 +564,22 @@ it("can see only some tables", async ({ expect }) => { "status": 200, "body": { "result": [ + { + "database": "default", + "name": "contact_channels", + }, { "database": "default", "name": "events", }, + { + "database": "default", + "name": "team_member_profiles", + }, + { + "database": "default", + "name": "teams", + }, { "database": "default", "name": "users", @@ -586,7 +601,10 @@ it("SHOW TABLES should have the correct tables", async ({ expect }) => { "status": 200, "body": { "result": [ + { "name": "contact_channels" }, { "name": "events" }, + { "name": "team_member_profiles" }, + { "name": "teams" }, { "name": "users" }, ], }, @@ -1068,7 +1086,10 @@ it("shows grants", async ({ expect }) => { "status": 200, "body": { "result": [ + { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.contact_channels TO limited_user" }, { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.events TO limited_user" }, + { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.team_member_profiles TO limited_user" }, + { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.teams TO limited_user" }, { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.users TO limited_user" }, ], }, diff --git a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts index 4c7bceee8d..b912b64215 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts @@ -711,7 +711,7 @@ describe.sequential('External DB Sync - Basic Tests', () => { await waitForSyncedTeamMember(client, teamId, user.userId); - const res1 = await client.query(`SELECT * FROM "team_members" WHERE "team_id" = $1 AND "user_id" = $2`, [teamId, user.userId]); + const res1 = await client.query(`SELECT * FROM "team_member_profiles" WHERE "team_id" = $1 AND "user_id" = $2`, [teamId, user.userId]); expect(res1.rows.length).toBe(1); // Remove member diff --git a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts index ae830bbe67..59b855373f 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts @@ -287,14 +287,14 @@ export async function waitForSyncedTeamDeletion(client: Client, teamId: string) } export async function waitForSyncedTeamMember(client: Client, teamId: string, userId: string) { - await waitForExternalDbRow(client, `SELECT * FROM "team_members" WHERE "team_id" = $1 AND "user_id" = $2`, [teamId, userId], { + await waitForExternalDbRow(client, `SELECT * FROM "team_member_profiles" WHERE "team_id" = $1 AND "user_id" = $2`, [teamId, userId], { shouldExist: true, description: `team member (team=${teamId}, user=${userId}) to appear in external DB`, }); } export async function waitForSyncedTeamMemberDeletion(client: Client, teamId: string, userId: string) { - await waitForExternalDbRow(client, `SELECT * FROM "team_members" WHERE "team_id" = $1 AND "user_id" = $2`, [teamId, userId], { + await waitForExternalDbRow(client, `SELECT * FROM "team_member_profiles" WHERE "team_id" = $1 AND "user_id" = $2`, [teamId, userId], { shouldExist: false, description: `team member (team=${teamId}, user=${userId}) to be removed from external DB`, }); diff --git a/packages/stack-shared/src/config/db-sync-mappings.ts b/packages/stack-shared/src/config/db-sync-mappings.ts index 92d6399970..f00b4ed0ae 100644 --- a/packages/stack-shared/src/config/db-sync-mappings.ts +++ b/packages/stack-shared/src/config/db-sync-mappings.ts @@ -663,21 +663,22 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { `.trim(), }, }, - "team_members": { - sourceTables: { "TeamMember": "TeamMember" }, - targetTable: "team_members", + "team_member_profiles": { + sourceTables: { "TeamMember": "TeamMember", "ProjectUser": "ProjectUser" }, + targetTable: "team_member_profiles", targetTableSchemas: { postgres: ` - CREATE TABLE IF NOT EXISTS "team_members" ( + CREATE TABLE IF NOT EXISTS "team_member_profiles" ( "team_id" uuid NOT NULL, "user_id" uuid NOT NULL, "display_name" text, "profile_image_url" text, + "user" jsonb NOT NULL DEFAULT '{}'::jsonb, "created_at" timestamp without time zone NOT NULL, PRIMARY KEY ("team_id", "user_id") ); - REVOKE ALL ON "team_members" FROM PUBLIC; - GRANT SELECT ON "team_members" TO PUBLIC; + REVOKE ALL ON "team_member_profiles" FROM PUBLIC; + GRANT SELECT ON "team_member_profiles" TO PUBLIC; CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" ( "mapping_name" text PRIMARY KEY NOT NULL, @@ -686,13 +687,14 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { ); `.trim(), clickhouse: ` - CREATE TABLE IF NOT EXISTS analytics_internal.team_members ( + CREATE TABLE IF NOT EXISTS analytics_internal.team_member_profiles ( project_id String, branch_id String, team_id UUID, user_id UUID, display_name Nullable(String), profile_image_url Nullable(String), + user JSON, created_at DateTime64(3, 'UTC'), sync_sequence_id Int64, sync_is_deleted UInt8, @@ -714,12 +716,46 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { "TeamMember"."projectUserId" AS "user_id", "TeamMember"."displayName" AS "display_name", "TeamMember"."profileImageUrl" AS "profile_image_url", + jsonb_build_object( + 'id', "ProjectUser"."projectUserId", + 'display_name', "ProjectUser"."displayName", + 'primary_email', ( + SELECT "ContactChannel"."value" + FROM "ContactChannel" + WHERE "ContactChannel"."projectUserId" = "ProjectUser"."projectUserId" + AND "ContactChannel"."tenancyId" = "ProjectUser"."tenancyId" + AND "ContactChannel"."type" = 'EMAIL' + AND "ContactChannel"."isPrimary" = 'TRUE' + LIMIT 1 + ), + 'primary_email_verified', COALESCE( + ( + SELECT "ContactChannel"."isVerified" + FROM "ContactChannel" + WHERE "ContactChannel"."projectUserId" = "ProjectUser"."projectUserId" + AND "ContactChannel"."tenancyId" = "ProjectUser"."tenancyId" + AND "ContactChannel"."type" = 'EMAIL' + AND "ContactChannel"."isPrimary" = 'TRUE' + LIMIT 1 + ), + false + ), + 'profile_image_url', "ProjectUser"."profileImageUrl", + 'signed_up_at_millis', EXTRACT(EPOCH FROM "ProjectUser"."createdAt") * 1000, + 'client_metadata', COALESCE("ProjectUser"."clientMetadata", '{}'::jsonb), + 'client_read_only_metadata', COALESCE("ProjectUser"."clientReadOnlyMetadata", '{}'::jsonb), + 'server_metadata', COALESCE("ProjectUser"."serverMetadata", '{}'::jsonb), + 'is_anonymous', "ProjectUser"."isAnonymous", + 'last_active_at_millis', CASE WHEN "ProjectUser"."lastActiveAt" IS NOT NULL THEN EXTRACT(EPOCH FROM "ProjectUser"."lastActiveAt") * 1000 ELSE NULL END + ) AS "user", "TeamMember"."createdAt" AS "created_at", "TeamMember"."sequenceId" AS "sync_sequence_id", "TeamMember"."tenancyId" AS "tenancyId", false AS "sync_is_deleted" FROM "TeamMember" JOIN "Tenancy" ON "Tenancy"."id" = "TeamMember"."tenancyId" + JOIN "ProjectUser" ON "ProjectUser"."projectUserId" = "TeamMember"."projectUserId" + AND "ProjectUser"."tenancyId" = "TeamMember"."tenancyId" WHERE "TeamMember"."tenancyId" = $1::uuid UNION ALL @@ -731,6 +767,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { ("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id", NULL::text AS "display_name", NULL::text AS "profile_image_url", + '{}'::jsonb AS "user", "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", "DeletedRow"."sequenceId" AS "sync_sequence_id", "DeletedRow"."tenancyId" AS "tenancyId", @@ -755,11 +792,45 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { "TeamMember"."projectUserId" AS "user_id", "TeamMember"."displayName" AS "display_name", "TeamMember"."profileImageUrl" AS "profile_image_url", + jsonb_build_object( + 'id', "ProjectUser"."projectUserId", + 'display_name', "ProjectUser"."displayName", + 'primary_email', ( + SELECT "ContactChannel"."value" + FROM "ContactChannel" + WHERE "ContactChannel"."projectUserId" = "ProjectUser"."projectUserId" + AND "ContactChannel"."tenancyId" = "ProjectUser"."tenancyId" + AND "ContactChannel"."type" = 'EMAIL' + AND "ContactChannel"."isPrimary" = 'TRUE' + LIMIT 1 + ), + 'primary_email_verified', COALESCE( + ( + SELECT "ContactChannel"."isVerified" + FROM "ContactChannel" + WHERE "ContactChannel"."projectUserId" = "ProjectUser"."projectUserId" + AND "ContactChannel"."tenancyId" = "ProjectUser"."tenancyId" + AND "ContactChannel"."type" = 'EMAIL' + AND "ContactChannel"."isPrimary" = 'TRUE' + LIMIT 1 + ), + false + ), + 'profile_image_url', "ProjectUser"."profileImageUrl", + 'signed_up_at_millis', EXTRACT(EPOCH FROM "ProjectUser"."createdAt") * 1000, + 'client_metadata', COALESCE("ProjectUser"."clientMetadata", '{}'::jsonb), + 'client_read_only_metadata', COALESCE("ProjectUser"."clientReadOnlyMetadata", '{}'::jsonb), + 'server_metadata', COALESCE("ProjectUser"."serverMetadata", '{}'::jsonb), + 'is_anonymous', "ProjectUser"."isAnonymous", + 'last_active_at_millis', CASE WHEN "ProjectUser"."lastActiveAt" IS NOT NULL THEN EXTRACT(EPOCH FROM "ProjectUser"."createdAt") * 1000 ELSE NULL END + ) AS "user", "TeamMember"."createdAt" AS "created_at", "TeamMember"."sequenceId" AS "sequence_id", "TeamMember"."tenancyId", false AS "is_deleted" FROM "TeamMember" + JOIN "ProjectUser" ON "ProjectUser"."projectUserId" = "TeamMember"."projectUserId" + AND "ProjectUser"."tenancyId" = "TeamMember"."tenancyId" WHERE "TeamMember"."tenancyId" = $1::uuid UNION ALL @@ -769,6 +840,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { ("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id", NULL::text AS "display_name", NULL::text AS "profile_image_url", + '{}'::jsonb AS "user", "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", "DeletedRow"."sequenceId" AS "sequence_id", "DeletedRow"."tenancyId", @@ -791,23 +863,25 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { $2::uuid AS "user_id", $3::text AS "display_name", $4::text AS "profile_image_url", - $5::timestamp without time zone AS "created_at", - $6::bigint AS "sequence_id", - $7::boolean AS "is_deleted", - $8::text AS "mapping_name" + $5::jsonb AS "user", + $6::timestamp without time zone AS "created_at", + $7::bigint AS "sequence_id", + $8::boolean AS "is_deleted", + $9::text AS "mapping_name" ), deleted AS ( - DELETE FROM "team_members" tm + DELETE FROM "team_member_profiles" tm USING params p WHERE p."is_deleted" = true AND tm."team_id" = p."team_id" AND tm."user_id" = p."user_id" RETURNING 1 ), upserted AS ( - INSERT INTO "team_members" ( + INSERT INTO "team_member_profiles" ( "team_id", "user_id", "display_name", "profile_image_url", + "user", "created_at" ) SELECT @@ -815,12 +889,14 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { p."user_id", p."display_name", p."profile_image_url", + p."user", p."created_at" FROM params p WHERE p."is_deleted" = false ON CONFLICT ("team_id", "user_id") DO UPDATE SET "display_name" = EXCLUDED."display_name", "profile_image_url" = EXCLUDED."profile_image_url", + "user" = EXCLUDED."user", "created_at" = EXCLUDED."created_at" RETURNING 1 ) From 1e1148c4f3662c4c918288c6cba4c94108e230ab Mon Sep 17 00:00:00 2001 From: Bilal Godil Date: Thu, 19 Mar 2026 13:38:26 -0700 Subject: [PATCH 3/3] Fix last_active_at_millis extracting from wrong column Was using createdAt instead of lastActiveAt in the Postgres variant of the team_member_profiles sync query. --- packages/stack-shared/src/config/db-sync-mappings.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/stack-shared/src/config/db-sync-mappings.ts b/packages/stack-shared/src/config/db-sync-mappings.ts index f00b4ed0ae..3dc6c68e28 100644 --- a/packages/stack-shared/src/config/db-sync-mappings.ts +++ b/packages/stack-shared/src/config/db-sync-mappings.ts @@ -822,7 +822,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { 'client_read_only_metadata', COALESCE("ProjectUser"."clientReadOnlyMetadata", '{}'::jsonb), 'server_metadata', COALESCE("ProjectUser"."serverMetadata", '{}'::jsonb), 'is_anonymous', "ProjectUser"."isAnonymous", - 'last_active_at_millis', CASE WHEN "ProjectUser"."lastActiveAt" IS NOT NULL THEN EXTRACT(EPOCH FROM "ProjectUser"."createdAt") * 1000 ELSE NULL END + 'last_active_at_millis', CASE WHEN "ProjectUser"."lastActiveAt" IS NOT NULL THEN EXTRACT(EPOCH FROM "ProjectUser"."lastActiveAt") * 1000 ELSE NULL END ) AS "user", "TeamMember"."createdAt" AS "created_at", "TeamMember"."sequenceId" AS "sequence_id",