Skip to content

Commit d4b1e01

Browse files
committed
tx:txc upgrade
1 parent 260ea0a commit d4b1e01

10 files changed

Lines changed: 120 additions & 110 deletions

File tree

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
11
package com.codingapi.tx.client.spi.transaction.txc.control;
22

3-
import com.codingapi.tx.client.bean.DTXLocal;
43
import com.codingapi.tx.client.spi.transaction.txc.resource.def.TxcService;
5-
import com.codingapi.tx.client.spi.transaction.txc.resource.def.bean.RollbackInfo;
64
import com.codingapi.tx.client.support.common.TransactionCleanService;
75
import com.codingapi.tx.commons.exception.TransactionClearException;
86
import com.codingapi.tx.commons.exception.TxcLogicException;
7+
import com.codingapi.tx.logger.TxLogger;
98
import lombok.extern.slf4j.Slf4j;
109
import org.springframework.beans.factory.annotation.Autowired;
1110
import org.springframework.stereotype.Component;
1211

13-
import java.util.Objects;
14-
1512
/**
1613
* Description:
1714
* Date: 2018/12/13
@@ -24,33 +21,26 @@ public class TxcTransactionCleanService implements TransactionCleanService {
2421

2522
private final TxcService txcService;
2623

24+
private final TxLogger txLogger;
25+
2726
@Autowired
28-
public TxcTransactionCleanService(TxcService txcService) {
27+
public TxcTransactionCleanService(TxcService txcService, TxLogger txLogger) {
2928
this.txcService = txcService;
29+
this.txLogger = txLogger;
3030
}
3131

3232
@Override
3333
public void clear(String groupId, int state, String unitId, String unitType) throws TransactionClearException {
3434
try {
35-
try {
36-
// 若需要回滚读undo_log,进行回滚
37-
if (state != 1 && state != -1) {
38-
txcService.undo(groupId, unitId);
39-
}
40-
} catch (Exception e) {
41-
if (Objects.nonNull(DTXLocal.cur())) {
42-
RollbackInfo rollbackInfo = (RollbackInfo) DTXLocal.cur().getAttachment();
43-
if (Objects.nonNull(rollbackInfo)) {
44-
txcService.undoByRollbackInfo(rollbackInfo);
45-
}
46-
}
35+
// 若需要回滚读undo_log,进行回滚
36+
if (state != 1 && state != -1) {
37+
txcService.undo(groupId, unitId);
4738
}
4839

4940
// 清理TXC
5041
txcService.cleanTxc(groupId, unitId);
5142
} catch (TxcLogicException e) {
52-
log.error("txc > clean transaction error. {}", e.getMessage());
53-
throw new TransactionClearException(e.getMessage());
43+
throw new TransactionClearException(e);
5444
}
5545
}
5646
}

tx-client/src/main/java/com/codingapi/tx/client/spi/transaction/txc/resource/TableStructAnalyser.java

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,18 @@
2121
@Component
2222
public class TableStructAnalyser {
2323

24-
@Autowired
25-
private DataSource dataSource;
24+
private final DataSource dataSource;
2625

27-
public TableStructAnalyser() {
26+
@Autowired
27+
public TableStructAnalyser(DataSource dataSource) {
28+
this.dataSource = dataSource;
2829
}
2930

30-
public TableStruct analyse(String table) {
31-
Connection connection = null;
31+
public TableStruct analyse(Connection connection, String table) throws SQLException {
3232
ResultSet structRs = null;
3333
ResultSet columnSet = null;
3434
TableStruct tableStruct = new TableStruct(table);
3535
try {
36-
DTXLocal.makeUnProxy();
37-
connection = dataSource.getConnection();
38-
connection.setAutoCommit(true);
3936
structRs = connection.getMetaData().getPrimaryKeys(connection.getCatalog(), null, table);
4037
columnSet = connection.getMetaData().getColumns(null, "%", table, "%");
4138
while (structRs.next()) {
@@ -45,46 +42,61 @@ public TableStruct analyse(String table) {
4542
tableStruct.getColumns().put(columnSet.getString("COLUMN_NAME"), columnSet.getString("TYPE_NAME"));
4643
}
4744
} catch (SQLException e) {
48-
e.printStackTrace();
49-
} finally {
5045
try {
5146
DbUtils.close(structRs);
5247
DbUtils.close(columnSet);
53-
DbUtils.close(connection);
5448
} catch (SQLException ignored) {
5549
}
56-
DTXLocal.undoProxyStatus();
50+
throw e;
5751
}
5852
return tableStruct;
5953
}
6054

55+
public TableStruct analyse(String table) throws SQLException {
56+
Connection connection = null;
57+
try {
58+
DTXLocal.makeUnProxy();
59+
connection = dataSource.getConnection();
60+
connection.setAutoCommit(true);
61+
return analyse(connection, table);
62+
} finally {
63+
DTXLocal.undoProxyStatus();
64+
DbUtils.close(connection);
65+
}
66+
}
67+
68+
69+
public boolean existsTable(Connection connection, String table) throws SQLException {
70+
ResultSet resultSet = null;
71+
try {
72+
resultSet = connection.getMetaData().getTables(null, null, table, null);
73+
if (resultSet.next()) {
74+
return true;
75+
}
76+
} catch (SQLException e) {
77+
throw e;
78+
} finally {
79+
DbUtils.close(resultSet);
80+
}
81+
return false;
82+
}
83+
6184
/**
6285
* 存在数据表判断
6386
*
6487
* @param tableName
6588
* @return
6689
*/
67-
public boolean existsTable(String tableName) {
90+
public boolean existsTable(String tableName) throws SQLException {
6891
Connection connection = null;
69-
ResultSet resultSet = null;
7092
try {
7193
DTXLocal.makeUnProxy();
7294
connection = dataSource.getConnection();
7395
connection.setAutoCommit(true);
74-
resultSet = connection.getMetaData().getTables(null, null, tableName, null);
75-
if (resultSet.next()) {
76-
return true;
77-
}
78-
} catch (SQLException e) {
79-
e.printStackTrace();
96+
return existsTable(connection, tableName);
8097
} finally {
81-
try {
82-
DbUtils.close(resultSet);
83-
DbUtils.close(connection);
84-
} catch (SQLException ignored) {
85-
}
98+
DbUtils.close(connection);
8699
DTXLocal.undoProxyStatus();
87100
}
88-
return false;
89101
}
90102
}

tx-client/src/main/java/com/codingapi/tx/client/spi/transaction/txc/resource/TxcJdbcEventListener.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
import com.codingapi.tx.client.bean.DTXLocal;
44
import com.codingapi.tx.client.spi.transaction.txc.resource.def.SqlExecuteInterceptor;
55
import com.codingapi.tx.client.spi.transaction.txc.resource.def.bean.LockableSelect;
6-
import com.codingapi.tx.client.spi.transaction.txc.resource.init.TxcSettingFactory;
7-
import com.codingapi.tx.client.spi.transaction.txc.resource.util.SqlUtils;
86
import com.codingapi.tx.jdbcproxy.p6spy.common.PreparedStatementInformation;
97
import com.codingapi.tx.jdbcproxy.p6spy.common.StatementInformation;
108
import com.codingapi.tx.jdbcproxy.p6spy.event.SimpleJdbcEventListener;
@@ -33,12 +31,10 @@ public class TxcJdbcEventListener extends SimpleJdbcEventListener {
3331

3432
private final SqlExecuteInterceptor sqlExecuteInterceptor;
3533

36-
private final TxcSettingFactory txcSettingFactory;
3734

3835
@Autowired
39-
public TxcJdbcEventListener(SqlExecuteInterceptor sqlExecuteInterceptor, TxcSettingFactory txcSettingFactory) {
36+
public TxcJdbcEventListener(SqlExecuteInterceptor sqlExecuteInterceptor) {
4037
this.sqlExecuteInterceptor = sqlExecuteInterceptor;
41-
this.txcSettingFactory = txcSettingFactory;
4238
}
4339

4440
@Override
@@ -70,7 +66,7 @@ public void onBeforeAnyExecute(StatementInformation statementInformation) throws
7066
}
7167
log.debug("used time: {} ms", System.currentTimeMillis() - startTime);
7268
} catch (JSQLParserException e) {
73-
e.printStackTrace();
69+
throw new SQLException(e);
7470
}
7571
}
7672

tx-client/src/main/java/com/codingapi/tx/client/spi/transaction/txc/resource/TxcServiceImpl.java

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
package com.codingapi.tx.client.spi.transaction.txc.resource;
22

3+
import ch.qos.logback.core.db.dialect.DBUtil;
34
import com.codingapi.tx.client.bean.DTXLocal;
45
import com.codingapi.tx.client.spi.transaction.txc.resource.def.TxcService;
56
import com.codingapi.tx.client.spi.transaction.txc.resource.def.TxcSqlExecutor;
67
import com.codingapi.tx.client.spi.transaction.txc.resource.def.bean.*;
8+
import com.codingapi.tx.client.spi.transaction.txc.resource.init.TxcExceptionConnectionPool;
79
import com.codingapi.tx.client.spi.transaction.txc.resource.util.SqlUtils;
810
import com.codingapi.tx.commons.exception.TxcLogicException;
11+
import com.codingapi.tx.commons.util.Transactions;
12+
import com.codingapi.tx.logger.TxLogger;
913
import lombok.extern.slf4j.Slf4j;
14+
import org.apache.commons.dbutils.DbUtils;
1015
import org.springframework.beans.factory.annotation.Autowired;
1116
import org.springframework.stereotype.Service;
1217
import org.springframework.util.DigestUtils;
@@ -16,6 +21,7 @@
1621
import java.sql.SQLException;
1722
import java.util.List;
1823
import java.util.Map;
24+
import java.util.Objects;
1925

2026
/**
2127
* Description:
@@ -27,8 +33,18 @@
2733
@Slf4j
2834
public class TxcServiceImpl implements TxcService {
2935

36+
private final TxcSqlExecutor txcSqlExecutor;
37+
38+
private final TxcExceptionConnectionPool txcExceptionConnectionPool;
39+
40+
private final TxLogger txLogger;
41+
3042
@Autowired
31-
private TxcSqlExecutor txcSqlExecutor;
43+
public TxcServiceImpl(TxcSqlExecutor txcSqlExecutor, TxcExceptionConnectionPool txcExceptionConnectionPool, TxLogger txLogger) {
44+
this.txcSqlExecutor = txcSqlExecutor;
45+
this.txcExceptionConnectionPool = txcExceptionConnectionPool;
46+
this.txLogger = txLogger;
47+
}
3248

3349
@Override
3450
public void lockResource(LockInfo lockInfo, RollbackInfo rollbackInfo) throws TxcLogicException {
@@ -220,18 +236,27 @@ public void undo(String groupId, String unitId) throws TxcLogicException {
220236
DTXLocal.makeUnProxy();
221237
txcSqlExecutor.applyUndoLog(groupId, unitId);
222238
} catch (SQLException e) {
223-
throw new TxcLogicException(e);
239+
// 撤销失败 txcExceptionConnectionPool 作撤销
240+
if (Objects.nonNull(DTXLocal.cur())) {
241+
RollbackInfo rollbackInfo = (RollbackInfo) DTXLocal.cur().getAttachment();
242+
if (Objects.nonNull(rollbackInfo)) {
243+
txLogger.trace(groupId, unitId, Transactions.TAG_TRANSACTION, "rollback by txEx pool.");
244+
Connection connection = null;
245+
try {
246+
connection = txcExceptionConnectionPool.getConnection();
247+
txcSqlExecutor.undoRollbackInfoSql(connection, rollbackInfo);
248+
} catch (SQLException e1) {
249+
throw new TxcLogicException(e1);
250+
} finally {
251+
try {
252+
DbUtils.close(connection);
253+
} catch (SQLException ignored) {
254+
}
255+
}
256+
}
257+
}
224258
} finally {
225259
DTXLocal.undoProxyStatus();
226260
}
227261
}
228-
229-
@Override
230-
public void undoByRollbackInfo(RollbackInfo rollbackInfo) throws TxcLogicException {
231-
try {
232-
txcSqlExecutor.undoRollbackInfoSql(rollbackInfo);
233-
} catch (SQLException e) {
234-
throw new TxcLogicException(e);
235-
}
236-
}
237262
}

tx-client/src/main/java/com/codingapi/tx/client/spi/transaction/txc/resource/TxcSqlExecuteInterceptor.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.springframework.beans.factory.annotation.Autowired;
2222
import org.springframework.stereotype.Component;
2323

24+
import java.sql.Connection;
2425
import java.sql.ResultSet;
2526
import java.sql.SQLException;
2627
import java.util.ArrayList;
@@ -55,6 +56,7 @@ public void preUpdate(Update update) throws SQLException {
5556
String groupId = DTXLocal.cur().getGroupId();
5657
String unitId = DTXLocal.cur().getUnitId();
5758
RollbackInfo rollbackInfo = (RollbackInfo) DTXLocal.cur().getAttachment();
59+
Connection connection = (Connection) DTXLocal.cur().getResource();
5860

5961

6062
// Update相关数据准备
@@ -67,7 +69,7 @@ public void preUpdate(Update update) throws SQLException {
6769
});
6870
for (Table table : update.getTables()) {
6971
tables.add(table.getName());
70-
TableStruct tableStruct = tableStructAnalyser.analyse(table.getName());
72+
TableStruct tableStruct = tableStructAnalyser.analyse(connection, table.getName());
7173
tableStruct.getPrimaryKeys().forEach(key -> primaryKeys.add(table.getName() + "." + key));
7274
}
7375

@@ -94,22 +96,23 @@ public void preDelete(Delete delete) throws SQLException {
9496
RollbackInfo rollbackInfo = (RollbackInfo) DTXLocal.cur().getAttachment();
9597
String groupId = DTXLocal.cur().getGroupId();
9698
String unitId = DTXLocal.cur().getUnitId();
99+
Connection connection = (Connection) DTXLocal.cur().getResource();
97100

98101
// Delete Sql 数据
99102
List<String> tables = new ArrayList<>(delete.getTables().size());
100103
List<String> primaryKeys = new ArrayList<>(3);
101104
List<String> columns = new ArrayList<>();
102105

103-
delete.getTables().forEach(table -> {
104-
TableStruct tableStruct = tableStructAnalyser.analyse(table.getName());
106+
for (Table table : delete.getTables()) {
107+
TableStruct tableStruct = tableStructAnalyser.analyse(connection, table.getName());
105108
tableStruct.getColumns().forEach((k, v) -> {
106109
columns.add(tableStruct.getTableName() + SqlUtils.DOT + k);
107110
});
108111
tableStruct.getPrimaryKeys().forEach(primaryKey -> {
109112
primaryKeys.add(tableStruct.getTableName() + SqlUtils.DOT + primaryKey);
110113
});
111114
tables.add(tableStruct.getTableName());
112-
});
115+
}
113116

114117
// 前置准备
115118
try {
@@ -133,8 +136,9 @@ public void preInsert(Insert insert) {
133136

134137
@Override
135138
public void postInsert(StatementInformation statementInformation) throws SQLException {
139+
Connection connection = (Connection) DTXLocal.cur().getResource();
136140
Insert insert = (Insert) statementInformation.getAttachment();
137-
TableStruct tableStruct = tableStructAnalyser.analyse(insert.getTable().getName());
141+
TableStruct tableStruct = tableStructAnalyser.analyse(connection, insert.getTable().getName());
138142

139143
// 解决主键
140144
PrimaryKeyListVisitor primaryKeyListVisitor = new PrimaryKeyListVisitor(insert.getTable(),
@@ -201,8 +205,9 @@ public void preSelect(LockableSelect lockableSelect) throws SQLException {
201205
List<String> primaryKeys = new ArrayList<>();
202206
Table leftTable = (Table) plainSelect.getFromItem();
203207
List<SelectItem> selectItems = new ArrayList<>();
208+
Connection connection = (Connection) DTXLocal.cur().getResource();
204209

205-
TableStruct leftTableStruct = tableStructAnalyser.analyse(leftTable.getName());
210+
TableStruct leftTableStruct = tableStructAnalyser.analyse(connection, leftTable.getName());
206211
leftTableStruct.getPrimaryKeys().forEach(primaryKey -> {
207212
Column column = new Column(leftTable, primaryKey);
208213
selectItems.add(new SelectExpressionItem(column));
@@ -212,7 +217,7 @@ public void preSelect(LockableSelect lockableSelect) throws SQLException {
212217
if (plainSelect.getJoins() != null) {
213218
for (Join join : plainSelect.getJoins()) {
214219
if (join.isSimple()) {
215-
TableStruct rightTableStruct = tableStructAnalyser.analyse(join.getRightItem().toString());
220+
TableStruct rightTableStruct = tableStructAnalyser.analyse(connection, join.getRightItem().toString());
216221
rightTableStruct.getPrimaryKeys().forEach(primaryKey -> {
217222
Column column = new Column((Table) join.getRightItem(), primaryKey);
218223
selectItems.add(new SelectExpressionItem(column));

0 commit comments

Comments
 (0)