@@ -53,7 +53,6 @@ public class CompensateServiceImpl implements CompensateService {
5353 @ Override
5454 public boolean saveCompensateMsg (TransactionCompensateMsg transactionCompensateMsg ) {
5555
56-
5756 String key = configReader .getKeyPrefix () + transactionCompensateMsg .getGroupId ();
5857 TxGroup txGroup = redisServerService .getTxGroupByKey (key );
5958 if (txGroup == null ) {
@@ -68,25 +67,35 @@ public boolean saveCompensateMsg(TransactionCompensateMsg transactionCompensateM
6867
6968 logger .info ("补偿->" + json );
7069
71- new Thread (new Runnable () {
70+ final String compensateKey = compensateDao .saveCompensateMsg (transactionCompensateMsg );
71+
72+ //调整自动补偿机制,若开启了自动补偿,需要通知业务返回success,方可执行自动补偿
73+ threadPool .execute (new Runnable () {
7274 @ Override
7375 public void run () {
7476 try {
77+ String groupId = transactionCompensateMsg .getGroupId ();
78+ JSONObject requestJson = new JSONObject ();
79+ requestJson .put ("action" ,"compensate" );
80+ requestJson .put ("groupId" ,groupId );
81+ requestJson .put ("json" ,json );
82+
7583 String url = configReader .getCompensateNotifyUrl ();
7684 logger .error ("补偿回调地址->" + url );
77- String res = HttpUtils .postJson (url , json );
85+ String res = HttpUtils .postJson (url , requestJson . toJSONString () );
7886 logger .error ("补偿回调结果->" + res );
87+ if (configReader .isCompensateAuto ()) {
88+ //自动补偿,是否自动执行补偿
89+ if ("success" .equalsIgnoreCase (res )) {
90+ //自动补偿
91+ autoCompensate (compensateKey , transactionCompensateMsg );
92+ }
93+ }
7994 } catch (Exception e ) {
80- e .printStackTrace ();
8195 logger .error ("补偿回调失败->" + e .getMessage ());
8296 }
8397 }
84- }).start ();
85-
86- final String compensateKey = compensateDao .saveCompensateMsg (transactionCompensateMsg );
87-
88- //自动补偿
89- autoCompensate (compensateKey , transactionCompensateMsg );
98+ });
9099
91100 return StringUtils .isNotEmpty (compensateKey );
92101
@@ -95,35 +104,52 @@ public void run() {
95104
96105 public void autoCompensate (final String compensateKey , TransactionCompensateMsg transactionCompensateMsg ) {
97106 final String json = JSON .toJSONString (transactionCompensateMsg );
98- if (configReader .isCompensateAuto ()) {
99-
100- logger .info ("进入补偿->" + json );
101- //自动补偿指导补偿成功...
102- threadPool .execute (new Runnable () {
103- @ Override
104- public void run () {
105- try {
106- final int tryTime = configReader .getCompensateTryTime ();
107- boolean isOk = _executeCompensate (json );
108- logger .info ("自动补偿结果->" + isOk + ",json->" + json );
109- while (!isOk ) {
110- try {
111- Thread .sleep (tryTime * 1000 );
112- } catch (InterruptedException e ) {
113- e .printStackTrace ();
114- }
115- isOk = _executeCompensate (json );
116- logger .info ("try补偿(补偿失败,进入补偿队列)->" + isOk + ",json->" + json );
117- }
107+ logger .info ("自动补偿->" + json );
108+ //自动补偿业务执行...
109+ final int tryTime = configReader .getCompensateTryTime ();
110+ boolean autoExecuteRes = false ;
111+ try {
112+ int executeCount = 0 ;
113+ autoExecuteRes = _executeCompensate (json );
114+ logger .info ("自动补偿结果->" + autoExecuteRes + ",json->" + json );
115+ while (!autoExecuteRes ) {
116+ logger .info ("try补偿(补偿失败,进入补偿队列)->" + autoExecuteRes + ",json->" + json );
117+ executeCount ++;
118+ if (executeCount ==3 ){
119+ autoExecuteRes = false ;
120+ break ;
121+ }
122+ try {
123+ Thread .sleep (tryTime * 1000 );
124+ } catch (InterruptedException e ) {
125+ e .printStackTrace ();
126+ }
127+ autoExecuteRes = _executeCompensate (json );
128+ }
118129
119- compensateDao .deleteCompensateByKey (compensateKey );
130+ //执行成功删除数据
131+ if (autoExecuteRes ) {
132+ compensateDao .deleteCompensateByKey (compensateKey );
133+ }
120134
121- } catch (ServiceException e ) {
122- e .printStackTrace ();
123- }
124- }
125- });
135+ }catch (Exception e ){
136+ logger .error ("自动补偿失败,msg:" +e .getLocalizedMessage ());
137+ //推送数据给第三方通知
138+ autoExecuteRes = false ;
126139 }
140+
141+ //执行补偿以后通知给业务方
142+ String groupId = transactionCompensateMsg .getGroupId ();
143+ JSONObject requestJson = new JSONObject ();
144+ requestJson .put ("action" ,"notify" );
145+ requestJson .put ("groupId" ,groupId );
146+ requestJson .put ("resState" ,autoExecuteRes );
147+
148+ String url = configReader .getCompensateNotifyUrl ();
149+ logger .error ("补偿结果回调地址->" + url );
150+ String res = HttpUtils .postJson (url , requestJson .toJSONString ());
151+ logger .error ("补偿结果回调结果->" + res );
152+
127153 }
128154
129155
0 commit comments