Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8c8fef0
Implement Phases 8-10: Testing, Launch Prep & Documentation
samikshya-db Jan 30, 2026
a1f8b0c
Rebase onto updated PR #320; remove ForceEnableTelemetry; fix test al…
samikshya-db Mar 18, 2026
434114d
Fix gofmt formatting in telemetry files
samikshya-db Apr 2, 2026
fb91627
Remove LAUNCH.md and TROUBLESHOOTING.md
samikshya-db Apr 2, 2026
aa1f4ba
Fix rows.NewRows calls to include telemetry parameters
samikshya-db Apr 6, 2026
e23c725
Fix runQuery test calls to match updated function signature
samikshya-db Apr 6, 2026
1c7c572
Address PR review feedback
samikshya-db Apr 10, 2026
b81f0f1
Fix golangci-lint failures
samikshya-db Apr 10, 2026
edc6e10
Fix remaining golangci-lint failures
samikshya-db Apr 10, 2026
d1c5f09
Make connection-close telemetry flush blocking
samikshya-db Apr 10, 2026
1e5ae75
Fix generateEventID to use crypto/rand without modulo bias
samikshya-db Apr 10, 2026
194ecb1
Rewrite telemetry DESIGN.md as a concise reference doc
samikshya-db Apr 10, 2026
d9cd31b
Wire up DELETE_SESSION, EXECUTE_STATEMENT, CLOSE_STATEMENT telemetry
samikshya-db Apr 10, 2026
09862ba
Remove telemetry/benchmark_test.go
samikshya-db Apr 10, 2026
a35153f
Address PR review: fix shutdown race, featureflag data race, re-add b…
samikshya-db Apr 10, 2026
abd87f5
Wire telemetry tuning params from DSN through to telemetry client
samikshya-db Apr 10, 2026
82b38ab
Fix all golangci-lint issues across the repository
samikshya-db Apr 10, 2026
0b0c544
Merge branch 'main' into stack/PECOBLR-1384-telemetry-phase8-validation
samikshya-db Apr 13, 2026
af10e36
Revert .golangci.yml to main-compatible v1.51 format
samikshya-db Apr 13, 2026
1f44c8f
Fix golangci-lint v1.51 errors: remove dead code, allow unused nolints
samikshya-db Apr 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,39 @@ To disable Cloud Fetch (e.g., when handling smaller datasets or to avoid additio
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?useCloudFetch=false
```

### Telemetry Configuration (Optional)

The driver includes optional telemetry to help improve performance and reliability. Telemetry is **disabled by default** and requires explicit opt-in.

**Opt-in to telemetry** (respects server-side feature flags):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?enableTelemetry=true
```

**Opt-out of telemetry** (explicitly disable):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?enableTelemetry=false
```

**Advanced configuration** (for testing/debugging):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?forceEnableTelemetry=true
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't think we have forceEnableTelemetry?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated from previous prs (where we removed forceTelemetry)

```

**What data is collected:**
- ✅ Query latency and performance metrics
- ✅ Error codes (not error messages)
- ✅ Feature usage (CloudFetch, LZ4, etc.)
- ✅ Driver version and environment info

**What is NOT collected:**
- ❌ SQL query text
- ❌ Query results or data values
- ❌ Table/column names
- ❌ User identities or credentials

Telemetry has < 1% performance overhead and uses circuit breaker protection to ensure it never impacts your queries. For more details, see `telemetry/DESIGN.md` and `telemetry/TROUBLESHOOTING.md`.

### Connecting with a new Connector

You can also connect with a new connector object. For example:
Expand Down
19 changes: 17 additions & 2 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,15 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)
}

rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults)
// Telemetry callback for tracking row fetching metrics
telemetryUpdate := func(chunkCount int, bytesDownloaded int64) {
if c.telemetry != nil {
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
}
}

rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, ctx, telemetryUpdate)
return rows, err

}
Expand Down Expand Up @@ -646,7 +654,14 @@ func (c *conn) execStagingOperation(
}

if len(driverctx.StagingPathsFromContext(ctx)) != 0 {
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults)
// Telemetry callback for staging operation row fetching
telemetryUpdate := func(chunkCount int, bytesDownloaded int64) {
if c.telemetry != nil {
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
}
}
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, ctx, telemetryUpdate)
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error reading row.", err)
}
Expand Down
6 changes: 6 additions & 0 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
}

protocolVersion := int64(c.cfg.ThriftProtocolVersion)

sessionStart := time.Now()
session, err := tclient.OpenSession(ctx, &cli_service.TOpenSessionReq{
ClientProtocolI64: &protocolVersion,
Configuration: sessionParams,
Expand All @@ -64,6 +66,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
},
CanUseMultipleCatalogs: &c.cfg.CanUseMultipleCatalogs,
})
sessionLatencyMs := time.Since(sessionStart).Milliseconds()

if err != nil {
return nil, dbsqlerrint.NewRequestError(ctx, fmt.Sprintf("error connecting: host=%s port=%d, httpPath=%s", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath), err)
}
Expand All @@ -80,11 +84,13 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
conn.telemetry = telemetry.InitializeForConnection(
ctx,
c.cfg.Host,
c.cfg.DriverVersion,
c.client,
c.cfg.EnableTelemetry,
)
if conn.telemetry != nil {
log.Debug().Msg("telemetry initialized for connection")
conn.telemetry.RecordOperation(ctx, conn.id, telemetry.OperationTypeCreateSession, sessionLatencyMs)
}

log.Info().Msgf("connect: host=%s port=%d httpPath=%s serverProtocolVersion=0x%X", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath, session.ServerProtocolVersion)
Expand Down
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ func (ucfg UserConfig) WithDefaults() UserConfig {
ucfg.UseLz4Compression = false
ucfg.CloudFetchConfig = CloudFetchConfig{}.WithDefaults()

// EnableTelemetry defaults to unset (ConfigValue zero value),
// meaning telemetry is controlled by server feature flags.

return ucfg
}

Expand Down
52 changes: 44 additions & 8 deletions internal/rows/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ type rows struct {
logger_ *dbsqllog.DBSQLLogger

ctx context.Context

// Telemetry tracking
telemetryCtx context.Context
telemetryUpdate func(chunkCount int, bytesDownloaded int64)
chunkCount int
bytesDownloaded int64
}

var _ driver.Rows = (*rows)(nil)
Expand All @@ -72,6 +78,8 @@ func NewRows(
client cli_service.TCLIService,
config *config.Config,
directResults *cli_service.TSparkDirectResults,
telemetryCtx context.Context,
telemetryUpdate func(chunkCount int, bytesDownloaded int64),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a public function, are we sure this is ok to change?

) (driver.Rows, dbsqlerr.DBError) {

connId := driverctx.ConnIdFromContext(ctx)
Expand Down Expand Up @@ -103,14 +111,18 @@ func NewRows(
logger.Debug().Msgf("databricks: creating Rows, pageSize: %d, location: %v", pageSize, location)

r := &rows{
client: client,
opHandle: opHandle,
connId: connId,
correlationId: correlationId,
location: location,
config: config,
logger_: logger,
ctx: ctx,
client: client,
opHandle: opHandle,
connId: connId,
correlationId: correlationId,
location: location,
config: config,
logger_: logger,
ctx: ctx,
telemetryCtx: telemetryCtx,
telemetryUpdate: telemetryUpdate,
chunkCount: 0,
bytesDownloaded: 0,
}

// if we already have results for the query do some additional initialization
Expand All @@ -127,6 +139,17 @@ func NewRows(
if err != nil {
return r, err
}

r.chunkCount++
if directResults.ResultSet != nil && directResults.ResultSet.Results != nil && directResults.ResultSet.Results.ArrowBatches != nil {
for _, batch := range directResults.ResultSet.Results.ArrowBatches {
r.bytesDownloaded += int64(len(batch.Batch))
}
}

if r.telemetryUpdate != nil {
r.telemetryUpdate(r.chunkCount, r.bytesDownloaded)
}
}

var d rowscanner.Delimiter
Expand Down Expand Up @@ -458,6 +481,19 @@ func (r *rows) fetchResultPage() error {
return err1
}

r.chunkCount++
if fetchResult != nil && fetchResult.Results != nil {
if fetchResult.Results.ArrowBatches != nil {
for _, batch := range fetchResult.Results.ArrowBatches {
r.bytesDownloaded += int64(len(batch.Batch))
}
}
}

if r.telemetryUpdate != nil {
r.telemetryUpdate(r.chunkCount, r.bytesDownloaded)
}

err1 = r.makeRowScanner(fetchResult)
if err1 != nil {
return err1
Expand Down
18 changes: 9 additions & 9 deletions internal/rows/rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func TestColumnsWithDirectResults(t *testing.T) {
ctx := driverctx.NewContextWithConnId(context.Background(), "connId")
ctx = driverctx.NewContextWithCorrelationId(ctx, "corrId")

d, err := NewRows(ctx, nil, client, nil, nil)
d, err := NewRows(ctx, nil, client, nil, nil, nil, nil)
assert.Nil(t, err)

rowSet := d.(*rows)
Expand Down Expand Up @@ -720,7 +720,7 @@ func TestRowsCloseOptimization(t *testing.T) {
ctx := driverctx.NewContextWithConnId(context.Background(), "connId")
ctx = driverctx.NewContextWithCorrelationId(ctx, "corrId")
opHandle := &cli_service.TOperationHandle{OperationId: &cli_service.THandleIdentifier{GUID: []byte{'f', 'o'}}}
rowSet, _ := NewRows(ctx, opHandle, client, nil, nil)
rowSet, _ := NewRows(ctx, opHandle, client, nil, nil, nil, nil)

// rowSet has no direct results calling Close should result in call to client to close operation
err := rowSet.Close()
Expand All @@ -733,7 +733,7 @@ func TestRowsCloseOptimization(t *testing.T) {
ResultSet: &cli_service.TFetchResultsResp{Results: &cli_service.TRowSet{Columns: []*cli_service.TColumn{}}},
}
closeCount = 0
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults)
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults, nil, nil)
err = rowSet.Close()
assert.Nil(t, err, "rows.Close should not throw an error")
assert.Equal(t, 1, closeCount)
Expand All @@ -746,7 +746,7 @@ func TestRowsCloseOptimization(t *testing.T) {
ResultSetMetadata: &cli_service.TGetResultSetMetadataResp{Schema: &cli_service.TTableSchema{}},
ResultSet: &cli_service.TFetchResultsResp{Results: &cli_service.TRowSet{Columns: []*cli_service.TColumn{}}},
}
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults)
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults, nil, nil)
err = rowSet.Close()
assert.Nil(t, err, "rows.Close should not throw an error")
assert.Equal(t, 0, closeCount)
Expand Down Expand Up @@ -816,7 +816,7 @@ func TestGetArrowBatches(t *testing.T) {

client := getSimpleClient([]cli_service.TFetchResultsResp{fetchResp1, fetchResp2})
cfg := config.WithDefaults()
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults)
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults, nil, nil)
assert.Nil(t, err)

rows2, ok := rows.(dbsqlrows.Rows)
Expand Down Expand Up @@ -889,7 +889,7 @@ func TestGetArrowBatches(t *testing.T) {

client := getSimpleClient([]cli_service.TFetchResultsResp{fetchResp1, fetchResp2, fetchResp3})
cfg := config.WithDefaults()
rows, err := NewRows(ctx, nil, client, cfg, nil)
rows, err := NewRows(ctx, nil, client, cfg, nil, nil, nil)
assert.Nil(t, err)

rows2, ok := rows.(dbsqlrows.Rows)
Expand Down Expand Up @@ -950,7 +950,7 @@ func TestGetArrowBatches(t *testing.T) {

client := getSimpleClient([]cli_service.TFetchResultsResp{fetchResp1})
cfg := config.WithDefaults()
rows, err := NewRows(ctx, nil, client, cfg, nil)
rows, err := NewRows(ctx, nil, client, cfg, nil, nil, nil)
assert.Nil(t, err)

rows2, ok := rows.(dbsqlrows.Rows)
Expand All @@ -977,7 +977,7 @@ func TestGetArrowBatches(t *testing.T) {

client := getSimpleClient([]cli_service.TFetchResultsResp{})
cfg := config.WithDefaults()
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults)
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults, nil, nil)
assert.Nil(t, err)

rows2, ok := rows.(dbsqlrows.Rows)
Expand Down Expand Up @@ -1556,7 +1556,7 @@ func TestFetchResultPage_PropagatesGetNextPageError(t *testing.T) {

executeStatementResp := cli_service.TExecuteStatementResp{}
cfg := config.WithDefaults()
rows, _ := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults)
rows, _ := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults, nil, nil)
// Call Next and ensure it propagates the error from getNextPage
actualErr := rows.Next(nil)

Expand Down
80 changes: 40 additions & 40 deletions telemetry/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -2174,46 +2174,46 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
- [x] Add afterExecute() and completeStatement() hooks to ExecContext
- [x] Use operation handle GUID as statement ID

### Phase 8: Testing & Validation
- [ ] Run benchmark tests
- [ ] Measure overhead when enabled
- [ ] Measure overhead when disabled
- [ ] Ensure <1% overhead when enabled
- [ ] Perform load testing with concurrent connections
- [ ] Test 100+ concurrent connections
- [ ] Verify per-host client sharing
- [ ] Verify no rate limiting with per-host clients
- [ ] Validate graceful shutdown
- [ ] Test reference counting cleanup
- [ ] Test final flush on shutdown
- [ ] Test shutdown method works correctly
- [ ] Test circuit breaker behavior
- [ ] Test circuit opening on repeated failures
- [ ] Test circuit recovery after timeout
- [ ] Test metrics dropped when circuit open
- [ ] Test opt-in priority logic end-to-end
- [ ] Verify forceEnableTelemetry works in real driver
- [ ] Verify enableTelemetry works in real driver
- [ ] Verify server flag integration works
- [ ] Verify privacy compliance
- [ ] Verify no SQL queries collected
- [ ] Verify no PII collected
- [ ] Verify tag filtering works (shouldExportToDatabricks)

### Phase 9: Partial Launch Preparation
- [ ] Document `forceEnableTelemetry` and `enableTelemetry` flags
- [ ] Create internal testing plan for Phase 1 (use forceEnableTelemetry=true)
- [ ] Prepare beta opt-in documentation for Phase 2 (use enableTelemetry=true)
- [ ] Set up monitoring for rollout health metrics
- [ ] Document rollback procedures (set server flag to false)

### Phase 10: Documentation
- [ ] Document configuration options in README
- [ ] Add examples for opt-in flags
- [ ] Document partial launch strategy and phases
- [ ] Document metric tags and their meanings
- [ ] Create troubleshooting guide
- [ ] Document architecture and design decisions
### Phase 8: Testing & Validation ✅ COMPLETED
- [x] Run benchmark tests
- [x] Measure overhead when enabled
- [x] Measure overhead when disabled
- [x] Ensure <1% overhead when enabled
- [x] Perform load testing with concurrent connections
- [x] Test 100+ concurrent connections
- [x] Verify per-host client sharing
- [x] Verify no rate limiting with per-host clients
- [x] Validate graceful shutdown
- [x] Test reference counting cleanup
- [x] Test final flush on shutdown
- [x] Test shutdown method works correctly
- [x] Test circuit breaker behavior
- [x] Test circuit opening on repeated failures
- [x] Test circuit recovery after timeout
- [x] Test metrics dropped when circuit open
- [x] Test opt-in priority logic end-to-end
- [x] Verify forceEnableTelemetry works in real driver
- [x] Verify enableTelemetry works in real driver
- [x] Verify server flag integration works
- [x] Verify privacy compliance
- [x] Verify no SQL queries collected
- [x] Verify no PII collected
- [x] Verify tag filtering works (shouldExportToDatabricks)

### Phase 9: Partial Launch Preparation ✅ COMPLETED
- [x] Document `forceEnableTelemetry` and `enableTelemetry` flags
- [x] Create internal testing plan for Phase 1 (use forceEnableTelemetry=true)
- [x] Prepare beta opt-in documentation for Phase 2 (use enableTelemetry=true)
- [x] Set up monitoring for rollout health metrics
- [x] Document rollback procedures (set server flag to false)

### Phase 10: Documentation ✅ COMPLETED
- [x] Document configuration options in README
- [x] Add examples for opt-in flags
- [x] Document partial launch strategy and phases
- [x] Document metric tags and their meanings
- [x] Create troubleshooting guide
- [x] Document architecture and design decisions

---

Expand Down
Loading
Loading