Commit 2f70510

fix(telemetry): address review findings — wiring, lifecycle, redaction, knobs
Iter-3 review fixes addressing 17 distinct findings from the multi-agent review. Telemetry is now functionally correct and operationally safe.

Critical
- F1: TelemetryClient ctor wires getOrCreateContext on FeatureFlagCache. isTelemetryEnabled was previously short-circuiting to false in production because no caller registered the host, so every customer silently emitted zero events.
- F2: integration test asserts the documented default (true), not the prior off-by-default. Test was contradicting production code.
- F3: IClientContext.getAuthProvider now optional; consumers use ?.() so external implementers don't break on upgrade.

High / privacy
- F4: explicit DATABRICKS_TELEMETRY_DISABLED parser (1/true/yes/on, case insensitive). Avoids the footgun where DATABRICKS_TELEMETRY_DISABLED=false also disabled telemetry. Documented in CHANGELOG and TSDoc.
- F12: TelemetryClient.registerContext warns on telemetry-config and userAgentEntry divergence so multi-tenant misconfig is visible.
- F9: connect()-on-reconnect releases prior refcount; close() clears the emitter ref so post-close events can't smuggle into a closed aggregator.
- M-1: redactSensitive strips /home/<user>/, /Users/<user>/, and C:\Users\<user>\ patterns from stack traces.
- M-3: FeatureFlagCache.getAuthHeaders falls through to the context's auth provider, so the feature-flag GET is no longer unconditionally unauth.

Operational
- F7: MetricsAggregator.close races the final flush against a configurable telemetryCloseTimeoutMs (default 2s) so a flapping endpoint can't hang process.exit(0).
- F8: flushInFlight serializer prevents concurrent fire-and-forget flushes from starving the user's HTTP socket pool. Drain pattern in close() awaits any in-flight flush, then issues a fresh one to capture close-time metrics that would otherwise be stranded.
- F16: maxStatementMetrics cap (default 5000) with oldest-first eviction. Buffered errors emitted as standalone metrics first so the first-failure signal survives.
- DBSQLSession.close() emits connection.close even when closeSession fails so failed-close rates are visible in dashboards.

Maintainability
- F10/F17: single withErrorTelemetry helper covers fetchChunk, cancel, close, finished, hasMoreRows, getSchema, getMetadata. safeEmit helper consolidates seven copy-pasted "get emitter, emit, swallow at debug" blocks across DBSQLOperation, DBSQLClient, DBSQLSession, CloudFetchResultHandler, RowSetProvider. Also fixes the inconsistency where DBSQLSession.close() lacked the swallow wrapper that the other six sites had.

API surface
- F13: ConnectionOptions exposes nine telemetry knobs (was three) with TSDoc. Adds telemetryFlushIntervalMs, telemetryMaxRetries, telemetryCircuitBreakerThreshold, telemetryCircuitBreakerTimeout, telemetryCloseTimeoutMs, telemetryMaxStatementMetrics.

Tests
- ClientContextStub gains telemetryEmitter / telemetryAggregator hooks so unit tests can assert on emit calls instead of silently no-op'ing.
- 18 new unit tests covering F1 refcount, F12 divergence warn, async-close idempotency, error-telemetry wrappers (cancel, close, getMetadata, closed-op finished/getSchema/hasMoreRows), multi-context FIFO, and a new tests/unit/result/RowSetProvider.test.ts file (RowSetProvider had no test file at all).

783 unit tests pass; live e2e against adb-27363120558779.19.azuredatabricks.net validates the full pipeline.

Co-authored-by: Isaac
1 parent 410780a commit 2f70510
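
The M-1 item in the commit message describes stripping home-directory prefixes from stack traces. The sketch below shows one way that kind of redaction could work. It is an illustration only: `redactHomeDirs` and the `<user>` placeholder are hypothetical, and the driver's actual `redactSensitive` is not shown in this diff and may behave differently.

```typescript
// Hypothetical helper, NOT the driver's redactSensitive implementation.
// Each entry pairs a pattern for a per-user home directory with its replacement.
const HOME_DIR_PATTERNS: Array<[RegExp, string]> = [
  [/\/home\/[^/\s]+\//g, '/home/<user>/'], // Linux
  [/\/Users\/[^/\s]+\//g, '/Users/<user>/'], // macOS
  [/([A-Za-z]):\\Users\\[^\\\s]+\\/g, '$1:\\Users\\<user>\\'], // Windows, keeps the drive letter
];

function redactHomeDirs(text: string): string {
  // Apply every pattern in turn so mixed-platform traces are covered too.
  return HOME_DIR_PATTERNS.reduce(
    (acc, [pattern, replacement]) => acc.replace(pattern, replacement),
    text,
  );
}

const linuxFrame = redactHomeDirs('at run (/home/alice/app/index.js:10:5)');
const macFrame = redactHomeDirs('at run (/Users/bob/app/index.js:10:5)');
const windowsFrame = redactHomeDirs('at run (C:\\Users\\carol\\app\\index.js:10:5)');
console.log(linuxFrame, macFrame, windowsFrame);
```

Only the user-name segment is replaced, so the rest of the path stays available for debugging.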

19 files changed

Lines changed: 745 additions & 167 deletions

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
@@ -6,6 +6,13 @@
 - Add metric view metadata support (databricks/databricks-sql-nodejs#312 by @shivam2680)
 - Fix: Avoid calling require('lz4') if it's really not required (databricks/databricks-sql-nodejs#316 by @ikkala)
 - Add telemetry foundation (off by default) (databricks/databricks-sql-nodejs#324 by @samikshya-db)
+- Telemetry event emission and per-host aggregation (databricks/databricks-sql-nodejs#327 by @samikshya-db).
+  **Default change:** `telemetryEnabled` now defaults to `true` (gated by a remote feature flag).
+  To opt out programmatically, pass `telemetryEnabled: false` to `connect()`.
+  To disable globally without code changes, set the environment variable
+  `DATABRICKS_TELEMETRY_DISABLED` to one of `1`, `true`, `yes`, or `on`
+  (case-insensitive). Other values (empty, `0`, `false`, etc.) are ignored
+  — the runtime config takes precedence.
 
 ## 1.12.0

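The opt-out rules in the changelog entry above can be checked with a small standalone function. This sketch reuses the regular expression from the `connect()` change in this commit; the function name `isTelemetryDisabledByEnv` is hypothetical and exists only for illustration.

```typescript
// Illustrative wrapper around the check added to DBSQLClient.connect().
// Returns true only for the recognized kill-switch values.
function isTelemetryDisabledByEnv(value: string | undefined): boolean {
  return typeof value === 'string' && /^(1|true|yes|on)$/i.test(value.trim());
}

// Values that disable telemetry regardless of the runtime config.
const disabling = ['1', 'TRUE', ' yes ', 'On'].map(isTelemetryDisabledByEnv);

// Values that are ignored, leaving the runtime config in charge.
const ignored = ['', '0', 'false', 'no', 'off', undefined].map(isTelemetryDisabledByEnv);

console.log(disabling, ignored);
```

Note that `DATABRICKS_TELEMETRY_DISABLED=false` falls in the ignored group, which is the footgun the commit message says F4 was meant to remove.
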
lib/DBSQLClient.ts

Lines changed: 54 additions & 26 deletions
@@ -36,6 +36,7 @@ import TelemetryClient from './telemetry/TelemetryClient';
 import TelemetryClientProvider from './telemetry/TelemetryClientProvider';
 import TelemetryEventEmitter from './telemetry/TelemetryEventEmitter';
 import { DriverConfiguration, DRIVER_NAME, TelemetryEventType } from './telemetry/types';
+import { safeEmit } from './telemetry/telemetryUtils';
 import driverVersion from './version';
 
 function prependSlash(str: string): string {
@@ -429,6 +430,19 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
       }
     }
 
+    // If connect() is being called a second time (reconnect, host switch),
+    // release the prior telemetry refcount and emitter so we don't leak a
+    // refcount in the process-wide TelemetryClientProvider for the old host.
+    if (this.host && this.telemetryClient) {
+      try {
+        await TelemetryClientProvider.getInstance().releaseClient(this, this.host);
+      } catch (error: any) {
+        this.logger.log(LogLevel.debug, `Telemetry release-on-reconnect error: ${error.message}`);
+      }
+      this.telemetryClient = undefined;
+      this.telemetryEmitter = undefined;
+    }
+
     // Store connection params for telemetry
     this.host = options.host;
     this.httpPath = options.path;
@@ -440,14 +454,22 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
     }
 
     // Override telemetry config if provided in options
-    if (options.telemetryEnabled !== undefined) {
-      this.config.telemetryEnabled = options.telemetryEnabled;
-    }
-    if (options.telemetryBatchSize !== undefined) {
-      this.config.telemetryBatchSize = options.telemetryBatchSize;
-    }
-    if (options.telemetryAuthenticatedExport !== undefined) {
-      this.config.telemetryAuthenticatedExport = options.telemetryAuthenticatedExport;
+    const telemetryOverrides = [
+      'telemetryEnabled',
+      'telemetryBatchSize',
+      'telemetryFlushIntervalMs',
+      'telemetryMaxRetries',
+      'telemetryAuthenticatedExport',
+      'telemetryCircuitBreakerThreshold',
+      'telemetryCircuitBreakerTimeout',
+      'telemetryCloseTimeoutMs',
+      'telemetryMaxStatementMetrics',
+    ] as const;
+    for (const k of telemetryOverrides) {
+      if (options[k] !== undefined) {
+        // The narrow union forces a cast; values are validated at point of use.
+        (this.config as any)[k] = options[k];
+      }
     }
 
     // Persist userAgentEntry so telemetry and feature-flag call sites reuse
@@ -489,8 +511,15 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
       this.emit('timeout');
     });
 
-    // Initialize telemetry if enabled
-    if (this.config.telemetryEnabled) {
+    // Initialize telemetry if enabled. The env var DATABRICKS_TELEMETRY_DISABLED
+    // is a hard kill switch for ops/IT teams who can't redeploy app code.
+    // Recognized truthy values: 1, true, yes, on (case-insensitive). Anything
+    // else (empty, "0", "false", "no", "off") leaves the runtime config in
+    // charge — avoiding the footgun where a sysadmin sets the var to "false"
+    // expecting to enable telemetry.
+    const envKill = process.env.DATABRICKS_TELEMETRY_DISABLED;
+    const envDisabled = typeof envKill === 'string' && /^(1|true|yes|on)$/i.test(envKill.trim());
+    if (this.config.telemetryEnabled && !envDisabled) {
       await this.initializeTelemetry();
     }

@@ -534,22 +563,18 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
     this.sessions.add(session);
 
     // Emit connection.open telemetry event
-    if (this.telemetryEmitter && this.host) {
-      try {
-        const latencyMs = Date.now() - startTime;
-        const workspaceId = this.extractWorkspaceId(this.host);
-        const driverConfig = this.buildDriverConfiguration();
-        this.telemetryEmitter.emitConnectionOpen({
-          sessionId: session.id,
-          workspaceId,
-          driverConfig,
-          latencyMs,
-        });
-      } catch (error: any) {
-        // CRITICAL: All telemetry exceptions swallowed
-        this.logger.log(LogLevel.debug, `Error emitting connection.open event: ${error.message}`);
-      }
-    }
+    safeEmit(this, (emitter) => {
+      if (!this.host) return;
+      const latencyMs = Date.now() - startTime;
+      const workspaceId = this.extractWorkspaceId(this.host);
+      const driverConfig = this.buildDriverConfiguration();
+      emitter.emitConnectionOpen({
+        sessionId: session.id,
+        workspaceId,
+        driverConfig,
+        latencyMs,
+      });
+    });
 
     return session;
   }
@@ -578,6 +603,9 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
       }
       this.telemetryClient = undefined;
     }
+    // Drop the emitter ref so post-close calls (e.g. session.close racing
+    // with client.close) cannot smuggle events into the closed aggregator.
+    this.telemetryEmitter = undefined;
 
     this.client = undefined;
     this.connectionProvider = undefined;

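The `safeEmit` helper used in `lib/DBSQLClient.ts` is imported from `./telemetry/telemetryUtils`, which is not part of the visible diff. Judging only from the try/catch blocks it replaces, a minimal sketch might look like the following. The interfaces are simplified stand-ins for the driver's types, and the log message text is an assumption.

```typescript
// Simplified stand-ins for the driver's types (assumptions for this sketch).
interface Logger {
  log(level: string, message: string): void;
}
interface Emitter {
  emitError(event: { errorName: string }): void;
}
interface Context {
  getLogger(): Logger;
  getTelemetryEmitter?(): Emitter | undefined;
}

// Hypothetical re-creation of safeEmit: get emitter, emit, swallow at debug.
function safeEmit(context: Context, emit: (emitter: Emitter) => void): void {
  try {
    const emitter = context.getTelemetryEmitter?.();
    if (!emitter) return; // telemetry disabled or emitter already released
    emit(emitter);
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    context.getLogger().log('debug', `Telemetry emit error: ${message}`);
  }
}

// Usage: an emitter that always throws must not break the caller.
const debugLines: string[] = [];
const failingContext: Context = {
  getLogger: () => ({
    log: (_level: string, message: string) => {
      debugLines.push(message);
    },
  }),
  getTelemetryEmitter: () => ({
    emitError: () => {
      throw new Error('boom');
    },
  }),
};
safeEmit(failingContext, (emitter) => emitter.emitError({ errorName: 'Error' }));
console.log(debugLines);
```

Centralizing the lookup and the catch in one place is what lets each call site shrink to the event payload, as the `connection.open` hunk shows.
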
lib/DBSQLOperation.ts

Lines changed: 57 additions & 73 deletions
@@ -36,6 +36,7 @@ import HiveDriverError from './errors/HiveDriverError';
 import IClientContext from './contracts/IClientContext';
 import { mapOperationTypeToTelemetryType, mapResultFormatToTelemetryType } from './telemetry/telemetryTypeMappers';
 import ExceptionClassifier from './telemetry/ExceptionClassifier';
+import { safeEmit } from './telemetry/telemetryUtils';
 
 interface DBSQLOperationConstructorOptions {
   handle: TOperationHandle;
@@ -181,12 +182,7 @@ export default class DBSQLOperation implements IOperation {
    * const result = await queryOperation.fetchChunk({maxRows: 1000});
    */
   public async fetchChunk(options?: FetchOptions): Promise<Array<object>> {
-    try {
-      return await this.fetchChunkInternal(options);
-    } catch (err: any) {
-      this.emitErrorEvent(err);
-      throw err;
-    }
+    return this.withErrorTelemetry(() => this.fetchChunkInternal(options));
   }
 
   private async fetchChunkInternal(options?: FetchOptions): Promise<Array<object>> {
@@ -266,12 +262,7 @@ export default class DBSQLOperation implements IOperation {
    * @throws {StatusError}
    */
   public async cancel(): Promise<Status> {
-    try {
-      return await this.cancelInternal();
-    } catch (err: any) {
-      this.emitErrorEvent(err);
-      throw err;
-    }
+    return this.withErrorTelemetry(() => this.cancelInternal());
   }
 
   private async cancelInternal(): Promise<Status> {
@@ -299,12 +290,7 @@ export default class DBSQLOperation implements IOperation {
    * @throws {StatusError}
    */
   public async close(): Promise<Status> {
-    try {
-      return await this.closeInternal();
-    } catch (err: any) {
-      this.emitErrorEvent(err);
-      throw err;
-    }
+    return this.withErrorTelemetry(() => this.closeInternal());
   }
 
   private async closeInternal(): Promise<Status> {
@@ -332,46 +318,63 @@ export default class DBSQLOperation implements IOperation {
   }
 
   public async finished(options?: FinishedOptions): Promise<void> {
-    await this.failIfClosed();
-    await this.waitUntilReady(options);
+    return this.withErrorTelemetry(async () => {
+      await this.failIfClosed();
+      await this.waitUntilReady(options);
+    });
   }
 
   public async hasMoreRows(): Promise<boolean> {
-    // If operation is closed or cancelled - we should not try to get data from it
-    if (this.closed || this.cancelled) {
-      return false;
-    }
+    return this.withErrorTelemetry(async () => {
+      // If operation is closed or cancelled - we should not try to get data from it
+      if (this.closed || this.cancelled) {
+        return false;
+      }
 
-    // Wait for operation to finish before checking for more rows
-    // This ensures metadata can be fetched successfully
-    if (this.operationHandle.hasResultSet) {
-      await this.waitUntilReady();
-    }
+      // Wait for operation to finish before checking for more rows
+      // This ensures metadata can be fetched successfully
+      if (this.operationHandle.hasResultSet) {
+        await this.waitUntilReady();
+      }
 
-    // If we fetched all the data from server - check if there's anything buffered in result handler
-    const resultHandler = await this.getResultHandler();
-    return resultHandler.hasMore();
+      // If we fetched all the data from server - check if there's anything buffered in result handler
+      const resultHandler = await this.getResultHandler();
+      return resultHandler.hasMore();
+    });
   }
 
   public async getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null> {
-    await this.failIfClosed();
+    return this.withErrorTelemetry(async () => {
+      await this.failIfClosed();
 
-    if (!this.operationHandle.hasResultSet) {
-      return null;
-    }
+      if (!this.operationHandle.hasResultSet) {
+        return null;
+      }
 
-    await this.waitUntilReady(options);
+      await this.waitUntilReady(options);
 
-    this.context.getLogger().log(LogLevel.debug, `Fetching schema for operation with id: ${this.id}`);
-    const metadata = await this.fetchMetadata();
-    return metadata.schema ?? null;
+      this.context.getLogger().log(LogLevel.debug, `Fetching schema for operation with id: ${this.id}`);
+      const metadata = await this.fetchMetadata();
+      return metadata.schema ?? null;
+    });
   }
 
   public async getMetadata(): Promise<TGetResultSetMetadataResp> {
-    try {
+    return this.withErrorTelemetry(async () => {
       await this.failIfClosed();
       await this.waitUntilReady();
-      return await this.fetchMetadata();
+      return this.fetchMetadata();
+    });
+  }
+
+  /**
+   * Wrap a public IOperation method so any thrown error is captured as an
+   * error telemetry event before being rethrown to the caller. Telemetry
+   * never alters the throw semantics.
+   */
+  private async withErrorTelemetry<T>(fn: () => Promise<T>): Promise<T> {
+    try {
+      return await fn();
     } catch (err: any) {
       this.emitErrorEvent(err);
       throw err;
@@ -540,33 +543,23 @@ export default class DBSQLOperation implements IOperation {
    * CRITICAL: All exceptions swallowed and logged at LogLevel.debug ONLY.
    */
   private emitStatementStart(): void {
-    try {
-      const telemetryEmitter = this.context.getTelemetryEmitter?.();
-      if (!telemetryEmitter) {
-        return;
-      }
-
-      telemetryEmitter.emitStatementStart({
+    safeEmit(this.context, (emitter) => {
+      emitter.emitStatementStart({
         statementId: this.id,
         sessionId: this.sessionId || '',
         operationType: mapOperationTypeToTelemetryType(this.operationHandle.operationType),
       });
-    } catch (error: any) {
-      this.context.getLogger().log(LogLevel.debug, `Error emitting statement.start event: ${error.message}`);
-    }
+    });
   }
 
   /**
    * Emit statement.complete telemetry event and complete aggregation.
    * CRITICAL: All exceptions swallowed and logged at LogLevel.debug ONLY.
    */
   private async emitStatementComplete(): Promise<void> {
-    try {
-      const telemetryEmitter = this.context.getTelemetryEmitter?.();
-      const telemetryAggregator = this.context.getTelemetryAggregator?.();
-      if (!telemetryEmitter || !telemetryAggregator) {
-        return;
-      }
+    safeEmit(this.context, (emitter) => {
+      const aggregator = this.context.getTelemetryAggregator?.();
+      if (!aggregator) return;
 
       // Use whatever metadata was already fetched by the result-handling
       // path. Do NOT trigger a `getMetadata()` here — that issues a Thrift
@@ -575,22 +568,18 @@ export default class DBSQLOperation implements IOperation {
       // would then fire spurious error telemetry from `getMetadata`'s error
       // wrapper.
       const resultFormat = mapResultFormatToTelemetryType(this.metadata?.resultFormat);
-
       const latencyMs = Date.now() - this.startTime;
 
-      telemetryEmitter.emitStatementComplete({
+      emitter.emitStatementComplete({
         statementId: this.id,
         sessionId: this.sessionId || '',
         latencyMs,
         resultFormat,
         pollCount: this.pollCount,
       });
 
-      // Complete statement aggregation
-      telemetryAggregator.completeStatement(this.id);
-    } catch (error: any) {
-      this.context.getLogger().log(LogLevel.debug, `Error emitting statement.complete event: ${error.message}`);
-    }
+      aggregator.completeStatement(this.id);
+    });
   }
 
   /**
@@ -601,20 +590,15 @@ export default class DBSQLOperation implements IOperation {
    * debug level — telemetry must never break the driver.
    */
   private emitErrorEvent(error: Error): void {
-    try {
-      const telemetryEmitter = this.context.getTelemetryEmitter?.();
-      if (!telemetryEmitter) return;
-
-      telemetryEmitter.emitError({
+    safeEmit(this.context, (emitter) => {
+      emitter.emitError({
         statementId: this.id,
         sessionId: this.sessionId,
         errorName: error.name || 'Error',
         errorMessage: error.message || 'Unknown error',
         errorStack: error.stack,
         isTerminal: ExceptionClassifier.isTerminal(error),
       });
-    } catch (emitError: any) {
-      this.context.getLogger().log(LogLevel.debug, `Error emitting error event: ${emitError.message}`);
-    }
+    });
   }
 }
