Skip to content

Commit faabb5c

Browse files
authored
fix: inject query priority into proto messages for server-side scheduling (#9208)
* fix: inject query priority into proto messages for server-side scheduling After the ConnectRPC migration, the queue interceptor only used priority for client-side request ordering but never set it on the proto message. The server saw priority 0 (lowest) for all dashboard queries, causing them to land in the low capacity lane of the DuckDB/ClickHouse connection semaphore. The interceptor now injects the method-derived priority into the proto message before sending, restoring the unified priority behavior of the old HTTP request queue. Caller-set priorities (e.g. column profile active/inactive boost) are preserved. Also adds missing methods to the priority map and bumps MetricsViewTimeRange(s) to 100 since they gate dashboard rendering. * fix: resolve TS2345 in request-queue.spec.ts Cast mock objects to `any` to satisfy ConnectRPC's strict `AnyFn` and `UnaryRequest` type constraints in test mocks. * refactor: improve request-queue test readability Rename variables to clarify the interceptor testing pattern: run → sendRequest, next → transport, makeRequest → fakeRequest.
1 parent 1ac7070 commit faabb5c

4 files changed

Lines changed: 92 additions & 10 deletions

File tree

web-common/src/features/dashboards/time-controls/rill-time-ranges.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ export async function fetchTimeRanges({
9191
expressions: rillTimes,
9292
timeZone,
9393
executionTime: executionTime as any,
94-
priority: 100,
9594
timeDimension,
9695
};
9796

web-common/src/runtime-client/v2/request-priorities.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77
// Maps ConnectRPC method names to priority weights.
88
// Higher priority = dispatched first.
99
const MethodPriorities: Record<string, number> = {
10+
// Critical: pre-dashboard-load; must complete before other dashboard queries
11+
MetricsViewTimeRange: 100,
12+
MetricsViewTimeRanges: 100,
13+
1014
// High priority: user-visible data
1115
MetricsViewRows: 50,
12-
MetricsViewTimeRange: 50,
1316
ColumnProfile: 45,
1417

1518
// Medium: charts and summaries
@@ -18,11 +21,13 @@ const MethodPriorities: Record<string, number> = {
1821
ColumnCardinality: 35,
1922
MetricsViewAggregation: 30,
2023
MetricsViewTimeSeries: 30,
24+
MetricsViewComparison: 30,
2125
NumericHistogram: 30,
2226
MetricsViewTotals: 30,
2327

2428
// Low: exploratory queries
2529
MetricsViewToplist: 10,
30+
MetricsViewAnnotations: 10,
2631
RugHistogram: 10,
2732
DescriptiveStatistics: 10,
2833
};
@@ -40,6 +45,8 @@ export function getPriorityForMethod(methodName: string): number {
4045
const ColumnQueryPriorities: Record<string, number> = {
4146
topk: 10,
4247
timeseries: 30,
48+
"rollup-interval": 30,
49+
"smallest-time-grain": 30,
4350
"numeric-histogram": 30,
4451
"rug-histogram": 10,
4552
"descriptive-statistics": 10,
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import { describe, it, expect, vi, beforeEach } from "vitest";
2+
import { createQueueInterceptor, RequestQueue } from "./request-queue";
3+
4+
/**
5+
* Builds a minimal ConnectRPC-shaped request object.
6+
* Only the fields read by createQueueInterceptor are meaningful;
7+
* the rest are stubs to satisfy the interface.
8+
*/
9+
function fakeRequest(methodName: string, message: Record<string, unknown>) {
10+
return {
11+
stream: false as const,
12+
service: {},
13+
method: { name: methodName },
14+
url: "",
15+
init: {},
16+
signal: new AbortController().signal,
17+
header: new Headers(),
18+
contextValues: {},
19+
message,
20+
} as any; // eslint-disable-line @typescript-eslint/no-explicit-any
21+
}
22+
23+
describe("createQueueInterceptor", () => {
24+
let sendRequest: (req: any) => Promise<any>; // eslint-disable-line @typescript-eslint/no-explicit-any
25+
let transport: ReturnType<typeof vi.fn>;
26+
27+
beforeEach(() => {
28+
const queue = new RequestQueue({ maxConcurrent: 10 });
29+
const interceptor = createQueueInterceptor(queue);
30+
31+
// `transport` simulates the ConnectRPC transport layer (the `next` fn).
32+
// The interceptor wraps it, giving us `sendRequest`.
33+
transport = vi.fn().mockResolvedValue({ stream: false, message: {} });
34+
sendRequest = interceptor(transport as any); // eslint-disable-line @typescript-eslint/no-explicit-any
35+
});
36+
37+
it("injects method-derived priority when message has no priority", async () => {
38+
const message: Record<string, unknown> = {
39+
metricsViewName: "test_view",
40+
};
41+
await sendRequest(fakeRequest("MetricsViewTimeRanges", message));
42+
43+
expect(message.priority).toBe(100);
44+
expect(transport).toHaveBeenCalledOnce();
45+
});
46+
47+
it("injects method-derived priority when message has priority 0", async () => {
48+
const message: Record<string, unknown> = {
49+
metricsViewName: "test_view",
50+
priority: 0,
51+
};
52+
await sendRequest(fakeRequest("MetricsViewAggregation", message));
53+
54+
expect(message.priority).toBe(30);
55+
});
56+
57+
it("preserves caller-set priority when non-zero", async () => {
58+
const message: Record<string, unknown> = {
59+
metricsViewName: "test_view",
60+
priority: 60,
61+
};
62+
await sendRequest(fakeRequest("MetricsViewAggregation", message));
63+
64+
expect(message.priority).toBe(60);
65+
});
66+
67+
it("falls back to DEFAULT_PRIORITY for unknown methods", async () => {
68+
const message: Record<string, unknown> = {};
69+
await sendRequest(fakeRequest("SomeUnknownMethod", message));
70+
71+
expect(message.priority).toBe(10);
72+
});
73+
});

web-common/src/runtime-client/v2/request-queue.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -233,12 +233,9 @@ export class RequestQueue {
233233
* Extract the resource name from a ConnectRPC request message.
234234
* Looks for common field patterns in runtime API request messages.
235235
*/
236-
function extractResourceName(req: { message: unknown }): string | undefined {
237-
const msg = req.message as Record<string, unknown>;
238-
// MetricsView queries use metricsViewName or metricsView
236+
function extractResourceName(msg: Record<string, unknown>): string | undefined {
239237
if (typeof msg.metricsViewName === "string") return msg.metricsViewName;
240238
if (typeof msg.metricsView === "string") return msg.metricsView;
241-
// Column profiling queries use tableName
242239
if (typeof msg.tableName === "string") return msg.tableName;
243240
return undefined;
244241
}
@@ -249,11 +246,17 @@ function extractResourceName(req: { message: unknown }): string | undefined {
249246
*/
250247
export function createQueueInterceptor(queue: RequestQueue): Interceptor {
251248
return (next) => async (req) => {
249+
const msg = req.message as Record<string, unknown>;
252250
const priority = getPriorityForMethod(req.method.name);
253-
const resourceName = extractResourceName(req);
254-
const columnName = (req.message as Record<string, unknown>).columnName as
255-
| string
256-
| undefined;
251+
const resourceName = extractResourceName(msg);
252+
const columnName = msg.columnName as string | undefined;
253+
254+
// Inject priority into the proto message so the server can schedule
255+
// accordingly (e.g. DuckDB/ClickHouse connection semaphore ordering).
256+
// If the caller already set a non-zero priority, respect it.
257+
if (!msg.priority) {
258+
msg.priority = priority;
259+
}
257260

258261
return queue.enqueue({
259262
priority,

0 commit comments

Comments
 (0)