Commit 7395026

fix(telemetry): iter-2 review fixes — rebase regressions, type-safe wiring, error telemetry
Rebase the regressed exporter/aggregator/feature-flag-cache on main's hardened versions and re-apply only the genuinely new functionality (CONNECTION_CLOSE event, chunk-timing aggregation) on top.

Closes the critical findings from the multi-reviewer audit:
- SSRF guard, redactSensitive, sanitizeProcessName, hasAuthorization, auth-missing warn-once — all restored via main's telemetryUtils.
- MetricsAggregator memory bounds (maxPendingMetrics with error-preferring drop, maxErrorsPerStatement, statementTtlMs eviction) restored.
- FeatureFlagCache in-flight fetch dedup and TTL clamp [60s, 3600s] restored; lib/telemetry/urlUtils.ts deleted.
- close() now properly awaits aggregator drain — fixes the close()/flush race that PR #362 already fixed once.
- Driver version reads lib/version.ts via buildUserAgentString instead of hardcoded '1.0.0'; uuidv4() restored in place of Math.random().
- TelemetryTerminalError re-exported from lib/index.ts.

Type-safe wiring:
- Added optional getTelemetryEmitter() / getTelemetryAggregator() to IClientContext; removed all 7 `(this.context as any)` casts.
- Six copy-pasted event listeners in DBSQLClient.initializeTelemetry collapsed into one `Object.values(TelemetryEventType)` loop — closes the listener-name mismatch that silently dropped error events.
- mapAuthType now covers all 6 authType values instead of defaulting everything to 'pat'.

TelemetryClient now owns the host-scoped resources:
- TelemetryClientProvider is a process-wide singleton (getInstance()).
- TelemetryClient owns DatabricksTelemetryExporter, MetricsAggregator, CircuitBreakerRegistry, and FeatureFlagCache for its host. Implements IClientContext itself so the owned components have a stable context that survives any single DBSQLClient's close.
- DBSQLClient instances on the same host share the breaker counters, feature-flag cache, exporter, and HTTP batches. Fixes the per-instance breaker-fragmentation noted in iter-2 architecture review.
- Each DBSQLClient still holds its own TelemetryEventEmitter (respects per-client telemetryEnabled); emitters bridge into the shared aggregator.
- Exporter falls back to context.getAuthProvider() when no explicit auth provider is passed, so the shared exporter resolves auth through the TelemetryClient's FIFO of registered DBSQLClients.

Error telemetry wired across operation entry points:
- Re-added emitErrorEvent(error) on DBSQLOperation; uses ExceptionClassifier.isTerminal() to classify.
- fetchChunk, cancel, close, getMetadata wrap their bodies in try/catch that calls emitErrorEvent before re-throwing. Verified end-to-end against a real Azure Databricks workspace: failed query produces STATEMENT_COMPLETE + ERROR (with redacted stack) on the wire.
- Removed the await getMetadata() call from emitStatementComplete — eliminates the extra Thrift RPC on every close (F19) AND prevents spurious error telemetry from getMetadata's wrapper firing during close-cleanup of an already-failed operation.

Other:
- Iterating Map.keys() while mutating made safe via snapshot in close().
- STATEMENT_COMPLETE no longer zeroes accumulated chunk metrics when the emit doesn't supply them (matches sibling-field guards).
- Tests for the rebased modules restored from main; provider tests updated for the singleton API; deleted unused TelemetryExporterStub.

484 unit tests passing. Diff vs main: ~+2110/-383, down from the original PR's +3640/-1173.

Co-authored-by: Isaac
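
The FeatureFlagCache item above names two techniques: clamping a TTL into [60s, 3600s] and deduplicating in-flight fetches. The following is a minimal sketch of both, not the driver's code. `FlagCacheSketch`, `clampTtlSeconds`, and the `fetcher` callback are hypothetical names; only the two bounds come from the commit message.

```typescript
// Hypothetical sketch: names and shapes are illustrative, not the driver's API.
const MIN_TTL_SECONDS = 60; // lower bound quoted in the commit message
const MAX_TTL_SECONDS = 3600; // upper bound quoted in the commit message

// Clamp a server-supplied TTL into [60s, 3600s].
// Non-finite input falls back to the minimum (an assumption of this sketch).
function clampTtlSeconds(ttl: number): number {
  if (!Number.isFinite(ttl)) return MIN_TTL_SECONDS;
  return Math.min(MAX_TTL_SECONDS, Math.max(MIN_TTL_SECONDS, ttl));
}

class FlagCacheSketch {
  private readonly inFlight = new Map<string, Promise<boolean>>();

  private readonly fetcher: (host: string) => Promise<boolean>;

  constructor(fetcher: (host: string) => Promise<boolean>) {
    this.fetcher = fetcher;
  }

  // Concurrent callers for the same host share one pending fetch.
  fetchFlag(host: string): Promise<boolean> {
    const pending = this.inFlight.get(host);
    if (pending) return pending;

    const request = this.fetcher(host);
    this.inFlight.set(host, request);

    // Forget the entry once the fetch settles, whether it succeeded or failed.
    const settle = (): void => {
      this.inFlight.delete(host);
    };
    request.then(settle, settle);
    return request;
  }
}
```

Two calls to `fetchFlag` for the same host made before the first one settles return the same promise, so the fetcher runs once. After it settles, the next call starts a fresh fetch.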
1 parent cfbcd27 commit 7395026

18 files changed

Lines changed: 1543 additions & 2283 deletions

lib/DBSQLClient.ts

Lines changed: 61 additions & 108 deletions
@@ -32,13 +32,10 @@ import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
 import DBSQLLogger from './DBSQLLogger';
 import CloseableCollection from './utils/CloseableCollection';
 import IConnectionProvider from './connection/contracts/IConnectionProvider';
-import FeatureFlagCache from './telemetry/FeatureFlagCache';
+import TelemetryClient from './telemetry/TelemetryClient';
 import TelemetryClientProvider from './telemetry/TelemetryClientProvider';
 import TelemetryEventEmitter from './telemetry/TelemetryEventEmitter';
-import MetricsAggregator from './telemetry/MetricsAggregator';
-import DatabricksTelemetryExporter from './telemetry/DatabricksTelemetryExporter';
-import { CircuitBreakerRegistry } from './telemetry/CircuitBreaker';
-import { DriverConfiguration, DRIVER_NAME } from './telemetry/types';
+import { DriverConfiguration, DRIVER_NAME, TelemetryEventType } from './telemetry/types';
 import driverVersion from './version';
 
 function prependSlash(str: string): string {
@@ -84,23 +81,21 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
 
   private readonly sessions = new CloseableCollection<DBSQLSession>();
 
-  // Telemetry components (instance-based, NOT singletons)
+  // Telemetry components — `telemetryClient` is the shared per-host owner
+  // (process-wide via TelemetryClientProvider). The exporter, aggregator,
+  // circuit-breaker registry and feature-flag cache live on it. Each
+  // DBSQLClient still owns its own `telemetryEmitter` so it respects its
+  // own `telemetryEnabled` flag.
   private host?: string;
 
   private httpPath?: string;
 
   private authType?: string;
 
-  private featureFlagCache?: FeatureFlagCache;
-
-  private telemetryClientProvider?: TelemetryClientProvider;
+  private telemetryClient?: TelemetryClient;
 
   private telemetryEmitter?: TelemetryEventEmitter;
 
-  private telemetryAggregator?: MetricsAggregator;
-
-  private circuitBreakerRegistry?: CircuitBreakerRegistry;
-
   private static getDefaultLogger(): IDBSQLLogger {
     if (!this.defaultLogger) {
       this.defaultLogger = new DBSQLLogger();
@@ -298,19 +293,23 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
    * Distinguishes between U2M and M2M OAuth flows.
    */
   private mapAuthType(options: ConnectionOptions): string {
-    if (options.authType === 'databricks-oauth') {
-      // Check if M2M (has client secret) or U2M (no client secret)
-      return options.oauthClientSecret === undefined
-        ? 'external-browser' // U2M OAuth (User-to-Machine)
-        : 'oauth-m2m'; // M2M OAuth (Machine-to-Machine)
-    }
-
-    if (options.authType === 'custom') {
-      return 'custom'; // Custom auth provider
+    switch (options.authType) {
+      case 'databricks-oauth':
+        return options.oauthClientSecret === undefined ? 'external-browser' : 'oauth-m2m';
+      case 'custom':
+        return 'custom';
+      case 'token-provider':
+        return 'token-provider';
+      case 'external-token':
+        return 'external-token';
+      case 'static-token':
+        return 'static-token';
+      case 'access-token':
+      case undefined:
+        return 'pat';
+      default:
+        return 'unknown';
     }
-
-    // 'access-token' or undefined
-    return 'pat'; // Personal Access Token
   }
 
   /**
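
The `switch` in this hunk can also be checked for exhaustiveness at compile time. The sketch below assumes a simplified, hypothetical `AuthOptionsSketch` shape and an `AuthType` union built from the six values in the diff. The driver's real `ConnectionOptions` type may be declared differently.

```typescript
// Sketch only: AuthType and AuthOptionsSketch are simplified, hypothetical stand-ins.
type AuthType =
  | 'access-token'
  | 'databricks-oauth'
  | 'custom'
  | 'token-provider'
  | 'external-token'
  | 'static-token';

interface AuthOptionsSketch {
  authType?: AuthType;
  oauthClientSecret?: string;
}

function mapAuthTypeSketch(options: AuthOptionsSketch): string {
  switch (options.authType) {
    case 'databricks-oauth':
      // No client secret means U2M (browser flow); a secret means M2M.
      return options.oauthClientSecret === undefined ? 'external-browser' : 'oauth-m2m';
    case 'custom':
    case 'token-provider':
    case 'external-token':
    case 'static-token':
      // These values are reported unchanged.
      return options.authType;
    case 'access-token':
    case undefined:
      return 'pat';
    default: {
      // If a new AuthType member is added without a case above, this
      // assignment stops type-checking, which flags the gap at build time.
      const unhandled: never = options.authType;
      void unhandled;
      return 'unknown';
    }
  }
}
```

The runtime `'unknown'` fallback is kept, as in the diff, for values that arrive from untyped callers.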
@@ -373,80 +372,34 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
     }
 
     try {
-      // Create circuit breaker registry (shared by feature flags and telemetry)
-      this.circuitBreakerRegistry = new CircuitBreakerRegistry(this);
-
-      // Create feature flag cache instance with circuit breaker protection
-      this.featureFlagCache = new FeatureFlagCache(this, this.circuitBreakerRegistry);
-      this.featureFlagCache.getOrCreateContext(this.host);
+      // Acquire (or create) the per-host TelemetryClient from the
+      // process-wide provider. The shared client owns the circuit-breaker
+      // registry, feature-flag cache, exporter, and aggregator. Multiple
+      // DBSQLClient instances on the same host share these resources so
+      // breaker counters and HTTP batches don't fragment per-instance.
+      this.telemetryClient = TelemetryClientProvider.getInstance().getOrCreateClient(this, this.host);
 
-      // Check if telemetry enabled via feature flag
-      const enabled = await this.featureFlagCache.isTelemetryEnabled(this.host);
+      // Use the shared feature-flag cache (registered in the previous step).
+      const enabled = await this.telemetryClient.getFeatureFlagCache().isTelemetryEnabled(this.host);
 
       if (!enabled) {
+        // Release our refcount immediately; we won't be emitting.
+        await TelemetryClientProvider.getInstance().releaseClient(this, this.host);
+        this.telemetryClient = undefined;
         this.logger.log(LogLevel.debug, 'Telemetry: disabled');
         return;
       }
 
-      // Create telemetry components (all instance-based)
-      this.telemetryClientProvider = new TelemetryClientProvider(this);
+      // Each DBSQLClient still owns its own emitter so it respects its own
+      // `telemetryEnabled` flag and feature-flag result. All emitters bridge
+      // into the SHARED aggregator on the TelemetryClient.
       this.telemetryEmitter = new TelemetryEventEmitter(this);
-
-      // Get or create telemetry client for this host (increments refCount)
-      this.telemetryClientProvider.getOrCreateClient(this.host);
-
-      // Create telemetry exporter with shared circuit breaker registry
-      const exporter = new DatabricksTelemetryExporter(this, this.host, this.circuitBreakerRegistry);
-      this.telemetryAggregator = new MetricsAggregator(this, exporter);
-
-      // Wire up event listeners
-      this.telemetryEmitter.on('connection.open', (event) => {
-        try {
-          this.telemetryAggregator?.processEvent(event);
-        } catch (error: any) {
-          this.logger.log(LogLevel.debug, `Error processing connection.open event: ${error.message}`);
-        }
-      });
-
-      this.telemetryEmitter.on('connection.close', (event) => {
-        try {
-          this.telemetryAggregator?.processEvent(event);
-        } catch (error: any) {
-          this.logger.log(LogLevel.debug, `Error processing connection.close event: ${error.message}`);
-        }
-      });
-
-      this.telemetryEmitter.on('statement.start', (event) => {
-        try {
-          this.telemetryAggregator?.processEvent(event);
-        } catch (error: any) {
-          this.logger.log(LogLevel.debug, `Error processing statement.start event: ${error.message}`);
-        }
-      });
-
-      this.telemetryEmitter.on('statement.complete', (event) => {
-        try {
-          this.telemetryAggregator?.processEvent(event);
-        } catch (error: any) {
-          this.logger.log(LogLevel.debug, `Error processing statement.complete event: ${error.message}`);
-        }
-      });
-
-      this.telemetryEmitter.on('cloudfetch.chunk', (event) => {
-        try {
-          this.telemetryAggregator?.processEvent(event);
-        } catch (error: any) {
-          this.logger.log(LogLevel.debug, `Error processing cloudfetch.chunk event: ${error.message}`);
-        }
-      });
-
-      this.telemetryEmitter.on('error', (event) => {
-        try {
-          this.telemetryAggregator?.processEvent(event);
-        } catch (error: any) {
-          this.logger.log(LogLevel.debug, `Error processing error event: ${error.message}`);
-        }
-      });
+      const sharedAggregator = this.telemetryClient.getAggregator();
+      for (const eventType of Object.values(TelemetryEventType)) {
+        this.telemetryEmitter.on(eventType, (event) => {
+          sharedAggregator.processEvent(event);
+        });
+      }
 
       this.logger.log(LogLevel.debug, 'Telemetry: enabled');
     } catch (error: any) {
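
The loop in this hunk registers one listener per event type by iterating the enum's values, so a listener name can no longer drift from the emitted name. The sketch below shows the pattern in isolation. `TelemetryEventTypeSketch` is a const object standing in for the driver's `TelemetryEventType` (`Object.values` behaves the same way on a string enum). Its key names and string values are assumptions copied from the removed listeners and may not match the real enum, and `EmitterSketch` is a hypothetical dependency-free emitter.

```typescript
// Stand-in for the driver's TelemetryEventType; keys and values are assumptions.
const TelemetryEventTypeSketch = {
  CONNECTION_OPEN: 'connection.open',
  CONNECTION_CLOSE: 'connection.close',
  STATEMENT_START: 'statement.start',
  STATEMENT_COMPLETE: 'statement.complete',
  CLOUDFETCH_CHUNK: 'cloudfetch.chunk',
  ERROR: 'error',
} as const;

type EventName = (typeof TelemetryEventTypeSketch)[keyof typeof TelemetryEventTypeSketch];
type Listener = (event: { eventType: EventName }) => void;

// Tiny emitter so the example has no dependencies.
class EmitterSketch {
  private readonly listeners = new Map<EventName, Listener[]>();

  on(name: EventName, listener: Listener): void {
    const existing = this.listeners.get(name) ?? [];
    existing.push(listener);
    this.listeners.set(name, existing);
  }

  emit(name: EventName): void {
    for (const listener of this.listeners.get(name) ?? []) {
      listener({ eventType: name });
    }
  }
}

// One loop wires every event type to the same sink.
const received: EventName[] = [];
const emitter = new EmitterSketch();
for (const eventType of Object.values(TelemetryEventTypeSketch)) {
  emitter.on(eventType, (event) => {
    received.push(event.eventType);
  });
}

emitter.emit(TelemetryEventTypeSketch.ERROR);
emitter.emit(TelemetryEventTypeSketch.CONNECTION_CLOSE);
```

Because both the registration and the emit call read from the same object, adding a new event type needs no change to the wiring loop.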
@@ -613,27 +566,17 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
   public async close(): Promise<void> {
     await this.sessions.closeAll();
 
-    // Cleanup telemetry
-    if (this.host) {
+    // Cleanup telemetry. Releasing our refcount on the shared TelemetryClient
+    // is awaited because the underlying close() drains the final HTTP POST —
+    // a caller doing `await client.close(); process.exit(0)` would otherwise
+    // truncate the in-flight request when this is the last refcount holder.
+    if (this.host && this.telemetryClient) {
       try {
-        // Step 1: Close aggregator (stops timer, completes statements, final flush)
-        if (this.telemetryAggregator) {
-          this.telemetryAggregator.close();
-        }
-
-        // Step 2: Release telemetry client (decrements ref count, closes if last)
-        if (this.telemetryClientProvider) {
-          await this.telemetryClientProvider.releaseClient(this.host);
-        }
-
-        // Step 3: Release feature flag context (decrements ref count)
-        if (this.featureFlagCache) {
-          this.featureFlagCache.releaseContext(this.host);
-        }
+        await TelemetryClientProvider.getInstance().releaseClient(this, this.host);
       } catch (error: any) {
-        // Swallow all telemetry cleanup errors
         this.logger.log(LogLevel.debug, `Telemetry cleanup error: ${error.message}`);
       }
+      this.telemetryClient = undefined;
     }
 
     this.client = undefined;
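
The refcounted acquire and awaited release used in `initializeTelemetry` and `close()` can be sketched as follows. This is not the driver's `TelemetryClientProvider`: `ProviderSketch` and `SharedClientSketch` are hypothetical, the context argument that the real `getOrCreateClient(this, host)` takes is omitted, and `close()` only simulates draining a final request.

```typescript
// Hypothetical sketch of a process-wide, refcounted per-host registry.
class SharedClientSketch {
  closed = false;

  // Stands in for draining the final HTTP POST.
  async close(): Promise<void> {
    await Promise.resolve();
    this.closed = true;
  }
}

class ProviderSketch {
  private static instance: ProviderSketch | undefined;

  private readonly entries = new Map<string, { client: SharedClientSketch; refCount: number }>();

  static getInstance(): ProviderSketch {
    if (!ProviderSketch.instance) {
      ProviderSketch.instance = new ProviderSketch();
    }
    return ProviderSketch.instance;
  }

  // Returns the shared client for a host and takes one reference on it.
  getOrCreateClient(host: string): SharedClientSketch {
    let entry = this.entries.get(host);
    if (!entry) {
      entry = { client: new SharedClientSketch(), refCount: 0 };
      this.entries.set(host, entry);
    }
    entry.refCount += 1;
    return entry.client;
  }

  // Drops one reference. Resolves only after the last holder's close() has drained.
  async releaseClient(host: string): Promise<void> {
    const entry = this.entries.get(host);
    if (!entry) return;
    entry.refCount -= 1;
    if (entry.refCount > 0) return;
    this.entries.delete(host);
    await entry.client.close();
  }

  refCount(host: string): number {
    return this.entries.get(host)?.refCount ?? 0;
  }
}
```

A caller that awaits `releaseClient` before exiting the process gives the last holder's drain a chance to finish, which is the race the comment in this hunk describes.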
@@ -687,4 +630,14 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
   public getAuthProvider(): IAuthentication | undefined {
     return this.authProvider;
   }
+
+  /** @internal */
+  public getTelemetryEmitter(): TelemetryEventEmitter | undefined {
+    return this.telemetryEmitter;
+  }
+
+  /** @internal */
+  public getTelemetryAggregator() {
+    return this.telemetryClient?.getAggregator();
+  }
 }

lib/DBSQLOperation.ts

Lines changed: 55 additions & 28 deletions
@@ -34,8 +34,8 @@ import { definedOrError } from './utils';
 import { OperationChunksIterator, OperationRowsIterator } from './utils/OperationIterator';
 import HiveDriverError from './errors/HiveDriverError';
 import IClientContext from './contracts/IClientContext';
-import ExceptionClassifier from './telemetry/ExceptionClassifier';
 import { mapOperationTypeToTelemetryType, mapResultFormatToTelemetryType } from './telemetry/telemetryTypeMappers';
+import ExceptionClassifier from './telemetry/ExceptionClassifier';
 
 interface DBSQLOperationConstructorOptions {
   handle: TOperationHandle;
@@ -181,6 +181,15 @@ export default class DBSQLOperation implements IOperation {
    * const result = await queryOperation.fetchChunk({maxRows: 1000});
    */
   public async fetchChunk(options?: FetchOptions): Promise<Array<object>> {
+    try {
+      return await this.fetchChunkInternal(options);
+    } catch (err: any) {
+      this.emitErrorEvent(err);
+      throw err;
+    }
+  }
+
+  private async fetchChunkInternal(options?: FetchOptions): Promise<Array<object>> {
     await this.failIfClosed();
 
     if (!this.operationHandle.hasResultSet) {
@@ -257,6 +266,15 @@ export default class DBSQLOperation implements IOperation {
    * @throws {StatusError}
    */
   public async cancel(): Promise<Status> {
+    try {
+      return await this.cancelInternal();
+    } catch (err: any) {
+      this.emitErrorEvent(err);
+      throw err;
+    }
+  }
+
+  private async cancelInternal(): Promise<Status> {
     if (this.closed || this.cancelled) {
       return Status.success();
     }
@@ -281,6 +299,15 @@ export default class DBSQLOperation implements IOperation {
    * @throws {StatusError}
    */
   public async close(): Promise<Status> {
+    try {
+      return await this.closeInternal();
+    } catch (err: any) {
+      this.emitErrorEvent(err);
+      throw err;
+    }
+  }
+
+  private async closeInternal(): Promise<Status> {
     if (this.closed || this.cancelled) {
       return Status.success();
     }
@@ -341,9 +368,14 @@ export default class DBSQLOperation implements IOperation {
   }
 
   public async getMetadata(): Promise<TGetResultSetMetadataResp> {
-    await this.failIfClosed();
-    await this.waitUntilReady();
-    return this.fetchMetadata();
+    try {
+      await this.failIfClosed();
+      await this.waitUntilReady();
+      return await this.fetchMetadata();
+    } catch (err: any) {
+      this.emitErrorEvent(err);
+      throw err;
+    }
   }
 
   private async failIfClosed(): Promise<void> {
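
The four hunks above repeat one pattern: run the body, report any failure to telemetry, then rethrow the original error. A hypothetical generic helper makes the shape explicit. The commit inlines the try/catch in each method instead, and `withErrorTelemetry` and `ErrorSink` are names invented for this sketch.

```typescript
// Hypothetical helper; the commit writes this try/catch inline in each public method.
type ErrorSink = (error: Error) => void;

async function withErrorTelemetry<T>(body: () => Promise<T>, emitErrorEvent: ErrorSink): Promise<T> {
  try {
    // `return await` matters: a bare `return body()` would let a rejection
    // skip this catch block entirely.
    return await body();
  } catch (err) {
    try {
      // Normalise non-Error throwables before reporting them.
      emitErrorEvent(err instanceof Error ? err : new Error(String(err)));
    } catch {
      // A failing telemetry sink must not mask the original failure.
    }
    throw err;
  }
}
```

The same reasoning explains the change from `return this.fetchMetadata()` to `return await this.fetchMetadata()` in `getMetadata`: without the `await`, a rejected metadata fetch would never reach `emitErrorEvent`.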
@@ -509,7 +541,7 @@ export default class DBSQLOperation implements IOperation {
    */
   private emitStatementStart(): void {
     try {
-      const { telemetryEmitter } = this.context as any;
+      const telemetryEmitter = this.context.getTelemetryEmitter?.();
       if (!telemetryEmitter) {
         return;
       }
@@ -530,23 +562,19 @@ export default class DBSQLOperation implements IOperation {
    */
   private async emitStatementComplete(): Promise<void> {
     try {
-      const { telemetryEmitter } = this.context as any;
-      const { telemetryAggregator } = this.context as any;
+      const telemetryEmitter = this.context.getTelemetryEmitter?.();
+      const telemetryAggregator = this.context.getTelemetryAggregator?.();
       if (!telemetryEmitter || !telemetryAggregator) {
         return;
       }
 
-      // Fetch metadata if not already fetched to get result format
-      let resultFormat: string | undefined;
-      try {
-        if (!this.metadata && !this.cancelled) {
-          await this.getMetadata();
-        }
-        resultFormat = mapResultFormatToTelemetryType(this.metadata?.resultFormat);
-      } catch (error) {
-        // If metadata fetch fails, continue without it
-        resultFormat = undefined;
-      }
+      // Use whatever metadata was already fetched by the result-handling
+      // path. Do NOT trigger a `getMetadata()` here — that issues a Thrift
+      // RPC on every close (doubles close latency for short DDL/DML) AND
+      // throws if the operation is already in an error/closed state, which
+      // would then fire spurious error telemetry from `getMetadata`'s error
+      // wrapper.
+      const resultFormat = mapResultFormatToTelemetryType(this.metadata?.resultFormat);
 
       const latencyMs = Date.now() - this.startTime;
 
@@ -566,25 +594,24 @@ export default class DBSQLOperation implements IOperation {
   }
 
   /**
-   * Emit error telemetry event with terminal classification.
-   * CRITICAL: All exceptions swallowed and logged at LogLevel.debug ONLY.
+   * Emit a telemetry error event for an exception thrown by an operation.
+   * Terminal errors (per `ExceptionClassifier`) trigger an immediate flush
+   * in the aggregator; retryable errors are buffered until the statement
+   * completes. All exceptions from this method itself are swallowed at
+   * debug level — telemetry must never break the driver.
    */
   private emitErrorEvent(error: Error): void {
     try {
-      const { telemetryEmitter } = this.context as any;
-      if (!telemetryEmitter) {
-        return;
-      }
-
-      // Classify the exception
-      const isTerminal = ExceptionClassifier.isTerminal(error);
+      const telemetryEmitter = this.context.getTelemetryEmitter?.();
+      if (!telemetryEmitter) return;
 
       telemetryEmitter.emitError({
         statementId: this.id,
         sessionId: this.sessionId,
         errorName: error.name || 'Error',
         errorMessage: error.message || 'Unknown error',
-        isTerminal,
+        errorStack: error.stack,
+        isTerminal: ExceptionClassifier.isTerminal(error),
       });
     } catch (emitError: any) {
       this.context.getLogger().log(LogLevel.debug, `Error emitting error event: ${emitError.message}`);
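
The doc comment in this hunk describes two paths for error events: terminal errors flush immediately, while retryable ones wait until the statement completes. The sketch below illustrates that split under stated assumptions. `ErrorBufferSketch` and its methods are hypothetical, and it flushes per statement, whereas the real `MetricsAggregator` may flush a wider scope.

```typescript
// Hypothetical sketch of terminal versus retryable error handling.
interface ErrorEventSketch {
  statementId: string;
  errorName: string;
  isTerminal: boolean;
}

class ErrorBufferSketch {
  private readonly pending = new Map<string, ErrorEventSketch[]>();

  // Each inner array is one exported batch; stands in for the exporter.
  readonly exported: ErrorEventSketch[][] = [];

  processError(event: ErrorEventSketch): void {
    const bucket = this.pending.get(event.statementId) ?? [];
    bucket.push(event);
    this.pending.set(event.statementId, bucket);

    // Terminal errors are exported right away, together with anything buffered.
    if (event.isTerminal) {
      this.flush(event.statementId);
    }
  }

  // Retryable errors leave the buffer only when the statement completes.
  completeStatement(statementId: string): void {
    this.flush(statementId);
  }

  private flush(statementId: string): void {
    const bucket = this.pending.get(statementId);
    if (!bucket || bucket.length === 0) return;
    this.pending.delete(statementId);
    this.exported.push(bucket);
  }
}
```

Buffering retryable errors keeps transient failures from producing one export per retry, while the immediate flush keeps a terminal error from being lost if the process exits soon after.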

lib/DBSQLSession.ts

Lines changed: 1 addition & 1 deletion
@@ -599,7 +599,7 @@ export default class DBSQLSession implements IDBSQLSession {
 
     // Emit connection close telemetry
     const closeLatency = Date.now() - this.openTime;
-    const { telemetryEmitter } = this.context as any;
+    const telemetryEmitter = this.context.getTelemetryEmitter?.();
     if (telemetryEmitter) {
       telemetryEmitter.emitConnectionClose({
         sessionId: this.id,
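
Replacing `(this.context as any)` with `this.context.getTelemetryEmitter?.()` relies on the getter being an optional method on the context interface. The sketch below shows how that typing works. `ClientContextSketch`, `CloseEmitterSketch`, `emitClose`, and the `latencyMs` field are hypothetical stand-ins, not the driver's `IClientContext`.

```typescript
// Hypothetical stand-ins for the driver's context and emitter types.
interface CloseEmitterSketch {
  emitConnectionClose(event: { sessionId: string; latencyMs: number }): void;
}

interface ClientContextSketch {
  getLogger(): { log(message: string): void };
  // Optional, so existing implementers (for example test doubles) still compile.
  getTelemetryEmitter?(): CloseEmitterSketch | undefined;
}

// Returns true when an event was emitted, false when telemetry is not wired.
function emitClose(context: ClientContextSketch, sessionId: string, latencyMs: number): boolean {
  // `?.()` yields undefined when the method is absent, so no cast is needed
  // and the compiler checks the emitter's method names and argument shape.
  const emitter = context.getTelemetryEmitter?.();
  if (!emitter) return false;
  emitter.emitConnectionClose({ sessionId, latencyMs });
  return true;
}
```

With the `as any` destructuring, a renamed or missing property produced `undefined` at runtime with no compile error. With the typed getter, the same mistake fails the build.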
