From cd7efd60b0a825ad18acef78520b2cc716f2d036 Mon Sep 17 00:00:00 2001 From: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com> Date: Wed, 2 Jul 2025 18:53:28 +0300 Subject: [PATCH] Fix: Create state sync without version check for destroy --- sqlmesh/core/context.py | 4 ++ tests/core/test_integration.py | 87 ++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 1be6ca1dac..9af1922175 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -2674,6 +2674,10 @@ def _context_diff( ) def _destroy(self) -> None: + # Create state_sync directly to avoid potential errors from get_versions() in the property + if not self._state_sync: + self._state_sync = self._new_state_sync() + # Invalidate all environments, including prod for environment in self.state_reader.get_environments(): self.state_sync.invalidate_environment(name=environment.name, protect_prod=False) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index f68cb7ac47..5418de25a0 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -67,6 +67,7 @@ SnapshotInfoLike, SnapshotTableInfo, ) +from sqlmesh.core.state_sync.base import Versions, SCHEMA_VERSION from sqlmesh.utils import CorrelationId from sqlmesh.utils.date import TimeLike, now, to_date, to_datetime, to_timestamp from sqlmesh.utils.errors import NoChangesPlanError, SQLMeshError, PlanError, ConfigError @@ -6291,6 +6292,92 @@ def test_destroy(copy_to_temp_path): assert not cache_path.exists() +@use_terminal_console +def test_destroy_with_version_mismatch(copy_to_temp_path): + paths = copy_to_temp_path("tests/fixtures/multi_virtual_layer") + path = Path(paths[0]) + first_db_path = str(path / "db_1.db") + second_db_path = str(path / "db_2.db") + + config = Config( + gateways={ + "first": GatewayConfig( + connection=DuckDBConnectionConfig(database=first_db_path), + variables={"overriden_var": "gateway_1"}, + ), + "second": GatewayConfig( + connection=DuckDBConnectionConfig(database=second_db_path), + variables={"overriden_var": "gateway_2"}, + ), + }, + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + model_naming=NameInferenceConfig(infer_names=True), + default_gateway="first", + gateway_managed_virtual_layer=True, + variables={"overriden_var": "global", "global_one": 88}, + ) + + context = Context(paths=paths, config=config) + plan = context.plan_builder().build() + assert len(plan.new_snapshots) == 4 + context.apply(plan) + + # Create dev environment with a change + model = context.get_model("db_1.first_schema.model_one") + context.upsert_model(model.copy(update={"query": model.query.select("'c' AS extra")})) + plan = context.plan_builder("dev").build() + context.apply(plan) + + # Mock get_versions to raise an error by bumping the version + def mock_get_versions(validate=True): + if validate: + raise SQLMeshError( + f"SQLMesh (local) is using version '{SCHEMA_VERSION}' which is behind '{SCHEMA_VERSION + 1}' (remote). " + f"Please upgrade SQLMesh." + ) + return Versions( + schema_version=SCHEMA_VERSION + 1, + sqlglot_version="1.0.0", + sqlmesh_version="1.0.0", + ) + + # Clearstate sync and patch to return a state sync with mocked get_versions + context._state_sync = None + original_new_state_sync = context._new_state_sync + + def patched_new_state_sync(): + state_sync = original_new_state_sync() + state_sync.get_versions = mock_get_versions + return state_sync + + with patch.object(context, "_new_state_sync", side_effect=patched_new_state_sync): + # Ensure accessing state_sync property would fail due to get_versions validation + with pytest.raises(SQLMeshError): + _ = context.state_sync + + context._state_sync = None + + # This should not raise an error even though there is a version mismatch + context._destroy() + + # Verify the state tables have been removed though + state_tables = { + "_environments", + "_snapshots", + "_intervals", + "_auto_restatements", + "_environment_statements", + "_intervals", + "_plan_dags", + "_versions", + } + for table_name in state_tables: + with pytest.raises( + Exception, match=f"Catalog Error: Table with name {table_name} does not exist!" + ): + context.fetchdf(f"SELECT * FROM db_1.sqlmesh.{table_name}") + + @use_terminal_console def test_audits_running_on_metadata_changes(tmp_path: Path): def setup_senario(model_before: str, model_after: str):