Skip to content

Commit c0801d4

Browse files
authored
Move direct engine state lifecycle to top-level callers (#4928)
## Changes - Move state lifecycle (`Open`/`Finalize`) from internal methods to top-level command callers, so state is opened exactly once per command. Double-open panics. - Script-based `bundle run` no longer pulls remote state it doesn't need. - Skip the `/api/2.0/preview/scim/v2/Me` call during `bundle run` when only `ErrorOnEmptyState` is set. - Skip state file creation when deploy/destroy plans are empty. ## Why The previous pattern hid `Open`/`Finalize` inside low-level methods, making it hard to reason about when state was opened, whether it would be written, and whether double-opens could occur. Lifting the lifecycle to callers makes the invariant obvious and enforceable, and avoids unnecessary state I/O and API calls in read-only flows. Preparation for WAL & metadata service. ## Tests - Unit tests for `DeploymentState`: open/save/finalize round-trip, panic on double open, delete tracking. - Existing acceptance tests cover deploy, destroy, `bundle run`, and config-remote-sync flows.
1 parent 1a1b85a commit c0801d4

19 files changed

Lines changed: 272 additions & 215 deletions

bundle/configsync/diff.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,19 @@ func DetectChanges(ctx context.Context, b *bundle.Bundle, engine engine.EngineTy
127127
return nil, fmt.Errorf("state snapshot not available: %w", err)
128128
}
129129

130-
deployBundle := &direct.DeploymentBundle{}
131-
var statePath string
130+
var deployBundle *direct.DeploymentBundle
132131
if engine.IsDirect() {
133-
_, statePath = b.StateFilenameDirect(ctx)
132+
// For direct engine, state is already opened by the caller (process.go).
133+
deployBundle = &b.DeploymentBundle
134134
} else {
135-
_, statePath = b.StateFilenameConfigSnapshot(ctx)
135+
deployBundle = &direct.DeploymentBundle{}
136+
_, statePath := b.StateFilenameConfigSnapshot(ctx)
137+
if err := deployBundle.StateDB.Open(statePath); err != nil {
138+
return nil, fmt.Errorf("failed to open state: %w", err)
139+
}
136140
}
137141

138-
plan, err := deployBundle.CalculatePlan(ctx, b.WorkspaceClient(), &b.Config, statePath)
142+
plan, err := deployBundle.CalculatePlan(ctx, b.WorkspaceClient(), &b.Config)
139143
if err != nil {
140144
return nil, fmt.Errorf("failed to calculate plan: %w", err)
141145
}

bundle/deploy/terraform/check_dashboards_modified_remotely.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@ func collectDashboardsFromState(ctx context.Context, b *bundle.Bundle, directDep
2121
var state ExportedResourcesMap
2222
var err error
2323
if directDeployment {
24-
_, localPath := b.StateFilenameDirect(ctx)
25-
state, err = b.DeploymentBundle.ExportState(ctx, localPath)
24+
state = b.DeploymentBundle.ExportState(ctx)
2625
} else {
2726
state, err = ParseResourcesState(ctx, b)
28-
}
29-
if err != nil {
30-
return nil, err
27+
if err != nil {
28+
return nil, err
29+
}
3130
}
3231

3332
var dashboards []dashboardState

bundle/direct/bind.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.Workspac
9595
return nil, err
9696
}
9797

98-
// Finalize to write temp state to disk so CalculatePlan can read it
98+
// Finalize to persist temp state to disk
9999
err = b.StateDB.Finalize()
100100
if err != nil {
101101
os.Remove(tmpStatePath)
@@ -105,7 +105,7 @@ func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.Workspac
105105
log.Infof(ctx, "Bound %s to id=%s (in temp state)", resourceKey, resourceID)
106106

107107
// First plan + update: populate state with resolved config
108-
plan, err := b.CalculatePlan(ctx, client, configRoot, tmpStatePath)
108+
plan, err := b.CalculatePlan(ctx, client, configRoot)
109109
if err != nil {
110110
os.Remove(tmpStatePath)
111111
return nil, err
@@ -146,7 +146,7 @@ func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.Workspac
146146
}
147147

148148
// Second plan: this is the plan to present to the user (change between remote resource and config)
149-
plan, err = b.CalculatePlan(ctx, client, configRoot, tmpStatePath)
149+
plan, err = b.CalculatePlan(ctx, client, configRoot)
150150
if err != nil {
151151
os.Remove(tmpStatePath)
152152
return nil, err

bundle/direct/bundle_apply.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,6 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa
151151

152152
return true
153153
})
154-
155-
// This must run even if deploy failed:
156-
err = b.StateDB.Finalize()
157-
if err != nil {
158-
logdiag.LogError(ctx, err)
159-
}
160154
}
161155

162156
func (b *DeploymentBundle) LookupReferencePostDeploy(ctx context.Context, path *structpath.PathNode) (any, error) {

bundle/direct/bundle_plan.go

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"errors"
77
"fmt"
88
"maps"
9-
"os"
109
"reflect"
1110
"slices"
1211
"strings"
@@ -42,22 +41,14 @@ func (b *DeploymentBundle) init(client *databricks.WorkspaceClient) error {
4241
// ValidatePlanAgainstState validates that a plan's lineage and serial match the current state.
4342
// This should be called early in the deployment process, before any file operations.
4443
// If the plan has no lineage (first deployment), validation is skipped.
45-
func ValidatePlanAgainstState(statePath string, plan *deployplan.Plan) error {
44+
func ValidatePlanAgainstState(stateDB *dstate.DeploymentState, plan *deployplan.Plan) error {
4645
// If plan has no lineage, this is a first deployment before any state exists
4746
// No validation needed
4847
if plan.Lineage == "" {
4948
return nil
5049
}
5150

52-
var stateDB dstate.DeploymentState
53-
err := stateDB.Open(statePath)
54-
if err != nil {
55-
// If state file doesn't exist but plan has lineage, something is wrong
56-
if os.IsNotExist(err) {
57-
return fmt.Errorf("plan has lineage %q but state file does not exist at %s; the state may have been deleted", plan.Lineage, statePath)
58-
}
59-
return fmt.Errorf("reading state from %s: %w", statePath, err)
60-
}
51+
stateDB.AssertOpened()
6152

6253
// Validate that the plan's lineage matches the current state's lineage
6354
if plan.Lineage != stateDB.Data.Lineage {
@@ -74,13 +65,10 @@ func ValidatePlanAgainstState(statePath string, plan *deployplan.Plan) error {
7465

7566
// InitForApply initializes the DeploymentBundle for applying a pre-computed plan.
7667
// This is used when --plan is specified to skip the planning phase.
77-
func (b *DeploymentBundle) InitForApply(ctx context.Context, client *databricks.WorkspaceClient, statePath string, plan *deployplan.Plan) error {
78-
err := b.StateDB.Open(statePath)
79-
if err != nil {
80-
return fmt.Errorf("reading state from %s: %w", statePath, err)
81-
}
68+
func (b *DeploymentBundle) InitForApply(ctx context.Context, client *databricks.WorkspaceClient, plan *deployplan.Plan) error {
69+
b.StateDB.AssertOpened()
8270

83-
err = b.init(client)
71+
err := b.init(client)
8472
if err != nil {
8573
return err
8674
}
@@ -110,13 +98,10 @@ func (b *DeploymentBundle) InitForApply(ctx context.Context, client *databricks.
11098
return nil
11199
}
112100

113-
func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks.WorkspaceClient, configRoot *config.Root, statePath string) (*deployplan.Plan, error) {
114-
err := b.StateDB.Open(statePath)
115-
if err != nil {
116-
return nil, fmt.Errorf("reading state from %s: %w", statePath, err)
117-
}
101+
func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks.WorkspaceClient, configRoot *config.Root) (*deployplan.Plan, error) {
102+
b.StateDB.AssertOpened()
118103

119-
err = b.init(client)
104+
err := b.init(client)
120105
if err != nil {
121106
return nil, err
122107
}

bundle/direct/dstate/state.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,7 @@ func (db *DeploymentState) Open(path string) error {
113113
defer db.mu.Unlock()
114114

115115
if db.Path != "" {
116-
if db.Path == path {
117-
return nil
118-
}
119-
return fmt.Errorf("already read state %v, cannot open %v", db.Path, path)
116+
panic(fmt.Sprintf("state already opened: %v, cannot open %v", db.Path, path))
120117
}
121118

122119
data, err := os.ReadFile(path)

bundle/direct/dstate/state_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package dstate
2+
3+
import (
4+
"path/filepath"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestOpenSaveFinalizeRoundTrip(t *testing.T) {
12+
path := filepath.Join(t.TempDir(), "state.json")
13+
14+
var db DeploymentState
15+
require.NoError(t, db.Open(path))
16+
17+
require.NoError(t, db.SaveState("jobs.my_job", "123", map[string]string{"key": "val"}, nil))
18+
require.NoError(t, db.Finalize())
19+
20+
// Re-open and verify persisted data.
21+
var db2 DeploymentState
22+
require.NoError(t, db2.Open(path))
23+
assert.Equal(t, 1, db2.Data.Serial)
24+
assert.Equal(t, "123", db2.GetResourceID("jobs.my_job"))
25+
}
26+
27+
func TestPanicOnDoubleOpen(t *testing.T) {
28+
path := filepath.Join(t.TempDir(), "state.json")
29+
30+
var db DeploymentState
31+
require.NoError(t, db.Open(path))
32+
33+
assert.Panics(t, func() {
34+
_ = db.Open(path)
35+
})
36+
}
37+
38+
func TestDeleteState(t *testing.T) {
39+
path := filepath.Join(t.TempDir(), "state.json")
40+
41+
var db DeploymentState
42+
require.NoError(t, db.Open(path))
43+
require.NoError(t, db.SaveState("jobs.my_job", "123", map[string]string{}, nil))
44+
require.NoError(t, db.Finalize())
45+
46+
require.NoError(t, db.DeleteState("jobs.my_job"))
47+
require.NoError(t, db.Finalize())
48+
49+
var db2 DeploymentState
50+
require.NoError(t, db2.Open(path))
51+
assert.Equal(t, 2, db2.Data.Serial)
52+
assert.Equal(t, "", db2.GetResourceID("jobs.my_job"))
53+
}

bundle/direct/pkg.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,7 @@ func (d *DeploymentUnit) SetRemoteState(remoteState any) error {
6464
return nil
6565
}
6666

67-
func (b *DeploymentBundle) ExportState(ctx context.Context, path string) (resourcestate.ExportedResourcesMap, error) {
68-
err := b.StateDB.Open(path)
69-
if err != nil {
70-
return nil, err
71-
}
72-
return b.StateDB.ExportState(ctx), nil
67+
func (b *DeploymentBundle) ExportState(ctx context.Context) resourcestate.ExportedResourcesMap {
68+
b.StateDB.AssertOpened()
69+
return b.StateDB.ExportState(ctx)
7370
}

bundle/phases/deploy.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,13 @@ func deployCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, ta
104104

105105
if targetEngine.IsDirect() {
106106
b.DeploymentBundle.Apply(ctx, b.WorkspaceClient(), plan, direct.MigrateMode(false))
107+
// Finalize state: write to disk even if deploy failed, so partial progress is saved.
108+
// Skip for empty plans to avoid creating a state file when nothing was deployed.
109+
if len(plan.Plan) > 0 {
110+
if err := b.DeploymentBundle.StateDB.Finalize(); err != nil {
111+
logdiag.LogError(ctx, err)
112+
}
113+
}
107114
} else {
108115
bundle.ApplyContext(ctx, b, terraform.Apply())
109116
}
@@ -178,8 +185,7 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand
178185

179186
if plan != nil {
180187
// Initialize DeploymentBundle for applying the loaded plan
181-
_, localPath := b.StateFilenameDirect(ctx)
182-
err := b.DeploymentBundle.InitForApply(ctx, b.WorkspaceClient(), localPath, plan)
188+
err := b.DeploymentBundle.InitForApply(ctx, b.WorkspaceClient(), plan)
183189
if err != nil {
184190
logdiag.LogError(ctx, err)
185191
return
@@ -213,8 +219,7 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand
213219

214220
func RunPlan(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) *deployplan.Plan {
215221
if engine.IsDirect() {
216-
_, localPath := b.StateFilenameDirect(ctx)
217-
plan, err := b.DeploymentBundle.CalculatePlan(ctx, b.WorkspaceClient(), &b.Config, localPath)
222+
plan, err := b.DeploymentBundle.CalculatePlan(ctx, b.WorkspaceClient(), &b.Config)
218223
if err != nil {
219224
logdiag.LogError(ctx, err)
220225
return nil

bundle/phases/destroy.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@ func approvalForDestroy(ctx context.Context, b *bundle.Bundle, plan *deployplan.
9797
func destroyCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, engine engine.EngineType) {
9898
if engine.IsDirect() {
9999
b.DeploymentBundle.Apply(ctx, b.WorkspaceClient(), plan, direct.MigrateMode(false))
100+
// Skip Finalize for empty plans to avoid creating a state file when nothing was destroyed.
101+
if len(plan.Plan) > 0 {
102+
if err := b.DeploymentBundle.StateDB.Finalize(); err != nil {
103+
logdiag.LogError(ctx, err)
104+
}
105+
}
100106
} else {
101107
// Core destructive mutators for destroy. These require informed user consent.
102108
bundle.ApplyContext(ctx, b, terraform.Apply())
@@ -157,8 +163,7 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) {
157163

158164
var plan *deployplan.Plan
159165
if engine.IsDirect() {
160-
_, localPath := b.StateFilenameDirect(ctx)
161-
plan, err = b.DeploymentBundle.CalculatePlan(ctx, b.WorkspaceClient(), nil, localPath)
166+
plan, err = b.DeploymentBundle.CalculatePlan(ctx, b.WorkspaceClient(), nil)
162167
if err != nil {
163168
logdiag.LogError(ctx, err)
164169
return

0 commit comments

Comments
 (0)