Skip to content

Commit a50ba20

Browse files
HyteqHyteq
authored andcommitted
feat: better clickhouse
1 parent 00db882 commit a50ba20

9 files changed

Lines changed: 4512 additions & 0 deletions

File tree

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
import type { LogParams, ErrorLogParams, WarnLogParams, Logger, ResponseJSON } from '@clickhouse/client';
2+
import { ClickHouseLogLevel, createClient } from '@clickhouse/client';
3+
import type { NodeClickHouseClientConfigOptions } from '@clickhouse/client/dist/config';
4+
5+
export { createClient };
6+
7+
/**
8+
* ClickHouse table names used throughout the application
9+
*/
10+
export const TABLE_NAMES = {
11+
events: 'analytics.events',
12+
errors: 'analytics.errors',
13+
web_vitals: 'analytics.web_vitals',
14+
stripe_payment_intents: 'analytics.stripe_payment_intents',
15+
stripe_charges: 'analytics.stripe_charges',
16+
stripe_refunds: 'analytics.stripe_refunds',
17+
};
18+
19+
const logger = console;
20+
21+
export const CLICKHOUSE_OPTIONS: NodeClickHouseClientConfigOptions = {
22+
max_open_connections: 30,
23+
request_timeout: 30000,
24+
keep_alive: {
25+
enabled: true,
26+
idle_socket_ttl: 8000,
27+
},
28+
compression: {
29+
request: true,
30+
response: true,
31+
},
32+
};
33+
34+
export const clickHouseOG = createClient({
35+
url: process.env.CLICKHOUSE_URL,
36+
...CLICKHOUSE_OPTIONS,
37+
});
38+
39+
async function withRetry<T>(
40+
operation: () => Promise<T>,
41+
maxRetries = 3,
42+
baseDelay = 500,
43+
): Promise<T> {
44+
let lastError: Error | undefined;
45+
46+
for (let attempt = 0; attempt < maxRetries; attempt++) {
47+
try {
48+
const res = await operation();
49+
if (attempt > 0) {
50+
logger.info('Retry operation succeeded', { attempt });
51+
}
52+
return res;
53+
} catch (error: any) {
54+
lastError = error;
55+
56+
if (
57+
error.message.includes('Connect') ||
58+
error.message.includes('socket hang up') ||
59+
error.message.includes('Timeout error')
60+
) {
61+
const delay = baseDelay * 2 ** attempt;
62+
logger.warn(
63+
`Attempt ${attempt + 1}/${maxRetries} failed, retrying in ${delay}ms`,
64+
{
65+
error: error.message,
66+
},
67+
);
68+
await new Promise((resolve) => setTimeout(resolve, delay));
69+
continue;
70+
}
71+
72+
throw error; // Non-retriable error
73+
}
74+
}
75+
76+
throw lastError;
77+
}
78+
79+
80+
export const clickHouse = new Proxy(clickHouseOG, {
81+
get(target, property, receiver) {
82+
const value = Reflect.get(target, property, receiver);
83+
84+
if (property === 'insert') {
85+
return (...args: any[]) => withRetry(() => value.apply(target, args));
86+
}
87+
88+
return value;
89+
},
90+
});
91+
92+
export async function chQueryWithMeta<T extends Record<string, any>>(
93+
query: string,
94+
): Promise<ResponseJSON<T>> {
95+
const start = Date.now();
96+
const res = await clickHouse.query({
97+
query,
98+
});
99+
const beforeParse = Date.now();
100+
const json = await res.json<T>();
101+
const afterParse = Date.now();
102+
const keys = Object.keys(json.data[0] || {});
103+
const response = {
104+
...json,
105+
data: json.data.map((item) => {
106+
return keys.reduce((acc, key) => {
107+
const meta = json.meta?.find((m) => m.name === key);
108+
return Object.assign(acc, {
109+
[key]:
110+
item[key] && meta?.type.includes('Int')
111+
? Number.parseFloat(item[key] as string)
112+
: item[key],
113+
});
114+
}, {} as T);
115+
}),
116+
};
117+
118+
logger.info('query info', {
119+
// query: cleanQuery(query),
120+
rows: json.rows,
121+
stats: response.statistics,
122+
beforeParse: beforeParse - start,
123+
afterParse: afterParse - beforeParse,
124+
elapsed: Date.now() - start,
125+
});
126+
127+
return response;
128+
}
129+
130+
export async function chQuery<T extends Record<string, any>>(
131+
query: string,
132+
): Promise<T[]> {
133+
return (await chQueryWithMeta<T>(query)).data;
134+
}
135+
136+
export function formatClickhouseDate(
137+
date: Date | string,
138+
skipTime = false,
139+
): string {
140+
if (skipTime) {
141+
return new Date(date).toISOString().split('T')[0] ?? '';
142+
}
143+
return new Date(date).toISOString().replace('T', ' ').replace(/Z+$/, '');
144+
}
145+
146+
export function toDate(str: string, interval?: string) {
147+
// If it does not match the regex it's a column name eg 'created_at'
148+
if (!interval || interval === 'minute' || interval === 'hour') {
149+
if (str.match(/\d{4}-\d{2}-\d{2}/)) {
150+
return escape(str);
151+
}
152+
153+
return str;
154+
}
155+
156+
if (str.match(/\d{4}-\d{2}-\d{2}/)) {
157+
return `toDate(${escape(str.split(' ')[0])})`;
158+
}
159+
160+
return `toDate(${str})`;
161+
}
162+
163+
export function convertClickhouseDateToJs(date: string) {
164+
return new Date(`${date.replace(' ', 'T')}Z`);
165+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export * from "./client";
2+
export * from "./schema";
3+
export * from "./query_builder";

0 commit comments

Comments
 (0)