Skip to content

Commit 98aa013

Browse files
committed
fix(bqjdbc): fallback to RestAPI if ReadAPI is not accessible
1 parent 7bb807a commit 98aa013

2 files changed

Lines changed: 119 additions & 21 deletions

File tree

java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import com.google.api.core.InternalApi;
2020
import com.google.api.gax.paging.Page;
21+
import com.google.api.gax.rpc.ApiException;
22+
import com.google.api.gax.rpc.StatusCode;
2123
import com.google.cloud.Tuple;
2224
import com.google.cloud.bigquery.BigQuery;
2325
import com.google.cloud.bigquery.BigQuery.JobListOption;
@@ -57,6 +59,8 @@
5759
import com.google.common.collect.ImmutableList;
5860
import com.google.common.collect.Iterators;
5961
import com.google.common.util.concurrent.Uninterruptibles;
62+
import io.grpc.Status;
63+
import io.grpc.StatusRuntimeException;
6064
import java.lang.ref.ReferenceQueue;
6165
import java.sql.Connection;
6266
import java.sql.ResultSet;
@@ -879,9 +883,8 @@ Thread populateArrowBufferedQueue(
879883
rowsRead += response.getRowCount();
880884
}
881885
break;
882-
} catch (com.google.api.gax.rpc.ApiException e) {
883-
if (e.getStatusCode().getCode()
884-
== com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) {
886+
} catch (ApiException e) {
887+
if (e.getStatusCode().getCode() == StatusCode.Code.NOT_FOUND) {
885888
LOG.warning("Read session expired or not found: %s", e.getMessage());
886889
enqueueError(arrowBatchWrapperBlockingQueue, e);
887890
break;
@@ -929,25 +932,44 @@ Thread populateArrowBufferedQueue(
929932

930933
/** Executes SQL query using either fast query path or read API */
931934
void processQueryResponse(String query, TableResult results) throws SQLException {
932-
LOG.finest(
933-
"API call completed{Query=%s, Parent Job ID=%s, Total rows=%s} ",
934-
query, results.getJobId(), results.getTotalRows());
935-
JobId currentJobId = results.getJobId();
936-
if (currentJobId == null) {
937-
LOG.fine("Standard API with Stateless query used.");
938-
this.currentResultSet = processJsonResultSet(results);
939-
} else if (useReadAPI(results)) {
940-
LOG.fine("HighThroughputAPI used.");
941-
LOG.info("HTAPI job ID: " + currentJobId.getJob());
942-
this.currentResultSet = processArrowResultSet(results);
943-
} else {
944-
// read API cannot be used.
945-
LOG.fine("Standard API used.");
946-
this.currentResultSet = processJsonResultSet(results);
935+
JobId jobId = results.getJobId();
936+
String queryId = results.getQueryId();
937+
LOG.info(
938+
"Processing query response. JobId: %s, QueryId: %s, Total rows: %s",
939+
jobId, queryId, results.getTotalRows());
940+
LOG.fine("Processing query response. Query: %s} ", query);
941+
942+
ResultSet resultSet = null;
943+
if (jobId != null && useReadAPI(results)) {
944+
try {
945+
LOG.info("Using ReadAPI to read the data.");
946+
resultSet = processArrowResultSet(results);
947+
} catch (SQLException e) {
948+
if (!isPermissionDeniedException(e)) {
949+
throw e;
950+
}
951+
LOG.log(Level.WARNING, "Permission denied for Read API, falling back to JSON API", e);
952+
}
947953
}
954+
955+
if (resultSet == null) {
956+
LOG.info("Using Standard API to read the data.");
957+
resultSet = processJsonResultSet(results);
958+
}
959+
this.currentResultSet = resultSet;
948960
this.currentUpdateCount = -1;
949961
}
950962

963+
private boolean isPermissionDeniedException(Throwable t) {
964+
if (t == null) {
965+
return false;
966+
}
967+
if (t instanceof StatusRuntimeException) {
968+
return ((StatusRuntimeException) t).getStatus().getCode() == Status.Code.PERMISSION_DENIED;
969+
}
970+
return isPermissionDeniedException(t.getCause());
971+
}
972+
951973
// The read Ratio should be met
952974
// AND the User must not have disabled the Read API
953975
@VisibleForTesting
@@ -977,9 +999,6 @@ private boolean meetsReadRatio(TableResult results) {
977999
}
9781000

9791001
BigQueryJsonResultSet processJsonResultSet(TableResult results) {
980-
String jobIdOrQueryId =
981-
results.getJobId() == null ? results.getQueryId() : results.getJobId().getJob();
982-
LOG.info("BigQuery Job %s completed. Fetching results.", jobIdOrQueryId);
9831002
List<Thread> threadList = new ArrayList<Thread>();
9841003

9851004
Schema schema = results.getSchema();

java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import static org.mockito.Mockito.mock;
2727
import static org.mockito.Mockito.verify;
2828

29+
import com.google.api.gax.rpc.ApiException;
30+
import com.google.api.gax.rpc.StatusCode;
2931
import com.google.cloud.ServiceOptions;
3032
import com.google.cloud.bigquery.BigQuery;
3133
import com.google.cloud.bigquery.BigQuery.QueryResultsOption;
@@ -44,6 +46,7 @@
4446
import com.google.cloud.bigquery.StandardSQLTypeName;
4547
import com.google.cloud.bigquery.TableId;
4648
import com.google.cloud.bigquery.TableResult;
49+
import com.google.cloud.bigquery.exception.BigQueryJdbcException;
4750
import com.google.cloud.bigquery.jdbc.BigQueryStatement.JobIdWrapper;
4851
import com.google.cloud.bigquery.spi.BigQueryRpcFactory;
4952
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
@@ -65,6 +68,7 @@
6568
import org.apache.arrow.vector.FieldVector;
6669
import org.apache.arrow.vector.IntVector;
6770
import org.apache.arrow.vector.VectorSchemaRoot;
71+
import org.junit.jupiter.api.Assertions;
6872
import org.junit.jupiter.api.BeforeEach;
6973
import org.junit.jupiter.api.Disabled;
7074
import org.junit.jupiter.api.Test;
@@ -494,4 +498,79 @@ public void testGetStatementType(boolean isReadOnlyTokenUsed) throws Exception {
494498
verify(bigquery, isReadOnlyTokenUsed ? Mockito.never() : Mockito.times(1))
495499
.create(any(JobInfo.class));
496500
}
501+
502+
@Test
503+
public void testProcessQueryResponseFallbackToJsonOnReadApiFailure() throws SQLException {
504+
BigQueryStatement statementSpy = Mockito.spy(bigQueryStatement);
505+
TableResult tableResultMock = mockTableResultWithJob("job-id");
506+
507+
// Force useReadAPI to return true to enter the HTAPI block
508+
doReturn(true).when(statementSpy).useReadAPI(tableResultMock);
509+
510+
// Mock a permission denied ApiException
511+
ApiException apiExceptionMock = mockApiException(StatusCode.Code.PERMISSION_DENIED);
512+
513+
BigQueryJdbcException exceptionToThrow =
514+
new BigQueryJdbcException("Simulated permission denied", apiExceptionMock);
515+
516+
// Force processArrowResultSet to throw the permission exception
517+
Mockito.doThrow(exceptionToThrow).when(statementSpy).processArrowResultSet(tableResultMock);
518+
519+
BigQueryJsonResultSet jsonResultSetMock = mock(BigQueryJsonResultSet.class);
520+
// Mock processJsonResultSet to return our mock JSON result set
521+
doReturn(jsonResultSetMock).when(statementSpy).processJsonResultSet(tableResultMock);
522+
523+
statementSpy.processQueryResponse("SELECT 1", tableResultMock);
524+
525+
// Verify that processJsonResultSet was indeed called as a fallback
526+
verify(statementSpy).processJsonResultSet(tableResultMock);
527+
// Verify that currentResultSet is set to the mocked JSON result set
528+
assertThat(statementSpy.currentResultSet).isEqualTo(jsonResultSetMock);
529+
}
530+
531+
@Test
532+
public void testProcessQueryResponseNoFallbackOnNonPermissionFailure() throws SQLException {
533+
BigQueryStatement statementSpy = Mockito.spy(bigQueryStatement);
534+
TableResult tableResultMock = mockTableResultWithJob("job-id");
535+
536+
// Force useReadAPI to return true to enter the HTAPI block
537+
doReturn(true).when(statementSpy).useReadAPI(tableResultMock);
538+
539+
// Mock a non-permission ApiException (e.g., INTERNAL)
540+
ApiException apiExceptionMock = mockApiException(StatusCode.Code.INTERNAL);
541+
542+
BigQueryJdbcException exceptionToThrow =
543+
new BigQueryJdbcException("Simulated internal error", apiExceptionMock);
544+
545+
// Force processArrowResultSet to throw the non-permission exception
546+
Mockito.doThrow(exceptionToThrow).when(statementSpy).processArrowResultSet(tableResultMock);
547+
548+
BigQueryJsonResultSet jsonResultSetMock = mock(BigQueryJsonResultSet.class);
549+
doReturn(jsonResultSetMock).when(statementSpy).processJsonResultSet(tableResultMock);
550+
551+
// Assert that the exception is propagated
552+
try {
553+
statementSpy.processQueryResponse("SELECT 1", tableResultMock);
554+
Assertions.fail("Expected SQLException to be thrown");
555+
} catch (SQLException e) {
556+
assertEquals(exceptionToThrow, e);
557+
}
558+
559+
// Verify that processJsonResultSet was NOT called
560+
verify(statementSpy, Mockito.never()).processJsonResultSet(tableResultMock);
561+
}
562+
563+
private TableResult mockTableResultWithJob(String jobId) {
564+
TableResult tableResult = mock(TableResult.class);
565+
doReturn(JobId.of(jobId)).when(tableResult).getJobId();
566+
return tableResult;
567+
}
568+
569+
private ApiException mockApiException(StatusCode.Code code) {
570+
ApiException apiExceptionMock = mock(ApiException.class);
571+
StatusCode statusCodeMock = mock(StatusCode.class);
572+
doReturn(statusCodeMock).when(apiExceptionMock).getStatusCode();
573+
doReturn(code).when(statusCodeMock).getCode();
574+
return apiExceptionMock;
575+
}
497576
}

0 commit comments

Comments
 (0)