Skip to content

Commit 8cff357

Browse files
authored
Merge pull request #389 from conductor-oss/feature/wmq-support
feat: Workflow Message Queue (WMQ) support
2 parents a751eba + 417a4f6 commit 8cff357

9 files changed

Lines changed: 359 additions & 2 deletions

File tree

docs/workflow-message-queue.md

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# Workflow Message Queue (WMQ)
2+
3+
Send messages to a running workflow and process them inside the workflow using the
4+
`PULL_WORKFLOW_MESSAGES` system task.
5+
6+
## Server requirement
7+
8+
WMQ must be enabled on the Conductor server:
9+
10+
```properties
11+
conductor.workflow-message-queue.enabled=true
12+
```
13+
14+
---
15+
16+
## Sending a message
17+
18+
After starting (or executing) a workflow you can push any JSON-serialisable dict
19+
to it using `executor.send_message` or `workflow_client.send_message`.
20+
21+
```python
22+
from conductor.client.configuration.configuration import Configuration
23+
from conductor.client.http.models import StartWorkflowRequest
24+
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
25+
26+
config = Configuration() # reads CONDUCTOR_SERVER_URL / CONDUCTOR_AUTH_TOKEN from env
27+
executor = WorkflowExecutor(config)
28+
29+
# --- start a workflow that has a PULL_WORKFLOW_MESSAGES task in it ---
30+
workflow_id = executor.start_workflow(
31+
StartWorkflowRequest(name="order_processing", input={"orderId": "ORD-42"})
32+
)
33+
34+
# --- send a message to the running workflow ---
35+
message_id = executor.send_message(
36+
workflow_id,
37+
{"event": "payment_confirmed", "amount": 99.99, "currency": "USD"},
38+
)
39+
print(f"Message enqueued: {message_id}")
40+
```
41+
42+
You can call `send_message` multiple times; each call returns a unique UUID.
43+
44+
```python
45+
# Send a batch of status updates
46+
for status in ["PICKED", "SHIPPED", "OUT_FOR_DELIVERY"]:
47+
executor.send_message(workflow_id, {"status": status})
48+
```
49+
50+
---
51+
52+
## Defining a workflow that receives messages
53+
54+
Use `PullWorkflowMessagesTask` inside your workflow definition to consume the queue.
55+
56+
```python
57+
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
58+
from conductor.client.workflow.task.pull_workflow_messages_task import PullWorkflowMessagesTask
59+
from conductor.client.workflow.task.simple_task import SimpleTask
60+
61+
# Pull up to 5 messages at a time
62+
pull = PullWorkflowMessagesTask(task_ref_name="pull_messages", batch_size=5)
63+
64+
process = SimpleTask(
65+
task_def_name="process_message_worker",
66+
task_reference_name="process_message",
67+
)
68+
# Pass pulled messages to the next task via input parameter references
69+
process.input_parameters["messages"] = "${pull_messages.output.messages}"
70+
71+
wf = (
72+
ConductorWorkflow(executor=executor, name="order_processing", version=1)
73+
.add(pull)
74+
.add(process)
75+
)
76+
77+
wf.register(overwrite=True)
78+
```
79+
80+
### Task output shape
81+
82+
When messages are available the `PULL_WORKFLOW_MESSAGES` task output looks like:
83+
84+
```json
85+
{
86+
"messages": [
87+
{
88+
"id": "f3c2a1b0-...",
89+
"workflowId": "<workflow-instance-id>",
90+
"payload": { "event": "payment_confirmed", "amount": 99.99 },
91+
"receivedAt": "2024-01-01T12:00:00Z"
92+
}
93+
],
94+
"count": 1
95+
}
96+
```
97+
98+
Reference individual fields in subsequent tasks:
99+
100+
```python
101+
next_task.input_parameters["firstMessage"] = "${pull_messages.output.messages[0].payload}"
102+
```
103+
104+
---
105+
106+
## Using the low-level client directly
107+
108+
If you prefer the `WorkflowClient` directly:
109+
110+
```python
111+
from conductor.client.orkes_clients import OrkesClients
112+
113+
clients = OrkesClients(config)
114+
workflow_client = clients.get_workflow_client()
115+
116+
message_id = workflow_client.send_message(
117+
workflow_id,
118+
{"type": "notification", "text": "Hello from outside the workflow"},
119+
)
120+
```
121+
122+
---
123+
124+
## Error handling
125+
126+
| HTTP status | Reason | What to do |
127+
|-------------|--------|------------|
128+
| `404 Not Found` | Workflow ID does not exist | Verify the workflow was started successfully |
129+
| `409 Conflict` | Workflow is not `RUNNING` | Check workflow status before sending |
130+
| `429 Too Many Requests` | Queue is at capacity (default 1 000 messages) | Back off and retry, or increase `conductor.workflow-message-queue.maxQueueSize` |
131+
132+
```python
133+
from conductor.client.http.rest import ApiException
134+
135+
try:
136+
executor.send_message(workflow_id, {"ping": True})
137+
except ApiException as e:
138+
if e.status == 404:
139+
print("Workflow not found")
140+
elif e.status == 409:
141+
print("Workflow is not running")
142+
elif e.status == 429:
143+
print("Queue full — back off and retry")
144+
else:
145+
raise
146+
```

src/conductor/client/http/api/workflow_resource_api.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3178,4 +3178,94 @@ def execute_workflow_with_return_strategy_with_http_info(self, body, name, versi
31783178
_return_http_data_only=params.get('_return_http_data_only'),
31793179
_preload_content=params.get('_preload_content', True),
31803180
_request_timeout=params.get('_request_timeout'),
3181+
collection_formats=collection_formats)
3182+
3183+
def send_workflow_message(self, body, workflow_id, **kwargs): # noqa: E501
3184+
"""Push a message into a running workflow's message queue (WMQ). # noqa: E501
3185+
3186+
This method makes a synchronous HTTP request by default. To make an
3187+
asynchronous HTTP request, please pass async_req=True
3188+
>>> thread = api.send_workflow_message(body, workflow_id, async_req=True)
3189+
>>> result = thread.get()
3190+
3191+
:param async_req bool
3192+
:param dict(str, object) body: Arbitrary JSON payload (required)
3193+
:param str workflow_id: (required)
3194+
:return: str — the UUID assigned to the pushed message
3195+
If the method is called asynchronously,
3196+
returns the request thread.
3197+
"""
3198+
kwargs['_return_http_data_only'] = True
3199+
if kwargs.get('async_req'):
3200+
return self.send_workflow_message_with_http_info(body, workflow_id, **kwargs) # noqa: E501
3201+
else:
3202+
(data) = self.send_workflow_message_with_http_info(body, workflow_id, **kwargs) # noqa: E501
3203+
return data
3204+
3205+
def send_workflow_message_with_http_info(self, body, workflow_id, **kwargs): # noqa: E501
3206+
"""Push a message into a running workflow's message queue (WMQ). # noqa: E501
3207+
3208+
:param async_req bool
3209+
:param dict(str, object) body: Arbitrary JSON payload (required)
3210+
:param str workflow_id: (required)
3211+
:return: str — the UUID assigned to the pushed message
3212+
"""
3213+
3214+
all_params = ['body', 'workflow_id'] # noqa: E501
3215+
all_params.append('async_req')
3216+
all_params.append('_return_http_data_only')
3217+
all_params.append('_preload_content')
3218+
all_params.append('_request_timeout')
3219+
3220+
params = locals()
3221+
for key, val in six.iteritems(params['kwargs']):
3222+
if key not in all_params:
3223+
raise TypeError(
3224+
"Got an unexpected keyword argument '%s'"
3225+
" to method send_workflow_message" % key
3226+
)
3227+
params[key] = val
3228+
del params['kwargs']
3229+
3230+
if ('body' not in params or params['body'] is None):
3231+
raise ValueError("Missing the required parameter `body` when calling `send_workflow_message`") # noqa: E501
3232+
if ('workflow_id' not in params or params['workflow_id'] is None):
3233+
raise ValueError("Missing the required parameter `workflow_id` when calling `send_workflow_message`") # noqa: E501
3234+
3235+
collection_formats = {}
3236+
3237+
path_params = {}
3238+
if 'workflow_id' in params:
3239+
path_params['workflowId'] = params['workflow_id'] # noqa: E501
3240+
3241+
query_params = []
3242+
header_params = {}
3243+
form_params = []
3244+
local_var_files = {}
3245+
3246+
body_params = None
3247+
if 'body' in params:
3248+
body_params = params['body']
3249+
3250+
header_params['Accept'] = self.api_client.select_header_accept(
3251+
['text/plain']) # noqa: E501
3252+
header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501
3253+
['application/json']) # noqa: E501
3254+
3255+
auth_settings = ['api_key'] # noqa: E501
3256+
3257+
return self.api_client.call_api(
3258+
'/workflow/{workflowId}/messages', 'POST',
3259+
path_params,
3260+
query_params,
3261+
header_params,
3262+
body=body_params,
3263+
post_params=form_params,
3264+
files=local_var_files,
3265+
response_type='str', # noqa: E501
3266+
auth_settings=auth_settings,
3267+
async_req=params.get('async_req'),
3268+
_return_http_data_only=params.get('_return_http_data_only'),
3269+
_preload_content=params.get('_preload_content', True),
3270+
_request_timeout=params.get('_request_timeout'),
31813271
collection_formats=collection_formats)

src/conductor/client/http/models/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,5 @@
6666
from conductor.client.http.models.circuit_breaker_transition_response import CircuitBreakerTransitionResponse
6767
from conductor.client.http.models.signal_response import SignalResponse, TaskStatus
6868
from conductor.client.http.models.authentication_config import AuthenticationConfig
69-
from conductor.client.http.models.tag_object import TagObject
69+
from conductor.client.http.models.tag_object import TagObject
70+
from conductor.client.http.models.workflow_message import WorkflowMessage
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from dataclasses import dataclass, field
2+
from typing import Dict, Optional
3+
4+
5+
@dataclass
6+
class WorkflowMessage:
7+
"""Represents a message pushed into a running workflow's queue (WMQ).
8+
9+
Attributes:
10+
id: UUID assigned by the server on push.
11+
workflow_id: The workflow instance that owns this message.
12+
payload: Arbitrary JSON payload supplied by the caller.
13+
received_at: ISO-8601 UTC timestamp set at ingestion time.
14+
"""
15+
16+
id: Optional[str] = field(default=None)
17+
workflow_id: Optional[str] = field(default=None)
18+
payload: Optional[Dict[str, object]] = field(default=None)
19+
received_at: Optional[str] = field(default=None)
20+
21+
swagger_types = {
22+
'id': 'str',
23+
'workflow_id': 'str',
24+
'payload': 'dict(str, object)',
25+
'received_at': 'str',
26+
}
27+
28+
attribute_map = {
29+
'id': 'id',
30+
'workflow_id': 'workflowId',
31+
'payload': 'payload',
32+
'received_at': 'receivedAt',
33+
}
34+
35+
def to_dict(self) -> dict:
36+
result = {}
37+
for attr, _ in self.swagger_types.items():
38+
value = getattr(self, attr)
39+
if value is not None:
40+
result[self.attribute_map[attr]] = value
41+
return result

src/conductor/client/orkes/orkes_workflow_client.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,3 +215,17 @@ def update_state(self, workflow_id: str, update_requesst: WorkflowStateUpdate,
215215
kwargs["wait_for_seconds"] = wait_for_seconds
216216

217217
return self.workflowResourceApi.update_workflow_and_task_state(update_requesst=update_requesst, workflow_id=workflow_id, **kwargs)
218+
219+
def send_message(self, workflow_id: str, message: Dict[str, object]) -> str:
220+
"""Push a message into the message queue of a running workflow (WMQ).
221+
222+
Requires conductor.workflow-message-queue.enabled=true on the server.
223+
224+
Args:
225+
workflow_id: The running workflow instance ID.
226+
message: Arbitrary JSON-serialisable dict to deliver to the workflow.
227+
228+
Returns:
229+
The UUID string assigned to the message by the server.
230+
"""
231+
return self.workflowResourceApi.send_workflow_message(message, workflow_id)

src/conductor/client/workflow/executor/workflow_executor.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,25 @@ def signal_async(self, workflow_id: str, status: str, body: Dict[str, Any]) -> N
283283
body=body
284284
)
285285

286+
def send_message(self, workflow_id: str, message: Dict[str, Any]) -> str:
287+
"""Push a message into the message queue of a running workflow (WMQ).
288+
289+
The workflow must have a PULL_WORKFLOW_MESSAGES task to consume messages.
290+
Requires conductor.workflow-message-queue.enabled=true on the server.
291+
292+
Args:
293+
workflow_id: The running workflow instance ID.
294+
message: Arbitrary JSON-serialisable dict to deliver to the workflow.
295+
296+
Returns:
297+
The UUID string assigned to the message by the server.
298+
299+
Raises:
300+
ApiException: 404 if the workflow is not found, 409 if not in RUNNING state,
301+
429 if the queue is at capacity.
302+
"""
303+
return self.workflow_client.send_message(workflow_id, message)
304+
286305
def __get_task_result(self, task_id: str, workflow_id: str, task_output: Dict[str, Any], status: str) -> TaskResult:
287306
return TaskResult(
288307
workflow_instance_id=workflow_id,
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from __future__ import annotations
2+
3+
from typing_extensions import Self
4+
5+
from conductor.client.workflow.task.task import TaskInterface
6+
from conductor.client.workflow.task.task_type import TaskType
7+
8+
9+
class PullWorkflowMessagesTask(TaskInterface):
10+
"""Consume messages from the workflow's message queue (WMQ).
11+
12+
When messages are available the task completes with:
13+
output.messages — list of WorkflowMessage objects
14+
output.count — number of messages returned
15+
16+
When the queue is empty the task stays IN_PROGRESS and is re-evaluated
17+
after ~1 second (non-blocking polling behavior).
18+
19+
Args:
20+
task_ref_name: Unique task reference name within the workflow.
21+
batch_size: Maximum number of messages to dequeue per execution (default 1,
22+
server cap is typically 100).
23+
"""
24+
25+
def __init__(self, task_ref_name: str, batch_size: int = 1) -> Self:
26+
super().__init__(
27+
task_reference_name=task_ref_name,
28+
task_type=TaskType.PULL_WORKFLOW_MESSAGES,
29+
)
30+
self.input_parameters["batchSize"] = batch_size

src/conductor/client/workflow/task/task_type.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,4 @@ class TaskType(str, Enum):
4040
LLM_SEARCH_EMBEDDINGS = "LLM_SEARCH_EMBEDDINGS"
4141
LIST_MCP_TOOLS = "LIST_MCP_TOOLS"
4242
CALL_MCP_TOOL = "CALL_MCP_TOOL"
43+
PULL_WORKFLOW_MESSAGES = "PULL_WORKFLOW_MESSAGES"

0 commit comments

Comments
 (0)