[FLINK-39165] Prevent concurrent FlinkSessionJob / FlinkDeployment update#1065
[FLINK-39165] Prevent concurrent FlinkSessionJob / FlinkDeployment update#1065mielientiev wants to merge 3 commits into
Conversation
|
@gyfora could you please review when you have time. Thanks! |
|
Hi @davidradl @gyfora, just a gentle reminder to review this PR when you have a moment |
gyfora
left a comment
There was a problem hiding this comment.
I think overall the idea is very good. I am a bit hesitant about blocking session upgrades when jobs are transitioning, unless we can guarantee that we can make progress eventually (like a session job will never stay in UPGRADING state). Maybe that's already the case, worth double checking.
Also please double check that the observed generation is always updated for any spec/flinkdeployment change.
| return false; | ||
| } | ||
| var state = reconStatus.getState(); | ||
| return state == ReconciliationState.UPGRADING || state == ReconciliationState.ROLLING_BACK; |
There was a problem hiding this comment.
Is there any case where a session job may get stuck in upgrading/rolling_back state? This check is quite tricky as it may cause a "deadlock" situation with the session upgrade.
There was a problem hiding this comment.
That's a good point! I think there's a possibility when something goes wrong and session job can be stuck in upgrading/rolling_back state and basically meaning that until the job won't be fixed/deleted, the cluster can be stuck.
What is in your opinion is the better / safer way to fix this case?
There was a problem hiding this comment.
The safer way is to never prevent session deployment updates even if a job is upgrading. We can prevent jobs while the session is upgrading but not the other way around. Simply remove the logic from the SessionReconciler
There was a problem hiding this comment.
I tested this end-to-end against with the change scoped to SessionJobReconciler only, with both upgradeMode: stateless and upgradeMode: savepoint (HA + persistent checkpoint storage), and it behaves correctly, with no Connection refused / JobNotFoundException received. The savepoint falls back cleanly to last-state when needed. So keeping the support as part of SessionJobReconciler should be all good.
The asymmetry is mostly fine in practice: stateful jobs running with HA + checkpointing recover transparently via the savepoint -> last-state fallback. One case worth flagging though:
- Stateful jobs without HA: In the rare ordering where a session-job upgrade is already in flight when the FlinkDeployment starts upgrading,
last-statehas nothing to fallback to and the job can end up parked in waiting for upgradeable state. Probably best treated as a documented prerequisite (HA + checkpointing for stateful session jobs) rather than something to gate the deployment side on. This case I can also address & document as part of FLINK-39571.
One small follow-up suggestion: it would help future readers if SessionReconciler#readyToReconcile will carry a short JavaDoc explaining why the FlinkDeployment side intentionally does not gate on FlinkSessionJob state as this asymmetry is deliberate (deployment updates must never be blocked by job upgrades, and the reverse coupling is the only one needed to close the race).
| flinkdep.getMetadata().getGeneration(), | ||
| flinkdep.getStatus().getObservedGeneration())) { |
There was a problem hiding this comment.
I think it would be worth double checking the the observedGeneration is always correctly updated (even for ignore/non-upgrade changes such as labels/operator configs etc). Otherwise we could easily block session job deletion
There was a problem hiding this comment.
I checked this and for ignore/non-upgrade changes (labels/operator configs) I see that observedGeneration is updated as well.
| return false; | ||
| } | ||
| var state = reconStatus.getState(); | ||
| return state == ReconciliationState.UPGRADING || state == ReconciliationState.ROLLING_BACK; |
There was a problem hiding this comment.
I tested this end-to-end against with the change scoped to SessionJobReconciler only, with both upgradeMode: stateless and upgradeMode: savepoint (HA + persistent checkpoint storage), and it behaves correctly, with no Connection refused / JobNotFoundException received. The savepoint falls back cleanly to last-state when needed. So keeping the support as part of SessionJobReconciler should be all good.
The asymmetry is mostly fine in practice: stateful jobs running with HA + checkpointing recover transparently via the savepoint -> last-state fallback. One case worth flagging though:
- Stateful jobs without HA: In the rare ordering where a session-job upgrade is already in flight when the FlinkDeployment starts upgrading,
last-statehas nothing to fallback to and the job can end up parked in waiting for upgradeable state. Probably best treated as a documented prerequisite (HA + checkpointing for stateful session jobs) rather than something to gate the deployment side on. This case I can also address & document as part of FLINK-39571.
One small follow-up suggestion: it would help future readers if SessionReconciler#readyToReconcile will carry a short JavaDoc explaining why the FlinkDeployment side intentionally does not gate on FlinkSessionJob state as this asymmetry is deliberate (deployment updates must never be blocked by job upgrades, and the reverse coupling is the only one needed to close the race).
| /** | ||
| * Checks that the FlinkDeployment has no pending spec changes | ||
| * | ||
| * <p>When they differ, the deployment spec was changed but the operator hasn't acted yet. The | ||
| * cluster may still appear healthy (JM READY, state DEPLOYED), but it is about to be updated | ||
| * (e.g. deleted and recreated). Allowing a session job to start upgrading in this window would | ||
| * risk the savepoint being destroyed during the cluster rebuild. | ||
| * | ||
| * <p>This check is only used in {@link #readyToReconcile}, not in {@link #sessionClusterReady}, | ||
| * so it does not block cleanup or FlinkService creation. | ||
| */ |
There was a problem hiding this comment.
I would suggest here to mention that this check is intended to cover a very early & incipient upgrade stage of the session cluster:
| /** | |
| * Checks that the FlinkDeployment has no pending spec changes | |
| * | |
| * <p>When they differ, the deployment spec was changed but the operator hasn't acted yet. The | |
| * cluster may still appear healthy (JM READY, state DEPLOYED), but it is about to be updated | |
| * (e.g. deleted and recreated). Allowing a session job to start upgrading in this window would | |
| * risk the savepoint being destroyed during the cluster rebuild. | |
| * | |
| * <p>This check is only used in {@link #readyToReconcile}, not in {@link #sessionClusterReady}, | |
| * so it does not block cleanup or FlinkService creation. | |
| */ | |
| /** | |
| * Checks that the FlinkDeployment has no pending spec changes, i.e. that {@code | |
| * metadata.generation} equals {@code status.observedGeneration}. | |
| * | |
| * <p>This guard targets the earliest, most incipient stage of a session-cluster upgrade: the | |
| * brief window between the user submitting a new FlinkDeployment spec and the | |
| * FlinkDeploymentController observing it and starting to act on it. During that window the | |
| * cluster still appears healthy from every other angle (JM READY, reconciliation state | |
| * DEPLOYED), so {@link #sessionClusterReady} alone would let a session-job upgrade through. | |
| * Allowing a savepoint upgrade to start here would risk the savepoint being destroyed during | |
| * the cluster rebuild. | |
| * | |
| * <p>This check is only used in {@link #readyToReconcile}, not in {@link #sessionClusterReady}, | |
| * so it does not block cleanup of {@link | |
| * org.apache.flink.kubernetes.operator.service.FlinkService} creation. | |
| */ |
| * so it does not block cleanup or FlinkService creation. | ||
| */ | ||
| private static boolean noDeploymentChangesPending( | ||
| Optional<FlinkDeployment> flinkDeploymentOpt) { |
There was a problem hiding this comment.
Minor nit: replace Optional<FlinkDeployment> with a nullable FlinkDeployment parameter. Optional is meant for return values, not parameters (per Effective Java Item 55, and IntelliJ flags it).
What is the purpose of the change
https://issues.apache.org/jira/browse/FLINK-39165
When a user updates both a
FlinkDeployment(session cluster) and itsFlinkSessionJobat the same time, both resources get reconciled concurrently. The deployment deletes and recreates the cluster while the session job tries to take a savepoint and resubmit, at some point job will be "lost" and "Job Not Found error", or submitted several timesBrief change log
SessionJobReconciler.sessionClusterReady(): Added reconciliation state check on top of the existing JM READY check, state must beDEPLOYEDorROLLED_BACKSessionJobReconciler.noDeploymentChangesPending(): New method called only fromreadyToReconcile(). Blocks whenmetadata.generation != status.observedGeneration, meaning a deployment spec change is pending but the reconciler hasn't picked it up yet. This is separated fromsessionClusterReady()so it does not affect cleanup path.SessionReconciler.readyToReconcile(): When the deployment has a pending spec change, checks if any of its session jobs are mid-upgrade (UPGRADINGorROLLING_BACK). If so, waits for them to finish before updating the cluster. Session jobs that haven't been deployed yet are skipped to avoid a deadlock where the cluster waits for the job and the job waits for the cluster.Verifying this change
Added a few unit tests, as well as tested manually: deployed a session cluster with a session job and HA enabled, changed both specs at the same time, confirmed the session job waited for the cluster to finish upgrading and then upgraded itself with a savepoint on the new stable cluster. Checked that deletion is working as expected as well.
Test Scenario:
1 session cluster
FlinkDeploymentand 3FlinkSessionJob-shelm install). Logs: https://pastebin.com/HutrD3F2restartNoncefor allFlinkSessionJobandFlinkDeployment(via
helm upgrade). Logs: https://pastebin.com/xuPdS8kQhelm uninstall), all jobs were deleted in correct order, firstFlinkSessionJobs, thenFlinkDeployment: https://pastebin.com/GyuPnwg0Does this pull request potentially affect one of the following parts
sessionClusterReady()runs on every session job reconciliation andreadyToReconcile()runs on every session deployment reconciliation. The session job lookup on the deployment side only happens when the deployment has a pending spec change.Documentation
Does this pull request introduce a new feature? no