2727import com .uber .cadence .workflow .Workflow ;
2828import com .uber .cadence .workflow .WorkflowMethod ;
2929import java .time .Duration ;
30+ import java .util .ArrayList ;
31+ import java .util .List ;
32+ import java .util .concurrent .ArrayBlockingQueue ;
33+ import java .util .concurrent .BlockingDeque ;
34+ import java .util .concurrent .BlockingQueue ;
35+ import java .util .concurrent .CompletableFuture ;
3036
3137/**
3238 * Demonstrates asynchronous signalling of a workflow. Requires a local instance of Cadence server
@@ -39,32 +45,53 @@ public class HelloSignal {
3945
4046 /** Workflow interface must have a method annotated with @WorkflowMethod. */
4147 public interface GreetingWorkflow {
42- /** @return greeting string */
48+ /** @return list of greeting strings that were received through the
49+ * waitForNameMethod. This method will block until the number of greetings
50+ * specified are received. */
4351 @ WorkflowMethod
44- String getGreeting ();
52+ List < String > getGreetings ();
4553
4654 /** Receives name through an external signal. */
4755 @ SignalMethod
4856 void waitForName (String name );
57+
58+ /** Receives name through an external signal. */
59+ @ SignalMethod
60+ void exit ();
4961 }
5062
5163 /** GreetingWorkflow implementation that returns a greeting. */
5264 public static class GreetingWorkflowImpl implements GreetingWorkflow {
5365
54- private final CompletablePromise <String > name = Workflow .newPromise ();
66+ List <String > messageQueue = new ArrayList <>(10 );
67+ boolean exit = false ;
5568
5669 @ Override
57- public String getGreeting () {
58- return "Hello " + name .get () + "!" ;
70+ public List <String > getGreetings () {
71+ List <String > receivedMessages = new ArrayList <>(10 );
72+
73+ while (true ) {
74+ Workflow .await (() -> !messageQueue .isEmpty () || exit );
75+ if (messageQueue .isEmpty () && exit ){
76+ return receivedMessages ;
77+ }
78+ String message = messageQueue .remove (0 );
79+ receivedMessages .add (message );
80+ }
5981 }
6082
6183 @ Override
6284 public void waitForName (String name ) {
63- this .name .complete (name );
85+ messageQueue .add ("Hello " + name + "!" );
86+ }
87+
88+ @ Override
89+ public void exit () {
90+ exit = true ;
6491 }
6592 }
6693
67- public static void main (String [] args ) {
94+ public static void main (String [] args ) throws Exception {
6895 // Start a worker that hosts the workflow implementation.
6996 Worker worker = new Worker (DOMAIN , TASK_LIST );
7097 worker .registerWorkflowImplementationTypes (GreetingWorkflowImpl .class );
@@ -80,17 +107,21 @@ public static void main(String[] args) {
80107 .build ();
81108 GreetingWorkflow workflow =
82109 workflowClient .newWorkflowStub (GreetingWorkflow .class , workflowOptions );
110+ workflow .wait ();
83111 // Start workflow asynchronously to not use another thread to signal.
84- WorkflowClient .start (workflow ::getGreeting );
112+ WorkflowClient .start (workflow ::getGreetings );
85113 // After start for getGreeting returns, the workflow is guaranteed to be started.
86114 // So we can send a signal to it using workflow stub.
115+ // This workflow keeps receiving signals until exit is called
87116 workflow .waitForName ("World" );
117+ workflow .waitForName ("Universe" );
118+ workflow .exit ();
88119 // Calling synchronous getGreeting after workflow has started reconnects to the existing
89120 // workflow and blocks until a result is available. Note that this behavior assumes that
90121 // WorkflowOptions are not configured with WorkflowIdReusePolicy.AllowDuplicate. In that case
91122 // the call would fail with WorkflowExecutionAlreadyStartedException.
92- String greeting = workflow .getGreeting ();
93- System .out .println (greeting );
123+ List < String > greetings = workflow .getGreetings ();
124+ System .out .println (greetings );
94125 System .exit (0 );
95126 }
96127}
0 commit comments