[FLINK-39165] Prevent concurrent FlinkSessionJob / FlinkDeployment update #1065

Open: mielientiev wants to merge 3 commits into apache:main from mielientiev:FLINK-39165

@mielientiev mielientiev commented Feb 26, 2026

What is the purpose of the change

https://issues.apache.org/jira/browse/FLINK-39165

When a user updates both a FlinkDeployment (session cluster) and its FlinkSessionJob at the same time, both resources are reconciled concurrently. The deployment deletes and recreates the cluster while the session job tries to take a savepoint and resubmit. As a result, the job can be "lost" (failing with a "Job Not Found" error) or submitted several times.

Brief change log

  • SessionJobReconciler.sessionClusterReady(): Added a reconciliation state check on top of the existing JM READY check. The state must be DEPLOYED or ROLLED_BACK.
  • SessionJobReconciler.noDeploymentChangesPending(): New method called only from readyToReconcile(). Blocks when metadata.generation != status.observedGeneration, meaning a deployment spec change is pending but the reconciler hasn't picked it up yet. This is kept separate from sessionClusterReady() so that it does not affect the cleanup path.
  • SessionReconciler.readyToReconcile(): When the deployment has a pending spec change, checks if any of its session jobs are mid-upgrade (UPGRADING or ROLLING_BACK). If so, waits for them to finish before updating the cluster. Session jobs that haven't been deployed yet are skipped to avoid a deadlock where the cluster waits for the job and the job waits for the cluster.
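
The session-job side of the change log above can be sketched roughly as follows. This is a minimal illustration rather than the PR's actual code: `SessionJobGateSketch`, `DeploymentView`, `JmStatus` and `ReconState` are hypothetical stand-ins for the operator's types, and treating a missing deployment as "not ready" is an assumption.

```java
import java.util.Objects;

/** Sketch only: simplified stand-ins for the operator's types, not its real API. */
final class SessionJobGateSketch {

    /** JobManager deployment status (only the values this sketch needs). */
    enum JmStatus { READY, DEPLOYING, MISSING }

    /** Reconciliation state of the FlinkDeployment. */
    enum ReconState { DEPLOYED, UPGRADING, ROLLING_BACK, ROLLED_BACK }

    /** Hypothetical read-only view of the fields the checks look at. */
    static final class DeploymentView {
        final JmStatus jmStatus;
        final ReconState reconState;
        final Long generation;         // metadata.generation
        final Long observedGeneration; // status.observedGeneration

        DeploymentView(JmStatus jmStatus, ReconState reconState,
                       Long generation, Long observedGeneration) {
            this.jmStatus = jmStatus;
            this.reconState = reconState;
            this.generation = generation;
            this.observedGeneration = observedGeneration;
        }
    }

    /** JM must be READY and the deployment must be in a settled state. */
    static boolean sessionClusterReady(DeploymentView dep) {
        if (dep == null || dep.jmStatus != JmStatus.READY) {
            return false;
        }
        return dep.reconState == ReconState.DEPLOYED
                || dep.reconState == ReconState.ROLLED_BACK;
    }

    /** False while a spec change exists that the operator has not observed yet. */
    static boolean noDeploymentChangesPending(DeploymentView dep) {
        // Assumption: a missing deployment is treated as "not safe to proceed".
        return dep != null && Objects.equals(dep.generation, dep.observedGeneration);
    }

    /** Upgrade path: both checks apply. Cleanup would consult sessionClusterReady only. */
    static boolean readyToReconcile(DeploymentView dep) {
        return sessionClusterReady(dep) && noDeploymentChangesPending(dep);
    }
}
```

The generation check covers the window in which the cluster still looks healthy (JM READY, state DEPLOYED) although a new spec has already been submitted. Keeping it out of `sessionClusterReady` means the cleanup path is not affected by it.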

Verifying this change

Added a few unit tests and also tested manually: deployed a session cluster with a session job and HA enabled, changed both specs at the same time, and confirmed that the session job waited for the cluster to finish upgrading and then upgraded itself with a savepoint on the new stable cluster. Also checked that deletion works as expected.

Test Scenario:

1 session cluster FlinkDeployment and 3 FlinkSessionJob-s

  1. New deployment (first time using helm install). Logs: https://pastebin.com/HutrD3F2
  2. Concurrent update. E.g. bumped restartNonce for all FlinkSessionJob and FlinkDeployment
    (via helm upgrade). Logs: https://pastebin.com/xuPdS8kQ
  3. Deletion (e.g. helm uninstall). Everything was deleted in the correct order: first the FlinkSessionJobs, then the FlinkDeployment. Logs: https://pastebin.com/GyuPnwg0

Does this pull request potentially affect one of the following parts

  • Dependencies: no
  • The public API, i.e., are there any changes to the CustomResourceDescriptors: no
  • Core observer or reconciler logic that is regularly executed: yes. sessionClusterReady() runs on every session job reconciliation and readyToReconcile() runs on every session deployment reconciliation. The session job lookup on the deployment side only happens when the deployment has a pending spec change.

Documentation

Does this pull request introduce a new feature? no

@mielientiev mielientiev marked this pull request as draft February 27, 2026 05:23
@mielientiev mielientiev marked this pull request as ready for review February 27, 2026 08:55
@mielientiev
Author

@gyfora could you please review when you have time. Thanks!

@mielientiev
Author

Hi @davidradl @gyfora, just a gentle reminder to review this PR when you have a moment

Contributor

@gyfora gyfora left a comment

I think overall the idea is very good. I am a bit hesitant about blocking session upgrades when jobs are transitioning, unless we can guarantee that we can make progress eventually (like a session job will never stay in UPGRADING state). Maybe that's already the case, worth double checking.

Also please double check that the observed generation is always updated for any spec/flinkdeployment change.

return false;
}
var state = reconStatus.getState();
return state == ReconciliationState.UPGRADING || state == ReconciliationState.ROLLING_BACK;
Contributor

Is there any case where a session job may get stuck in upgrading/rolling_back state? This check is quite tricky as it may cause a "deadlock" situation with the session upgrade.

Author

That's a good point! If something goes wrong, a session job can get stuck in the UPGRADING/ROLLING_BACK state, which means the cluster update would stay blocked until the job is fixed or deleted.

What, in your opinion, is the better / safer way to fix this case?

Contributor

The safer way is to never prevent session deployment updates, even if a job is upgrading. We can block job upgrades while the session cluster is upgrading, but not the other way around. Simply remove the logic from the SessionReconciler.

Contributor

I tested this end-to-end with the change scoped to SessionJobReconciler only, with both upgradeMode: stateless and upgradeMode: savepoint (HA + persistent checkpoint storage), and it behaves correctly, with no Connection refused / JobNotFoundException errors. The savepoint upgrade falls back cleanly to last-state when needed. So keeping the logic in SessionJobReconciler only should be fine.

The asymmetry is mostly fine in practice: stateful jobs running with HA + checkpointing recover transparently via the savepoint -> last-state fallback. One case worth flagging though:

  • Stateful jobs without HA: In the rare ordering where a session-job upgrade is already in flight when the FlinkDeployment starts upgrading, last-state has nothing to fall back to and the job can end up parked waiting for an upgradeable state. Probably best treated as a documented prerequisite (HA + checkpointing for stateful session jobs) rather than something to gate the deployment side on. I can also address and document this case as part of FLINK-39571.

One small follow-up suggestion: it would help future readers if SessionReconciler#readyToReconcile carried a short JavaDoc explaining why the FlinkDeployment side intentionally does not gate on FlinkSessionJob state, since this asymmetry is deliberate (deployment updates must never be blocked by job upgrades, and the reverse coupling is the only one needed to close the race).
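
One possible wording for such a JavaDoc, as a sketch only. The class name, the boolean parameter and the trivial method body below are hypothetical placeholders chosen to keep the example self-contained; they are not the real SessionReconciler signature, and any other readiness checks the real method performs are left out.

```java
/** Simplified placeholder for the deployment-side reconciler; not the real class. */
final class SessionReconcilerDocSketch {

    /**
     * Decides whether the session cluster (FlinkDeployment) may be reconciled now.
     *
     * <p>This method intentionally does not inspect the state of the FlinkSessionJobs running on
     * the cluster. A session job may stay in UPGRADING or ROLLING_BACK for a long time if
     * something goes wrong, so gating on it here could block cluster updates indefinitely.
     * The race between concurrent updates is closed from the other side only: session jobs wait
     * until the cluster has settled before they upgrade.
     *
     * @param anySessionJobMidUpgrade hypothetical input, present only to show that it is ignored
     * @return true regardless of session job state (other checks are omitted in this sketch)
     */
    static boolean readyToReconcile(boolean anySessionJobMidUpgrade) {
        return true;
    }
}
```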

Comment on lines +228 to +229
flinkdep.getMetadata().getGeneration(),
flinkdep.getStatus().getObservedGeneration())) {
Contributor

I think it would be worth double checking that the observedGeneration is always correctly updated (even for ignored/non-upgrade changes such as labels, operator configs, etc.). Otherwise we could easily block session job deletion.

Author

I checked this: for ignored/non-upgrade changes (labels, operator configs), observedGeneration is updated as well.

Comment on lines +210 to +220
/**
* Checks that the FlinkDeployment has no pending spec changes
*
* <p>When they differ, the deployment spec was changed but the operator hasn't acted yet. The
* cluster may still appear healthy (JM READY, state DEPLOYED), but it is about to be updated
* (e.g. deleted and recreated). Allowing a session job to start upgrading in this window would
* risk the savepoint being destroyed during the cluster rebuild.
*
* <p>This check is only used in {@link #readyToReconcile}, not in {@link #sessionClusterReady},
* so it does not block cleanup or FlinkService creation.
*/
Contributor

I would suggest here to mention that this check is intended to cover a very early & incipient upgrade stage of the session cluster:

Suggested change
/**
* Checks that the FlinkDeployment has no pending spec changes, i.e. that {@code
* metadata.generation} equals {@code status.observedGeneration}.
*
* <p>This guard targets the earliest, most incipient stage of a session-cluster upgrade: the
* brief window between the user submitting a new FlinkDeployment spec and the
* FlinkDeploymentController observing it and starting to act on it. During that window the
* cluster still appears healthy from every other angle (JM READY, reconciliation state
* DEPLOYED), so {@link #sessionClusterReady} alone would let a session-job upgrade through.
* Allowing a savepoint upgrade to start here would risk the savepoint being destroyed during
* the cluster rebuild.
*
* <p>This check is only used in {@link #readyToReconcile}, not in {@link #sessionClusterReady},
* so it does not block cleanup or {@link
* org.apache.flink.kubernetes.operator.service.FlinkService} creation.
*/

* so it does not block cleanup or FlinkService creation.
*/
private static boolean noDeploymentChangesPending(
Optional<FlinkDeployment> flinkDeploymentOpt) {
Contributor

Minor nit: replace Optional<FlinkDeployment> with a nullable FlinkDeployment parameter. Optional is meant for return values, not parameters (per Effective Java Item 55, and IntelliJ flags it).
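
A rough sketch of what the nullable-parameter variant might look like. The `Deployment` class is a hypothetical stand-in for FlinkDeployment that carries only the two generation fields, and returning false for a missing deployment is an assumption, not something taken from the PR.

```java
import java.util.Objects;

/** Sketch only: shows the parameter style, not the operator's real types. */
final class NullableParamSketch {

    /** Hypothetical minimal stand-in for FlinkDeployment. */
    static final class Deployment {
        final Long generation;         // metadata.generation
        final Long observedGeneration; // status.observedGeneration

        Deployment(Long generation, Long observedGeneration) {
            this.generation = generation;
            this.observedGeneration = observedGeneration;
        }
    }

    /**
     * Checks that the deployment has no spec change the operator has yet to observe.
     *
     * @param deployment the session cluster, or null if it does not exist
     */
    static boolean noDeploymentChangesPending(Deployment deployment) {
        if (deployment == null) {
            // Assumption: without a deployment there is nothing safe to upgrade against.
            return false;
        }
        return Objects.equals(deployment.generation, deployment.observedGeneration);
    }
}
```

A caller that already holds an `Optional<Deployment>` could pass `opt.orElse(null)` at the call site, so the Optional stays on the return-value side only.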
