|
42 | 42 | import com.google.cloud.bigquery.TableDefinition; |
43 | 43 | import com.google.cloud.bigquery.TableId; |
44 | 44 | import com.google.cloud.bigquery.exception.BigQueryJdbcException; |
| 45 | +import com.google.common.annotations.VisibleForTesting; |
45 | 46 | import io.opentelemetry.api.trace.Span; |
46 | 47 | import io.opentelemetry.api.trace.SpanContext; |
47 | 48 | import io.opentelemetry.api.trace.StatusCode; |
@@ -1731,32 +1732,52 @@ private ResultSet getTablesImpl( |
1731 | 1732 | "getTables called for catalog: %s, schemaPattern: %s, tableNamePattern: %s, types: %s", |
1732 | 1733 | effectiveCatalog, effectiveSchemaPattern, tableNamePattern, Arrays.toString(types)); |
1733 | 1734 |
|
| 1735 | + final Schema resultSchema = defineGetTablesSchema(); |
| 1736 | + final BlockingQueue<BigQueryFieldValueListWrapper> queue = |
| 1737 | + new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); |
| 1738 | + |
| 1739 | + Thread fetcherThread = |
| 1740 | + runGetTablesTaskAsync( |
| 1741 | + effectiveCatalog, effectiveSchemaPattern, tableNamePattern, types, resultSchema, queue); |
| 1742 | + |
| 1743 | + BigQueryJsonResultSet resultSet = |
| 1744 | + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); |
| 1745 | + |
| 1746 | + LOG.info("Started background thread for getTables"); |
| 1747 | + return resultSet; |
| 1748 | + } |
| 1749 | + |
| 1750 | + @VisibleForTesting |
| 1751 | + Thread runGetTablesTaskAsync( |
| 1752 | + String effectiveCatalog, |
| 1753 | + String effectiveSchemaPattern, |
| 1754 | + String tableNamePattern, |
| 1755 | + String[] types, |
| 1756 | + Schema resultSchema, |
| 1757 | + BlockingQueue<BigQueryFieldValueListWrapper> queue) |
| 1758 | + throws SQLException { |
| 1759 | + |
1734 | 1760 | final Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern); |
1735 | 1761 | final Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern); |
1736 | 1762 | final Set<String> requestedTypes = |
1737 | 1763 | (types == null || types.length == 0) ? null : new HashSet<>(Arrays.asList(types)); |
1738 | 1764 |
|
1739 | | - final Schema resultSchema = defineGetTablesSchema(); |
1740 | 1765 | final FieldList resultSchemaFields = resultSchema.getFields(); |
1741 | | - |
1742 | | - final BlockingQueue<BigQueryFieldValueListWrapper> queue = |
1743 | | - new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); |
1744 | 1766 | final List<FieldValueList> collectedResults = Collections.synchronizedList(new ArrayList<>()); |
1745 | 1767 | final String catalogParam = effectiveCatalog; |
1746 | 1768 | final String schemaParam = effectiveSchemaPattern; |
1747 | | - |
1748 | | - Tracer tracer = this.connection.getTracer(); |
1749 | 1769 | SpanContext parentSpanContext = Span.current().getSpanContext(); |
1750 | 1770 | Runnable tableFetcher = |
1751 | 1771 | () -> { |
1752 | 1772 | Span backgroundSpan = |
1753 | | - tracer |
| 1773 | + this.connection |
| 1774 | + .getTracer() |
1754 | 1775 | .spanBuilder("BigQueryDatabaseMetaData.getTables.background") |
1755 | 1776 | .setNoParent() |
1756 | 1777 | .addLink(parentSpanContext) |
1757 | 1778 | .startSpan(); |
1758 | 1779 |
|
1759 | | - try (Scope backgroundScope = backgroundSpan.makeCurrent()) { |
| 1780 | + try (Scope scope = backgroundSpan.makeCurrent()) { |
1760 | 1781 | ExecutorService apiExecutor = null; |
1761 | 1782 | ExecutorService tableProcessorExecutor = null; |
1762 | 1783 | final FieldList localResultSchemaFields = resultSchemaFields; |
@@ -1898,12 +1919,8 @@ private ResultSet getTablesImpl( |
1898 | 1919 |
|
1899 | 1920 | Runnable wrappedTableFetcher = Context.current().wrap(tableFetcher); |
1900 | 1921 | Thread fetcherThread = new Thread(wrappedTableFetcher, "getTables-fetcher-" + effectiveCatalog); |
1901 | | - BigQueryJsonResultSet resultSet = |
1902 | | - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); |
1903 | | - |
1904 | 1922 | fetcherThread.start(); |
1905 | | - LOG.info("Started background thread for getTables"); |
1906 | | - return resultSet; |
| 1923 | + return fetcherThread; |
1907 | 1924 | } |
1908 | 1925 |
|
1909 | 1926 | Schema defineGetTablesSchema() { |
@@ -2127,24 +2144,51 @@ private ResultSet getColumnsImpl( |
2127 | 2144 | + " columnNamePattern: %s", |
2128 | 2145 | effectiveCatalog, effectiveSchemaPattern, tableNamePattern, columnNamePattern); |
2129 | 2146 |
|
| 2147 | + final Schema resultSchema = defineGetColumnsSchema(); |
| 2148 | + final BlockingQueue<BigQueryFieldValueListWrapper> queue = |
| 2149 | + new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); |
| 2150 | + |
| 2151 | + Thread fetcherThread = |
| 2152 | + runGetColumnsTaskAsync( |
| 2153 | + effectiveCatalog, |
| 2154 | + effectiveSchemaPattern, |
| 2155 | + tableNamePattern, |
| 2156 | + columnNamePattern, |
| 2157 | + resultSchema, |
| 2158 | + queue); |
| 2159 | + |
| 2160 | + BigQueryJsonResultSet resultSet = |
| 2161 | + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); |
| 2162 | + |
| 2163 | + LOG.info("Started background thread for getColumns"); |
| 2164 | + return resultSet; |
| 2165 | + } |
| 2166 | + |
| 2167 | + @VisibleForTesting |
| 2168 | + Thread runGetColumnsTaskAsync( |
| 2169 | + String effectiveCatalog, |
| 2170 | + String effectiveSchemaPattern, |
| 2171 | + String tableNamePattern, |
| 2172 | + String columnNamePattern, |
| 2173 | + Schema resultSchema, |
| 2174 | + BlockingQueue<BigQueryFieldValueListWrapper> queue) |
| 2175 | + throws SQLException { |
| 2176 | + |
2130 | 2177 | Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern); |
2131 | 2178 | Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern); |
2132 | 2179 | Pattern columnNameRegex = compileSqlLikePattern(columnNamePattern); |
2133 | 2180 |
|
2134 | | - final Schema resultSchema = defineGetColumnsSchema(); |
2135 | 2181 | final FieldList resultSchemaFields = resultSchema.getFields(); |
2136 | | - final BlockingQueue<BigQueryFieldValueListWrapper> queue = |
2137 | | - new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); |
2138 | 2182 | final List<FieldValueList> collectedResults = Collections.synchronizedList(new ArrayList<>()); |
2139 | 2183 | final String catalogParam = effectiveCatalog; |
2140 | 2184 | final String schemaParam = effectiveSchemaPattern; |
2141 | 2185 |
|
2142 | | - Tracer tracer = this.connection.getTracer(); |
2143 | 2186 | SpanContext parentSpanContext = Span.current().getSpanContext(); |
2144 | 2187 | Runnable columnFetcher = |
2145 | 2188 | () -> { |
2146 | 2189 | Span backgroundSpan = |
2147 | | - tracer |
| 2190 | + this.connection |
| 2191 | + .getTracer() |
2148 | 2192 | .spanBuilder("BigQueryDatabaseMetaData.getColumns.background") |
2149 | 2193 | .setNoParent() |
2150 | 2194 | .addLink(parentSpanContext) |
@@ -2252,12 +2296,8 @@ private ResultSet getColumnsImpl( |
2252 | 2296 | Runnable wrappedColumnFetcher = Context.current().wrap(columnFetcher); |
2253 | 2297 | Thread fetcherThread = |
2254 | 2298 | new Thread(wrappedColumnFetcher, "getColumns-fetcher-" + effectiveCatalog); |
2255 | | - BigQueryJsonResultSet resultSet = |
2256 | | - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); |
2257 | | - |
2258 | 2299 | fetcherThread.start(); |
2259 | | - LOG.info("Started background thread for getColumns"); |
2260 | | - return resultSet; |
| 2300 | + return fetcherThread; |
2261 | 2301 | } |
2262 | 2302 |
|
2263 | 2303 | private void processTableColumns( |
@@ -2324,7 +2364,7 @@ private void processTableColumns( |
2324 | 2364 | } |
2325 | 2365 | } |
2326 | 2366 |
|
2327 | | - private Schema defineGetColumnsSchema() { |
| 2367 | + Schema defineGetColumnsSchema() { |
2328 | 2368 | List<Field> fields = new ArrayList<>(24); |
2329 | 2369 | fields.add( |
2330 | 2370 | Field.newBuilder("TABLE_CAT", StandardSQLTypeName.STRING) |
@@ -3690,27 +3730,44 @@ private ResultSet getSchemasImpl(String catalog, String schemaPattern) throws SQ |
3690 | 3730 |
|
3691 | 3731 | LOG.info("getSchemas called for catalog: %s, schemaPattern: %s", catalog, schemaPattern); |
3692 | 3732 |
|
3693 | | - final Pattern schemaRegex = compileSqlLikePattern(schemaPattern); |
3694 | 3733 | final Schema resultSchema = defineGetSchemasSchema(); |
3695 | | - final FieldList resultSchemaFields = resultSchema.getFields(); |
3696 | | - |
3697 | 3734 | final BlockingQueue<BigQueryFieldValueListWrapper> queue = |
3698 | 3735 | new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); |
| 3736 | + |
| 3737 | + Thread fetcherThread = runGetSchemasTaskAsync(catalog, schemaPattern, resultSchema, queue); |
| 3738 | + |
| 3739 | + BigQueryJsonResultSet resultSet = |
| 3740 | + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); |
| 3741 | + |
| 3742 | + LOG.info("Started background thread for getSchemas"); |
| 3743 | + return resultSet; |
| 3744 | + } |
| 3745 | + |
| 3746 | + @VisibleForTesting |
| 3747 | + Thread runGetSchemasTaskAsync( |
| 3748 | + String catalog, |
| 3749 | + String schemaPattern, |
| 3750 | + Schema resultSchema, |
| 3751 | + BlockingQueue<BigQueryFieldValueListWrapper> queue) |
| 3752 | + throws SQLException { |
| 3753 | + |
| 3754 | + final Pattern schemaRegex = compileSqlLikePattern(schemaPattern); |
| 3755 | + final FieldList resultSchemaFields = resultSchema.getFields(); |
3699 | 3756 | final List<FieldValueList> collectedResults = Collections.synchronizedList(new ArrayList<>()); |
3700 | 3757 | final String catalogParam = catalog; |
3701 | 3758 |
|
3702 | | - Tracer tracer = this.connection.getTracer(); |
3703 | 3759 | SpanContext parentSpanContext = Span.current().getSpanContext(); |
3704 | 3760 | Runnable schemaFetcher = |
3705 | 3761 | () -> { |
3706 | 3762 | Span backgroundSpan = |
3707 | | - tracer |
| 3763 | + this.connection |
| 3764 | + .getTracer() |
3708 | 3765 | .spanBuilder("BigQueryDatabaseMetaData.getSchemas.background") |
3709 | 3766 | .setNoParent() |
3710 | 3767 | .addLink(parentSpanContext) |
3711 | 3768 | .startSpan(); |
3712 | 3769 |
|
3713 | | - try (Scope backgroundScope = backgroundSpan.makeCurrent()) { |
| 3770 | + try (Scope scope = backgroundSpan.makeCurrent()) { |
3714 | 3771 | final FieldList localResultSchemaFields = resultSchemaFields; |
3715 | 3772 | List<String> projectsToScanList = new ArrayList<>(); |
3716 | 3773 |
|
@@ -3791,12 +3848,8 @@ private ResultSet getSchemasImpl(String catalog, String schemaPattern) throws SQ |
3791 | 3848 |
|
3792 | 3849 | Runnable wrappedFetcher = Context.current().wrap(schemaFetcher); |
3793 | 3850 | Thread fetcherThread = new Thread(wrappedFetcher, "getSchemas-fetcher-" + catalog); |
3794 | | - BigQueryJsonResultSet resultSet = |
3795 | | - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); |
3796 | | - |
3797 | 3851 | fetcherThread.start(); |
3798 | | - LOG.info("Started background thread for getSchemas"); |
3799 | | - return resultSet; |
| 3852 | + return fetcherThread; |
3800 | 3853 | } |
3801 | 3854 |
|
3802 | 3855 | Schema defineGetSchemasSchema() { |
|
0 commit comments