DRAFT: feat(airflow): support dag running with cadence#96
DRAFT: feat(airflow): support dag running with cadence#96shijiesheng wants to merge 2 commits intocadence-workflow:mainfrom
Conversation
Signed-off-by: Shijie Sheng <liouvetren@gmail.com>
| try: | ||
| logger.info("Executing task %s", tid) | ||
|
|
||
| # use activity | ||
| operator_run = operator_as_activity(task) | ||
| result = await operator_run.execute(context) | ||
| results[tid] = result | ||
| task_states[tid] = TaskState.SUCCESS | ||
| logger.info("Task %s succeeded", tid) | ||
| except Exception as exc: | ||
| task_states[tid] = TaskState.FAILED | ||
| errors[tid] = exc | ||
| logger.error("Task %s failed: %s", tid, exc) | ||
| finally: | ||
| done_events[tid].set() |
There was a problem hiding this comment.
⚠️ Bug: CancelledError from TaskGroup leaves tasks in unknown state
When one task fails inside the asyncio.TaskGroup, the group cancels all other in-flight tasks by raising asyncio.CancelledError. Since the handler uses except Exception (line 127), CancelledError (a BaseException) bypasses it, so task_states[tid] is never set for cancelled tasks. This means:
- The
task_statesdict will be missing entries for cancelled tasks. - The subsequent
task_statesreturn value (line 144) will be incomplete. - Any downstream code referencing
task_states[uid](line 101) for a cancelled task would raiseKeyErrorif the DAG were re-entered. - The
TaskGroupitself will raise anExceptionGroupwrapping the original error, which is different from theRuntimeErrorthe code intends to raise at line 140-142 — that code is actually unreachable becauseTaskGroup.__aexit__re-raises first.
The error-handling logic after the TaskGroup block (lines 138-142) is dead code — TaskGroup will always propagate failures as an ExceptionGroup before reaching it.
Suggested fix:
Either:
1. Catch `BaseException` instead of `Exception` to handle `CancelledError`, and re-raise it after updating state, OR
2. Don't use `TaskGroup` — use `asyncio.gather(return_exceptions=True)` which won't cancel siblings on failure and lets you collect all results/errors yourself.
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
| operator_run = operator_as_activity(task) | ||
| result = await operator_run.execute(context) |
There was a problem hiding this comment.
⚠️ Bug: operator_as_activity wraps execute but doesn't call it as activity
In cadence_dag.py line 122-123, the code creates an ActivityDefinition via operator_as_activity(task) but then directly calls operator_run.execute(context). This executes the operator inline in the workflow coroutine rather than scheduling it as a Cadence activity. The whole point of wrapping operators as activities is to get Cadence's retry, timeout, and distributed execution semantics. The activity should be invoked through the Cadence workflow activity scheduling mechanism (e.g., workflow.execute_activity), not called directly.
Additionally, the activities are created dynamically inside the workflow but never registered with the worker's registry, so even if they were scheduled properly, the worker wouldn't know how to execute them.
Suggested fix:
Use the Cadence workflow SDK's activity invocation mechanism (e.g., `await workflow.execute_activity(...)`) instead of calling `operator_run.execute(context)` directly. Activities should be registered upfront with the worker (as done in main.py) and invoked by name/reference from the workflow.
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
| registry, | ||
| ) | ||
|
|
||
| # start BookFlightAgentWorkflow |
There was a problem hiding this comment.
💡 Quality: Stale comment references BookFlightAgentWorkflow
Line 29 of main.py contains the comment # start BookFlightAgentWorkflow which appears to be copy-pasted from another example and is unrelated to the Airflow DAG worker.
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
Code Review
|
| Auto-apply | Compact |
|
|
Was this helpful? React with 👍 / 👎 | Gitar
What changed?
Why?
How did you test it?
Potential risks
Release notes
Documentation Changes