Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- AlterTable
ALTER TABLE "Team" ADD COLUMN "sequenceId" BIGINT,
ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true;

-- CreateIndex
CREATE UNIQUE INDEX "Team_sequenceId_key" ON "Team"("sequenceId");

-- CreateIndex
CREATE INDEX "Team_tenancyId_sequenceId_idx" ON "Team"("tenancyId", "sequenceId");

-- CreateIndex
CREATE INDEX "Team_shouldUpdateSequenceId_idx" ON "Team"("shouldUpdateSequenceId", "tenancyId");

-- AlterTable
ALTER TABLE "TeamMember" ADD COLUMN "sequenceId" BIGINT,
ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true;

-- CreateIndex
CREATE UNIQUE INDEX "TeamMember_sequenceId_key" ON "TeamMember"("sequenceId");

-- CreateIndex
CREATE INDEX "TeamMember_tenancyId_sequenceId_idx" ON "TeamMember"("tenancyId", "sequenceId");

-- CreateIndex
CREATE INDEX "TeamMember_shouldUpdateSequenceId_idx" ON "TeamMember"("shouldUpdateSequenceId", "tenancyId");
10 changes: 10 additions & 0 deletions apps/backend/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,16 @@ model Team {
serverMetadata Json?
profileImageUrl String?

sequenceId BigInt? @unique
shouldUpdateSequenceId Boolean @default(true)

teamMembers TeamMember[]
projectApiKey ProjectApiKey[]

@@id([tenancyId, teamId])
@@unique([mirroredProjectId, mirroredBranchId, teamId])
@@index([tenancyId, sequenceId], name: "Team_tenancyId_sequenceId_idx")
@@index([shouldUpdateSequenceId, tenancyId], name: "Team_shouldUpdateSequenceId_idx")
}

// This is used for fields that are boolean but only the true value is part of a unique constraint.
Expand All @@ -205,6 +210,9 @@ model TeamMember {
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

sequenceId BigInt? @unique
shouldUpdateSequenceId Boolean @default(true)

projectUser ProjectUser @relation(fields: [tenancyId, projectUserId], references: [tenancyId, projectUserId], onDelete: Cascade)
team Team @relation(fields: [tenancyId, teamId], references: [tenancyId, teamId], onDelete: Cascade)
isSelected BooleanTrue?
Expand All @@ -213,6 +221,8 @@ model TeamMember {
@@id([tenancyId, projectUserId, teamId])
@@unique([tenancyId, projectUserId, isSelected])
@@index([tenancyId, projectUserId, isSelected], map: "TeamMember_projectUserId_isSelected_idx")
@@index([tenancyId, sequenceId], name: "TeamMember_tenancyId_sequenceId_idx")
@@index([shouldUpdateSequenceId, tenancyId], name: "TeamMember_shouldUpdateSequenceId_idx")
}

model ProjectUserDirectPermission {
Expand Down
135 changes: 135 additions & 0 deletions apps/backend/scripts/clickhouse-migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ export async function runClickhouseMigrations() {
await client.exec({ query: EVENTS_VIEW_SQL });
await client.exec({ query: USERS_TABLE_BASE_SQL });
await client.exec({ query: USERS_VIEW_SQL });
await client.exec({ query: CONTACT_CHANNELS_TABLE_BASE_SQL });
await client.exec({ query: CONTACT_CHANNELS_VIEW_SQL });
await client.exec({ query: TEAMS_TABLE_BASE_SQL });
await client.exec({ query: TEAMS_VIEW_SQL });
await client.exec({ query: TEAM_MEMBER_PROFILES_TABLE_BASE_SQL });
await client.exec({ query: TEAM_MEMBER_PROFILES_VIEW_SQL });
await client.exec({ query: EVENTS_ADD_REPLAY_COLUMNS_SQL });
await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL });
await client.exec({ query: BACKFILL_REFRESH_TOKEN_ID_COLUMN_SQL });
Expand All @@ -27,13 +33,25 @@ export async function runClickhouseMigrations() {
"REVOKE ALL FROM limited_user;",
"GRANT SELECT ON default.events TO limited_user;",
"GRANT SELECT ON default.users TO limited_user;",
"GRANT SELECT ON default.contact_channels TO limited_user;",
"GRANT SELECT ON default.teams TO limited_user;",
"GRANT SELECT ON default.team_member_profiles TO limited_user;",
];
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS events_project_isolation ON default.events FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS users_project_isolation ON default.users FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS contact_channels_project_isolation ON default.contact_channels FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS teams_project_isolation ON default.teams FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS team_member_profiles_project_isolation ON default.team_member_profiles FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
for (const query of queries) {
await client.exec({ query });
}
Expand Down Expand Up @@ -197,6 +215,123 @@ WHERE event_type = '$token-refresh'
AND data.refresh_token_id::Nullable(String) IS NOT NULL;
`;

const CONTACT_CHANNELS_TABLE_BASE_SQL = `
CREATE TABLE IF NOT EXISTS analytics_internal.contact_channels (
project_id String,
branch_id String,
id UUID,
user_id UUID,
type LowCardinality(String),
value String,
is_primary UInt8,
is_verified UInt8,
used_for_auth UInt8,
created_at DateTime64(3, 'UTC'),
sync_sequence_id Int64,
sync_is_deleted UInt8,
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(sync_sequence_id)
PARTITION BY toYYYYMM(created_at)
ORDER BY (project_id, branch_id, id);
`;

const CONTACT_CHANNELS_VIEW_SQL = `
CREATE OR REPLACE VIEW default.contact_channels
SQL SECURITY DEFINER
AS
SELECT
project_id,
branch_id,
id,
user_id,
type,
value,
is_primary,
is_verified,
used_for_auth,
created_at
FROM analytics_internal.contact_channels
FINAL
WHERE sync_is_deleted = 0;
`;
Comment thread
BilalG1 marked this conversation as resolved.

const TEAMS_TABLE_BASE_SQL = `
CREATE TABLE IF NOT EXISTS analytics_internal.teams (
project_id String,
branch_id String,
id UUID,
display_name String,
profile_image_url Nullable(String),
created_at DateTime64(3, 'UTC'),
client_metadata String,
client_read_only_metadata String,
server_metadata String,
sync_sequence_id Int64,
sync_is_deleted UInt8,
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(sync_sequence_id)
PARTITION BY toYYYYMM(created_at)
ORDER BY (project_id, branch_id, id);
`;

const TEAMS_VIEW_SQL = `
CREATE OR REPLACE VIEW default.teams
SQL SECURITY DEFINER
AS
SELECT
project_id,
branch_id,
id,
display_name,
profile_image_url,
created_at,
client_metadata,
client_read_only_metadata,
server_metadata
FROM analytics_internal.teams
FINAL
WHERE sync_is_deleted = 0;
`;

const TEAM_MEMBER_PROFILES_TABLE_BASE_SQL = `
CREATE TABLE IF NOT EXISTS analytics_internal.team_member_profiles (
project_id String,
branch_id String,
team_id UUID,
user_id UUID,
display_name Nullable(String),
profile_image_url Nullable(String),
user JSON,
created_at DateTime64(3, 'UTC'),
sync_sequence_id Int64,
sync_is_deleted UInt8,
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(sync_sequence_id)
PARTITION BY toYYYYMM(created_at)
ORDER BY (project_id, branch_id, team_id, user_id);
`;

const TEAM_MEMBER_PROFILES_VIEW_SQL = `
CREATE OR REPLACE VIEW default.team_member_profiles
SQL SECURITY DEFINER
AS
SELECT
project_id,
branch_id,
team_id,
user_id,
display_name,
profile_image_url,
user,
created_at
FROM analytics_internal.team_member_profiles
FINAL
WHERE sync_is_deleted = 0;
`;

const EXTERNAL_ANALYTICS_DB_SQL = `
CREATE DATABASE IF NOT EXISTS analytics_internal;
`;
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata";
import { enqueueExternalDbSyncBatch } from "@/lib/external-db-sync-queue";
import { Prisma } from "@/generated/prisma/client";
import { globalPrismaClient } from "@/prisma-client";
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import { traceSpan } from "@/utils/telemetry";
Expand Down Expand Up @@ -78,6 +79,23 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
if (projectUserTenants.length > 0) {
await enqueueExternalDbSyncBatch(projectUserTenants.map(t => t.tenancyId));
didUpdate = true;

// Cascade: when a user changes, mark their TeamMember rows for re-sync
// so the embedded user JSON in team_member_profiles stays fresh
await globalPrismaClient.$executeRaw`
UPDATE "TeamMember"
SET "shouldUpdateSequenceId" = TRUE
FROM (
SELECT DISTINCT "tenancyId", "projectUserId"
FROM "ProjectUser"
WHERE "tenancyId" IN (${Prisma.join(projectUserTenants.map(t => t.tenancyId))})
AND "shouldUpdateSequenceId" = FALSE
AND "sequenceId" IS NOT NULL
) AS changed_users
WHERE "TeamMember"."tenancyId" = changed_users."tenancyId"
AND "TeamMember"."projectUserId" = changed_users."projectUserId"
AND "TeamMember"."shouldUpdateSequenceId" = FALSE
`;
}

const contactChannelTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
Expand Down Expand Up @@ -109,6 +127,63 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
didUpdate = true;
}

const teamTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "tenancyId", "teamId"
FROM "Team"
WHERE "shouldUpdateSequenceId" = TRUE
ORDER BY "tenancyId"
LIMIT ${batchSize}
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "Team" t
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE t."tenancyId" = r."tenancyId"
AND t."teamId" = r."teamId"
RETURNING t."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
`;

span.setAttribute("stack.external-db-sync.team-tenants", teamTenants.length);

if (teamTenants.length > 0) {
await enqueueExternalDbSyncBatch(teamTenants.map(t => t.tenancyId));
didUpdate = true;
}

const teamMemberTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "tenancyId", "projectUserId", "teamId"
FROM "TeamMember"
WHERE "shouldUpdateSequenceId" = TRUE
ORDER BY "tenancyId"
LIMIT ${batchSize}
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "TeamMember" tm
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE tm."tenancyId" = r."tenancyId"
AND tm."projectUserId" = r."projectUserId"
AND tm."teamId" = r."teamId"
RETURNING tm."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
`;

span.setAttribute("stack.external-db-sync.team-member-tenants", teamMemberTenants.length);

if (teamMemberTenants.length > 0) {
await enqueueExternalDbSyncBatch(teamMemberTenants.map(t => t.tenancyId));
didUpdate = true;
}

const deletedRowTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "id", "tenancyId"
Expand Down Expand Up @@ -138,7 +213,7 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {

span.setAttribute("stack.external-db-sync.did-update", didUpdate);
if (didUpdate) {
console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, DR=${deletedRowTenants.length}`);
console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, DR=${deletedRowTenants.length}`);
}

return didUpdate;
Expand Down
12 changes: 10 additions & 2 deletions apps/backend/src/app/api/latest/team-memberships/crud.tsx
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { recordExternalDbSyncDeletion, withExternalDbSyncUpdate } from "@/lib/external-db-sync";
import { grantDefaultTeamPermissions } from "@/lib/permissions";
import { ensureTeamExists, ensureTeamMembershipDoesNotExist, ensureTeamMembershipExists, ensureUserExists, ensureUserTeamPermissionExists } from "@/lib/request-checks";
import { Tenancy } from "@/lib/tenancies";
Expand All @@ -20,11 +21,11 @@ export async function addUserToTeam(tx: PrismaTransaction, options: {
type: 'member' | 'creator',
}) {
await tx.teamMember.create({
data: {
data: withExternalDbSyncUpdate({
projectUserId: options.userId,
teamId: options.teamId,
tenancyId: options.tenancy.id,
},
}),
});

const result = await grantDefaultTeamPermissions(tx, {
Expand Down Expand Up @@ -138,6 +139,13 @@ export const teamMembershipsCrudHandlers = createLazyProxy(() => createCrudHandl
userId: params.user_id,
});

await recordExternalDbSyncDeletion(tx, {
tableName: "TeamMember",
tenancyId: auth.tenancy.id,
projectUserId: params.user_id,
teamId: params.team_id,
});

await tx.teamMember.delete({
where: {
tenancyId_projectUserId_teamId: {
Expand Down
Loading
Loading