Skip to content

Commit b843f18

Browse files
committed
tx:coordinate upgrade
1 parent 433c549 commit b843f18

20 files changed

Lines changed: 257 additions & 180 deletions

File tree

example/spring-demo-client/src/main/resources/application-ujued.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ spring.datasource.password=123456
33
tx-lcn.logger.enabled=true
44

55
#spring.cloud.consul.discovery.health-check-url=http://127.0.0.1:${server.port}/actuator/health
6-
logging.level.com.codingapi.tx.client=DEBUG
6+
#logging.level.com.codingapi.tx.client=DEBUG

tx-client/src/main/java/com/codingapi/tx/client/TxClientConfiguration.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33
import com.codingapi.tx.client.spi.transaction.txc.resource.init.DefaultTxcSettingFactory;
44
import com.codingapi.tx.client.spi.transaction.txc.resource.init.TxcSettingFactory;
5+
import com.codingapi.tx.client.support.checking.DTXChecking;
6+
import com.codingapi.tx.client.support.checking.SimpleDTXChecking;
7+
import com.codingapi.tx.client.support.common.template.TransactionCleanTemplate;
58
import org.apache.commons.dbutils.QueryRunner;
9+
import org.springframework.beans.factory.SmartInitializingSingleton;
610
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
711
import org.springframework.boot.context.properties.EnableConfigurationProperties;
812
import org.springframework.context.annotation.Bean;
@@ -34,4 +38,14 @@ public QueryRunner queryRunner(DataSource dataSource) {
3438
public TxcSettingFactory txcSettingFactory() {
3539
return new DefaultTxcSettingFactory();
3640
}
41+
42+
@Bean
43+
public SmartInitializingSingleton dtxCheckingTransactionCleanTemplateAdapter(DTXChecking dtxChecking,
44+
TransactionCleanTemplate transactionCleanTemplate) {
45+
if (dtxChecking instanceof SimpleDTXChecking) {
46+
return () -> ((SimpleDTXChecking) dtxChecking).setTransactionCleanTemplate(transactionCleanTemplate);
47+
}
48+
return () -> {
49+
};
50+
}
3751
}

tx-client/src/main/java/com/codingapi/tx/client/aspect/weave/DTXLogicWeaver.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ public Object runTransaction(DTXInfo dtxInfo, BusinessCallback business) throws
6868
try {
6969
return transactionServiceExecutor.transactionRunning(info);
7070
} finally {
71+
synchronized (DTXLocal.cur()) {
72+
DTXLocal.cur().notify();
73+
}
7174
DTXLocal.makeNeverAppeared();
7275
log.info("tx-unit end------>");
7376
}

tx-client/src/main/java/com/codingapi/tx/client/bean/DTXLocal.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ public class DTXLocal {
3131
*/
3232
private String unitId;
3333

34+
/**
35+
* 业务相关资源
36+
*/
37+
private Object resource;
38+
3439

3540
////////////////////////// volatile ///////////////////////////////
3641

tx-client/src/main/java/com/codingapi/tx/client/spi/message/txc/TxcTransactionCleanService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.codingapi.tx.client.spi.transaction.txc.resource.def.TxcService;
44
import com.codingapi.tx.commons.exception.TransactionClearException;
55
import com.codingapi.tx.client.support.common.TransactionCleanService;
6+
import com.codingapi.tx.commons.exception.TxcLogicException;
67
import com.codingapi.tx.logger.TxLogger;
78
import lombok.extern.slf4j.Slf4j;
89
import org.springframework.beans.factory.annotation.Autowired;
@@ -40,7 +41,7 @@ public void clear(String groupId, int state, String unitId, String unitType) thr
4041

4142
// 清理TXC
4243
txcService.cleanTxc(groupId, unitId);
43-
} catch (SQLException e) {
44+
} catch (TxcLogicException e) {
4445
log.error("txc > clean transaction error. {}", e.getMessage());
4546
throw new TransactionClearException(e.getMessage());
4647
}

tx-client/src/main/java/com/codingapi/tx/client/spi/transaction/txc/control/TxcRunningTransaction.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.codingapi.tx.client.spi.transaction.txc.resource.def.bean.RollbackInfo;
1010
import com.codingapi.tx.commons.exception.TransactionClearException;
1111
import com.codingapi.tx.commons.exception.TxClientException;
12+
import com.codingapi.tx.commons.exception.TxcLogicException;
1213
import lombok.extern.slf4j.Slf4j;
1314
import org.springframework.beans.factory.annotation.Autowired;
1415
import org.springframework.stereotype.Component;
@@ -51,10 +52,6 @@ public void preBusinessCode(TxTransactionInfo info) {
5152

5253
@Override
5354
public void onBusinessCodeError(TxTransactionInfo info, Throwable throwable) {
54-
// 写Undo log 早于 clean
55-
txcService.writeUndoLog(
56-
info.getGroupId(), info.getUnitId(), (RollbackInfo) DTXLocal.cur().getAttachment());
57-
5855
try {
5956
log.info("txc > running > clean transaction.");
6057
transactionCleanTemplate.clean(
@@ -70,8 +67,12 @@ public void onBusinessCodeError(TxTransactionInfo info, Throwable throwable) {
7067
@Override
7168
public void onBusinessCodeSuccess(TxTransactionInfo info, Object result) throws TxClientException {
7269
// 写Undo log
73-
txcService.writeUndoLog(
74-
info.getGroupId(), info.getUnitId(), (RollbackInfo) DTXLocal.cur().getAttachment());
70+
try {
71+
txcService.writeUndoLog(
72+
info.getGroupId(), info.getUnitId(), (RollbackInfo) DTXLocal.cur().getAttachment());
73+
} catch (TxcLogicException e) {
74+
throw new TxClientException(e);
75+
}
7576
// 加入事务组
7677
transactionControlTemplate.joinGroup(info.getGroupId(), info.getUnitId(), info.getTransactionType(),
7778
info.getTransactionInfo());

tx-client/src/main/java/com/codingapi/tx/client/spi/transaction/txc/control/TxcStartingTransaction.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
package com.codingapi.tx.client.spi.transaction.txc.control;
22

3-
import com.codingapi.tx.client.bean.TxTransactionInfo;
43
import com.codingapi.tx.client.bean.DTXLocal;
5-
import com.codingapi.tx.client.spi.transaction.txc.resource.def.TxcService;
4+
import com.codingapi.tx.client.bean.TxTransactionInfo;
65
import com.codingapi.tx.client.spi.transaction.txc.resource.def.bean.RollbackInfo;
7-
import com.codingapi.tx.client.support.separate.TXLCNTransactionControl;
86
import com.codingapi.tx.client.support.common.template.TransactionControlTemplate;
7+
import com.codingapi.tx.client.support.separate.TXLCNTransactionControl;
98
import com.codingapi.tx.commons.exception.BeforeBusinessException;
109
import lombok.extern.slf4j.Slf4j;
1110
import org.springframework.beans.factory.annotation.Autowired;
@@ -23,15 +22,10 @@
2322
@Slf4j
2423
public class TxcStartingTransaction implements TXLCNTransactionControl {
2524

26-
private final TxcService txcService;
27-
2825
private final TransactionControlTemplate transactionControlTemplate;
2926

3027
@Autowired
31-
public TxcStartingTransaction(
32-
TxcService txcService,
33-
TransactionControlTemplate transactionControlTemplate) {
34-
this.txcService = txcService;
28+
public TxcStartingTransaction(TransactionControlTemplate transactionControlTemplate) {
3529
this.transactionControlTemplate = transactionControlTemplate;
3630
}
3731

@@ -74,11 +68,6 @@ public void postBusinessCode(TxTransactionInfo info) {
7468
state = -1;
7569
}
7670

77-
// 非提交状态,写Undo log
78-
if (state != 1) {
79-
txcService.writeUndoLog(info.getGroupId(), info.getUnitId(), rollbackInfo);
80-
}
81-
8271
// 关闭事务组
8372
transactionControlTemplate.notifyGroup(info.getGroupId(), info.getUnitId(), info.getTransactionType(), state);
8473
}

tx-client/src/main/java/com/codingapi/tx/client/spi/transaction/txc/resource/TxcJdbcEventListener.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package com.codingapi.tx.client.spi.transaction.txc.resource;
22

3+
import com.codingapi.tx.client.bean.DTXLocal;
34
import com.codingapi.tx.client.spi.transaction.txc.resource.def.SqlExecuteInterceptor;
45
import com.codingapi.tx.client.spi.transaction.txc.resource.def.bean.LockableSelect;
56
import com.codingapi.tx.client.spi.transaction.txc.resource.init.TxcSettingFactory;
7+
import com.codingapi.tx.client.spi.transaction.txc.resource.util.SqlUtils;
68
import com.codingapi.tx.jdbcproxy.p6spy.common.PreparedStatementInformation;
79
import com.codingapi.tx.jdbcproxy.p6spy.common.StatementInformation;
810
import com.codingapi.tx.jdbcproxy.p6spy.event.SimpleJdbcEventListener;
11+
import com.codingapi.tx.jdbcproxy.p6spy.util.TxcUtils;
912
import lombok.extern.slf4j.Slf4j;
1013
import net.sf.jsqlparser.JSQLParserException;
1114
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
@@ -42,11 +45,14 @@ public TxcJdbcEventListener(SqlExecuteInterceptor sqlExecuteInterceptor, TxcSett
4245
public void onBeforeAnyExecute(StatementInformation statementInformation) throws SQLException {
4346
String sql = statementInformation.getSqlWithValues();
4447

45-
// 忽略Txc数据表
46-
if (sql.contains(txcSettingFactory.lockTableName()) || sql.contains(txcSettingFactory.undoLogTableName())) {
48+
// 忽略TxcSQL
49+
if (TxcUtils.isTxcSQL(sql)) {
4750
return;
4851
}
4952

53+
// 当前业务链接
54+
DTXLocal.cur().setResource(statementInformation.getStatement().getConnection());
55+
5056
// 拦截处理
5157
try {
5258
long startTime = System.currentTimeMillis();

tx-client/src/main/java/com/codingapi/tx/client/spi/transaction/txc/resource/TxcServiceImpl.java

Lines changed: 41 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
import com.codingapi.tx.client.spi.transaction.txc.resource.def.TxcSqlExecutor;
66
import com.codingapi.tx.client.spi.transaction.txc.resource.def.bean.*;
77
import com.codingapi.tx.client.spi.transaction.txc.resource.util.SqlUtils;
8+
import com.codingapi.tx.commons.exception.TxcLogicException;
89
import lombok.extern.slf4j.Slf4j;
910
import org.springframework.beans.factory.annotation.Autowired;
1011
import org.springframework.stereotype.Service;
1112
import org.springframework.util.DigestUtils;
1213

1314
import java.nio.charset.StandardCharsets;
15+
import java.sql.Connection;
1416
import java.sql.SQLException;
1517
import java.util.List;
1618
import java.util.Map;
@@ -29,53 +31,50 @@ public class TxcServiceImpl implements TxcService {
2931
private TxcSqlExecutor txcSqlExecutor;
3032

3133
@Override
32-
public void lockResource(LockInfo lockInfo, RollbackInfo rollbackInfo) throws SQLException {
34+
public void lockResource(LockInfo lockInfo, RollbackInfo rollbackInfo) throws TxcLogicException {
3335
try {
34-
DTXLocal.makeUnProxy();
36+
Connection connection = (Connection) DTXLocal.cur().getResource();
3537
// key value MD5 HEX to store
3638
lockInfo.setKeyValue(DigestUtils.md5DigestAsHex(lockInfo.getKeyValue().getBytes(StandardCharsets.UTF_8)));
37-
txcSqlExecutor.tryLock(lockInfo);
39+
txcSqlExecutor.tryLock(connection, lockInfo);
3840
} catch (SQLException e) {
3941
rollbackInfo.setStatus(-1);
40-
throw new SQLException("Resource is locked! Place try again later.");
41-
} finally {
42-
DTXLocal.undoProxyStatus();
42+
throw new TxcLogicException("Resource is locked! Place try again later.");
4343
}
4444
}
4545

4646
@Override
47-
public void lockSelect(SelectImageParams selectImageParams, boolean isxLock) throws SQLException {
48-
List<ModifiedRecord> modifiedRecords;
47+
public void lockSelect(SelectImageParams selectImageParams, boolean isxLock) throws TxcLogicException {
48+
Connection connection = (Connection) DTXLocal.cur().getResource();
4949
try {
50-
DTXLocal.makeUnProxy();
51-
modifiedRecords = txcSqlExecutor.selectSqlPreviousPrimaryKeys(selectImageParams);
52-
} finally {
53-
DTXLocal.undoProxyStatus();
54-
}
55-
for (ModifiedRecord modifiedRecord : modifiedRecords) {
56-
for (Map.Entry<String, FieldCluster> entry : modifiedRecord.getFieldClusters().entrySet()) {
57-
String k = entry.getKey();
58-
FieldCluster v = entry.getValue();
59-
lockResource(new LockInfo()
60-
.setGroupId(selectImageParams.getGroupId())
61-
.setUnitId(selectImageParams.getUnitId())
62-
.setxLock(isxLock)
63-
.setKeyValue(v.getPrimaryKeys().toString())
64-
.setTableName(k), selectImageParams.getRollbackInfo());
50+
List<ModifiedRecord> modifiedRecords = txcSqlExecutor.selectSqlPreviousPrimaryKeys(connection, selectImageParams);
51+
for (ModifiedRecord modifiedRecord : modifiedRecords) {
52+
for (Map.Entry<String, FieldCluster> entry : modifiedRecord.getFieldClusters().entrySet()) {
53+
String k = entry.getKey();
54+
FieldCluster v = entry.getValue();
55+
lockResource(new LockInfo()
56+
.setGroupId(selectImageParams.getGroupId())
57+
.setUnitId(selectImageParams.getUnitId())
58+
.setxLock(isxLock)
59+
.setKeyValue(v.getPrimaryKeys().toString())
60+
.setTableName(k), selectImageParams.getRollbackInfo());
61+
}
6562
}
63+
} catch (SQLException e) {
64+
throw new TxcLogicException(e);
6665
}
6766
}
6867

6968
@Override
70-
public void resolveUpdateImage(UpdateImageParams updateImageParams) throws SQLException {
69+
public void resolveUpdateImage(UpdateImageParams updateImageParams) throws TxcLogicException {
7170

7271
// 前置镜像数据集
7372
List<ModifiedRecord> modifiedRecords;
73+
Connection connection = (Connection) DTXLocal.cur().getResource();
7474
try {
75-
DTXLocal.makeUnProxy();
76-
modifiedRecords = txcSqlExecutor.updateSqlPreviousData(updateImageParams);
77-
} finally {
78-
DTXLocal.undoProxyStatus();
75+
modifiedRecords = txcSqlExecutor.updateSqlPreviousData(connection, updateImageParams);
76+
} catch (SQLException e) {
77+
throw new TxcLogicException(e);
7978
}
8079

8180

@@ -115,25 +114,25 @@ public void resolveUpdateImage(UpdateImageParams updateImageParams) throws SQLEx
115114
// Lock Resource
116115
this.lockResource(new LockInfo()
117116
.setxLock(true)
117+
.setKeyValue(v.getPrimaryKeys().toString())
118118
.setGroupId(updateImageParams.getGroupId())
119119
.setUnitId(updateImageParams.getUnitId())
120-
.setKeyValue(v.getPrimaryKeys().toString())
121120
.setTableName(k), updateImageParams.getRollbackInfo());
122121
}
123122
}
124123
log.debug("rollback info: {}", updateImageParams.getRollbackInfo());
125124
}
126125

127126
@Override
128-
public void resolveDeleteImage(DeleteImageParams deleteImageParams) throws SQLException {
127+
public void resolveDeleteImage(DeleteImageParams deleteImageParams) throws TxcLogicException {
129128

130129
// 前置数据
131130
List<ModifiedRecord> modifiedRecords;
131+
Connection connection = (Connection) DTXLocal.cur().getResource();
132132
try {
133-
DTXLocal.makeUnProxy();
134-
modifiedRecords = txcSqlExecutor.deleteSqlPreviousData(deleteImageParams);
135-
} finally {
136-
DTXLocal.undoProxyStatus();
133+
modifiedRecords = txcSqlExecutor.deleteSqlPreviousData(connection, deleteImageParams);
134+
} catch (SQLException e) {
135+
throw new TxcLogicException(e);
137136
}
138137

139138
// rollback sql
@@ -168,7 +167,7 @@ public void resolveDeleteImage(DeleteImageParams deleteImageParams) throws SQLEx
168167
}
169168

170169
@Override
171-
public void writeUndoLog(String groupId, String unitId, RollbackInfo rollbackInfo) {
170+
public void writeUndoLog(String groupId, String unitId, RollbackInfo rollbackInfo) throws TxcLogicException {
172171
if (rollbackInfo.getRollbackSqlList().size() == 0) {
173172
return;
174173
}
@@ -182,21 +181,21 @@ public void writeUndoLog(String groupId, String unitId, RollbackInfo rollbackInf
182181
DTXLocal.makeUnProxy();
183182
txcSqlExecutor.writeUndoLog(undoLogDO);
184183
} catch (SQLException e) {
185-
log.error("error: {} code: {}", e.getMessage(), e.getErrorCode());
184+
throw new TxcLogicException(e);
186185
} finally {
187186
DTXLocal.undoProxyStatus();
188187
}
189188
}
190189

191190
@Override
192-
public void cleanTxc(String groupId, String unitId) throws SQLException {
191+
public void cleanTxc(String groupId, String unitId) throws TxcLogicException {
193192
// 清理事务单元相关锁
194193
try {
195194
DTXLocal.makeUnProxy();
196195
txcSqlExecutor.clearLock(groupId, unitId);
197196
} catch (SQLException e) {
198197
if (e.getErrorCode() != SqlUtils.MYSQL_TABLE_NOT_EXISTS_CODE) {
199-
throw e;
198+
throw new TxcLogicException(e);
200199
}
201200
} finally {
202201
DTXLocal.undoProxyStatus();
@@ -208,18 +207,20 @@ public void cleanTxc(String groupId, String unitId) throws SQLException {
208207
txcSqlExecutor.clearUndoLog(groupId, unitId);
209208
} catch (SQLException e) {
210209
if (e.getErrorCode() != SqlUtils.MYSQL_TABLE_NOT_EXISTS_CODE) {
211-
throw e;
210+
throw new TxcLogicException(e);
212211
}
213212
} finally {
214213
DTXLocal.undoProxyStatus();
215214
}
216215
}
217216

218217
@Override
219-
public void undo(String groupId, String unitId) throws SQLException {
218+
public void undo(String groupId, String unitId) throws TxcLogicException {
220219
try {
221220
DTXLocal.makeUnProxy();
222221
txcSqlExecutor.applyUndoLog(groupId, unitId);
222+
} catch (SQLException e) {
223+
throw new TxcLogicException(e);
223224
} finally {
224225
DTXLocal.undoProxyStatus();
225226
}

0 commit comments

Comments
 (0)