
Rewire ingestion pipeline: single ingest_by_doi entry point, wire IngestionRun/ParsedArtifact, extract Docling parsing #70


Description


Problem Statement

The ingestion codebase has three separate pipeline entry points (run_pipeline in services.py, ingest_vaccins management command, and seed_db CLI script) that each implement a different, incomplete slice of the pipeline defined in PRD #33. None of them create IngestionRun records, none create ParsedArtifact records, and none follow the full flowchart from PRD #33. The models (IngestionRun, ParsedArtifact, Document) were built correctly in prior work but nothing wires into them. Additionally, the production parsing code imports from exploration/parsing_benchmarking/, a benchmarking module that discards data (raw Docling JSON, parser config) that production needs to populate ParsedArtifact. The result is a codebase where the data model and the pipeline are completely disconnected.

Solution

Consolidate all ingestion logic into a single canonical pipeline function ingest_by_doi(doi, pdf_url=None) -> IngestionRun in services.py, following the flowchart from PRD #33. Both entry points (web view for single DOI, CSV seeding CLI for bulk) converge on this one function — the CSV path is simply a loop that calls it per row. Extract and fix the Docling parsing layer into a proper production module under ingestion/parsing/ so it returns everything needed to populate ParsedArtifact. Wire IngestionRun into every ingestion call as the audit record. Replace the ingest_vaccins management command with a seed_db management command. Remove all dead helpers.

Ingestion Flow

Both entry points converge after step 1:

DOI (web view or CSV row)
  -> validate uniqueness
  -> IngestionRun created (status=running, stage=acquire)
  -> Document created (doi set)
  -> fetch metadata via collector (fetch_all)
  -> attempt PDF download (direct URL if given, then parser chain)
  -> if no PDF:
         write metadata to Document
         IngestionRun: success, success_kind=metadata_only, stage=done
  -> if PDF found:
         SourceFile created, stage=store
         ParsedArtifact created (docling_output, postprocessed_text, parser_config), stage=parse
         DocumentChunks created, stage=chunk
         embeddings added, stage=done
         IngestionRun: success, success_kind=full, stage=done
  -> on any error:
         IngestionRun: status=failed, error_stage=<last stage>, error_message=<exc>
         partial state (Document, SourceFile) left in place for retry
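
A minimal sketch of how ingest_by_doi could realize this flow. The helper names (_fetch_metadata, _download_pdf, _store_source_file, _create_parsed_artifact, _write_metadata), the DuplicateDOI exception, the PIPELINE_VERSION constant, and the exact IngestionRun field names are illustrative assumptions, not settled API; save_chunks and add_embeddings are the existing helpers kept as internal utilities:

    # Hypothetical skeleton for services.py; stage values follow the flow above.
    from ingestion.models import Document, IngestionRun

    PIPELINE_VERSION = "1"  # assumed constant; the exact mechanism is the implementer's choice


    class DuplicateDOI(Exception):
        """Hypothetical skip signal for already-ingested DOIs."""


    def ingest_by_doi(doi: str, pdf_url: str | None = None) -> IngestionRun:
        if Document.objects.filter(doi=doi).exists():
            # Duplicate DOI: skip without creating new records. Whether this is
            # an exception or a sentinel result is left to the implementer.
            raise DuplicateDOI(doi)

        run = IngestionRun.objects.create(
            doi=doi, status="running", stage="acquire",
            pipeline_version=PIPELINE_VERSION,
        )
        try:
            document = Document.objects.create(doi=doi)
            payload = _fetch_metadata(doi)      # wraps the collector's fetch_all
            run.raw_provider_payload = payload  # verbatim API responses, for audit
            pdf = _download_pdf(doi, pdf_url)   # direct URL first, then parser chain
            if pdf is None:
                _write_metadata(document, payload)
                run.success_kind, run.stage = "metadata_only", "done"
            else:
                _store_source_file(document, pdf)
                run.stage = "store"
                result = parse_pdf(pdf)         # see the parsing module sketch below
                _create_parsed_artifact(document, result)
                run.stage = "parse"
                chunks = save_chunks(document, result.chunks)
                run.stage = "chunk"
                add_embeddings(chunks)
                run.success_kind, run.stage = "full", "done"
            run.status = "success"
        except Exception as exc:
            run.status = "failed"
            run.error_stage = run.stage   # last stage the pipeline reached
            run.error_message = str(exc)  # Document/SourceFile rows stay for retry
        run.save()
        return run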

User Stories

  1. As a backend developer, I want a single ingest_by_doi(doi, pdf_url=None) -> IngestionRun function that is the only place ingestion logic lives, so that fixing a bug or adding a stage only requires changing one place.
  2. As a backend developer, I want every ingestion attempt to produce an IngestionRun record from the very first step, so that every attempt — including failures — is auditable.
  3. As a backend developer, I want the IngestionRun to record the last-reached pipeline stage on failure, so I can pinpoint exactly where the pipeline stopped without reading logs.
  4. As a backend developer, I want failed ingestion runs to leave partial state (Document, SourceFile) in place rather than rolling back, so that a retry can pick up without re-fetching already-stored data.
  5. As a backend developer, I want ingest_by_doi to create a Document record at the start of every run, so the canonical document exists even before a PDF is found.
  6. As a backend developer, I want ingest_by_doi to create a ParsedArtifact whenever a PDF is successfully parsed, storing the raw Docling JSON, postprocessed text, and parser config, so parsing is auditable and reproducible.
  7. As a backend developer, I want ingest_by_doi to create DocumentChunk records linked to the Document whenever a PDF is parsed, so retrieval and embedding search work correctly.
  8. As a backend developer, I want ingest_by_doi to add embeddings to all DocumentChunk records it creates, so the DB is search-ready immediately after ingestion.
  9. As a backend developer, I want ingest_by_doi to mark a run as success_kind=metadata_only when no PDF is found, so I can distinguish partial from full documents without querying related tables.
  10. As a backend developer, I want ingest_by_doi to skip ingestion with a clear result when a DOI already exists in the database, so re-running a seed does not create duplicates.
  11. As a backend developer, I want the raw metadata payload from fetch_all to be stored on IngestionRun.raw_provider_payload, so API responses are preserved for audit and future reprocessing without re-fetching.
  12. As a backend developer, I want the Docling parsing logic extracted into a dedicated production module under ingestion/parsing/, so it is independent of the benchmarking exploration code and returns everything needed to populate ParsedArtifact.
  13. As a backend developer, I want the production parsing module to return the raw Docling JSON, postprocessed text, parser config, and text chunks in a single call, so callers never need to know the internals of Docling to populate the data model.
  14. As a contributor, I want a manage.py seed_db --csv <path> management command that reads a CSV of DOIs and calls ingest_by_doi for each row, so I can seed the database from a curated list without running a standalone script.
  15. As a contributor, I want seed_db --csv to support a --dry-run flag that prints what would be ingested without touching the database, so I can verify the CSV before running a real ingestion.
  16. As a contributor, I want the web view DOI submission to go through ingest_by_doi, so the web and CLI paths are always in sync (a minimal view sketch follows this list).
  17. As a contributor, I want the old ingest_vaccins management command removed, so there is no confusion about which command to use.
  18. As a backend developer, I want dead helper functions (save_to_s3_and_postgres, fetch_file_and_metadata, the old run_pipeline) removed from services.py, so the module surface is minimal and unambiguous.
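
Stories 14-16 imply very thin entry points. A hedged sketch of the web side (view name, form handling, and template paths are assumptions; the point is only that the view delegates to the same ingest_by_doi as the CLI):

    # Hypothetical view wiring: both entry points converge on ingest_by_doi.
    from django.shortcuts import render

    from ingestion.services import ingest_by_doi

    def submit_doi(request):
        if request.method == "POST":
            run = ingest_by_doi(request.POST["doi"].strip())
            return render(request, "ingestion/run_detail.html", {"run": run})
        return render(request, "ingestion/submit_doi.html")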

Implementation Decisions

  • Single pipeline function: ingest_by_doi(doi, pdf_url=None) -> IngestionRun in services.py is the canonical entry point for all DOI-initiated ingestion. It owns the full lifecycle: IngestionRun creation, Document creation, metadata fetch, PDF download, SourceFile creation, ParsedArtifact creation, chunking, and embedding.
  • Both entry points converge: The web view passes a single DOI directly. The CSV CLI loops over rows and calls ingest_by_doi per row. No branching logic in the pipeline itself for "bulk vs single."
  • IngestionRun created first: Before any fetch or file operation, so every attempt is recorded even on immediate failure.
  • Partial state preserved on failure: On any exception, the IngestionRun is updated to status=failed, error_stage, and error_message. Already-created Document and SourceFile rows are left in place to enable retry without data loss.
  • Docling extraction: A new production module under ingestion/parsing/ wraps Docling and returns a structured result containing: postprocessed_text (str), docling_output (dict, raw Docling JSON), parser_config (dict, model versions and parameters), and chunks (list of str). The benchmarking module in exploration/ is not modified. A sketch of this result shape follows this list.
  • ParsedArtifact creation: Created from the structured parse result inside ingest_by_doi when a PDF is present and parsing succeeds.
  • DocumentChunk creation and embedding: Happens inside ingest_by_doi after ParsedArtifact is created. The existing save_chunks and add_embeddings helpers can be kept as internal utilities called by the pipeline.
  • seed_db management command: A new Django management command wrapping the CSV reading logic. The standalone seed_db.py script may be kept as a thin CLI entry point that delegates to the same logic, or removed — implementer's choice. A command sketch follows this list.
  • ingest_vaccins management command: Deleted.
  • Dead helpers removed: save_to_s3_and_postgres, fetch_file_and_metadata, and the old run_pipeline are deleted from services.py.
  • pipeline_version: Required field on IngestionRun. Implementation should define a version constant (e.g. in settings or a dedicated version file) to pass at call time. The exact mechanism is left to the implementer.
  • Metadata reconciliation deferred: Writing normalized metadata to Document using the API > parsed > NULL priority rule is explicitly out of scope. Document.title should still be populated from fetch_all metadata when available.
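
As referenced in the Docling extraction bullet, a plausible shape for the structured parse result; the module path, dataclass name, and parse_pdf signature are assumptions consistent with the decisions above:

    # ingestion/parsing/__init__.py (hypothetical): fields mirror ParsedArtifact.
    from dataclasses import dataclass, field

    @dataclass
    class ParseResult:
        postprocessed_text: str   # cleaned text for ParsedArtifact
        docling_output: dict      # raw Docling JSON, kept verbatim
        parser_config: dict      # model versions and parameters actually used
        chunks: list[str] = field(default_factory=list)  # inputs for DocumentChunk rows

    def parse_pdf(pdf_path: str) -> ParseResult:
        """Run Docling on a local PDF and return everything ParsedArtifact needs.

        Sketch only: the real module would wrap Docling's converter plus the
        postprocessing and chunking steps extracted from the exploration code.
        """
        raise NotImplementedError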

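And a minimal sketch of the seed_db command; the CSV column name ("doi") and the output lines are illustrative assumptions:

    # ingestion/management/commands/seed_db.py (hypothetical sketch)
    import csv

    from django.core.management.base import BaseCommand

    from ingestion.services import ingest_by_doi

    class Command(BaseCommand):
        help = "Seed the database from a curated CSV of DOIs via ingest_by_doi."

        def add_arguments(self, parser):
            parser.add_argument("--csv", required=True, help="path to the DOI CSV")
            parser.add_argument(
                "--dry-run", action="store_true",
                help="print what would be ingested without touching the database",
            )

        def handle(self, *args, **options):
            with open(options["csv"], newline="") as f:
                for row in csv.DictReader(f):
                    doi = row["doi"].strip()
                    if options["dry_run"]:
                        self.stdout.write(f"would ingest {doi}")
                        continue
                    run = ingest_by_doi(doi)
                    self.stdout.write(f"{doi}: {run.status}")
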
Testing Decisions

  • Good tests assert observable database state: which records were created, what their field values are, what status/stage an IngestionRun is in. They do not test internal call sequences or mock intermediate steps.
  • ingest_by_doi full path: given a valid DOI and a downloadable PDF, assert that IngestionRun, Document, SourceFile, ParsedArtifact, and at least one DocumentChunk with a non-null embedding are created, and that IngestionRun.status=success, success_kind=full, stage=done.
  • ingest_by_doi metadata-only path: given a valid DOI with no PDF available, assert that IngestionRun and Document are created, no SourceFile/ParsedArtifact/DocumentChunk rows exist, and IngestionRun.success_kind=metadata_only. A sketch of this test follows the list.
  • ingest_by_doi duplicate DOI: given a DOI that already exists, assert that no new records are created and the result communicates the skip clearly.
  • ingest_by_doi failure mid-pipeline: simulate a parsing failure; assert IngestionRun.status=failed, error_stage is set correctly, and any already-created Document/SourceFile rows remain in the DB.
  • Production parsing module: given a local PDF file, assert the returned result has non-empty postprocessed_text, a non-empty docling_output dict, a non-empty parser_config dict, and at least one chunk. Tested in isolation from the rest of the pipeline.
  • seed_db management command: given a CSV with valid DOIs, assert that ingest_by_doi is called once per non-duplicate row.
  • Follow existing pytest + django_db patterns in tests/ingestion/.
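
For illustration, a hedged sketch of the metadata-only test in that style; the monkeypatched helper names follow the pipeline sketch above and are assumptions:

    # tests/ingestion/test_ingest_by_doi.py (hypothetical sketch)
    import pytest

    from ingestion import services
    from ingestion.models import Document, DocumentChunk, ParsedArtifact, SourceFile

    @pytest.mark.django_db
    def test_metadata_only_path(monkeypatch):
        # No PDF available: the pipeline should stop at metadata_only.
        monkeypatch.setattr(services, "_fetch_metadata", lambda doi: {"title": "T"})
        monkeypatch.setattr(services, "_download_pdf", lambda doi, pdf_url: None)

        run = services.ingest_by_doi("10.1234/example")

        # Assert observable database state, not call sequences.
        assert run.status == "success"
        assert run.success_kind == "metadata_only"
        assert Document.objects.filter(doi="10.1234/example").exists()
        assert not SourceFile.objects.exists()
        assert not ParsedArtifact.objects.exists()
        assert not DocumentChunk.objects.exists()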

Out of Scope

  • Metadata reconciliation (API > parsed > NULL priority, ParsedArtifact.metadata_extracted divergence capture) — deferred, defined in PRD Setup stable canonical document representation #33.
  • PDF upload entry point — the web upload path is not rewired in this PRD.
  • Multi-source metadata priority order — deferred to a follow-up discussion with the data-acquisition team.
  • Re-parsing / ParsedArtifact versioning — the 1:1 constraint remains; re-parse history is a future concern.
  • Research catalog tables (Author, Keywords, Theme) and the evidence hierarchy — out of scope per PRD Setup stable canonical document representation #33.
  • Changes to embedding model, chunking algorithm, or search API.
  • Modifying exploration/parsing_benchmarking/ — the benchmarking module is untouched; only production code under ingestion/ is changed.

Further Notes

  • This PRD is the pipeline-rewiring follow-up explicitly deferred in PRD Setup stable canonical document representation #33 ("Rewriting run_pipeline, fetch stubs, or upload flows to use Document end-to-end" was out of scope there).
  • The offline zip-based ingestion path (ingest_vaccins + data/vaccine_perfs/) is intentionally retired. The CSV-based seed_db command with a curated DOI list replaces it. Contributors who need to seed from pre-downloaded PDFs should add the corresponding DOIs to the CSV and let the pipeline fetch them.
  • IngestionRun.raw_provider_payload should store the verbatim response from fetch_all so future metadata reconciliation work has access to the raw API responses without re-fetching.
  • This PRD is also the parent for closing PR [WIP] Feat/seeding pipeline #65 (feat/seeding-pipeline), whose seeding logic should be absorbed into the unified pipeline described here.
