Skip to content

Commit d04c66b

Browse files
authored
[SourceDB to Spanner] Fetch size Automation (#3260)
* fetchSizeAutomation initial commit * fetchSize automation refinement * fetchSize Automation Refinement * made estimated row size nullable * Added tests: * refactoring (worker zone getter ) and spotless * refactor: move worker resource lookup to OptionsToConfigBuilder and improve FetchSizeCalculator robustness * updated pipeline controller test * spotless * added unknown sourcetype fallback * spotless * update tests * updated unknown type for postgres * updated logs in fetchsizeautomation : * fetchsizeautomation valuemapping refine,ent * usecursorfetch=false, improved code structure, added fetchsize to non uniform reader * revert useCursorFetch * useCursorFetch=false * custom decoder * updated tests * deleted lt for now * added fetchsizecalculator test * time in PK fix * time in PK fix * added tests * added more tests * keeping worker memory in bytes, test updates, license update, minor refactoring * test fix * refactoring, tests * added missing tests * mysql utils into dialect adapter * spotless
1 parent f8677e9 commit d04c66b

39 files changed

Lines changed: 2080 additions & 387 deletions

File tree

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import com.google.cloud.teleport.v2.spanner.migrations.shard.ShardingContext;
3939
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
4040
import com.google.cloud.teleport.v2.spanner.migrations.transformation.TransformationContext;
41-
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeValidator;
41+
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeUtils;
4242
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
4343
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardingContextReader;
4444
import com.google.cloud.teleport.v2.spanner.migrations.utils.TransformationContextReader;
@@ -649,7 +649,7 @@ public static PipelineResult run(Options options) {
649649
Pipeline pipeline = Pipeline.create(options);
650650
String workerMachineType =
651651
pipeline.getOptions().as(DataflowPipelineWorkerPoolOptions.class).getWorkerMachineType();
652-
DataflowWorkerMachineTypeValidator.validateMachineSpecs(workerMachineType, 4);
652+
DataflowWorkerMachineTypeUtils.validateMachineSpecs(workerMachineType, 4);
653653
DeadLetterQueueManager dlqManager = buildDlqManager(options);
654654
// Ingest session file into schema object.
655655
Schema schema = SessionFileReader.read(options.getSessionFilePath());

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilder.java

Lines changed: 56 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect;
2525
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.defaults.MySqlConfigDefaults;
2626
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference;
27+
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeUtils;
2728
import com.google.common.annotations.VisibleForTesting;
2829
import com.google.common.collect.ImmutableList;
2930
import com.google.re2j.Matcher;
@@ -33,6 +34,8 @@
3334
import java.util.List;
3435
import java.util.Map.Entry;
3536
import javax.annotation.Nullable;
37+
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
38+
import org.apache.beam.sdk.options.PipelineOptions;
3639
import org.apache.beam.sdk.transforms.Wait;
3740
import org.apache.commons.lang3.StringUtils;
3841
import org.slf4j.Logger;
@@ -43,6 +46,21 @@ public final class OptionsToConfigBuilder {
4346
private static final Logger LOG = LoggerFactory.getLogger(OptionsToConfigBuilder.class);
4447
public static final String DEFAULT_POSTGRESQL_NAMESPACE = "public";
4548

49+
/**
50+
* Extracts the worker zone from the options.
51+
*
52+
* @param options Pipeline options.
53+
* @return The worker zone or null if not found.
54+
*/
55+
public static String extractWorkerZone(PipelineOptions options) {
56+
try {
57+
return options.as(DataflowPipelineWorkerPoolOptions.class).getWorkerZone();
58+
} catch (Exception e) {
59+
LOG.warn("Could not extract worker zone from options. Defaulting to null.", e);
60+
return null;
61+
}
62+
}
63+
4664
public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults(
4765
SourceDbToSpannerOptions options,
4866
List<String> tables,
@@ -60,6 +78,12 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults(
6078
long maxConnections =
6179
options.getMaxConnections() > 0 ? (long) (options.getMaxConnections()) : 0;
6280
Integer numPartitions = options.getNumPartitions();
81+
String workerZone = extractWorkerZone(options);
82+
83+
Integer fetchSize = options.getFetchSize();
84+
if (fetchSize != null && fetchSize < 0) {
85+
fetchSize = null;
86+
}
6387

6488
return getJdbcIOWrapperConfig(
6589
sqlDialect,
@@ -78,8 +102,11 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults(
78102
maxConnections,
79103
numPartitions,
80104
waitOn,
81-
options.getFetchSize(),
82-
options.getUniformizationStageCountHint());
105+
fetchSize,
106+
options.getUniformizationStageCountHint(),
107+
options.getProjectId(),
108+
workerZone,
109+
options.as(DataflowPipelineWorkerPoolOptions.class).getWorkerMachineType());
83110
}
84111

85112
public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
@@ -100,7 +127,10 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
100127
Integer numPartitions,
101128
Wait.OnSignal<?> waitOn,
102129
Integer fetchSize,
103-
Long uniformizationStageCountHint) {
130+
Long uniformizationStageCountHint,
131+
String projectId,
132+
String workerZone,
133+
String workerMachineType) {
104134
JdbcIOWrapperConfig.Builder builder = builderWithDefaultsFor(sqlDialect);
105135
SourceSchemaReference sourceSchemaReference =
106136
sourceSchemaReferenceFrom(sqlDialect, dbName, namespace);
@@ -115,6 +145,14 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
115145
.build())
116146
.setJdbcDriverClassName(jdbcDriverClassName)
117147
.setJdbcDriverJars(jdbcDriverJars);
148+
149+
if (workerMachineType != null && !workerMachineType.isEmpty()) {
150+
builder.setWorkerMemoryBytes(
151+
DataflowWorkerMachineTypeUtils.getWorkerMemoryBytes(
152+
projectId, workerZone, workerMachineType));
153+
builder.setWorkerCores(
154+
DataflowWorkerMachineTypeUtils.getWorkerCores(projectId, workerZone, workerMachineType));
155+
}
118156
if (maxConnections != 0) {
119157
builder = builder.setMaxConnections(maxConnections);
120158
}
@@ -161,27 +199,34 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
161199
}
162200

163201
/**
164-
* For MySQL Dialect, if Fetchsize is expecitly set by the user, enables `useCursorFetch`.
202+
* For MySQL Dialect, if Fetchsize is explicitly set by the user or if it's auto-inferred (null),
203+
* enables `useCursorFetch`. It is disabled only if user explicitly sets FetchSize to 0.
165204
*
166205
* @param sqlDialect Sql Dialect.
167206
* @param url DB Url from passed configs.
168207
* @param fetchSize FetchSize Setting (Null if user has not explicitly set)
169-
* @return Updated URL with `useCursorFetch` only if dialect is MySql and Fetchsize is not null.
170-
* Same as input URL in all other cases.
208+
* @return Updated URL with `useCursorFetch` only if dialect is MySql and Fetchsize is not 0. Same
209+
* as input URL in all other cases.
171210
*/
172211
@VisibleForTesting
173212
@Nullable
174213
protected static String mysqlSetCursorModeIfNeeded(
175214
SQLDialect sqlDialect, String url, @Nullable Integer fetchSize) {
176-
if (fetchSize == null) {
177-
LOG.info(
178-
"FetchSize is not explicitly configured. In case of out of memory errors, please set `FetchSize` according to the available memory and maximum size of a row.");
215+
if (sqlDialect != SQLDialect.MYSQL) {
179216
return url;
180217
}
181-
if (sqlDialect != SQLDialect.MYSQL) {
218+
// For MySQL, to enable streaming/cursor mode, useCursorFetch must be true.
219+
// We enable it if fetchSize is NULL (Auto-infer) or > 0.
220+
// We only disable it if fetchSize is explicitly 0 (Fetch All).
221+
if (fetchSize != null && fetchSize == 0) {
222+
LOG.info(
223+
"FetchSize is explicitly 0. MySQL cursor mode (useCursorFetch) will not be enabled explicitly.");
182224
return url;
183225
}
184-
LOG.info("For Mysql, Fetchsize is explicitly configured. So setting `useCursorMode=true`.");
226+
227+
LOG.info(
228+
"FetchSize is {}. Setting MySQL `useCursorFetch=true`.",
229+
fetchSize == null ? "Auto" : fetchSize);
185230
String updatedUrl = addParamToJdbcUrl(url, "useCursorFetch", "true");
186231
return updatedUrl;
187232
}

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/DialectAdapter.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,18 @@
1919
import com.google.cloud.teleport.v2.source.reader.io.exception.RetriableSchemaDiscoveryException;
2020
import com.google.cloud.teleport.v2.source.reader.io.exception.SchemaDiscoveryException;
2121
import com.google.cloud.teleport.v2.source.reader.io.jdbc.JdbcSchemaReference;
22+
import com.google.cloud.teleport.v2.source.reader.io.jdbc.rowmapper.JdbcValueMappingsProvider;
2223
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.UniformSplitterDBAdapter;
2324
import com.google.cloud.teleport.v2.source.reader.io.schema.RetriableSchemaDiscovery;
2425
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceColumnIndexInfo;
2526
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference;
2627
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference.Kind;
28+
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableSchema;
2729
import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnType;
2830
import com.google.common.base.Preconditions;
2931
import com.google.common.collect.ImmutableList;
3032
import com.google.common.collect.ImmutableMap;
33+
import java.util.Map;
3134

3235
/**
3336
* Interface to support various dialects of JDBC databases.
@@ -111,4 +114,18 @@ ImmutableMap<String, ImmutableList<SourceColumnIndexInfo>> discoverTableIndexes(
111114
JdbcSchemaReference sourceSchemaReference,
112115
ImmutableList<String> tables)
113116
throws SchemaDiscoveryException, RetriableSchemaDiscoveryException;
117+
118+
default long estimateRowSize(
119+
SourceTableSchema sourceTableSchema, JdbcValueMappingsProvider jdbcValueMappingsProvider) {
120+
return estimateRowSize(
121+
sourceTableSchema.sourceColumnNameToSourceColumnType(), jdbcValueMappingsProvider);
122+
}
123+
124+
default long estimateRowSize(
125+
Map<String, SourceColumnType> sourceColumnNameToSourceColumnType,
126+
JdbcValueMappingsProvider jdbcValueMappingsProvider) {
127+
return sourceColumnNameToSourceColumnType.values().stream()
128+
.map(jdbcValueMappingsProvider::estimateColumnSize)
129+
.reduce(0, Integer::sum);
130+
}
114131
}

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/mysql/MysqlDialectAdapter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.sql.SQLTimeoutException;
4646
import java.sql.SQLTransientConnectionException;
4747
import java.sql.Statement;
48+
import java.time.Duration;
4849
import java.util.HashMap;
4950
import java.util.HashSet;
5051
import java.util.Map;
@@ -762,4 +763,9 @@ public static ImmutableList<String> colList() {
762763

763764
private InformationSchemaStatsCols() {}
764765
}
766+
767+
@Override
768+
public Duration extractBoundaryDuration(ResultSet rs, int index) throws SQLException {
769+
return MysqlTimeConverter.toDuration(rs.getBytes(index));
770+
}
765771
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright (C) 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.mysql;
17+
18+
import com.google.re2j.Matcher;
19+
import com.google.re2j.Pattern;
20+
import java.io.Serializable;
21+
import java.nio.charset.StandardCharsets;
22+
import java.time.Duration;
23+
import java.util.concurrent.TimeUnit;
24+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
25+
import org.apache.commons.lang3.StringUtils;
26+
27+
/**
28+
* Utility class to convert MySQL TIME values (in bytes) to {@link java.time.Duration}. Handles both
29+
* Text Protocol (String bytes) and Binary Protocol (Packed bytes).
30+
*/
31+
public class MysqlTimeConverter implements Serializable {
32+
33+
// Pattern to extract fields form MySQL time output of HH:MM:ss.SSS
34+
private static final Pattern TIME_STRING_PATTERN =
35+
Pattern.compile("^(-)?(\\d+):(\\d+):(\\d+)(\\.(\\d+))?$");
36+
37+
/**
38+
* Converts raw byte[] from a MySQL TIME column to a {@link Duration}.
39+
*
40+
* @param value The byte array from ResultSet.getBytes().
41+
* @return The corresponding Duration, or null if value is null/empty.
42+
*/
43+
public static Duration toDuration(byte[] value) {
44+
if (value == null || value.length == 0) {
45+
return null;
46+
}
47+
48+
// Binary Protocol decoding (uses 0x00 or 0x01 as the first byte for sign flag)
49+
// In Text Protocol, first byte is '-' (0x2D) or an ASCII digit (>= 0x30).
50+
if (value[0] == 0 || value[0] == 1) {
51+
boolean isNegative = (value[0] == 1);
52+
int days = 0;
53+
int hours = 0;
54+
int minutes = 0;
55+
int seconds = 0;
56+
long micros = 0;
57+
58+
if (value.length >= 8) {
59+
days =
60+
(value[1] & 0xFF)
61+
| ((value[2] & 0xFF) << 8)
62+
| ((value[3] & 0xFF) << 16)
63+
| ((value[4] & 0xFF) << 24);
64+
hours = value[5] & 0xFF;
65+
minutes = value[6] & 0xFF;
66+
seconds = value[7] & 0xFF;
67+
}
68+
if (value.length >= 12) {
69+
micros =
70+
((value[8] & 0xFF)
71+
| ((value[9] & 0xFF) << 8)
72+
| ((value[10] & 0xFF) << 16)
73+
| ((value[11] & 0xFF) << 24));
74+
}
75+
76+
long totalMicros =
77+
TimeUnit.HOURS.toMicros((days * 24L) + hours)
78+
+ TimeUnit.MINUTES.toMicros(minutes)
79+
+ TimeUnit.SECONDS.toMicros(seconds)
80+
+ micros;
81+
82+
return Duration.ofNanos((isNegative ? -1 : 1) * TimeUnit.MICROSECONDS.toNanos(totalMicros));
83+
84+
} else {
85+
// Text Protocol handling
86+
String timeStr = new String(value, StandardCharsets.UTF_8);
87+
/* MySQL output is always hours:minutes:seconds.fractionalSeconds */
88+
Matcher matcher = TIME_STRING_PATTERN.matcher(timeStr);
89+
Preconditions.checkArgument(
90+
matcher.matches(),
91+
"The time string " + timeStr + " does not match " + TIME_STRING_PATTERN);
92+
boolean isNegative = matcher.group(1) != null;
93+
int hours = Integer.parseInt(matcher.group(2));
94+
int minutes = Integer.parseInt(matcher.group(3));
95+
int seconds = Integer.parseInt(matcher.group(4));
96+
long nanoSeconds =
97+
matcher.group(5) == null
98+
? 0
99+
: Long.parseLong(StringUtils.rightPad(matcher.group(6), 9, '0'));
100+
101+
// Calculate total nanos
102+
long totalNanos =
103+
TimeUnit.HOURS.toNanos(hours)
104+
+ TimeUnit.MINUTES.toNanos(minutes)
105+
+ TimeUnit.SECONDS.toNanos(seconds)
106+
+ nanoSeconds;
107+
108+
return Duration.ofNanos(isNegative ? -totalNanos : totalNanos);
109+
}
110+
}
111+
}

0 commit comments

Comments
 (0)