Skip to content

Commit 871fe12

Browse files
committed
fix tests
1 parent 8cdd107 commit 871fe12

5 files changed

Lines changed: 133 additions & 37 deletions

File tree

apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,14 @@ import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
1111
import { captureError, StatusError } from "@stackframe/stack-shared/dist/utils/errors";
1212
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
1313

14+
const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
15+
16+
function assertUuid(value: unknown, label: string): asserts value is string {
17+
if (typeof value !== "string" || value.trim().length === 0 || !UUID_REGEX.test(value)) {
18+
throw new StatusError(500, `${label} must be a valid UUID. Received: ${JSON.stringify(value)}`);
19+
}
20+
}
21+
1422
// Assigns sequence IDs to rows that need them and queues sync requests for affected tenants.
1523
// Processes up to 1000 rows at a time from each table.
1624
async function backfillSequenceIds() {
@@ -37,6 +45,7 @@ async function backfillSequenceIds() {
3745

3846
// Enqueue sync for each affected tenant
3947
for (const { tenancyId } of projectUserTenants) {
48+
assertUuid(tenancyId, "projectUserTenants.tenancyId");
4049
await enqueueTenantSync(tenancyId);
4150
}
4251

@@ -63,6 +72,7 @@ async function backfillSequenceIds() {
6372
`;
6473

6574
for (const { tenancyId } of contactChannelTenants) {
75+
assertUuid(tenancyId, "contactChannelTenants.tenancyId");
6676
await enqueueTenantSync(tenancyId);
6777
}
6878

@@ -87,21 +97,23 @@ async function backfillSequenceIds() {
8797
`;
8898

8999
for (const { tenancyId } of deletedRowTenants) {
100+
assertUuid(tenancyId, "deletedRowTenants.tenancyId");
90101
await enqueueTenantSync(tenancyId);
91102
}
92103
}
93104

94105
// Queues a sync request for a specific tenant if one isn't already pending.
95106
// Prevents duplicate sync requests by checking for unfulfilled requests.
96107
async function enqueueTenantSync(tenancyId: string) {
108+
assertUuid(tenancyId, "tenancyId");
97109
await globalPrismaClient.$executeRaw`
98110
INSERT INTO "OutgoingRequest" ("id", "createdAt", "qstashOptions", "startedFulfillingAt")
99111
SELECT
100112
gen_random_uuid(),
101113
NOW(),
102114
json_build_object(
103115
'url', '/api/latest/internal/external-db-sync/sync-engine',
104-
'body', json_build_object('tenancyId', ${tenancyId})
116+
'body', json_build_object('tenancyId', ${tenancyId}::uuid)
105117
),
106118
NULL
107119
WHERE NOT EXISTS (
@@ -173,4 +185,3 @@ export const GET = createSmartRouteHandler({
173185
};
174186
},
175187
});
176-

apps/backend/src/lib/external-db-sync.ts

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,62 @@ import { captureError, StackAssertionError, throwErr } from "@stackframe/stack-s
66
import { omit } from "@stackframe/stack-shared/dist/utils/objects";
77
import { Client } from 'pg';
88

9+
const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
10+
11+
function assertNonEmptyString(value: unknown, label: string): asserts value is string {
12+
if (typeof value !== "string" || value.trim().length === 0) {
13+
throw new StackAssertionError(`${label} must be a non-empty string.`);
14+
}
15+
}
16+
17+
function assertUuid(value: unknown, label: string): asserts value is string {
18+
assertNonEmptyString(value, label);
19+
if (!UUID_REGEX.test(value)) {
20+
throw new StackAssertionError(`${label} must be a valid UUID. Received: ${JSON.stringify(value)}`);
21+
}
22+
}
23+
24+
type PgErrorLike = {
25+
code?: string;
26+
constraint?: string;
27+
message?: string;
28+
};
29+
30+
function isDuplicateTypeError(error: unknown): error is PgErrorLike {
31+
if (!error || typeof error !== "object") return false;
32+
const pgError = error as PgErrorLike;
33+
return pgError.code === "23505" && pgError.constraint === "pg_type_typname_nsp_index";
34+
}
35+
36+
async function ensureExternalSchema(
37+
externalClient: Client,
38+
tableSchemaSql: string,
39+
tableName: string,
40+
) {
41+
try {
42+
await externalClient.query(tableSchemaSql);
43+
} catch (error) {
44+
if (!isDuplicateTypeError(error)) throw error;
45+
46+
// Concurrent CREATE TABLE can race and hit a duplicate type error.
47+
// If the table now exists, we can safely continue.
48+
const existsResult = await externalClient.query(`
49+
SELECT EXISTS (
50+
SELECT FROM information_schema.tables
51+
WHERE table_schema = 'public'
52+
AND table_name = $1
53+
);
54+
`, [tableName]);
55+
if (existsResult.rows[0]?.exists === true) {
56+
return;
57+
}
58+
59+
throw new StackAssertionError(
60+
`Duplicate type error while creating table ${JSON.stringify(tableName)}, but table does not exist.`
61+
);
62+
}
63+
}
64+
965
async function pushRowsToExternalDb(
1066
externalClient: Client,
1167
tableName: string,
@@ -14,6 +70,12 @@ async function pushRowsToExternalDb(
1470
expectedTenancyId: string,
1571
mappingId: string,
1672
) {
73+
assertNonEmptyString(tableName, "tableName");
74+
assertNonEmptyString(mappingId, "mappingId");
75+
assertUuid(expectedTenancyId, "expectedTenancyId");
76+
if (!Array.isArray(newRows)) {
77+
throw new StackAssertionError(`newRows must be an array for table ${JSON.stringify(tableName)}.`);
78+
}
1779
if (newRows.length === 0) return;
1880
// Just for our own sanity, make sure that we have the right number of positional parameters
1981
// The last parameter is mapping_name for metadata tracking
@@ -73,12 +135,22 @@ async function syncMapping(
73135
tenancyId: string,
74136
dbType: 'postgres',
75137
) {
138+
assertNonEmptyString(mappingId, "mappingId");
139+
assertNonEmptyString(mapping.targetTable, "mapping.targetTable");
140+
assertUuid(tenancyId, "tenancyId");
76141
const fetchQuery = mapping.internalDbFetchQuery;
77142
const updateQuery = mapping.externalDbUpdateQueries[dbType];
78143
const tableName = mapping.targetTable;
144+
assertNonEmptyString(fetchQuery, "internalDbFetchQuery");
145+
assertNonEmptyString(updateQuery, "externalDbUpdateQueries");
146+
if (!fetchQuery.includes("$1") || !fetchQuery.includes("$2")) {
147+
throw new StackAssertionError(
148+
`internalDbFetchQuery must reference $1 (tenancyId) and $2 (lastSequenceId). Mapping: ${mappingId}`
149+
);
150+
}
79151

80152
const tableSchema = mapping.targetTableSchemas[dbType];
81-
await externalClient.query(tableSchema);
153+
await ensureExternalSchema(externalClient, tableSchema, tableName);
82154

83155
let lastSequenceId = -1;
84156
const metadataResult = await externalClient.query(
@@ -88,10 +160,19 @@ async function syncMapping(
88160
if (metadataResult.rows.length > 0) {
89161
lastSequenceId = Number(metadataResult.rows[0].last_synced_sequence_id);
90162
}
163+
if (!Number.isFinite(lastSequenceId)) {
164+
throw new StackAssertionError(
165+
`Invalid last_synced_sequence_id for mapping ${mappingId}: ${JSON.stringify(metadataResult.rows[0]?.last_synced_sequence_id)}`
166+
);
167+
}
91168

92169
const BATCH_LIMIT = 1000;
93170

94171
while (true) {
172+
assertUuid(tenancyId, "tenancyId");
173+
if (!Number.isFinite(lastSequenceId)) {
174+
throw new StackAssertionError(`lastSequenceId must be a finite number for mapping ${mappingId}.`);
175+
}
95176
const rows = await internalPrisma.$queryRawUnsafe<any[]>(fetchQuery, tenancyId, lastSequenceId);
96177

97178
if (rows.length === 0) {
@@ -132,6 +213,8 @@ async function syncDatabase(
132213
internalPrisma: PrismaClientTransaction,
133214
tenancyId: string,
134215
) {
216+
assertNonEmptyString(dbId, "dbId");
217+
assertUuid(tenancyId, "tenancyId");
135218
if (dbConfig.type !== 'postgres') {
136219
throw new StackAssertionError(
137220
`Unsupported database type '${dbConfig.type}' for external DB ${dbId}. Only 'postgres' is currently supported.`
@@ -143,6 +226,7 @@ async function syncDatabase(
143226
`Invalid configuration for external DB ${dbId}: 'connectionString' is missing.`
144227
);
145228
}
229+
assertNonEmptyString(dbConfig.connectionString, `external DB ${dbId} connectionString`);
146230

147231
const externalClient = new Client({
148232
connectionString: dbConfig.connectionString,
@@ -176,6 +260,7 @@ async function syncDatabase(
176260

177261

178262
export async function syncExternalDatabases(tenancy: Tenancy) {
263+
assertUuid(tenancy?.id, "tenancy.id");
179264
const externalDatabases = tenancy.config.dbSync.externalDatabases;
180265
const internalPrisma = await getPrismaClientForTenancy(tenancy);
181266

apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-advanced.test.ts

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ describe.sequential('External DB Sync - Advanced Tests', () => {
5757
}
5858
});
5959

60-
const userA = await User.create({ emailAddress: 'user-a@example.com' });
60+
const userA = await User.create({ primary_email: 'user-a@example.com' });
6161
await niceBackendFetch(`/api/v1/users/${userA.userId}`, {
6262
accessType: 'admin',
6363
method: 'PATCH',
@@ -79,7 +79,7 @@ describe.sequential('External DB Sync - Advanced Tests', () => {
7979
}
8080
});
8181

82-
const userB = await User.create({ emailAddress: 'user-b@example.com' });
82+
const userB = await User.create({ primary_email: 'user-b@example.com' });
8383
await niceBackendFetch(`/api/v1/users/${userB.userId}`, {
8484
accessType: 'admin',
8585
method: 'PATCH',
@@ -178,9 +178,9 @@ describe.sequential('External DB Sync - Advanced Tests', () => {
178178

179179
const client = dbManager.getClient(dbName);
180180

181-
const user1 = await User.create({ emailAddress: 'seq1@example.com' });
182-
const user2 = await User.create({ emailAddress: 'seq2@example.com' });
183-
const user3 = await User.create({ emailAddress: 'seq3@example.com' });
181+
const user1 = await User.create({ primary_email: 'seq1@example.com' });
182+
const user2 = await User.create({ primary_email: 'seq2@example.com' });
183+
const user3 = await User.create({ primary_email: 'seq3@example.com' });
184184

185185
await niceBackendFetch(`/api/v1/users/${user1.userId}`, {
186186
accessType: 'admin',
@@ -219,7 +219,7 @@ describe.sequential('External DB Sync - Advanced Tests', () => {
219219
const seq1 = Number(metadata1.rows[0].last_synced_sequence_id);
220220
expect(seq1).toBeGreaterThan(0);
221221

222-
const user4 = await User.create({ emailAddress: 'seq4@example.com' });
222+
const user4 = await User.create({ primary_email: 'seq4@example.com' });
223223
await niceBackendFetch(`/api/v1/users/${user4.userId}`, {
224224
accessType: 'admin',
225225
method: 'PATCH',
@@ -260,7 +260,7 @@ describe.sequential('External DB Sync - Advanced Tests', () => {
260260
}
261261
});
262262

263-
const user1 = await User.create({ emailAddress: 'user1@example.com' });
263+
const user1 = await User.create({ primary_email: 'user1@example.com' });
264264
await niceBackendFetch(`/api/v1/users/${user1.userId}`, {
265265
accessType: 'admin',
266266
method: 'PATCH',
@@ -276,7 +276,7 @@ describe.sequential('External DB Sync - Advanced Tests', () => {
276276
expect(res.rows[0].display_name).toBe('User 1');
277277
const user1Id = res.rows[0].id;
278278

279-
const user2 = await User.create({ emailAddress: 'user2@example.com' });
279+
const user2 = await User.create({ primary_email: 'user2@example.com' });
280280
await niceBackendFetch(`/api/v1/users/${user2.userId}`, {
281281
accessType: 'admin',
282282
method: 'PATCH',
@@ -316,7 +316,7 @@ describe.sequential('External DB Sync - Advanced Tests', () => {
316316
});
317317

318318
const specialName = "O'Connor 🚀 用户 \"Test\"";
319-
const user = await User.create({ emailAddress: 'special@example.com' });
319+
const user = await User.create({ primary_email: 'special@example.com' });
320320
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
321321
accessType: 'admin',
322322
method: 'PATCH',
@@ -456,9 +456,9 @@ describe.sequential('External DB Sync - Advanced Tests', () => {
456456
}
457457
});
458458

459-
const user1 = await User.create({ emailAddress: 'seq1@example.com' });
460-
const user2 = await User.create({ emailAddress: 'seq2@example.com' });
461-
const user3 = await User.create({ emailAddress: 'seq3@example.com' });
459+
const user1 = await User.create({ primary_email: 'seq1@example.com' });
460+
const user2 = await User.create({ primary_email: 'seq2@example.com' });
461+
const user3 = await User.create({ primary_email: 'seq3@example.com' });
462462

463463
await niceBackendFetch(`/api/v1/users/${user1.userId}`, {
464464
accessType: 'admin',
@@ -505,7 +505,7 @@ describe.sequential('External DB Sync - Advanced Tests', () => {
505505
method: 'DELETE',
506506
});
507507

508-
const user4 = await User.create({ emailAddress: 'seq4@example.com' });
508+
const user4 = await User.create({ primary_email: 'seq4@example.com' });
509509
await niceBackendFetch(`/api/v1/users/${user4.userId}`, {
510510
accessType: 'admin',
511511
method: 'PATCH',
@@ -568,7 +568,7 @@ describe.sequential('External DB Sync - Advanced Tests', () => {
568568

569569
const superClient = dbManager.getClient(dbName);
570570

571-
const user = await User.create({ emailAddress: 'write-protect@example.com' });
571+
const user = await User.create({ primary_email: 'write-protect@example.com' });
572572
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
573573
accessType: 'admin',
574574
method: 'PATCH',
@@ -645,7 +645,7 @@ $$;`);
645645

646646
const client = dbManager.getClient(dbName);
647647

648-
const user = await User.create({ emailAddress: 'multi-update@example.com' });
648+
const user = await User.create({ primary_email: 'multi-update@example.com' });
649649

650650
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
651651
accessType: 'admin',
@@ -695,7 +695,7 @@ $$;`);
695695

696696
const client = dbManager.getClient(dbName);
697697

698-
const user = await User.create({ emailAddress: 'delete-before-sync@example.com' });
698+
const user = await User.create({ primary_email: 'delete-before-sync@example.com' });
699699
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
700700
accessType: 'admin',
701701
method: 'PATCH',
@@ -748,7 +748,7 @@ $$;`);
748748
const client = dbManager.getClient(dbName);
749749
const email = 'recreate-after-delete@example.com';
750750

751-
const firstUser = await User.create({ emailAddress: email });
751+
const firstUser = await User.create({ primary_email: email });
752752
await niceBackendFetch(`/api/v1/users/${firstUser.userId}`, {
753753
accessType: 'admin',
754754
method: 'PATCH',
@@ -772,7 +772,7 @@ $$;`);
772772
await waitForSyncedDeletion(client, email);
773773
await verifyNotInExternalDb(client, email);
774774

775-
const secondUser = await User.create({ emailAddress: email });
775+
const secondUser = await User.create({ primary_email: email });
776776
await niceBackendFetch(`/api/v1/users/${secondUser.userId}`, {
777777
accessType: 'admin',
778778
method: 'PATCH',
@@ -825,7 +825,7 @@ $$;`);
825825
const client = dbManager.getClient(dbName);
826826
const email = 'lifecycle-test@example.com';
827827

828-
const user1 = await User.create({ emailAddress: email });
828+
const user1 = await User.create({ primary_email: email });
829829
await niceBackendFetch(`/api/v1/users/${user1.userId}`, {
830830
accessType: 'admin',
831831
method: 'PATCH',
@@ -875,7 +875,7 @@ $$;`);
875875
res = await client.query(`SELECT * FROM "users" WHERE "primary_email" = $1`, [email]);
876876
expect(res.rows.length).toBe(0);
877877

878-
const user2 = await User.create({ emailAddress: email });
878+
const user2 = await User.create({ primary_email: email });
879879
await niceBackendFetch(`/api/v1/users/${user2.userId}`, {
880880
accessType: 'admin',
881881
method: 'PATCH',

0 commit comments

Comments
 (0)