Merged
80 changes: 58 additions & 22 deletions CLAUDE.md
@@ -4,85 +4,121 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co

## Project Overview

DataPusher+ is a CKAN extension (v2.0.0) for ultra-fast, robust data ingestion into CKAN's datastore. It replaces the legacy Datapusher webservice with a full CKAN extension that leverages [qsv](https://github.com/dathere/qsv) (a Rust-based CSV data-wrangling toolkit) for blazing-fast type inference and data analysis.
DataPusher+ is a CKAN extension (v3.0.0a0) for ultra-fast, robust data ingestion into CKAN's datastore. It replaces the legacy Datapusher webservice with a full CKAN extension that leverages [qsv](https://github.com/dathere/qsv) (a Rust-based CSV data-wrangling toolkit) for blazing-fast type inference and data analysis.

**Key differentiators:**
- Guaranteed data type inference by scanning entire files (not just first few rows)
- PostgreSQL COPY for direct data loading (no API overhead)
- Jinja2 formula system for metadata inference/suggestion (`formula` and `suggest_formula` in scheming YAML)
- Jinja2 formula system for metadata inference/suggestion (`formula` and `suggestion_formula` in scheming YAML)
- DRUF (Dataset Resource Upload First) workflow support
- v3.0: [Prefect](https://www.prefect.io/)-orchestrated ingestion flow (replaces the v2 in-process pipeline loop)

## Build & Test Commands

```bash
# Run all tests
pytest tests/
# Run the unit suite (integration tests need the docker-compose stack — see below)
pytest tests/ --ignore=tests/integration

# Run specific test file
pytest tests/test_unit.py
# Run a specific test file
pytest tests/test_security.py

# Run with coverage
pytest --cov=ckanext/datapusher_plus tests/
pytest --cov=ckanext/datapusher_plus tests/ --ignore=tests/integration

# Debug with IPython
pytest --pdbcls=IPython.terminal.debugger:TerminalPdb tests/
```

Integration tests (`tests/integration/`) require the full docker-compose stack (CKAN, Postgres, Redis, Solr, Prefect server + worker). Bring it up with `scripts/integration-up`, then run with `INTEGRATION=1 CKAN_URL=http://localhost:5050 pytest tests/integration/ -v`.

## CKAN CLI Commands

```bash
# Resubmit all resources to datapusher
# Resubmit all updated datastore resources
ckan -c /etc/ckan/default/ckan.ini datapusher_plus resubmit -y

# Submit specific package resources
# Submit a specific package's resources
ckan -c /etc/ckan/default/ckan.ini datapusher_plus submit {dataset_id}

# Register / update the DataPusher+ flow as a Prefect deployment (idempotent)
ckan -c /etc/ckan/default/ckan.ini datapusher_plus prefect-deploy

# One-shot v2 (RQ) → v3 (Prefect) migration: drains the RQ queue,
# resets stale pending task_status rows, verifies the Prefect deployment
ckan -c /etc/ckan/default/ckan.ini datapusher_plus migrate-from-rq

# Database migrations
ckan -c /etc/ckan/default/ckan.ini db upgrade -p datapusher_plus
```

## Architecture

### Pipeline Stage Pattern (v2.0)
### Prefect Flow Architecture (v3.0)

The refactored jobs module uses a modular stage-based pipeline in `ckanext/datapusher_plus/jobs/`:
The v3.0 release replaces the v2 in-process `DataProcessingPipeline` loop with a Prefect-orchestrated flow. The jobs module lives in `ckanext/datapusher_plus/jobs/`:

```
pipeline.py        → Main orchestration, entry point (datapusher_plus_to_datastore)
context.py         → ProcessingContext state management across stages
prefect_flow.py    → Orchestration. Per-stage @task functions (each delegates to a
                     BaseStage.process() body) + the entry-point @flow
                     `datapusher_plus_flow`. Wraps the datastore-mutating tasks in
                     `with transaction()` for atomic rollback; owns the Jobs row
                     state transitions; fires the datapusher_hook HTTP callback.
__init__.py        → Public surface via PEP 562 lazy __getattr__ (defers the Prefect
                     import so CKAN admin commands don't spin up a Prefect server).
                     Exposes `datapusher_plus_flow`, `push_to_datastore` (v2 shim),
                     `datapusher_plus_to_datastore` (alias), `callback_datapusher_hook`.
context.py         → ProcessingContext — per-run mutable state shared across stages.
runtime_context.py → JobInput (frozen, JSON-serializable flow input), the per-stage
                     `*Result` dataclasses (DownloadResult, AnalyzeResult, …), the
                     RuntimeContext ContextVar (set/get/reset) and `rehydrate`.
subflows.py        → @flow-wrapped subflows (pii_screening_subflow,
                     spatial_processing_subflow) for custom flow composition.
events.py          → Custom Prefect events emitted for downstream Automations.
caching.py         → Task result-persistence + cache-key configuration.
blocks.py          → Prefect Block registration (result-storage config).
artifacts.py       → Human-readable Prefect run-page artifacts (data-quality summaries).
quarantine.py      → Bad-row quarantine for the validation task (route rejects to a
                     sibling CSV, continue if under `max_quarantine_pct`).
file_persistence.py→ Persists task working files to result storage so cached task
                     results stay valid across runs.
stages/
  base.py             → Abstract BaseStage class
  download.py         → File download with retries, proxy support, timeout handling
  format_converter.py → Excel/ODS/Shapefile/GeoJSON/ZIP → CSV conversion
  validation.py       → RFC-4180 CSV validation, encoding detection/normalization
  analysis.py         → QSV-based type inference, summary stats, frequency tables
  ai_suggestions.py   → AI-assisted metadata suggestions via `qsv describegpt`
  database.py         → PostgreSQL COPY operations, smartint type selection
  indexing.py         → Auto-index creation based on cardinality/dates
  formula.py          → Jinja2 formula evaluation (package/resource metadata)
  metadata.py         → Datastore resource dict updates, dpp_suggestions
```
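
The `__init__.py` entry in the listing above mentions a PEP 562 lazy `__getattr__` that defers a heavy import until an attribute is first used. The following is a minimal, generic sketch of that mechanism, not the project's actual code: `make_lazy_module`, `lazy_pkg`, and the attribute mapping are hypothetical, and the stdlib `json` module stands in for the expensive dependency.

```python
import importlib
import types

# Hypothetical mapping of public attribute name -> module that provides it.
# The stdlib "json" module stands in for a heavy dependency here.
_LAZY_ATTRS = {"dumps": "json", "loads": "json"}


def make_lazy_module(name, lazy_attrs):
    """Build a module whose listed attributes are imported on first access."""
    module = types.ModuleType(name)

    def __getattr__(attr):
        # PEP 562: Python calls a module-level __getattr__ only after the
        # normal attribute lookup on the module has failed.
        try:
            target = lazy_attrs[attr]
        except KeyError:
            raise AttributeError(
                f"module {name!r} has no attribute {attr!r}"
            ) from None
        value = getattr(importlib.import_module(target), attr)
        # Cache on the module so later lookups bypass __getattr__ entirely.
        setattr(module, attr, value)
        return value

    module.__getattr__ = __getattr__
    return module


lazy_pkg = make_lazy_module("lazy_pkg", _LAZY_ATTRS)
```

In a real package the `__getattr__` function would simply be defined at the top level of `__init__.py`; the factory here only keeps the sketch self-contained in a single file.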

Operators can register a custom flow via `ckanext.datapusher_plus.prefect_flow`; the per-stage `@task` functions in `prefect_flow.py` are the public composable primitives.

### Key Modules

- **plugin.py** — CKAN plugin entry point, implements IConfigurer, IConfigurable, IActions, IAuthFunctions, IPackageController, IResourceUrlChange, IResourceController, ITemplateHelpers, IBlueprint, IClick (+ IFormRedirect conditionally)
- **config.py** — ~50 configuration parameters (all `ckanext.datapusher_plus.*` settings)
- **config_declaration.yaml** — CKAN 2.10+ declarative config definitions
- **qsv_utils.py** — QSV CLI wrapper (stats, frequency, type inference, validation)
- **prefect_client.py** — Thin wrapper around the Prefect 3 client; the single place the codebase touches `prefect.*`
- **jinja2_helpers.py** — FormulaProcessor and custom filters/functions for metadata formulas
- **datastore_utils.py** — PostgreSQL datastore operations
- **dictionary_stash.py** — On-disk stash/restore of per-column Data Dictionary annotations across job runs/failures
- **spatial_helpers.py** — Shapefile/GeoJSON processing with geometry simplification
- **pii_screening.py** — PII detection with configurable regex patterns
- **helpers.py** — Template helpers for job status display in CKAN UI
- **cli.py** — CKAN CLI command implementations (resubmit, submit)
- **cli.py** — CKAN CLI command implementations (resubmit, submit, prefect-deploy, migrate-from-rq)
- **logging_utils.py** — Custom TRACE logging level (level 5)
- **interfaces.py** — `IDataPusher` interface for external plugin hooks
- **job_exceptions.py** — Custom exception hierarchy (`DataTooBigError`, `JobError`, `HTTPError`, etc.)
- **utils.py** — Shared helpers (e.g. `utcnow_naive()` for naive-UTC timestamps)
- **logic/action.py** — Actions: `datapusher_submit`, `datapusher_hook`, `datapusher_status`
- **logic/schema.py** — Validation schemas for action functions
- **logic/auth.py** — Authorization functions
- **views.py** — Flask blueprints for web endpoints
- **druf_view.py** — DRUF-specific view handling
- **jobs_legacy.py** — Legacy monolithic implementation (preserved for reference)

### Database Models (model/model.py)

@@ -99,8 +135,8 @@ Formulas in scheming YAML have access to three namespaces:
- `dpp` — Inferred metadata (RECORD_COUNT, DATE_FIELDS, LAT_FIELD, LON_FIELD, etc.)

Formula types:
- `formula` — Evaluated and assigned to field immediately
- `suggest_formula` — Stored in `dpp_suggestions` field for UI suggestions
- `formula` — Evaluated and assigned to the field immediately
- `suggestion_formula` — Stored in the `dpp_suggestions` field for UI suggestions

## Coding Conventions

@@ -112,22 +148,22 @@ Formula types:
- **Logging** — Custom TRACE level (5) via `logging_utils.py`; f-string log messages; pipeline stages use `ProcessingContext.logger`
- **Error handling** — Custom exception hierarchy in `job_exceptions.py`
- **Linting** — Flake8 with E501 disabled (long lines allowed): `# flake8: noqa: E501`
- **CI** — `.github/workflows/main.yml` runs integration tests
- **CI** — `.github/workflows/` (`main.yml`, `ci.yml`) runs unit + integration tests
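
The logging convention above refers to a custom TRACE level with numeric value 5. One common way to register such a level with the standard `logging` module is sketched here; the helper `trace` and the logger name `dpp_trace_demo` are hypothetical, and the project's `logging_utils.py` may be structured differently.

```python
import logging

TRACE = 5  # numeric level stated in the conventions above, below DEBUG (10)

# Make the level render as "TRACE" instead of "Level 5" in log output.
logging.addLevelName(TRACE, "TRACE")


def trace(logger, message, *args, **kwargs):
    """Log at TRACE level, skipping the call when the level is disabled."""
    if logger.isEnabledFor(TRACE):
        logger.log(TRACE, message, *args, **kwargs)


logger = logging.getLogger("dpp_trace_demo")
logger.setLevel(TRACE)  # a logger left at DEBUG would drop TRACE records
```

Both the logger and its handlers need a level of 5 or lower for TRACE records to appear; a handler left at the default `NOTSET` accepts them.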

## External Dependencies

- **Python 3.10, 3.11, 3.12, 3.13**
- **qsv v4.0.0+** — Must be installed at path specified by `ckanext.datapusher_plus.qsv_bin`
- **qsv v20.1.0+** — Must be installed at path specified by `ckanext.datapusher_plus.qsv_bin` (`MINIMUM_QSV_VERSION` is enforced at startup)
- **CKAN 2.10+** with ckanext-scheming
- **PostgreSQL** datastore
- **RQ (Redis Queue)** for background job processing
- **Prefect 3.7+** — Orchestrates the v3.0 ingestion flow (replaces the v2 RQ-based job runner; `migrate-from-rq` handles the upgrade)
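
The qsv requirement above says a minimum version is enforced at startup. A check of that kind generally has to compare version numbers as integer tuples rather than as strings, since `"4.0.0" > "20.1.0"` under string comparison. The sketch below illustrates only the comparison; the helper names and the example version strings are assumptions, and it does not reproduce how the extension actually obtains or parses the qsv version.

```python
import re

# Taken from the "qsv v20.1.0+" requirement listed above.
MINIMUM_QSV_VERSION = (20, 1, 0)


def parse_version(text):
    """Extract the first MAJOR.MINOR.PATCH triple found in text."""
    match = re.search(r"(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError(f"no version number found in {text!r}")
    return tuple(int(part) for part in match.groups())


def meets_minimum(version_text, minimum=MINIMUM_QSV_VERSION):
    """Return True when the parsed version is at least the minimum."""
    # Tuples compare element by element, so (4, 0, 0) < (20, 1, 0).
    return parse_version(version_text) >= minimum
```

A caller could pass whatever version string it has read from the binary and refuse to start when `meets_minimum` returns `False`.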

## Configuration Reference

Key settings in `ckan.ini` (see config.py and config_declaration.yaml for full list):
Key settings in `ckan.ini` (see config.py and config_declaration.yaml for the full list):
- `ckanext.datapusher_plus.qsv_bin` — Path to qsv binary
- `ckanext.datapusher_plus.formats` — Supported file formats
- `ckanext.datapusher_plus.preview_rows` — Number of preview rows (default: 1000)
- `ckanext.datapusher_plus.preview_rows` — Number of preview rows; `0` = load all rows (default: 0)
- `ckanext.datapusher_plus.auto_index_threshold` — Cardinality threshold for auto-indexing
- `ckanext.datapusher_plus.prefer_dmy` — Date format preference (DMY vs MDY)
- `ckanext.datapusher_plus.enable_druf` — Enable DRUF workflow