diff --git a/apps/backend/scripts/clickhouse-migrations.ts b/apps/backend/scripts/clickhouse-migrations.ts
index 318c7d4e92..b83185d68b 100644
--- a/apps/backend/scripts/clickhouse-migrations.ts
+++ b/apps/backend/scripts/clickhouse-migrations.ts
@@ -17,6 +17,7 @@ export async function runClickhouseMigrations() {
   await client.exec({ query: USERS_TABLE_BASE_SQL });
   await client.exec({ query: USERS_VIEW_SQL });
   await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL });
+  await client.exec({ query: SIGN_UP_RULE_TRIGGER_EVENT_ROW_FORMAT_MUTATION_SQL });
   const queries = [
     "REVOKE ALL PRIVILEGES ON *.* FROM limited_user;",
     "REVOKE ALL FROM limited_user;",
@@ -93,6 +94,31 @@
 WHERE event_type = '$token-refresh'
   AND JSONHas(toJSONString(data), 'refreshTokenId');
 `;
+// Normalizes legacy $sign-up-rule-trigger rows (camelCase JSON) to the new format:
+// - Row identity stays in columns (project_id/branch_id)
+// - data JSON becomes { project_id, branch_id, rule_id, action, email, auth_method, oauth_provider } (snake_case)
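+// For example, a legacy row whose data is (hypothetical values):
+//   {"projectId":"p","branchId":"main","ruleId":"r","action":"block"}
+// is rewritten to:
+//   {"project_id":"p","branch_id":"main","rule_id":"r","action":"block","email":null,"auth_method":null,"oauth_provider":null}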
+const SIGN_UP_RULE_TRIGGER_EVENT_ROW_FORMAT_MUTATION_SQL = `
+ALTER TABLE analytics_internal.events
+UPDATE
+  data = CAST(concat(
+    '{',
+    '"project_id":', toJSONString(JSONExtractString(toJSONString(data), 'projectId')), ',',
+    '"branch_id":', toJSONString(JSONExtractString(toJSONString(data), 'branchId')), ',',
+    '"rule_id":', toJSONString(JSONExtractString(toJSONString(data), 'ruleId')), ',',
+    '"action":', toJSONString(JSONExtractString(toJSONString(data), 'action')), ',',
+    '"email":', toJSONString(JSONExtract(toJSONString(data), 'email', 'Nullable(String)')), ',',
+    '"auth_method":', toJSONString(JSONExtract(toJSONString(data), 'authMethod', 'Nullable(String)')), ',',
+    '"oauth_provider":', toJSONString(JSONExtract(toJSONString(data), 'oauthProvider', 'Nullable(String)')),
+    '}'
+  ) AS JSON)
+WHERE event_type = '$sign-up-rule-trigger'
+  AND JSONHas(toJSONString(data), 'ruleId');
+`;
+
 const USERS_TABLE_BASE_SQL = `
 CREATE TABLE IF NOT EXISTS analytics_internal.users (
   project_id String,
@@ -103,9 +129,9 @@ CREATE TABLE IF NOT EXISTS analytics_internal.users (
   primary_email Nullable(String),
   primary_email_verified UInt8,
   signed_up_at DateTime64(3, 'UTC'),
-  client_metadata JSON,
-  client_read_only_metadata JSON,
-  server_metadata JSON,
+  client_metadata String,
+  client_read_only_metadata String,
+  server_metadata String,
   is_anonymous UInt8,
   restricted_by_admin UInt8,
   restricted_by_admin_reason Nullable(String),
diff --git a/apps/backend/src/app/api/latest/internal/metrics/route.tsx b/apps/backend/src/app/api/latest/internal/metrics/route.tsx
index ccb1e90c9d..5c10e3a057 100644
--- a/apps/backend/src/app/api/latest/internal/metrics/route.tsx
+++ b/apps/backend/src/app/api/latest/internal/metrics/route.tsx
@@ -46,6 +46,12 @@ async function loadUsersByCountry(tenancy: Tenancy, prisma: PrismaClientTransact
   const userIds = users.map((user) => user.projectUserId);
   const scalingFactor = totalUsers > users.length ? totalUsers / users.length : 1;
 
+  // Build the ClickHouse array literal inline in the query body (sent via POST) instead of
+  // passing it as query_params (sent as URL params) to avoid the HTTP form field size limit
+  // when there are many user IDs. UUIDs contain only hex chars and dashes, but we escape
+  // single quotes for safety.
+  const userIdsArrayLiteral = `[${userIds.map(id => `'${id.replace(/'/g, "''")}'`).join(',')}]`;
+
   const clickhouseClient = getClickhouseAdminClient();
   const res = await clickhouseClient.query({
     query: `
@@ -67,7 +73,7 @@ async function loadUsersByCountry(tenancy: Tenancy, prisma: PrismaClientTransact
         AND project_id = {projectId:String}
         AND branch_id = {branchId:String}
         AND user_id IS NOT NULL
-        AND has({userIds:Array(String)}, assumeNotNull(user_id))
+        AND has(${userIdsArrayLiteral}, assumeNotNull(user_id))
       )
       WHERE cc IS NOT NULL
         AND ({includeAnonymous:UInt8} = 1 OR is_anonymous = 0)
@@ -80,7 +86,6 @@ async function loadUsersByCountry(tenancy: Tenancy, prisma: PrismaClientTransact
     query_params: {
       projectId: tenancy.project.id,
       branchId: tenancy.branchId,
-      userIds,
       includeAnonymous: includeAnonymous ? 1 : 0,
     },
     format: "JSONEachRow",
diff --git a/apps/backend/src/lib/external-db-sync.ts b/apps/backend/src/lib/external-db-sync.ts
index a9b57db045..7a78dfbd0a 100644
--- a/apps/backend/src/lib/external-db-sync.ts
+++ b/apps/backend/src/lib/external-db-sync.ts
@@ -430,6 +430,11 @@ async function pushRowsToClickhouse(
     return {
       ...rest,
       sync_sequence_id: sequenceId,
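+      // The ClickHouse users table now stores these metadata columns as String rather
+      // than JSON (see clickhouse-migrations.ts), so serialize the values before insert.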
+      client_metadata: JSON.stringify(rest.client_metadata),
+      client_read_only_metadata: JSON.stringify(rest.client_read_only_metadata),
+      server_metadata: JSON.stringify(rest.server_metadata),
       primary_email_verified: normalizeClickhouseBoolean(rest.primary_email_verified, "primary_email_verified"),
       is_anonymous: normalizeClickhouseBoolean(rest.is_anonymous, "is_anonymous"),
       restricted_by_admin: normalizeClickhouseBoolean(rest.restricted_by_admin, "restricted_by_admin"),
diff --git a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-advanced.test.ts b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-advanced.test.ts
index ebedb7b846..7b78360990 100644
--- a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-advanced.test.ts
+++ b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-advanced.test.ts
@@ -1,7 +1,9 @@
+import { wait } from "@stackframe/stack-shared/dist/utils/promises";
+import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors";
 import { Client } from 'pg';
 import { afterAll, beforeAll, describe, expect } from 'vitest';
 import { test } from '../../../../helpers';
 import { InternalApiKey, User, backendContext, niceBackendFetch } from '../../../backend-helpers';
 import {
   HIGH_VOLUME_TIMEOUT,
   POSTGRES_HOST,
@@ -19,6 +21,39 @@ import {
 
 const COMPLEX_SEQUENCE_TIMEOUT = TEST_TIMEOUT * 2 + 30_000;
 
+async function runQueryForCurrentProject(body: { query: string, params?: Record<string, unknown>, timeout_ms?: number }) {
+  return await niceBackendFetch("/api/v1/internal/analytics/query", {
+    method: "POST",
+    accessType: "admin",
+    body,
+  });
+}
+
+// Polls the analytics query endpoint until the synced user row appears with the expected display name.
+async function waitForClickhouseUser(email: string, expectedDisplayName: string) {
+  const timeoutMs = 180_000;
+  const intervalMs = 2_000;
+  const start = performance.now();
+
+  while (performance.now() - start < timeoutMs) {
+    const response = await runQueryForCurrentProject({
+      query: "SELECT primary_email, display_name FROM users WHERE primary_email = {email:String}",
+      params: { email },
+    });
+    if (
+      response.status === 200
+      && Array.isArray(response.body?.result)
+      && response.body.result.length === 1
+      && response.body.result[0]?.display_name === expectedDisplayName
+    ) {
+      return response;
+    }
+    await wait(intervalMs);
+  }
+
+  throw new StackAssertionError(`Timed out waiting for ClickHouse user ${email} to sync.`);
+}
+
 describe.sequential('External DB Sync - Advanced Tests', () => {
   let dbManager: TestDbManager;
   const createProjectWithExternalDb = (
@@ -1126,4 +1161,36 @@ $$;`);
       await internalClient.end();
     }
   }, HIGH_VOLUME_TIMEOUT);
+
+  /**
+   * What it does:
+   * - Configures a project with a bad postgres connection string (simulating postgres being down).
+   * - Creates a user and verifies it still syncs to ClickHouse despite the postgres failure.
+   *
+   * Why it matters:
+   * - Proves that the ClickHouse and postgres sync targets are independent: a postgres
+   *   failure does not block the ClickHouse sync from completing successfully.
+   */
+  test('Cross-DB resilience: postgres down does not block ClickHouse sync', async () => {
+    const badConnectionString = 'postgresql://invalid:invalid@invalid:5432/invalid';
+
+    // Create a project with only a bad postgres DB; ClickHouse sync is enabled automatically via env var
+    await createProjectWithExternalDb({
+      bad_pg: {
+        type: 'postgres',
+        connectionString: badConnectionString,
+      },
+    });
+
+    const email = 'cross-db-resilience@example.com';
+    const user = await User.create({ primary_email: email });
+    await niceBackendFetch(`/api/v1/users/${user.userId}`, {
+      accessType: 'admin',
+      method: 'PATCH',
+      body: { display_name: 'Cross DB User' },
+    });
+
+    // ClickHouse should still receive the data even though postgres sync fails
+    await waitForClickhouseUser(email, 'Cross DB User');
+  }, TEST_TIMEOUT);
 });