DRAFT: feat(airflow): support dag running with cadence#96

Open
shijiesheng wants to merge 2 commits into cadence-workflow:main from shijiesheng:airflow

@shijiesheng
Member

What changed?

Why?

How did you test it?

Potential risks

Release notes

Documentation Changes

Signed-off-by: Shijie Sheng <liouvetren@gmail.com>
Signed-off-by: Shijie Sheng <liouvetren@gmail.com>
Comment on lines +118 to +132
try:
    logger.info("Executing task %s", tid)

    # use activity
    operator_run = operator_as_activity(task)
    result = await operator_run.execute(context)
    results[tid] = result
    task_states[tid] = TaskState.SUCCESS
    logger.info("Task %s succeeded", tid)
except Exception as exc:
    task_states[tid] = TaskState.FAILED
    errors[tid] = exc
    logger.error("Task %s failed: %s", tid, exc)
finally:
    done_events[tid].set()
⚠️ Bug: CancelledError from TaskGroup leaves tasks in unknown state

When one task fails inside the `asyncio.TaskGroup`, the group cancels all other in-flight tasks, which surfaces in each of them as `asyncio.CancelledError`. Since the handler uses `except Exception` (line 127), `CancelledError` (a `BaseException`) bypasses it, so `task_states[tid]` is never set for cancelled tasks. This means:

  1. The `task_states` dict will be missing entries for cancelled tasks.
  2. The subsequent `task_states` return value (line 144) will be incomplete.
  3. Any downstream code referencing `task_states[uid]` (line 101) for a cancelled task would raise `KeyError` if the DAG were re-entered.
  4. The `TaskGroup` itself will raise an `ExceptionGroup` wrapping the original error, which is different from the `RuntimeError` the code intends to raise at lines 140-142. That code is actually unreachable because `TaskGroup.__aexit__` re-raises first.

The error-handling logic after the `TaskGroup` block (lines 138-142) is dead code: `TaskGroup` will always propagate failures as an `ExceptionGroup` before reaching it.

Suggested fix:

Either:
1. Catch `BaseException` instead of `Exception` so that `CancelledError` is handled, and re-raise it after updating state, OR
2. Don't use `TaskGroup`; use `asyncio.gather(..., return_exceptions=True)`, which won't cancel siblings on failure and lets you collect all results/errors yourself.
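Both options can be combined in one small sketch. Everything here is illustrative: `TaskState.CANCELLED`, `run_one`, and `run_all` are hypothetical names that do not appear in the PR, and the sketch catches `asyncio.CancelledError` specifically rather than all of `BaseException`, which is a narrower variant of option 1.

```python
import asyncio
from enum import Enum


class TaskState(Enum):
    """Hypothetical stand-in for the PR's TaskState; CANCELLED is assumed."""
    SUCCESS = "success"
    FAILED = "failed"
    CANCELLED = "cancelled"


async def run_one(tid, work, task_states, errors):
    """Run one unit of work and always record a final state for it."""
    try:
        await work()
        task_states[tid] = TaskState.SUCCESS
    except asyncio.CancelledError:
        # Record the state, then re-raise so cancellation still propagates.
        task_states[tid] = TaskState.CANCELLED
        raise
    except Exception as exc:
        task_states[tid] = TaskState.FAILED
        errors[tid] = exc


async def run_all(jobs):
    """Run every job to completion; one failure does not cancel the rest."""
    task_states, errors = {}, {}
    await asyncio.gather(
        *(run_one(tid, work, task_states, errors) for tid, work in jobs.items()),
        return_exceptions=True,
    )
    return task_states, errors
```

With this shape the caller decides what to raise after inspecting `errors`, so a follow-up `RuntimeError` summarising the failures would be reachable.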


Comment on lines +122 to +123
operator_run = operator_as_activity(task)
result = await operator_run.execute(context)
⚠️ Bug: operator_as_activity wraps execute but doesn't call it as activity

In `cadence_dag.py` lines 122-123, the code creates an `ActivityDefinition` via `operator_as_activity(task)` but then directly calls `operator_run.execute(context)`. This executes the operator inline in the workflow coroutine rather than scheduling it as a Cadence activity. The whole point of wrapping operators as activities is to get Cadence's retry, timeout, and distributed execution semantics. The activity should be invoked through the Cadence workflow activity scheduling mechanism (e.g., `workflow.execute_activity`), not called directly.

Additionally, the activities are created dynamically inside the workflow but never registered with the worker's registry, so even if they were scheduled properly, the worker wouldn't know how to execute them.

Suggested fix:

Use the Cadence workflow SDK's activity invocation mechanism (e.g., `await workflow.execute_activity(...)`) instead of calling `operator_run.execute(context)` directly. Activities should be registered upfront with the worker (as done in main.py) and invoked by name/reference from the workflow.
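The register-upfront, invoke-by-name pattern can be illustrated without the SDK. The sketch below deliberately does not use any Cadence API: `ToyRegistry` and `schedule_activity` are hypothetical stand-ins, and the retry loop only mimics, in a very simplified way, the kind of semantics that scheduling through the real mechanism is meant to provide.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

ActivityFn = Callable[[Dict[str, Any]], Awaitable[Any]]


class ToyRegistry:
    """Hypothetical registry: activities are registered once, before running."""

    def __init__(self) -> None:
        self._activities: Dict[str, ActivityFn] = {}

    def register(self, name: str, fn: ActivityFn) -> None:
        if name in self._activities:
            raise ValueError(f"activity {name!r} already registered")
        self._activities[name] = fn

    def get(self, name: str) -> ActivityFn:
        return self._activities[name]


async def schedule_activity(
    registry: ToyRegistry, name: str, context: Dict[str, Any], attempts: int = 3
) -> Any:
    """Toy stand-in for a scheduling call: look up by name, retry on failure."""
    fn = registry.get(name)  # unknown names fail fast and are not retried
    last_exc: Exception | None = None
    for _ in range(attempts):
        try:
            return await fn(context)
        except Exception as exc:
            last_exc = exc
    raise RuntimeError(f"activity {name!r} failed after {attempts} attempts") from last_exc
```

The point of the indirection is that the workflow side only holds a name, so the execution layer is free to apply retries or timeouts, or to run the work elsewhere.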


registry,
)

# start BookFlightAgentWorkflow
💡 Quality: Stale comment references BookFlightAgentWorkflow

Line 29 of `main.py` contains the comment `# start BookFlightAgentWorkflow`, which appears to be copy-pasted from another example and is unrelated to the Airflow DAG worker.


@gitar-bot
gitar-bot Bot commented May 1, 2026

Code Review ⚠️ Changes requested 0 resolved / 3 findings

Integrates Airflow DAGs with Cadence workflows, but contains critical issues including unhandled CancelledError state leakage and misconfigured activity execution in the operator wrapper. Stale documentation comments also require cleanup.

⚠️ Bug: CancelledError from TaskGroup leaves tasks in unknown state

📄 cadence/contrib/airflow/cadence_dag.py:118-132

⚠️ Bug: operator_as_activity wraps execute but doesn't call it as activity

📄 cadence/contrib/airflow/cadence_dag.py:122-123 📄 cadence/contrib/airflow/cadence_dag.py:149-154

💡 Quality: Stale comment references BookFlightAgentWorkflow

📄 cadence/contrib/airflow/main.py:29
