From 15c924df6aba7f0b8dca87b75f5597110c37eb6c Mon Sep 17 00:00:00 2001
From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com>
Date: Wed, 20 May 2026 00:51:48 -0400
Subject: [PATCH] docs: bring CLAUDE.md up to date with the v3.0 Prefect architecture
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

A documentation audit found CLAUDE.md materially stale — it still
described the v2.0 in-process pipeline. 13 of 38 verified claims were
false. Corrections:

* Version v2.0.0 → 3.0.0a0 (per pyproject.toml).
* Architecture section rewritten for v3.0: the v2
  ``DataProcessingPipeline`` loop and the named ``pipeline.py`` /
  ``jobs_legacy.py`` files do not exist. Orchestration is
  ``jobs/prefect_flow.py`` with per-stage ``@task`` functions and the
  ``datapusher_plus_flow`` entry ``@flow``. Documented the full v3.0
  jobs-module layout (prefect_flow, runtime_context, subflows, events,
  caching, blocks, artifacts, quarantine, file_persistence) and the
  previously-omitted ``ai_suggestions`` stage.
* Key Modules: dropped the non-existent ``jobs_legacy.py``; added
  ``prefect_client.py``, ``dictionary_stash.py``, ``utils.py``.
* Formula type ``suggest_formula`` → ``suggestion_formula`` (the actual
  scheming-YAML key — verified in jinja2_helpers.py / formula.py).
* qsv ``v4.0.0+`` → ``v20.1.0+`` (MINIMUM_QSV_VERSION; two BREAKING
  bumps since 4.0.0).
* ``preview_rows`` default 1000 → 0 (matches config_declaration.yaml
  and config.py after PR #324).
* External Dependencies: RQ → Prefect 3.7+ (the v3.0 runtime;
  requirements.txt pins prefect, not rq).
* CLI commands: added ``prefect-deploy`` and ``migrate-from-rq``.
* Build & Test: ``pytest tests/test_unit.py`` (no such file) replaced
  with a real example + the unit/integration split.

Verified-correct claims (plugin.py interface list, formula namespaces,
models, job_exceptions, logic actions, Python versions) were left
intact.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 CLAUDE.md | 80 ++++++++++++++++++++++++++++++++++++++++---------------
 1 file changed, 58 insertions(+), 22 deletions(-)

diff --git a/CLAUDE.md b/CLAUDE.md
index a9a7e302..0bb56657 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -4,85 +4,121 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
 
 ## Project Overview
 
-DataPusher+ is a CKAN extension (v2.0.0) for ultra-fast, robust data ingestion into CKAN's datastore. It replaces the legacy Datapusher webservice with a full CKAN extension that leverages [qsv](https://github.com/dathere/qsv) (a Rust-based CSV data-wrangling toolkit) for blazing-fast type inference and data analysis.
+DataPusher+ is a CKAN extension (v3.0.0a0) for ultra-fast, robust data ingestion into CKAN's datastore. It replaces the legacy Datapusher webservice with a full CKAN extension that leverages [qsv](https://github.com/dathere/qsv) (a Rust-based CSV data-wrangling toolkit) for blazing-fast type inference and data analysis.
 
 **Key differentiators:**
 - Guaranteed data type inference by scanning entire files (not just first few rows)
 - PostgreSQL COPY for direct data loading (no API overhead)
-- Jinja2 formula system for metadata inference/suggestion (`formula` and `suggest_formula` in scheming YAML)
+- Jinja2 formula system for metadata inference/suggestion (`formula` and `suggestion_formula` in scheming YAML)
 - DRUF (Dataset Resource Upload First) workflow support
+- v3.0: [Prefect](https://www.prefect.io/)-orchestrated ingestion flow (replaces the v2 in-process pipeline loop)
 
 ## Build & Test Commands
 
 ```bash
-# Run all tests
-pytest tests/
+# Run the unit suite (integration tests need the docker-compose stack — see below)
+pytest tests/ --ignore=tests/integration
 
-# Run specific test file
-pytest tests/test_unit.py
+# Run a specific test file
+pytest tests/test_security.py
 
 # Run with coverage
-pytest --cov=ckanext/datapusher_plus tests/
+pytest --cov=ckanext/datapusher_plus tests/ --ignore=tests/integration
 
 # Debug with IPython
 pytest --pdbcls=IPython.terminal.debugger:TerminalPdb tests/
 ```
 
+Integration tests (`tests/integration/`) require the full docker-compose stack (CKAN, Postgres, Redis, Solr, Prefect server + worker). Bring it up with `scripts/integration-up`, then run with `INTEGRATION=1 CKAN_URL=http://localhost:5050 pytest tests/integration/ -v`.
+
 ## CKAN CLI Commands
 
 ```bash
-# Resubmit all resources to datapusher
+# Resubmit all updated datastore resources
 ckan -c /etc/ckan/default/ckan.ini datapusher_plus resubmit -y
 
-# Submit specific package resources
+# Submit a specific package's resources
 ckan -c /etc/ckan/default/ckan.ini datapusher_plus submit {dataset_id}
 
+# Register / update the DataPusher+ flow as a Prefect deployment (idempotent)
+ckan -c /etc/ckan/default/ckan.ini datapusher_plus prefect-deploy
+
+# One-shot v2 (RQ) → v3 (Prefect) migration: drains the RQ queue,
+# resets stale pending task_status rows, verifies the Prefect deployment
+ckan -c /etc/ckan/default/ckan.ini datapusher_plus migrate-from-rq
+
 # Database migrations
 ckan -c /etc/ckan/default/ckan.ini db upgrade -p datapusher_plus
 ```
 
 ## Architecture
 
-### Pipeline Stage Pattern (v2.0)
+### Prefect Flow Architecture (v3.0)
 
-The refactored jobs module uses a modular stage-based pipeline in `ckanext/datapusher_plus/jobs/`:
+The v3.0 release replaces the v2 in-process `DataProcessingPipeline` loop with a Prefect-orchestrated flow. The jobs module lives in `ckanext/datapusher_plus/jobs/`:
 
 ```
-pipeline.py        → Main orchestration, entry point (datapusher_plus_to_datastore)
-context.py         → ProcessingContext state management across stages
+prefect_flow.py    → Orchestration. Per-stage @task functions (each delegates to a
+                     BaseStage.process() body) + the entry-point @flow
+                     `datapusher_plus_flow`. Wraps the datastore-mutating tasks in
+                     `with transaction()` for atomic rollback; owns the Jobs row
+                     state transitions; fires the datapusher_hook HTTP callback.
+__init__.py        → Public surface via PEP 562 lazy __getattr__ (defers the Prefect
+                     import so CKAN admin commands don't spin up a Prefect server).
+                     Exposes `datapusher_plus_flow`, `push_to_datastore` (v2 shim),
+                     `datapusher_plus_to_datastore` (alias), `callback_datapusher_hook`.
+context.py         → ProcessingContext — per-run mutable state shared across stages.
+runtime_context.py → JobInput (frozen, JSON-serializable flow input), the per-stage
+                     `*Result` dataclasses (DownloadResult, AnalyzeResult, …), the
+                     RuntimeContext ContextVar (set/get/reset) and `rehydrate`.
+subflows.py        → @flow-wrapped subflows (pii_screening_subflow,
+                     spatial_processing_subflow) for custom flow composition.
+events.py          → Custom Prefect events emitted for downstream Automations.
+caching.py         → Task result-persistence + cache-key configuration.
+blocks.py          → Prefect Block registration (result-storage config).
+artifacts.py       → Human-readable Prefect run-page artifacts (data-quality summaries).
+quarantine.py      → Bad-row quarantine for the validation task (route rejects to a
+                     sibling CSV, continue if under `max_quarantine_pct`).
+file_persistence.py→ Persists task working files to result storage so cached task
+                     results stay valid across runs.
 stages/
   base.py             → Abstract BaseStage class
   download.py         → File download with retries, proxy support, timeout handling
   format_converter.py → Excel/ODS/Shapefile/GeoJSON/ZIP → CSV conversion
   validation.py       → RFC-4180 CSV validation, encoding detection/normalization
   analysis.py         → QSV-based type inference, summary stats, frequency tables
+  ai_suggestions.py   → AI-assisted metadata suggestions via `qsv describegpt`
   database.py         → PostgreSQL COPY operations, smartint type selection
   indexing.py         → Auto-index creation based on cardinality/dates
   formula.py          → Jinja2 formula evaluation (package/resource metadata)
   metadata.py         → Datastore resource dict updates, dpp_suggestions
 ```
 
+Operators can register a custom flow via `ckanext.datapusher_plus.prefect_flow`; the per-stage `@task` functions in `prefect_flow.py` are the public composable primitives.
+
 ### Key Modules
 
 - **plugin.py** — CKAN plugin entry point, implements IConfigurer, IConfigurable, IActions, IAuthFunctions, IPackageController, IResourceUrlChange, IResourceController, ITemplateHelpers, IBlueprint, IClick (+ IFormRedirect conditionally)
 - **config.py** — ~50 configuration parameters (all `ckanext.datapusher_plus.*` settings)
 - **config_declaration.yaml** — CKAN 2.10+ declarative config definitions
 - **qsv_utils.py** — QSV CLI wrapper (stats, frequency, type inference, validation)
+- **prefect_client.py** — Thin wrapper around the Prefect 3 client; the single place the codebase touches `prefect.*`
 - **jinja2_helpers.py** — FormulaProcessor and custom filters/functions for metadata formulas
 - **datastore_utils.py** — PostgreSQL datastore operations
+- **dictionary_stash.py** — On-disk stash/restore of per-column Data Dictionary annotations across job runs/failures
 - **spatial_helpers.py** — Shapefile/GeoJSON processing with geometry simplification
 - **pii_screening.py** — PII detection with configurable regex patterns
 - **helpers.py** — Template helpers for job status display in CKAN UI
-- **cli.py** — CKAN CLI command implementations (resubmit, submit)
+- **cli.py** — CKAN CLI command implementations (resubmit, submit, prefect-deploy, migrate-from-rq)
 - **logging_utils.py** — Custom TRACE logging level (level 5)
 - **interfaces.py** — `IDataPusher` interface for external plugin hooks
 - **job_exceptions.py** — Custom exception hierarchy (`DataTooBigError`, `JobError`, `HTTPError`, etc.)
+- **utils.py** — Shared helpers (e.g. `utcnow_naive()` for naive-UTC timestamps)
 - **logic/action.py** — Actions: `datapusher_submit`, `datapusher_hook`, `datapusher_status`
 - **logic/schema.py** — Validation schemas for action functions
 - **logic/auth.py** — Authorization functions
 - **views.py** — Flask blueprints for web endpoints
 - **druf_view.py** — DRUF-specific view handling
-- **jobs_legacy.py** — Legacy monolithic implementation (preserved for reference)
 
 ### Database Models (model/model.py)
 
@@ -99,8 +135,8 @@ Formulas in scheming YAML have access to three namespaces:
 - `dpp` — Inferred metadata (RECORD_COUNT, DATE_FIELDS, LAT_FIELD, LON_FIELD, etc.)
 
 Formula types:
-- `formula` — Evaluated and assigned to field immediately
-- `suggest_formula` — Stored in `dpp_suggestions` field for UI suggestions
+- `formula` — Evaluated and assigned to the field immediately
+- `suggestion_formula` — Stored in the `dpp_suggestions` field for UI suggestions
 
 ## Coding Conventions
 
@@ -112,22 +148,22 @@ Formula types:
 - **Logging** — Custom TRACE level (5) via `logging_utils.py`; f-string log messages; pipeline stages use `ProcessingContext.logger`
 - **Error handling** — Custom exception hierarchy in `job_exceptions.py`
 - **Linting** — Flake8 with E501 disabled (long lines allowed): `# flake8: noqa: E501`
-- **CI** — `.github/workflows/main.yml` runs integration tests
+- **CI** — `.github/workflows/` (`main.yml`, `ci.yml`) runs unit + integration tests
 
 ## External Dependencies
 
 - **Python 3.10, 3.11, 3.12, 3.13**
-- **qsv v4.0.0+** — Must be installed at path specified by `ckanext.datapusher_plus.qsv_bin`
+- **qsv v20.1.0+** — Must be installed at path specified by `ckanext.datapusher_plus.qsv_bin` (`MINIMUM_QSV_VERSION` is enforced at startup)
 - **CKAN 2.10+** with ckanext-scheming
 - **PostgreSQL** datastore
-- **RQ (Redis Queue)** for background job processing
+- **Prefect 3.7+** — Orchestrates the v3.0 ingestion flow (replaces the v2 RQ-based job runner; `migrate-from-rq` handles the upgrade)
 
 ## Configuration Reference
 
-Key settings in `ckan.ini` (see config.py and config_declaration.yaml for full list):
+Key settings in `ckan.ini` (see config.py and config_declaration.yaml for the full list):
 - `ckanext.datapusher_plus.qsv_bin` — Path to qsv binary
 - `ckanext.datapusher_plus.formats` — Supported file formats
-- `ckanext.datapusher_plus.preview_rows` — Number of preview rows (default: 1000)
+- `ckanext.datapusher_plus.preview_rows` — Number of preview rows; `0` = load all rows (default: 0)
 - `ckanext.datapusher_plus.auto_index_threshold` — Cardinality threshold for auto-indexing
 - `ckanext.datapusher_plus.prefer_dmy` — Date format preference (DMY vs MDY)
 - `ckanext.datapusher_plus.enable_druf` — Enable DRUF workflow
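
Reviewer note (not part of the patch): the `quarantine.py` entry in the jobs-module listing describes routing rejected rows to a sibling CSV and continuing while the reject share stays under `max_quarantine_pct`. A minimal, stdlib-only sketch of that idea follows. It is not the module's actual code: `split_rows`, `rows_to_csv` and `QuarantineLimitExceeded` are hypothetical names, a column-count check stands in for the real validation, and it assumes `max_quarantine_pct` is a percentage from 0 to 100 with the boundary value treated as acceptable.

```python
import csv
import io


class QuarantineLimitExceeded(Exception):
    """Raised when the share of rejected rows exceeds the allowed limit."""


def split_rows(rows, expected_width, max_quarantine_pct):
    """Partition rows into accepted and rejected lists.

    A row is rejected when its column count differs from expected_width
    (a simplified stand-in for real validation). max_quarantine_pct is
    assumed to be a percentage in the range 0-100.
    """
    accepted, rejected = [], []
    for row in rows:
        target = accepted if len(row) == expected_width else rejected
        target.append(row)

    total = len(accepted) + len(rejected)
    rejected_pct = 100.0 * len(rejected) / total if total else 0.0
    if rejected_pct > max_quarantine_pct:
        raise QuarantineLimitExceeded(
            f"{rejected_pct:.1f}% of rows rejected, limit is {max_quarantine_pct}%"
        )
    return accepted, rejected, rejected_pct


def rows_to_csv(rows):
    """Serialize rows to CSV text, as would be written to the sibling file."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerows(rows)
    return buffer.getvalue()


# One malformed row out of three: under a 50% limit, so processing continues.
raw = "id,name\n1,ada\n2,grace,extra\n3,alan\n"
header, *body = csv.reader(io.StringIO(raw))
accepted, rejected, pct = split_rows(body, len(header), max_quarantine_pct=50.0)
quarantine_csv = rows_to_csv(rejected)
```

Raising once the limit is exceeded, rather than silently dropping rows, keeps a badly malformed file from being loaded as a small, misleading subset. Whether the real module fails the task or handles the overflow some other way is not stated in the patch.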