Feat(experimental): DBT project conversion #4495
Conversation
# extract {{ var() }} references used in all jinja macro dependencies to check for any variables specific
# to a migrated DBT package and resolve them accordingly
# vars are added into __sqlmesh_vars__ in the Python env so that the native SQLMesh var() function can resolve them
if migrated_dbt_project_name:
Can this be encapsulated into its own function?
I've moved the logic to _extract_migrated_dbt_variable_references
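For illustration, here's a minimal sketch of the general technique (a hypothetical helper, not the PR's actual `_extract_migrated_dbt_variable_references`): parse the macro body with Jinja and collect the names passed to `var()` calls.

```python
# A minimal sketch (hypothetical helper, not the PR's implementation) of collecting
# the variable names referenced via {{ var(...) }} in a Jinja macro body.
import typing as t

from jinja2 import Environment, nodes


def extract_var_names(macro_source: str) -> t.Set[str]:
    tree = Environment().parse(macro_source)
    names: t.Set[str] = set()
    for call in tree.find_all(nodes.Call):
        # Only direct calls to a bare `var` name with a constant first argument.
        if (
            isinstance(call.node, nodes.Name)
            and call.node.name == "var"
            and call.args
            and isinstance(call.args[0], nodes.Const)
        ):
            names.add(call.args[0].value)
    return names


print(extract_var_names("{{ var('start_date') }} {{ var('env', 'dev') }}"))
# -> {'start_date', 'env'} (order may vary)
```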
return v.transform(d.replace_merge_table_aliases)

@field_validator("batch_concurrency", mode="before")
Why is this needed? There's already a validator for this field
Oh, are you able to point me to the existing validator? I'm having trouble locating it
I added this because if you called model.render_definition() and it wrote something like:
MODEL (
  name db.table,
  kind INCREMENTAL_BY_UNIQUE_KEY (
    unique_key a,
    batch_concurrency 1
  )
);
And then you tried to read it back, you get a Pydantic error:
E pydantic_core._pydantic_core.ValidationError: 1 validation error for IncrementalByUniqueKeyKind
E batch_concurrency
E Input should be 1 [type=literal_error, input_value=Literal(this=1, is_string=False), input_type=Literal]
I added a test in test_model.py to show this behaviour. It fails because the validator wants an int but gets given an exp.Literal instead.
The validator is defined through the type annotation:
https://github.com/TobikoData/sqlmesh/blob/main/sqlmesh/utils/pydantic.py#L342C5-L342C23 which in turn calls https://github.com/TobikoData/sqlmesh/blob/main/sqlmesh/utils/pydantic.py#L204
Ah, that's true for other model kinds but not IncrementalByUniqueKeyKind, where it was defined as:
batch_concurrency: t.Literal[1] = 1
which doesn't invoke the validator.
However, it looks like changing it to the following does the trick:
batch_concurrency: t.Annotated[t.Literal[1], BeforeValidator(positive_int_validator)] = 1
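A minimal, self-contained sketch of why the annotation matters (not the SQLMesh source; `coerce_literal_to_int` is a hypothetical stand-in for `positive_int_validator`): a plain `t.Literal[1]` field rejects anything that isn't already the int 1, while a `BeforeValidator` can first coerce a SQLGlot literal expression back to an int.

```python
# Sketch only: demonstrates the Annotated + BeforeValidator pattern discussed above.
import typing as t

from pydantic import BaseModel, BeforeValidator
from sqlglot import exp


def coerce_literal_to_int(v: t.Any) -> t.Any:
    # Hypothetical stand-in for SQLMesh's positive_int_validator.
    if isinstance(v, exp.Literal) and not v.is_string:
        return int(v.this)
    return v


class Kind(BaseModel):
    batch_concurrency: t.Annotated[t.Literal[1], BeforeValidator(coerce_literal_to_int)] = 1


# Round-trips the parsed expression instead of raising a literal_error:
print(Kind(batch_concurrency=exp.Literal(this="1", is_string=False)).batch_concurrency)  # 1
```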
)

class DbtConversionConsole(TerminalConsole):
Does this need to inherit TerminalConsole?
I was using it for things like _print and _confirm. How would you prefer this was structured?
Those are just one-liners, and I’m not sure they’re worth coupling over
No problem, I've made this class standalone now
yield prev, curr

class JinjaGenerator:
Just curious: any reason to have this class? It doesn't look like the methods benefit from the shared self instance in any way. Should these just be top-level functions?
I did think about that but having the class made it easy to do the following:
generator_fn_name = f"_generate_{node_type.__name__.lower()}"
if generator_fn := getattr(self, generator_fn_name, None):
    generator_fn(...)
i.e. it gives a handle (self) to call getattr on, and limits the functions to just those available on this class
Just curious: can't you replace self with __module__? No objections otherwise
Probably, but it does mean that it's easier to accidentally call something unrelated that is defined in the module but not part of the generator.
I guess I'm just thinking in terms of self-containment. I could probably add a generate() top-level module function and make the JinjaGenerator class private to make this API more ergonomic, but I do see it evolving a bit as this feature evolves
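To make the trade-off concrete, here's a minimal self-contained sketch of the dispatch pattern under discussion (the node type and generator method are illustrative, not the PR's code): the class namespace scopes the getattr() lookup to generator methods only.

```python
# A minimal sketch (hypothetical, not the PR's JinjaGenerator) of getattr-based
# dispatch keyed on the Jinja AST node type name.
from jinja2 import nodes


class _Generator:
    def generate(self, node: nodes.Node) -> str:
        generator_fn_name = f"_generate_{type(node).__name__.lower()}"
        if generator_fn := getattr(self, generator_fn_name, None):
            return generator_fn(node)
        raise NotImplementedError(f"No generator for {type(node).__name__}")

    def _generate_const(self, node: nodes.Const) -> str:
        # Constants render as their literal representation.
        return repr(node.value)


print(_Generator().generate(nodes.Const(42)))  # 42
```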
@property
def migrated_dbt_project_name(self) -> t.Optional[str]:
    return self.config.variables.get(c.MIGRATED_DBT_PROJECT_NAME)
Since we know from the config that this is a migrated project, can we put all of the logic below into a separate loader that inherits from SqlMeshLoader?
I've moved the logic to a MigratedDbtProjectLoader.
I couldn't see a good way to hook into the _load_scripts lifecycle of the parent SqlMeshLoader to change the builtins module and infer / set the dbt package name for macros under __dbt_packages__/, so I ended up duplicating a bunch of the existing code.
Not super happy with this, but modifying the parent SqlMeshLoader to take callbacks to set these things would be just as invasive as the original changes
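For what it's worth, a sketch of the overridable-hook alternative (every name below is invented for illustration; the real SqlMeshLoader's internals differ):

```python
# A hypothetical sketch of the template-method alternative: the parent loader
# exposes small overridable seams instead of taking callbacks.
import typing as t


class BaseLoaderSketch:
    def load_scripts(self) -> None:
        builtins_module = self._builtins_module()
        for path in self._script_paths():
            package = self._infer_package(path)  # hook: None for native projects
            print(f"loading {path} (builtins={builtins_module}, package={package})")

    def _builtins_module(self) -> str:
        return "sqlmesh.core.macros"  # assumption: the native default

    def _infer_package(self, path: str) -> t.Optional[str]:
        return None

    def _script_paths(self) -> t.List[str]:
        return ["macros/helpers.py", "macros/__dbt_packages__/dbt_utils/helpers.py"]


class MigratedDbtLoaderSketch(BaseLoaderSketch):
    def _builtins_module(self) -> str:
        return "sqlmesh.dbt.builtin"  # assumption: where the dbt shims live

    def _infer_package(self, path: str) -> t.Optional[str]:
        # e.g. macros/__dbt_packages__/dbt_utils/... -> "dbt_utils"
        parts = path.split("/")
        if "__dbt_packages__" in parts:
            idx = parts.index("__dbt_packages__")
            return parts[idx + 1] if len(parts) > idx + 1 else None
        return None


MigratedDbtLoaderSketch().load_scripts()
```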
if self.is_migrated_dbt_project:
    from sqlmesh.dbt.target import TARGET_TYPE_TO_CONFIG_CLASS

    connection_config = self.context._connection_config
Why not use the equivalent public method?
    )
except NotImplementedError:
    raise ConfigError(
        f"No DBT 'Target Type' mapping for connection type: {connection_config.type_}"
| f"No DBT 'Target Type' mapping for connection type: {connection_config.type_}" | |
| f"Unsupported dbt target type: {connection_config.type_}" |
if migrated_dbt_project_name:
    # note: JinjaMacroRegistry is trimmed here so "all_macros" should just be all the macros used by this model
    for _, _, jinja_macro in jinja_macros.all_macros:
        _, extracted_variable_names = extract_macro_references_and_variables(
Previously we were doing this unconditionally, but now it appears that we only extract if this is a migrated dbt project? Why is that?
It's an oversight although surprisingly no tests failed. I'll revert it
I've added it to the else condition
| for prefix in ("start", "end", "execution"): | ||
| yield f"{prefix}_{suffix}" | ||
|
|
||
| for item in ("runtime_stage", "gateway", "this_model", "this_env", "model_kind_name"): |
There's no way we'll remember to update this once we add more macro variables
Yeah I added this when I was in "hack whatever it takes to get the thing to work" mode.
Is there somewhere that can be referenced to get a definitive list?
Good question. I don't think we have any centralized place atm.
if self.is_migrated_dbt_project:
    if path.is_relative_to(migrated_dbt_package_base_path):
        package = str(
            path.relative_to(migrated_dbt_package_base_path).parents[0]
Don't we have infer_dbt_package_from_path for this?
We sure do, and it was written after this. I'll update it
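For reference, a minimal sketch of what such a helper boils down to (assumed behaviour; the real infer_dbt_package_from_path may differ in signature and details): the first directory under macros/__dbt_packages__/ names the package.

```python
# Sketch only: path-based package inference, assuming the layout described
# elsewhere in this PR (macros/__dbt_packages__/<package>/...).
import typing as t
from pathlib import Path


def infer_dbt_package_from_path(
    path: Path, base: Path = Path("macros/__dbt_packages__")
) -> t.Optional[str]:
    if path.is_relative_to(base):
        relative = path.relative_to(base)
        if len(relative.parts) > 1:
            return relative.parts[0]
    return None


print(infer_dbt_package_from_path(Path("macros/__dbt_packages__/dbt_utils/helpers.sql")))
# -> dbt_utils
```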
ctx, dbt_project = _load_project(src)
dbt_load_context = dbt_project.context
dbt_project.load
Absolutely nothing, I'll remove it
    ctx.create_external_models()
else:
    console.log_warning(
        "More than 1 config detected; not sure how to handle this. External models file not created"
Not sure this is a helpful message. What can the user do to mitigate this?
It's more to trigger a bug report. I didn't have an example of how someone could get into this situation but the data structure makes it possible.
If someone did hit this then it would be helpful to understand how so we can add a test
I've removed the external model creation entirely
if output_external_models:
    # External models
    if len(ctx.configs) == 1:
How can it be more than 1?
ctx.configs is defined as t.Dict[Path, Config], so based on that there can be any number.
However, having said that, I don't think this check matters. I'll remove it
test_rendered = _render(model.query)
if is_self_referencing(model, test_rendered):
    report.self_referencing_models.append((model_output_path, model))
    # if it's self referencing, we need to output the columns or we get an error like: Error: Self-referencing models require inferrable column types.
Can we still convert them as INCREMENTAL_UNMANAGED and keep the is_incremental() macro?
Some are legitimately self-referencing; is_self_referencing also returns True if model.depends_on_self.
For the others that only become self-referencing after removing is_incremental(), I guess we can test for that and not apply the transform, which will result in them being output with the is_incremental() checks intact
I've updated this to test-load the converted model.
If it throws the "Self-referencing models require inferrable column types" error, I re-run the conversion but exclude the transforms that remove is_incremental().
This works, although on the 1000+ model test project it has the downside that context.upsert_model() busts the fingerprint cache, so it spends a bunch of extra time re-rendering queries for the dependency tree
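In rough Python, the flow described above looks something like this (all names are placeholders, not the PR's API):

```python
# A rough sketch (placeholder names) of the test-load-and-fall-back flow.
import typing as t

COLUMN_TYPES_ERROR = "Self-referencing models require inferrable column types"


def convert_with_fallback(
    model: t.Any,
    convert: t.Callable[[t.Any, bool], t.Any],  # (model, remove_is_incremental) -> converted
    test_load: t.Callable[[t.Any], None],  # e.g. upsert into a Context and render
) -> t.Any:
    converted = convert(model, True)  # first try with is_incremental() removed
    try:
        test_load(converted)
    except Exception as e:
        if COLUMN_TYPES_ERROR in str(e):
            # keep the is_incremental() guards so the model isn't self-referencing
            converted = convert(model, False)
        else:
            raise
    return converted
```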
orig_depends_on = META_FIELD_CONVERTER["depends_on_"]

# The 'depends_on' key always gets rendered but the value is missing if it's an empty list
I'm confused why we need this. And even if we do, why can't we update the original converter?
We need this because if we don't have it, the following can occur when calling model.render_definition():
MODEL (
  ...,
  depends_on ,
  ...
)
Which is a syntax error when you try to read it back.
Initially I was trying to wrap things rather than change them. I'll update the original converter instead
I've updated the original converter
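A minimal sketch of the shape of that fix (assumed, since the actual converter isn't shown in this thread): only emit an expression when the dependency set is non-empty, so render_definition() round-trips cleanly.

```python
# A hedged sketch of the converter fix (the real META_FIELD_CONVERTER entry may
# differ): return None for an empty set so no bare "depends_on ," is rendered.
import typing as t

from sqlglot import exp


def depends_on_to_expr(depends_on: t.Set[str]) -> t.Optional[exp.Expression]:
    if not depends_on:
        return None  # omit the property entirely instead of rendering an empty value
    return exp.Tuple(expressions=[exp.to_table(name) for name in sorted(depends_on)])


print(depends_on_to_expr({"db.a", "db.b"}).sql())  # (db.a, db.b)
print(depends_on_to_expr(set()))  # None
```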
Force-pushed from f59c316 to 4db71b5.
This PR contains an initial implementation of a command that can take a DBT project, read it into memory and write the result out as a native SQLMesh project.
To invoke it, use:
The way this works is that the project is first loaded into memory using our existing `DbtLoader`. The resulting models and macros are extracted from the context and their Jinja is parsed into an AST (using the Jinja library). A bunch of AST transforms are run to replace DBT-isms with SQLMesh native concepts as much as possible:
- `{{ ref() }}` and `{{ source() }}` calls are replaced with the actual model names they reference (where possible)
- `{% is_incremental() %}` blocks are removed

Jinja has no way of turning a Jinja AST back into a Jinja template string (since its goal is to generate Python code, not more Jinja), so I wrote a `JinjaGenerator` class to go from AST back to `str`.

If, after applying all the Jinja AST transforms, there is still some Jinja left in the result, it is written to the SQLMesh model file as a Jinja model surrounded with `JINJA_QUERY_BEGIN; JINJA_END;` blocks. If there is no Jinja left after applying the transforms, it is written directly as a native SQL model with no Jinja wrapping.
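As a rough illustration of that output decision (a hypothetical helper with a deliberately naive residue check, not the PR's approach):

```python
# Sketch only: wrap the body in Jinja blocks only if unconverted Jinja remains
# after the AST transforms; substring detection is a simplification.
def render_model_body(sql: str) -> str:
    has_jinja_left = "{{" in sql or "{%" in sql
    if has_jinja_left:
        return f"JINJA_QUERY_BEGIN;\n{sql}\nJINJA_END;"
    return sql


print(render_model_body("SELECT 1 AS col"))  # plain native SQL model body
print(render_model_body("SELECT {{ leftover_macro() }}"))  # wrapped in Jinja blocks
```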
Macros that call into DBT packages are also handled. The dependency tree is migrated and put in the target folder under `macros/__dbt_packages__` so that the macro hierarchy is still available when the macros are called on the SQLMesh side.

A migrated project isn't truly native until all the dbt-isms have been removed. If the native loader detects it is loading a migrated project, it injects DBT shims into the Jinja context to make the migrated macros still work. The bulk of the DBT shim code is re-used from our existing DBT loader.
Known limitations:
- Only connection types with a `from_sqlmesh()` implementation in the relevant dbt `TargetConfig` class are supported
- The Jinja AST does not record whether a `{%-` or `{%` block was used. This can lead to less than ideal formatting once the AST transforms are run and the resulting AST is turned back into a Jinja string.
- `{{ source() }}` calls on the DBT side that used dynamic inputs and were aliased in the DBT config are not correctly migrated