28 changes: 25 additions & 3 deletions apps/backend/scripts/clickhouse-migrations.ts
@@ -17,6 +17,7 @@ export async function runClickhouseMigrations() {
await client.exec({ query: USERS_TABLE_BASE_SQL });
await client.exec({ query: USERS_VIEW_SQL });
await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL });
await client.exec({ query: SIGN_UP_RULE_TRIGGER_EVENT_ROW_FORMAT_MUTATION_SQL });
const queries = [
"REVOKE ALL PRIVILEGES ON *.* FROM limited_user;",
"REVOKE ALL FROM limited_user;",
@@ -93,6 +94,27 @@ WHERE event_type = '$token-refresh'
AND JSONHas(toJSONString(data), 'refreshTokenId');
`;

// Normalizes legacy $sign-up-rule-trigger rows (camelCase JSON) to the new format:
// - Row identity stays in columns (project_id/branch_id)
// - data JSON becomes { project_id, branch_id, rule_id, action, email, auth_method, oauth_provider } (snake_case)
const SIGN_UP_RULE_TRIGGER_EVENT_ROW_FORMAT_MUTATION_SQL = `
ALTER TABLE analytics_internal.events
UPDATE
data = CAST(concat(
'{',
'"project_id":', toJSONString(JSONExtractString(toJSONString(data), 'projectId')), ',',
'"branch_id":', toJSONString(JSONExtractString(toJSONString(data), 'branchId')), ',',
'"rule_id":', toJSONString(JSONExtractString(toJSONString(data), 'ruleId')), ',',
'"action":', toJSONString(JSONExtractString(toJSONString(data), 'action')), ',',
'"email":', toJSONString(JSONExtract(toJSONString(data), 'email', 'Nullable(String)')), ',',
'"auth_method":', toJSONString(JSONExtract(toJSONString(data), 'authMethod', 'Nullable(String)')), ',',
'"oauth_provider":', toJSONString(JSONExtract(toJSONString(data), 'oauthProvider', 'Nullable(String)')),
'}'
) AS JSON)
WHERE event_type = '$sign-up-rule-trigger'
AND JSONHas(toJSONString(data), 'ruleId');
`;

const USERS_TABLE_BASE_SQL = `
CREATE TABLE IF NOT EXISTS analytics_internal.users (
project_id String,
@@ -103,9 +125,9 @@
primary_email Nullable(String),
primary_email_verified UInt8,
signed_up_at DateTime64(3, 'UTC'),
client_metadata JSON,
client_read_only_metadata JSON,
server_metadata JSON,
client_metadata String,
client_read_only_metadata String,
server_metadata String,
is_anonymous UInt8,
restricted_by_admin UInt8,
restricted_by_admin_reason Nullable(String),
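
For review context, a minimal sketch of what the $sign-up-rule-trigger mutation above does to a legacy row's data payload (field values are invented for illustration):

// Legacy format: camelCase keys, as written by the old event shape.
const legacyData = {
  projectId: "prj_1",
  branchId: "main",
  ruleId: "rule_1",
  action: "block",
  email: "user@example.com",
  authMethod: "password",
  oauthProvider: null,
};

// After the mutation: snake_case keys; absent optional fields become JSON null.
const normalizedData = {
  project_id: "prj_1",
  branch_id: "main",
  rule_id: "rule_1",
  action: "block",
  email: "user@example.com",
  auth_method: "password",
  oauth_provider: null,
};
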
9 changes: 7 additions & 2 deletions apps/backend/src/app/api/latest/internal/metrics/route.tsx
@@ -46,6 +46,12 @@ async function loadUsersByCountry(tenancy: Tenancy, prisma: PrismaClientTransact
const userIds = users.map((user) => user.projectUserId);
const scalingFactor = totalUsers > users.length ? totalUsers / users.length : 1;

// Build ClickHouse array literal inline in the query body (sent via POST) instead of
// passing as query_params (sent as URL params) to avoid the HTTP form field size limit
// when there are many user IDs. UUIDs contain only hex chars and dashes, but we escape
// single quotes for safety.
const userIdsArrayLiteral = `[${userIds.map(id => `'${id.replace(/'/g, "''")}'`).join(',')}]`;

const clickhouseClient = getClickhouseAdminClient();
const res = await clickhouseClient.query({
query: `
@@ -67,7 +73,7 @@
AND project_id = {projectId:String}
AND branch_id = {branchId:String}
AND user_id IS NOT NULL
AND has({userIds:Array(String)}, assumeNotNull(user_id))
AND has(${userIdsArrayLiteral}, assumeNotNull(user_id))
)
WHERE cc IS NOT NULL
AND ({includeAnonymous:UInt8} = 1 OR is_anonymous = 0)
@@ -80,7 +86,6 @@ async function loadUsersByCountry(tenancy: Tenancy, prisma: PrismaClientTransact
query_params: {
projectId: tenancy.project.id,
branchId: tenancy.branchId,
userIds,
includeAnonymous: includeAnonymous ? 1 : 0,
},
format: "JSONEachRow",
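
To illustrate the workaround described in the loadUsersByCountry comment above, this is roughly what the inline literal expands to (hypothetical IDs, same single-quote escaping as the route code):

const ids = ["3f2c0a4e-0000-4000-8000-000000000001", "3f2c0a4e-0000-4000-8000-000000000002"];
const literal = `[${ids.map(id => `'${id.replace(/'/g, "''")}'`).join(',')}]`;
// literal === "['3f2c0a4e-0000-4000-8000-000000000001','3f2c0a4e-0000-4000-8000-000000000002']"
// ...and it is interpolated directly into the query body as
// AND has([...], assumeNotNull(user_id))
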
3 changes: 3 additions & 0 deletions apps/backend/src/lib/external-db-sync.ts
@@ -430,6 +430,9 @@ async function pushRowsToClickhouse(
return {
...rest,
sync_sequence_id: sequenceId,
client_metadata: JSON.stringify(rest.client_metadata),
client_read_only_metadata: JSON.stringify(rest.client_read_only_metadata),
server_metadata: JSON.stringify(rest.server_metadata),
primary_email_verified: normalizeClickhouseBoolean(rest.primary_email_verified, "primary_email_verified"),
is_anonymous: normalizeClickhouseBoolean(rest.is_anonymous, "is_anonymous"),
restricted_by_admin: normalizeClickhouseBoolean(rest.restricted_by_admin, "restricted_by_admin"),
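
Assuming this mirrors the JSON-to-String column change in the migration above, a minimal sketch of the metadata fields a pushed row would now carry (hypothetical values):

// Metadata objects are serialized before insert, so the String columns
// receive JSON text rather than structured JSON values.
const pushedRow = {
  client_metadata: JSON.stringify({ theme: "dark" }),      // '{"theme":"dark"}'
  client_read_only_metadata: JSON.stringify(null),          // 'null'
  server_metadata: JSON.stringify({ plan: "pro" }),         // '{"plan":"pro"}'
};
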
@@ -1,7 +1,9 @@
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors";
import { Client } from 'pg';
import { afterAll, beforeAll, describe, expect } from 'vitest';
import { test } from '../../../../helpers';
import { InternalApiKey, User, backendContext, niceBackendFetch } from '../../../backend-helpers';
import { InternalApiKey, Project, User, backendContext, niceBackendFetch } from '../../../backend-helpers';
import {
HIGH_VOLUME_TIMEOUT,
POSTGRES_HOST,
@@ -19,6 +21,38 @@ import {

const COMPLEX_SEQUENCE_TIMEOUT = TEST_TIMEOUT * 2 + 30_000;

async function runQueryForCurrentProject(body: { query: string, params?: Record<string, string>, timeout_ms?: number }) {
return await niceBackendFetch("/api/v1/internal/analytics/query", {
method: "POST",
accessType: "admin",
body,
});
}

async function waitForClickhouseUser(email: string, expectedDisplayName: string) {
const timeoutMs = 180_000;
const intervalMs = 2_000;
const start = performance.now();

while (performance.now() - start < timeoutMs) {
const response = await runQueryForCurrentProject({
query: "SELECT primary_email, display_name FROM users WHERE primary_email = {email:String}",
params: { email },
});
if (
response.status === 200
&& Array.isArray(response.body?.result)
&& response.body.result.length === 1
&& response.body.result[0]?.display_name === expectedDisplayName
) {
return response;
}
await wait(intervalMs);
}

throw new StackAssertionError(`Timed out waiting for ClickHouse user ${email} to sync.`);
}

describe.sequential('External DB Sync - Advanced Tests', () => {
let dbManager: TestDbManager;
const createProjectWithExternalDb = (
@@ -1126,4 +1160,39 @@ $$;`);
await internalClient.end();
}
}, HIGH_VOLUME_TIMEOUT);

/**
* What it does:
* - Configures a project with a bad postgres connection string (simulating postgres being down).
* - Creates a user and verifies it still syncs to ClickHouse despite the postgres failure.
* - Then configures a separate project with a valid postgres DB and verifies postgres sync works
* even though ClickHouse sync runs independently in the same cycle.
*
* Why it matters:
* - Proves that ClickHouse and postgres sync targets are independent: a failure in one
* does not block the other from completing successfully.
*/
test('Cross-DB resilience: postgres down does not block ClickHouse sync', async () => {
const badConnectionString = 'postgresql://invalid:invalid@invalid:5432/invalid';

// Create a project with only a bad postgres DB — ClickHouse syncs automatically via env var
await createProjectWithExternalDb({
bad_pg: {
type: 'postgres',
connectionString: badConnectionString,
},
});

const email = 'cross-db-resilience@example.com';
const user = await User.create({ primary_email: email });
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Cross DB User' },
});

// ClickHouse should still receive the data even though postgres sync fails
await waitForClickhouseUser(email, 'Cross DB User');

}, TEST_TIMEOUT);
});