From 2b2561f1dbe881cffa6be2b6d8f985cbe2fc5fb2 Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Sat, 2 May 2026 23:38:55 -0700 Subject: [PATCH 1/6] feat(ontology): add validate_binding_against_bigquery (#105 PR 2a) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-flight validator that checks whether a binding YAML's referenced BigQuery tables physically exist with the columns and types the binding requires, before extraction wastes AI.GENERATE tokens. Different from validate_extracted_graph in #76: that one validates extracted graph output against the ResolvedGraph spec. This one validates the binding against live BigQuery schemas. The two share the same public report ergonomics (ok / failures / typed codes) but keep separate Failure/Warning types because their context fields differ (binding_path/bq_ref here vs. node_id/edge_id/event_id/ FallbackScope there) — per the working plan's cross-PR consistency note on #96. Module: src/bigquery_agent_analytics/binding_validation.py Public surface: - validate_binding_against_bigquery(ontology, binding, bq_client, strict=False) -> BindingValidationReport - BindingValidationReport(failures, warnings); ok property is True iff failures is empty (warnings do not flip ok). - BindingValidationFailure / BindingValidationWarning carry code, binding_element, binding_path (binding.entities[N].properties[M] .column style), bq_ref, expected, observed, detail. - FailureCode enum: 7 default-mode codes (MISSING_TABLE, MISSING_COLUMN, TYPE_MISMATCH, ENDPOINT_TYPE_MISMATCH, UNEXPECTED_REPEATED_MODE, MISSING_DATASET, INSUFFICIENT_PERMISSIONS) + 1 strict-only code (KEY_COLUMN_NULLABLE). Internal flow: 1. resolve(ontology, binding) -> ResolvedGraph (so the validator honors fully-qualified entity.source overrides via _qualify_source at resolved_spec.py:141 — cross-project bindings work). 2. For each entity: get_table -> column-name index -> per-property type / mode checks -> per-key-column REPEATED + strict-only nullable check. 3. For each relationship: same property checks plus ENDPOINT_TYPE_MISMATCH for from_columns/to_columns whose BQ types do not match the referenced entity's primary-key column types. Type compatibility uses the materializer's _DDL_TYPE_MAP (ontology_materializer.py:125) so consistency with SDK-generated DDL is automatic. Legacy BQ aliases (INTEGER/FLOAT/BOOLEAN) accepted. Strict-mode contract: - strict=False (default): KEY_COLUMN_NULLABLE emits a BindingValidationWarning; report.ok stays True. SDK-created tables (CREATE TABLE IF NOT EXISTS without NOT NULL on key columns) must validate clean by default. - strict=True: same checks emit BindingValidationFailure with the same code; report.ok flips to False. Warnings are escalated, not duplicated. Tests (16 unit tests, all pass): - TestSdkCreatedTablesRegression: SDK-create_tables() output validates clean by default (catches the validator-rejects- SDK-tables trap); strict=True surfaces the same input as KEY_COLUMN_NULLABLE failures. - One positive case per failure code: TestMissingTable, TestMissingColumn, TestTypeMismatch (+ legacy alias acceptance), TestEndpointTypeMismatch, TestUnexpectedRepeatedMode, TestMissingDataset, TestInsufficientPermissions, TestKeyColumnNullable (default warning + strict failure + required-keys clean). - TestCrossProjectSource: a binding whose entity.source is fully qualified to a different project from binding.target.project validates against the entity's project, not the target's. - TestReportShape: failure carries binding_element / binding_path / bq_ref / detail; ok is failures-empty. PR 2b (CLI surface: bq-agent-sdk binding-validate + ontology-build --validate-binding[-strict] flags + live integration test) follows in a separate PR per the working plan. --- .../binding_validation.py | 605 ++++++++++++++++ tests/test_binding_validation.py | 659 ++++++++++++++++++ 2 files changed, 1264 insertions(+) create mode 100644 src/bigquery_agent_analytics/binding_validation.py create mode 100644 tests/test_binding_validation.py diff --git a/src/bigquery_agent_analytics/binding_validation.py b/src/bigquery_agent_analytics/binding_validation.py new file mode 100644 index 00000000..3a387708 --- /dev/null +++ b/src/bigquery_agent_analytics/binding_validation.py @@ -0,0 +1,605 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Pre-flight validator: ontology binding vs. existing BigQuery tables. + +This validator checks whether the BigQuery tables a binding YAML points +at physically exist with the columns and types the binding requires, +*before* the SDK starts extraction or materialization. It catches the +most common authoring error (binding YAML drifted out of sync with +physical tables) before extraction wastes ``AI.GENERATE`` tokens. + +Different from :func:`bigquery_agent_analytics.graph_validation. +validate_extracted_graph` (issue #76): that one validates extracted +graph output against the spec. This one validates the binding against +the live BigQuery schemas. + +Usage:: + + from bigquery_agent_analytics.binding_validation import ( + validate_binding_against_bigquery, + ) + + report = validate_binding_against_bigquery( + ontology=loaded_ontology, + binding=loaded_binding, + bq_client=bigquery.Client(project="my-project", location="US"), + strict=False, + ) + + if not report.ok: + for f in report.failures: + print(f) + for w in report.warnings: + print(f"WARN: {w}") + +See ``docs/ontology/binding-validation.md`` for the full failure-code +reference and CI usage patterns. +""" + +from __future__ import annotations + +import dataclasses +import enum +import logging +from typing import Any, Optional + +logger = logging.getLogger("bigquery_agent_analytics." + __name__) + + +# ------------------------------------------------------------------ # +# Failure codes # +# ------------------------------------------------------------------ # + + +class FailureCode(str, enum.Enum): + """Typed enum of binding-validation failure codes. + + Seven codes always run (default mode). One additional code + (``KEY_COLUMN_NULLABLE``) emits a warning by default and escalates + to a failure under ``strict=True`` — the SDK's own + ``CREATE TABLE IF NOT EXISTS`` DDL emits NULLABLE key columns + (``ontology_materializer.py:206``), so requiring REQUIRED mode in + default-mode would reject SDK-created tables. + """ + + # Default-mode (always failures). + MISSING_TABLE = "missing_table" + MISSING_COLUMN = "missing_column" + TYPE_MISMATCH = "type_mismatch" + ENDPOINT_TYPE_MISMATCH = "endpoint_type_mismatch" + UNEXPECTED_REPEATED_MODE = "unexpected_repeated_mode" + MISSING_DATASET = "missing_dataset" + INSUFFICIENT_PERMISSIONS = "insufficient_permissions" + + # Strict-mode-only (warning by default, failure under strict=True). + KEY_COLUMN_NULLABLE = "key_column_nullable" + + +# ------------------------------------------------------------------ # +# Report types # +# ------------------------------------------------------------------ # + + +@dataclasses.dataclass(frozen=True) +class BindingValidationFailure: + """One failure found during validation. + + ``binding_path`` is a ``binding.entities[N].properties[M].column`` + style path so tooling can point users at the exact YAML line. The + ``binding_element`` field carries the ontology element name for + human-readable error reporting. + """ + + code: FailureCode + binding_element: str + binding_path: str + bq_ref: str + expected: Any = None + observed: Any = None + detail: str = "" + + +@dataclasses.dataclass(frozen=True) +class BindingValidationWarning: + """Same shape as :class:`BindingValidationFailure`. + + Warnings exist so callers can format failures and warnings + uniformly. Warnings do not flip ``report.ok`` to ``False``; they + are advisory in default mode and escalate into ``failures`` under + ``strict=True``. + """ + + code: FailureCode + binding_element: str + binding_path: str + bq_ref: str + expected: Any = None + observed: Any = None + detail: str = "" + + +@dataclasses.dataclass(frozen=True) +class BindingValidationReport: + """Result of :func:`validate_binding_against_bigquery`. + + ``failures`` are hard failures (always present in default and + strict modes). ``warnings`` are strict-only checks that emitted in + default mode (empty under ``strict=True`` because they got + escalated to ``failures``). ``ok`` returns ``True`` iff + ``failures`` is empty — warnings do *not* affect ``ok``. + """ + + failures: tuple[BindingValidationFailure, ...] = () + warnings: tuple[BindingValidationWarning, ...] = () + + @property + def ok(self) -> bool: + return not self.failures + + +# ------------------------------------------------------------------ # +# BQ type compatibility # +# ------------------------------------------------------------------ # + +# Maps the SDK's DDL type (per ``ontology_materializer._DDL_TYPE_MAP``) +# to the set of BigQuery ``SchemaField.field_type`` values that the SDK +# accepts as compatible. BigQuery returns legacy names like ``INTEGER`` +# and ``FLOAT`` from older table schemas, so each modern name lists its +# legacy alias. +_COMPATIBLE_BQ_TYPES: dict[str, frozenset[str]] = { + "STRING": frozenset({"STRING"}), + "INT64": frozenset({"INT64", "INTEGER"}), + "FLOAT64": frozenset({"FLOAT64", "FLOAT"}), + "BOOL": frozenset({"BOOL", "BOOLEAN"}), + "TIMESTAMP": frozenset({"TIMESTAMP"}), + "DATE": frozenset({"DATE"}), + "BYTES": frozenset({"BYTES"}), +} + + +def _expected_ddl_type(sdk_type: str) -> Optional[str]: + """Return the BQ DDL type the materializer would emit for *sdk_type*. + + Mirrors ``ontology_materializer._DDL_TYPE_MAP`` so the validator + uses the same expectations the SDK uses when it generates DDL + itself. Returns ``None`` for unknown SDK types — the validator + skips type-compatibility checks for those rather than guessing. + """ + # Local import to avoid a circular dep at module load time. + from .ontology_materializer import _DDL_TYPE_MAP + + return _DDL_TYPE_MAP.get(sdk_type.strip().lower()) + + +def _bq_type_matches(sdk_type: str, bq_field_type: str) -> bool: + """Return True if *bq_field_type* is compatible with *sdk_type*.""" + expected = _expected_ddl_type(sdk_type) + if expected is None: + return True # unknown SDK type: skip the check + return bq_field_type.upper() in _COMPATIBLE_BQ_TYPES.get( + expected, frozenset() + ) + + +# ------------------------------------------------------------------ # +# Public API # +# ------------------------------------------------------------------ # + + +def validate_binding_against_bigquery( + *, + ontology, + binding, + bq_client, + strict: bool = False, +) -> BindingValidationReport: + """Validate a binding against live BigQuery tables. + + Resolves the ontology + binding to a ``ResolvedGraph``, then + checks that every entity / relationship table the binding + references exists and that every bound column is present with a + compatible type. + + Args: + ontology: Upstream ``Ontology`` model. + binding: Upstream ``Binding`` model. + bq_client: A ``google.cloud.bigquery.Client``-like object with + ``get_table(table_ref)`` returning an object exposing + ``.schema`` (an iterable of ``SchemaField``-like records + with ``.name``, ``.field_type``, ``.mode`` attributes). + strict: When ``False`` (default), strict-only checks (today: + ``KEY_COLUMN_NULLABLE``) emit ``BindingValidationWarning`` + entries. When ``True``, they emit + ``BindingValidationFailure`` entries with the same code. + Default is permissive so the validator does not reject + tables produced by the SDK's own ``CREATE TABLE IF NOT + EXISTS`` DDL. + + Returns: + A :class:`BindingValidationReport`. + """ + from .resolved_spec import resolve + + spec = resolve(ontology, binding, lineage_config=None) + + # Index binding entries by name so we can build precise paths. + entity_index = {b.name: i for i, b in enumerate(binding.entities)} + relationship_index = {b.name: i for i, b in enumerate(binding.relationships)} + + failures: list[BindingValidationFailure] = [] + warnings: list[BindingValidationWarning] = [] + table_cache: dict[str, Optional[Any]] = {} + + def emit( + code: FailureCode, + *, + binding_element: str, + binding_path: str, + bq_ref: str, + expected: Any = None, + observed: Any = None, + detail: str = "", + strict_only: bool = False, + ) -> None: + """Emit either a failure or a warning, honoring strict mode.""" + if strict_only and not strict: + warnings.append( + BindingValidationWarning( + code=code, + binding_element=binding_element, + binding_path=binding_path, + bq_ref=bq_ref, + expected=expected, + observed=observed, + detail=detail, + ) + ) + else: + failures.append( + BindingValidationFailure( + code=code, + binding_element=binding_element, + binding_path=binding_path, + bq_ref=bq_ref, + expected=expected, + observed=observed, + detail=detail, + ) + ) + + def fetch_table( + table_ref: str, binding_element: str, binding_path: str + ) -> Optional[Any]: + """Fetch a BQ table, classify any error, and cache the result.""" + if table_ref in table_cache: + return table_cache[table_ref] + + try: + table = bq_client.get_table(table_ref) + table_cache[table_ref] = table + return table + except Exception as exc: # noqa: BLE001 - classify by message + msg = str(exc).lower() + code: FailureCode + if "not found" in msg and "dataset" in msg: + code = FailureCode.MISSING_DATASET + elif "not found" in msg or "does not exist" in msg: + code = FailureCode.MISSING_TABLE + elif "permission" in msg or "forbidden" in msg or "denied" in msg: + code = FailureCode.INSUFFICIENT_PERMISSIONS + else: + # Default to MISSING_TABLE for unknown errors so the user gets + # an actionable failure rather than an opaque exception. + code = FailureCode.MISSING_TABLE + + emit( + code, + binding_element=binding_element, + binding_path=binding_path, + bq_ref=table_ref, + detail=str(exc), + ) + table_cache[table_ref] = None + return None + + # ---- Per-entity checks --------------------------------------- # + + for entity in spec.entities: + binding_idx = entity_index.get(entity.name) + if binding_idx is None: + # Entity not in this binding (e.g. abstract upstream — already + # filtered by resolve()). Skip silently. + continue + + binding_root = f"binding.entities[{binding_idx}]" + table = fetch_table( + entity.source, + binding_element=entity.name, + binding_path=f"{binding_root}.source", + ) + if table is None: + continue + + # Index BQ schema by column name. + bq_columns = {f.name: f for f in table.schema} + + # Check every bound property. + for j, prop in enumerate(entity.properties): + prop_path = f"{binding_root}.properties[{j}].column" + bq_field = bq_columns.get(prop.column) + + if bq_field is None: + emit( + FailureCode.MISSING_COLUMN, + binding_element=entity.name, + binding_path=prop_path, + bq_ref=f"{entity.source}.{prop.column}", + expected=prop.column, + detail=( + f"binding declares property {prop.logical_name!r} " + f"on column {prop.column!r}, not found on table " + f"{entity.source}" + ), + ) + continue + + # REPEATED-mode columns can't carry scalar properties. + if getattr(bq_field, "mode", "NULLABLE") == "REPEATED": + emit( + FailureCode.UNEXPECTED_REPEATED_MODE, + binding_element=entity.name, + binding_path=prop_path, + bq_ref=f"{entity.source}.{prop.column}", + expected="NULLABLE or REQUIRED", + observed="REPEATED", + detail=( + f"column {prop.column!r} is REPEATED on " + f"{entity.source}; the SDK can't bind scalar " + f"properties to ARRAY columns" + ), + ) + + # Type compatibility. + if not _bq_type_matches(prop.sdk_type, bq_field.field_type): + emit( + FailureCode.TYPE_MISMATCH, + binding_element=entity.name, + binding_path=prop_path, + bq_ref=f"{entity.source}.{prop.column}", + expected=_expected_ddl_type(prop.sdk_type), + observed=bq_field.field_type, + detail=( + f"binding maps property {prop.logical_name!r} (sdk_type=" + f"{prop.sdk_type!r}) to column {prop.column!r}, but BQ " + f"reports type {bq_field.field_type!r}" + ), + ) + + # Per-key-column checks (REPEATED + strict-only nullability). + for key_col in entity.key_columns: + key_path = f"{binding_root}..{key_col}" + bq_field = bq_columns.get(key_col) + if bq_field is None: + # Already reported as MISSING_COLUMN above when the key was + # also a bound property. If the key isn't a bound property + # (rare; ontology requires keys to be properties), still + # surface it. + if key_col not in {p.column for p in entity.properties}: + emit( + FailureCode.MISSING_COLUMN, + binding_element=entity.name, + binding_path=key_path, + bq_ref=f"{entity.source}.{key_col}", + expected=key_col, + detail=( + f"primary-key column {key_col!r} not found on " + f"table {entity.source}" + ), + ) + continue + + if getattr(bq_field, "mode", "NULLABLE") == "REPEATED": + emit( + FailureCode.UNEXPECTED_REPEATED_MODE, + binding_element=entity.name, + binding_path=key_path, + bq_ref=f"{entity.source}.{key_col}", + expected="NULLABLE or REQUIRED", + observed="REPEATED", + detail=( + f"primary-key column {key_col!r} is REPEATED on " + f"{entity.source}; primary keys can't be ARRAY" + ), + ) + continue + + if getattr(bq_field, "mode", "NULLABLE") == "NULLABLE": + emit( + FailureCode.KEY_COLUMN_NULLABLE, + binding_element=entity.name, + binding_path=key_path, + bq_ref=f"{entity.source}.{key_col}", + expected="REQUIRED", + observed="NULLABLE", + detail=( + f"primary-key column {key_col!r} on {entity.source} " + f"is NULLABLE; under --strict this is a hard failure" + ), + strict_only=True, + ) + + # ---- Per-relationship checks --------------------------------- # + + # Index entity sdk_types per key_column for endpoint type matching. + entity_key_types: dict[str, dict[str, str]] = {} + for ent in spec.entities: + cols = {p.column: p.sdk_type for p in ent.properties} + entity_key_types[ent.name] = { + k: cols.get(k, "string") for k in ent.key_columns + } + + for rel in spec.relationships: + binding_idx = relationship_index.get(rel.name) + if binding_idx is None: + continue + + binding_root = f"binding.relationships[{binding_idx}]" + table = fetch_table( + rel.source, + binding_element=rel.name, + binding_path=f"{binding_root}.source", + ) + if table is None: + continue + + bq_columns = {f.name: f for f in table.schema} + + # Endpoint columns: from_columns and to_columns. + def _check_endpoint( + kind: str, + rel_columns: tuple[str, ...], + endpoint_entity_name: str, + ) -> None: + """``kind`` is either ``'from_columns'`` or ``'to_columns'``.""" + endpoint_types = entity_key_types.get(endpoint_entity_name, {}) + endpoint_key_cols = list(endpoint_types.keys()) + for j, col in enumerate(rel_columns): + col_path = f"{binding_root}.{kind}[{j}]" + bq_field = bq_columns.get(col) + + if bq_field is None: + emit( + FailureCode.MISSING_COLUMN, + binding_element=rel.name, + binding_path=col_path, + bq_ref=f"{rel.source}.{col}", + expected=col, + detail=( + f"endpoint column {col!r} not found on edge table " + f"{rel.source}" + ), + ) + continue + + if getattr(bq_field, "mode", "NULLABLE") == "REPEATED": + emit( + FailureCode.UNEXPECTED_REPEATED_MODE, + binding_element=rel.name, + binding_path=col_path, + bq_ref=f"{rel.source}.{col}", + expected="NULLABLE or REQUIRED", + observed="REPEATED", + detail=( + f"endpoint column {col!r} on {rel.source} is " + f"REPEATED; endpoint keys can't be ARRAY" + ), + ) + continue + + # Endpoint type must match the referenced entity's key + # column type at the same position. + if j < len(endpoint_key_cols): + expected_sdk = endpoint_types[endpoint_key_cols[j]] + if not _bq_type_matches(expected_sdk, bq_field.field_type): + emit( + FailureCode.ENDPOINT_TYPE_MISMATCH, + binding_element=rel.name, + binding_path=col_path, + bq_ref=f"{rel.source}.{col}", + expected=_expected_ddl_type(expected_sdk), + observed=bq_field.field_type, + detail=( + f"endpoint column {col!r} on {rel.source} has BQ " + f"type {bq_field.field_type!r}, but referenced " + f"entity {endpoint_entity_name!r} key " + f"{endpoint_key_cols[j]!r} expects sdk_type=" + f"{expected_sdk!r}" + ), + ) + + # Strict-only: endpoint keys should be REQUIRED. + if getattr(bq_field, "mode", "NULLABLE") == "NULLABLE": + emit( + FailureCode.KEY_COLUMN_NULLABLE, + binding_element=rel.name, + binding_path=col_path, + bq_ref=f"{rel.source}.{col}", + expected="REQUIRED", + observed="NULLABLE", + detail=( + f"endpoint column {col!r} on {rel.source} is " + f"NULLABLE; under --strict this is a hard failure" + ), + strict_only=True, + ) + + _check_endpoint("from_columns", rel.from_columns, rel.from_entity) + _check_endpoint("to_columns", rel.to_columns, rel.to_entity) + + # Property column checks. + for j, prop in enumerate(rel.properties): + prop_path = f"{binding_root}.properties[{j}].column" + bq_field = bq_columns.get(prop.column) + + if bq_field is None: + emit( + FailureCode.MISSING_COLUMN, + binding_element=rel.name, + binding_path=prop_path, + bq_ref=f"{rel.source}.{prop.column}", + expected=prop.column, + detail=( + f"binding declares property {prop.logical_name!r} on " + f"column {prop.column!r}, not found on edge table " + f"{rel.source}" + ), + ) + continue + + if getattr(bq_field, "mode", "NULLABLE") == "REPEATED": + emit( + FailureCode.UNEXPECTED_REPEATED_MODE, + binding_element=rel.name, + binding_path=prop_path, + bq_ref=f"{rel.source}.{prop.column}", + expected="NULLABLE or REQUIRED", + observed="REPEATED", + detail=( + f"property column {prop.column!r} on {rel.source} is " + f"REPEATED; the SDK can't bind scalar properties to " + f"ARRAY columns" + ), + ) + + if not _bq_type_matches(prop.sdk_type, bq_field.field_type): + emit( + FailureCode.TYPE_MISMATCH, + binding_element=rel.name, + binding_path=prop_path, + bq_ref=f"{rel.source}.{prop.column}", + expected=_expected_ddl_type(prop.sdk_type), + observed=bq_field.field_type, + detail=( + f"binding maps relationship property " + f"{prop.logical_name!r} (sdk_type={prop.sdk_type!r}) " + f"to column {prop.column!r}, but BQ reports type " + f"{bq_field.field_type!r}" + ), + ) + + return BindingValidationReport( + failures=tuple(failures), + warnings=tuple(warnings), + ) diff --git a/tests/test_binding_validation.py b/tests/test_binding_validation.py new file mode 100644 index 00000000..18c1480d --- /dev/null +++ b/tests/test_binding_validation.py @@ -0,0 +1,659 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for binding_validation.validate_binding_against_bigquery. + +Tests use a fake BQ client (a small inline class with a ``get_table`` +method) so they do not require live BigQuery. Each of the seven +default-mode failure codes has at least one positive case plus the +"clean against SDK-created tables" regression test for the strict-only +KEY_COLUMN_NULLABLE check. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from dataclasses import field +from typing import Optional + +import pytest + +# ------------------------------------------------------------------ # +# Fakes # +# ------------------------------------------------------------------ # + + +@dataclass +class _FakeField: + """Shape-compatible with google.cloud.bigquery.SchemaField.""" + + name: str + field_type: str + mode: str = "NULLABLE" + + +@dataclass +class _FakeTable: + schema: list[_FakeField] = field(default_factory=list) + + +class _FakeBQClient: + """Minimal get_table() impl backed by a {table_ref: schema} map. + + Missing table_refs raise ``NotFound``; entries with ``schema=None`` + raise other classified errors so we can exercise the + MISSING_DATASET / INSUFFICIENT_PERMISSIONS branches. + """ + + def __init__( + self, + tables: dict[str, list[_FakeField]], + *, + missing_datasets: Optional[set[str]] = None, + forbidden: Optional[set[str]] = None, + ) -> None: + self._tables = tables + self._missing_datasets = missing_datasets or set() + self._forbidden = forbidden or set() + + def get_table(self, table_ref: str): + if table_ref in self._forbidden: + raise PermissionError(f"403 Permission denied for table {table_ref}") + parts = table_ref.split(".") + if len(parts) >= 2: + dataset_ref = ".".join(parts[:2]) + if dataset_ref in self._missing_datasets: + raise LookupError(f"404 Not found: Dataset {dataset_ref} was not found") + if table_ref not in self._tables: + raise LookupError(f"404 Not found: Table {table_ref} does not exist") + return _FakeTable(schema=list(self._tables[table_ref])) + + +# ------------------------------------------------------------------ # +# Fixture builders — minimal Ontology + Binding for one entity-edge # +# pair, exercising every code path with small variations. # +# ------------------------------------------------------------------ # + + +def _ontology_and_binding( + project: str = "p", + dataset: str = "d", + entity_source: str = "decision_points", + rel_source: str = "candidate_edges", +): + from bigquery_ontology import load_binding + from bigquery_ontology import load_ontology + + ontology_yaml = ( + "ontology: TestGraph\n" + "entities:\n" + " - name: Decision\n" + " keys:\n" + " primary: [decision_id]\n" + " properties:\n" + " - name: decision_id\n" + " type: string\n" + " - name: confidence\n" + " type: double\n" + " - name: Outcome\n" + " keys:\n" + " primary: [outcome_id]\n" + " properties:\n" + " - name: outcome_id\n" + " type: string\n" + "relationships:\n" + " - name: HasOutcome\n" + " from: Decision\n" + " to: Outcome\n" + " properties:\n" + " - name: weight\n" + " type: double\n" + ) + binding_yaml = ( + "binding: test_bind\n" + "ontology: TestGraph\n" + "target:\n" + " backend: bigquery\n" + f" project: {project}\n" + f" dataset: {dataset}\n" + "entities:\n" + " - name: Decision\n" + f" source: {entity_source}\n" + " properties:\n" + " - name: decision_id\n" + " column: decision_id\n" + " - name: confidence\n" + " column: confidence\n" + " - name: Outcome\n" + " source: outcomes\n" + " properties:\n" + " - name: outcome_id\n" + " column: outcome_id\n" + "relationships:\n" + " - name: HasOutcome\n" + f" source: {rel_source}\n" + " from_columns: [decision_id]\n" + " to_columns: [outcome_id]\n" + " properties:\n" + " - name: weight\n" + " column: weight\n" + ) + + import pathlib + import tempfile + + tmp = pathlib.Path(tempfile.mkdtemp(prefix="bind_validate_")) + ont_path = tmp / "test.ontology.yaml" + bnd_path = tmp / "test.binding.yaml" + ont_path.write_text(ontology_yaml, encoding="utf-8") + bnd_path.write_text(binding_yaml, encoding="utf-8") + + ontology = load_ontology(str(ont_path)) + binding = load_binding(str(bnd_path), ontology=ontology) + return ontology, binding + + +def _good_schemas() -> dict[str, list[_FakeField]]: + """All tables present with matching column types — clean baseline. + + Models tables produced by ``OntologyMaterializer.create_tables()``: + NULLABLE everywhere (no NOT NULL), correct types per + ``_DDL_TYPE_MAP``. + """ + return { + "p.d.decision_points": [ + _FakeField("decision_id", "STRING"), + _FakeField("confidence", "FLOAT64"), + ], + "p.d.outcomes": [ + _FakeField("outcome_id", "STRING"), + ], + "p.d.candidate_edges": [ + _FakeField("decision_id", "STRING"), + _FakeField("outcome_id", "STRING"), + _FakeField("weight", "FLOAT64"), + ], + } + + +# ------------------------------------------------------------------ # +# Default-mode regression: SDK-created tables validate clean # +# ------------------------------------------------------------------ # + + +class TestSdkCreatedTablesRegression: + + def test_default_mode_ok_against_sdk_created_tables(self): + """Tables matching ``OntologyMaterializer.create_tables()`` output + must validate clean by default — every key column is NULLABLE + (the SDK doesn't emit NOT NULL), so a default-mode hard failure + on KEY_COLUMN_NULLABLE would reject SDK-created tables. Catches + the "validator rejects SDK-created tables" trap. + + Default mode may still emit advisory ``KEY_COLUMN_NULLABLE`` + warnings for those NULLABLE keys (visible to a CI gate that + chooses to surface them), but ``report.ok`` must stay True.""" + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + client = _FakeBQClient(_good_schemas()) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + assert report.ok is True + assert report.failures == () + # Warnings are expected (NULLABLE keys), but they must not flip + # `ok` to False — that's the whole point of the strict-only + # classification. + for w in report.warnings: + assert w.code == FailureCode.KEY_COLUMN_NULLABLE + + def test_strict_mode_emits_warnings_as_failures(self): + """The same SDK-created tables, run under strict=True, must + surface NULLABLE key columns as KEY_COLUMN_NULLABLE failures.""" + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + client = _FakeBQClient(_good_schemas()) + + report = validate_binding_against_bigquery( + ontology=ontology, + binding=binding, + bq_client=client, + strict=True, + ) + + assert report.ok is False + nullable_codes = [ + f.code + for f in report.failures + if f.code == FailureCode.KEY_COLUMN_NULLABLE + ] + # Decision.decision_id, Outcome.outcome_id (entity primary keys) + # plus HasOutcome.from_columns[0]=decision_id and + # HasOutcome.to_columns[0]=outcome_id (relationship endpoints). + assert len(nullable_codes) == 4 + + +# ------------------------------------------------------------------ # +# Per-failure-code unit tests # +# ------------------------------------------------------------------ # + + +class TestMissingTable: + + def test_entity_table_missing(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + del schemas["p.d.decision_points"] + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + decision_failures = [ + f for f in report.failures if f.binding_element == "Decision" + ] + assert any(f.code == FailureCode.MISSING_TABLE for f in decision_failures) + miss = next( + f for f in decision_failures if f.code == FailureCode.MISSING_TABLE + ) + assert miss.bq_ref == "p.d.decision_points" + assert miss.binding_path == "binding.entities[0].source" + + +class TestMissingColumn: + + def test_property_column_missing(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + # Drop the 'confidence' column from decision_points. + schemas["p.d.decision_points"] = [ + _FakeField("decision_id", "STRING"), + ] + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + miss = [f for f in report.failures if f.code == FailureCode.MISSING_COLUMN] + assert len(miss) == 1 + assert miss[0].binding_element == "Decision" + assert miss[0].bq_ref == "p.d.decision_points.confidence" + assert "properties[1].column" in miss[0].binding_path + + +class TestTypeMismatch: + + def test_entity_property_type_mismatch(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + # confidence is supposed to be FLOAT64 (sdk_type=double). + schemas["p.d.decision_points"] = [ + _FakeField("decision_id", "STRING"), + _FakeField("confidence", "INT64"), # wrong type + ] + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + mismatches = [ + f for f in report.failures if f.code == FailureCode.TYPE_MISMATCH + ] + assert len(mismatches) == 1 + assert mismatches[0].expected == "FLOAT64" + assert mismatches[0].observed == "INT64" + + def test_legacy_bq_type_aliases_accepted(self): + """BigQuery returns 'INTEGER' / 'FLOAT' / 'BOOLEAN' for older + schemas; the validator must treat them as compatible with INT64 + / FLOAT64 / BOOL respectively.""" + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + # confidence is double → FLOAT64 expected; legacy BQ returns FLOAT. + schemas["p.d.decision_points"][1] = _FakeField("confidence", "FLOAT") + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + assert report.ok is True + + +class TestEndpointTypeMismatch: + + def test_edge_endpoint_type_does_not_match_referenced_entity_key( + self, + ): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + # Decision.decision_id is STRING; if the edge's decision_id + # column is INT64, that's an endpoint-type mismatch. + schemas["p.d.candidate_edges"] = [ + _FakeField("decision_id", "INT64"), + _FakeField("outcome_id", "STRING"), + _FakeField("weight", "FLOAT64"), + ] + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + mismatches = [ + f + for f in report.failures + if f.code == FailureCode.ENDPOINT_TYPE_MISMATCH + ] + assert len(mismatches) == 1 + assert mismatches[0].binding_element == "HasOutcome" + assert "from_columns[0]" in mismatches[0].binding_path + assert mismatches[0].observed == "INT64" + assert mismatches[0].expected == "STRING" + + +class TestUnexpectedRepeatedMode: + + def test_repeated_mode_on_property_column(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + schemas["p.d.decision_points"] = [ + _FakeField("decision_id", "STRING"), + _FakeField("confidence", "FLOAT64", mode="REPEATED"), + ] + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + repeated = [ + f + for f in report.failures + if f.code == FailureCode.UNEXPECTED_REPEATED_MODE + ] + assert len(repeated) == 1 + assert repeated[0].observed == "REPEATED" + + +class TestMissingDataset: + + def test_missing_dataset_classified(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + client = _FakeBQClient(_good_schemas(), missing_datasets={"p.d"}) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + assert any(f.code == FailureCode.MISSING_DATASET for f in report.failures) + + +class TestInsufficientPermissions: + + def test_forbidden_table_classified(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + client = _FakeBQClient(_good_schemas(), forbidden={"p.d.decision_points"}) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + perm = [ + f + for f in report.failures + if f.code == FailureCode.INSUFFICIENT_PERMISSIONS + ] + assert len(perm) >= 1 + assert perm[0].bq_ref == "p.d.decision_points" + + +class TestKeyColumnNullable: + + def test_default_mode_emits_warnings(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + client = _FakeBQClient(_good_schemas()) + + report = validate_binding_against_bigquery( + ontology=ontology, + binding=binding, + bq_client=client, + strict=False, + ) + + # Warnings are present (NULLABLE keys), but report.ok is True. + assert report.ok is True + assert all( + w.code == FailureCode.KEY_COLUMN_NULLABLE for w in report.warnings + ) + assert len(report.warnings) >= 2 # 2 entity keys, 2 endpoint keys + + def test_strict_mode_promotes_to_failures(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + client = _FakeBQClient(_good_schemas()) + + report = validate_binding_against_bigquery( + ontology=ontology, + binding=binding, + bq_client=client, + strict=True, + ) + + assert report.ok is False + assert all( + f.code == FailureCode.KEY_COLUMN_NULLABLE for f in report.failures + ) + # No warnings — they got escalated. + assert report.warnings == () + + def test_required_keys_clean_in_strict_mode(self): + """When the user's tables already use REQUIRED key columns, + strict mode reports clean.""" + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + # Mark all key + endpoint columns as REQUIRED. + for col in schemas["p.d.decision_points"]: + if col.name == "decision_id": + col.mode = "REQUIRED" + for col in schemas["p.d.outcomes"]: + if col.name == "outcome_id": + col.mode = "REQUIRED" + for col in schemas["p.d.candidate_edges"]: + if col.name in ("decision_id", "outcome_id"): + col.mode = "REQUIRED" + + client = _FakeBQClient(schemas) + report = validate_binding_against_bigquery( + ontology=ontology, + binding=binding, + bq_client=client, + strict=True, + ) + + assert report.ok is True + assert report.failures == () + assert report.warnings == () + + +# ------------------------------------------------------------------ # +# Cross-cutting: cross-project source # +# ------------------------------------------------------------------ # + + +class TestCrossProjectSource: + + def test_fully_qualified_entity_source_validated_against_its_project( + self, + ): + """A binding whose entity.source is fully qualified to a project + different from binding.target.project must be validated against + the entity's project, not the target's. Catches the trap from + #105's "validate resolved sources, not only target.project / + target.dataset" finding.""" + import pathlib + import tempfile + + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + # Build a binding where Decision.source is fully qualified to a + # different project. The fixture builder doesn't support that + # directly, so write one inline. + from bigquery_ontology import load_binding + from bigquery_ontology import load_ontology + + tmp = pathlib.Path(tempfile.mkdtemp(prefix="bind_validate_xp_")) + (tmp / "ont.yaml").write_text( + "ontology: TestGraph\n" + "entities:\n" + " - name: Decision\n" + " keys:\n" + " primary: [decision_id]\n" + " properties:\n" + " - name: decision_id\n" + " type: string\n" + "relationships: []\n", + encoding="utf-8", + ) + (tmp / "bnd.yaml").write_text( + "binding: test_bind\n" + "ontology: TestGraph\n" + "target:\n" + " backend: bigquery\n" + " project: target-project\n" + " dataset: target-dataset\n" + "entities:\n" + " - name: Decision\n" + # Fully qualified to a different project. + " source: source-project.source-dataset.decisions\n" + " properties:\n" + " - name: decision_id\n" + " column: decision_id\n" + "relationships: []\n", + encoding="utf-8", + ) + + ontology = load_ontology(str(tmp / "ont.yaml")) + binding = load_binding(str(tmp / "bnd.yaml"), ontology=ontology) + + # Validator should look for the table at source-project, not + # target-project. Place the table at the correct location and + # leave target-project empty. + schemas = { + "source-project.source-dataset.decisions": [ + _FakeField("decision_id", "STRING"), + ], + } + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + # Should be clean. Any MISSING_TABLE failure would mean the + # validator looked in the wrong project. + assert all(f.code != FailureCode.MISSING_TABLE for f in report.failures), ( + "validator looked in target project instead of fully-qualified " + "entity.source project: " + f"{[(f.code, f.bq_ref) for f in report.failures]}" + ) + + +# ------------------------------------------------------------------ # +# Report shape # +# ------------------------------------------------------------------ # + + +class TestReportShape: + + def test_failure_carries_required_fields(self): + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + schemas["p.d.decision_points"] = [ + _FakeField("decision_id", "STRING"), + ] + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + f = report.failures[0] + assert f.code is not None + assert f.binding_element == "Decision" + assert f.binding_path.startswith("binding.entities[") + assert f.bq_ref.startswith("p.d.") + assert isinstance(f.detail, str) and f.detail + + def test_ok_property_is_failures_empty(self): + from bigquery_agent_analytics.binding_validation import BindingValidationReport + + empty = BindingValidationReport() + assert empty.ok is True + + from bigquery_agent_analytics.binding_validation import BindingValidationFailure + from bigquery_agent_analytics.binding_validation import FailureCode + + not_ok = BindingValidationReport( + failures=( + BindingValidationFailure( + code=FailureCode.MISSING_TABLE, + binding_element="X", + binding_path="binding.entities[0].source", + bq_ref="p.d.x", + ), + ), + ) + assert not_ok.ok is False From fe31c38aecddcb712d7a21cb94d7116bd1aaab56 Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Sat, 2 May 2026 23:49:10 -0700 Subject: [PATCH 2/6] fix+test: binding_path YAML order, physical cross-table check, docs note (#105 PR 2a) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses four review findings on PR #109. (1) binding_path correctness — index now reflects binding YAML order Previously the validator used enumerate(entity.properties) / enumerate(rel.properties) on ResolvedEntity / ResolvedRelationship, whose ordering follows the ontology's effective-property order, not the binding YAML's order. With inheritance or a different binding ordering, a failure path like binding.entities[0].properties[1] .column could point to the wrong YAML entry. Fix: build {logical_property_name: yaml_index} maps from binding.entities[i].properties / binding.relationships[i].properties, then derive paths via prop.logical_name lookup. Falls back to a name-keyed path on the rare case where a resolved property has no matching binding YAML entry. New regression test TestBindingPathYamlOrder :: test_path_index_uses_binding_yaml_order_not_resolved_order asserts the path correctly resolves to the binding's own index when the binding lists properties in reverse ontology order. (2) Physical cross-table endpoint check ENDPOINT_TYPE_MISMATCH previously compared edge endpoint column type against the ontology-derived expected SDK type only. That misses the case where a node table has drifted away from its ontology declaration but the edge has not — the per-entity loop catches the node's TYPE_MISMATCH but the edge's storage-level disagreement with the node was invisible. Fix: when the referenced node table is in the table cache, compare the edge endpoint's actual BQ field_type against the node table's actual key column field_type and emit a second ENDPOINT_TYPE_MISMATCH whose detail explicitly says 'physical cross-table mismatch'. Compares canonical aliases so INTEGER/INT64 are not flagged as mismatched. The two checks are complementary: spec-level fires when only the edge is wrong, physical fires when the node has drifted but the edge has not. New test TestEndpointPhysicalCrossTableCheck :: test_edge_endpoint_disagrees_with_node_actual_field_type asserts both layers fire when both diverge from the spec, and the existing TestEndpointTypeMismatch test now expects two complementary mismatches (one spec-level + one physical) and asserts the detail strings match each layer's wording. (3) Docstring no longer references docs/ontology/binding-validation.md That file lands in PR 2b. Module docstring now says the user-facing CLI surface and full failure-code documentation land in PR 2b of issue #105. (4) Stronger materializer-DDL regression New test TestSdkCreatedTablesRegression :: test_expected_types_match_materializer_ddl_type_map builds the expected schema directly from ontology_materializer._DDL_TYPE_MAP rather than a hand-written fixture. If a future change updates _DDL_TYPE_MAP (e.g. adds NUMERIC support), the test forces a corresponding update to _COMPATIBLE_BQ_TYPES in binding_validation, preventing silent regressions. 100/100 tests pass across test_binding_validation.py, test_ontology_materializer.py, test_resolved_spec.py. --- .../binding_validation.py | 138 ++++++++++- tests/test_binding_validation.py | 214 +++++++++++++++++- 2 files changed, 336 insertions(+), 16 deletions(-) diff --git a/src/bigquery_agent_analytics/binding_validation.py b/src/bigquery_agent_analytics/binding_validation.py index 3a387708..8ff942a4 100644 --- a/src/bigquery_agent_analytics/binding_validation.py +++ b/src/bigquery_agent_analytics/binding_validation.py @@ -44,8 +44,9 @@ for w in report.warnings: print(f"WARN: {w}") -See ``docs/ontology/binding-validation.md`` for the full failure-code -reference and CI usage patterns. +The user-facing CLI surface (``bq-agent-sdk binding-validate``, +``ontology-build --validate-binding[-strict]``) and the full +failure-code documentation land in PR 2b of issue #105. """ from __future__ import annotations @@ -235,9 +236,28 @@ def validate_binding_against_bigquery( spec = resolve(ontology, binding, lineage_config=None) # Index binding entries by name so we can build precise paths. + # Critical: derive property/column indices from the binding YAML's + # own ordering (binding.entities[i].properties), NOT from the + # ResolvedEntity's properties tuple, because ResolvedEntity orders + # properties by ontology / effective-property order. With + # inheritance or a different binding-side ordering, the two + # orderings can diverge and a path like + # ``binding.entities[0].properties[1].column`` would point to the + # wrong YAML entry. entity_index = {b.name: i for i, b in enumerate(binding.entities)} relationship_index = {b.name: i for i, b in enumerate(binding.relationships)} + # Per-binding-element {logical_property_name: yaml_index} maps so + # paths reflect what the user actually wrote in YAML. + entity_prop_yaml_index: dict[str, dict[str, int]] = { + b.name: {p.name: j for j, p in enumerate(b.properties)} + for b in binding.entities + } + rel_prop_yaml_index: dict[str, dict[str, int]] = { + b.name: {p.name: j for j, p in enumerate(b.properties)} + for b in binding.relationships + } + failures: list[BindingValidationFailure] = [] warnings: list[BindingValidationWarning] = [] table_cache: dict[str, Optional[Any]] = {} @@ -335,9 +355,20 @@ def fetch_table( # Index BQ schema by column name. bq_columns = {f.name: f for f in table.schema} - # Check every bound property. - for j, prop in enumerate(entity.properties): - prop_path = f"{binding_root}.properties[{j}].column" + # Check every bound property. Path indices come from the binding + # YAML's ordering, not the ResolvedEntity's ordering, so paths + # point at the actual YAML entry the user wrote. + prop_yaml_idx = entity_prop_yaml_index.get(entity.name, {}) + for prop in entity.properties: + yaml_j = prop_yaml_idx.get(prop.logical_name) + if yaml_j is None: + # Resolved property exists but has no matching binding-YAML + # entry — should not happen for a properly resolved spec, but + # if it does, fall back to a name-keyed path so the failure + # is still actionable. + prop_path = f"{binding_root}.properties[{prop.logical_name}].column" + else: + prop_path = f"{binding_root}.properties[{yaml_j}].column" bq_field = bq_columns.get(prop.column) if bq_field is None: @@ -442,13 +473,23 @@ def fetch_table( # ---- Per-relationship checks --------------------------------- # - # Index entity sdk_types per key_column for endpoint type matching. + # Index entity sdk_types and source tables per key_column so the + # endpoint check can do BOTH (a) the spec-level expected-type + # comparison and (b) the physical cross-table comparison against + # the referenced node table's actual BQ field type. Catches the + # case where a node table has drifted out of sync with its own + # ontology (the per-entity loop above flags that as TYPE_MISMATCH + # on the node), but the edge endpoint also disagrees with the + # node's actual storage type — we want the edge to surface + # ENDPOINT_TYPE_MISMATCH even when the node is itself broken. entity_key_types: dict[str, dict[str, str]] = {} + entity_source_by_name: dict[str, str] = {} for ent in spec.entities: cols = {p.column: p.sdk_type for p in ent.properties} entity_key_types[ent.name] = { k: cols.get(k, "string") for k in ent.key_columns } + entity_source_by_name[ent.name] = ent.source for rel in spec.relationships: binding_idx = relationship_index.get(rel.name) @@ -508,10 +549,27 @@ def _check_endpoint( ) continue - # Endpoint type must match the referenced entity's key - # column type at the same position. + # Endpoint type checks. Two comparisons: + # + # (1) Spec-level: edge endpoint BQ type must match the + # ontology-derived expected SDK type for the referenced + # node's primary-key column at the same position. + # (2) Physical cross-table: when the referenced node table + # is fetchable, the edge endpoint's BQ type must match + # the actual BQ field type of the referenced key column + # in the node table. Catches cases where the node table + # has drifted away from its ontology declaration but + # the edge has not — those would slip past (1) alone. + # + # Both comparisons emit ENDPOINT_TYPE_MISMATCH; (1) + # describes the spec-level disagreement, (2) describes the + # physical-table disagreement. The two checks are + # complementary, not redundant: (1) fires when only the + # edge is wrong; (2) fires when both are wrong but in + # different ways. if j < len(endpoint_key_cols): expected_sdk = endpoint_types[endpoint_key_cols[j]] + # (1) Spec-level type check. if not _bq_type_matches(expected_sdk, bq_field.field_type): emit( FailureCode.ENDPOINT_TYPE_MISMATCH, @@ -529,6 +587,58 @@ def _check_endpoint( ), ) + # (2) Physical cross-table check. Use the table_cache so + # we don't double-fetch the node table when the per-entity + # loop already did. + node_source = entity_source_by_name.get(endpoint_entity_name) + if node_source is not None: + node_table = table_cache.get(node_source) + if node_table is not None: + node_columns = {f.name: f for f in node_table.schema} + node_field = node_columns.get(endpoint_key_cols[j]) + if node_field is not None: + edge_t = bq_field.field_type.upper() + node_t = node_field.field_type.upper() + # Only flag when edge and node disagree AND the + # disagreement is real (not a mere legacy alias like + # INTEGER vs INT64 — those both map to the same SDK + # type and are equivalent at storage). + edge_canonical = next( + ( + canon + for canon, aliases in _COMPATIBLE_BQ_TYPES.items() + if edge_t in aliases + ), + edge_t, + ) + node_canonical = next( + ( + canon + for canon, aliases in _COMPATIBLE_BQ_TYPES.items() + if node_t in aliases + ), + node_t, + ) + if edge_canonical != node_canonical: + emit( + FailureCode.ENDPOINT_TYPE_MISMATCH, + binding_element=rel.name, + binding_path=col_path, + bq_ref=f"{rel.source}.{col}", + expected=node_field.field_type, + observed=bq_field.field_type, + detail=( + f"physical cross-table mismatch: edge " + f"column {col!r} on {rel.source} has BQ " + f"type {bq_field.field_type!r}, but the " + f"referenced node table " + f"{node_source}.{endpoint_key_cols[j]} has " + f"BQ type {node_field.field_type!r}. " + f"Edges with this mismatch will fail to " + f"join the node at query time." + ), + ) + # Strict-only: endpoint keys should be REQUIRED. if getattr(bq_field, "mode", "NULLABLE") == "NULLABLE": emit( @@ -548,9 +658,15 @@ def _check_endpoint( _check_endpoint("from_columns", rel.from_columns, rel.from_entity) _check_endpoint("to_columns", rel.to_columns, rel.to_entity) - # Property column checks. - for j, prop in enumerate(rel.properties): - prop_path = f"{binding_root}.properties[{j}].column" + # Property column checks. Same path-correctness rule as + # entities: indices come from the binding YAML's ordering. + prop_yaml_idx = rel_prop_yaml_index.get(rel.name, {}) + for prop in rel.properties: + yaml_j = prop_yaml_idx.get(prop.logical_name) + if yaml_j is None: + prop_path = f"{binding_root}.properties[{prop.logical_name}].column" + else: + prop_path = f"{binding_root}.properties[{yaml_j}].column" bq_field = bq_columns.get(prop.column) if bq_field is None: diff --git a/tests/test_binding_validation.py b/tests/test_binding_validation.py index 18c1480d..792dc5e9 100644 --- a/tests/test_binding_validation.py +++ b/tests/test_binding_validation.py @@ -222,6 +222,46 @@ def test_default_mode_ok_against_sdk_created_tables(self): for w in report.warnings: assert w.code == FailureCode.KEY_COLUMN_NULLABLE + def test_expected_types_match_materializer_ddl_type_map(self): + """Stronger regression: expected types come directly from the + materializer's `_DDL_TYPE_MAP`, not from a hand-written fixture. + If a future change updates `_DDL_TYPE_MAP` (e.g. adds NUMERIC + support), this test forces a corresponding update to + `_COMPATIBLE_BQ_TYPES` in binding_validation, otherwise the + default-mode regression would silently start failing.""" + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + from bigquery_agent_analytics.ontology_materializer import _DDL_TYPE_MAP + + ontology, binding = _ontology_and_binding() + # Build schemas using the materializer's own map so the test + # mirrors what the SDK would emit. confidence: double, all keys: + # string. If _DDL_TYPE_MAP changes type names, this construction + # automatically picks them up. + schemas = { + "p.d.decision_points": [ + _FakeField("decision_id", _DDL_TYPE_MAP["string"]), + _FakeField("confidence", _DDL_TYPE_MAP["double"]), + ], + "p.d.outcomes": [ + _FakeField("outcome_id", _DDL_TYPE_MAP["string"]), + ], + "p.d.candidate_edges": [ + _FakeField("decision_id", _DDL_TYPE_MAP["string"]), + _FakeField("outcome_id", _DDL_TYPE_MAP["string"]), + _FakeField("weight", _DDL_TYPE_MAP["double"]), + ], + } + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + assert report.ok is True, ( + f"Validator rejected materializer-DDL-derived schemas: " + f"{[(f.code, f.detail) for f in report.failures]}" + ) + def test_strict_mode_emits_warnings_as_failures(self): """The same SDK-created tables, run under strict=True, must surface NULLABLE key columns as KEY_COLUMN_NULLABLE failures.""" @@ -356,6 +396,13 @@ class TestEndpointTypeMismatch: def test_edge_endpoint_type_does_not_match_referenced_entity_key( self, ): + """Edge endpoint type disagrees with the referenced node's + ontology key type. Two ENDPOINT_TYPE_MISMATCH entries are + expected — one from the spec-level (1) check (edge BQ type vs + ontology-derived expected SDK type) and one from the physical + cross-table (2) check (edge BQ type vs node table's actual key + BQ type). Both are correct and complementary; users see both + descriptions and can act on either.""" from bigquery_agent_analytics.binding_validation import FailureCode from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery @@ -379,11 +426,83 @@ def test_edge_endpoint_type_does_not_match_referenced_entity_key( for f in report.failures if f.code == FailureCode.ENDPOINT_TYPE_MISMATCH ] - assert len(mismatches) == 1 - assert mismatches[0].binding_element == "HasOutcome" - assert "from_columns[0]" in mismatches[0].binding_path - assert mismatches[0].observed == "INT64" - assert mismatches[0].expected == "STRING" + # Two complementary mismatches expected: spec-level + physical. + assert len(mismatches) == 2 + assert all(m.binding_element == "HasOutcome" for m in mismatches) + assert all("from_columns[0]" in m.binding_path for m in mismatches) + assert all(m.observed == "INT64" for m in mismatches) + + spec_level = [ + m for m in mismatches if "physical cross-table" not in m.detail + ] + physical = [m for m in mismatches if "physical cross-table" in m.detail] + assert len(spec_level) == 1 + assert len(physical) == 1 + assert spec_level[0].expected == "STRING" # ontology-derived DDL type + assert physical[0].expected == "STRING" # actual node BQ field type + + +class TestEndpointPhysicalCrossTableCheck: + + def test_edge_endpoint_disagrees_with_node_actual_field_type(self): + """When the node table has drifted away from its ontology + declaration, the per-entity loop flags a TYPE_MISMATCH on the + node. But the edge endpoint may also disagree with the node's + *actual* storage type — in which case the join would fail at + query time. The validator must surface ENDPOINT_TYPE_MISMATCH + for that edge ↔ node disagreement, not just the spec-level one. + + Setup: node's decision_id column is INT64 in BQ (drifted from + the ontology's STRING declaration). Edge's decision_id column + is STRING (matching ontology, but disagreeing with the node's + actual storage). The edge would fail to join the node. + """ + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + # Node decision_id has drifted to INT64. + schemas["p.d.decision_points"] = [ + _FakeField("decision_id", "INT64"), + _FakeField("confidence", "FLOAT64"), + ] + # Edge decision_id is still STRING (ontology says so). + # candidate_edges already has decision_id as STRING in + # _good_schemas(), so no change needed there. + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + # Per-entity loop catches the node's drift. + node_mismatches = [ + f + for f in report.failures + if f.code == FailureCode.TYPE_MISMATCH + and f.binding_element == "Decision" + ] + assert len(node_mismatches) == 1 + + # Per-relationship loop catches the physical cross-table edge ↔ + # node disagreement (edge=STRING, node=INT64). The detail must + # call out that this is a physical-table mismatch so users can + # tell it apart from the spec-level (1) check. + edge_mismatches = [ + f + for f in report.failures + if f.code == FailureCode.ENDPOINT_TYPE_MISMATCH + and f.binding_element == "HasOutcome" + ] + assert len(edge_mismatches) >= 1 + assert any( + "physical cross-table mismatch" in m.detail for m in edge_mismatches + ), ( + "Expected at least one ENDPOINT_TYPE_MISMATCH detail to call " + "out the physical cross-table mismatch; got: " + f"{[m.detail for m in edge_mismatches]}" + ) class TestUnexpectedRepeatedMode: @@ -531,6 +650,91 @@ def test_required_keys_clean_in_strict_mode(self): # ------------------------------------------------------------------ # +class TestBindingPathYamlOrder: + + def test_path_index_uses_binding_yaml_order_not_resolved_order(self): + """A binding YAML that lists properties in a different order + than the ontology must produce paths using the binding's index, + not the ResolvedEntity's. Otherwise tooling pointed at + ``binding.entities[0].properties[1].column`` would land on the + wrong YAML entry. + + Setup: ontology declares (decision_id, confidence). Binding + YAML lists them in reverse order (confidence, decision_id). + The bound 'confidence' column is missing on the BQ table. The + failure path must be ``properties[0]`` (the binding's index for + confidence), not ``properties[1]`` (the resolved-side index). + """ + import pathlib + import tempfile + + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + from bigquery_ontology import load_binding + from bigquery_ontology import load_ontology + + tmp = pathlib.Path(tempfile.mkdtemp(prefix="bind_validate_order_")) + (tmp / "ont.yaml").write_text( + "ontology: TestGraph\n" + "entities:\n" + " - name: Decision\n" + " keys:\n" + " primary: [decision_id]\n" + " properties:\n" + " - name: decision_id\n" + " type: string\n" + " - name: confidence\n" + " type: double\n" + "relationships: []\n", + encoding="utf-8", + ) + # Binding lists confidence FIRST, decision_id SECOND. + (tmp / "bnd.yaml").write_text( + "binding: test_bind\n" + "ontology: TestGraph\n" + "target:\n" + " backend: bigquery\n" + " project: p\n" + " dataset: d\n" + "entities:\n" + " - name: Decision\n" + " source: decision_points\n" + " properties:\n" + " - name: confidence\n" + " column: confidence\n" + " - name: decision_id\n" + " column: decision_id\n" + "relationships: []\n", + encoding="utf-8", + ) + + ontology = load_ontology(str(tmp / "ont.yaml")) + binding = load_binding(str(tmp / "bnd.yaml"), ontology=ontology) + + # Drop confidence — its bound column is missing on BQ. + schemas = { + "p.d.decision_points": [_FakeField("decision_id", "STRING")], + } + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + miss = [f for f in report.failures if f.code == FailureCode.MISSING_COLUMN] + assert len(miss) == 1 + # The failure path index must be 0 (binding YAML's index for + # 'confidence'), not 1 (the resolved-side index — Decision's + # ontology order is (decision_id, confidence)). + assert miss[0].binding_path == ( + "binding.entities[0].properties[0].column" + ), ( + f"Path {miss[0].binding_path!r} should reflect binding YAML " + f"order (confidence is properties[0]), not resolved-side " + f"order (where confidence is properties[1])." + ) + + class TestCrossProjectSource: def test_fully_qualified_entity_source_validated_against_its_project( From 2342165785bb2160b9e818ac6a88571010f8ad30 Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Sun, 3 May 2026 00:05:28 -0700 Subject: [PATCH 3/6] fix+test: SDK metadata cols, key path correctness, suppress endpoint dup (#105 PR 2a) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five review findings folded in. (1) Validate SDK metadata columns (session_id, extracted_at) The materializer's _entity_columns() / _relationship_columns() hard-code session_id STRING + extracted_at TIMESTAMP for every entity and relationship table (ontology_materializer.py:159, 164), and routing writes those fields unconditionally on every materialize() call (ontology_materializer.py:258, 333). Without this check, a user-predefined table missing either column would validate clean, then fail at load_table_from_json / INSERT time. The validator now checks each entity and relationship table for both metadata columns: presence (MISSING_COLUMN), non-REPEATED mode (UNEXPECTED_REPEATED_MODE), and type compatibility (TYPE_MISMATCH for STRING / TIMESTAMP). Failure binding_path uses the form binding.entities[i]..session_id so users can distinguish metadata-column failures from property failures. New tests: - TestMetadataColumns::test_missing_session_id_on_entity_flagged - TestMetadataColumns::test_missing_extracted_at_on_relationship_flagged - TestMetadataColumns::test_metadata_column_with_wrong_type_flagged (2) Key column failure paths now real YAML paths For KEY_COLUMN_NULLABLE and UNEXPECTED_REPEATED_MODE failures on entity primary keys, the path now reuses the binding YAML index of the matching bound property — paths like binding.entities[0].properties[0].column instead of pseudo paths like binding.entities[0]..decision_id. Falls back to the pseudo path only when the key is not also a bound property (defensive; ontology generally requires keys to be properties). (3) Suppress redundant physical cross-table endpoint mismatch ENDPOINT_TYPE_MISMATCH previously double-reported in the common edge-only-drift case (edge wrong, node correct): both the spec-level (1) and physical (2) checks fired with the same expected/observed pair, just different detail wording. The physical check now fires only when the node table has actually drifted from the ontology spec AND the edge disagrees with the node's actual storage. In the edge-only case, (1) already conveys the disagreement; emitting (2) was pure noise. In the node-drifted case, (2) adds genuinely new information (the node's actual type is what the edge needs to match for joins, not the ontology declaration). TestEndpointTypeMismatch::test_edge_endpoint_type_does_not_match_referenced_entity_key updated to expect 1 mismatch (spec-level only). The dedicated TestEndpointPhysicalCrossTableCheck::test_edge_endpoint_disagrees_with_node_actual_field_type case still asserts both layers fire when both diverge. (4) Exhaustive _DDL_TYPE_MAP / _COMPATIBLE_BQ_TYPES coverage New TestTypeMapExhaustiveCoverage::test_every_ddl_type_is_in_compatible_bq_types loops every value in ontology_materializer._DDL_TYPE_MAP and asserts (a) it appears as a key in _COMPATIBLE_BQ_TYPES, and (b) the canonical type accepts itself as compatible. If the materializer adds NUMERIC support without a corresponding binding-validation update, this test catches the silent miss. (5) Composite-key tests #105 calls out composite endpoint keys explicitly. Two new tests under TestCompositeKey cover the two-column primary-key topology: - test_composite_primary_key_validates: clean baseline with matching positional types on edge and node. - test_composite_key_second_column_type_mismatch: the second composite-key column has the wrong type on the edge; the validator must flag ENDPOINT_TYPE_MISMATCH at from_columns[1]. Test infrastructure: added _meta_fields() helper that returns fresh metadata-field instances so test mutations on one table do not silently affect others. _good_schemas() uses it per-table. 106/106 tests pass across test_binding_validation.py, test_ontology_materializer.py, test_resolved_spec.py. --- .../binding_validation.py | 168 ++++++- tests/test_binding_validation.py | 415 ++++++++++++++++-- 2 files changed, 545 insertions(+), 38 deletions(-) diff --git a/src/bigquery_agent_analytics/binding_validation.py b/src/bigquery_agent_analytics/binding_validation.py index 8ff942a4..fc7a37a0 100644 --- a/src/bigquery_agent_analytics/binding_validation.py +++ b/src/bigquery_agent_analytics/binding_validation.py @@ -419,8 +419,23 @@ def fetch_table( ) # Per-key-column checks (REPEATED + strict-only nullability). + # Build a {column: yaml_index} map for keys that map to a bound + # property, so REPEATED / NULLABLE failures on those columns + # carry a real binding YAML path. Falls back to a pseudo path + # only for keys that are not bound properties (ontology + # generally requires keys to be properties; this is defensive). + column_to_yaml_idx: dict[str, int] = {} + for prop in entity.properties: + yaml_j = prop_yaml_idx.get(prop.logical_name) + if yaml_j is not None: + column_to_yaml_idx[prop.column] = yaml_j + for key_col in entity.key_columns: - key_path = f"{binding_root}..{key_col}" + yaml_j = column_to_yaml_idx.get(key_col) + if yaml_j is not None: + key_path = f"{binding_root}.properties[{yaml_j}].column" + else: + key_path = f"{binding_root}..{key_col}" bq_field = bq_columns.get(key_col) if bq_field is None: # Already reported as MISSING_COLUMN above when the key was @@ -471,6 +486,65 @@ def fetch_table( strict_only=True, ) + # SDK metadata columns. The materializer's _entity_columns() + # (ontology_materializer.py:154) hard-codes session_id STRING + + # extracted_at TIMESTAMP for every entity table, and routing + # writes those fields unconditionally on every materialize() call + # (ontology_materializer.py:258). A user-predefined table missing + # either column would validate clean here without this check, + # then fail at load_table_from_json / INSERT time. + for meta_col, meta_type in ( + ("session_id", "STRING"), + ("extracted_at", "TIMESTAMP"), + ): + meta_path = f"{binding_root}..{meta_col}" + meta_field = bq_columns.get(meta_col) + if meta_field is None: + emit( + FailureCode.MISSING_COLUMN, + binding_element=entity.name, + binding_path=meta_path, + bq_ref=f"{entity.source}.{meta_col}", + expected=meta_col, + detail=( + f"SDK metadata column {meta_col!r} not found on " + f"{entity.source}; the materializer writes this on " + f"every materialize() call (ontology_materializer.py:" + f"159) so the table must carry it" + ), + ) + continue + if getattr(meta_field, "mode", "NULLABLE") == "REPEATED": + emit( + FailureCode.UNEXPECTED_REPEATED_MODE, + binding_element=entity.name, + binding_path=meta_path, + bq_ref=f"{entity.source}.{meta_col}", + expected="NULLABLE or REQUIRED", + observed="REPEATED", + detail=( + f"SDK metadata column {meta_col!r} on {entity.source} " + f"is REPEATED; metadata columns must be scalar" + ), + ) + continue + if meta_field.field_type.upper() not in _COMPATIBLE_BQ_TYPES.get( + meta_type, frozenset() + ): + emit( + FailureCode.TYPE_MISMATCH, + binding_element=entity.name, + binding_path=meta_path, + bq_ref=f"{entity.source}.{meta_col}", + expected=meta_type, + observed=meta_field.field_type, + detail=( + f"SDK metadata column {meta_col!r} on {entity.source} " + f"has BQ type {meta_field.field_type!r}, but the " + f"materializer writes {meta_type}" + ), + ) + # ---- Per-relationship checks --------------------------------- # # Index entity sdk_types and source tables per key_column so the @@ -587,9 +661,14 @@ def _check_endpoint( ), ) - # (2) Physical cross-table check. Use the table_cache so - # we don't double-fetch the node table when the per-entity - # loop already did. + # (2) Physical cross-table check. Fires only when it adds + # information beyond (1) — specifically, only when the + # node table has *drifted* from its ontology declaration + # AND that drift causes the edge to disagree with the + # node's actual storage. In the common edge-only-drift + # case (edge wrong, node correct), (1) already conveys the + # same expected/observed pair, so emitting this would be + # pure double-reporting. node_source = entity_source_by_name.get(endpoint_entity_name) if node_source is not None: node_table = table_cache.get(node_source) @@ -599,10 +678,10 @@ def _check_endpoint( if node_field is not None: edge_t = bq_field.field_type.upper() node_t = node_field.field_type.upper() - # Only flag when edge and node disagree AND the - # disagreement is real (not a mere legacy alias like - # INTEGER vs INT64 — those both map to the same SDK - # type and are equivalent at storage). + expected_ddl = _expected_ddl_type(expected_sdk) + # Map each BQ type to its canonical (modern) form + # so legacy aliases like INTEGER/INT64 don't trip + # the comparison. edge_canonical = next( ( canon @@ -619,7 +698,15 @@ def _check_endpoint( ), node_t, ) - if edge_canonical != node_canonical: + # Only emit when the node has actually drifted from + # the ontology spec AND the edge disagrees with the + # node. If node_canonical == expected_ddl, the node + # is on-spec and (1) already covers the edge's + # disagreement with the same expected/observed pair. + node_has_drifted = ( + expected_ddl is not None and node_canonical != expected_ddl + ) + if node_has_drifted and edge_canonical != node_canonical: emit( FailureCode.ENDPOINT_TYPE_MISMATCH, binding_element=rel.name, @@ -633,7 +720,9 @@ def _check_endpoint( f"type {bq_field.field_type!r}, but the " f"referenced node table " f"{node_source}.{endpoint_key_cols[j]} has " - f"BQ type {node_field.field_type!r}. " + f"BQ type {node_field.field_type!r} " + f"(node has drifted from ontology spec, " + f"which expected {expected_ddl!r}). " f"Edges with this mismatch will fail to " f"join the node at query time." ), @@ -715,6 +804,65 @@ def _check_endpoint( ), ) + # SDK metadata columns on the edge table. The materializer's + # _relationship_columns() (ontology_materializer.py:164) hard- + # codes session_id STRING + extracted_at TIMESTAMP for every + # relationship table. Same trap as on entities — a user- + # predefined edge table missing these columns would validate + # clean here and fail at INSERT time. + for meta_col, meta_type in ( + ("session_id", "STRING"), + ("extracted_at", "TIMESTAMP"), + ): + meta_path = f"{binding_root}..{meta_col}" + meta_field = bq_columns.get(meta_col) + if meta_field is None: + emit( + FailureCode.MISSING_COLUMN, + binding_element=rel.name, + binding_path=meta_path, + bq_ref=f"{rel.source}.{meta_col}", + expected=meta_col, + detail=( + f"SDK metadata column {meta_col!r} not found on " + f"edge table {rel.source}; the materializer writes " + f"this on every materialize() call " + f"(ontology_materializer.py:164) so the table must " + f"carry it" + ), + ) + continue + if getattr(meta_field, "mode", "NULLABLE") == "REPEATED": + emit( + FailureCode.UNEXPECTED_REPEATED_MODE, + binding_element=rel.name, + binding_path=meta_path, + bq_ref=f"{rel.source}.{meta_col}", + expected="NULLABLE or REQUIRED", + observed="REPEATED", + detail=( + f"SDK metadata column {meta_col!r} on {rel.source} " + f"is REPEATED; metadata columns must be scalar" + ), + ) + continue + if meta_field.field_type.upper() not in _COMPATIBLE_BQ_TYPES.get( + meta_type, frozenset() + ): + emit( + FailureCode.TYPE_MISMATCH, + binding_element=rel.name, + binding_path=meta_path, + bq_ref=f"{rel.source}.{meta_col}", + expected=meta_type, + observed=meta_field.field_type, + detail=( + f"SDK metadata column {meta_col!r} on {rel.source} " + f"has BQ type {meta_field.field_type!r}, but the " + f"materializer writes {meta_type}" + ), + ) + return BindingValidationReport( failures=tuple(failures), warnings=tuple(warnings), diff --git a/tests/test_binding_validation.py b/tests/test_binding_validation.py index 792dc5e9..bcde4974 100644 --- a/tests/test_binding_validation.py +++ b/tests/test_binding_validation.py @@ -164,26 +164,49 @@ def _ontology_and_binding( return ontology, binding +def _meta_fields() -> list[_FakeField]: + """SDK metadata columns the materializer writes on every table. + + Tests building schemas directly (rather than mutating + ``_good_schemas()``) must include these — otherwise the validator + flags MISSING_COLUMN for session_id / extracted_at and pollutes + the failure count. + """ + return [ + _FakeField("session_id", "STRING"), + _FakeField("extracted_at", "TIMESTAMP"), + ] + + def _good_schemas() -> dict[str, list[_FakeField]]: """All tables present with matching column types — clean baseline. Models tables produced by ``OntologyMaterializer.create_tables()``: NULLABLE everywhere (no NOT NULL), correct types per - ``_DDL_TYPE_MAP``. + ``_DDL_TYPE_MAP``, and the SDK metadata columns + (``session_id STRING`` + ``extracted_at TIMESTAMP``) the + materializer writes on every materialize() call. + + Each table gets its own metadata-field instances via + ``_meta_fields()`` so tests that mutate metadata fields on one + table do not silently affect the others. """ return { "p.d.decision_points": [ _FakeField("decision_id", "STRING"), _FakeField("confidence", "FLOAT64"), - ], + ] + + _meta_fields(), "p.d.outcomes": [ _FakeField("outcome_id", "STRING"), - ], + ] + + _meta_fields(), "p.d.candidate_edges": [ _FakeField("decision_id", "STRING"), _FakeField("outcome_id", "STRING"), _FakeField("weight", "FLOAT64"), - ], + ] + + _meta_fields(), } @@ -241,15 +264,18 @@ def test_expected_types_match_materializer_ddl_type_map(self): "p.d.decision_points": [ _FakeField("decision_id", _DDL_TYPE_MAP["string"]), _FakeField("confidence", _DDL_TYPE_MAP["double"]), - ], + ] + + _meta_fields(), "p.d.outcomes": [ _FakeField("outcome_id", _DDL_TYPE_MAP["string"]), - ], + ] + + _meta_fields(), "p.d.candidate_edges": [ _FakeField("decision_id", _DDL_TYPE_MAP["string"]), _FakeField("outcome_id", _DDL_TYPE_MAP["string"]), _FakeField("weight", _DDL_TYPE_MAP["double"]), - ], + ] + + _meta_fields(), } client = _FakeBQClient(schemas) @@ -329,17 +355,23 @@ def test_property_column_missing(self): ontology, binding = _ontology_and_binding() schemas = _good_schemas() - # Drop the 'confidence' column from decision_points. + # Drop the 'confidence' column from decision_points; keep the + # other property and metadata columns intact so the test isolates + # the missing-property failure. schemas["p.d.decision_points"] = [ _FakeField("decision_id", "STRING"), - ] + ] + _meta_fields() client = _FakeBQClient(schemas) report = validate_binding_against_bigquery( ontology=ontology, binding=binding, bq_client=client ) - miss = [f for f in report.failures if f.code == FailureCode.MISSING_COLUMN] + miss = [ + f + for f in report.failures + if f.code == FailureCode.MISSING_COLUMN and "confidence" in f.bq_ref + ] assert len(miss) == 1 assert miss[0].binding_element == "Decision" assert miss[0].bq_ref == "p.d.decision_points.confidence" @@ -426,20 +458,338 @@ def test_edge_endpoint_type_does_not_match_referenced_entity_key( for f in report.failures if f.code == FailureCode.ENDPOINT_TYPE_MISMATCH ] - # Two complementary mismatches expected: spec-level + physical. - assert len(mismatches) == 2 - assert all(m.binding_element == "HasOutcome" for m in mismatches) - assert all("from_columns[0]" in m.binding_path for m in mismatches) - assert all(m.observed == "INT64" for m in mismatches) - - spec_level = [ - m for m in mismatches if "physical cross-table" not in m.detail + # Edge-only drift: only the spec-level (1) check fires. The + # physical (2) check is suppressed because the node table is + # on-spec — emitting it would be pure double-reporting with the + # same expected/observed pair as (1). + assert len(mismatches) == 1 + assert mismatches[0].binding_element == "HasOutcome" + assert "from_columns[0]" in mismatches[0].binding_path + assert mismatches[0].observed == "INT64" + assert mismatches[0].expected == "STRING" + assert "physical cross-table" not in mismatches[0].detail + + +class TestMetadataColumns: + + def test_missing_session_id_on_entity_flagged(self): + """The materializer writes session_id on every materialize() + call (ontology_materializer.py:159) so a user-predefined table + without it would fail at INSERT time. Validator must catch it + pre-flight.""" + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + # Drop session_id from decision_points. + schemas["p.d.decision_points"] = [ + f for f in schemas["p.d.decision_points"] if f.name != "session_id" + ] + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + miss = [ + f + for f in report.failures + if f.code == FailureCode.MISSING_COLUMN and "session_id" in f.bq_ref + ] + assert len(miss) == 1 + assert miss[0].binding_element == "Decision" + assert ".session_id" in miss[0].binding_path + + def test_missing_extracted_at_on_relationship_flagged(self): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + schemas["p.d.candidate_edges"] = [ + f for f in schemas["p.d.candidate_edges"] if f.name != "extracted_at" ] - physical = [m for m in mismatches if "physical cross-table" in m.detail] - assert len(spec_level) == 1 - assert len(physical) == 1 - assert spec_level[0].expected == "STRING" # ontology-derived DDL type - assert physical[0].expected == "STRING" # actual node BQ field type + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + miss = [ + f + for f in report.failures + if f.code == FailureCode.MISSING_COLUMN and "extracted_at" in f.bq_ref + ] + assert len(miss) == 1 + assert miss[0].binding_element == "HasOutcome" + assert ".extracted_at" in miss[0].binding_path + + def test_metadata_column_with_wrong_type_flagged(self): + """If session_id is INT64 instead of STRING, the materializer's + INSERT will fail. The validator must catch the type mismatch.""" + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + + ontology, binding = _ontology_and_binding() + schemas = _good_schemas() + for f in schemas["p.d.decision_points"]: + if f.name == "session_id": + f.field_type = "INT64" # wrong; should be STRING + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + mismatches = [ + f + for f in report.failures + if f.code == FailureCode.TYPE_MISMATCH and "session_id" in f.bq_ref + ] + assert len(mismatches) == 1 + assert mismatches[0].expected == "STRING" + assert mismatches[0].observed == "INT64" + + +class TestTypeMapExhaustiveCoverage: + + def test_every_ddl_type_is_in_compatible_bq_types(self): + """Force a corresponding update to _COMPATIBLE_BQ_TYPES whenever + _DDL_TYPE_MAP grows. If the materializer adds a new SDK→BQ DDL + type but the validator's compatibility table doesn't recognize + it, every column with that type will silently mismatch. + + Each canonical DDL type emitted by the materializer must: + (a) be present as a key in _COMPATIBLE_BQ_TYPES, and + (b) accept itself as a compatible BQ field_type. + """ + from bigquery_agent_analytics.binding_validation import _COMPATIBLE_BQ_TYPES + from bigquery_agent_analytics.ontology_materializer import _DDL_TYPE_MAP + + for sdk_type, ddl_type in _DDL_TYPE_MAP.items(): + assert ddl_type in _COMPATIBLE_BQ_TYPES, ( + f"_DDL_TYPE_MAP maps {sdk_type!r} → {ddl_type!r} but " + f"_COMPATIBLE_BQ_TYPES does not list {ddl_type!r}. " + f"Update binding_validation._COMPATIBLE_BQ_TYPES alongside " + f"any change to ontology_materializer._DDL_TYPE_MAP." + ) + assert ddl_type in _COMPATIBLE_BQ_TYPES[ddl_type], ( + f"_COMPATIBLE_BQ_TYPES[{ddl_type!r}] does not accept " + f"{ddl_type!r} as a compatible BQ field_type — circular " + f"identity check failed." + ) + + +class TestCompositeKey: + + def test_composite_primary_key_validates(self): + """#105 calls out composite endpoint keys explicitly. A two- + column primary key on a node table must be matched positionally + against the edge's two-column from_columns, with a per-column + type check at each position. Required because real ontologies + routinely have (session_id, span_id) or (decision_id, version) + composite keys.""" + import pathlib + import tempfile + + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + from bigquery_ontology import load_binding + from bigquery_ontology import load_ontology + + tmp = pathlib.Path(tempfile.mkdtemp(prefix="bind_validate_ck_")) + (tmp / "ont.yaml").write_text( + "ontology: TestGraph\n" + "entities:\n" + " - name: Decision\n" + " keys:\n" + " primary: [decision_id, version]\n" + " properties:\n" + " - name: decision_id\n" + " type: string\n" + " - name: version\n" + " type: integer\n" + " - name: Outcome\n" + " keys:\n" + " primary: [outcome_id]\n" + " properties:\n" + " - name: outcome_id\n" + " type: string\n" + "relationships:\n" + " - name: HasOutcome\n" + " from: Decision\n" + " to: Outcome\n" + " properties: []\n", + encoding="utf-8", + ) + (tmp / "bnd.yaml").write_text( + "binding: test_bind\n" + "ontology: TestGraph\n" + "target:\n" + " backend: bigquery\n" + " project: p\n" + " dataset: d\n" + "entities:\n" + " - name: Decision\n" + " source: decisions\n" + " properties:\n" + " - name: decision_id\n" + " column: decision_id\n" + " - name: version\n" + " column: version\n" + " - name: Outcome\n" + " source: outcomes\n" + " properties:\n" + " - name: outcome_id\n" + " column: outcome_id\n" + "relationships:\n" + " - name: HasOutcome\n" + " source: edges\n" + " from_columns: [decision_id, version]\n" + " to_columns: [outcome_id]\n" + " properties: []\n", + encoding="utf-8", + ) + + ontology = load_ontology(str(tmp / "ont.yaml")) + binding = load_binding(str(tmp / "bnd.yaml"), ontology=ontology) + + meta = [ + _FakeField("session_id", "STRING"), + _FakeField("extracted_at", "TIMESTAMP"), + ] + schemas = { + "p.d.decisions": [ + _FakeField("decision_id", "STRING"), + _FakeField("version", "INT64"), + ] + + meta, + "p.d.outcomes": [_FakeField("outcome_id", "STRING")] + meta, + "p.d.edges": [ + _FakeField("decision_id", "STRING"), + _FakeField("version", "INT64"), # second composite key column + _FakeField("outcome_id", "STRING"), + ] + + meta, + } + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + assert report.ok is True, ( + "Composite key validation should pass when both edge endpoint " + f"columns match the node's positional types. Got failures: " + f"{[(f.code, f.detail) for f in report.failures]}" + ) + + def test_composite_key_second_column_type_mismatch(self): + """If the second column of a composite key disagrees in type, + the validator must flag ENDPOINT_TYPE_MISMATCH at position 1.""" + import pathlib + import tempfile + + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + from bigquery_ontology import load_binding + from bigquery_ontology import load_ontology + + tmp = pathlib.Path(tempfile.mkdtemp(prefix="bind_validate_ck2_")) + (tmp / "ont.yaml").write_text( + "ontology: TestGraph\n" + "entities:\n" + " - name: Decision\n" + " keys:\n" + " primary: [decision_id, version]\n" + " properties:\n" + " - name: decision_id\n" + " type: string\n" + " - name: version\n" + " type: integer\n" + " - name: Outcome\n" + " keys:\n" + " primary: [outcome_id]\n" + " properties:\n" + " - name: outcome_id\n" + " type: string\n" + "relationships:\n" + " - name: HasOutcome\n" + " from: Decision\n" + " to: Outcome\n" + " properties: []\n", + encoding="utf-8", + ) + (tmp / "bnd.yaml").write_text( + "binding: test_bind\n" + "ontology: TestGraph\n" + "target:\n" + " backend: bigquery\n" + " project: p\n" + " dataset: d\n" + "entities:\n" + " - name: Decision\n" + " source: decisions\n" + " properties:\n" + " - name: decision_id\n" + " column: decision_id\n" + " - name: version\n" + " column: version\n" + " - name: Outcome\n" + " source: outcomes\n" + " properties:\n" + " - name: outcome_id\n" + " column: outcome_id\n" + "relationships:\n" + " - name: HasOutcome\n" + " source: edges\n" + " from_columns: [decision_id, version]\n" + " to_columns: [outcome_id]\n" + " properties: []\n", + encoding="utf-8", + ) + + ontology = load_ontology(str(tmp / "ont.yaml")) + binding = load_binding(str(tmp / "bnd.yaml"), ontology=ontology) + + meta = [ + _FakeField("session_id", "STRING"), + _FakeField("extracted_at", "TIMESTAMP"), + ] + schemas = { + "p.d.decisions": [ + _FakeField("decision_id", "STRING"), + _FakeField("version", "INT64"), + ] + + meta, + "p.d.outcomes": [_FakeField("outcome_id", "STRING")] + meta, + "p.d.edges": [ + _FakeField("decision_id", "STRING"), + # Second composite key column has the wrong type + # (STRING instead of INT64). + _FakeField("version", "STRING"), + _FakeField("outcome_id", "STRING"), + ] + + meta, + } + client = _FakeBQClient(schemas) + + report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + + mismatches = [ + f + for f in report.failures + if f.code == FailureCode.ENDPOINT_TYPE_MISMATCH + ] + assert len(mismatches) == 1 + # The mismatch must point at the SECOND column (position [1]) + # of from_columns, not the first. + assert "from_columns[1]" in mismatches[0].binding_path + assert mismatches[0].expected == "INT64" + assert mismatches[0].observed == "STRING" class TestEndpointPhysicalCrossTableCheck: @@ -711,9 +1061,12 @@ def test_path_index_uses_binding_yaml_order_not_resolved_order(self): ontology = load_ontology(str(tmp / "ont.yaml")) binding = load_binding(str(tmp / "bnd.yaml"), ontology=ontology) - # Drop confidence — its bound column is missing on BQ. + # Drop confidence — its bound column is missing on BQ. Keep + # decision_id and SDK metadata columns so the test isolates the + # missing-property failure for confidence. schemas = { - "p.d.decision_points": [_FakeField("decision_id", "STRING")], + "p.d.decision_points": [_FakeField("decision_id", "STRING")] + + _meta_fields(), } client = _FakeBQClient(schemas) @@ -721,7 +1074,11 @@ def test_path_index_uses_binding_yaml_order_not_resolved_order(self): ontology=ontology, binding=binding, bq_client=client ) - miss = [f for f in report.failures if f.code == FailureCode.MISSING_COLUMN] + miss = [ + f + for f in report.failures + if f.code == FailureCode.MISSING_COLUMN and "confidence" in f.bq_ref + ] assert len(miss) == 1 # The failure path index must be 0 (binding YAML's index for # 'confidence'), not 1 (the resolved-side index — Decision's @@ -792,11 +1149,13 @@ def test_fully_qualified_entity_source_validated_against_its_project( # Validator should look for the table at source-project, not # target-project. Place the table at the correct location and - # leave target-project empty. + # leave target-project empty. Include SDK metadata columns so + # the test does not false-fail on missing-metadata-column. schemas = { "source-project.source-dataset.decisions": [ _FakeField("decision_id", "STRING"), - ], + ] + + _meta_fields(), } client = _FakeBQClient(schemas) From 29e016e8cf0042681d4b792aa0b3e4fdc7ddc212 Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Sun, 3 May 2026 00:12:35 -0700 Subject: [PATCH 4/6] docs: align endpoint-test docstring + module-doc reference (#105 PR 2a) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two comment-only cleanups; no behavior change. (1) tests/test_binding_validation.py:431 — TestEndpointTypeMismatch:: test_edge_endpoint_type_does_not_match_referenced_entity_key docstring previously said two ENDPOINT_TYPE_MISMATCH entries are expected. The implementation suppresses the physical (2) check in edge-only-drift cases as of commit 2342165, so only one entry is expected now. Docstring updated to match, with a pointer to TestEndpointPhysicalCrossTableCheck for the node-drifted case where (2) genuinely adds information. (2) src/bigquery_agent_analytics/binding_validation.py:23 — module docstring referenced bigquery_agent_analytics.graph_validation .validate_extracted_graph as if it existed. That symbol lands with issue #76 and is not on this PR. Reworded to plain text: 'Different from the planned extracted-graph validator in issue #76', with an explicit warning that no graph_validation module exists yet so callers do not try to import it. 25/25 unit tests pass. --- src/bigquery_agent_analytics/binding_validation.py | 10 ++++++---- tests/test_binding_validation.py | 14 ++++++++------ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/bigquery_agent_analytics/binding_validation.py b/src/bigquery_agent_analytics/binding_validation.py index fc7a37a0..3f85beb5 100644 --- a/src/bigquery_agent_analytics/binding_validation.py +++ b/src/bigquery_agent_analytics/binding_validation.py @@ -20,10 +20,12 @@ most common authoring error (binding YAML drifted out of sync with physical tables) before extraction wastes ``AI.GENERATE`` tokens. -Different from :func:`bigquery_agent_analytics.graph_validation. -validate_extracted_graph` (issue #76): that one validates extracted -graph output against the spec. This one validates the binding against -the live BigQuery schemas. +Different from the planned extracted-graph validator in issue #76: +that one will validate extracted graph output against the spec +(post-extraction, pre-materialization). This one validates the +binding against the live BigQuery schemas (pre-extraction). #76 has +not landed yet, so do not import any symbol from a +``graph_validation`` module — it does not exist on this PR. Usage:: diff --git a/tests/test_binding_validation.py b/tests/test_binding_validation.py index bcde4974..10287bd9 100644 --- a/tests/test_binding_validation.py +++ b/tests/test_binding_validation.py @@ -429,12 +429,14 @@ def test_edge_endpoint_type_does_not_match_referenced_entity_key( self, ): """Edge endpoint type disagrees with the referenced node's - ontology key type. Two ENDPOINT_TYPE_MISMATCH entries are - expected — one from the spec-level (1) check (edge BQ type vs - ontology-derived expected SDK type) and one from the physical - cross-table (2) check (edge BQ type vs node table's actual key - BQ type). Both are correct and complementary; users see both - descriptions and can act on either.""" + ontology key type, but the node table itself is on-spec. One + ENDPOINT_TYPE_MISMATCH entry is expected — the spec-level (1) + check (edge BQ type vs ontology-derived expected SDK type). + The physical cross-table (2) check is suppressed because + emitting it would double-report the same expected/observed pair + with different detail wording. The node-drifted case where (2) + adds genuinely new information is covered by + TestEndpointPhysicalCrossTableCheck.""" from bigquery_agent_analytics.binding_validation import FailureCode from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery From 6ececb199d6a537e1fa634ef63316093c99f66cb Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Sun, 3 May 2026 09:53:17 -0700 Subject: [PATCH 5/6] test(integration): live binding validator end-to-end against real BigQuery (#105 PR 2a) Adds TestBindingValidationLive::test_validator_end_to_end_against_real_bigquery to tests/test_integration_ontology_binding.py, gated on RUN_LIVE_BIGQUERY_TESTS=1 alongside the other live tests in this module. Self-contained: uses its own per-test scratch dataset (function- scope fixtures, not the module-scoped ones the rest of the file shares) because phase 4 of the test deliberately drops a column via ALTER TABLE. Running destructive SQL against the shared dataset would interfere with other tests in this file. Phases: 1. Materialize real tables via OntologyMaterializer (executes real CREATE TABLE IF NOT EXISTS for entity + relationship tables, including SDK metadata columns). 2. Default-mode validation: report.ok must be True; warnings contain only KEY_COLUMN_NULLABLE entries (because CREATE TABLE IF NOT EXISTS emits NULLABLE keys without NOT NULL constraints). 3. Strict-mode validation: same input must surface those four warnings as KEY_COLUMN_NULLABLE failures, with warnings empty (escalated, not duplicated). 4. Drop the 'confidence' column via real ALTER TABLE; default- mode re-validation must emit exactly one MISSING_COLUMN failure pointing at binding.entities[0].properties[1].column (binding YAML order: decision_id at [0], confidence at [1]). Verified live against test-project-0728-467323 (raincoatrun@): PASSED in 13.28s. Skipped automatically without RUN_LIVE_BIGQUERY_TESTS=1. --- tests/test_integration_ontology_binding.py | 213 +++++++++++++++++++++ 1 file changed, 213 insertions(+) diff --git a/tests/test_integration_ontology_binding.py b/tests/test_integration_ontology_binding.py index 323e7b18..351fa7f1 100644 --- a/tests/test_integration_ontology_binding.py +++ b/tests/test_integration_ontology_binding.py @@ -614,3 +614,216 @@ def test_synthetic_lineage_query( ) rows = list(job.result()) assert len(rows) > 0, "Lineage GQL returned 0 rows" + + +# ------------------------------------------------------------------ # +# Binding-validator live test (issue #105 PR 2a) # +# ------------------------------------------------------------------ # + + +class TestBindingValidationLive: + """Live validation that ``validate_binding_against_bigquery`` + behaves correctly against real BigQuery. + + Self-contained: uses its own per-test scratch dataset (rather + than the module-scoped fixture) because the third phase of this + test deliberately drops a column via ALTER TABLE, and running + destructive SQL against a shared dataset would interfere with + other tests in this file. + + Phases: + 1. Materialize real tables via OntologyMaterializer. + 2. Default-mode validation: report.ok must be True; warnings + contain only KEY_COLUMN_NULLABLE entries (because the SDK's + CREATE TABLE IF NOT EXISTS emits NULLABLE keys). + 3. Strict-mode validation: same input must surface those + warnings as KEY_COLUMN_NULLABLE failures, with warnings + empty (escalated, not duplicated). + 4. Drop the 'confidence' column via real ALTER TABLE; default- + mode re-validation must emit exactly one MISSING_COLUMN + failure pointing at the dropped column. + """ + + @pytest.fixture(scope="function") + def isolated_scratch(self): + """Per-test scratch dataset; cleaned up unconditionally.""" + from google.cloud import bigquery + + run_id = uuid.uuid4().hex[:8] + ds_id = f"bind_validate_live_{run_id}" + client = bigquery.Client(project=_PROJECT, location=_LOCATION) + ds = bigquery.Dataset(f"{_PROJECT}.{ds_id}") + ds.location = _LOCATION + ds.default_table_expiration_ms = 3600000 + client.create_dataset(ds, exists_ok=True) + try: + yield client, ds_id + finally: + client.delete_dataset( + f"{_PROJECT}.{ds_id}", + delete_contents=True, + not_found_ok=True, + ) + + @pytest.fixture(scope="function") + def isolated_ontology_and_binding(self, isolated_scratch, tmp_path_factory): + """Per-test ontology+binding pointing at the isolated scratch.""" + from bigquery_ontology import load_binding + from bigquery_ontology import load_ontology + + _, ds_id = isolated_scratch + tmp = tmp_path_factory.mktemp("bind_validate_live") + + ont_path = tmp / "ontology.yaml" + ont_path.write_text( + "ontology: BindValidatorLive\n" + "entities:\n" + " - name: Decision\n" + " keys:\n" + " primary: [decision_id]\n" + " properties:\n" + " - name: decision_id\n" + " type: string\n" + " - name: confidence\n" + " type: double\n" + " - name: Outcome\n" + " keys:\n" + " primary: [outcome_id]\n" + " properties:\n" + " - name: outcome_id\n" + " type: string\n" + "relationships:\n" + " - name: HasOutcome\n" + " from: Decision\n" + " to: Outcome\n" + " properties:\n" + " - name: weight\n" + " type: double\n", + encoding="utf-8", + ) + + bnd_path = tmp / "binding.yaml" + bnd_path.write_text( + f"binding: live_check\n" + f"ontology: BindValidatorLive\n" + f"target:\n" + f" backend: bigquery\n" + f" project: {_PROJECT}\n" + f" dataset: {ds_id}\n" + f"entities:\n" + f" - name: Decision\n" + f" source: decisions\n" + f" properties:\n" + f" - name: decision_id\n" + f" column: decision_id\n" + f" - name: confidence\n" + f" column: confidence\n" + f" - name: Outcome\n" + f" source: outcomes\n" + f" properties:\n" + f" - name: outcome_id\n" + f" column: outcome_id\n" + f"relationships:\n" + f" - name: HasOutcome\n" + f" source: edges\n" + f" from_columns: [decision_id]\n" + f" to_columns: [outcome_id]\n" + f" properties:\n" + f" - name: weight\n" + f" column: weight\n", + encoding="utf-8", + ) + + ontology = load_ontology(str(ont_path)) + binding = load_binding(str(bnd_path), ontology=ontology) + return ontology, binding + + def test_validator_end_to_end_against_real_bigquery( + self, isolated_scratch, isolated_ontology_and_binding + ): + from bigquery_agent_analytics.binding_validation import FailureCode + from bigquery_agent_analytics.binding_validation import validate_binding_against_bigquery + from bigquery_agent_analytics.ontology_materializer import OntologyMaterializer + + client, ds_id = isolated_scratch + ontology, binding = isolated_ontology_and_binding + + # Phase 1: materialize real tables. + mat = OntologyMaterializer.from_ontology_binding( + ontology=ontology, + binding=binding, + lineage_config=None, + write_mode="batch_load", + ) + tables = mat.create_tables() + assert set(tables.keys()) == { + "Decision", + "Outcome", + "HasOutcome", + }, f"Unexpected tables created: {sorted(tables.keys())}" + + # Phase 2: default-mode validation. SDK-created tables must + # validate clean; the only signal is advisory warnings on + # NULLABLE keys. + default_report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + assert default_report.ok is True, ( + f"Default mode rejected SDK-created tables. Failures: " + f"{[(f.code, f.detail) for f in default_report.failures]}" + ) + assert all( + w.code == FailureCode.KEY_COLUMN_NULLABLE + for w in default_report.warnings + ), ( + "Only KEY_COLUMN_NULLABLE warnings expected against SDK-" + "created tables. Got: " + f"{[w.code for w in default_report.warnings]}" + ) + # Decision.decision_id, Outcome.outcome_id (entity primary keys) + # plus HasOutcome.from_columns[0]=decision_id and + # HasOutcome.to_columns[0]=outcome_id (relationship endpoints). + assert len(default_report.warnings) == 4 + + # Phase 3: strict-mode escalation. + strict_report = validate_binding_against_bigquery( + ontology=ontology, + binding=binding, + bq_client=client, + strict=True, + ) + assert ( + strict_report.ok is False + ), "Strict mode should reject NULLABLE primary-key columns" + assert all( + f.code == FailureCode.KEY_COLUMN_NULLABLE + for f in strict_report.failures + ) + assert len(strict_report.failures) == 4 + assert strict_report.warnings == (), ( + "Strict mode must escalate warnings into failures, not " + "double-emit them" + ) + + # Phase 4: drop a non-key property column via real ALTER TABLE + # and assert the validator catches the resulting drift. + table_ref = f"{_PROJECT}.{ds_id}.decisions" + client.query(f"ALTER TABLE `{table_ref}` DROP COLUMN confidence").result() + + broken_report = validate_binding_against_bigquery( + ontology=ontology, binding=binding, bq_client=client + ) + miss = [ + f + for f in broken_report.failures + if f.code == FailureCode.MISSING_COLUMN and "confidence" in f.bq_ref + ] + assert len(miss) == 1, ( + f"Expected exactly 1 MISSING_COLUMN for confidence, got " + f"failures: " + f"{[(f.code, f.bq_ref) for f in broken_report.failures]}" + ) + # Path must reflect binding YAML order. Decision's binding lists + # decision_id at properties[0] and confidence at properties[1]. + assert miss[0].binding_path == ("binding.entities[0].properties[1].column") + assert miss[0].bq_ref == f"{table_ref}.confidence" From 6a14aa2ebd723a12545d475d306ca1fe651fc3fd Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Sun, 3 May 2026 19:24:12 -0700 Subject: [PATCH 6/6] docs+test: soften module docstring; add fixture-scope sanity assert (#105 PR 2a) Two small cleanups; no behavior change. (1) src/bigquery_agent_analytics/binding_validation.py: module docstring previously read like a PR-review note (told callers not to import a graph_validation module that does not exist yet). Replaced with a public-doc-shaped sentence that just describes the relationship to #76's planned extracted-graph validator. (2) tests/test_integration_ontology_binding.py: the live test TestBindingValidationLive depends on two function-scope fixtures (isolated_scratch and isolated_ontology_and_binding), and the second's binding YAML embeds the first's dataset id. If either fixture is later flipped to module scope by accident, the binding would point at a stale dataset. New assertion binding.target.dataset == ds_id catches that drift before any BQ call runs. Live test still PASSES against test-project-0728-467323 (13.24s). --- src/bigquery_agent_analytics/binding_validation.py | 9 +++------ tests/test_integration_ontology_binding.py | 12 ++++++++++++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/bigquery_agent_analytics/binding_validation.py b/src/bigquery_agent_analytics/binding_validation.py index 3f85beb5..98635051 100644 --- a/src/bigquery_agent_analytics/binding_validation.py +++ b/src/bigquery_agent_analytics/binding_validation.py @@ -20,12 +20,9 @@ most common authoring error (binding YAML drifted out of sync with physical tables) before extraction wastes ``AI.GENERATE`` tokens. -Different from the planned extracted-graph validator in issue #76: -that one will validate extracted graph output against the spec -(post-extraction, pre-materialization). This one validates the -binding against the live BigQuery schemas (pre-extraction). #76 has -not landed yet, so do not import any symbol from a -``graph_validation`` module — it does not exist on this PR. +Different from the planned extracted-graph validator in #76, which +will validate ``ExtractedGraph`` output against the resolved spec +after extraction. This validator runs before extraction. Usage:: diff --git a/tests/test_integration_ontology_binding.py b/tests/test_integration_ontology_binding.py index 351fa7f1..2e3465a6 100644 --- a/tests/test_integration_ontology_binding.py +++ b/tests/test_integration_ontology_binding.py @@ -748,6 +748,18 @@ def test_validator_end_to_end_against_real_bigquery( client, ds_id = isolated_scratch ontology, binding = isolated_ontology_and_binding + # Cheap fixture-scope sanity check: the binding YAML the second + # fixture wrote must point at the dataset the first fixture + # created. If a future refactor flips either fixture to module + # scope by accident, the binding would race with whichever + # dataset was created first and this assert would catch it + # before any BQ call runs. + assert binding.target.dataset == ds_id, ( + f"binding.target.dataset={binding.target.dataset!r} but " + f"isolated_scratch yielded {ds_id!r}; fixture scopes are " + f"out of sync" + ) + # Phase 1: materialize real tables. mat = OntologyMaterializer.from_ontology_binding( ontology=ontology,