Skip to content

Commit 3453ebb

Browse files
Retry FetchResults and error handling
Allow retries on calls to fetch results. The driver's handling of fetching result pages is robust enough to handle unexpected pagination due to retries. Updated the error handling to make sure that the client error handler is called, so that information in the Databricks response headers is added to the error info. Signed-off-by: Raymond Cypher <raymond.cypher@databricks.com>
1 parent 3925928 commit 3453ebb

3 files changed

Lines changed: 160 additions & 30 deletions

File tree

internal/client/client.go

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ var idempotentClientMethods map[clientMethod]any = map[clientMethod]any{
6262
getOperationStatus: struct{}{},
6363
closeOperation: struct{}{},
6464
cancelOperation: struct{}{},
65+
// fetchResults is treated as idempotent because the driver's fetching logic is robust enough
66+
// to fetch results in the correct order
67+
fetchResults: struct{}{},
6568
}
6669

6770
// OpenSession is a wrapper around the thrift operation OpenSession
@@ -360,23 +363,8 @@ func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
360363
// req.Body is assumed to be closed by the base RoundTripper.
361364
reqBodyClosed = true
362365
resp, err := t.Base.RoundTrip(req2)
363-
if err != nil {
364-
return resp, err
365-
}
366-
367-
if resp.StatusCode != http.StatusOK {
368-
reason := resp.Header.Get("X-Databricks-Reason-Phrase")
369-
terrmsg := resp.Header.Get("X-Thriftserver-Error-Message")
370366

371-
if reason != "" {
372-
logger.Err(fmt.Errorf(reason)).Msg(resp.Status)
373-
}
374-
if terrmsg != "" {
375-
logger.Err(fmt.Errorf(terrmsg)).Msg(resp.Status)
376-
}
377-
}
378-
379-
return resp, nil
367+
return resp, err
380368
}
381369

382370
func RetryableClient(cfg *config.Config) *http.Client {
@@ -459,28 +447,27 @@ func (l *leveledLogger) Warn(msg string, keysAndValues ...interface{}) {
459447

460448
func errorHandler(resp *http.Response, err error, numTries int) (*http.Response, error) {
461449
var werr error
450+
msg := fmt.Sprintf("request error after %d attempt(s)", numTries)
462451
if err == nil {
463-
err = errors.New(fmt.Sprintf("request error after %d attempt(s)", numTries))
452+
err = errors.New(msg)
453+
} else {
454+
err = errors.Wrap(err, msg)
464455
}
465456

466457
if resp != nil {
467-
var orgid, reason, terrmsg, errmsg, retryAfter string
458+
var orgid, reason, terrmsg, errmsg string
468459
// TODO @mattdeekay: convert these to specific error types
469460
if resp.Header != nil {
470461
orgid = resp.Header.Get("X-Databricks-Org-Id")
471462
reason = resp.Header.Get("X-Databricks-Reason-Phrase") // TODO note: shown on notebook
472463
terrmsg = resp.Header.Get("X-Thriftserver-Error-Message")
473464
errmsg = resp.Header.Get("x-databricks-error-or-redirect-message")
474-
retryAfter = resp.Header.Get("Retry-After")
475465
// TODO note: need to see if there's other headers
476466
}
477467
msg := fmt.Sprintf("orgId: %s, reason: %s, thriftErr: %s, err: %s", orgid, reason, terrmsg, errmsg)
478468

479-
if isRetryableServerResponse(resp) {
480-
err = dbsqlerrint.NewRetryableError(err, retryAfter)
481-
}
482-
483469
werr = dbsqlerrint.WrapErr(err, msg)
470+
logger.Err(werr).Msg(resp.Status)
484471
} else {
485472
werr = err
486473
}
@@ -508,7 +495,6 @@ var (
508495
)
509496

510497
func RetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error) {
511-
512498
// do not retry on context.Canceled or context.DeadlineExceeded
513499
if ctx.Err() != nil {
514500
return false, ctx.Err()
@@ -532,24 +518,35 @@ func RetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, err
532518
return true, nil
533519
}
534520

521+
var checkErr error
522+
if resp.StatusCode != http.StatusOK {
523+
checkErr = fmt.Errorf("unexpected HTTP status %s", resp.Status)
524+
}
525+
535526
// 429 Too Many Requests or 503 service unavailable is recoverable. Sometimes the server puts
536527
// a Retry-After response header to indicate when the server is
537528
// available to start processing request from client.
538529
if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable {
539-
return true, nil
530+
var retryAfter string
531+
if resp.Header != nil {
532+
retryAfter = resp.Header.Get("Retry-After")
533+
}
534+
535+
return true, dbsqlerrint.NewRetryableError(checkErr, retryAfter)
540536
}
541537

542538
if resp.StatusCode == 0 || (resp.StatusCode >= 500 && resp.StatusCode != http.StatusNotImplemented) {
543539
callerAny := ctx.Value(ClientMethod)
544540
if caller, ok := callerAny.(clientMethod); ok {
545541
if _, ok := idempotentClientMethods[caller]; ok {
546-
return true, fmt.Errorf("unexpected HTTP status %s", resp.Status)
542+
return true, checkErr
547543
}
548544
}
549545
}
550546

551-
return false, nil
552-
547+
// checkErr will be non-nil if the response code was not StatusOK.
548+
// Returning it here ensures that the error handler will be called.
549+
return false, checkErr
553550
}
554551

555552
func backoff(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration {

internal/client/client_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ func TestRetryPolicy(t *testing.T) {
8484

8585
nonRetryableCodes := []int{200, 300, 400, 501}
8686

87-
idempotentOps := []clientMethod{closeSession, getResultSetMetadata, getOperationStatus, closeOperation, cancelOperation}
88-
nonIdempotentOps := []clientMethod{openSession, fetchResults, executeStatement}
87+
idempotentOps := []clientMethod{closeSession, getResultSetMetadata, getOperationStatus, closeOperation, cancelOperation, fetchResults}
88+
nonIdempotentOps := []clientMethod{openSession, executeStatement}
8989

9090
cancelled, cancel := context.WithCancel(context.Background())
9191
cancel()

internal/rows/rows_test.go

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,40 @@ func TestRowsCloseOptimization(t *testing.T) {
832832
assert.Equal(t, 0, closeCount)
833833
}
834834

835+
type fetch struct {
836+
direction cli_service.TFetchOrientation
837+
resultStartRec int
838+
}
839+
840+
func TestFetchResultsWithRetries(t *testing.T) {
841+
t.Parallel()
842+
843+
// Simulate a scenario where network issues and retries cause unexpected jumps
844+
// across multiple result pages.
845+
fetches := []fetch{}
846+
847+
client := getRowsTestSimpleClient2(&fetches)
848+
rowSet := &rows{client: client, hasMoreRows: true}
849+
850+
// next row number is zero so should fetch first result page
851+
err := rowSet.fetchResultPage()
852+
assert.Nil(t, err)
853+
assert.Len(t, fetches, 1)
854+
assert.Equal(t, fetches[0].direction, cli_service.TFetchOrientation_FETCH_NEXT)
855+
856+
// row number four is still in the first page so there should be no calls to fetch
857+
rowSet.nextRowNumber = 4
858+
err = rowSet.fetchResultPage()
859+
assert.Nil(t, err)
860+
assert.Len(t, fetches, 1)
861+
862+
rowSet.nextRowNumber = 5
863+
err = rowSet.fetchResultPage()
864+
assert.Nil(t, err)
865+
assert.Len(t, fetches, 5)
866+
867+
}
868+
835869
type rowTestPagingResult struct {
836870
getMetadataCount int
837871
fetchResultsCount int
@@ -1245,3 +1279,102 @@ func getRowsTestSimpleClient(getMetadataCount, fetchResultsCount *int) cli_servi
12451279

12461280
return client
12471281
}
1282+
1283+
// Build a simple test client
1284+
func getRowsTestSimpleClient2(fetches *[]fetch) cli_service.TCLIService {
1285+
// Metadata for the different types is based on the results returned when querying a table with
1286+
// all the different types which was created in a test shard.
1287+
metadata := &cli_service.TGetResultSetMetadataResp{
1288+
Status: &cli_service.TStatus{
1289+
StatusCode: cli_service.TStatusCode_SUCCESS_STATUS,
1290+
},
1291+
Schema: &cli_service.TTableSchema{
1292+
Columns: []*cli_service.TColumnDesc{
1293+
{
1294+
ColumnName: "bool_col",
1295+
TypeDesc: &cli_service.TTypeDesc{
1296+
Types: []*cli_service.TTypeEntry{
1297+
{
1298+
PrimitiveEntry: &cli_service.TPrimitiveTypeEntry{
1299+
Type: cli_service.TTypeId_BOOLEAN_TYPE,
1300+
},
1301+
},
1302+
},
1303+
},
1304+
},
1305+
},
1306+
},
1307+
}
1308+
1309+
getMetadata := func(ctx context.Context, req *cli_service.TGetResultSetMetadataReq) (_r *cli_service.TGetResultSetMetadataResp, _err error) {
1310+
return metadata, nil
1311+
}
1312+
1313+
moreRows := true
1314+
noMoreRows := false
1315+
colVals := []*cli_service.TColumn{{BoolVal: &cli_service.TBoolColumn{Values: []bool{true, false, true, false, true}}}}
1316+
1317+
pages := []*cli_service.TFetchResultsResp{
1318+
{
1319+
Status: &cli_service.TStatus{
1320+
StatusCode: cli_service.TStatusCode_SUCCESS_STATUS,
1321+
},
1322+
HasMoreRows: &moreRows,
1323+
Results: &cli_service.TRowSet{
1324+
StartRowOffset: 0,
1325+
Columns: colVals,
1326+
},
1327+
},
1328+
{
1329+
Status: &cli_service.TStatus{
1330+
StatusCode: cli_service.TStatusCode_SUCCESS_STATUS,
1331+
},
1332+
HasMoreRows: &moreRows,
1333+
Results: &cli_service.TRowSet{
1334+
StartRowOffset: 5,
1335+
Columns: colVals,
1336+
},
1337+
},
1338+
{
1339+
Status: &cli_service.TStatus{
1340+
StatusCode: cli_service.TStatusCode_SUCCESS_STATUS,
1341+
},
1342+
HasMoreRows: &noMoreRows,
1343+
Results: &cli_service.TRowSet{
1344+
StartRowOffset: 10,
1345+
Columns: colVals,
1346+
},
1347+
},
1348+
{
1349+
Status: &cli_service.TStatus{
1350+
StatusCode: cli_service.TStatusCode_SUCCESS_STATUS,
1351+
},
1352+
HasMoreRows: &noMoreRows,
1353+
Results: &cli_service.TRowSet{
1354+
StartRowOffset: 15,
1355+
Columns: []*cli_service.TColumn{},
1356+
},
1357+
},
1358+
}
1359+
1360+
// We are simulating the scenario where network errors and retry behaviour cause the fetch
1361+
// result request to be sent multiple times, resulting in jumping past the next/previous result
1362+
// page. Behaviour should be robust enough to handle this by changing the fetch orientation.
1363+
pageSequence := []int{0, 3, 2, 0, 1, 2}
1364+
pageIndex := -1
1365+
1366+
fetchResults := func(ctx context.Context, req *cli_service.TFetchResultsReq) (_r *cli_service.TFetchResultsResp, _err error) {
1367+
pageIndex++
1368+
1369+
p := pages[pageSequence[pageIndex]]
1370+
*fetches = append(*fetches, fetch{direction: req.Orientation, resultStartRec: int(p.Results.StartRowOffset)})
1371+
return p, nil
1372+
}
1373+
1374+
client := &client.TestClient{
1375+
FnGetResultSetMetadata: getMetadata,
1376+
FnFetchResults: fetchResults,
1377+
}
1378+
1379+
return client
1380+
}

0 commit comments

Comments
 (0)