Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2674,6 +2674,10 @@ def _context_diff(
)

def _destroy(self) -> None:
# Create state_sync directly to avoid potential errors from get_versions() in the property
if not self._state_sync:
self._state_sync = self._new_state_sync()

# Invalidate all environments, including prod
for environment in self.state_reader.get_environments():
self.state_sync.invalidate_environment(name=environment.name, protect_prod=False)
Expand Down
87 changes: 87 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
SnapshotInfoLike,
SnapshotTableInfo,
)
from sqlmesh.core.state_sync.base import Versions, SCHEMA_VERSION
from sqlmesh.utils import CorrelationId
from sqlmesh.utils.date import TimeLike, now, to_date, to_datetime, to_timestamp
from sqlmesh.utils.errors import NoChangesPlanError, SQLMeshError, PlanError, ConfigError
Expand Down Expand Up @@ -6291,6 +6292,92 @@ def test_destroy(copy_to_temp_path):
assert not cache_path.exists()


@use_terminal_console
def test_destroy_with_version_mismatch(copy_to_temp_path):
paths = copy_to_temp_path("tests/fixtures/multi_virtual_layer")
path = Path(paths[0])
first_db_path = str(path / "db_1.db")
second_db_path = str(path / "db_2.db")

config = Config(
gateways={
"first": GatewayConfig(
connection=DuckDBConnectionConfig(database=first_db_path),
variables={"overriden_var": "gateway_1"},
),
"second": GatewayConfig(
connection=DuckDBConnectionConfig(database=second_db_path),
variables={"overriden_var": "gateway_2"},
),
},
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
model_naming=NameInferenceConfig(infer_names=True),
default_gateway="first",
gateway_managed_virtual_layer=True,
variables={"overriden_var": "global", "global_one": 88},
)

context = Context(paths=paths, config=config)
plan = context.plan_builder().build()
assert len(plan.new_snapshots) == 4
context.apply(plan)

# Create dev environment with a change
model = context.get_model("db_1.first_schema.model_one")
context.upsert_model(model.copy(update={"query": model.query.select("'c' AS extra")}))
plan = context.plan_builder("dev").build()
context.apply(plan)

# Mock get_versions to raise an error by bumping the version
def mock_get_versions(validate=True):
if validate:
raise SQLMeshError(
f"SQLMesh (local) is using version '{SCHEMA_VERSION}' which is behind '{SCHEMA_VERSION + 1}' (remote). "
f"Please upgrade SQLMesh."
)
return Versions(
schema_version=SCHEMA_VERSION + 1,
sqlglot_version="1.0.0",
sqlmesh_version="1.0.0",
)

# Clearstate sync and patch to return a state sync with mocked get_versions
context._state_sync = None
original_new_state_sync = context._new_state_sync

def patched_new_state_sync():
state_sync = original_new_state_sync()
state_sync.get_versions = mock_get_versions
return state_sync

with patch.object(context, "_new_state_sync", side_effect=patched_new_state_sync):
# Ensure accessing state_sync property would fail due to get_versions validation
with pytest.raises(SQLMeshError):
_ = context.state_sync

context._state_sync = None

# This should not raise an error even though there is a version mismatch
context._destroy()

# Verify the state tables have been removed though
state_tables = {
"_environments",
"_snapshots",
"_intervals",
"_auto_restatements",
"_environment_statements",
"_intervals",
"_plan_dags",
"_versions",
}
for table_name in state_tables:
with pytest.raises(
Exception, match=f"Catalog Error: Table with name {table_name} does not exist!"
):
context.fetchdf(f"SELECT * FROM db_1.sqlmesh.{table_name}")


@use_terminal_console
def test_audits_running_on_metadata_changes(tmp_path: Path):
def setup_senario(model_before: str, model_after: str):
Expand Down