Skip to content

Commit 294e4c2

Browse files
authored
[ISSUE #5044] Data synchronization strong verification in mariadb gtid mode (#5045)
* [ISSUE #5040] Support gtid mode for sync data with mysql * fix conflicts with master * fix checkstyle error * [ISSUE #5044] Data synchronization strong verification in mariadb gtid mode * fix checkstyle error
1 parent b3a42e1 commit 294e4c2

5 files changed

Lines changed: 49 additions & 24 deletions

File tree

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public class CanalSinkConfig extends SinkConfig {
4141

4242
private boolean isGTIDMode = true;
4343

44+
private boolean isMariaDB = true;
45+
4446
// skip sink process exception
4547
private Boolean skipException = false;
4648

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public class CanalSourceConfig extends SourceConfig {
4747

4848
private String serverUUID;
4949

50+
private boolean isMariaDB = true;
51+
5052
private boolean isGTIDMode = true;
5153

5254
private Integer batchSize = 10000;

eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -476,9 +476,11 @@ private Exception doCall() {
476476
}
477477
JdbcTemplate template = dbDialect.getJdbcTemplate();
478478
String sourceGtid = context.getGtid();
479-
if (StringUtils.isNotEmpty(sourceGtid)) {
480-
String setGtid = "SET @@session.gtid_next = '" + sourceGtid + "';";
481-
template.execute(setGtid);
479+
if (StringUtils.isNotEmpty(sourceGtid) && !sinkConfig.isMariaDB()) {
480+
String setMySQLGtid = "SET @@session.gtid_next = '" + sourceGtid + "';";
481+
template.execute(setMySQLGtid);
482+
} else if (StringUtils.isNotEmpty(sourceGtid) && sinkConfig.isMariaDB()) {
483+
throw new RuntimeException("unsupport gtid mode for mariaDB");
482484
} else {
483485
log.error("gtid is empty in gtid mode");
484486
throw new RuntimeException("gtid is empty in gtid mode");
@@ -510,8 +512,13 @@ public void setValues(PreparedStatement ps) throws SQLException {
510512
});
511513

512514
// reset gtid
513-
String resetGtid = "SET @@session.gtid_next = AUTOMATIC;";
514-
dbDialect.getJdbcTemplate().execute(resetGtid);
515+
if (sinkConfig.isMariaDB()) {
516+
throw new RuntimeException("unsupport gtid mode for mariaDB");
517+
} else {
518+
String resetMySQLGtid = "SET @@session.gtid_next = 'AUTOMATIC';";
519+
dbDialect.getJdbcTemplate().execute(resetMySQLGtid);
520+
}
521+
515522
error = null;
516523
exeResult = ExecuteResult.SUCCESS;
517524
} catch (DeadlockLoserDataAccessException ex) {

eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
public class EntryParser {
5050

5151
public static Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig sourceConfig, List<Entry> datas,
52-
RdbTableMgr tables) {
52+
RdbTableMgr tables) {
5353
List<CanalConnectRecord> recordList = new ArrayList<>();
5454
List<Entry> transactionDataBuffer = new ArrayList<>();
5555
// need check weather the entry is loopback
@@ -60,9 +60,9 @@ public static Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig source
6060
switch (entry.getEntryType()) {
6161
case ROWDATA:
6262
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
63-
if (sourceConfig.getServerUUID() != null && sourceConfig.isGTIDMode()) {
64-
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
65-
if (currentGtid.contains(sourceConfig.getServerUUID())) {
63+
// don't support gtid for mariadb
64+
if (sourceConfig.getServerUUID() != null && sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) {
65+
if (checkGtidForEntry(entry, sourceConfig)) {
6666
transactionDataBuffer.add(entry);
6767
}
6868
} else {
@@ -90,9 +90,14 @@ public static Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig source
9090
return recordMap;
9191
}
9292

93+
private static boolean checkGtidForEntry(Entry entry, CanalSourceConfig sourceConfig) {
94+
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
95+
return currentGtid.contains(sourceConfig.getServerUUID());
96+
}
97+
9398
private static void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig,
94-
List<CanalConnectRecord> recordList,
95-
List<Entry> transactionDataBuffer, RdbTableMgr tables) {
99+
List<CanalConnectRecord> recordList,
100+
List<Entry> transactionDataBuffer, RdbTableMgr tables) {
96101
for (Entry bufferEntry : transactionDataBuffer) {
97102
List<CanalConnectRecord> recordParsedList = internParse(sourceConfig, bufferEntry, tables);
98103
if (CollectionUtils.isEmpty(recordParsedList)) {
@@ -130,7 +135,7 @@ private static Column getColumnIgnoreCase(List<Column> columns, String columName
130135
}
131136

132137
private static List<CanalConnectRecord> internParse(CanalSourceConfig sourceConfig, Entry entry,
133-
RdbTableMgr tableMgr) {
138+
RdbTableMgr tableMgr) {
134139
String schemaName = entry.getHeader().getSchemaName();
135140
String tableName = entry.getHeader().getTableName();
136141
if (tableMgr.getTable(schemaName, tableName) == null) {
@@ -169,7 +174,7 @@ private static List<CanalConnectRecord> internParse(CanalSourceConfig sourceConf
169174
}
170175

171176
private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entry entry,
172-
RowChange rowChange, RowData rowData) {
177+
RowChange rowChange, RowData rowData) {
173178
CanalConnectRecord canalConnectRecord = new CanalConnectRecord();
174179
canalConnectRecord.setTableName(entry.getHeader().getTableName());
175180
canalConnectRecord.setSchemaName(entry.getHeader().getSchemaName());
@@ -179,10 +184,16 @@ private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfi
179184
canalConnectRecord.setBinLogOffset(entry.getHeader().getLogfileOffset());
180185
// if enabled gtid mode, gtid not null
181186
if (canalSourceConfig.isGTIDMode()) {
182-
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
183-
String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID());
184-
canalConnectRecord.setGtid(gtidRange);
185-
canalConnectRecord.setCurrentGtid(currentGtid);
187+
if (canalSourceConfig.isMariaDB()) {
188+
String currentGtid = entry.getHeader().getGtid();
189+
canalConnectRecord.setGtid(currentGtid);
190+
canalConnectRecord.setCurrentGtid(currentGtid);
191+
} else {
192+
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
193+
String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID());
194+
canalConnectRecord.setGtid(gtidRange);
195+
canalConnectRecord.setCurrentGtid(currentGtid);
196+
}
186197
}
187198

188199
EventType eventType = canalConnectRecord.getEventType();
@@ -276,7 +287,7 @@ public static String replaceGtidRange(String gtid, String currentGtid, String se
276287
}
277288

278289
private static void checkUpdateKeyColumns(Map<String, EventColumn> oldKeyColumns,
279-
Map<String, EventColumn> keyColumns) {
290+
Map<String, EventColumn> keyColumns) {
280291
if (oldKeyColumns.isEmpty()) {
281292
return;
282293
}

eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -198,13 +198,16 @@ private Canal buildCanal(CanalSourceConfig sourceConfig) {
198198
recordPositionMap.put("journalName", canalRecordPartition.getJournalName());
199199
recordPositionMap.put("timestamp", canalRecordPartition.getTimeStamp());
200200
recordPositionMap.put("position", canalRecordOffset.getOffset());
201-
String gtidRange = canalRecordOffset.getGtid();
202-
if (gtidRange != null) {
203-
if (canalRecordOffset.getCurrentGtid() != null) {
204-
gtidRange = EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), canalRecordOffset.getCurrentGtid(),
205-
sourceConfig.getServerUUID());
201+
// for mariaDB not support gtid mode
202+
if (sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) {
203+
String gtidRange = canalRecordOffset.getGtid();
204+
if (gtidRange != null) {
205+
if (canalRecordOffset.getCurrentGtid() != null) {
206+
gtidRange = EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), canalRecordOffset.getCurrentGtid(),
207+
sourceConfig.getServerUUID());
208+
}
209+
recordPositionMap.put("gtid", gtidRange);
206210
}
207-
recordPositionMap.put("gtid", gtidRange);
208211
}
209212
positions.add(JsonUtils.toJSONString(recordPositionMap));
210213
});

0 commit comments

Comments
 (0)