@@ -57,52 +57,65 @@ public boolean saveCompensateMsg(TransactionCompensateMsg transactionCompensateM
5757 String key = configReader .getKeyPrefix () + transactionCompensateMsg .getGroupId ();
5858 TxGroup txGroup = redisServerService .getTxGroupByKey (key );
5959 if (txGroup == null ) {
60- // key = configReader.getKeyPrefixNotify() + transactionCompensateMsg.getGroupId();
61- // txGroup = redisServerService.getTxGroupByKey(key);
60+ return false ;
61+ }
62+
63+ redisServerService .deleteKey (key );
64+
65+ //已经全部通知的模块不做补偿处理
66+ boolean hasNoNotify = false ;
67+
68+ for (TxInfo txInfo :txGroup .getList ()){
69+ if (txInfo .getNotify ()==0 ){
70+ hasNoNotify = true ;
71+ }
6272 }
63- if (txGroup !=null ) {
64- redisServerService .deleteKey (key );
65-
66- transactionCompensateMsg .setTxGroup (txGroup );
67-
68- final String json = JSON .toJSONString (transactionCompensateMsg );
69-
70- logger .info ("Compensate->" + json );
71-
72- final String compensateKey = compensateDao .saveCompensateMsg (transactionCompensateMsg );
73-
74- //调整自动补偿机制,若开启了自动补偿,需要通知业务返回success,方可执行自动补偿
75- threadPool .execute (new Runnable () {
76- @ Override
77- public void run () {
78- try {
79- String groupId = transactionCompensateMsg .getGroupId ();
80- JSONObject requestJson = new JSONObject ();
81- requestJson .put ("action" , "compensate" );
82- requestJson .put ("groupId" , groupId );
83- requestJson .put ("json" , json );
84-
85- String url = configReader .getCompensateNotifyUrl ();
86- logger .error ("Compensate Callback Address->" + url );
87- String res = HttpUtils .postJson (url , requestJson .toJSONString ());
88- logger .error ("Compensate Callback Result->" + res );
89- if (configReader .isCompensateAuto ()) {
90- //自动补偿,是否自动执行补偿
91- if (res .contains ("success" )||res .contains ("SUCCESS" )) {
92- //自动补偿
93- autoCompensate (compensateKey , transactionCompensateMsg );
94- }
73+
74+ if (!hasNoNotify ){
75+ //事务已经执行完毕的
76+ logger .info ("TxGroup had notify ! " );
77+ return true ;
78+ }
79+
80+
81+ transactionCompensateMsg .setTxGroup (txGroup );
82+
83+ final String json = JSON .toJSONString (transactionCompensateMsg );
84+
85+ logger .info ("Compensate->" + json );
86+
87+ final String compensateKey = compensateDao .saveCompensateMsg (transactionCompensateMsg );
88+
89+ //调整自动补偿机制,若开启了自动补偿,需要通知业务返回success,方可执行自动补偿
90+ threadPool .execute (new Runnable () {
91+ @ Override
92+ public void run () {
93+ try {
94+ String groupId = transactionCompensateMsg .getGroupId ();
95+ JSONObject requestJson = new JSONObject ();
96+ requestJson .put ("action" , "compensate" );
97+ requestJson .put ("groupId" , groupId );
98+ requestJson .put ("json" , json );
99+
100+ String url = configReader .getCompensateNotifyUrl ();
101+ logger .error ("Compensate Callback Address->" + url );
102+ String res = HttpUtils .postJson (url , requestJson .toJSONString ());
103+ logger .error ("Compensate Callback Result->" + res );
104+ if (configReader .isCompensateAuto ()) {
105+ //自动补偿,是否自动执行补偿
106+ if (res .contains ("success" )||res .contains ("SUCCESS" )) {
107+ //自动补偿
108+ autoCompensate (compensateKey , transactionCompensateMsg );
95109 }
96- } catch (Exception e ) {
97- logger .error ("Compensate Callback Fails->" + e .getMessage ());
98110 }
111+ } catch (Exception e ) {
112+ logger .error ("Compensate Callback Fails->" + e .getMessage ());
99113 }
100- });
114+ }
115+ });
116+
117+ return StringUtils .isNotEmpty (compensateKey );
101118
102- return StringUtils .isNotEmpty (compensateKey );
103- }else {
104- return false ;
105- }
106119
107120 }
108121
0 commit comments