Skip to content

Commit 0a01a56

Browse files
feat(Spanner): Add support for MySQL generated columns in Spanner reverse replication template. (#3306)
* feat(Spanner): Add support for MySQL generated columns in Spanner reverse replication template. * Test commits * Reverted misplacement * Unit tests * Generated column IT with Session file
1 parent d04c66b commit 0a01a56

19 files changed

Lines changed: 3634 additions & 379 deletions

File tree

v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/sourceddl/MySqlInformationSchemaScanner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private List<SourceColumn> scanColumns(String tableName, String schema) throws S
101101
String query =
102102
String.format(
103103
"SELECT column_name, data_type, character_maximum_length, "
104-
+ "numeric_precision, numeric_scale, is_nullable, column_key "
104+
+ "numeric_precision, numeric_scale, is_nullable, column_key, generation_expression "
105105
+ "FROM information_schema.columns "
106106
+ "WHERE table_schema = '%s' AND table_name = '%s' "
107107
+ "ORDER BY ordinal_position",
@@ -116,6 +116,8 @@ private List<SourceColumn> scanColumns(String tableName, String schema) throws S
116116
.type(rs.getString("data_type"))
117117
.isNullable("YES".equals(rs.getString("is_nullable")))
118118
.isPrimaryKey("PRI".equals(rs.getString("column_key")));
119+
String generationExpression = rs.getString("generation_expression");
120+
columnBuilder.isGenerated(generationExpression != null && !generationExpression.isEmpty());
119121

120122
// Handle size/precision/scale
121123
String maxLength = rs.getString("character_maximum_length");

v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/sourceddl/SourceColumn.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,14 @@ public abstract class SourceColumn implements Serializable {
4747

4848
public abstract SourceDatabaseType sourceType();
4949

50+
public abstract boolean isGenerated();
51+
5052
public static Builder builder(SourceDatabaseType sourceType) {
5153
return new AutoValue_SourceColumn.Builder()
5254
.sourceType(sourceType)
5355
.isNullable(true)
5456
.isPrimaryKey(false)
57+
.isGenerated(false)
5558
.columnOptions(ImmutableList.of());
5659
}
5760

@@ -68,6 +71,8 @@ public abstract static class Builder {
6871

6972
public abstract Builder isPrimaryKey(boolean isPrimaryKey);
7073

74+
public abstract Builder isGenerated(boolean isGenerated);
75+
7176
public abstract Builder size(Long size);
7277

7378
public abstract Builder precision(Integer precision);

v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/sourceddl/MySqlInformationSchemaScannerTest.java

Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package com.google.cloud.teleport.v2.spanner.sourceddl;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertFalse;
20+
import static org.junit.Assert.assertTrue;
1921
import static org.mockito.Mockito.mock;
2022
import static org.mockito.Mockito.when;
2123

@@ -51,7 +53,7 @@ public void testScanSingleTable() throws SQLException {
5153
// Mock column query
5254
when(stmt.executeQuery(
5355
"SELECT column_name, data_type, character_maximum_length, "
54-
+ "numeric_precision, numeric_scale, is_nullable, column_key "
56+
+ "numeric_precision, numeric_scale, is_nullable, column_key, generation_expression "
5557
+ "FROM information_schema.columns "
5658
+ "WHERE table_schema = 'testdb' AND table_name = 'users' "
5759
+ "ORDER BY ordinal_position"))
@@ -64,6 +66,7 @@ public void testScanSingleTable() throws SQLException {
6466
when(columnRs.getString("character_maximum_length")).thenReturn("10");
6567
when(columnRs.getString("numeric_precision")).thenReturn(null);
6668
when(columnRs.getString("numeric_scale")).thenReturn(null);
69+
when(columnRs.getString("generation_expression")).thenReturn("");
6770

6871
// Mock primary key query
6972
when(stmt.executeQuery(
@@ -91,6 +94,7 @@ public void testScanSingleTable() throws SQLException {
9194
assertEquals("INT", column.type());
9295
assertEquals(false, column.isNullable());
9396
assertEquals(true, column.isPrimaryKey());
97+
assertEquals(false, column.isGenerated());
9498
assertEquals(Long.valueOf(10L), column.size());
9599
assertEquals(1, table.primaryKeyColumns().size());
96100
assertEquals("id", table.primaryKeyColumns().get(0));
@@ -117,7 +121,7 @@ public void testScanTableWithNoColumns() throws SQLException {
117121

118122
when(stmt.executeQuery(
119123
"SELECT column_name, data_type, character_maximum_length, "
120-
+ "numeric_precision, numeric_scale, is_nullable, column_key "
124+
+ "numeric_precision, numeric_scale, is_nullable, column_key, generation_expression "
121125
+ "FROM information_schema.columns "
122126
+ "WHERE table_schema = 'testdb' AND table_name = 'empty_table' "
123127
+ "ORDER BY ordinal_position"))
@@ -161,7 +165,7 @@ public void testScanTableWithSpecialCharacterNames() throws SQLException {
161165

162166
when(stmt.executeQuery(
163167
"SELECT column_name, data_type, character_maximum_length, "
164-
+ "numeric_precision, numeric_scale, is_nullable, column_key "
168+
+ "numeric_precision, numeric_scale, is_nullable, column_key, generation_expression "
165169
+ "FROM information_schema.columns "
166170
+ "WHERE table_schema = 'testdb' AND table_name = 'user$#@!' "
167171
+ "ORDER BY ordinal_position"))
@@ -174,6 +178,7 @@ public void testScanTableWithSpecialCharacterNames() throws SQLException {
174178
when(columnRs.getString("character_maximum_length")).thenReturn("255");
175179
when(columnRs.getString("numeric_precision")).thenReturn(null);
176180
when(columnRs.getString("numeric_scale")).thenReturn(null);
181+
when(columnRs.getString("generation_expression")).thenReturn("concat(`first_name`,' ')");
177182

178183
when(stmt.executeQuery(
179184
"SELECT column_name "
@@ -235,7 +240,7 @@ public void testScanThrowsSQLExceptionOnColumns() throws SQLException {
235240
// Simulate SQLException when scanning columns
236241
when(stmt.executeQuery(
237242
"SELECT column_name, data_type, character_maximum_length, "
238-
+ "numeric_precision, numeric_scale, is_nullable, column_key "
243+
+ "numeric_precision, numeric_scale, is_nullable, column_key, generation_expression "
239244
+ "FROM information_schema.columns "
240245
+ "WHERE table_schema = 'testdb' AND table_name = 'users' "
241246
+ "ORDER BY ordinal_position"))
@@ -269,7 +274,7 @@ public void testScanMultipleTables() throws SQLException {
269274
// users table
270275
when(stmt.executeQuery(
271276
"SELECT column_name, data_type, character_maximum_length, "
272-
+ "numeric_precision, numeric_scale, is_nullable, column_key "
277+
+ "numeric_precision, numeric_scale, is_nullable, column_key, generation_expression "
273278
+ "FROM information_schema.columns "
274279
+ "WHERE table_schema = 'testdb' AND table_name = 'users' "
275280
+ "ORDER BY ordinal_position"))
@@ -282,6 +287,8 @@ public void testScanMultipleTables() throws SQLException {
282287
when(columnRs1.getString("character_maximum_length")).thenReturn("10");
283288
when(columnRs1.getString("numeric_precision")).thenReturn(null);
284289
when(columnRs1.getString("numeric_scale")).thenReturn(null);
290+
when(columnRs1.getString("generation_expression")).thenReturn(null);
291+
285292
when(stmt.executeQuery(
286293
"SELECT column_name "
287294
+ "FROM information_schema.key_column_usage "
@@ -295,7 +302,7 @@ public void testScanMultipleTables() throws SQLException {
295302
// orders table
296303
when(stmt.executeQuery(
297304
"SELECT column_name, data_type, character_maximum_length, "
298-
+ "numeric_precision, numeric_scale, is_nullable, column_key "
305+
+ "numeric_precision, numeric_scale, is_nullable, column_key, generation_expression "
299306
+ "FROM information_schema.columns "
300307
+ "WHERE table_schema = 'testdb' AND table_name = 'orders' "
301308
+ "ORDER BY ordinal_position"))
@@ -308,6 +315,8 @@ public void testScanMultipleTables() throws SQLException {
308315
when(columnRs2.getString("character_maximum_length")).thenReturn(null);
309316
when(columnRs2.getString("numeric_precision")).thenReturn("20");
310317
when(columnRs2.getString("numeric_scale")).thenReturn(null);
318+
when(columnRs2.getString("generation_expression")).thenReturn(null);
319+
311320
when(stmt.executeQuery(
312321
"SELECT column_name "
313322
+ "FROM information_schema.key_column_usage "
@@ -324,4 +333,73 @@ public void testScanMultipleTables() throws SQLException {
324333
assertEquals("users", schema.tables().get("users").name());
325334
assertEquals("orders", schema.tables().get("orders").name());
326335
}
336+
337+
@Test
338+
public void testScanGeneratedColumn() throws SQLException {
339+
// Mock JDBC objects
340+
Connection connection = mock(Connection.class);
341+
Statement stmt = mock(Statement.class);
342+
ResultSet tableRs = mock(ResultSet.class);
343+
ResultSet columnRs = mock(ResultSet.class);
344+
ResultSet pkRs = mock(ResultSet.class);
345+
346+
// Mock table query
347+
when(connection.createStatement()).thenReturn(stmt);
348+
when(stmt.executeQuery(
349+
"SELECT table_name, table_schema "
350+
+ "FROM information_schema.tables "
351+
+ "WHERE table_schema = 'testdb' "
352+
+ "AND table_type = 'BASE TABLE'"))
353+
.thenReturn(tableRs);
354+
when(tableRs.next()).thenReturn(true, false);
355+
when(tableRs.getString(1)).thenReturn("generated_table");
356+
when(tableRs.getString(2)).thenReturn("testdb");
357+
358+
// Mock column query
359+
when(stmt.executeQuery(
360+
"SELECT column_name, data_type, character_maximum_length, "
361+
+ "numeric_precision, numeric_scale, is_nullable, column_key, generation_expression "
362+
+ "FROM information_schema.columns "
363+
+ "WHERE table_schema = 'testdb' AND table_name = 'generated_table' "
364+
+ "ORDER BY ordinal_position"))
365+
.thenReturn(columnRs);
366+
when(columnRs.next()).thenReturn(true, true, false);
367+
368+
// Per-row values, returned in order: first the normal column `id`, then `full_name`
369+
when(columnRs.getString("column_name")).thenReturn("id", "full_name");
370+
when(columnRs.getString("data_type")).thenReturn("INT", "VARCHAR");
371+
when(columnRs.getString("is_nullable")).thenReturn("NO", "YES");
372+
when(columnRs.getString("column_key")).thenReturn("PRI", "");
373+
when(columnRs.getString("character_maximum_length")).thenReturn("10", "255");
374+
when(columnRs.getString("numeric_precision")).thenReturn(null, null);
375+
when(columnRs.getString("numeric_scale")).thenReturn(null, null);
376+
// generation_expression: empty for `id`, non-empty for the generated column `full_name`
377+
when(columnRs.getString("generation_expression")).thenReturn("", "(id * 2)");
378+
379+
// Mock primary key query
380+
when(stmt.executeQuery(
381+
"SELECT column_name "
382+
+ "FROM information_schema.key_column_usage "
383+
+ "WHERE table_schema = 'testdb' AND table_name = 'generated_table' "
384+
+ "AND constraint_name = 'PRIMARY' "
385+
+ "ORDER BY ordinal_position"))
386+
.thenReturn(pkRs);
387+
when(pkRs.next()).thenReturn(true, false);
388+
when(pkRs.getString("column_name")).thenReturn("id");
389+
390+
MySqlInformationSchemaScanner scanner = new MySqlInformationSchemaScanner(connection, "testdb");
391+
SourceSchema schema = scanner.scan();
392+
393+
assertEquals(1, schema.tables().size());
394+
SourceTable table = schema.tables().get("generated_table");
395+
assertEquals(2, table.columns().size());
396+
397+
SourceColumn idCol = table.columns().get(0);
398+
assertEquals("id", idCol.name());
399+
assertFalse(idCol.isGenerated());
400+
401+
SourceColumn fullNameCol = table.columns().get(1);
402+
assertEquals("full_name", fullNameCol.name());
403+
assertTrue("Column should be generated", fullNameCol.isGenerated());
404+
}
327405
}

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/MySQLDMLGenerator.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,9 @@ private static Map<String, String> getColumnValues(
233233
if (sourcePKs.contains(colName)) {
234234
continue; // we only need non-primary keys
235235
}
236+
if (sourceColDef.isGenerated()) {
237+
continue;
238+
}
236239
if (customTransformColumns != null && customTransformColumns.contains(colName)) {
237240
response.put(colName, customTransformationResponse.get(colName).toString());
238241
continue;
@@ -302,6 +305,8 @@ private static Map<String, String> getPkColumnValues(
302305
customTransformColumns = customTransformationResponse.keySet();
303306
}
304307

308+
boolean doesGeneratedColumnExist = false;
309+
305310
for (int i = 0; i < sourcePKs.size(); i++) {
306311
String sourceColName = sourcePKs.get(i);
307312
SourceColumn sourceColDef = sourceTable.column(sourceColName);
@@ -311,6 +316,11 @@ private static Map<String, String> getPkColumnValues(
311316
return null;
312317
}
313318

319+
if (sourceColDef.isGenerated()) {
320+
doesGeneratedColumnExist = true;
321+
continue;
322+
}
323+
314324
if (customTransformColumns != null && customTransformColumns.contains(sourceColName)) {
315325
response.put(sourceColName, customTransformationResponse.get(sourceColName).toString());
316326
continue;
@@ -361,6 +371,25 @@ private static Map<String, String> getPkColumnValues(
361371
response.put(sourceColName, columnValue);
362372
}
363373

374+
if (doesGeneratedColumnExist) {
375+
// Generated column expressions can differ between the source DB and Spanner.
376+
// Hence, generated column values from the change stream cannot be used.
377+
// If a primary key column is generated, the DML statement needs the values
378+
// of the columns it depends on. Since we cannot identify those dependent
379+
// columns, we add all the non-generated columns to the
380+
// response.
381+
Map<String, String> generatedColumnValues =
382+
getColumnValues(
383+
schemaMapper,
384+
spannerTable,
385+
sourceTable,
386+
newValuesJson,
387+
keyValuesJson,
388+
sourceDbTimezoneOffset,
389+
customTransformationResponse);
390+
response.putAll(generatedColumnValues);
391+
}
392+
364393
return response;
365394
}
366395

0 commit comments

Comments
 (0)