diff --git a/src/bigquery_agent_analytics/binding_validation.py b/src/bigquery_agent_analytics/binding_validation.py new file mode 100644 index 00000000..98635051 --- /dev/null +++ b/src/bigquery_agent_analytics/binding_validation.py @@ -0,0 +1,868 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Pre-flight validator: ontology binding vs. existing BigQuery tables. + +This validator checks whether the BigQuery tables a binding YAML points +at physically exist with the columns and types the binding requires, +*before* the SDK starts extraction or materialization. It catches the +most common authoring error (binding YAML drifted out of sync with +physical tables) before extraction wastes ``AI.GENERATE`` tokens. + +Different from the planned extracted-graph validator in #76, which +will validate ``ExtractedGraph`` output against the resolved spec +after extraction. This validator runs before extraction. 
+ +Usage:: + + from bigquery_agent_analytics.binding_validation import ( + validate_binding_against_bigquery, + ) + + report = validate_binding_against_bigquery( + ontology=loaded_ontology, + binding=loaded_binding, + bq_client=bigquery.Client(project="my-project", location="US"), + strict=False, + ) + + if not report.ok: + for f in report.failures: + print(f) + for w in report.warnings: + print(f"WARN: {w}") + +The user-facing CLI surface (``bq-agent-sdk binding-validate``, +``ontology-build --validate-binding[-strict]``) and the full +failure-code documentation land in PR 2b of issue #105. +""" + +from __future__ import annotations + +import dataclasses +import enum +import logging +from typing import Any, Optional + +logger = logging.getLogger("bigquery_agent_analytics." + __name__) + + +# ------------------------------------------------------------------ # +# Failure codes # +# ------------------------------------------------------------------ # + + +class FailureCode(str, enum.Enum): + """Typed enum of binding-validation failure codes. + + Seven codes always run (default mode). One additional code + (``KEY_COLUMN_NULLABLE``) emits a warning by default and escalates + to a failure under ``strict=True`` — the SDK's own + ``CREATE TABLE IF NOT EXISTS`` DDL emits NULLABLE key columns + (``ontology_materializer.py:206``), so requiring REQUIRED mode in + default-mode would reject SDK-created tables. + """ + + # Default-mode (always failures). + MISSING_TABLE = "missing_table" + MISSING_COLUMN = "missing_column" + TYPE_MISMATCH = "type_mismatch" + ENDPOINT_TYPE_MISMATCH = "endpoint_type_mismatch" + UNEXPECTED_REPEATED_MODE = "unexpected_repeated_mode" + MISSING_DATASET = "missing_dataset" + INSUFFICIENT_PERMISSIONS = "insufficient_permissions" + + # Strict-mode-only (warning by default, failure under strict=True). 
+ KEY_COLUMN_NULLABLE = "key_column_nullable" + + +# ------------------------------------------------------------------ # +# Report types # +# ------------------------------------------------------------------ # + + +@dataclasses.dataclass(frozen=True) +class BindingValidationFailure: + """One failure found during validation. + + ``binding_path`` is a ``binding.entities[N].properties[M].column`` + style path so tooling can point users at the exact YAML line. The + ``binding_element`` field carries the ontology element name for + human-readable error reporting. + """ + + code: FailureCode + binding_element: str + binding_path: str + bq_ref: str + expected: Any = None + observed: Any = None + detail: str = "" + + +@dataclasses.dataclass(frozen=True) +class BindingValidationWarning: + """Same shape as :class:`BindingValidationFailure`. + + Warnings exist so callers can format failures and warnings + uniformly. Warnings do not flip ``report.ok`` to ``False``; they + are advisory in default mode and escalate into ``failures`` under + ``strict=True``. + """ + + code: FailureCode + binding_element: str + binding_path: str + bq_ref: str + expected: Any = None + observed: Any = None + detail: str = "" + + +@dataclasses.dataclass(frozen=True) +class BindingValidationReport: + """Result of :func:`validate_binding_against_bigquery`. + + ``failures`` are hard failures (always present in default and + strict modes). ``warnings`` are strict-only checks that emitted in + default mode (empty under ``strict=True`` because they got + escalated to ``failures``). ``ok`` returns ``True`` iff + ``failures`` is empty — warnings do *not* affect ``ok``. + """ + + failures: tuple[BindingValidationFailure, ...] = () + warnings: tuple[BindingValidationWarning, ...] 
= () + + @property + def ok(self) -> bool: + return not self.failures + + +# ------------------------------------------------------------------ # +# BQ type compatibility # +# ------------------------------------------------------------------ # + +# Maps the SDK's DDL type (per ``ontology_materializer._DDL_TYPE_MAP``) +# to the set of BigQuery ``SchemaField.field_type`` values that the SDK +# accepts as compatible. BigQuery returns legacy names like ``INTEGER`` +# and ``FLOAT`` from older table schemas, so each modern name lists its +# legacy alias. +_COMPATIBLE_BQ_TYPES: dict[str, frozenset[str]] = { + "STRING": frozenset({"STRING"}), + "INT64": frozenset({"INT64", "INTEGER"}), + "FLOAT64": frozenset({"FLOAT64", "FLOAT"}), + "BOOL": frozenset({"BOOL", "BOOLEAN"}), + "TIMESTAMP": frozenset({"TIMESTAMP"}), + "DATE": frozenset({"DATE"}), + "BYTES": frozenset({"BYTES"}), +} + + +def _expected_ddl_type(sdk_type: str) -> Optional[str]: + """Return the BQ DDL type the materializer would emit for *sdk_type*. + + Mirrors ``ontology_materializer._DDL_TYPE_MAP`` so the validator + uses the same expectations the SDK uses when it generates DDL + itself. Returns ``None`` for unknown SDK types — the validator + skips type-compatibility checks for those rather than guessing. + """ + # Local import to avoid a circular dep at module load time. 
# SDK metadata columns the materializer hard-codes on every entity and
# relationship table (``_entity_columns`` / ``_relationship_columns`` in
# ontology_materializer.py) and writes unconditionally on every
# materialize() call.
_SDK_METADATA_COLUMNS: tuple[tuple[str, str], ...] = (
    ("session_id", "STRING"),
    ("extracted_at", "TIMESTAMP"),
)


def _expected_ddl_type(sdk_type: str) -> Optional[str]:
    """Return the BQ DDL type the materializer would emit for *sdk_type*.

    Mirrors ``ontology_materializer._DDL_TYPE_MAP`` so the validator
    uses the same expectations the SDK uses when it generates DDL
    itself. Returns ``None`` for unknown SDK types — the validator
    skips type-compatibility checks for those rather than guessing.
    """
    # Local import to avoid a circular dep at module load time.
    from .ontology_materializer import _DDL_TYPE_MAP

    return _DDL_TYPE_MAP.get(sdk_type.strip().lower())


def _canonical_bq_type(bq_field_type: str) -> str:
    """Fold a BQ field type onto its canonical (modern) name.

    Legacy aliases like ``INTEGER`` / ``FLOAT`` map to ``INT64`` /
    ``FLOAT64`` so cross-table comparisons don't trip on whichever
    alias a particular table schema happens to report. Unknown types
    pass through uppercased.
    """
    upper = bq_field_type.upper()
    for canonical, aliases in _COMPATIBLE_BQ_TYPES.items():
        if upper in aliases:
            return canonical
    return upper


def _bq_type_matches(sdk_type: str, bq_field_type: str) -> bool:
    """Return True if *bq_field_type* is compatible with *sdk_type*."""
    expected = _expected_ddl_type(sdk_type)
    if expected is None:
        return True  # unknown SDK type: skip the check
    return bq_field_type.upper() in _COMPATIBLE_BQ_TYPES.get(
        expected, frozenset()
    )


# ------------------------------------------------------------------ #
# Public API                                                         #
# ------------------------------------------------------------------ #


def validate_binding_against_bigquery(
    *,
    ontology,
    binding,
    bq_client,
    strict: bool = False,
) -> BindingValidationReport:
    """Validate a binding against live BigQuery tables.

    Resolves the ontology + binding to a ``ResolvedGraph``, then
    checks that every entity / relationship table the binding
    references exists and that every bound column is present with a
    compatible type.

    Args:
        ontology: Upstream ``Ontology`` model.
        binding: Upstream ``Binding`` model.
        bq_client: A ``google.cloud.bigquery.Client``-like object with
            ``get_table(table_ref)`` returning an object exposing
            ``.schema`` (an iterable of ``SchemaField``-like records
            with ``.name``, ``.field_type``, ``.mode`` attributes).
        strict: When ``False`` (default), strict-only checks (today:
            ``KEY_COLUMN_NULLABLE``) emit ``BindingValidationWarning``
            entries. When ``True``, they emit
            ``BindingValidationFailure`` entries with the same code.
            Default is permissive so the validator does not reject
            tables produced by the SDK's own ``CREATE TABLE IF NOT
            EXISTS`` DDL.

    Returns:
        A :class:`BindingValidationReport`.
    """
    from .resolved_spec import resolve

    spec = resolve(ontology, binding, lineage_config=None)

    # Index binding entries by name so we can build precise paths.
    # Critical: derive property/column indices from the binding YAML's
    # own ordering (binding.entities[i].properties), NOT from the
    # ResolvedEntity's properties tuple, because ResolvedEntity orders
    # properties by ontology / effective-property order. With
    # inheritance or a different binding-side ordering, the two
    # orderings can diverge and a path like
    # ``binding.entities[0].properties[1].column`` would point to the
    # wrong YAML entry.
    entity_index = {b.name: i for i, b in enumerate(binding.entities)}
    relationship_index = {b.name: i for i, b in enumerate(binding.relationships)}

    # Per-binding-element {logical_property_name: yaml_index} maps so
    # paths reflect what the user actually wrote in YAML.
    entity_prop_yaml_index: dict[str, dict[str, int]] = {
        b.name: {p.name: j for j, p in enumerate(b.properties)}
        for b in binding.entities
    }
    rel_prop_yaml_index: dict[str, dict[str, int]] = {
        b.name: {p.name: j for j, p in enumerate(b.properties)}
        for b in binding.relationships
    }

    failures: list[BindingValidationFailure] = []
    warnings: list[BindingValidationWarning] = []
    table_cache: dict[str, Optional[Any]] = {}

    def emit(
        code: FailureCode,
        *,
        binding_element: str,
        binding_path: str,
        bq_ref: str,
        expected: Any = None,
        observed: Any = None,
        detail: str = "",
        strict_only: bool = False,
    ) -> None:
        """Emit either a failure or a warning, honoring strict mode."""
        kwargs = dict(
            code=code,
            binding_element=binding_element,
            binding_path=binding_path,
            bq_ref=bq_ref,
            expected=expected,
            observed=observed,
            detail=detail,
        )
        if strict_only and not strict:
            warnings.append(BindingValidationWarning(**kwargs))
        else:
            failures.append(BindingValidationFailure(**kwargs))

    def fetch_table(
        table_ref: str, binding_element: str, binding_path: str
    ) -> Optional[Any]:
        """Fetch a BQ table, classify any error, and cache the result."""
        if table_ref in table_cache:
            return table_cache[table_ref]

        try:
            table = bq_client.get_table(table_ref)
        except Exception as exc:  # noqa: BLE001 - classify by message
            msg = str(exc).lower()
            code: FailureCode
            if "not found" in msg and "dataset" in msg:
                code = FailureCode.MISSING_DATASET
            elif "not found" in msg or "does not exist" in msg:
                code = FailureCode.MISSING_TABLE
            elif "permission" in msg or "forbidden" in msg or "denied" in msg:
                code = FailureCode.INSUFFICIENT_PERMISSIONS
            else:
                # Default to MISSING_TABLE for unknown errors so the user
                # gets an actionable failure rather than an opaque
                # exception.
                code = FailureCode.MISSING_TABLE

            emit(
                code,
                binding_element=binding_element,
                binding_path=binding_path,
                bq_ref=table_ref,
                detail=str(exc),
            )
            table_cache[table_ref] = None
            return None

        table_cache[table_ref] = table
        return table

    def check_metadata_columns(
        element_name: str,
        binding_root: str,
        source: str,
        bq_columns: dict[str, Any],
        *,
        table_noun: str,
        materializer_ref: str,
    ) -> None:
        """Check the SDK metadata columns on one physical table.

        The materializer hard-codes ``session_id STRING`` +
        ``extracted_at TIMESTAMP`` on every entity and relationship
        table and writes those fields unconditionally on every
        materialize() call. A user-predefined table missing either
        column would validate clean without this check, then fail at
        load_table_from_json / INSERT time.
        """
        for meta_col, meta_type in _SDK_METADATA_COLUMNS:
            meta_path = f"{binding_root}..{meta_col}"
            meta_field = bq_columns.get(meta_col)
            if meta_field is None:
                emit(
                    FailureCode.MISSING_COLUMN,
                    binding_element=element_name,
                    binding_path=meta_path,
                    bq_ref=f"{source}.{meta_col}",
                    expected=meta_col,
                    detail=(
                        f"SDK metadata column {meta_col!r} not found on "
                        f"{table_noun} {source}; the materializer writes "
                        f"this on every materialize() call "
                        f"({materializer_ref}) so the table must carry it"
                    ),
                )
                continue
            if getattr(meta_field, "mode", "NULLABLE") == "REPEATED":
                emit(
                    FailureCode.UNEXPECTED_REPEATED_MODE,
                    binding_element=element_name,
                    binding_path=meta_path,
                    bq_ref=f"{source}.{meta_col}",
                    expected="NULLABLE or REQUIRED",
                    observed="REPEATED",
                    detail=(
                        f"SDK metadata column {meta_col!r} on {source} "
                        f"is REPEATED; metadata columns must be scalar"
                    ),
                )
                continue
            if meta_field.field_type.upper() not in _COMPATIBLE_BQ_TYPES.get(
                meta_type, frozenset()
            ):
                emit(
                    FailureCode.TYPE_MISMATCH,
                    binding_element=element_name,
                    binding_path=meta_path,
                    bq_ref=f"{source}.{meta_col}",
                    expected=meta_type,
                    observed=meta_field.field_type,
                    detail=(
                        f"SDK metadata column {meta_col!r} on {source} "
                        f"has BQ type {meta_field.field_type!r}, but the "
                        f"materializer writes {meta_type}"
                    ),
                )

    # ---- Per-entity checks --------------------------------------- #

    for entity in spec.entities:
        binding_idx = entity_index.get(entity.name)
        if binding_idx is None:
            # Entity not in this binding (e.g. abstract upstream — already
            # filtered by resolve()). Skip silently.
            continue

        binding_root = f"binding.entities[{binding_idx}]"
        table = fetch_table(
            entity.source,
            binding_element=entity.name,
            binding_path=f"{binding_root}.source",
        )
        if table is None:
            continue

        # Index BQ schema by column name; also the set of columns the
        # binding maps to any property (used by the key checks below).
        bq_columns = {f.name: f for f in table.schema}
        bound_columns = {p.column for p in entity.properties}

        # Check every bound property. Path indices come from the binding
        # YAML's ordering, not the ResolvedEntity's ordering, so paths
        # point at the actual YAML entry the user wrote.
        prop_yaml_idx = entity_prop_yaml_index.get(entity.name, {})
        for prop in entity.properties:
            yaml_j = prop_yaml_idx.get(prop.logical_name)
            if yaml_j is None:
                # Resolved property exists but has no matching
                # binding-YAML entry — should not happen for a properly
                # resolved spec, but if it does, fall back to a
                # name-keyed path so the failure is still actionable.
                prop_path = f"{binding_root}.properties[{prop.logical_name}].column"
            else:
                prop_path = f"{binding_root}.properties[{yaml_j}].column"
            bq_field = bq_columns.get(prop.column)

            if bq_field is None:
                emit(
                    FailureCode.MISSING_COLUMN,
                    binding_element=entity.name,
                    binding_path=prop_path,
                    bq_ref=f"{entity.source}.{prop.column}",
                    expected=prop.column,
                    detail=(
                        f"binding declares property {prop.logical_name!r} "
                        f"on column {prop.column!r}, not found on table "
                        f"{entity.source}"
                    ),
                )
                continue

            # REPEATED-mode columns can't carry scalar properties.
            if getattr(bq_field, "mode", "NULLABLE") == "REPEATED":
                emit(
                    FailureCode.UNEXPECTED_REPEATED_MODE,
                    binding_element=entity.name,
                    binding_path=prop_path,
                    bq_ref=f"{entity.source}.{prop.column}",
                    expected="NULLABLE or REQUIRED",
                    observed="REPEATED",
                    detail=(
                        f"column {prop.column!r} is REPEATED on "
                        f"{entity.source}; the SDK can't bind scalar "
                        f"properties to ARRAY columns"
                    ),
                )

            # Type compatibility.
            if not _bq_type_matches(prop.sdk_type, bq_field.field_type):
                emit(
                    FailureCode.TYPE_MISMATCH,
                    binding_element=entity.name,
                    binding_path=prop_path,
                    bq_ref=f"{entity.source}.{prop.column}",
                    expected=_expected_ddl_type(prop.sdk_type),
                    observed=bq_field.field_type,
                    detail=(
                        f"binding maps property {prop.logical_name!r} "
                        f"(sdk_type={prop.sdk_type!r}) to column "
                        f"{prop.column!r}, but BQ reports type "
                        f"{bq_field.field_type!r}"
                    ),
                )

        # Per-key-column checks (REPEATED + strict-only nullability).
        # Keys that map to a bound property carry a real binding YAML
        # path; others fall back to a pseudo path (ontology generally
        # requires keys to be properties; this is defensive).
        column_to_yaml_idx: dict[str, int] = {
            p.column: prop_yaml_idx[p.logical_name]
            for p in entity.properties
            if p.logical_name in prop_yaml_idx
        }

        for key_col in entity.key_columns:
            yaml_j = column_to_yaml_idx.get(key_col)
            if yaml_j is not None:
                key_path = f"{binding_root}.properties[{yaml_j}].column"
            else:
                key_path = f"{binding_root}..{key_col}"
            bq_field = bq_columns.get(key_col)
            if bq_field is None:
                # Already reported as MISSING_COLUMN above when the key
                # was also a bound property. If the key isn't a bound
                # property (rare; ontology requires keys to be
                # properties), still surface it.
                if key_col not in bound_columns:
                    emit(
                        FailureCode.MISSING_COLUMN,
                        binding_element=entity.name,
                        binding_path=key_path,
                        bq_ref=f"{entity.source}.{key_col}",
                        expected=key_col,
                        detail=(
                            f"primary-key column {key_col!r} not found on "
                            f"table {entity.source}"
                        ),
                    )
                continue

            if getattr(bq_field, "mode", "NULLABLE") == "REPEATED":
                emit(
                    FailureCode.UNEXPECTED_REPEATED_MODE,
                    binding_element=entity.name,
                    binding_path=key_path,
                    bq_ref=f"{entity.source}.{key_col}",
                    expected="NULLABLE or REQUIRED",
                    observed="REPEATED",
                    detail=(
                        f"primary-key column {key_col!r} is REPEATED on "
                        f"{entity.source}; primary keys can't be ARRAY"
                    ),
                )
                continue

            if getattr(bq_field, "mode", "NULLABLE") == "NULLABLE":
                emit(
                    FailureCode.KEY_COLUMN_NULLABLE,
                    binding_element=entity.name,
                    binding_path=key_path,
                    bq_ref=f"{entity.source}.{key_col}",
                    expected="REQUIRED",
                    observed="NULLABLE",
                    detail=(
                        f"primary-key column {key_col!r} on {entity.source} "
                        f"is NULLABLE; under --strict this is a hard failure"
                    ),
                    strict_only=True,
                )

        # SDK metadata columns (session_id / extracted_at) — see
        # check_metadata_columns for why a user-predefined table missing
        # them must be rejected up front.
        check_metadata_columns(
            entity.name,
            binding_root,
            entity.source,
            bq_columns,
            table_noun="table",
            materializer_ref="ontology_materializer.py:159",
        )

    # ---- Per-relationship checks --------------------------------- #

    # Index entity sdk_types and source tables per key_column so the
    # endpoint check can do BOTH (a) the spec-level expected-type
    # comparison and (b) the physical cross-table comparison against
    # the referenced node table's actual BQ field type. Catches the
    # case where a node table has drifted out of sync with its own
    # ontology (the per-entity loop above flags that as TYPE_MISMATCH
    # on the node), but the edge endpoint also disagrees with the
    # node's actual storage type — we want the edge to surface
    # ENDPOINT_TYPE_MISMATCH even when the node is itself broken.
    entity_key_types: dict[str, dict[str, str]] = {}
    entity_source_by_name: dict[str, str] = {}
    for ent in spec.entities:
        cols = {p.column: p.sdk_type for p in ent.properties}
        entity_key_types[ent.name] = {
            k: cols.get(k, "string") for k in ent.key_columns
        }
        entity_source_by_name[ent.name] = ent.source

    for rel in spec.relationships:
        binding_idx = relationship_index.get(rel.name)
        if binding_idx is None:
            continue

        binding_root = f"binding.relationships[{binding_idx}]"
        table = fetch_table(
            rel.source,
            binding_element=rel.name,
            binding_path=f"{binding_root}.source",
        )
        if table is None:
            continue

        bq_columns = {f.name: f for f in table.schema}

        # Endpoint columns: from_columns and to_columns.
        def _check_endpoint(
            kind: str,
            rel_columns: tuple[str, ...],
            endpoint_entity_name: str,
        ) -> None:
            """``kind`` is either ``'from_columns'`` or ``'to_columns'``."""
            endpoint_types = entity_key_types.get(endpoint_entity_name, {})
            endpoint_key_cols = list(endpoint_types.keys())
            for j, col in enumerate(rel_columns):
                col_path = f"{binding_root}.{kind}[{j}]"
                bq_field = bq_columns.get(col)

                if bq_field is None:
                    emit(
                        FailureCode.MISSING_COLUMN,
                        binding_element=rel.name,
                        binding_path=col_path,
                        bq_ref=f"{rel.source}.{col}",
                        expected=col,
                        detail=(
                            f"endpoint column {col!r} not found on edge "
                            f"table {rel.source}"
                        ),
                    )
                    continue

                if getattr(bq_field, "mode", "NULLABLE") == "REPEATED":
                    emit(
                        FailureCode.UNEXPECTED_REPEATED_MODE,
                        binding_element=rel.name,
                        binding_path=col_path,
                        bq_ref=f"{rel.source}.{col}",
                        expected="NULLABLE or REQUIRED",
                        observed="REPEATED",
                        detail=(
                            f"endpoint column {col!r} on {rel.source} is "
                            f"REPEATED; endpoint keys can't be ARRAY"
                        ),
                    )
                    continue

                # Endpoint type checks. Two comparisons:
                #
                # (1) Spec-level: edge endpoint BQ type must match the
                #     ontology-derived expected SDK type for the
                #     referenced node's primary-key column at the same
                #     position.
                # (2) Physical cross-table: when the referenced node
                #     table is fetchable, the edge endpoint's BQ type
                #     must match the actual BQ field type of the
                #     referenced key column in the node table. Catches
                #     cases where the node table has drifted away from
                #     its ontology declaration but the edge has not —
                #     those would slip past (1) alone.
                #
                # Both comparisons emit ENDPOINT_TYPE_MISMATCH; (1)
                # describes the spec-level disagreement, (2) describes
                # the physical-table disagreement. The two checks are
                # complementary, not redundant: (1) fires when only the
                # edge is wrong; (2) fires when both are wrong but in
                # different ways.
                if j < len(endpoint_key_cols):
                    expected_sdk = endpoint_types[endpoint_key_cols[j]]
                    # (1) Spec-level type check.
                    if not _bq_type_matches(expected_sdk, bq_field.field_type):
                        emit(
                            FailureCode.ENDPOINT_TYPE_MISMATCH,
                            binding_element=rel.name,
                            binding_path=col_path,
                            bq_ref=f"{rel.source}.{col}",
                            expected=_expected_ddl_type(expected_sdk),
                            observed=bq_field.field_type,
                            detail=(
                                f"endpoint column {col!r} on {rel.source} "
                                f"has BQ type {bq_field.field_type!r}, but "
                                f"referenced entity "
                                f"{endpoint_entity_name!r} key "
                                f"{endpoint_key_cols[j]!r} expects "
                                f"sdk_type={expected_sdk!r}"
                            ),
                        )

                    # (2) Physical cross-table check. Fires only when it
                    # adds information beyond (1) — specifically, only
                    # when the node table has *drifted* from its
                    # ontology declaration AND that drift causes the
                    # edge to disagree with the node's actual storage.
                    # In the common edge-only-drift case (edge wrong,
                    # node correct), (1) already conveys the same
                    # expected/observed pair, so emitting this would be
                    # pure double-reporting.
                    node_source = entity_source_by_name.get(
                        endpoint_entity_name
                    )
                    if node_source is not None:
                        node_table = table_cache.get(node_source)
                        if node_table is not None:
                            node_columns = {
                                f.name: f for f in node_table.schema
                            }
                            node_field = node_columns.get(endpoint_key_cols[j])
                            if node_field is not None:
                                # Canonicalize so legacy aliases like
                                # INTEGER/INT64 don't trip the comparison.
                                edge_canonical = _canonical_bq_type(
                                    bq_field.field_type
                                )
                                node_canonical = _canonical_bq_type(
                                    node_field.field_type
                                )
                                expected_ddl = _expected_ddl_type(expected_sdk)
                                # Only emit when the node has actually
                                # drifted from the ontology spec AND the
                                # edge disagrees with the node. If
                                # node_canonical == expected_ddl, the node
                                # is on-spec and (1) already covers the
                                # edge's disagreement with the same
                                # expected/observed pair.
                                node_has_drifted = (
                                    expected_ddl is not None
                                    and node_canonical != expected_ddl
                                )
                                if (
                                    node_has_drifted
                                    and edge_canonical != node_canonical
                                ):
                                    emit(
                                        FailureCode.ENDPOINT_TYPE_MISMATCH,
                                        binding_element=rel.name,
                                        binding_path=col_path,
                                        bq_ref=f"{rel.source}.{col}",
                                        expected=node_field.field_type,
                                        observed=bq_field.field_type,
                                        detail=(
                                            f"physical cross-table mismatch: edge "
                                            f"column {col!r} on {rel.source} has BQ "
                                            f"type {bq_field.field_type!r}, but the "
                                            f"referenced node table "
                                            f"{node_source}.{endpoint_key_cols[j]} has "
                                            f"BQ type {node_field.field_type!r} "
                                            f"(node has drifted from ontology spec, "
                                            f"which expected {expected_ddl!r}). "
                                            f"Edges with this mismatch will fail to "
                                            f"join the node at query time."
                                        ),
                                    )

                # Strict-only: endpoint keys should be REQUIRED.
                if getattr(bq_field, "mode", "NULLABLE") == "NULLABLE":
                    emit(
                        FailureCode.KEY_COLUMN_NULLABLE,
                        binding_element=rel.name,
                        binding_path=col_path,
                        bq_ref=f"{rel.source}.{col}",
                        expected="REQUIRED",
                        observed="NULLABLE",
                        detail=(
                            f"endpoint column {col!r} on {rel.source} is "
                            f"NULLABLE; under --strict this is a hard failure"
                        ),
                        strict_only=True,
                    )

        _check_endpoint("from_columns", rel.from_columns, rel.from_entity)
        _check_endpoint("to_columns", rel.to_columns, rel.to_entity)

        # Property column checks. Same path-correctness rule as
        # entities: indices come from the binding YAML's ordering.
        prop_yaml_idx = rel_prop_yaml_index.get(rel.name, {})
        for prop in rel.properties:
            yaml_j = prop_yaml_idx.get(prop.logical_name)
            if yaml_j is None:
                prop_path = f"{binding_root}.properties[{prop.logical_name}].column"
            else:
                prop_path = f"{binding_root}.properties[{yaml_j}].column"
            bq_field = bq_columns.get(prop.column)

            if bq_field is None:
                emit(
                    FailureCode.MISSING_COLUMN,
                    binding_element=rel.name,
                    binding_path=prop_path,
                    bq_ref=f"{rel.source}.{prop.column}",
                    expected=prop.column,
                    detail=(
                        f"binding declares property {prop.logical_name!r} on "
                        f"column {prop.column!r}, not found on edge table "
                        f"{rel.source}"
                    ),
                )
                continue

            if getattr(bq_field, "mode", "NULLABLE") == "REPEATED":
                emit(
                    FailureCode.UNEXPECTED_REPEATED_MODE,
                    binding_element=rel.name,
                    binding_path=prop_path,
                    bq_ref=f"{rel.source}.{prop.column}",
                    expected="NULLABLE or REQUIRED",
                    observed="REPEATED",
                    detail=(
                        f"property column {prop.column!r} on {rel.source} is "
                        f"REPEATED; the SDK can't bind scalar properties to "
                        f"ARRAY columns"
                    ),
                )

            if not _bq_type_matches(prop.sdk_type, bq_field.field_type):
                emit(
                    FailureCode.TYPE_MISMATCH,
                    binding_element=rel.name,
                    binding_path=prop_path,
                    bq_ref=f"{rel.source}.{prop.column}",
                    expected=_expected_ddl_type(prop.sdk_type),
                    observed=bq_field.field_type,
                    detail=(
                        f"binding maps relationship property "
                        f"{prop.logical_name!r} (sdk_type={prop.sdk_type!r}) "
                        f"to column {prop.column!r}, but BQ reports type "
                        f"{bq_field.field_type!r}"
                    ),
                )

        # SDK metadata columns on the edge table. Same trap as on
        # entities — a user-predefined edge table missing these columns
        # would validate clean here and fail at INSERT time.
        check_metadata_columns(
            rel.name,
            binding_root,
            rel.source,
            bq_columns,
            table_noun="edge table",
            materializer_ref="ontology_materializer.py:164",
        )

    return BindingValidationReport(
        failures=tuple(failures),
        warnings=tuple(warnings),
    )
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for binding_validation.validate_binding_against_bigquery. + +Tests use a fake BQ client (a small inline class with a ``get_table`` +method) so they do not require live BigQuery. Each of the seven +default-mode failure codes has at least one positive case plus the +"clean against SDK-created tables" regression test for the strict-only +KEY_COLUMN_NULLABLE check. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from dataclasses import field +from typing import Optional + +import pytest + +# ------------------------------------------------------------------ # +# Fakes # +# ------------------------------------------------------------------ # + + +@dataclass +class _FakeField: + """Shape-compatible with google.cloud.bigquery.SchemaField.""" + + name: str + field_type: str + mode: str = "NULLABLE" + + +@dataclass +class _FakeTable: + schema: list[_FakeField] = field(default_factory=list) + + +class _FakeBQClient: + """Minimal get_table() impl backed by a {table_ref: schema} map. + + Missing table_refs raise ``NotFound``; entries with ``schema=None`` + raise other classified errors so we can exercise the + MISSING_DATASET / INSUFFICIENT_PERMISSIONS branches. 
+ """ + + def __init__( + self, + tables: dict[str, list[_FakeField]], + *, + missing_datasets: Optional[set[str]] = None, + forbidden: Optional[set[str]] = None, + ) -> None: + self._tables = tables + self._missing_datasets = missing_datasets or set() + self._forbidden = forbidden or set() + + def get_table(self, table_ref: str): + if table_ref in self._forbidden: + raise PermissionError(f"403 Permission denied for table {table_ref}") + parts = table_ref.split(".") + if len(parts) >= 2: + dataset_ref = ".".join(parts[:2]) + if dataset_ref in self._missing_datasets: + raise LookupError(f"404 Not found: Dataset {dataset_ref} was not found") + if table_ref not in self._tables: + raise LookupError(f"404 Not found: Table {table_ref} does not exist") + return _FakeTable(schema=list(self._tables[table_ref])) + + +# ------------------------------------------------------------------ # +# Fixture builders — minimal Ontology + Binding for one entity-edge # +# pair, exercising every code path with small variations. 
# +# ------------------------------------------------------------------ # + + +def _ontology_and_binding( + project: str = "p", + dataset: str = "d", + entity_source: str = "decision_points", + rel_source: str = "candidate_edges", +): + from bigquery_ontology import load_binding + from bigquery_ontology import load_ontology + + ontology_yaml = ( + "ontology: TestGraph\n" + "entities:\n" + " - name: Decision\n" + " keys:\n" + " primary: [decision_id]\n" + " properties:\n" + " - name: decision_id\n" + " type: string\n" + " - name: confidence\n" + " type: double\n" + " - name: Outcome\n" + " keys:\n" + " primary: [outcome_id]\n" + " properties:\n" + " - name: outcome_id\n" + " type: string\n" + "relationships:\n" + " - name: HasOutcome\n" + " from: Decision\n" + " to: Outcome\n" + " properties:\n" + " - name: weight\n" + " type: double\n" + ) + binding_yaml = ( + "binding: test_bind\n" + "ontology: TestGraph\n" + "target:\n" + " backend: bigquery\n" + f" project: {project}\n" + f" dataset: {dataset}\n" + "entities:\n" + " - name: Decision\n" + f" source: {entity_source}\n" + " properties:\n" + " - name: decision_id\n" + " column: decision_id\n" + " - name: confidence\n" + " column: confidence\n" + " - name: Outcome\n" + " source: outcomes\n" + " properties:\n" + " - name: outcome_id\n" + " column: outcome_id\n" + "relationships:\n" + " - name: HasOutcome\n" + f" source: {rel_source}\n" + " from_columns: [decision_id]\n" + " to_columns: [outcome_id]\n" + " properties:\n" + " - name: weight\n" + " column: weight\n" + ) + + import pathlib + import tempfile + + tmp = pathlib.Path(tempfile.mkdtemp(prefix="bind_validate_")) + ont_path = tmp / "test.ontology.yaml" + bnd_path = tmp / "test.binding.yaml" + ont_path.write_text(ontology_yaml, encoding="utf-8") + bnd_path.write_text(binding_yaml, encoding="utf-8") + + ontology = load_ontology(str(ont_path)) + binding = load_binding(str(bnd_path), ontology=ontology) + return ontology, binding + + +def _meta_fields() -> 
list[_FakeField]: + """SDK metadata columns the materializer writes on every table. + + Tests building schemas directly (rather than mutating + ``_good_schemas()``) must include these — otherwise the validator + flags MISSING_COLUMN for session_id / extracted_at and pollutes + the failure count. + """ + return [ + _FakeField("session_id", "STRING"), + _FakeField("extracted_at", "TIMESTAMP"), + ] + + +def _good_schemas() -> dict[str, list[_FakeField]]: + """All tables present with matching column types — clean baseline. + + Models tables produced by ``OntologyMaterializer.create_tables()``: + NULLABLE everywhere (no NOT NULL), correct types per + ``_DDL_TYPE_MAP``, and the SDK metadata columns + (``session_id STRING`` + ``extracted_at TIMESTAMP``) the + materializer writes on every materialize() call. + + Each table gets its own metadata-field instances via + ``_meta_fields()`` so tests that mutate metadata fields on one + table do not silently affect the others. + """ + return { + "p.d.decision_points": [ + _FakeField("decision_id", "STRING"), + _FakeField("confidence", "FLOAT64"), + ] + + _meta_fields(), + "p.d.outcomes": [ + _FakeField("outcome_id", "STRING"), + ] + + _meta_fields(), + "p.d.candidate_edges": [ + _FakeField("decision_id", "STRING"), + _FakeField("outcome_id", "STRING"), + _FakeField("weight", "FLOAT64"), + ] + + _meta_fields(), + } + + +# ------------------------------------------------------------------ # +# Default-mode regression: SDK-created tables validate clean # +# ------------------------------------------------------------------ # + + +class TestSdkCreatedTablesRegression: + + def test_default_mode_ok_against_sdk_created_tables(self): + """Tables matching ``OntologyMaterializer.create_tables()`` output + must validate clean by default — every key column is NULLABLE + (the SDK doesn't emit NOT NULL), so a default-mode hard failure + on KEY_COLUMN_NULLABLE would reject SDK-created tables. 
Catches + the "validator rejects SDK-created tables" trap. + + Default mode may still emit advisory ``KEY_COLUMN_NULLABLE`` + warnings for those NULLABLE keys (visible to a CI gate that + chooses to surface them), but ``report.ok`` must stay True.""" + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + client = _FakeBQClient(_good_schemas()) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + assert report.ok is True + assert report.failures == () + # Warnings are expected (NULLABLE keys), but they must not flip + # `ok` to False — that's the whole point of the strict-only + # classification. + for w in report.warnings: + assert w.code == FailureCode.KEY_COLUMN_NULLABLE + + def test_expected_types_match_materializer_ddl_type_map(self): + """Stronger regression: expected types come directly from the + materializer's `_DDL_TYPE_MAP`, not from a hand-written fixture. + If a future change updates `_DDL_TYPE_MAP` (e.g. adds NUMERIC + support), this test forces a corresponding update to + `_COMPATIBLE_BQ_TYPES` in binding_validation, otherwise the + default-mode regression would silently start failing.""" + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + from bigquery_agent_analytics.ontology_materializer import _DDL_TYPE_MAP + + ontology, binding = _ontology_and_binding() + # Build schemas using the materializer's own map so the test + # mirrors what the SDK would emit. confidence: double, all keys: + # string. If _DDL_TYPE_MAP changes type names, this construction + # automatically picks them up. 
+ schemas = { + "p.d.decision_points": [ + _FakeField("decision_id", _DDL_TYPE_MAP["string"]), + _FakeField("confidence", _DDL_TYPE_MAP["double"]), + ] + + _meta_fields(), + "p.d.outcomes": [ + _FakeField("outcome_id", _DDL_TYPE_MAP["string"]), + ] + + _meta_fields(), + "p.d.candidate_edges": [ + _FakeField("decision_id", _DDL_TYPE_MAP["string"]), + _FakeField("outcome_id", _DDL_TYPE_MAP["string"]), + _FakeField("weight", _DDL_TYPE_MAP["double"]), + ] + + _meta_fields(), + } + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + assert report.ok is True, ( + f"Validator rejected materializer-DDL-derived schemas: " + f"{[(f.code, f.detail) for f in report.failures]}" + ) + + def test_strict_mode_emits_warnings_as_failures(self): + """The same SDK-created tables, run under strict=True, must + surface NULLABLE key columns as KEY_COLUMN_NULLABLE failures.""" + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + client = _FakeBQClient(_good_schemas()) + + report = validate_binding_against_bigquery( + ontology=ontology, + binding=binding, + bq_client=client, + strict=True, + ) + + assert report.ok is False + nullable_codes = [ + f.code + for f in report.failures + if f.code == FailureCode.KEY_COLUMN_NULLABLE + ] + # Decision.decision_id, Outcome.outcome_id (entity primary keys) + # plus HasOutcome.from_columns[0]=decision_id and + # HasOutcome.to_columns[0]=outcome_id (relationship endpoints). 
+ assert len(nullable_codes) == 4 + + +# ------------------------------------------------------------------ # +# Per-failure-code unit tests # +# ------------------------------------------------------------------ # + + +class TestMissingTable: + + def test_entity_table_missing(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + del schemas["p.d.decision_points"] + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + decision_failures = [ + f for f in report.failures if f.binding_element == "Decision" + ] + assert any(f.code == FailureCode.MISSING_TABLE for f in decision_failures) + miss = next( + f for f in decision_failures if f.code == FailureCode.MISSING_TABLE + ) + assert miss.bq_ref == "p.d.decision_points" + assert miss.binding_path == "binding.entities[0].source" + + +class TestMissingColumn: + + def test_property_column_missing(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + # Drop the 'confidence' column from decision_points; keep the + # other property and metadata columns intact so the test isolates + # the missing-property failure. 
+ schemas["p.d.decision_points"] = [ + _FakeField("decision_id", "STRING"), + ] + _meta_fields() + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + miss = [ + f + for f in report.failures + if f.code == FailureCode.MISSING_COLUMN and "confidence" in f.bq_ref + ] + assert len(miss) == 1 + assert miss[0].binding_element == "Decision" + assert miss[0].bq_ref == "p.d.decision_points.confidence" + assert "properties[1].column" in miss[0].binding_path + + +class TestTypeMismatch: + + def test_entity_property_type_mismatch(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + # confidence is supposed to be FLOAT64 (sdk_type=double). + schemas["p.d.decision_points"] = [ + _FakeField("decision_id", "STRING"), + _FakeField("confidence", "INT64"), # wrong type + ] + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + mismatches = [ + f for f in report.failures if f.code == FailureCode.TYPE_MISMATCH + ] + assert len(mismatches) == 1 + assert mismatches[0].expected == "FLOAT64" + assert mismatches[0].observed == "INT64" + + def test_legacy_bq_type_aliases_accepted(self): + """BigQuery returns 'INTEGER' / 'FLOAT' / 'BOOLEAN' for older + schemas; the validator must treat them as compatible with INT64 + / FLOAT64 / BOOL respectively.""" + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + # confidence is double → FLOAT64 expected; legacy BQ returns FLOAT. 
+ schemas["p.d.decision_points"][1] = _FakeField("confidence", "FLOAT") + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + assert report.ok is True + + +class TestEndpointTypeMismatch: + + def test_edge_endpoint_type_does_not_match_referenced_entity_key( + self, + ): + """Edge endpoint type disagrees with the referenced node's + ontology key type, but the node table itself is on-spec. One + ENDPOINT_TYPE_MISMATCH entry is expected — the spec-level (1) + check (edge BQ type vs ontology-derived expected SDK type). + The physical cross-table (2) check is suppressed because + emitting it would double-report the same expected/observed pair + with different detail wording. The node-drifted case where (2) + adds genuinely new information is covered by + TestEndpointPhysicalCrossTableCheck.""" + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + # Decision.decision_id is STRING; if the edge's decision_id + # column is INT64, that's an endpoint-type mismatch. + schemas["p.d.candidate_edges"] = [ + _FakeField("decision_id", "INT64"), + _FakeField("outcome_id", "STRING"), + _FakeField("weight", "FLOAT64"), + ] + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + mismatches = [ + f + for f in report.failures + if f.code == FailureCode.ENDPOINT_TYPE_MISMATCH + ] + # Edge-only drift: only the spec-level (1) check fires. The + # physical (2) check is suppressed because the node table is + # on-spec — emitting it would be pure double-reporting with the + # same expected/observed pair as (1). 
+ assert len(mismatches) == 1 + assert mismatches[0].binding_element == "HasOutcome" + assert "from_columns[0]" in mismatches[0].binding_path + assert mismatches[0].observed == "INT64" + assert mismatches[0].expected == "STRING" + assert "physical cross-table" not in mismatches[0].detail + + +class TestMetadataColumns: + + def test_missing_session_id_on_entity_flagged(self): + """The materializer writes session_id on every materialize() + call (ontology_materializer.py:159) so a user-predefined table + without it would fail at INSERT time. Validator must catch it + pre-flight.""" + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + # Drop session_id from decision_points. + schemas["p.d.decision_points"] = [ + f for f in schemas["p.d.decision_points"] if f.name != "session_id" + ] + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + miss = [ + f + for f in report.failures + if f.code == FailureCode.MISSING_COLUMN and "session_id" in f.bq_ref + ] + assert len(miss) == 1 + assert miss[0].binding_element == "Decision" + assert ".session_id" in miss[0].binding_path + + def test_missing_extracted_at_on_relationship_flagged(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + schemas["p.d.candidate_edges"] = [ + f for f in schemas["p.d.candidate_edges"] if f.name != "extracted_at" + ] + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + miss = [ + f + for f in report.failures + if f.code == FailureCode.MISSING_COLUMN 
and "extracted_at" in f.bq_ref + ] + assert len(miss) == 1 + assert miss[0].binding_element == "HasOutcome" + assert ".extracted_at" in miss[0].binding_path + + def test_metadata_column_with_wrong_type_flagged(self): + """If session_id is INT64 instead of STRING, the materializer's + INSERT will fail. The validator must catch the type mismatch.""" + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + for f in schemas["p.d.decision_points"]: + if f.name == "session_id": + f.field_type = "INT64" # wrong; should be STRING + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + mismatches = [ + f + for f in report.failures + if f.code == FailureCode.TYPE_MISMATCH and "session_id" in f.bq_ref + ] + assert len(mismatches) == 1 + assert mismatches[0].expected == "STRING" + assert mismatches[0].observed == "INT64" + + +class TestTypeMapExhaustiveCoverage: + + def test_every_ddl_type_is_in_compatible_bq_types(self): + """Force a corresponding update to _COMPATIBLE_BQ_TYPES whenever + _DDL_TYPE_MAP grows. If the materializer adds a new SDK→BQ DDL + type but the validator's compatibility table doesn't recognize + it, every column with that type will silently mismatch. + + Each canonical DDL type emitted by the materializer must: + (a) be present as a key in _COMPATIBLE_BQ_TYPES, and + (b) accept itself as a compatible BQ field_type. + """ + from bigquery_agent_analytics.binding_validation import _COMPATIBLE_BQ_TYPES + from bigquery_agent_analytics.ontology_materializer import _DDL_TYPE_MAP + + for sdk_type, ddl_type in _DDL_TYPE_MAP.items(): + assert ddl_type in _COMPATIBLE_BQ_TYPES, ( + f"_DDL_TYPE_MAP maps {sdk_type!r} → {ddl_type!r} but " + f"_COMPATIBLE_BQ_TYPES does not list {ddl_type!r}. 
" + f"Update binding_validation._COMPATIBLE_BQ_TYPES alongside " + f"any change to ontology_materializer._DDL_TYPE_MAP." + ) + assert ddl_type in _COMPATIBLE_BQ_TYPES[ddl_type], ( + f"_COMPATIBLE_BQ_TYPES[{ddl_type!r}] does not accept " + f"{ddl_type!r} as a compatible BQ field_type — circular " + f"identity check failed." + ) + + +class TestCompositeKey: + + def test_composite_primary_key_validates(self): + """#105 calls out composite endpoint keys explicitly. A two- + column primary key on a node table must be matched positionally + against the edge's two-column from_columns, with a per-column + type check at each position. Required because real ontologies + routinely have (session_id, span_id) or (decision_id, version) + composite keys.""" + import pathlib + import tempfile + + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + from bigquery_ontology import load_binding + from bigquery_ontology import load_ontology + + tmp = pathlib.Path(tempfile.mkdtemp(prefix="bind_validate_ck_")) + (tmp / "ont.yaml").write_text( + "ontology: TestGraph\n" + "entities:\n" + " - name: Decision\n" + " keys:\n" + " primary: [decision_id, version]\n" + " properties:\n" + " - name: decision_id\n" + " type: string\n" + " - name: version\n" + " type: integer\n" + " - name: Outcome\n" + " keys:\n" + " primary: [outcome_id]\n" + " properties:\n" + " - name: outcome_id\n" + " type: string\n" + "relationships:\n" + " - name: HasOutcome\n" + " from: Decision\n" + " to: Outcome\n" + " properties: []\n", + encoding="utf-8", + ) + (tmp / "bnd.yaml").write_text( + "binding: test_bind\n" + "ontology: TestGraph\n" + "target:\n" + " backend: bigquery\n" + " project: p\n" + " dataset: d\n" + "entities:\n" + " - name: Decision\n" + " source: decisions\n" + " properties:\n" + " - name: decision_id\n" + " column: decision_id\n" + " - name: version\n" + " column: version\n" + " - name: 
Outcome\n" + " source: outcomes\n" + " properties:\n" + " - name: outcome_id\n" + " column: outcome_id\n" + "relationships:\n" + " - name: HasOutcome\n" + " source: edges\n" + " from_columns: [decision_id, version]\n" + " to_columns: [outcome_id]\n" + " properties: []\n", + encoding="utf-8", + ) + + ontology = load_ontology(str(tmp / "ont.yaml")) + binding = load_binding(str(tmp / "bnd.yaml"), ontology=ontology) + + meta = [ + _FakeField("session_id", "STRING"), + _FakeField("extracted_at", "TIMESTAMP"), + ] + schemas = { + "p.d.decisions": [ + _FakeField("decision_id", "STRING"), + _FakeField("version", "INT64"), + ] + + meta, + "p.d.outcomes": [_FakeField("outcome_id", "STRING")] + meta, + "p.d.edges": [ + _FakeField("decision_id", "STRING"), + _FakeField("version", "INT64"), # second composite key column + _FakeField("outcome_id", "STRING"), + ] + + meta, + } + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + assert report.ok is True, ( + "Composite key validation should pass when both edge endpoint " + f"columns match the node's positional types. 
Got failures: " + f"{[(f.code, f.detail) for f in report.failures]}" + ) + + def test_composite_key_second_column_type_mismatch(self): + """If the second column of a composite key disagrees in type, + the validator must flag ENDPOINT_TYPE_MISMATCH at position 1.""" + import pathlib + import tempfile + + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + from bigquery_ontology import load_binding + from bigquery_ontology import load_ontology + + tmp = pathlib.Path(tempfile.mkdtemp(prefix="bind_validate_ck2_")) + (tmp / "ont.yaml").write_text( + "ontology: TestGraph\n" + "entities:\n" + " - name: Decision\n" + " keys:\n" + " primary: [decision_id, version]\n" + " properties:\n" + " - name: decision_id\n" + " type: string\n" + " - name: version\n" + " type: integer\n" + " - name: Outcome\n" + " keys:\n" + " primary: [outcome_id]\n" + " properties:\n" + " - name: outcome_id\n" + " type: string\n" + "relationships:\n" + " - name: HasOutcome\n" + " from: Decision\n" + " to: Outcome\n" + " properties: []\n", + encoding="utf-8", + ) + (tmp / "bnd.yaml").write_text( + "binding: test_bind\n" + "ontology: TestGraph\n" + "target:\n" + " backend: bigquery\n" + " project: p\n" + " dataset: d\n" + "entities:\n" + " - name: Decision\n" + " source: decisions\n" + " properties:\n" + " - name: decision_id\n" + " column: decision_id\n" + " - name: version\n" + " column: version\n" + " - name: Outcome\n" + " source: outcomes\n" + " properties:\n" + " - name: outcome_id\n" + " column: outcome_id\n" + "relationships:\n" + " - name: HasOutcome\n" + " source: edges\n" + " from_columns: [decision_id, version]\n" + " to_columns: [outcome_id]\n" + " properties: []\n", + encoding="utf-8", + ) + + ontology = load_ontology(str(tmp / "ont.yaml")) + binding = load_binding(str(tmp / "bnd.yaml"), ontology=ontology) + + meta = [ + _FakeField("session_id", "STRING"), + 
_FakeField("extracted_at", "TIMESTAMP"), + ] + schemas = { + "p.d.decisions": [ + _FakeField("decision_id", "STRING"), + _FakeField("version", "INT64"), + ] + + meta, + "p.d.outcomes": [_FakeField("outcome_id", "STRING")] + meta, + "p.d.edges": [ + _FakeField("decision_id", "STRING"), + # Second composite key column has the wrong type + # (STRING instead of INT64). + _FakeField("version", "STRING"), + _FakeField("outcome_id", "STRING"), + ] + + meta, + } + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + mismatches = [ + f + for f in report.failures + if f.code == FailureCode.ENDPOINT_TYPE_MISMATCH + ] + assert len(mismatches) == 1 + # The mismatch must point at the SECOND column (position [1]) + # of from_columns, not the first. + assert "from_columns[1]" in mismatches[0].binding_path + assert mismatches[0].expected == "INT64" + assert mismatches[0].observed == "STRING" + + +class TestEndpointPhysicalCrossTableCheck: + + def test_edge_endpoint_disagrees_with_node_actual_field_type(self): + """When the node table has drifted away from its ontology + declaration, the per-entity loop flags a TYPE_MISMATCH on the + node. But the edge endpoint may also disagree with the node's + *actual* storage type — in which case the join would fail at + query time. The validator must surface ENDPOINT_TYPE_MISMATCH + for that edge ↔ node disagreement, not just the spec-level one. + + Setup: node's decision_id column is INT64 in BQ (drifted from + the ontology's STRING declaration). Edge's decision_id column + is STRING (matching ontology, but disagreeing with the node's + actual storage). The edge would fail to join the node. 
+ """ + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + # Node decision_id has drifted to INT64. + schemas["p.d.decision_points"] = [ + _FakeField("decision_id", "INT64"), + _FakeField("confidence", "FLOAT64"), + ] + # Edge decision_id is still STRING (ontology says so). + # candidate_edges already has decision_id as STRING in + # _good_schemas(), so no change needed there. + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + # Per-entity loop catches the node's drift. + node_mismatches = [ + f + for f in report.failures + if f.code == FailureCode.TYPE_MISMATCH + and f.binding_element == "Decision" + ] + assert len(node_mismatches) == 1 + + # Per-relationship loop catches the physical cross-table edge ↔ + # node disagreement (edge=STRING, node=INT64). The detail must + # call out that this is a physical-table mismatch so users can + # tell it apart from the spec-level (1) check. 
+ edge_mismatches = [ + f + for f in report.failures + if f.code == FailureCode.ENDPOINT_TYPE_MISMATCH + and f.binding_element == "HasOutcome" + ] + assert len(edge_mismatches) >= 1 + assert any( + "physical cross-table mismatch" in m.detail for m in edge_mismatches + ), ( + "Expected at least one ENDPOINT_TYPE_MISMATCH detail to call " + "out the physical cross-table mismatch; got: " + f"{[m.detail for m in edge_mismatches]}" + ) + + +class TestUnexpectedRepeatedMode: + + def test_repeated_mode_on_property_column(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + schemas["p.d.decision_points"] = [ + _FakeField("decision_id", "STRING"), + _FakeField("confidence", "FLOAT64", mode="REPEATED"), + ] + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + repeated = [ + f + for f in report.failures + if f.code == FailureCode.UNEXPECTED_REPEATED_MODE + ] + assert len(repeated) == 1 + assert repeated[0].observed == "REPEATED" + + +class TestMissingDataset: + + def test_missing_dataset_classified(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + client = _FakeBQClient(_good_schemas(), missing_datasets={"p.d"}) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + assert any(f.code == FailureCode.MISSING_DATASET for f in report.failures) + + +class TestInsufficientPermissions: + + def test_forbidden_table_classified(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import 
validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + client = _FakeBQClient(_good_schemas(), forbidden={"p.d.decision_points"}) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + perm = [ + f + for f in report.failures + if f.code == FailureCode.INSUFFICIENT_PERMISSIONS + ] + assert len(perm) >= 1 + assert perm[0].bq_ref == "p.d.decision_points" + + +class TestKeyColumnNullable: + + def test_default_mode_emits_warnings(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + client = _FakeBQClient(_good_schemas()) + + report = validate_binding_against_bigquery( + ontology=ontology, + binding=binding, + bq_client=client, + strict=False, + ) + + # Warnings are present (NULLABLE keys), but report.ok is True. + assert report.ok is True + assert all( + w.code == FailureCode.KEY_COLUMN_NULLABLE for w in report.warnings + ) + assert len(report.warnings) >= 2 # 2 entity keys, 2 endpoint keys + + def test_strict_mode_promotes_to_failures(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + client = _FakeBQClient(_good_schemas()) + + report = validate_binding_against_bigquery( + ontology=ontology, + binding=binding, + bq_client=client, + strict=True, + ) + + assert report.ok is False + assert all( + f.code == FailureCode.KEY_COLUMN_NULLABLE for f in report.failures + ) + # No warnings — they got escalated. 
        assert report.warnings == ()

    def test_required_keys_clean_in_strict_mode(self):
        """When the user's tables already use REQUIRED key columns,
        strict mode reports clean."""
        from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery

        ontology, binding = _ontology_and_binding()
        schemas = _good_schemas()
        # Mark all key + endpoint columns as REQUIRED.
        for col in schemas["p.d.decision_points"]:
            if col.name == "decision_id":
                col.mode = "REQUIRED"
        for col in schemas["p.d.outcomes"]:
            if col.name == "outcome_id":
                col.mode = "REQUIRED"
        for col in schemas["p.d.candidate_edges"]:
            if col.name in ("decision_id", "outcome_id"):
                col.mode = "REQUIRED"

        client = _FakeBQClient(schemas)
        report = validate_binding_against_bigquery(
            ontology=ontology,
            binding=binding,
            bq_client=client,
            strict=True,
        )

        assert report.ok is True
        assert report.failures == ()
        assert report.warnings == ()


# ------------------------------------------------------------------ #
# Cross-cutting: binding-path YAML ordering + cross-project source   #
# ------------------------------------------------------------------ #


class TestBindingPathYamlOrder:

    def test_path_index_uses_binding_yaml_order_not_resolved_order(self):
        """A binding YAML that lists properties in a different order
        than the ontology must produce paths using the binding's index,
        not the ResolvedEntity's. Otherwise tooling pointed at
        ``binding.entities[0].properties[1].column`` would land on the
        wrong YAML entry.

        Setup: ontology declares (decision_id, confidence). Binding
        YAML lists them in reverse order (confidence, decision_id).
        The bound 'confidence' column is missing on the BQ table. The
        failure path must be ``properties[0]`` (the binding's index for
        confidence), not ``properties[1]`` (the resolved-side index).
+ """ + import pathlib + import tempfile + + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + from bigquery_ontology import load_binding + from bigquery_ontology import load_ontology + + tmp = pathlib.Path(tempfile.mkdtemp(prefix="bind_validate_order_")) + (tmp / "ont.yaml").write_text( + "ontology: TestGraph\n" + "entities:\n" + " - name: Decision\n" + " keys:\n" + " primary: [decision_id]\n" + " properties:\n" + " - name: decision_id\n" + " type: string\n" + " - name: confidence\n" + " type: double\n" + "relationships: []\n", + encoding="utf-8", + ) + # Binding lists confidence FIRST, decision_id SECOND. + (tmp / "bnd.yaml").write_text( + "binding: test_bind\n" + "ontology: TestGraph\n" + "target:\n" + " backend: bigquery\n" + " project: p\n" + " dataset: d\n" + "entities:\n" + " - name: Decision\n" + " source: decision_points\n" + " properties:\n" + " - name: confidence\n" + " column: confidence\n" + " - name: decision_id\n" + " column: decision_id\n" + "relationships: []\n", + encoding="utf-8", + ) + + ontology = load_ontology(str(tmp / "ont.yaml")) + binding = load_binding(str(tmp / "bnd.yaml"), ontology=ontology) + + # Drop confidence — its bound column is missing on BQ. Keep + # decision_id and SDK metadata columns so the test isolates the + # missing-property failure for confidence. + schemas = { + "p.d.decision_points": [_FakeField("decision_id", "STRING")] + + _meta_fields(), + } + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + miss = [ + f + for f in report.failures + if f.code == FailureCode.MISSING_COLUMN and "confidence" in f.bq_ref + ] + assert len(miss) == 1 + # The failure path index must be 0 (binding YAML's index for + # 'confidence'), not 1 (the resolved-side index — Decision's + # ontology order is (decision_id, confidence)). 
        # binding_path must index properties in binding-YAML declaration
        # order (confidence is properties[0] in the binding), not in the
        # resolved ontology's order (where it is properties[1]).
        assert miss[0].binding_path == (
            "binding.entities[0].properties[0].column"
        ), (
            f"Path {miss[0].binding_path!r} should reflect binding YAML "
            f"order (confidence is properties[0]), not resolved-side "
            f"order (where confidence is properties[1])."
        )


class TestCrossProjectSource:
    # Covers the #105 finding: resolve and validate each entity's own
    # (possibly fully-qualified) source, not just target.project/dataset.

    def test_fully_qualified_entity_source_validated_against_its_project(
        self,
    ):
        """A binding whose entity.source is fully qualified to a project
        different from binding.target.project must be validated against
        the entity's project, not the target's. Catches the trap from
        #105's "validate resolved sources, not only target.project /
        target.dataset" finding."""
        import pathlib
        import tempfile

        from bigquery_agent_analytics.binding_validation import FailureCode
        from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery
        # Build a binding where Decision.source is fully qualified to a
        # different project. The fixture builder doesn't support that
        # directly, so write one inline.
        from bigquery_ontology import load_binding
        from bigquery_ontology import load_ontology

        # NOTE(review): mkdtemp is never cleaned up; consider pytest's
        # tmp_path fixture so the scratch dir is removed automatically.
        tmp = pathlib.Path(tempfile.mkdtemp(prefix="bind_validate_xp_"))
        (tmp / "ont.yaml").write_text(
            "ontology: TestGraph\n"
            "entities:\n"
            "  - name: Decision\n"
            "    keys:\n"
            "      primary: [decision_id]\n"
            "    properties:\n"
            "      - name: decision_id\n"
            "        type: string\n"
            "relationships: []\n",
            encoding="utf-8",
        )
        (tmp / "bnd.yaml").write_text(
            "binding: test_bind\n"
            "ontology: TestGraph\n"
            "target:\n"
            "  backend: bigquery\n"
            "  project: target-project\n"
            "  dataset: target-dataset\n"
            "entities:\n"
            "  - name: Decision\n"
            # Fully qualified to a different project.
            "    source: source-project.source-dataset.decisions\n"
            "    properties:\n"
            "      - name: decision_id\n"
            "        column: decision_id\n"
            "relationships: []\n",
            encoding="utf-8",
        )

        ontology = load_ontology(str(tmp / "ont.yaml"))
        binding = load_binding(str(tmp / "bnd.yaml"), ontology=ontology)

        # Validator should look for the table at source-project, not
        # target-project. Place the table at the correct location and
        # leave target-project empty. Include SDK metadata columns so
        # the test does not false-fail on missing-metadata-column.
        schemas = {
            "source-project.source-dataset.decisions": [
                _FakeField("decision_id", "STRING"),
            ]
            + _meta_fields(),
        }
        client = _FakeBQClient(schemas)

        report = validate_binding_against_bigquery(
            ontology=ontology, binding=binding, bq_client=client
        )

        # Should be clean. Any MISSING_TABLE failure would mean the
        # validator looked in the wrong project.
        assert all(f.code != FailureCode.MISSING_TABLE for f in report.failures), (
            "validator looked in target project instead of fully-qualified "
            "entity.source project: "
            f"{[(f.code, f.bq_ref) for f in report.failures]}"
        )


# ------------------------------------------------------------------ #
# Report shape                                                       #
# ------------------------------------------------------------------ #


class TestReportShape:
    # Shape/contract tests for BindingValidationReport and its failures.

    def test_failure_carries_required_fields(self):
        """Every emitted failure must carry code, element, path, bq_ref,
        and a non-empty human-readable detail string."""
        from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery

        ontology, binding = _ontology_and_binding()
        schemas = _good_schemas()
        # Overwrite the decisions table schema so a failure is produced
        # (the binding expects more than just decision_id).
        schemas["p.d.decision_points"] = [
            _FakeField("decision_id", "STRING"),
        ]
        client = _FakeBQClient(schemas)

        report = validate_binding_against_bigquery(
            ontology=ontology, binding=binding, bq_client=client
        )

        f = report.failures[0]
        assert f.code is not None
        assert f.binding_element == "Decision"
        assert f.binding_path.startswith("binding.entities[")
        assert f.bq_ref.startswith("p.d.")
        assert isinstance(f.detail, str) and f.detail

    def test_ok_property_is_failures_empty(self):
        """report.ok is derived purely from failures being empty;
        warnings alone must not flip it to False."""
        from bigquery_agent_analytics.binding_validation import BindingValidationReport

        empty = BindingValidationReport()
        assert empty.ok is True

        from bigquery_agent_analytics.binding_validation import BindingValidationFailure
        from bigquery_agent_analytics.binding_validation import FailureCode

        not_ok = BindingValidationReport(
            failures=(
                BindingValidationFailure(
                    code=FailureCode.MISSING_TABLE,
                    binding_element="X",
                    binding_path="binding.entities[0].source",
                    bq_ref="p.d.x",
                ),
            ),
        )
        assert not_ok.ok is False
diff --git a/tests/test_integration_ontology_binding.py b/tests/test_integration_ontology_binding.py
index 323e7b18..2e3465a6 100644
--- a/tests/test_integration_ontology_binding.py
+++ b/tests/test_integration_ontology_binding.py
@@ -614,3 +614,228 @@ def test_synthetic_lineage_query(
    )
    rows = list(job.result())
    assert len(rows) > 0, "Lineage GQL returned 0 rows"


# ------------------------------------------------------------------ #
# Binding-validator live test (issue #105 PR 2a)                     #
# ------------------------------------------------------------------ #


class TestBindingValidationLive:
    """Live validation that ``validate_binding_against_bigquery``
    behaves correctly against real BigQuery.

    Self-contained: uses its own per-test scratch dataset (rather
    than the module-scoped fixture) because the third phase of this
    test deliberately drops a column via ALTER TABLE, and running
    destructive SQL against a shared dataset would interfere with
    other tests in this file.

    Phases:
      1. Materialize real tables via OntologyMaterializer.
      2. Default-mode validation: report.ok must be True; warnings
         contain only KEY_COLUMN_NULLABLE entries (because the SDK's
         CREATE TABLE IF NOT EXISTS emits NULLABLE keys).
      3. Strict-mode validation: same input must surface those
         warnings as KEY_COLUMN_NULLABLE failures, with warnings
         empty (escalated, not duplicated).
      4. Drop the 'confidence' column via real ALTER TABLE; default-
         mode re-validation must emit exactly one MISSING_COLUMN
         failure pointing at the dropped column.
    """

    @pytest.fixture(scope="function")
    def isolated_scratch(self):
        """Per-test scratch dataset; cleaned up unconditionally."""
        from google.cloud import bigquery

        run_id = uuid.uuid4().hex[:8]
        ds_id = f"bind_validate_live_{run_id}"
        client = bigquery.Client(project=_PROJECT, location=_LOCATION)
        ds = bigquery.Dataset(f"{_PROJECT}.{ds_id}")
        ds.location = _LOCATION
        # 1-hour default table expiration: safety net so scratch tables
        # self-destruct even if the finally-block cleanup never runs.
        ds.default_table_expiration_ms = 3600000
        client.create_dataset(ds, exists_ok=True)
        try:
            yield client, ds_id
        finally:
            client.delete_dataset(
                f"{_PROJECT}.{ds_id}",
                delete_contents=True,
                not_found_ok=True,
            )

    @pytest.fixture(scope="function")
    def isolated_ontology_and_binding(self, isolated_scratch, tmp_path_factory):
        """Per-test ontology+binding pointing at the isolated scratch."""
        from bigquery_ontology import load_binding
        from bigquery_ontology import load_ontology

        _, ds_id = isolated_scratch
        tmp = tmp_path_factory.mktemp("bind_validate_live")

        ont_path = tmp / "ontology.yaml"
        ont_path.write_text(
            "ontology: BindValidatorLive\n"
            "entities:\n"
            "  - name: Decision\n"
            "    keys:\n"
            "      primary: [decision_id]\n"
            "    properties:\n"
            "      - name: decision_id\n"
            "        type: string\n"
            "      - name: confidence\n"
            "        type: double\n"
            "  - name: Outcome\n"
            "    keys:\n"
            "      primary: [outcome_id]\n"
            "    properties:\n"
            "      - name: outcome_id\n"
            "        type: string\n"
            "relationships:\n"
            "  - name: HasOutcome\n"
            "    from: Decision\n"
            "    to: Outcome\n"
            "    properties:\n"
            "      - name: weight\n"
            "        type: double\n",
            encoding="utf-8",
        )

        # f-strings: the target block interpolates the live project and
        # the per-test scratch dataset id.
        bnd_path = tmp / "binding.yaml"
        bnd_path.write_text(
            f"binding: live_check\n"
            f"ontology: BindValidatorLive\n"
            f"target:\n"
            f"  backend: bigquery\n"
            f"  project: {_PROJECT}\n"
            f"  dataset: {ds_id}\n"
            f"entities:\n"
            f"  - name: Decision\n"
            f"    source: decisions\n"
            f"    properties:\n"
            f"      - name: decision_id\n"
            f"        column: decision_id\n"
            f"      - name: confidence\n"
            f"        column: confidence\n"
            f"  - name: Outcome\n"
            f"    source: outcomes\n"
            f"    properties:\n"
            f"      - name: outcome_id\n"
            f"        column: outcome_id\n"
            f"relationships:\n"
            f"  - name: HasOutcome\n"
            f"    source: edges\n"
            f"    from_columns: [decision_id]\n"
            f"    to_columns: [outcome_id]\n"
            f"    properties:\n"
            f"      - name: weight\n"
            f"        column: weight\n",
            encoding="utf-8",
        )

        ontology = load_ontology(str(ont_path))
        binding = load_binding(str(bnd_path), ontology=ontology)
        return ontology, binding

    def test_validator_end_to_end_against_real_bigquery(
        self, isolated_scratch, isolated_ontology_and_binding
    ):
        """End-to-end: materialize, validate (default + strict), break
        the schema with ALTER TABLE, and validate the drift is caught."""
        from bigquery_agent_analytics.binding_validation import FailureCode
        from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery
        from bigquery_agent_analytics.ontology_materializer import OntologyMaterializer

        client, ds_id = isolated_scratch
        ontology, binding = isolated_ontology_and_binding

        # Cheap fixture-scope sanity check: the binding YAML the second
        # fixture wrote must point at the dataset the first fixture
        # created. If a future refactor flips either fixture to module
        # scope by accident, the binding would race with whichever
        # dataset was created first and this assert would catch it
        # before any BQ call runs.
        assert binding.target.dataset == ds_id, (
            f"binding.target.dataset={binding.target.dataset!r} but "
            f"isolated_scratch yielded {ds_id!r}; fixture scopes are "
            f"out of sync"
        )

        # Phase 1: materialize real tables.
        mat = OntologyMaterializer.from_ontology_binding(
            ontology=ontology,
            binding=binding,
            lineage_config=None,
            write_mode="batch_load",
        )
        tables = mat.create_tables()
        assert set(tables.keys()) == {
            "Decision",
            "Outcome",
            "HasOutcome",
        }, f"Unexpected tables created: {sorted(tables.keys())}"

        # Phase 2: default-mode validation. SDK-created tables must
        # validate clean; the only signal is advisory warnings on
        # NULLABLE keys.
        default_report = validate_binding_against_bigquery(
            ontology=ontology, binding=binding, bq_client=client
        )
        assert default_report.ok is True, (
            f"Default mode rejected SDK-created tables. Failures: "
            f"{[(f.code, f.detail) for f in default_report.failures]}"
        )
        assert all(
            w.code == FailureCode.KEY_COLUMN_NULLABLE
            for w in default_report.warnings
        ), (
            "Only KEY_COLUMN_NULLABLE warnings expected against SDK-"
            "created tables. Got: "
            f"{[w.code for w in default_report.warnings]}"
        )
        # Decision.decision_id, Outcome.outcome_id (entity primary keys)
        # plus HasOutcome.from_columns[0]=decision_id and
        # HasOutcome.to_columns[0]=outcome_id (relationship endpoints).
        assert len(default_report.warnings) == 4

        # Phase 3: strict-mode escalation.
        strict_report = validate_binding_against_bigquery(
            ontology=ontology,
            binding=binding,
            bq_client=client,
            strict=True,
        )
        assert (
            strict_report.ok is False
        ), "Strict mode should reject NULLABLE primary-key columns"
        assert all(
            f.code == FailureCode.KEY_COLUMN_NULLABLE
            for f in strict_report.failures
        )
        assert len(strict_report.failures) == 4
        # warnings must be a tuple for this == () comparison to hold;
        # the strict path moves entries rather than copying them.
        assert strict_report.warnings == (), (
            "Strict mode must escalate warnings into failures, not "
            "double-emit them"
        )

        # Phase 4: drop a non-key property column via real ALTER TABLE
        # and assert the validator catches the resulting drift.
        table_ref = f"{_PROJECT}.{ds_id}.decisions"
        # DROP COLUMN is metadata-only in BigQuery, so this is cheap.
        client.query(f"ALTER TABLE `{table_ref}` DROP COLUMN confidence").result()

        broken_report = validate_binding_against_bigquery(
            ontology=ontology, binding=binding, bq_client=client
        )
        miss = [
            f
            for f in broken_report.failures
            if f.code == FailureCode.MISSING_COLUMN and "confidence" in f.bq_ref
        ]
        assert len(miss) == 1, (
            f"Expected exactly 1 MISSING_COLUMN for confidence, got "
            f"failures: "
            f"{[(f.code, f.bq_ref) for f in broken_report.failures]}"
        )
        # Path must reflect binding YAML order. Decision's binding lists
        # decision_id at properties[0] and confidence at properties[1].
        assert miss[0].binding_path == ("binding.entities[0].properties[1].column")
        assert miss[0].bq_ref == f"{table_ref}.confidence"