Skip to content

Commit c108048

Browse files
committed
feat: emit per-chunk timing telemetry on the FetchResults path
Aggregate initial/slowest/sum chunk latencies in MetricsAggregator, emit them in the proto chunk_details block, and time each FetchResults RPC in RowSetProvider so the inline-Arrow path populates chunk telemetry alongside CloudFetch (mirrors Go's chunkTimingAccumulator). Also fix MetricsAggregator clobbering accumulated chunkCount and bytesDownloaded to 0 on STATEMENT_COMPLETE when the event omitted those fields — this hid chunk_details from every path. Co-authored-by: Isaac
1 parent f0cdfd1 commit c108048

5 files changed

Lines changed: 101 additions & 2 deletions

File tree

lib/DBSQLOperation.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ export default class DBSQLOperation implements IOperation {
103103
this.operationHandle,
104104
[directResults?.resultSet],
105105
useOnlyPrefetchedResults,
106+
this.id,
106107
);
107108
this.closeOperation = directResults?.closeOperation;
108109
this.context.getLogger().log(LogLevel.debug, `Operation created with id: ${this.id}`);

lib/result/RowSetProvider.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import Int64 from 'node-int64';
22
import { TFetchOrientation, TFetchResultsResp, TOperationHandle, TRowSet } from '../../thrift/TCLIService_types';
33
import Status from '../dto/Status';
44
import IClientContext from '../contracts/IClientContext';
5+
import { LogLevel } from '../contracts/IDBSQLLogger';
56
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
67
import { getColumnValue } from './utils';
78

@@ -26,6 +27,10 @@ export default class RowSetProvider implements IResultsProvider<TRowSet | undefi
2627

2728
private readonly operationHandle: TOperationHandle;
2829

30+
private readonly statementId?: string;
31+
32+
private chunkIndex: number = 0;
33+
2934
private fetchOrientation: TFetchOrientation = TFetchOrientation.FETCH_FIRST;
3035

3136
private prefetchedResults: TFetchResultsResp[] = [];
@@ -48,9 +53,11 @@ export default class RowSetProvider implements IResultsProvider<TRowSet | undefi
4853
operationHandle: TOperationHandle,
4954
prefetchedResults: Array<TFetchResultsResp | undefined>,
5055
returnOnlyPrefetchedResults: boolean,
56+
statementId?: string,
5157
) {
5258
this.context = context;
5359
this.operationHandle = operationHandle;
60+
this.statementId = statementId;
5461
prefetchedResults.forEach((item) => {
5562
if (item) {
5663
this.prefetchedResults.push(item);
@@ -83,16 +90,55 @@ export default class RowSetProvider implements IResultsProvider<TRowSet | undefi
8390
}
8491

8592
const driver = await this.context.getDriver();
93+
const startTime = Date.now();
8694
const response = await driver.fetchResults({
8795
operationHandle: this.operationHandle,
8896
orientation: this.fetchOrientation,
8997
maxRows: new Int64(limit),
9098
fetchType: FetchType.Data,
9199
});
100+
const latencyMs = Date.now() - startTime;
101+
102+
this.emitChunkEvent(latencyMs, response);
92103

93104
return this.processFetchResponse(response);
94105
}
95106

107+
/**
108+
* Emit a chunk telemetry event for one FetchResults page.
109+
* CRITICAL: All exceptions swallowed and logged at LogLevel.debug ONLY.
110+
*/
111+
private emitChunkEvent(latencyMs: number, response: TFetchResultsResp): void {
112+
try {
113+
if (!this.statementId) {
114+
return;
115+
}
116+
117+
const { telemetryEmitter } = this.context as any;
118+
if (!telemetryEmitter) {
119+
return;
120+
}
121+
122+
let bytes = 0;
123+
const arrowBatches = response.results?.arrowBatches;
124+
if (arrowBatches) {
125+
for (const batch of arrowBatches) {
126+
bytes += batch.batch?.length ?? 0;
127+
}
128+
}
129+
130+
telemetryEmitter.emitCloudFetchChunk({
131+
statementId: this.statementId,
132+
chunkIndex: this.chunkIndex,
133+
latencyMs,
134+
bytes,
135+
});
136+
this.chunkIndex += 1;
137+
} catch (error: any) {
138+
this.context.getLogger().log(LogLevel.debug, `Error emitting FetchResults chunk event: ${error.message}`);
139+
}
140+
}
141+
96142
public async hasMore() {
97143
// If there are prefetched results available - return `true` regardless of
98144
// the actual state of `hasMoreRows` flag (because we actually have some data)

lib/telemetry/DatabricksTelemetryExporter.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,15 @@ export default class DatabricksTelemetryExporter {
396396
log.entry.sql_driver_log.sql_operation.chunk_details = {
397397
total_chunks_present: metric.chunkCount,
398398
total_chunks_iterated: metric.chunkCount,
399+
...(metric.chunkInitialLatencyMs !== undefined && {
400+
initial_chunk_latency_millis: metric.chunkInitialLatencyMs,
401+
}),
402+
...(metric.chunkSlowestLatencyMs !== undefined && {
403+
slowest_chunk_latency_millis: metric.chunkSlowestLatencyMs,
404+
}),
405+
...(metric.chunkSumLatencyMs !== undefined && {
406+
sum_chunks_download_time_millis: metric.chunkSumLatencyMs,
407+
}),
399408
};
400409
}
401410
}

lib/telemetry/MetricsAggregator.ts

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ interface StatementTelemetryDetails {
3838
executionLatencyMs?: number;
3939
resultFormat?: string;
4040
chunkCount: number;
41+
chunkInitialLatencyMs?: number;
42+
chunkSlowestLatencyMs?: number;
43+
chunkSumLatencyMs: number;
4144
bytesDownloaded: number;
4245
pollCount: number;
4346
compressionEnabled?: boolean;
@@ -226,8 +229,14 @@ export default class MetricsAggregator {
226229
case TelemetryEventType.STATEMENT_COMPLETE:
227230
details.executionLatencyMs = event.latencyMs;
228231
details.resultFormat = event.resultFormat;
229-
details.chunkCount = event.chunkCount ?? 0;
230-
details.bytesDownloaded = event.bytesDownloaded ?? 0;
232+
// Only override with event-provided values; otherwise keep counts
233+
// accumulated from CLOUDFETCH_CHUNK events.
234+
if (event.chunkCount !== undefined) {
235+
details.chunkCount = event.chunkCount;
236+
}
237+
if (event.bytesDownloaded !== undefined) {
238+
details.bytesDownloaded = event.bytesDownloaded;
239+
}
231240
details.pollCount = event.pollCount ?? 0;
232241
break;
233242

@@ -237,6 +246,18 @@ export default class MetricsAggregator {
237246
if (event.compressed !== undefined) {
238247
details.compressionEnabled = event.compressed;
239248
}
249+
// Per-chunk timing aggregation (mirrors Go's chunkTimingAccumulator).
250+
// Only record when latencyMs is positive — keeps prefetched/cached
251+
// pages out of the timing stats.
252+
if (event.latencyMs !== undefined && event.latencyMs > 0) {
253+
if (details.chunkInitialLatencyMs === undefined) {
254+
details.chunkInitialLatencyMs = event.latencyMs;
255+
}
256+
if (details.chunkSlowestLatencyMs === undefined || event.latencyMs > details.chunkSlowestLatencyMs) {
257+
details.chunkSlowestLatencyMs = event.latencyMs;
258+
}
259+
details.chunkSumLatencyMs += event.latencyMs;
260+
}
240261
break;
241262

242263
default:
@@ -258,6 +279,7 @@ export default class MetricsAggregator {
258279
workspaceId: event.workspaceId,
259280
startTime: event.timestamp,
260281
chunkCount: 0,
282+
chunkSumLatencyMs: 0,
261283
bytesDownloaded: 0,
262284
pollCount: 0,
263285
errors: [],
@@ -293,6 +315,9 @@ export default class MetricsAggregator {
293315
latencyMs: details.executionLatencyMs,
294316
resultFormat: details.resultFormat,
295317
chunkCount: details.chunkCount,
318+
chunkInitialLatencyMs: details.chunkInitialLatencyMs,
319+
chunkSlowestLatencyMs: details.chunkSlowestLatencyMs,
320+
chunkSumLatencyMs: details.chunkSumLatencyMs > 0 ? details.chunkSumLatencyMs : undefined,
296321
bytesDownloaded: details.bytesDownloaded,
297322
pollCount: details.pollCount,
298323
compressed: details.compressionEnabled,

lib/telemetry/types.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,15 @@ export interface TelemetryMetric {
194194
/** Number of result chunks */
195195
chunkCount?: number;
196196

197+
/** Latency of the first chunk fetch in milliseconds */
198+
chunkInitialLatencyMs?: number;
199+
200+
/** Latency of the slowest chunk fetch in milliseconds */
201+
chunkSlowestLatencyMs?: number;
202+
203+
/** Sum of all chunk fetch latencies in milliseconds */
204+
chunkSumLatencyMs?: number;
205+
197206
/** Total bytes downloaded */
198207
bytesDownloaded?: number;
199208

@@ -315,6 +324,15 @@ export interface StatementMetrics {
315324
/** Number of CloudFetch chunks downloaded */
316325
chunkCount: number;
317326

327+
/** Latency of the first chunk fetch in milliseconds */
328+
chunkInitialLatencyMs?: number;
329+
330+
/** Latency of the slowest chunk fetch in milliseconds */
331+
chunkSlowestLatencyMs?: number;
332+
333+
/** Sum of all chunk fetch latencies in milliseconds */
334+
chunkSumLatencyMs?: number;
335+
318336
/** Total bytes downloaded */
319337
totalBytesDownloaded: number;
320338

0 commit comments

Comments
 (0)