Skip to content

Commit 7d10d1e

Browse files
committed
fix(telemetry): address review highs — refcount leak, FIFO fallthrough, public types, README, test gaps
- DBSQLClient.initializeTelemetry: release the per-host refcount on the catch path so a throw between getOrCreateClient and the success log doesn't leak a TelemetryClient (with its flush timer / exporter / FFCache) for the lifetime of the process on long-running supervisors. - TelemetryClient.getClient/getDriver: walk the FIFO with try/catch fallthrough to mirror getConnectionProvider, so a closed-but-not-yet- released head context doesn't take the whole shared pool down. getAuthProvider: return the first defined entry from the FIFO. - lib/index.ts: re-export TelemetryEventType, DEFAULT_TELEMETRY_CONFIG, and the consumer-facing telemetry payload types so SDK users don't need to deep-import for type-checked event/metric handling. - README: add a Telemetry section covering what's collected, the three opt-out paths (env var, programmatic, server-side feature flag), the tunable knobs, and the await-close requirement for short-lived processes. - MetricsAggregator.test.ts: cover chunk-timing aggregation (initial=first-positive, slowest=max, sum=running) and CONNECTION_CLOSE → DELETE_SESSION emission. Both were acknowledged coverage gaps. - TelemetryEventEmitter.test.ts: cover emitConnectionClose — emission shape, disabled-flag suppression, and listener-exception swallow. Co-authored-by: Isaac
1 parent 34c1678 commit 7d10d1e

6 files changed

Lines changed: 249 additions & 9 deletions

File tree

README.md

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,70 @@ client
5151
});
5252
```
5353

54+
## Telemetry
55+
56+
Starting with version 1.13, the driver collects telemetry — connection,
57+
statement, and CloudFetch chunk metrics, plus error events with redacted
58+
stack traces — to help Databricks improve driver performance and
59+
reliability. **Telemetry is enabled by default and gated by a server-side
60+
feature flag**: events are emitted only when the workspace's feature flag
61+
is on. No SQL text, parameter values, or row data are ever included.
62+
63+
### What's collected
64+
65+
- Connection lifecycle (`CREATE_SESSION`, `DELETE_SESSION`) with latency.
66+
- Statement lifecycle (`STATEMENT_START`, `STATEMENT_COMPLETE`) with
67+
execution latency, operation type, and result format.
68+
- CloudFetch chunk timings and byte counts.
69+
- Error events with redacted stack traces (Bearer/JWT tokens, OAuth
70+
secrets, home-directory paths, and Databricks PATs are stripped before
71+
emission).
72+
73+
See `TelemetryEvent` and `TelemetryMetric` in the package exports for the
74+
exact payload shapes.
75+
76+
### Opting out
77+
78+
Three independent ways to disable telemetry, in order of precedence:
79+
80+
1. **Environment variable** — set `DATABRICKS_TELEMETRY_DISABLED` to one
81+
of `1`, `true`, `yes`, or `on` (case-insensitive). Other values
82+
(empty, `0`, `false`, `off`, `no`) are ignored, leaving the runtime
83+
config in charge.
84+
2. **Programmatic** — pass `telemetryEnabled: false` to `connect()`:
85+
```javascript
86+
await client.connect({
87+
host,
88+
path,
89+
token,
90+
telemetryEnabled: false,
91+
});
92+
```
93+
3. **Server-side** — Databricks-managed feature flag; if disabled for
94+
your workspace, the driver does not emit telemetry regardless of
95+
client config.
96+
97+
### Tuning
98+
99+
If you keep telemetry on, the following knobs are available on
100+
`ConnectionOptions` (see JSDoc on `IDBSQLClient.ts` for defaults and
101+
units):
102+
103+
- `telemetryAuthenticatedExport` — set to `false` to ship reduced
104+
payloads (no statement/session correlation IDs, generic User-Agent)
105+
via the unauthenticated endpoint.
106+
- `telemetryBatchSize`, `telemetryFlushIntervalMs`, `telemetryMaxRetries`
107+
— batching and retry tuning.
108+
- `telemetryCircuitBreakerThreshold`, `telemetryCircuitBreakerTimeout`
109+
circuit-breaker tuning for the export endpoint.
110+
- `telemetryCloseTimeoutMs` — bound on `await client.close()` waiting for
111+
the final flush.
112+
113+
> **Note for short-lived processes**: always `await client.close()`
114+
> before `process.exit(0)` so the final batch is flushed. Without an
115+
> explicit close, the periodic flush timer is `unref()`'d to avoid
116+
> holding the event loop open, so any unflushed events are dropped.
117+
54118
## Run Tests
55119

56120
### Unit tests

lib/DBSQLClient.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -404,8 +404,23 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
404404

405405
this.logger.log(LogLevel.debug, 'Telemetry: enabled');
406406
} catch (error: any) {
407-
// Swallow all telemetry initialization errors
408-
this.logger.log(LogLevel.debug, `Telemetry initialization error: ${error.message}`);
407+
// Swallow all telemetry initialization errors. If we acquired a refcount
408+
// before the throw, release it — otherwise the per-host TelemetryClient
409+
// (and its flush timer / exporter / FFCache) leaks for the lifetime of
410+
// the process on long-running supervisors that retry-connect.
411+
if (this.telemetryClient) {
412+
try {
413+
await TelemetryClientProvider.getInstance().releaseClient(this, this.host);
414+
} catch (releaseError: any) {
415+
this.logger.log(
416+
LogLevel.debug,
417+
`Telemetry release-after-init-failure error: ${releaseError?.message ?? releaseError}`,
418+
);
419+
}
420+
this.telemetryClient = undefined;
421+
this.telemetryEmitter = undefined;
422+
}
423+
this.logger.log(LogLevel.debug, `Telemetry initialization error: ${error?.message ?? error}`);
409424
}
410425
}
411426

lib/index.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,18 @@ export type { default as ITokenProvider } from './connection/auth/tokenProvider/
2626
export { CircuitBreakerOpenError, CIRCUIT_BREAKER_OPEN_CODE } from './telemetry/CircuitBreaker';
2727
export { TelemetryTerminalError } from './telemetry/DatabricksTelemetryExporter';
2828

29+
// Telemetry event/metric/config shapes for consumers that want to inspect
30+
// telemetry payloads or pre-validate config. The emitter, aggregator, and
31+
// per-host client are deliberately not re-exported — they are internal.
32+
export { TelemetryEventType, DEFAULT_TELEMETRY_CONFIG } from './telemetry/types';
33+
export type {
34+
TelemetryEvent,
35+
TelemetryMetric,
36+
TelemetryConfiguration,
37+
StatementMetrics,
38+
DriverConfiguration,
39+
} from './telemetry/types';
40+
2941
export const auth = {
3042
PlainHttpAuthentication,
3143
// Token provider classes for custom authentication

lib/telemetry/TelemetryClient.ts

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -175,21 +175,43 @@ class TelemetryClient implements IClientContext {
175175
}
176176

177177
async getClient(): Promise<IThriftClient> {
178-
if (this.contexts.length === 0) {
179-
throw new Error(`TelemetryClient: no client available for host ${this.host}`);
178+
let lastErr: unknown;
179+
for (const ctx of this.contexts) {
180+
try {
181+
// eslint-disable-next-line no-await-in-loop
182+
return await ctx.getClient();
183+
} catch (err) {
184+
lastErr = err;
185+
}
180186
}
181-
return this.contexts[0].getClient();
187+
throw lastErr instanceof Error ? lastErr : new Error(`TelemetryClient: no client available for host ${this.host}`);
182188
}
183189

184190
async getDriver(): Promise<IDriver> {
185-
if (this.contexts.length === 0) {
186-
throw new Error(`TelemetryClient: no driver available for host ${this.host}`);
191+
let lastErr: unknown;
192+
for (const ctx of this.contexts) {
193+
try {
194+
// eslint-disable-next-line no-await-in-loop
195+
return await ctx.getDriver();
196+
} catch (err) {
197+
lastErr = err;
198+
}
187199
}
188-
return this.contexts[0].getDriver();
200+
throw lastErr instanceof Error ? lastErr : new Error(`TelemetryClient: no driver available for host ${this.host}`);
189201
}
190202

191203
getAuthProvider(): IAuthentication | undefined {
192-
return this.authProviders[0];
204+
// Walk the FIFO and return the first usable provider. A registered head
205+
// whose underlying DBSQLClient has revoked credentials will surface as
206+
// an `authenticate()` failure inside the exporter retry loop, but we
207+
// can avoid that round-trip when the array contains a clearly-defined
208+
// entry. Symmetric with `getConnectionProvider` fallthrough.
209+
for (const provider of this.authProviders) {
210+
if (provider !== undefined) {
211+
return provider;
212+
}
213+
}
214+
return undefined;
193215
}
194216

195217
getTelemetryEmitter(): undefined {

tests/unit/telemetry/MetricsAggregator.test.ts

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,79 @@ describe('MetricsAggregator', () => {
121121

122122
expect(() => aggregator.completeStatement('unknown-stmt')).to.not.throw();
123123
});
124+
125+
it('aggregates chunk timing — initial is first-seen, slowest is max, sum accumulates', () => {
126+
const context = new ClientContextStub();
127+
const exporter = makeExporterStub();
128+
const aggregator = new MetricsAggregator(context, exporter as any);
129+
130+
aggregator.processEvent(statementEvent(TelemetryEventType.STATEMENT_START));
131+
aggregator.processEvent(statementEvent(TelemetryEventType.CLOUDFETCH_CHUNK, { latencyMs: 100, bytes: 10 }));
132+
aggregator.processEvent(statementEvent(TelemetryEventType.CLOUDFETCH_CHUNK, { latencyMs: 250, bytes: 10 }));
133+
aggregator.processEvent(statementEvent(TelemetryEventType.CLOUDFETCH_CHUNK, { latencyMs: 75, bytes: 10 }));
134+
135+
aggregator.completeStatement('stmt-1');
136+
aggregator.flush();
137+
138+
const stmtMetric = exporter.export.firstCall.args[0][0];
139+
expect(stmtMetric.chunkInitialLatencyMs).to.equal(100);
140+
expect(stmtMetric.chunkSlowestLatencyMs).to.equal(250);
141+
expect(stmtMetric.chunkSumLatencyMs).to.equal(425);
142+
});
143+
144+
it('chunks with non-positive latency do not contribute to timing fields', () => {
145+
const context = new ClientContextStub();
146+
const exporter = makeExporterStub();
147+
const aggregator = new MetricsAggregator(context, exporter as any);
148+
149+
aggregator.processEvent(statementEvent(TelemetryEventType.STATEMENT_START));
150+
// latency=0 (cached/prefetched page) — must be ignored entirely.
151+
aggregator.processEvent(statementEvent(TelemetryEventType.CLOUDFETCH_CHUNK, { latencyMs: 0, bytes: 5 }));
152+
// latency undefined — emitter didn't set it, must be ignored.
153+
aggregator.processEvent(statementEvent(TelemetryEventType.CLOUDFETCH_CHUNK, { bytes: 5 }));
154+
// First *positive* latency wins for `initial`, even though earlier chunks already arrived.
155+
aggregator.processEvent(statementEvent(TelemetryEventType.CLOUDFETCH_CHUNK, { latencyMs: 60, bytes: 5 }));
156+
157+
aggregator.completeStatement('stmt-1');
158+
aggregator.flush();
159+
160+
const stmtMetric = exporter.export.firstCall.args[0][0];
161+
expect(stmtMetric.chunkInitialLatencyMs).to.equal(60);
162+
expect(stmtMetric.chunkSlowestLatencyMs).to.equal(60);
163+
expect(stmtMetric.chunkSumLatencyMs).to.equal(60);
164+
expect(stmtMetric.chunkCount).to.equal(3); // chunkCount counts all chunks regardless of latency
165+
});
166+
});
167+
168+
describe('processEvent() - CONNECTION_CLOSE', () => {
169+
it('emits a DELETE_SESSION connection metric immediately', () => {
170+
const context = new ClientContextStub();
171+
const exporter = makeExporterStub();
172+
const aggregator = new MetricsAggregator(context, exporter as any);
173+
174+
aggregator.processEvent(connectionEvent({ eventType: TelemetryEventType.CONNECTION_CLOSE, latencyMs: 42 }));
175+
aggregator.flush();
176+
177+
expect(exporter.export.calledOnce).to.be.true;
178+
const metric = exporter.export.firstCall.args[0][0];
179+
expect(metric.metricType).to.equal('connection');
180+
expect(metric.operationType).to.equal('DELETE_SESSION');
181+
expect(metric.latencyMs).to.equal(42);
182+
});
183+
184+
it('CONNECTION_OPEN and CONNECTION_CLOSE produce distinct operation types in the same batch', () => {
185+
const context = new ClientContextStub();
186+
const exporter = makeExporterStub();
187+
const aggregator = new MetricsAggregator(context, exporter as any);
188+
189+
aggregator.processEvent(connectionEvent({ eventType: TelemetryEventType.CONNECTION_OPEN, latencyMs: 100 }));
190+
aggregator.processEvent(connectionEvent({ eventType: TelemetryEventType.CONNECTION_CLOSE, latencyMs: 5 }));
191+
aggregator.flush();
192+
193+
const batch = exporter.export.firstCall.args[0];
194+
expect(batch).to.have.lengthOf(2);
195+
expect(batch.map((m: any) => m.operationType)).to.deep.equal(['CREATE_SESSION', 'DELETE_SESSION']);
196+
});
124197
});
125198

126199
describe('processEvent() - error events', () => {

tests/unit/telemetry/TelemetryEventEmitter.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,60 @@ describe('TelemetryEventEmitter', () => {
222222
});
223223
});
224224

225+
describe('emitConnectionClose', () => {
226+
it('emits connection.close with sessionId and latencyMs', (done) => {
227+
emitter.on(TelemetryEventType.CONNECTION_CLOSE, (event: TelemetryEvent) => {
228+
expect(event.eventType).to.equal(TelemetryEventType.CONNECTION_CLOSE);
229+
expect(event.sessionId).to.equal('session-close-1');
230+
expect(event.latencyMs).to.equal(7);
231+
expect(event.timestamp).to.be.a('number');
232+
done();
233+
});
234+
235+
emitter.emitConnectionClose({ sessionId: 'session-close-1', latencyMs: 7 });
236+
});
237+
238+
it('does not emit when telemetry is disabled', () => {
239+
const disabledContext = {
240+
getLogger: () => logger,
241+
getConfig: () => ({
242+
telemetryEnabled: false,
243+
directResultsDefaultMaxRows: 10000,
244+
fetchChunkDefaultMaxRows: 100000,
245+
socketTimeout: 900000,
246+
retryMaxAttempts: 30,
247+
retriesTimeout: 900000,
248+
retryDelayMin: 1000,
249+
retryDelayMax: 30000,
250+
useCloudFetch: true,
251+
cloudFetchConcurrentDownloads: 10,
252+
cloudFetchSpeedThresholdMBps: 0,
253+
useLZ4Compression: true,
254+
}),
255+
} as any;
256+
257+
const disabledEmitter = new TelemetryEventEmitter(disabledContext);
258+
let emitted = false;
259+
disabledEmitter.on(TelemetryEventType.CONNECTION_CLOSE, () => {
260+
emitted = true;
261+
});
262+
263+
disabledEmitter.emitConnectionClose({ sessionId: 's', latencyMs: 1 });
264+
expect(emitted).to.be.false;
265+
});
266+
267+
it('swallows exceptions from listeners and logs at debug level', () => {
268+
emitter.on(TelemetryEventType.CONNECTION_CLOSE, () => {
269+
throw new Error('listener boom');
270+
});
271+
272+
expect(() => emitter.emitConnectionClose({ sessionId: 's', latencyMs: 1 })).to.not.throw();
273+
const logStub = logger.log as sinon.SinonStub;
274+
const debugCalls = logStub.getCalls().filter((c) => c.args[0] === LogLevel.debug);
275+
expect(debugCalls.some((c) => /connection close/i.test(c.args[1]))).to.be.true;
276+
});
277+
});
278+
225279
describe('emitStatementStart', () => {
226280
it('should emit statement.start event with correct data', (done) => {
227281
emitter.on(TelemetryEventType.STATEMENT_START, (event: TelemetryEvent) => {

0 commit comments

Comments
 (0)