feat: define child workflow state machine#99
feat: define child workflow state machine#99timl3136 wants to merge 2 commits intocadence-workflow:mainfrom
Conversation
Signed-off-by: Tim Li <ltim@uber.com>
| request: decision.StartChildWorkflowExecutionDecisionAttributes | ||
| execution: DecisionFuture[WorkflowExecution] | ||
| result: DecisionFuture[Payload] | ||
| _initiated_event_id: int | None = None |
| StartChildWorkflowExecutionFailed, | ||
| ) | ||
|
|
||
| child_workflow_events = EventDispatcher("workflow_id") |
There was a problem hiding this comment.
If you pass "initiated_event_id" as the parameter here you won't have to repeat it on all the decorators.
| domain=self.request.domain, | ||
| workflow_execution=WorkflowExecution( | ||
| workflow_id=self.request.workflow_id, | ||
| run_id="", |
There was a problem hiding this comment.
This is an interesting case. We don't have a RunID (as it hasn't started yet) but we'd like to cancel it. Does this match how the other clients handle it?
Since transfer tasks can be executed in parallel I wonder if this would create a race condition on the server. We might try canceling the Workflow before it has ever started. We might need to wait for it to start before we can cancel it.
| ) | ||
| if self.state is DecisionState.CANCELED_AFTER_REQUESTED: | ||
| return record_immediate_cancel(self.request) | ||
| if self.state is DecisionState.CANCELED_AFTER_RECORDED: |
There was a problem hiding this comment.
I think we also need to model the CANCELED_AFTER_STARTED scenario to cancel an in-flight Child workflow. At that point we'd have the RunID.
| return True | ||
|
|
||
| if self.state is DecisionState.STARTED: | ||
| self._transition(DecisionState.CANCELED_AFTER_RECORDED) |
There was a problem hiding this comment.
Nit: I mentioned this above, but this may be a separate state because the Workflow has already started.
| self._transition(DecisionState.COMPLETED) | ||
| self.result.set_exception(ChildWorkflowExecutionTerminated()) | ||
|
|
||
| @child_workflow_events.event("workflow_execution") |
There was a problem hiding this comment.
The state machine is never keyed by the WorkflowExecution, only the WorkflowID. To dispatch this event it would grab the value of the worklfow_execution field and look it up in the aliases dict in DecisionManager.
We likely need to extend the EventDispatcher and/or DecisionManager to handle this case.
Oh this is what gitar is commenting on below.
|
|
||
|
|
||
| @to_expectation.register | ||
| def _(attrs: history.StartChildWorkflowExecutionFailedEventAttributes) -> Expectation: |
There was a problem hiding this comment.
The expectation value shouldn't be CANCEL here and we need to add Expectation building for the other cancellation events.
| def child_wf_initiated( | ||
| event_id: int, workflow_id: str, *, initiated_event_id: int | ||
| ) -> history.HistoryEvent: | ||
| return history.HistoryEvent( | ||
| event_id=event_id, | ||
| start_child_workflow_execution_initiated_event_attributes=history.StartChildWorkflowExecutionInitiatedEventAttributes( | ||
| workflow_id=workflow_id, | ||
| workflow_type=WorkflowType(name="MyWorkflow"), | ||
| ), | ||
| ) |
There was a problem hiding this comment.
💡 Quality: Test helper has unused initiated_event_id parameter
The child_wf_initiated helper function accepts an initiated_event_id keyword argument that is never used in the function body. This is misleading because callers pass values like initiated_event_id=1, suggesting the parameter configures something, but it has no effect. The StartChildWorkflowExecutionInitiatedEventAttributes proto doesn't have an initiated_event_id field — it IS the initiated event, and its event_id is set via the outer HistoryEvent(event_id=...).
Suggested fix:
def child_wf_initiated(
event_id: int, workflow_id: str
) -> history.HistoryEvent:
return history.HistoryEvent(
event_id=event_id,
start_child_workflow_execution_initiated_event_attributes=history.StartChildWorkflowExecutionInitiatedEventAttributes(
workflow_id=workflow_id,
workflow_type=WorkflowType(name="MyWorkflow"),
),
)
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
CI failed: The CI build failed due to formatting inconsistencies in the newly added files, which triggered a linting error. Running the project's formatting tools locally will resolve the issue.OverviewOne failure was identified in the CI build process, caused by unformatted code in the new feature branch. Total logs analyzed: 1. FailuresFormatting Check Failure (confidence: high)
Summary
Code Review 👍 Approved with suggestions 2 resolved / 3 findingsDefines the child workflow state machine with improved event dispatching and error handling, addressing previous runtime routing and expectation logic issues. Remove the unused 💡 Quality: Test helper has unused
|
| Auto-apply | Compact |
|
|
Was this helpful? React with 👍 / 👎 | Gitar
What changed?
define child workflow state machine
Why?
essential step for implement child wf
How did you test it?
unit tests
Potential risks
Release notes
Documentation Changes