Skip to content

Commit b431096

Browse files
authored
Merge pull request #577 from devforth/feature/AdminForth/1453/resource(x).aggregate
feat: implement aggregation methods with support for grouping and med…
2 parents 813674f + 2b534b3 commit b431096

3 files changed

Lines changed: 291 additions & 8 deletions

File tree

adminforth/dataConnectors/clickhouse.ts

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource, AdminForthResourceColumn } from '../types/Back.js';
1+
import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource, AdminForthResourceColumn, IAggregationRule, IGroupByRule, IGroupByDateTrunc, IGroupByField } from '../types/Back.js';
22
import AdminForthBaseConnector from './baseConnector.js';
33
import dayjs from 'dayjs';
44
import { createClient } from '@clickhouse/client'
@@ -444,13 +444,79 @@ class ClickhouseConnector extends AdminForthBaseConnector implements IAdminForth
444444
return { where, params };
445445
}
446446

447+
async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: {
448+
resource: AdminForthResource;
449+
filters: IAdminForthAndOrFilter;
450+
aggregations: { [alias: string]: IAggregationRule };
451+
groupBy?: IGroupByRule;
452+
}): Promise <Array<{ group?: string, [key: string]: any }>> {
453+
454+
const tableName = `${this.dbName}.${resource.table}`;
455+
456+
const selectParts: string[] = [];
457+
let groupExpr: string | null = null;
458+
459+
if (groupBy?.type === 'date_trunc') {
460+
const g = groupBy as IGroupByDateTrunc;
461+
const tz = g.timezone ?? 'UTC';
462+
463+
const field = `toTimeZone(${g.field}, '${tz}')`;
464+
465+
switch (g.truncation) {
466+
case 'day': groupExpr = `toDate(toStartOfDay(${field}))`; break;
467+
case 'month': groupExpr = `toDate(toStartOfMonth(${field}))`; break;
468+
case 'week': groupExpr = `toDate(toStartOfWeek(${field}))`; break;
469+
case 'year': groupExpr = `toDate(toStartOfYear(${field}))`; break;
470+
}
471+
472+
selectParts.push(`${groupExpr} AS \`group\``);
473+
474+
} else if (groupBy?.type === 'field') {
475+
const g = groupBy as IGroupByField;
476+
groupExpr = `${g.field}`;
477+
selectParts.push(`${groupExpr} AS \`group\``);
478+
}
479+
480+
for (const [alias, rule] of Object.entries(aggregations)) {
481+
switch (rule.operation) {
482+
case 'count': selectParts.push(`count() AS \`${alias}\``); break;
483+
case 'sum': selectParts.push(`sum(${rule.field}) AS \`${alias}\``); break;
484+
case 'avg': selectParts.push(`avg(${rule.field}) AS \`${alias}\``); break;
485+
case 'min': selectParts.push(`min(${rule.field}) AS \`${alias}\``); break;
486+
case 'max': selectParts.push(`max(${rule.field}) AS \`${alias}\``); break;
487+
case 'median': selectParts.push(`quantile(0.5)(${rule.field}) AS \`${alias}\``); break;
488+
}
489+
}
490+
491+
const { where, params } = this.whereClause(resource, filters);
492+
493+
let query = `SELECT ${selectParts.join(', ')} FROM ${tableName} ${where}`;
494+
495+
if (groupExpr) {
496+
query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`;
497+
}
498+
499+
const result = await this.client.query({
500+
query,
501+
format: 'JSONEachRow',
502+
query_params: params,
503+
});
504+
505+
const rows = await result.json();
506+
507+
return rows.map((r: any) => ({
508+
group: r.group,
509+
...r,
510+
}));
511+
}
512+
447513
async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }: {
448514
resource: AdminForthResource,
449515
limit: number,
450516
offset: number,
451517
sort: { field: string, direction: AdminForthSortDirections }[],
452518
filters: IAdminForthAndOrFilter,
453-
}): Promise<any[]> {
519+
}): Promise<Array<{ group?: string, [key: string]: any }>> {
454520
const columns = resource.dataSourceColumns.map((col) => {
455521
// for decimal cast to string
456522
if (col.type == AdminForthDataTypes.DECIMAL) {

adminforth/dataConnectors/mongo.ts

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import dayjs from 'dayjs';
22
import { MongoClient } from 'mongodb';
33
import { Decimal128, Double } from 'bson';
4-
import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource } from '../types/Back.js';
4+
import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource, IAggregationRule, IGroupByRule, IGroupByDateTrunc, IGroupByField } from '../types/Back.js';
55
import AdminForthBaseConnector from './baseConnector.js';
66
import { afLogger } from '../modules/logger.js';
77
import { AdminForthDataTypes, AdminForthFilterOperators, AdminForthSortDirections, } from '../types/Common.js';
@@ -305,6 +305,108 @@ class MongoConnector extends AdminForthBaseConnector implements IAdminForthDataS
305305
.filter((f) => (f as IAdminForthSingleFilter).insecureRawSQL === undefined)
306306
.map((f) => this.getFilterQuery(resource, f)));
307307
}
308+
309+
async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: {
310+
resource: AdminForthResource;
311+
filters: IAdminForthAndOrFilter;
312+
aggregations: { [alias: string]: IAggregationRule };
313+
groupBy?: IGroupByRule;
314+
}): Promise<Array<{ group?: string, [key: string]: any }>> {
315+
316+
const collection = this.client.db().collection(resource.table);
317+
318+
const match = filters?.subFilters?.length ? this.getFilterQuery(resource, filters) : {};
319+
320+
let groupId: any = null;
321+
322+
if (groupBy?.type === 'field') {
323+
const g = groupBy as IGroupByField;
324+
groupId = `$${g.field}`;
325+
}
326+
327+
if (groupBy?.type === 'date_trunc') {
328+
const g = groupBy as IGroupByDateTrunc;
329+
const tz = g.timezone ?? 'UTC';
330+
const dateTruncSpec: any = {
331+
date: `$${g.field}`,
332+
unit: g.truncation,
333+
timezone: tz,
334+
};
335+
if (g.truncation === 'week') {
336+
dateTruncSpec.startOfWeek = 'Mon';
337+
}
338+
groupId = { $dateTrunc: dateTruncSpec };
339+
}
340+
341+
const groupStage: Record<string, any> = {
342+
_id: groupId,
343+
};
344+
345+
for (const [alias, rule] of Object.entries(aggregations)) {
346+
switch (rule.operation) {
347+
case 'count': groupStage[alias] = { $sum: 1 }; break;
348+
case 'sum': groupStage[alias] = { $sum: { $toDouble: `$${rule.field}` } }; break;
349+
case 'avg': groupStage[alias] = { $avg: { $toDouble: `$${rule.field}` } }; break;
350+
case 'min': groupStage[alias] = { $min: { $toDouble: `$${rule.field}` } }; break;
351+
case 'max': groupStage[alias] = { $max: { $toDouble: `$${rule.field}` } }; break;
352+
case 'median': groupStage[alias] = { $push: { $toDouble: `$${rule.field}` } }; break;
353+
}
354+
}
355+
356+
const pipeline: any[] = [];
357+
358+
if (Object.keys(match).length) {
359+
pipeline.push({ $match: match });
360+
}
361+
362+
pipeline.push({ $group: groupStage });
363+
364+
pipeline.push({
365+
$project: {
366+
_id: 0,
367+
group: !groupBy ? "$$REMOVE" : (groupBy.type === 'date_trunc' ? {
368+
$cond: {
369+
if: { $eq: [{ $type: "$_id" }, "date"] },
370+
then: {
371+
$dateToString: {
372+
format: "%Y-%m-%d",
373+
date: "$_id",
374+
timezone: (groupBy as IGroupByDateTrunc).timezone ?? 'UTC'
375+
}
376+
},
377+
else: "$_id"
378+
}
379+
} : "$_id"),
380+
...Object.fromEntries(
381+
Object.keys(groupStage)
382+
.filter(k => k !== '_id')
383+
.map(k => [k, `$${k}`])
384+
),
385+
},
386+
});
387+
388+
const calculateMedian = (arr: any[]) => {
389+
if (!Array.isArray(arr) || arr.length === 0) return null;
390+
const sorted = [...arr].sort((a, b) => a - b);
391+
const mid = Math.floor(sorted.length / 2);
392+
return sorted.length % 2 === 0
393+
? (sorted[mid - 1] + sorted[mid]) / 2
394+
: sorted[mid];
395+
};
396+
397+
const result = await collection.aggregate(pipeline).toArray();
398+
399+
const medianAliases = Object.keys(aggregations).filter(
400+
alias => aggregations[alias].operation === 'median'
401+
);
402+
403+
return result.map(row => {
404+
medianAliases.forEach(alias => {
405+
row[alias] = calculateMedian(row[alias]);
406+
});
407+
return row;
408+
});
409+
}
308410

309411
async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }:
310412
{

adminforth/dataConnectors/mysql.ts

Lines changed: 120 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import dayjs from 'dayjs';
2-
import { AdminForthResource, IAdminForthSingleFilter, IAdminForthAndOrFilter, IAdminForthDataSourceConnector, AdminForthConfig } from '../types/Back.js';
2+
import { AdminForthResource, IAdminForthSingleFilter, IAdminForthAndOrFilter, IAdminForthDataSourceConnector, AdminForthConfig, IAggregationRule, IGroupByRule, IGroupByDateTrunc, IGroupByField } from '../types/Back.js';
33
import { AdminForthDataTypes, AdminForthFilterOperators, AdminForthSortDirections, } from '../types/Common.js';
44
import AdminForthBaseConnector from './baseConnector.js';
55
import mysql from 'mysql2/promise';
@@ -338,13 +338,128 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS
338338
} : { sql: '', values: [] };
339339
}
340340

341+
async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: {
342+
resource: AdminForthResource;
343+
filters: IAdminForthAndOrFilter;
344+
aggregations: { [alias: string]: IAggregationRule };
345+
groupBy?: IGroupByRule;
346+
}): Promise<Array<{ group?: string, [key: string]: any }>> {
347+
const tableName = resource.table;
348+
const selectParts: string[] = [];
349+
const medianFields: { alias: string; field: string }[] = [];
350+
let groupExpr: string | null = null;
351+
352+
if (groupBy?.type === 'field') {
353+
groupExpr = `\`${groupBy.field}\``;
354+
selectParts.push(`${groupExpr} AS \`group\``);
355+
} else if (groupBy?.type === 'date_trunc') {
356+
const g = groupBy as IGroupByDateTrunc;
357+
const tz = g.timezone ?? 'UTC';
358+
if (!/^[A-Za-z0-9/_+\-]+$/.test(tz)) {
359+
throw new Error(`Invalid timezone value: ${tz}`);
360+
}
361+
const innerExpr = `COALESCE(CONVERT_TZ(\`${g.field}\`, 'UTC', '${tz}'), \`${g.field}\`)`;
362+
switch (g.truncation) {
363+
case 'day': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-%m-%d')`; break;
364+
case 'month': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-%m-01')`; break;
365+
case 'year': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-01-01')`; break;
366+
case 'week': groupExpr = `DATE_FORMAT(DATE_SUB(${innerExpr}, INTERVAL WEEKDAY(${innerExpr}) DAY), '%Y-%m-%d')`; break;
367+
}
368+
selectParts.push(`${groupExpr} AS \`group\``);
369+
}
370+
371+
for (const [alias, rule] of Object.entries(aggregations)) {
372+
const f = `\`${rule.field}\``;
373+
switch (rule.operation) {
374+
case 'sum': selectParts.push(`SUM(${f}) AS \`${alias}\``); break;
375+
case 'count': selectParts.push(`COUNT(*) AS \`${alias}\``); break;
376+
case 'avg': selectParts.push(`AVG(${f}) AS \`${alias}\``); break;
377+
case 'min': selectParts.push(`MIN(${f}) AS \`${alias}\``); break;
378+
case 'max': selectParts.push(`MAX(${f}) AS \`${alias}\``); break;
379+
case 'median': medianFields.push({ alias, field: rule.field }); break;
380+
}
381+
}
382+
383+
const { sql: where, values: filterValues } = this.whereClauseAndValues(filters);
384+
385+
type AggRow = { group?: string } & Record<string, number | string | null>;
386+
387+
// Run non-median aggregations
388+
let rows: AggRow[] = [];
389+
const hasNonMedian = selectParts.length > (groupExpr ? 1 : 0);
390+
if (hasNonMedian) {
391+
let query = `SELECT ${selectParts.join(', ')} FROM \`${tableName}\` ${where}`;
392+
if (groupExpr) query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`;
393+
dbLogger.trace(`🪲📜 MySQL AGG Q: ${query} values: ${JSON.stringify(filterValues)}`);
394+
const [result] = await this.client.execute(query, filterValues);
395+
rows = result as AggRow[];
396+
}
397+
398+
// Run each median via window functions (MySQL 8+) — no session variables, no memory pressure
399+
for (const { alias, field } of medianFields) {
400+
const f = `\`${field}\``;
401+
const nullGuard = where ? `${where} AND ${f} IS NOT NULL` : `WHERE ${f} IS NOT NULL`;
402+
403+
let medianQuery: string;
404+
if (groupExpr) {
405+
medianQuery = `
406+
SELECT \`group\`, AVG(${f}) AS \`${alias}\`
407+
FROM (
408+
SELECT ${groupExpr} AS \`group\`, ${f},
409+
ROW_NUMBER() OVER (PARTITION BY ${groupExpr} ORDER BY ${f}) AS rn,
410+
COUNT(*) OVER (PARTITION BY ${groupExpr}) AS cnt
411+
FROM \`${tableName}\` ${nullGuard}
412+
) t
413+
WHERE rn IN (FLOOR((cnt + 1) / 2.0), CEIL((cnt + 1) / 2.0))
414+
GROUP BY \`group\`
415+
ORDER BY \`group\` ASC
416+
`;
417+
} else {
418+
medianQuery = `
419+
SELECT AVG(${f}) AS \`${alias}\`
420+
FROM (
421+
SELECT ${f},
422+
ROW_NUMBER() OVER (ORDER BY ${f}) AS rn,
423+
COUNT(*) OVER () AS cnt
424+
FROM \`${tableName}\` ${nullGuard}
425+
) t
426+
WHERE rn IN (FLOOR((cnt + 1) / 2.0), CEIL((cnt + 1) / 2.0))
427+
`;
428+
}
429+
430+
dbLogger.trace(`🪲📜 MySQL MEDIAN Q: ${medianQuery} values: ${JSON.stringify(filterValues)}`);
431+
const [medianResult] = await this.client.execute(medianQuery, filterValues);
432+
const medianRows = medianResult as AggRow[];
433+
434+
if (groupExpr) {
435+
if (rows.length === 0) {
436+
rows = medianRows.map((r) => ({ group: r.group, [alias]: r[alias] }));
437+
} else {
438+
const byGroup = new Map(medianRows.map((r) => [String(r.group), r[alias]]));
439+
for (const row of rows) {
440+
row[alias] = byGroup.get(String(row.group)) ?? null;
441+
}
442+
}
443+
} else {
444+
const medianVal = medianRows[0]?.[alias] ?? null;
445+
if (rows.length === 0) {
446+
rows = [{ [alias]: medianVal }];
447+
} else {
448+
rows[0][alias] = medianVal;
449+
}
450+
}
451+
}
452+
453+
return rows;
454+
}
455+
341456
async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }): Promise<any[]> {
342-
const columns = resource.dataSourceColumns.map((col) => `${col.name}`).join(', ');
457+
const columns = resource.dataSourceColumns.map((col: { name: string }) => `${col.name}`).join(', ');
343458
const tableName = resource.table;
344459

345460
const { sql: where, values: filterValues } = this.whereClauseAndValues(filters);
346461

347-
const orderBy = sort.length ? `ORDER BY ${sort.map((s) => `${s.field} ${this.SortDirectionsMap[s.direction]}`).join(', ')}` : '';
462+
const orderBy = sort.length ? `ORDER BY ${sort.map((s: { field: string; direction: AdminForthSortDirections }) => `${s.field} ${this.SortDirectionsMap[s.direction]}`).join(', ')}` : '';
348463
let selectQuery = `SELECT ${columns} FROM ${tableName}`;
349464
if (where) selectQuery += ` ${where}`;
350465
if (orderBy) selectQuery += ` ${orderBy}`;
@@ -385,7 +500,7 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS
385500
async getMinMaxForColumnsWithOriginalTypes({ resource, columns }) {
386501
const tableName = resource.table;
387502
const result = {};
388-
await Promise.all(columns.map(async (col) => {
503+
await Promise.all(columns.map(async (col: { name: string }) => {
389504
const q = `SELECT MIN(${col.name}) as min, MAX(${col.name}) as max FROM ${tableName}`;
390505
dbLogger.trace(`🪲📜 MySQL Q: ${q}`);
391506
const [results] = await this.client.execute(q);
@@ -410,7 +525,7 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS
410525

411526
async updateRecordOriginalValues({ resource, recordId, newValues }) {
412527
const values = [...Object.values(newValues), recordId];
413-
const columnsWithPlaceholders = Object.keys(newValues).map((col, i) => `${col} = ?`).join(', ');
528+
const columnsWithPlaceholders = Object.keys(newValues).map((col) => `${col} = ?`).join(', ');
414529
const q = `UPDATE ${resource.table} SET ${columnsWithPlaceholders} WHERE ${this.getPrimaryKey(resource)} = ?`;
415530
dbLogger.trace(`🪲📜 MySQL Q: ${q} values: ${JSON.stringify(values)}`);
416531
await this.client.execute(q, values);

0 commit comments

Comments
 (0)