Skip to content

Commit 33ac24c

Browse files
authored
Timer based signal processing sample (#580)
* timer based signal processing sample [cont] Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> * update Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> * update Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> * update Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> * update Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> * added test and update to readme Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> * updated per comments Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> * removed unneeded catch Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> * adding processing dummy activity Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> * removed unneeded processedLast boolean Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> --------- Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io>
1 parent dfb685c commit 33ac24c

4 files changed

Lines changed: 261 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ See the README.md file in each main sample directory for cut/paste Gradle comman
7474
- [**HelloSideEffect**](/core/src/main/java/io/temporal/samples/hello/HelloSideEffect.java)**: Demonstrates how to implement a Side Effect.
7575
- [**HelloUpdate**](/core/src/main/java/io/temporal/samples/hello/HelloUpdate.java): Demonstrates how to create and interact with an Update.
7676
- [**HelloDelayedStart**](/core/src/main/java/io/temporal/samples/hello/HelloDelayedStart.java): Demonstrates how to use delayed start config option when starting a Workflow Executions.
77+
- [**HelloSignalWithTimer**](/core/src/main/java/io/temporal/samples/hello/HelloSignalWithTimer.java): Demonstrates how to use collect signals for certain amount of time and then process last one.
7778

7879

7980
#### Scenario-based samples
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.hello;
21+
22+
import io.temporal.activity.ActivityInterface;
23+
import io.temporal.activity.ActivityOptions;
24+
import io.temporal.client.WorkflowClient;
25+
import io.temporal.client.WorkflowOptions;
26+
import io.temporal.client.WorkflowStub;
27+
import io.temporal.failure.CanceledFailure;
28+
import io.temporal.serviceclient.WorkflowServiceStubs;
29+
import io.temporal.worker.Worker;
30+
import io.temporal.worker.WorkerFactory;
31+
import io.temporal.workflow.*;
32+
import java.time.Duration;
33+
import org.slf4j.Logger;
34+
35+
/**
36+
* Sample Temporal workflow that shows receiving signals for a specific time period and then process
37+
* last one received and continue as new.
38+
*/
39+
public class HelloSignalWithTimer {
40+
static final String TASK_QUEUE = "HelloSignalWithTimerTaskQueue";
41+
static final String WORKFLOW_ID = "HelloSignalWithTimerWorkflow";
42+
43+
@WorkflowInterface
44+
public interface SignalWithTimerWorkflow {
45+
@WorkflowMethod
46+
void execute();
47+
48+
@SignalMethod
49+
void newValue(String value);
50+
51+
@SignalMethod
52+
void exit();
53+
}
54+
55+
@ActivityInterface
56+
public interface ValueProcessingActivities {
57+
void processValue(String value);
58+
}
59+
60+
public static class SignalWithTimerWorkflowImpl implements SignalWithTimerWorkflow {
61+
62+
private Logger logger = Workflow.getLogger(SignalWithTimerWorkflowImpl.class);
63+
private String lastValue = "";
64+
private CancellationScope timerScope;
65+
private boolean exit = false;
66+
67+
private final ValueProcessingActivities activities =
68+
Workflow.newActivityStub(
69+
ValueProcessingActivities.class,
70+
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(2)).build());
71+
72+
@Override
73+
public void execute() {
74+
// Just in case if exit signal is sent as soon as execution is started
75+
if (exit) {
76+
return;
77+
}
78+
// Start timer in cancellation scope so we can cancel it on exit signal received
79+
timerScope =
80+
Workflow.newCancellationScope(
81+
() -> {
82+
try {
83+
// You can add a signal handler that updates the sleep duration
84+
// As it may change via business logic over time
85+
// For sample we just hard code it to 5 seconds
86+
Workflow.newTimer(Duration.ofSeconds(5)).get();
87+
} catch (CanceledFailure e) {
88+
// Exit signal is received causing cancellation of timer scope and timer
89+
// For sample we just log it, you can handle it if needed
90+
logger.info("Timer canceled via exit signal");
91+
}
92+
});
93+
timerScope.run();
94+
95+
// Process last received signal and either exit or ContinueAsNew depending on if we got
96+
// Exit signal or not
97+
activities.processValue(lastValue);
98+
99+
if (exit) {
100+
return;
101+
} else {
102+
SignalWithTimerWorkflow nextRun =
103+
Workflow.newContinueAsNewStub(SignalWithTimerWorkflow.class);
104+
nextRun.execute();
105+
}
106+
}
107+
108+
@Override
109+
public void newValue(String value) {
110+
// Note that we can receive a signal at the same time workflow is trying to complete or
111+
// ContinueAsNew. This would cause workflow task failure with UnhandledCommand
112+
// in order to deliver this signal to our execution.
113+
// You can choose what to do in this case depending on business logic.
114+
// For this sample we just ignore it, alternative could be to process it or carry it over
115+
// to the continued execution if needed.
116+
lastValue = value;
117+
}
118+
119+
@Override
120+
public void exit() {
121+
if (timerScope != null) {
122+
timerScope.cancel("exit received");
123+
}
124+
this.exit = true;
125+
}
126+
}
127+
128+
static class ValueProcessingActivitiesImpl implements ValueProcessingActivities {
129+
@Override
130+
public void processValue(String value) {
131+
// Here you would access downstream services to process the value
132+
// Dummy impl for sample, do nothing
133+
System.out.println("Processing value: " + value);
134+
}
135+
}
136+
137+
public static void main(String[] args) {
138+
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
139+
WorkflowClient client = WorkflowClient.newInstance(service);
140+
WorkerFactory factory = WorkerFactory.newInstance(client);
141+
142+
Worker worker = factory.newWorker(TASK_QUEUE);
143+
worker.registerWorkflowImplementationTypes(SignalWithTimerWorkflowImpl.class);
144+
worker.registerActivitiesImplementations(new ValueProcessingActivitiesImpl());
145+
146+
factory.start();
147+
148+
SignalWithTimerWorkflow workflow =
149+
client.newWorkflowStub(
150+
SignalWithTimerWorkflow.class,
151+
WorkflowOptions.newBuilder()
152+
.setWorkflowId(WORKFLOW_ID)
153+
.setTaskQueue(TASK_QUEUE)
154+
.build());
155+
// Start execution, this unblocks when its created by service
156+
WorkflowClient.start(workflow::execute);
157+
158+
// Send signals 2s apart 12 times (to simulate cancellation on last ContinueAsNew)
159+
for (int i = 0; i < 12; i++) {
160+
workflow.newValue("Value " + i);
161+
sleep(2);
162+
}
163+
sleep(1);
164+
// Send exit signal
165+
workflow.exit();
166+
167+
// Wait for execution to complete after receiving exit signal.
168+
// This should unblock pretty much immediately
169+
WorkflowStub.fromTyped(workflow).getResult(Void.class);
170+
171+
System.exit(0);
172+
}
173+
174+
private static void sleep(int seconds) {
175+
try {
176+
Thread.sleep(seconds * 1000L);
177+
} catch (Exception e) {
178+
System.out.println("Error: " + e.getMessage());
179+
}
180+
}
181+
}

core/src/main/java/io/temporal/samples/hello/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ To run each hello world sample, use one of the following commands:
3131
./gradlew -q execute -PmainClass=io.temporal.samples.hello.HelloTypedSearchAttributes
3232
./gradlew -q execute -PmainClass=io.temporal.samples.hello.HelloSideEffect
3333
./gradlew -q execute -PmainClass=io.temporal.samples.hello.HelloUpdate
34+
./gradlew -q execute -PmainClass=io.temporal.samples.hello.HelloSignalWithTimer
3435
```
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.hello;
21+
22+
import io.temporal.api.common.v1.WorkflowExecution;
23+
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
24+
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
25+
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
26+
import io.temporal.client.WorkflowClient;
27+
import io.temporal.client.WorkflowOptions;
28+
import io.temporal.client.WorkflowStub;
29+
import io.temporal.testing.TestWorkflowRule;
30+
import org.junit.Assert;
31+
import org.junit.Rule;
32+
import org.junit.Test;
33+
34+
public class HelloSignalWithTimerTest {
35+
@Rule
36+
public TestWorkflowRule testWorkflowRule =
37+
TestWorkflowRule.newBuilder()
38+
.setWorkflowTypes(HelloSignalWithTimer.SignalWithTimerWorkflowImpl.class)
39+
.setActivityImplementations(new HelloSignalWithTimer.ValueProcessingActivitiesImpl())
40+
.build();
41+
42+
private static final String WORKFLOW_ID = "SignalWithTimerTestWorkflow";
43+
44+
@Test
45+
public void testSignalWithTimer() {
46+
HelloSignalWithTimer.SignalWithTimerWorkflow workflow =
47+
testWorkflowRule
48+
.getWorkflowClient()
49+
.newWorkflowStub(
50+
HelloSignalWithTimer.SignalWithTimerWorkflow.class,
51+
WorkflowOptions.newBuilder()
52+
.setTaskQueue(testWorkflowRule.getTaskQueue())
53+
.setWorkflowId(WORKFLOW_ID)
54+
.build());
55+
56+
WorkflowClient.start(workflow::execute);
57+
workflow.newValue("1");
58+
workflow.newValue("2");
59+
workflow.exit();
60+
61+
WorkflowStub.fromTyped(workflow).getResult(Void.class);
62+
63+
DescribeWorkflowExecutionResponse res =
64+
testWorkflowRule
65+
.getWorkflowClient()
66+
.getWorkflowServiceStubs()
67+
.blockingStub()
68+
.describeWorkflowExecution(
69+
DescribeWorkflowExecutionRequest.newBuilder()
70+
.setNamespace(testWorkflowRule.getTestEnvironment().getNamespace())
71+
.setExecution(WorkflowExecution.newBuilder().setWorkflowId(WORKFLOW_ID).build())
72+
.build());
73+
74+
Assert.assertEquals(
75+
res.getWorkflowExecutionInfo().getStatus(),
76+
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);
77+
}
78+
}

0 commit comments

Comments
 (0)