Skip to content

Commit a92dae8

Browse files
committed
task interaction
1 parent a47b3b6 commit a92dae8

13 files changed

Lines changed: 805 additions & 0 deletions

File tree

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# Demo tasks interaction
2+
3+
This example demonstrate a generic implementation for tasks interaction in Temporal.
4+
5+
The basic implementation consist on three parts:
6+
- One activity (or local activity) that send the request (create a task).
7+
- Block the workflow execution `Workflow.await` awaiting a Signal.
8+
- The workflow will eventually receive a signal that unblocks it.
9+
10+
Additionally, the example allows to track task state (PENDING, STARTED, COMPLETED...) and
11+
list open task.
12+
13+
> If the client can not send a Signal to the workflow execution, steps 2 and 3 can be replaced by an activity
14+
that polls using one of [these three strategies](../polling).
15+
16+
## Run the sample
17+
18+
- Start the worker
19+
20+
```bash
21+
./gradlew -q execute -PmainClass=io.temporal.samples.taskinteraction.worker.Worker
22+
```
23+
24+
- Schedule workflow execution
25+
26+
```bash
27+
./gradlew -q execute -PmainClass=io.temporal.samples.taskinteraction.client.StartWorkflow
28+
```
29+
30+
- List task
31+
32+
This process will query the workflow execution every 10 seconds, and print open tasks (state != COMPLETED).
33+
```bash
34+
./gradlew -q execute -PmainClass=io.temporal.samples.taskinteraction.client.ListOpenTasks
35+
36+
37+
38+
```
39+
- Update task
40+
41+
Update one of the open task to the next state (PENDING -> STARTED -> COMPLETED)
42+
```bash
43+
./gradlew -q execute -PmainClass=io.temporal.samples.taskinteraction.client.ListOpenTasks
44+
```
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.taskinteraction;
21+
22+
import com.fasterxml.jackson.annotation.JsonIgnore;
23+
24+
public class Task {
25+
26+
private String token;
27+
private Object data;
28+
private STATE state;
29+
30+
public Task() {}
31+
32+
public Task(String token) {
33+
this.token = token;
34+
this.state = STATE.PENDING;
35+
}
36+
37+
public String getToken() {
38+
return token;
39+
}
40+
41+
public void setData(Object data) {
42+
this.data = data;
43+
}
44+
45+
public <T> T result(Class<T> tClass) {
46+
return (T) data;
47+
}
48+
49+
public void setState(STATE state) {
50+
this.state = state;
51+
}
52+
53+
public STATE getState() {
54+
return state;
55+
}
56+
57+
@JsonIgnore
58+
public boolean isCompleted() {
59+
return STATE.COMPLETED == this.state;
60+
}
61+
62+
@Override
63+
public String toString() {
64+
return "Task{" + "token='" + token + '\'' + ", data=" + data + ", state=" + state + '}';
65+
}
66+
67+
public enum STATE {
68+
PENDING,
69+
STARTED,
70+
COMPLETED
71+
}
72+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.taskinteraction;
21+
22+
import io.temporal.activity.ActivityInterface;
23+
24+
@ActivityInterface
25+
public interface TaskActivity {
26+
27+
String createTask(String task);
28+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.taskinteraction;
21+
22+
public class TaskActivityImpl implements TaskActivity {
23+
@Override
24+
public String createTask(String task) {
25+
26+
// Simulating delay in task creation
27+
try {
28+
Thread.sleep(500);
29+
} catch (InterruptedException e) {
30+
throw new RuntimeException(e);
31+
}
32+
33+
return "activity created";
34+
}
35+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.taskinteraction;
21+
22+
import io.temporal.workflow.QueryMethod;
23+
import io.temporal.workflow.SignalMethod;
24+
import java.util.List;
25+
26+
/** Interface used to dynamically register signal and query handlers from the interceptor. */
27+
public interface TaskClient {
28+
29+
@SignalMethod
30+
void updateTask(TaskService.TaskRequest task);
31+
32+
@QueryMethod
33+
List<Task> getOpenTasks();
34+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.taskinteraction;
21+
22+
import com.fasterxml.jackson.annotation.JsonIgnore;
23+
import io.temporal.workflow.CompletablePromise;
24+
import io.temporal.workflow.Promise;
25+
import io.temporal.workflow.Workflow;
26+
import java.util.Collections;
27+
import java.util.HashMap;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.stream.Collectors;
31+
import org.slf4j.Logger;
32+
33+
public class TaskService<R> {
34+
35+
private final Map<String, Task> tasks = Collections.synchronizedMap(new HashMap<>());
36+
private final Map<String, CompletablePromise<R>> pendingPromises =
37+
Collections.synchronizedMap(new HashMap<>());
38+
39+
// Exposes signal and query methods that
40+
// allow us to interact with the workflow execution
41+
private final TaskClient listener =
42+
new TaskClient() {
43+
44+
@Override
45+
public void updateTask(TaskRequest taskRequest) {
46+
47+
final String token = taskRequest.getToken();
48+
final String data = taskRequest.getData();
49+
tasks.get(token).setData(data);
50+
51+
final Task t = tasks.get(token);
52+
53+
t.setState(taskRequest.state);
54+
tasks.put(t.getToken(), t);
55+
56+
logger.info("Task updated: " + t);
57+
58+
if (taskRequest.state == Task.STATE.COMPLETED) {
59+
final CompletablePromise<R> completablePromise = pendingPromises.get(token);
60+
completablePromise.complete((R) data);
61+
}
62+
}
63+
64+
@Override
65+
public List<Task> getOpenTasks() {
66+
return tasks.values().stream().filter(t -> !t.isCompleted()).collect(Collectors.toList());
67+
}
68+
};
69+
70+
public TaskService() {
71+
Workflow.registerListener(listener);
72+
}
73+
74+
private final Logger logger = Workflow.getLogger(TaskService.class);
75+
76+
public R executeTask(Callback<R> callback, String token) {
77+
return executeTaskAsync(callback, token).get();
78+
}
79+
80+
public Promise<R> executeTaskAsync(Callback<R> callback, String token) {
81+
82+
final Task task = new Task(token);
83+
logger.info("Before creating task : " + task);
84+
tasks.put(token, task);
85+
callback.execute();
86+
logger.info("Task created: " + task);
87+
88+
final CompletablePromise<R> promise = Workflow.newPromise();
89+
pendingPromises.put(token, promise);
90+
91+
return promise;
92+
}
93+
94+
public List<Task> getOpenTasks() {
95+
return listener.getOpenTasks();
96+
}
97+
98+
public interface Callback<T> {
99+
T execute();
100+
}
101+
102+
public static class TaskRequest {
103+
104+
private Task.STATE state;
105+
private String data;
106+
private String token;
107+
108+
public TaskRequest() {}
109+
110+
public TaskRequest(Task.STATE state, String data, String token) {
111+
this.state = state;
112+
this.data = data;
113+
this.token = token;
114+
}
115+
116+
@JsonIgnore
117+
public boolean isCompleted() {
118+
return this.state == Task.STATE.COMPLETED;
119+
}
120+
121+
public String getToken() {
122+
return token;
123+
}
124+
125+
public String getData() {
126+
return data;
127+
}
128+
}
129+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.taskinteraction;
21+
22+
import io.temporal.workflow.WorkflowInterface;
23+
import io.temporal.workflow.WorkflowMethod;
24+
25+
@WorkflowInterface
26+
public interface TaskWorkflow {
27+
28+
@WorkflowMethod
29+
void execute();
30+
}

0 commit comments

Comments
 (0)