Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
from __future__ import annotations

from cadence._internal.workflow.statemachine.decision_state_machine import (
BaseDecisionStateMachine,
DecisionFuture,
DecisionId,
DecisionState,
DecisionType,
)
from cadence._internal.workflow.statemachine.event_dispatcher import EventDispatcher
from cadence._internal.workflow.statemachine.nondeterminism import (
record_immediate_cancel,
)
from cadence.api.v1 import decision, history
from cadence.api.v1.common_pb2 import Payload, WorkflowExecution
from cadence.error import (
ChildWorkflowExecutionCanceled,
ChildWorkflowExecutionFailed,
ChildWorkflowExecutionTerminated,
ChildWorkflowExecutionTimedOut,
StartChildWorkflowExecutionFailed,
)

# Default id_attr is "initiated_event_id" because the majority of child workflow events
# reference the state machine by the event ID of the InitiatedEvent. handle_initiated is
# the exception — it uses "workflow_id" directly and then registers the event ID as an alias.
child_workflow_events = EventDispatcher("initiated_event_id")


class ChildWorkflowExecutionStateMachine(BaseDecisionStateMachine):
"""State machine for StartChildWorkflowExecution and child close events."""

request: decision.StartChildWorkflowExecutionDecisionAttributes
execution: DecisionFuture[WorkflowExecution]
result: DecisionFuture[Payload]
_run_id: str | None

def __init__(
self,
request: decision.StartChildWorkflowExecutionDecisionAttributes,
execution: DecisionFuture[WorkflowExecution],
result: DecisionFuture[Payload],
) -> None:
super().__init__()
self.request = request
self.execution = execution
self.result = result
self._run_id = None

def get_id(self) -> DecisionId:
return DecisionId(DecisionType.CHILD_WORKFLOW, self.request.workflow_id)

def get_decision(self) -> decision.Decision | None:
if self.state is DecisionState.REQUESTED:
return decision.Decision(
start_child_workflow_execution_decision_attributes=self.request
)
if self.state is DecisionState.CANCELED_AFTER_REQUESTED:
return record_immediate_cancel(self.request)
if self.state in (
DecisionState.CANCELED_AFTER_RECORDED,
DecisionState.CANCELED_AFTER_STARTED,
):
return decision.Decision(
request_cancel_external_workflow_execution_decision_attributes=decision.RequestCancelExternalWorkflowExecutionDecisionAttributes(
domain=self.request.domain,
workflow_execution=WorkflowExecution(
workflow_id=self.request.workflow_id,
run_id=self._run_id or "",
),
child_workflow_only=True,
)
)
return None

def request_cancel(self) -> bool:
if self.state is DecisionState.REQUESTED:
self._transition(DecisionState.CANCELED_AFTER_REQUESTED)
self.execution.force_cancel()
self.result.force_cancel()
return True

if self.state is DecisionState.RECORDED:
self._transition(DecisionState.CANCELED_AFTER_RECORDED)
self.execution.force_cancel()
return True

if self.state is DecisionState.STARTED:
# We have a run_id at this point; use CANCELED_AFTER_STARTED so the
# cancel decision includes it, which avoids a potential race where we
# try to cancel before the server has finished processing the start.
self._transition(DecisionState.CANCELED_AFTER_STARTED)
return True

return False

@child_workflow_events.event("workflow_id", event_id_is_alias=True)
def handle_initiated(
self, _: history.StartChildWorkflowExecutionInitiatedEventAttributes
) -> None:
self._transition(DecisionState.RECORDED)

@child_workflow_events.event()
def handle_initiation_failed(
self, event: history.StartChildWorkflowExecutionFailedEventAttributes
) -> None:
self._transition(DecisionState.COMPLETED)
exc = StartChildWorkflowExecutionFailed(
f"start child failed: {event.cause}",
cause=event.cause,
workflow_id=event.workflow_id,
)
self.execution.set_exception(exc)
self.result.set_exception(exc)

@child_workflow_events.event()
def handle_started(
self, event: history.ChildWorkflowExecutionStartedEventAttributes
) -> None:
self._transition(DecisionState.STARTED)
self._run_id = event.workflow_execution.run_id
self.execution.set_result(event.workflow_execution)

@child_workflow_events.event()
def handle_completed(
self, event: history.ChildWorkflowExecutionCompletedEventAttributes
) -> None:
self._transition(DecisionState.COMPLETED)
self.result.set_result(event.result)

@child_workflow_events.event()
def handle_failed(
self, event: history.ChildWorkflowExecutionFailedEventAttributes
) -> None:
self._transition(DecisionState.COMPLETED)
self.result.set_exception(
ChildWorkflowExecutionFailed(
event.failure.reason,
failure=event.failure,
)
)

@child_workflow_events.event()
def handle_canceled(
self, event: history.ChildWorkflowExecutionCanceledEventAttributes
) -> None:
self._transition(DecisionState.COMPLETED)
self.result.set_exception(
ChildWorkflowExecutionCanceled(
"child workflow canceled", details=event.details
)
)

@child_workflow_events.event()
def handle_timed_out(
self, event: history.ChildWorkflowExecutionTimedOutEventAttributes
) -> None:
self._transition(DecisionState.COMPLETED)
self.result.set_exception(
ChildWorkflowExecutionTimedOut(
f"child workflow timed out: {event.timeout_type}",
timeout_type=int(event.timeout_type),
)
)

@child_workflow_events.event()
def handle_terminated(
self, event: history.ChildWorkflowExecutionTerminatedEventAttributes
) -> None:
self._transition(DecisionState.COMPLETED)
self.result.set_exception(ChildWorkflowExecutionTerminated())

# RequestCancelExternalWorkflowExecution events reference the child workflow by
# workflow_execution.workflow_id (a nested field), not by a bare string id.
# The dispatcher resolves dotted paths, so "workflow_execution.workflow_id" extracts
# the correct key for the alias lookup. event_id_is_alias=True registers this event's
# ID so that the subsequent handle_cancel_failed can look it up via initiated_event_id.
@child_workflow_events.event("workflow_execution.workflow_id", event_id_is_alias=True)
def handle_cancel_initiated(
self, _: history.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes
) -> None:
self._transition(DecisionState.CANCELLATION_RECORDED)

@child_workflow_events.event()
def handle_cancel_failed(
self, _: history.RequestCancelExternalWorkflowExecutionFailedEventAttributes
) -> None:
self._transition(DecisionState.STARTED)
29 changes: 26 additions & 3 deletions cadence/_internal/workflow/statemachine/decision_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
activity_events,
ActivityStateMachine,
)
from cadence._internal.workflow.statemachine.child_workflow_execution_state_machine import (
child_workflow_events,
ChildWorkflowExecutionStateMachine,
)
from cadence._internal.workflow.statemachine.completion_state_machine import (
CompletionStateMachine,
)
Expand All @@ -21,14 +25,15 @@
from cadence._internal.workflow.statemachine.event_dispatcher import (
EventDispatcher,
Action,
resolve_id_attr,
)
from cadence._internal.workflow.statemachine.nondeterminism import DeterminismTracker
from cadence._internal.workflow.statemachine.timer_state_machine import (
TimerStateMachine,
timer_events,
)
from cadence.api.v1 import decision, history
from cadence.api.v1.common_pb2 import Payload
from cadence.api.v1.common_pb2 import Payload, WorkflowExecution

DecisionAlias = Tuple[DecisionType, str | int]

Expand Down Expand Up @@ -67,6 +72,7 @@ class DecisionManager:
{
DecisionType.ACTIVITY: activity_events,
DecisionType.TIMER: timer_events,
DecisionType.CHILD_WORKFLOW: child_workflow_events,
}
)

Expand Down Expand Up @@ -110,6 +116,22 @@ def start_timer(

return future

# ----- Child Workflow API -----

def schedule_child_workflow(
self, attrs: decision.StartChildWorkflowExecutionDecisionAttributes
) -> tuple[asyncio.Future[WorkflowExecution], asyncio.Future[Payload]]:
if self._replaying:
self._determinism_tracker.validate_action(attrs)
decision_id = DecisionId(DecisionType.CHILD_WORKFLOW, attrs.workflow_id)
execution: DecisionFuture[WorkflowExecution] = self._create_future(decision_id)
result: DecisionFuture[Payload] = DecisionFuture(
self._event_loop, lambda: self._request_cancel(decision_id)
)
machine = ChildWorkflowExecutionStateMachine(attrs, execution, result)
self._add_state_machine(machine)
return execution, result

# ----- Workflow API -----
def complete_workflow(self, decision: decision.Decision) -> None:
if self._replaying:
Expand Down Expand Up @@ -152,8 +174,9 @@ def handle_history_event(self, event: history.HistoryEvent) -> None:
decision_type = event_action.decision_type
action = event_action.action
# Find what state machine the event references.
# This may be a reference via the user id or a reference to a previous event
id_for_event = getattr(event_attributes, action.id_attr)
# This may be a reference via the user id or a reference to a previous event.
# Supports dotted paths (e.g. "workflow_execution.workflow_id") for nested fields.
id_for_event = resolve_id_attr(event_attributes, action.id_attr)
alias = (decision_type, id_for_event)
machine = self.aliases.get(alias, None)
if machine is None:
Expand Down
31 changes: 26 additions & 5 deletions cadence/_internal/workflow/statemachine/event_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ def decorator(func: EventHandler) -> EventHandler:
return decorator


def resolve_id_attr(obj: Any, path: str) -> Any:
"""Resolve a potentially dotted attribute path from a proto message.

For example, resolve_id_attr(attrs, "workflow_execution.workflow_id") will
return attrs.workflow_execution.workflow_id.
"""
for part in path.split("."):
obj = getattr(obj, part)
return obj


def _find_event_type(func: EventHandler) -> Type[Message]:
sig = signature(func)
type_hints = get_type_hints(func)
Expand All @@ -69,8 +80,18 @@ def _find_event_type(func: EventHandler) -> Type[Message]:


def _validate_field(func: EventHandler, event_type: Type[Message], field: str) -> None:
fields = event_type.DESCRIPTOR.fields_by_name
if field not in fields:
raise ValueError(
f"{func.__qualname__} handles {event_type.__qualname__}, which has no field {field}"
)
"""Validate that all parts of a (potentially dotted) field path exist on the proto type."""
descriptor = event_type.DESCRIPTOR
parts = field.split(".")
for i, part in enumerate(parts):
fields = descriptor.fields_by_name
if part not in fields:
raise ValueError(
f"{func.__qualname__} handles {event_type.__qualname__}, which has no field {part!r} (in path {field!r})"
)
if i < len(parts) - 1:
descriptor = fields[part].message_type
if descriptor is None:
raise ValueError(
f"{func.__qualname__}: field {part!r} is not a message type, cannot access sub-field in path {field!r}"
)
52 changes: 52 additions & 0 deletions cadence/_internal/workflow/statemachine/nondeterminism.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,58 @@ def _(attrs: history.ActivityTaskCancelRequestedEventAttributes) -> Expectation:
return Expectation(DecisionId(DecisionType.ACTIVITY, attrs.activity_id), CANCEL)


@to_expectation.register
def _(attrs: decision.StartChildWorkflowExecutionDecisionAttributes) -> Expectation:
return Expectation(
DecisionId(DecisionType.CHILD_WORKFLOW, attrs.workflow_id),
{"workflow_type": attrs.workflow_type.name},
)


@to_expectation.register
def _(
attrs: history.StartChildWorkflowExecutionInitiatedEventAttributes,
) -> Expectation:
return Expectation(
DecisionId(DecisionType.CHILD_WORKFLOW, attrs.workflow_id),
{"workflow_type": attrs.workflow_type.name},
)


@to_expectation.register
def _(attrs: history.StartChildWorkflowExecutionFailedEventAttributes) -> Expectation:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The expectation value shouldn't be CANCEL here and we need to add Expectation building for the other cancellation events.

# A start failure is not a cancellation; enforce the same workflow_type as the initiation
# so that a mismatch between replay history and new code is caught.
return Expectation(
DecisionId(DecisionType.CHILD_WORKFLOW, attrs.workflow_id),
{"workflow_type": attrs.workflow_type.name},
)


@to_expectation.register
def _(
attrs: decision.RequestCancelExternalWorkflowExecutionDecisionAttributes,
) -> Expectation:
return Expectation(
DecisionId(
DecisionType.CHILD_WORKFLOW, attrs.workflow_execution.workflow_id
),
CANCEL,
)
Comment thread
gitar-bot[bot] marked this conversation as resolved.


@to_expectation.register
def _(
attrs: history.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes,
) -> Expectation:
return Expectation(
DecisionId(
DecisionType.CHILD_WORKFLOW, attrs.workflow_execution.workflow_id
),
CANCEL,
)


# Workflow Completion - Enforce complete vs failure. Maybe we should enforce the output data?
@to_expectation.register
def _(_: decision.CompleteWorkflowExecutionDecisionAttributes) -> Expectation:
Expand Down
Loading
Loading