Skip to content

Commit dbb5d06

Browse files
author
Zhang Wenhao
committed
<fix>[core]: improve SimpleFlowChain
* Added named constructors, factory methods, `then/error/done` overloads, and asynchronous backup collection to SimpleFlowChain; * Added the FlowBuilder stream builder and the `Flow.of` factory method to Flow, allowing for the configuration of `skip/run/rollback` behavior and the generation of anonymous Flow implementations. Related: ZSV-5936 Change-Id: I776b7374716365687279697063726f7a7167736f
1 parent e03cbcb commit dbb5d06

2 files changed

Lines changed: 160 additions & 0 deletions

File tree

core/src/main/java/org/zstack/core/workflow/SimpleFlowChain.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.zstack.core.CoreGlobalProperty;
99
import org.zstack.core.Platform;
1010
import org.zstack.core.errorcode.ErrorFacade;
11+
import org.zstack.header.core.AsyncBackup;
1112
import org.zstack.header.core.progress.ProgressFlowChainProcessorFactory;
1213
import org.zstack.header.core.workflow.*;
1314
import org.zstack.header.errorcode.ErrorCode;
@@ -23,6 +24,7 @@
2324
import java.util.*;
2425
import java.util.Map.Entry;
2526
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.function.Consumer;
2628
import java.util.function.Function;
2729

2830
import static org.zstack.core.Platform.inerr;
@@ -62,6 +64,7 @@ public class SimpleFlowChain implements FlowTrigger, FlowRollback, FlowChain, Fl
6264
private List<List<Runnable>> afterDone = new ArrayList<>();
6365
private List<List<Runnable>> afterError = new ArrayList<>();
6466
private List<List<Runnable>> afterFinal = new ArrayList<>();
67+
private List<AsyncBackup> asyncBackups = new ArrayList<>();
6568

6669
@Autowired(required = false)
6770
ProgressFlowChainProcessorFactory progressFactory;
@@ -131,11 +134,20 @@ public SimpleFlowChain() {
131134
id = "FCID_" + Platform.getUuid().substring(0, 8);
132135
}
133136

137+
public SimpleFlowChain(String chainName) {
138+
this();
139+
this.name = chainName;
140+
}
141+
134142
public SimpleFlowChain(Map<String, Object> data) {
135143
id = "FCID_" + Platform.getUuid().substring(0, 8);
136144
this.data.putAll(data);
137145
}
138146

147+
public static SimpleFlowChain of(String chainName) {
148+
return new SimpleFlowChain(chainName);
149+
}
150+
139151
@Override
140152
public List<Flow> getFlows() {
141153
return flows;
@@ -228,6 +240,12 @@ public SimpleFlowChain then(Flow flow) {
228240
return this;
229241
}
230242

243+
public SimpleFlowChain then(String flowName, Consumer<FlowTrigger> consumer) {
244+
return then(Flow.of(flowName)
245+
.handle(consumer)
246+
.build());
247+
}
248+
231249
public SimpleFlowChain ctxHandler(FlowContextHandler handler) {
232250
DebugUtils.Assert(contextHandler==null, "there has been an FlowContextHandler installed");
233251
contextHandler = handler;
@@ -240,6 +258,21 @@ public SimpleFlowChain error(FlowErrorHandler handler) {
240258
return this;
241259
}
242260

261+
@SuppressWarnings("rawtypes")
262+
public SimpleFlowChain error(Consumer<ErrorCode> handler) {
263+
AsyncBackup firstAsyncBackup = this.asyncBackups.isEmpty() ? null : this.asyncBackups.get(0);
264+
AsyncBackup[] otherAsyncBackups = this.asyncBackups.isEmpty() ? new AsyncBackup[0] :
265+
this.asyncBackups.subList(1, this.asyncBackups.size()).toArray(new AsyncBackup[0]);
266+
267+
DebugUtils.Assert(handler != null, "handler of errorHandler should not be null");
268+
return error(new FlowErrorHandler(firstAsyncBackup, otherAsyncBackups) {
269+
@Override
270+
public void handle(ErrorCode errCode, Map data) {
271+
handler.accept(errCode);
272+
}
273+
});
274+
}
275+
243276
@Override
244277
public FlowChain Finally(FlowFinallyHandler handler) {
245278
finallyHandler = handler;
@@ -285,13 +318,39 @@ public Map getData() {
285318
return this.data;
286319
}
287320

321+
public SimpleFlowChain propagateExceptionTo(AsyncBackup... backups) {
322+
DebugUtils.Assert(backups != null, "backups in methods propagateExceptionTo() must be not null");
323+
DebugUtils.Assert(Arrays.stream(backups).noneMatch(Objects::isNull),
324+
"backups in propagateExceptionTo() should not contain null elements");
325+
DebugUtils.Assert(doneHandler == null, "propagateExceptionTo() must be called before SimpleFlowChain.done()");
326+
DebugUtils.Assert(errorHandler == null, "propagateExceptionTo() must be called before SimpleFlowChain.error()");
327+
DebugUtils.Assert(finallyHandler == null, "propagateExceptionTo() must be called before SimpleFlowChain.Finally()");
328+
this.asyncBackups.addAll(Arrays.asList(backups));
329+
return this;
330+
}
331+
288332
@Override
289333
public SimpleFlowChain done(FlowDoneHandler handler) {
290334
DebugUtils.Assert(doneHandler==null, "there has been a FlowDoneHandler installed");
291335
doneHandler = handler;
292336
return this;
293337
}
294338

339+
@SuppressWarnings("rawtypes")
340+
public SimpleFlowChain done(Runnable runnable) {
341+
AsyncBackup firstAsyncBackup = this.asyncBackups.isEmpty() ? null : this.asyncBackups.get(0);
342+
AsyncBackup[] otherAsyncBackups = this.asyncBackups.isEmpty() ? new AsyncBackup[0] :
343+
this.asyncBackups.subList(1, this.asyncBackups.size()).toArray(new AsyncBackup[0]);
344+
345+
DebugUtils.Assert(runnable != null, "runnable of doneHandler should not be null");
346+
return done(new FlowDoneHandler(firstAsyncBackup, otherAsyncBackups) {
347+
@Override
348+
public void handle(Map data) {
349+
runnable.run();
350+
}
351+
});
352+
}
353+
295354
private void collectAfterRunnable(Flow flow) {
296355
List<Field> ad = FieldUtils.getAnnotatedFieldsOnThisClass(AfterDone.class, flow.getClass());
297356
for (Field f : ad) {

header/src/main/java/org/zstack/header/core/workflow/Flow.java

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package org.zstack.header.core.workflow;
22

3+
import org.zstack.utils.DebugUtils;
34
import org.zstack.utils.FieldUtils;
45

56
import java.util.Map;
7+
import java.util.function.BiConsumer;
8+
import java.util.function.Consumer;
9+
import java.util.function.Predicate;
610

711
public interface Flow {
812
void run(FlowTrigger trigger, Map data);
@@ -23,4 +27,101 @@ default String name() {
2327
}
2428
return String.format("%s", this.getClass().getSimpleName());
2529
}
30+
31+
@SuppressWarnings("rawtypes")
32+
public static class FlowBuilder {
33+
public final String flowName;
34+
private Predicate<Map> skipPredicate;
35+
private BiConsumer<FlowTrigger, Map> triggerConsumer;
36+
private BiConsumer<FlowRollback, Map> rollbackConsumer;
37+
38+
private FlowBuilder(String flowName) {
39+
DebugUtils.Assert(flowName != null, "flowName should not be null");
40+
this.flowName = flowName;
41+
}
42+
43+
public FlowBuilder withSkipPredicate(Predicate<Map> predicate) {
44+
DebugUtils.Assert(predicate != null, "skipPredicate of FlowBuilder should not be null");
45+
this.skipPredicate = predicate;
46+
return this;
47+
}
48+
49+
public FlowBuilder skipIf(Predicate<Map> predicate) {
50+
return withSkipPredicate(predicate);
51+
}
52+
53+
public FlowBuilder runIf(Predicate<Map> predicate) {
54+
DebugUtils.Assert(predicate != null, "predicate of FlowBuilder.runIf() should not be null");
55+
return withSkipPredicate(predicate.negate());
56+
}
57+
58+
public FlowBuilder handle(BiConsumer<FlowTrigger, Map> consumer) {
59+
DebugUtils.Assert(consumer != null, "consumer of FlowBuilder.handle() should not be null");
60+
this.triggerConsumer = consumer;
61+
return this;
62+
}
63+
64+
public FlowBuilder handle(Consumer<FlowTrigger> consumer) {
65+
DebugUtils.Assert(consumer != null, "consumer of FlowBuilder.handle() should not be null");
66+
this.triggerConsumer = (trigger, data) -> consumer.accept(trigger);
67+
return this;
68+
}
69+
70+
public FlowBuilder rollback(BiConsumer<FlowRollback, Map> consumer) {
71+
DebugUtils.Assert(consumer != null, "consumer of FlowBuilder.rollback() should not be null");
72+
this.rollbackConsumer = consumer;
73+
return this;
74+
}
75+
76+
public FlowBuilder rollback(Consumer<FlowRollback> consumer) {
77+
DebugUtils.Assert(consumer != null, "consumer of FlowBuilder.rollback() should not be null");
78+
this.rollbackConsumer = (trigger, data) -> consumer.accept(trigger);
79+
return this;
80+
}
81+
82+
public Flow build() {
83+
DebugUtils.Assert(triggerConsumer != null, "handle() must be called before build()");
84+
Predicate<Map> skipPredicateSnapshot = skipPredicate;
85+
BiConsumer<FlowTrigger, Map> triggerConsumerSnapshot = triggerConsumer;
86+
BiConsumer<FlowRollback, Map> rollbackConsumerSnapshot = rollbackConsumer;
87+
88+
return new Flow() {
89+
@Override
90+
public boolean skip(Map data) {
91+
if (skipPredicateSnapshot == null) {
92+
return false;
93+
}
94+
return skipPredicateSnapshot.test(data);
95+
}
96+
97+
@Override
98+
public void run(FlowTrigger trigger, Map data) {
99+
triggerConsumerSnapshot.accept(trigger, data);
100+
}
101+
102+
@Override
103+
public void rollback(FlowRollback trigger, Map data) {
104+
if (rollbackConsumerSnapshot == null) {
105+
trigger.rollback();
106+
} else {
107+
rollbackConsumerSnapshot.accept(trigger, data);
108+
}
109+
}
110+
111+
@Override
112+
public String name() {
113+
return flowName;
114+
}
115+
116+
@Override
117+
public String toString() {
118+
return name();
119+
}
120+
};
121+
}
122+
}
123+
124+
public static FlowBuilder of(String flowName) {
125+
return new FlowBuilder(flowName);
126+
}
26127
}

0 commit comments

Comments
 (0)