Skip to content

Commit a43316f

Browse files
committed
删除notify格式的redis数据,保存事务源数据的一致性.减少对redis的操作
1 parent c50738f commit a43316f

12 files changed

Lines changed: 104 additions & 166 deletions

File tree

tx-client/src/main/java/com/codingapi/tx/datasource/service/impl/DataSourceServiceImpl.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package com.codingapi.tx.datasource.service.impl;
22

3-
import com.lorne.core.framework.utils.task.Task;
43
import com.codingapi.tx.datasource.service.DataSourceService;
54
import com.codingapi.tx.netty.service.MQTxManagerService;
5+
import com.lorne.core.framework.utils.task.Task;
66
import org.springframework.beans.factory.annotation.Autowired;
77
import org.springframework.stereotype.Service;
88

@@ -19,26 +19,22 @@ public class DataSourceServiceImpl implements DataSourceService {
1919

2020
@Override
2121
public void schedule(String groupId, Task waitTask) {
22+
23+
2224
String waitTaskId = waitTask.getKey();
23-
int rs = txManagerService.checkTransactionInfo(groupId, waitTaskId);
25+
int rs = txManagerService.cleanNotifyTransaction(groupId, waitTaskId);
2426
if (rs == 1 || rs == 0) {
2527
waitTask.setState(rs);
2628
waitTask.signalTask();
27-
//clear
28-
txManagerService.clearTransaction(groupId,waitTaskId,true);
2929

3030
return;
3131
}
32-
if(rs==-2) {
33-
rs = txManagerService.getTransaction(groupId, waitTaskId);
34-
if (rs == 1 || rs == 0) {
35-
waitTask.setState(rs);
36-
waitTask.signalTask();
37-
//clear
38-
39-
txManagerService.clearTransaction(groupId, waitTaskId, false);
40-
return;
41-
}
32+
rs = txManagerService.cleanNotifyTransactionHttp(groupId, waitTaskId);
33+
if (rs == 1 || rs == 0) {
34+
waitTask.setState(rs);
35+
waitTask.signalTask();
36+
37+
return;
4238
}
4339

4440
//添加到补偿队列

tx-client/src/main/java/com/codingapi/tx/netty/service/MQTxManagerService.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,26 +50,25 @@ public interface MQTxManagerService {
5050
* @param taskId 任务id
5151
* @return 事务状态
5252
*/
53-
int checkTransactionInfo(String groupId, String taskId);
53+
int cleanNotifyTransaction(String groupId, String taskId);
5454

5555

56-
/**
57-
* 检查事务状态 通过http请求
58-
* @param groupId 事务组id
59-
* @param waitTaskId 任务id
60-
* @return 事务状态
61-
*/
62-
int getTransaction(String groupId, String waitTaskId);
56+
// /**
57+
// * 检查事务状态 通过http请求
58+
// * @param groupId 事务组id
59+
// * @param waitTaskId 任务id
60+
// * @return 事务状态
61+
// */
62+
// int getTransaction(String groupId, String waitTaskId);
6363

6464

6565
/**
6666
* 检查并清理事务数据
6767
* @param groupId 事务组id
6868
* @param waitTaskId 任务id
69-
* @param isGroup 是否合并事务
7069
* @return 事务状态
7170
*/
72-
int clearTransaction(String groupId, String waitTaskId, boolean isGroup);
71+
int cleanNotifyTransactionHttp(String groupId, String waitTaskId);
7372

7473
/**
7574
* 记录补偿事务数据到tm

tx-client/src/main/java/com/codingapi/tx/netty/service/impl/MQTxManagerServiceImpl.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void uploadModelInfo() {
8888
}
8989

9090
@Override
91-
public int checkTransactionInfo(String groupId, String taskId) {
91+
public int cleanNotifyTransaction(String groupId, String taskId) {
9292
JSONObject jsonObject = new JSONObject();
9393
jsonObject.put("g", groupId);
9494
jsonObject.put("t", taskId);
@@ -101,26 +101,26 @@ public int checkTransactionInfo(String groupId, String taskId) {
101101
}
102102
}
103103

104-
105-
@Override
106-
public int getTransaction(String groupId, String waitTaskId) {
107-
108-
String json = managerHelper.httpGet(configReader.getTxUrl() + "getTransaction?groupId=" + groupId + "&taskId=" + waitTaskId);
109-
if (json == null) {
110-
return -2;
111-
}
112-
json = json.trim();
113-
try {
114-
return Integer.parseInt(json);
115-
}catch (Exception e){
116-
return -2;
117-
}
118-
}
104+
//
105+
// @Override
106+
// public int getTransaction(String groupId, String waitTaskId) {
107+
//
108+
// String json = managerHelper.httpGet(configReader.getTxUrl() + "getTransaction?groupId=" + groupId + "&taskId=" + waitTaskId);
109+
// if (json == null) {
110+
// return -2;
111+
// }
112+
// json = json.trim();
113+
// try {
114+
// return Integer.parseInt(json);
115+
// }catch (Exception e){
116+
// return -2;
117+
// }
118+
// }
119119

120120

121121
@Override
122-
public int clearTransaction(String groupId, String waitTaskId, boolean isGroup) {
123-
String url = configReader.getTxUrl() + "clearTransaction?groupId=" + groupId + "&taskId=" + waitTaskId + "&isGroup=" + (isGroup ? 1 : 0);
122+
public int cleanNotifyTransactionHttp(String groupId, String waitTaskId) {
123+
String url = configReader.getTxUrl() + "cleanNotifyTransactionHttp?groupId=" + groupId + "&taskId=" + waitTaskId;
124124
String clearRes = managerHelper.httpGet(url);
125125
if(clearRes==null){
126126
return -1;

tx-manager/src/main/java/com/codingapi/tm/api/controller/TxManagerController.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,12 @@ public TxServer getServer() {
2525
}
2626

2727

28-
@RequestMapping("/clearTransaction")
29-
public boolean clearTransaction(@RequestParam("groupId") String groupId,@RequestParam("taskId") String taskId,@RequestParam("isGroup") int isGroup) {
30-
return apiTxManagerService.clearTransaction(groupId,taskId,isGroup);
28+
@RequestMapping("/cleanNotifyTransaction")
29+
public int cleanNotifyTransaction(@RequestParam("groupId") String groupId,@RequestParam("taskId") String taskId) {
30+
return apiTxManagerService.cleanNotifyTransaction(groupId,taskId);
3131
}
3232

3333

34-
@RequestMapping("/getTransaction")
35-
public int getTransaction(@RequestParam("groupId") String groupId,@RequestParam("taskId") String taskId) {
36-
return apiTxManagerService.getTransaction(groupId,taskId);
37-
}
38-
3934
@RequestMapping("/sendMsg")
4035
public String sendMsg(@RequestParam("msg") String msg,@RequestParam("model") String model) {
4136
return apiTxManagerService.sendMsg(model,msg);

tx-manager/src/main/java/com/codingapi/tm/api/service/ApiTxManagerService.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,7 @@ public interface ApiTxManagerService {
3232
* @param isGroup 是否合并事务
3333
* @return 事务状态
3434
*/
35-
boolean clearTransaction(String groupId, String taskId, int isGroup);
36-
37-
/**
38-
* 检查事务数据
39-
* @param groupId 事务组Id
40-
* @param taskId 任务Id
41-
* @return 事务状态
42-
*/
43-
int getTransaction(String groupId, String taskId);
35+
int cleanNotifyTransaction(String groupId, String taskId);
4436

4537

4638
/**

tx-manager/src/main/java/com/codingapi/tm/api/service/impl/ApiTxManagerServiceImpl.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import com.codingapi.tm.manager.service.TxManagerService;
1111
import com.codingapi.tm.model.TxServer;
1212
import com.codingapi.tm.model.TxState;
13-
import com.codingapi.tm.redis.service.RedisServerService;
1413
import org.springframework.beans.factory.annotation.Autowired;
1514
import org.springframework.stereotype.Service;
1615

@@ -45,14 +44,10 @@ public TxServer getServer() {
4544

4645

4746
@Override
48-
public boolean clearTransaction(String groupId, String taskId, int isGroup) {
49-
return managerService.clearTransaction(groupId,taskId,isGroup);
47+
public int cleanNotifyTransaction(String groupId, String taskId) {
48+
return managerService.cleanNotifyTransaction(groupId,taskId);
5049
}
5150

52-
@Override
53-
public int getTransaction(String groupId, String taskId) {
54-
return managerService.getTransaction(groupId, taskId);
55-
}
5651

5752
@Override
5853
public boolean sendCompensateMsg(long currentTime, String groupId, String model, String address, String uniqueKey, String className, String methodStr, String data, int time) {

tx-manager/src/main/java/com/codingapi/tm/compensate/service/impl/CompensateServiceImpl.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,28 @@ public boolean saveCompensateMsg(TransactionCompensateMsg transactionCompensateM
5757
String key = configReader.getKeyPrefix() + transactionCompensateMsg.getGroupId();
5858
TxGroup txGroup = redisServerService.getTxGroupByKey(key);
5959
if (txGroup == null) {
60-
key = configReader.getKeyPrefixNotify() + transactionCompensateMsg.getGroupId();
61-
txGroup = redisServerService.getTxGroupByKey(key);
60+
// key = configReader.getKeyPrefixNotify() + transactionCompensateMsg.getGroupId();
61+
// txGroup = redisServerService.getTxGroupByKey(key);
62+
//todo 待完善
6263
}
6364
if(txGroup!=null) {
6465
redisServerService.deleteKey(key);
6566

67+
//已经全部通知的模块不做补偿处理
68+
boolean hasNoNotify = false;
69+
for(TxInfo txInfo:txGroup.getList()){
70+
if(txInfo.getNotify()==0){
71+
hasNoNotify = true;
72+
}
73+
}
74+
75+
if(!hasNoNotify){
76+
//事务已经执行完毕的
77+
return true;
78+
}
79+
80+
81+
6682
transactionCompensateMsg.setTxGroup(txGroup);
6783

6884
final String json = JSON.toJSONString(transactionCompensateMsg);

tx-manager/src/main/java/com/codingapi/tm/config/ConfigReader.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ public class ConfigReader {
4343
*/
4444
private final String key_prefix = "tx_manager_default_";
4545

46-
/**
47-
* 事务存在问题,临时暂存的位置,用于等待网络超时和补偿请求之前的存储位置,有最大时间
48-
*/
49-
private final String key_prefix_notify = "tx_manager_notify_";
46+
// /**
47+
// * 事务存在问题,临时暂存的位置,用于等待网络超时和补偿请求之前的存储位置,有最大时间
48+
// */
49+
// private final String key_prefix_notify = "tx_manager_notify_";
5050

5151
/**
5252
* 补偿事务永久存储数据
@@ -62,9 +62,9 @@ public String getKeyPrefix() {
6262
return key_prefix;
6363
}
6464

65-
public String getKeyPrefixNotify() {
66-
return key_prefix_notify;
67-
}
65+
// public String getKeyPrefixNotify() {
66+
// return key_prefix_notify;
67+
// }
6868

6969
public String getKeyPrefixCompensate() {
7070
return key_prefix_compensate;

tx-manager/src/main/java/com/codingapi/tm/manager/service/TxManagerService.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,6 @@ public interface TxManagerService {
2626
TxGroup addTransactionGroup(String groupId, String taskId,int isGroup, String modelName, String methodStr);
2727

2828

29-
/**
30-
*
31-
* @param groupId
32-
* @param taskId
33-
* @return 1 存在 0不存在 -1 未结束
34-
*/
35-
int getTransaction(String groupId, String taskId);
36-
37-
3829
boolean closeTransactionGroup(String groupId,int state);
3930

4031

@@ -43,7 +34,13 @@ public interface TxManagerService {
4334
void deleteTxGroup(TxGroup txGroup);
4435

4536

46-
boolean clearTransaction(String groupId, String taskId, int isGroup);
37+
/**
38+
* 检查事务组数据
39+
* @param groupId 事务组id
40+
* @param taskId 任务id
41+
* @return 本次请求的是否提交 1提交 0回滚
42+
*/
43+
int cleanNotifyTransaction(String groupId, String taskId);
4744

4845

4946

0 commit comments

Comments
 (0)