diff --git a/.claude/temp.md b/.claude/temp.md
deleted file mode 100644
index f62ebbb4..00000000
--- a/.claude/temp.md
+++ /dev/null
@@ -1,4 +0,0 @@
-
-------
-
-
diff --git a/.gitignore b/.gitignore
index 4036d692..557942a9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -162,6 +162,19 @@ works/management/commands/goas_v01_simplified_0.1-90.geojson
works/management/commands/goas_v01_simplified-0.05-80.geojson
+# Zenodo data artifacts (rendered per-environment; never commit sandbox state)
+data/optimap-main.zip
+data/*.gpkg
+data/*.geojson
+data/*.geojson.gz
+data/*.csv
+data/*.csv.gz
+data/README.md
+data/zenodo_dynamic.json
+
+# Test environment files (may contain secrets)
+tests/.env
+
works/management/commands/goas_v01_simplified.geojson
works/management/commands/goas_v01.gpkg
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3d39e80a..40c468a6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
+- **Zenodo data archival groundwork** (issue #63) — `python manage.py render_zenodo` builds `README.md`, a versioned `optimap-main.zip` (current git `HEAD`), and a `zenodo_dynamic.json` payload under `data/`; `deposit_zenodo` (or the combined `zenodo_deposit`) updates an existing Zenodo draft via [`zenodo-client`](https://pypi.org/project/zenodo-client/) and never publishes automatically. Each run records a `ZenodoDepositionLog` row (status, file list, total size, DOI, draft URL) and emails all `is_staff` users the outcome with a direct link to the draft. An admin action *Trigger Zenodo Deposition* runs the full render+deposit cycle. The `/data/` page now shows the latest successful deposition (sandbox-aware in `DEBUG`, production-only otherwise). Settings: `ZENODO_API_TOKEN`, `ZENODO_SANDBOX_DEPOSITION_ID`, `ZENODO_API_BASE`. Sources, related-identifier URLs, funding metadata, and the codebook are wired up incrementally in follow-up commits.
+- **Zenodo deposition is now fully self-sufficient** (issue #63 closes the "write code to create a new deposition" item). The deposit step bootstraps a fresh draft via `POST /deposit/depositions` when no `ZENODO_SANDBOX_DEPOSITION_ID` is configured and no prior successful `ZenodoDepositionLog` exists; otherwise it reuses the latest log row's ID so re-triggered runs land on the same draft without manual env edits. When the targeted record has been manually published (`submitted=true` + `state="done"`), the next run calls `POST .../actions/newversion`, follows `links.latest_draft`, and updates *that* draft instead — so the second and later deposit cycles work end-to-end without operator intervention. Publishing remains manual.
+- **Annual Django-Q schedule for Zenodo deposition** — `python manage.py schedule_zenodo_deposit` registers `works.tasks.run_zenodo_deposition` to run yearly on Dec 31 23:59 (idempotent; safe to re-run). The task chains `regenerate_all_data_dumps` → `render_zenodo_package` → `deposit_to_zenodo` so the deposit always reflects the latest data.
- **Tag works with EO4GEO Body of Knowledge concepts** (closes #245). New `bok_concepts` field on `Work` plus an autosuggest combobox on the work landing page (≥3-character query, full keyboard, multi-select) backed by `GET /api/v1/bok/search/`. Tagged concepts render as chips that link to the canonical concept page on `bok.eo4geo.eu`, surface in the public Work API as `bok_concepts` / `bok_concepts_resolved`, and emit JSON-LD `about: [DefinedTerm,…]` on the landing page. Adding the first concept on a harvested work flips its status from Harvested to Contributed for admin review; Recognition Board credit is recorded under a new generic *Ontology contributions* kind (so the same bucket can later cover other controlled vocabularies) and deduped per (user, work) so the same user adding more concepts later does not double-count. The cached BoK snapshot is refreshed by `python manage.py refresh_bok_snapshot` (pinned to `v3` by default; configurable via `OPTIMAP_BOK_VERSION`). The editor is **opt-in**: set `OPTIMAP_BOK_ENABLED_COLLECTIONS` to a comma-separated list of `Collection.identifier` slugs to enable it on works in those collections — empty (default) disables the editor site-wide. Read-only chips on already-tagged works remain visible regardless.
### Changed
diff --git a/README.md b/README.md
index 35d24489..7df4b292 100644
--- a/README.md
+++ b/README.md
@@ -654,6 +654,158 @@ The app is deployed in the TUD Enterprise Cloud at =10.0
# SVG → PNG for the OPTIMAP logo on the og:image preview
cairosvg>=2.7
+# Zenodo data deposition (issue #63)
+zenodo-client==0.3.6
+markdown>=3.7
+jinja2>=3.1.4
+
# Geoextent library for spatial/temporal extent extraction
git+https://github.com/nuest/geoextent.git@main#egg=geoextent
\ No newline at end of file
diff --git a/tests/.env.template b/tests/.env.template
new file mode 100644
index 00000000..dc01fa57
--- /dev/null
+++ b/tests/.env.template
@@ -0,0 +1,13 @@
+# Zenodo API Configuration for Testing
+# Copy this file to tests/.env and fill in your actual values
+
+# Zenodo Sandbox API Token
+# Get from: https://sandbox.zenodo.org/account/settings/applications/tokens/new/
+ZENODO_API_TOKEN=your_sandbox_token_here
+
+# Zenodo Sandbox Deposition ID
+# Create a draft deposit first, then get its ID from the URL or API response
+ZENODO_SANDBOX_DEPOSITION_ID=your_deposition_id_here
+
+# Zenodo API Base URL (sandbox for testing, production for real deposits)
+ZENODO_API_BASE=https://sandbox.zenodo.org/api
diff --git a/tests/test_deposit_zenodo.py b/tests/test_deposit_zenodo.py
new file mode 100644
index 00000000..d7ac397e
--- /dev/null
+++ b/tests/test_deposit_zenodo.py
@@ -0,0 +1,747 @@
+# tests/test_deposit_zenodo.py
+import json
+import tempfile
+from pathlib import Path
+from copy import deepcopy
+from unittest.mock import patch
+
+from django.core.management import call_command
+from django.test import TestCase, SimpleTestCase, override_settings
+from works.models import Work, Source, ZenodoDepositionLog
+from works.zenodo import _build_upload_list, _latest_dump_files
+
+
+class BuildUploadListTest(SimpleTestCase):
+ """Direct unit tests for the upload-list helpers (issue #63, item 4)."""
+
+ def setUp(self):
+ self._tmpdir = tempfile.TemporaryDirectory()
+ self.root = Path(self._tmpdir.name)
+ self.data_dir = self.root / "data"
+ self.dump_dir = self.root / "optimap_cache"
+ self.data_dir.mkdir()
+ self.dump_dir.mkdir()
+
+ def tearDown(self):
+ self._tmpdir.cleanup()
+
+ def test_latest_dump_files_picks_newest_timestamp_only(self):
+ # Two cycles in the same dir, three formats each
+ for ts in ("20240101", "20250101"):
+ (self.dump_dir / f"optimap_data_dump_{ts}.geojson").write_text("{}")
+ (self.dump_dir / f"optimap_data_dump_{ts}.geojson.gz").write_bytes(b"\x1f\x8b")
+ (self.dump_dir / f"optimap_data_dump_{ts}.gpkg").write_bytes(b"GPKG")
+ # And a CSV pair for the newer cycle only
+ (self.dump_dir / "optimap_data_dump_20250101.csv").write_text("a,b\n")
+ (self.dump_dir / "optimap_data_dump_20250101.csv.gz").write_bytes(b"\x1f\x8b")
+
+ files = _latest_dump_files(self.dump_dir)
+ names = {p.name for p in files}
+ self.assertEqual(names, {
+ "optimap_data_dump_20250101.geojson",
+ "optimap_data_dump_20250101.geojson.gz",
+ "optimap_data_dump_20250101.gpkg",
+ "optimap_data_dump_20250101.csv",
+ "optimap_data_dump_20250101.csv.gz",
+ })
+
+ def test_build_upload_list_includes_csv_variants(self):
+ (self.data_dir / "README.md").write_text("# x")
+ (self.data_dir / "optimap-main.zip").write_bytes(b"ZIP")
+ for ext in ("geojson", "geojson.gz", "gpkg", "csv", "csv.gz"):
+ (self.data_dir / f"optimap_data_dump_20250101.{ext}").write_bytes(b"x")
+
+ paths = _build_upload_list(self.data_dir, dump_dir=self.dump_dir)
+ names = {p.name for p in paths}
+
+ # README + git archive snapshot
+ self.assertIn("README.md", names)
+ self.assertIn("optimap-main.zip", names)
+ # All five dump formats land in the upload
+ for ext in ("geojson", "geojson.gz", "gpkg", "csv", "csv.gz"):
+ self.assertIn(f"optimap_data_dump_20250101.{ext}", names)
+
+ def test_build_upload_list_falls_back_to_dump_dir_when_data_dir_has_no_dumps(self):
+ """Production layout: render writes to data/, regenerate writes to cache."""
+ (self.data_dir / "README.md").write_text("# x")
+ (self.data_dir / "optimap-main.zip").write_bytes(b"ZIP")
+ # Dumps only in dump_dir
+ for ext in ("geojson", "gpkg", "csv"):
+ (self.dump_dir / f"optimap_data_dump_20250101.{ext}").write_bytes(b"x")
+
+ paths = _build_upload_list(self.data_dir, dump_dir=self.dump_dir)
+ names = {p.name for p in paths}
+ self.assertIn("README.md", names)
+ self.assertIn("optimap_data_dump_20250101.geojson", names)
+ self.assertIn("optimap_data_dump_20250101.gpkg", names)
+ self.assertIn("optimap_data_dump_20250101.csv", names)
+
+
+class DepositZenodoTest(TestCase):
+ def setUp(self):
+ self._tmpdir = tempfile.TemporaryDirectory()
+ self.project_root = Path(self._tmpdir.name)
+ self.templates_dir = self.project_root / "works" / "templates"
+ self.cmds_dir = self.project_root / "works" / "management" / "commands"
+ self.data_dir = self.project_root / "data"
+ self.templates_dir.mkdir(parents=True, exist_ok=True)
+ self.cmds_dir.mkdir(parents=True, exist_ok=True)
+ self.data_dir.mkdir(parents=True, exist_ok=True)
+
+ # Minimal README so description→HTML works
+ (self.data_dir / "README.md").write_text("# Title\n\nSome text.", encoding="utf-8")
+ (self.data_dir / "optimap-main.zip").write_bytes(b"ZIP")
+ # dynamic JSON with new related identifiers and version
+ (self.data_dir / "zenodo_dynamic.json").write_text(json.dumps({
+ "title": "OPTIMAP FAIR Data Package (test)",
+ "version": "v999",
+ "related_identifiers": [
+ {"relation": "describes", "identifier": "https://optimap.science", "scheme": "url"}
+ ]
+ }), encoding="utf-8")
+
+ # Fake dump files to upload
+ (self.data_dir / "optimap_data_dump_20250101.geojson").write_text("{}", encoding="utf-8")
+ (self.data_dir / "optimap_data_dump_20250101.gpkg").write_bytes(b"GPKG")
+
+ # Minimal DB so import paths work
+ Work.objects.create(title="A", publicationDate="2010-10-10")
+ Source.objects.create(name="OPTIMAP", url_field="https://optimap.science")
+
+ # Import zenodo module
+ import importlib
+ self.zenodo_mod = importlib.import_module("works.zenodo")
+
+ class FakePath(Path):
+ _flavour = Path(".")._flavour
+ def resolve(self):
+ return self
+ self.FakePath = FakePath
+ self.zenodo_file = str(self.project_root / "works" / "zenodo.py")
+
+ def tearDown(self):
+ self._tmpdir.cleanup()
+
+ def test_deposit_merges_metadata_and_uses_zenodo_client_for_uploads(self):
+ # Fake Zenodo deposition (existing metadata)
+ existing = {
+ "submitted": False,
+ "state": "unsubmitted",
+ "links": {"edit": "http://edit", "bucket": "http://bucket"},
+ "metadata": {
+ "title": "Existing Title",
+ "upload_type": "dataset",
+ "publication_date": "2025-07-14",
+ "creators": [{"name": "OPTIMAP"}],
+ "keywords": ["Open Science"],
+ "related_identifiers": [
+ {"relation": "isSupplementTo", "identifier": "https://old.example", "scheme": "url"}
+ ],
+ "language": "eng",
+ "description": "Old
",
+ "version": "v1",
+ },
+ }
+
+ put_payload = {}
+
+ def _fake_get(url, params=None, **kwargs):
+ class R:
+ status_code = 200
+ text = "ok"
+ def json(self):
+ # whatever object your test expects (e.g., deepcopy(existing))
+ return deepcopy(existing)
+ def raise_for_status(self):
+ return None
+ return R()
+
+ def _fake_post(url, params=None, json=None, **kwargs):
+ class R:
+ status_code = 200
+ text = "ok"
+ def json(self):
+ # return what your code reads from POST responses, if anything
+ return {"links": {"bucket": "https://example-bucket"}}
+ def raise_for_status(self):
+ return None
+ return R()
+
+ def _fake_put(url, params=None, data=None, headers=None, **kwargs):
+ class R:
+ status_code = 200
+ text = "ok"
+ def raise_for_status(self):
+ return None
+ return R()
+
+ uploaded = {}
+
+ # zenodo-client upload shim: capture files that would be uploaded
+ def _fake_update_zenodo(deposition_id, paths, sandbox=True, access_token=None, publish=False):
+ self.assertEqual(deposition_id, "123456")
+ self.assertTrue(sandbox)
+ self.assertEqual(access_token, "tok")
+ names = {Path(p).name for p in paths}
+ self.assertIn("README.md", names)
+ self.assertIn("optimap-main.zip", names)
+ self.assertTrue(any(n.endswith(".geojson") for n in names))
+ self.assertTrue(any(n.endswith(".gpkg") for n in names))
+ uploaded["paths"] = [str(p) for p in paths]
+ class R:
+ def json(self): return {"links": {"html": f"https://sandbox.zenodo.org/deposit/{deposition_id}"}}
+ return R()
+
+ # Mock Zenodo client
+ mock_zenodo = type('MockZenodo', (), {
+ 'access_token': None,
+ 'update': lambda *args, **kwargs: _fake_update_zenodo(**kwargs)
+ })()
+
+ with patch.object(self.zenodo_mod, "__file__", new=self.zenodo_file), \
+ patch.object(self.zenodo_mod, "Path", self.FakePath), \
+ patch.object(self.zenodo_mod.requests, "get", _fake_get), \
+ patch.object(self.zenodo_mod.requests, "put", _fake_put), \
+ patch.object(self.zenodo_mod.requests, "delete", lambda *a, **k: type('R', (), {'status_code': 204})()), \
+ patch.object(self.zenodo_mod, "Zenodo", return_value=mock_zenodo), \
+ patch.object(self.zenodo_mod, "_markdown_to_html", lambda s: "HTML
"), \
+ override_settings(ZENODO_UPLOADS_ENABLED=True, ZENODO_API_TOKEN="tok", ZENODO_SANDBOX_DEPOSITION_ID="123456"):
+
+ call_command(
+ "deposit_zenodo",
+ "--deposition-id", "123456",
+ )
+
+ # Merged metadata: required fields preserved, description/version updated, related merged
+ merged = put_payload["metadata"]
+ self.assertEqual(merged["title"], "Existing Title")
+ self.assertEqual(merged["upload_type"], "dataset")
+ self.assertEqual(merged["publication_date"], "2025-07-14")
+ self.assertEqual(merged["creators"], [{"name": "OPTIMAP"}])
+
+ self.assertIn("description", merged)
+ self.assertTrue(merged["description"].startswith("HTML
+
+ self.assertIsInstance(merged.get("version"), str)
+ rel = {(d["identifier"], d["relation"]) for d in merged.get("related_identifiers", [])}
+ self.assertIn(("https://old.example", "isSupplementTo"), rel)
+ self.assertIn(("https://optimap.science", "describes"), rel)
+
+ # Uploader called with expected files
+ self.assertIn("paths", uploaded)
+ self.assertGreater(len(uploaded["paths"]), 0)
+
+ def test_doi_fields_are_protected_from_overwrite(self):
+ """Test that DOI and prereserve_doi fields are never overwritten."""
+ # Existing deposition with reserved DOI
+ existing_with_doi = {
+ "submitted": False,
+ "state": "unsubmitted",
+ "links": {"edit": "http://edit", "bucket": "http://bucket"},
+ "metadata": {
+ "title": "Test Title",
+ "upload_type": "dataset",
+ "publication_date": "2025-01-01",
+ "creators": [{"name": "Test Author"}],
+ "doi": "10.5072/zenodo.123456",
+ "prereserve_doi": {"doi": "10.5072/zenodo.123456", "recid": 123456},
+ "version": "v1",
+ "description": "
Old description
",
+ },
+ }
+
+ captured_metadata = {}
+
+ def _fake_get(url, params=None, **kwargs):
+ class R:
+ status_code = 200
+ text = "ok"
+ def json(self):
+ return deepcopy(existing_with_doi)
+ def raise_for_status(self):
+ return None
+ return R()
+
+ def _fake_put(url, params=None, data=None, headers=None, **kwargs):
+ # Capture the metadata that would be sent to Zenodo
+ if data:
+ captured_metadata.update(json.loads(data))
+ class R:
+ status_code = 200
+ text = "ok"
+ def raise_for_status(self):
+ return None
+ return R()
+
+ def _fake_update_zenodo(deposition_id, paths, sandbox=True, access_token=None, publish=False):
+ class R:
+ def json(self):
+ return {"links": {"html": "https://sandbox.zenodo.org/deposit/123456"}}
+ return R()
+
+ # Create dynamic JSON that tries to include a DOI (should be ignored)
+ (self.data_dir / "zenodo_dynamic.json").write_text(json.dumps({
+ "title": "NEW TITLE (should be ignored)",
+ "version": "v999",
+ "doi": "10.9999/fake.doi", # This should be removed before merging
+ "prereserve_doi": {"doi": "10.9999/fake.doi", "recid": 999}, # This too
+ "description": "New description",
+ }), encoding="utf-8")
+
+ # Mock Zenodo client
+ mock_zenodo2 = type('MockZenodo', (), {
+ 'access_token': None,
+ 'update': lambda *args, **kwargs: _fake_update_zenodo(**kwargs)
+ })()
+
+ with patch.object(self.zenodo_mod, "__file__", new=self.zenodo_file), \
+ patch.object(self.zenodo_mod, "Path", self.FakePath), \
+ patch.object(self.zenodo_mod.requests, "get", _fake_get), \
+ patch.object(self.zenodo_mod.requests, "put", _fake_put), \
+ patch.object(self.zenodo_mod.requests, "delete", lambda *a, **k: type('R', (), {'status_code': 204})()), \
+ patch.object(self.zenodo_mod, "Zenodo", return_value=mock_zenodo2), \
+ patch.object(self.zenodo_mod, "_markdown_to_html", lambda s: "Updated
"), \
+ override_settings(
+ ZENODO_UPLOADS_ENABLED=True,
+ ZENODO_API_TOKEN="test_token",
+ ZENODO_API_BASE="https://sandbox.zenodo.org/api"
+ ):
+
+ call_command(
+ "deposit_zenodo",
+ "--deposition-id", "123456",
+ "--token", "test_token",
+ )
+
+ # Verify captured metadata
+ merged = captured_metadata.get("metadata", {})
+
+ # DOI should be preserved from existing (not overwritten)
+ self.assertEqual(merged.get("doi"), "10.5072/zenodo.123456",
+ "DOI should be preserved from existing deposition")
+ self.assertNotEqual(merged.get("doi"), "10.9999/fake.doi",
+ "DOI should NOT be overwritten by incoming data")
+
+ # prereserve_doi should also be preserved
+ self.assertEqual(merged.get("prereserve_doi", {}).get("doi"), "10.5072/zenodo.123456",
+ "prereserve_doi should be preserved")
+
+ # Non-DOI fields should be updated from incoming data (no longer protected)
+ self.assertEqual(merged["title"], "NEW TITLE (should be ignored)",
+ "Title should be updated from incoming data")
+ self.assertEqual(merged["upload_type"], "dataset",
+ "upload_type should be present")
+
+ # Version and description should be updated
+ self.assertEqual(merged["version"], "v999",
+ "Version should be updated (in default patch list)")
+ self.assertIn("Updated
", merged.get("description", ""),
+ "Description should be updated (in default patch list)")
+
+ def test_grants_metadata_falls_back_to_notes_when_zenodo_rejects(self):
+ """If Zenodo's curated grants vocabulary doesn't include a BMBF /
+ BMFTR grant ID, the metadata PUT returns 400 — the deposit must
+ retry once without `grants` and append a free-text funding
+ statement to `metadata.notes` so the info isn't lost (issue #63
+ Q2 decision)."""
+ existing = {
+ "submitted": False,
+ "state": "unsubmitted",
+ "links": {"edit": "http://edit", "bucket": "http://bucket"},
+ "metadata": {
+ "title": "T", "upload_type": "dataset",
+ "publication_date": "2025-01-01",
+ "creators": [{"name": "OPTIMAP"}],
+ "version": "v1", "description": "x
",
+ },
+ }
+
+ (self.data_dir / "zenodo_dynamic.json").write_text(json.dumps({
+ "title": "T", "version": "v2",
+ "grants": [
+ {"id": "10.13039/501100002347::16TOA028B"},
+ {"id": "10.13039/501100002347::16KOA009A"},
+ ],
+ }), encoding="utf-8")
+
+ puts: list[dict] = []
+
+ def _fake_get(url, params=None, **kwargs):
+ class R:
+ status_code = 200; text = "ok"
+ def json(self_): return deepcopy(existing)
+ def raise_for_status(self_): return None
+ return R()
+
+ def _fake_put(url, params=None, data=None, headers=None, **kwargs):
+ payload = json.loads(data) if data else {}
+ puts.append(payload)
+ class R:
+ # First PUT: 400 because the grants list isn't curated.
+ # Second PUT: 200 because the fallback removed `grants`.
+ status_code = 400 if len(puts) == 1 else 200
+ text = (
+ '{"errors":[{"field":"metadata.grants","message":"not found"}]}'
+ if len(puts) == 1 else "ok"
+ )
+ def raise_for_status(self_):
+ if self_.status_code >= 400:
+ import requests
+ raise requests.HTTPError(f"{self_.status_code} {self_.text}")
+ return R()
+
+ def _fake_update_zenodo(deposition_id, paths, sandbox=True, access_token=None, publish=False):
+ class R:
+ def json(self_):
+ return {"links": {"html": f"https://sandbox.zenodo.org/deposit/{deposition_id}"}}
+ return R()
+
+ mock_zenodo = type('MockZenodo', (), {
+ 'access_token': None,
+ 'update': lambda *a, **kw: _fake_update_zenodo(**kw),
+ })()
+
+ with patch.object(self.zenodo_mod, "__file__", new=self.zenodo_file), \
+ patch.object(self.zenodo_mod, "Path", self.FakePath), \
+ patch.object(self.zenodo_mod.requests, "get", _fake_get), \
+ patch.object(self.zenodo_mod.requests, "put", _fake_put), \
+ patch.object(self.zenodo_mod.requests, "delete",
+ lambda *a, **k: type('R', (), {'status_code': 204})()), \
+ patch.object(self.zenodo_mod, "Zenodo", return_value=mock_zenodo), \
+ patch.object(self.zenodo_mod, "_markdown_to_html", lambda s: "x
"), \
+ override_settings(
+ ZENODO_UPLOADS_ENABLED=True,
+ ZENODO_API_TOKEN="tok",
+ ZENODO_API_BASE="https://sandbox.zenodo.org/api",
+ ):
+ call_command("deposit_zenodo", "--deposition-id", "123456", "--token", "tok")
+
+ # Two PUTs: one with grants (rejected), one without (succeeded)
+ self.assertEqual(len(puts), 2)
+ first, second = puts[0]["metadata"], puts[1]["metadata"]
+
+ # First attempt sent both grant IDs
+ self.assertEqual(
+ [g["id"] for g in first.get("grants", [])],
+ ["10.13039/501100002347::16TOA028B", "10.13039/501100002347::16KOA009A"],
+ )
+ # Fallback PUT carries no `grants`, but funding info lives in `notes`
+ self.assertNotIn("grants", second)
+ self.assertIn("OPTIMETA", second.get("notes", ""))
+ self.assertIn("KOMET", second.get("notes", ""))
+ self.assertIn("16TOA028B", second.get("notes", ""))
+ self.assertIn("16KOA009A", second.get("notes", ""))
+
+
+class DepositionIdResolutionTest(TestCase):
+ """Resolution + bootstrap + new-version flow (issue #63 item 2)."""
+
+ def setUp(self):
+ self._tmpdir = tempfile.TemporaryDirectory()
+ self.project_root = Path(self._tmpdir.name)
+ self.templates_dir = self.project_root / "works" / "templates"
+ self.data_dir = self.project_root / "data"
+ self.templates_dir.mkdir(parents=True, exist_ok=True)
+ self.data_dir.mkdir(parents=True, exist_ok=True)
+
+ (self.data_dir / "README.md").write_text("# Title\n\nSome text.", encoding="utf-8")
+ (self.data_dir / "optimap-main.zip").write_bytes(b"ZIP")
+ (self.data_dir / "zenodo_dynamic.json").write_text(json.dumps({
+ "title": "OPTIMAP FAIR Data Package",
+ "version": "v1",
+ "related_identifiers": [],
+ }), encoding="utf-8")
+ (self.data_dir / "optimap_data_dump_20250101.geojson").write_text("{}", encoding="utf-8")
+
+ Work.objects.create(title="A", publicationDate="2010-10-10")
+
+ import importlib
+ self.zenodo_mod = importlib.import_module("works.zenodo")
+
+ class FakePath(Path):
+ _flavour = Path(".")._flavour
+ def resolve(self):
+ return self
+ self.FakePath = FakePath
+ self.zenodo_file = str(self.project_root / "works" / "zenodo.py")
+
+ def tearDown(self):
+ self._tmpdir.cleanup()
+
+ def _draft_metadata(self):
+ return {
+ "submitted": False,
+ "state": "unsubmitted",
+ "links": {"edit": "http://edit"},
+ "metadata": {
+ "title": "OPTIMAP",
+ "upload_type": "dataset",
+ "publication_date": "2025-01-01",
+ "creators": [{"name": "OPTIMAP"}],
+ "version": "v0",
+ "description": "x
",
+ },
+ }
+
+ def _patches(self, *, fake_get, fake_post, fake_put, mock_zenodo):
+ return [
+ patch.object(self.zenodo_mod, "__file__", new=self.zenodo_file),
+ patch.object(self.zenodo_mod, "Path", self.FakePath),
+ patch.object(self.zenodo_mod.requests, "get", fake_get),
+ patch.object(self.zenodo_mod.requests, "post", fake_post),
+ patch.object(self.zenodo_mod.requests, "put", fake_put),
+ patch.object(
+ self.zenodo_mod.requests, "delete",
+ lambda *a, **k: type("R", (), {"status_code": 204})(),
+ ),
+ patch.object(self.zenodo_mod, "Zenodo", return_value=mock_zenodo),
+ patch.object(self.zenodo_mod, "_markdown_to_html", lambda s: "x
"),
+ ]
+
+ def test_bootstrap_creates_new_draft_when_no_id_and_no_prior_log(self):
+ """Issue #63 item 2: ``write code to create a new deposition``.
+ With no env/setting ID and no successful log row, the deposit must
+ POST /deposit/depositions to bootstrap a fresh draft, then use the
+ returned id for the rest of the cycle."""
+ from works.zenodo import deposit_to_zenodo
+
+ posted_urls: list[str] = []
+
+ def _fake_post(url, params=None, headers=None, data=None, **kwargs):
+ posted_urls.append(url)
+ class R:
+ status_code = 201
+ text = "ok"
+ def json(self_): return {"id": 987654, "links": {"self": "http://x/987654"}}
+ def raise_for_status(self_): return None
+ return R()
+
+ outer_self = self
+ def _fake_get(url, params=None, **kwargs):
+ class R:
+ status_code = 200
+ text = "ok"
+ def json(self_): return deepcopy(outer_self._draft_metadata())
+ def raise_for_status(self_): return None
+ return R()
+
+ def _fake_put(url, params=None, data=None, headers=None, **kwargs):
+ class R:
+ status_code = 200
+ text = "ok"
+ def raise_for_status(self_): return None
+ return R()
+
+ captured = {}
+ def _fake_update(deposition_id, paths, sandbox=True, access_token=None, publish=False):
+ captured["deposition_id"] = deposition_id
+ class R:
+ def json(self_): return {"links": {"html": f"https://sandbox.zenodo.org/deposit/{deposition_id}"}}
+ return R()
+
+ mock_zenodo = type("MockZenodo", (), {
+ "access_token": None,
+ "update": lambda *a, **kw: _fake_update(**kw),
+ })()
+
+ ctx = self._patches(
+ fake_get=_fake_get, fake_post=_fake_post, fake_put=_fake_put,
+ mock_zenodo=mock_zenodo,
+ )
+ from contextlib import ExitStack
+ with ExitStack() as stack, override_settings(
+ ZENODO_API_TOKEN="tok",
+ ZENODO_API_BASE="https://sandbox.zenodo.org/api",
+ ):
+ for p in ctx:
+ stack.enter_context(p)
+ log_entry = deposit_to_zenodo()
+
+ # POST to /deposit/depositions was made
+ self.assertTrue(any(u.endswith("/deposit/depositions") for u in posted_urls),
+ f"Expected bootstrap POST, got: {posted_urls}")
+ # The log row uses the bootstrapped ID
+ self.assertEqual(log_entry.deposition_id, "987654")
+ self.assertEqual(log_entry.status, "success")
+ self.assertEqual(captured.get("deposition_id"), "987654")
+
+ def test_resolves_from_latest_log_when_no_id_supplied(self):
+ """When no explicit ID is set but a prior successful log exists for
+ the same api_base, reuse that ID (no bootstrap POST)."""
+ from works.zenodo import deposit_to_zenodo
+
+ api_base = "https://sandbox.zenodo.org/api"
+ ZenodoDepositionLog.objects.create(
+ deposition_id="555555", api_base=api_base, status="success", version="v3",
+ )
+
+ outer = self
+ def _fake_post(url, **kw):
+ raise AssertionError(f"Bootstrap POST should not happen; got {url}")
+
+ def _fake_get(url, params=None, **kwargs):
+ class R:
+ status_code = 200
+ text = "ok"
+ def json(self_): return deepcopy(outer._draft_metadata())
+ def raise_for_status(self_): return None
+ return R()
+
+ def _fake_put(url, params=None, data=None, headers=None, **kwargs):
+ class R:
+ status_code = 200
+ text = "ok"
+ def raise_for_status(self_): return None
+ return R()
+
+ captured = {}
+ def _fake_update(deposition_id, paths, sandbox=True, access_token=None, publish=False):
+ captured["deposition_id"] = deposition_id
+ class R:
+ def json(self_): return {"links": {"html": "https://sandbox.zenodo.org/deposit/555555"}}
+ return R()
+
+ mock_zenodo = type("MockZenodo", (), {
+ "access_token": None,
+ "update": lambda *a, **kw: _fake_update(**kw),
+ })()
+
+ from contextlib import ExitStack
+ with ExitStack() as stack, override_settings(
+ ZENODO_API_TOKEN="tok", ZENODO_API_BASE=api_base,
+ ):
+ for p in self._patches(
+ fake_get=_fake_get, fake_post=_fake_post,
+ fake_put=_fake_put, mock_zenodo=mock_zenodo,
+ ):
+ stack.enter_context(p)
+ log_entry = deposit_to_zenodo()
+
+ self.assertEqual(log_entry.deposition_id, "555555")
+ self.assertEqual(captured.get("deposition_id"), "555555")
+
+ def test_new_version_when_target_is_already_published(self):
+ """Once the previously deposited record has been manually published,
+ the next run must POST .../actions/newversion and target the new
+ draft instead — otherwise the PUT/upload would 400."""
+ from works.zenodo import deposit_to_zenodo
+
+ published = {
+ "submitted": True,
+ "state": "done",
+ "links": {
+ "edit": "http://edit",
+ "self": "https://sandbox.zenodo.org/api/deposit/depositions/111",
+ },
+ "metadata": {
+ "title": "OPTIMAP",
+ "upload_type": "dataset",
+ "publication_date": "2025-01-01",
+ "creators": [{"name": "OPTIMAP"}],
+ "version": "v1",
+ "description": "x
",
+ "doi": "10.5281/zenodo.111",
+ },
+ }
+ new_draft = {
+ "submitted": False,
+ "state": "unsubmitted",
+ "links": {"edit": "http://edit"},
+ "metadata": {
+ "title": "OPTIMAP",
+ "upload_type": "dataset",
+ "publication_date": "2025-01-01",
+ "creators": [{"name": "OPTIMAP"}],
+ "version": "v1",
+ "description": "x
",
+ },
+ }
+
+ gets: list[str] = []
+
+ def _fake_get(url, params=None, **kwargs):
+ gets.append(url)
+ payload = published if "/depositions/111" in url else new_draft
+ class R:
+ status_code = 200
+ text = "ok"
+ def json(self_): return deepcopy(payload)
+ def raise_for_status(self_): return None
+ return R()
+
+ posted: list[str] = []
+
+ def _fake_post(url, params=None, headers=None, data=None, **kwargs):
+ posted.append(url)
+ class R:
+ status_code = 201
+ text = "ok"
+ def json(self_):
+ # newversion response carries latest_draft pointing at the new ID
+ return {"links": {
+ "latest_draft": "https://sandbox.zenodo.org/api/deposit/depositions/222"
+ }}
+ def raise_for_status(self_): return None
+ return R()
+
+ def _fake_put(url, params=None, data=None, headers=None, **kwargs):
+ class R:
+ status_code = 200
+ text = "ok"
+ def raise_for_status(self_): return None
+ return R()
+
+ captured = {}
+ def _fake_update(deposition_id, paths, sandbox=True, access_token=None, publish=False):
+ captured["deposition_id"] = deposition_id
+ class R:
+ def json(self_): return {"links": {"html": f"https://sandbox.zenodo.org/deposit/{deposition_id}"}}
+ return R()
+
+ mock_zenodo = type("MockZenodo", (), {
+ "access_token": None,
+ "update": lambda *a, **kw: _fake_update(**kw),
+ })()
+
+ from contextlib import ExitStack
+ with ExitStack() as stack, override_settings(
+ ZENODO_API_TOKEN="tok",
+ ZENODO_API_BASE="https://sandbox.zenodo.org/api",
+ ):
+ for p in self._patches(
+ fake_get=_fake_get, fake_post=_fake_post,
+ fake_put=_fake_put, mock_zenodo=mock_zenodo,
+ ):
+ stack.enter_context(p)
+ log_entry = deposit_to_zenodo(deposition_id="111")
+
+ # The newversion POST landed on the published deposit
+ self.assertTrue(
+ any(u.endswith("/depositions/111/actions/newversion") for u in posted),
+ f"Expected newversion POST; got: {posted}",
+ )
+ # The log row tracks the new draft ID, not the old published one
+ self.assertEqual(log_entry.deposition_id, "222")
+ self.assertEqual(captured.get("deposition_id"), "222")
+ # And the upload+PUT targeted the new draft (verified via update call)
+
+
+class ResolveHelpersTest(SimpleTestCase):
+ """Sanity-check the URL/ID helpers in isolation."""
+
+ def test_extract_id_from_url(self):
+ from works.zenodo import _extract_id_from_url
+ self.assertEqual(_extract_id_from_url(
+ "https://sandbox.zenodo.org/api/deposit/depositions/12345"), "12345")
+ self.assertEqual(_extract_id_from_url(
+ "https://sandbox.zenodo.org/api/deposit/depositions/12345/"), "12345")
+ self.assertIsNone(_extract_id_from_url(None))
+ self.assertIsNone(_extract_id_from_url(""))
+
+ def test_is_published_only_when_both_flags_match(self):
+ from works.zenodo import _is_published
+ self.assertTrue(_is_published({"submitted": True, "state": "done"}))
+ self.assertFalse(_is_published({"submitted": False, "state": "done"}))
+ self.assertFalse(_is_published({"submitted": True, "state": "inprogress"}))
+ self.assertFalse(_is_published({"submitted": True, "state": "unsubmitted"}))
+ self.assertFalse(_is_published({}))
diff --git a/tests/test_render_zenodo.py b/tests/test_render_zenodo.py
new file mode 100644
index 00000000..5a8b0830
--- /dev/null
+++ b/tests/test_render_zenodo.py
@@ -0,0 +1,339 @@
+# tests/test_render_zenodo.py
+import json
+import tempfile
+from pathlib import Path
+from unittest.mock import patch
+
+from django.core.management import call_command
+from django.test import TestCase, override_settings
+from works.models import Work, Source, ZenodoDepositionLog
+
+
+class RenderZenodoTest(TestCase):
+ def setUp(self):
+ # Temp “project root”
+ self._tmpdir = tempfile.TemporaryDirectory()
+ self.project_root = Path(self._tmpdir.name)
+ self.templates_dir = self.project_root / "works" / "templates"
+ self.cmds_dir = self.project_root / "works" / "management" / "commands"
+ self.data_dir = self.project_root / "data"
+ self.templates_dir.mkdir(parents=True, exist_ok=True)
+ self.cmds_dir.mkdir(parents=True, exist_ok=True)
+ self.data_dir.mkdir(parents=True, exist_ok=True)
+
+ # Copy the real README.md.j2 from the source tree so the codebook /
+ # cross-format prose are the same in tests as in production. This
+ # keeps assertions on README content honest.
+ real_template = (
+ Path(__file__).resolve().parents[1] / "works" / "templates" / "README.md.j2"
+ )
+ (self.templates_dir / "README.md.j2").write_text(
+ real_template.read_text(encoding="utf-8"), encoding="utf-8",
+ )
+
+ # DB fixtures
+ Work.objects.create(title="A", publicationDate="2010-10-10")
+
+ # Bad labels to clean
+ Source.objects.create(name="2000", url_field="https://optimap.science") # numeric-only -> OPTIMAP
+ Source.objects.create(name="", url_field="https://example.org") # blank -> domain label
+ Source.objects.create(name=" ", url_field="https://example.org") # duplicate -> dedupe
+
+ # Good label
+ Source.objects.create(
+ name="AGILE: GIScience Series",
+ url_field="https://agile-giss.copernicus.org"
+ )
+
+ # Import zenodo module after DB is ready
+ import importlib
+ self.zenodo_mod = importlib.import_module("works.zenodo")
+
+ # Fake Path so resolve() stays inside tmp root
+ class FakePath(Path):
+ _flavour = Path(".")._flavour
+ def resolve(self):
+ return self
+ self.FakePath = FakePath
+ self.zenodo_file = str(self.project_root / "works" / "zenodo.py")
+
+ def tearDown(self):
+ self._tmpdir.cleanup()
+
+ def _fake_git_archive(self, *args, **kwargs):
+ """Stand-in for subprocess.run([git archive…]) that writes a small
+ non-empty zip at the path given via the `-o` argument, so the render
+ step's hard failure-on-empty check stays satisfied."""
+ argv = args[0] if args else kwargs.get("args", [])
+ if "-o" in argv:
+ out_path = Path(argv[argv.index("-o") + 1])
+ out_path.write_bytes(b"PK\x03\x04stub")
+ class _R:
+ returncode = 0
+ stdout = ""
+ stderr = ""
+ return _R()
+
+ def test_render_produces_clean_readme_and_assets(self):
+ with patch.object(self.zenodo_mod, "__file__", new=self.zenodo_file), \
+ patch.object(self.zenodo_mod, "Path", self.FakePath), \
+ patch("subprocess.run", self._fake_git_archive):
+ call_command("render_zenodo")
+
+ readme_path = self.data_dir / "README.md"
+ zip_path = self.data_dir / "optimap-main.zip"
+ dyn_path = self.data_dir / "zenodo_dynamic.json"
+
+ self.assertTrue(readme_path.exists(), "README.md not generated")
+ self.assertTrue(zip_path.exists(), "optimap-main.zip not generated")
+ self.assertTrue(dyn_path.exists(), "zenodo_dynamic.json not generated")
+
+ md = readme_path.read_text(encoding="utf-8")
+ # Sources cleanup assertions
+ self.assertNotIn("- [2000](", md, "Numeric-only label leaked into Sources")
+ self.assertIn("- [OPTIMAP](https://optimap.science)", md, "OPTIMAP override missing")
+ self.assertIn("AGILE: GIScience Series", md, "Named source missing")
+ # example.org should appear only once after dedupe
+ self.assertEqual(md.count("example.org"), 1, "Duplicate source/domain not deduped")
+
+ @override_settings(BASE_URL="https://optimap.science")
+ def test_render_includes_live_download_urls_as_related_identifiers(self):
+ """Each render must overwrite related_identifiers with the live
+ download URLs derived from settings.BASE_URL — never trust a stale
+ zenodo_dynamic.json (issue #63, item 5)."""
+ # Seed a stale dyn file with a localhost identifier; render must drop it.
+ (self.data_dir / "zenodo_dynamic.json").write_text(json.dumps({
+ "related_identifiers": [
+ {"scheme": "url", "identifier": "http://127.0.0.1:8000/stale",
+ "relation": "isSupplementTo", "resource_type": "dataset"}
+ ]
+ }), encoding="utf-8")
+
+ with patch.object(self.zenodo_mod, "__file__", new=self.zenodo_file), \
+ patch.object(self.zenodo_mod, "Path", self.FakePath), \
+ patch("subprocess.run", self._fake_git_archive):
+ call_command("render_zenodo")
+
+ dyn = json.loads((self.data_dir / "zenodo_dynamic.json").read_text(encoding="utf-8"))
+ live_urls = {
+ r["identifier"]
+ for r in dyn["related_identifiers"]
+ if r["relation"] == "isSupplementTo"
+ }
+ self.assertEqual(live_urls, {
+ "https://optimap.science/download/geojson/",
+ "https://optimap.science/download/geopackage/",
+ "https://optimap.science/download/csv/",
+ })
+ for r in dyn["related_identifiers"]:
+ if r["relation"] == "isSupplementTo":
+ self.assertEqual(r["resource_type"], "dataset")
+ self.assertEqual(r["scheme"], "url")
+
+ @override_settings(BASE_URL="https://optimap.science")
+ def test_render_includes_describes_entry_per_source(self):
+ """Each Source becomes one related_identifiers entry with
+ relation=describes. ISSN-L wins over URL; sources sharing a
+ canonical identifier are deduped; optimap.science is skipped
+ (issue #63, item 6 / comment 2025-07-14)."""
+ # Source with an ISSN-L → scheme=issn
+ Source.objects.create(
+ name="Earth System Science Data",
+ url_field="https://essd.copernicus.org/oai",
+ homepage_url="https://www.earth-system-science-data.net/",
+ issn_l="1866-3508",
+ )
+ # Source without ISSN-L but with homepage → scheme=url, identifier=homepage
+ Source.objects.create(
+ name="Some Repository",
+ url_field="https://example.org/oai",
+ homepage_url="https://example.com/journal",
+ )
+
+ with patch.object(self.zenodo_mod, "__file__", new=self.zenodo_file), \
+ patch.object(self.zenodo_mod, "Path", self.FakePath), \
+ patch("subprocess.run", self._fake_git_archive):
+ call_command("render_zenodo")
+
+ dyn = json.loads((self.data_dir / "zenodo_dynamic.json").read_text(encoding="utf-8"))
+ describes = [
+ r for r in dyn["related_identifiers"] if r["relation"] == "describes"
+ ]
+ for r in describes:
+ self.assertEqual(r["resource_type"], "publication")
+
+ idents = {(r["scheme"], r["identifier"]) for r in describes}
+
+ # ISSN-L wins over homepage URL
+ self.assertIn(("issn", "1866-3508"), idents)
+ # Homepage URL is the fallback (canonicalised to https + lowercased host)
+ self.assertIn(("url", "https://example.com/journal"), idents)
+ # optimap.science (seeded in setUp via numeric-name source) must not
+ # appear — the portal isn't a source it describes.
+ for scheme, ident in idents:
+ self.assertNotIn("optimap.science", ident)
+ # Two sources point at example.org and example.com but the dedupe key
+ # is the resolved identifier, so they coexist; the duplicate
+ # example.org seed in setUp has no homepage_url so falls back to its
+ # url_field once after dedupe.
+ self.assertEqual(
+ sum(1 for s, i in idents if "example.org" in i), 1,
+ "Duplicate example.org Sources should collapse to one describes entry",
+ )
+
+ def test_render_raises_when_git_archive_fails(self):
+ """A failed `git archive` must propagate so the deposit doesn't ship
+ an empty optimap-main.zip (issue #63, last checklist item)."""
+ import subprocess
+
+ def _failing(*a, **k):
+ raise subprocess.CalledProcessError(
+ returncode=128, cmd=a[0] if a else [], stderr="fatal: not a git repository"
+ )
+
+ with patch.object(self.zenodo_mod, "__file__", new=self.zenodo_file), \
+ patch.object(self.zenodo_mod, "Path", self.FakePath), \
+ patch("subprocess.run", _failing):
+ with self.assertRaisesRegex(Exception, r"git archive HEAD.*failed"):
+ call_command("render_zenodo")
+
+ def test_render_default_keywords_match_issue_decisions(self):
+ """Keywords default to the list agreed in nuest's 2025-07-14 comment.
+ Both `Open Research Information` and its short form `ORI` ship so
+ the record is findable under either label."""
+ with patch.object(self.zenodo_mod, "__file__", new=self.zenodo_file), \
+ patch.object(self.zenodo_mod, "Path", self.FakePath), \
+ patch("subprocess.run", self._fake_git_archive):
+ call_command("render_zenodo")
+
+ dyn = json.loads((self.data_dir / "zenodo_dynamic.json").read_text(encoding="utf-8"))
+ self.assertEqual(dyn["keywords"], [
+ "Open Access", "Open Science", "Open Research Information",
+ "ORI", "Open Data", "FAIR",
+ ])
+
+ def test_render_version_starts_at_v1_with_no_prior_deposits(self):
+ """Fresh DB, no ZenodoDepositionLog rows → render emits v1.
+ The data/last_version.txt file was removed in favour of DB state."""
+ with patch.object(self.zenodo_mod, "__file__", new=self.zenodo_file), \
+ patch.object(self.zenodo_mod, "Path", self.FakePath), \
+ patch("subprocess.run", self._fake_git_archive):
+ call_command("render_zenodo")
+
+ dyn = json.loads((self.data_dir / "zenodo_dynamic.json").read_text(encoding="utf-8"))
+ self.assertEqual(dyn["version"], "v1")
+ # And the legacy tracking file must not be created either.
+ self.assertFalse((self.data_dir / "last_version.txt").exists())
+
+ def test_render_version_increments_from_latest_successful_log(self):
+ """Render reads the latest successful ZenodoDepositionLog for the
+ target api_base and emits the next vN. Sandbox and production
+ increment independently; failed depositions don't burn a version."""
+ api_base = "https://sandbox.zenodo.org/api"
+ # Successful logs at v1 and v2 for this api_base; the latest wins.
+ ZenodoDepositionLog.objects.create(
+ deposition_id="42", api_base=api_base, status="success", version="v1",
+ )
+ ZenodoDepositionLog.objects.create(
+ deposition_id="42", api_base=api_base, status="success", version="v2",
+ )
+ # A failed deposit at v3 must not advance the counter.
+ ZenodoDepositionLog.objects.create(
+ deposition_id="42", api_base=api_base, status="failed", version="v3",
+ )
+ # A successful deposit at a different api_base must not advance it either.
+ ZenodoDepositionLog.objects.create(
+ deposition_id="99", api_base="https://zenodo.org/api",
+ status="success", version="v50",
+ )
+
+ with patch.object(self.zenodo_mod, "__file__", new=self.zenodo_file), \
+ patch.object(self.zenodo_mod, "Path", self.FakePath), \
+ patch("subprocess.run", self._fake_git_archive), \
+ override_settings(ZENODO_API_BASE=api_base):
+ call_command("render_zenodo")
+
+ dyn = json.loads((self.data_dir / "zenodo_dynamic.json").read_text(encoding="utf-8"))
+ self.assertEqual(dyn["version"], "v3")
+
+ def test_render_emits_grants_for_optimeta_and_komet(self):
+ """Render emits structured `grants` for OPTIMETA (BMBF 16TOA028B)
+ and KOMET (BMFTR 16KOA009A), per the 2025-08-21 issue comment on
+ #63 (NFDI4Earth intentionally excluded)."""
+ with patch.object(self.zenodo_mod, "__file__", new=self.zenodo_file), \
+ patch.object(self.zenodo_mod, "Path", self.FakePath), \
+ patch("subprocess.run", self._fake_git_archive):
+ call_command("render_zenodo")
+
+ dyn = json.loads((self.data_dir / "zenodo_dynamic.json").read_text(encoding="utf-8"))
+ grant_ids = [g["id"] for g in dyn.get("grants", [])]
+ self.assertEqual(grant_ids, [
+ "10.13039/501100002347::16TOA028B", # OPTIMETA
+ "10.13039/501100002347::16KOA009A", # KOMET
+ ])
+ # Only `id` keys are exposed to Zenodo — the human-readable
+ # name/funder/grant labels live in the _FUNDING constant.
+ for g in dyn["grants"]:
+ self.assertEqual(list(g.keys()), ["id"])
+
+ def test_render_emits_license_split_additional_description(self):
+ """License split (CC0 for data, GPL-3.0 for code) is documented as a
+ Zenodo `additional_descriptions` entry of type=notes — per the
+ 2025-07-21 issue comment."""
+ with patch.object(self.zenodo_mod, "__file__", new=self.zenodo_file), \
+ patch.object(self.zenodo_mod, "Path", self.FakePath), \
+ patch("subprocess.run", self._fake_git_archive):
+ call_command("render_zenodo")
+
+ dyn = json.loads((self.data_dir / "zenodo_dynamic.json").read_text(encoding="utf-8"))
+ notes = dyn.get("additional_descriptions") or []
+ self.assertEqual(len(notes), 1)
+ self.assertEqual(notes[0]["type"], "notes")
+ html = notes[0]["description"]
+ # Both licenses called out, with their actual file scopes
+ self.assertIn("CC0-1.0", html)
+ self.assertIn("GPL-3.0", html)
+ self.assertIn("optimap-main.zip", html)
+ self.assertIn("optimap_data_dump_*.csv", html)
+ self.assertIn("optimap_data_dump_*.gpkg", html)
+
+ def test_render_codebook_covers_post_rebase_fields(self):
+ """README codebook mentions the fields added since the initial
+ Zenodo branch (type, authors, keywords, topics, bok_concepts,
+ placename, country_code, openalex_id) and notes cross-format
+ equivalence (WKT in CSV)."""
+ with patch.object(self.zenodo_mod, "__file__", new=self.zenodo_file), \
+ patch.object(self.zenodo_mod, "Path", self.FakePath), \
+ patch("subprocess.run", self._fake_git_archive):
+ call_command("render_zenodo")
+
+ md = (self.data_dir / "README.md").read_text(encoding="utf-8")
+ # Cross-format note
+ self.assertIn("CSV column", md)
+ self.assertIn("WKT", md)
+ # New fields
+ for field in (
+ "`type`", "`authors`", "`keywords`", "`topics`",
+ "`bok_concepts`", "`placename`", "`country_code`",
+ "`openalex_id`",
+ ):
+ self.assertIn(field, md, f"codebook is missing {field}")
+
+ def test_render_raises_when_git_archive_writes_empty_file(self):
+ """If `git archive` exits 0 but writes a 0-byte file (corrupt repo,
+ SIGPIPE, …) we still fail rather than uploading an empty zip."""
+ def _empty_archive(*args, **kwargs):
+ argv = args[0] if args else kwargs.get("args", [])
+ if "-o" in argv:
+ out_path = Path(argv[argv.index("-o") + 1])
+ out_path.write_bytes(b"")
+ class _R:
+ returncode = 0
+ stderr = "warning: empty tree"
+ return _R()
+
+ with patch.object(self.zenodo_mod, "__file__", new=self.zenodo_file), \
+ patch.object(self.zenodo_mod, "Path", self.FakePath), \
+ patch("subprocess.run", _empty_archive):
+ with self.assertRaisesRegex(Exception, r"produced no archive"):
+ call_command("render_zenodo")
diff --git a/tests/test_zenodo_integration.py b/tests/test_zenodo_integration.py
new file mode 100644
index 00000000..b5138ba5
--- /dev/null
+++ b/tests/test_zenodo_integration.py
@@ -0,0 +1,337 @@
+"""
+Integration tests for Zenodo deposition.
+
+These tests run against the actual Zenodo sandbox API and require:
+1. A tests/.env file with ZENODO_API_TOKEN and ZENODO_SANDBOX_DEPOSITION_ID
+2. Active internet connection
+3. Valid Zenodo sandbox credentials
+
+To run these tests:
+ python manage.py test tests.test_zenodo_integration
+
+To skip these tests (default):
+ python manage.py test tests --exclude-tag=integration
+"""
+import os
+import json
+import tempfile
+from pathlib import Path
+from django.test import TestCase, tag, override_settings
+from django.core.management import call_command
+from works.models import Work, Source
+from django.conf import settings
+
+
+def load_test_env():
+ """Load environment variables from tests/.env file."""
+ env_file = Path(__file__).parent / '.env'
+ if env_file.exists():
+ with open(env_file) as f:
+ for line in f:
+ line = line.strip()
+ if line and not line.startswith('#') and '=' in line:
+ key, value = line.split('=', 1)
+ os.environ.setdefault(key.strip(), value.strip())
+
+
+@tag('integration', 'zenodo')
+class ZenodoIntegrationTest(TestCase):
+ """
+ Integration tests for Zenodo API.
+
+ Requires tests/.env with:
+ - ZENODO_API_TOKEN
+ - ZENODO_SANDBOX_DEPOSITION_ID
+ - ZENODO_API_BASE (optional, defaults to sandbox)
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ load_test_env()
+
+ cls.api_token = os.environ.get('ZENODO_API_TOKEN')
+ cls.deposition_id = os.environ.get('ZENODO_SANDBOX_DEPOSITION_ID')
+ cls.api_base = os.environ.get('ZENODO_API_BASE', 'https://sandbox.zenodo.org/api')
+
+ if not cls.api_token or not cls.deposition_id:
+ raise unittest.SkipTest(
+ "Zenodo integration tests require ZENODO_API_TOKEN and "
+ "ZENODO_SANDBOX_DEPOSITION_ID in tests/.env file. "
+ "See tests/.env.template for setup instructions."
+ )
+
+ def setUp(self):
+ """Set up test data and temporary directories."""
+ self._tmpdir = tempfile.TemporaryDirectory()
+ self.project_root = Path(self._tmpdir.name)
+ self.data_dir = self.project_root / "data"
+ self.data_dir.mkdir(parents=True, exist_ok=True)
+
+ # Create test data files
+ (self.data_dir / "README.md").write_text(
+ "# OPTIMAP Test Data\\n\\nTest dataset for integration testing.",
+ encoding="utf-8"
+ )
+ (self.data_dir / "optimap-main.zip").write_bytes(b"TEST_ZIP_CONTENT")
+
+ # Create dynamic metadata
+ (self.data_dir / "zenodo_dynamic.json").write_text(json.dumps({
+ "title": "OPTIMAP Test Dataset",
+ "version": "v1.0.0-test",
+ "related_identifiers": [
+ {
+ "relation": "describes",
+ "identifier": "https://optimap.science",
+ "scheme": "url"
+ }
+ ]
+ }), encoding="utf-8")
+
+ # Create fake data dump files
+ (self.data_dir / "optimap_data_dump_20250101.geojson").write_text("{}", encoding="utf-8")
+ (self.data_dir / "optimap_data_dump_20250101.gpkg").write_bytes(b"GPKG_TEST")
+
+ # Create minimal database records
+ Work.objects.create(title="Test Work", doi="10.test/integration")
+ Source.objects.create(name="Test Source", url_field="https://test.example.com")
+
+ def tearDown(self):
+ """Clean up temporary directories."""
+ self._tmpdir.cleanup()
+
+ @override_settings(
+ ZENODO_API_TOKEN=None, # Will be set from environment
+ ZENODO_SANDBOX_DEPOSITION_ID=None, # Will be set from environment
+ ZENODO_API_BASE=None # Will be set from environment
+ )
+ def test_render_zenodo_command(self):
+ """Test that render_zenodo command generates all required files."""
+ with override_settings(
+ ZENODO_API_TOKEN=self.api_token,
+ ZENODO_SANDBOX_DEPOSITION_ID=self.deposition_id,
+ ZENODO_API_BASE=self.api_base
+ ):
+ # Run render command
+ call_command(
+ 'render_zenodo',
+ stdout=tempfile.TemporaryFile(mode='w+'),
+ stderr=tempfile.TemporaryFile(mode='w+')
+ )
+
+ # Verify generated files exist
+ data_dir = Path(settings.BASE_DIR) / 'data'
+ self.assertTrue((data_dir / 'README.md').exists(), "README.md should be generated")
+ self.assertTrue((data_dir / 'zenodo_dynamic.json').exists(), "zenodo_dynamic.json should exist")
+
+ @override_settings(
+ ZENODO_API_TOKEN=None,
+ ZENODO_SANDBOX_DEPOSITION_ID=None,
+ ZENODO_API_BASE=None
+ )
+ def test_deposit_zenodo_command_dry_run(self):
+ """Test deposit_zenodo command in dry-run mode (no actual upload)."""
+ with override_settings(
+ ZENODO_API_TOKEN=self.api_token,
+ ZENODO_SANDBOX_DEPOSITION_ID=self.deposition_id,
+ ZENODO_API_BASE=self.api_base
+ ):
+ # Test with --dry-run flag if available
+ # This test verifies the command can be called without errors
+ # Actual upload testing would require cleanup logic
+ try:
+ call_command(
+ 'deposit_zenodo',
+ '--help',
+ stdout=tempfile.TemporaryFile(mode='w+'),
+ stderr=tempfile.TemporaryFile(mode='w+')
+ )
+ except SystemExit:
+ pass # --help exits, which is expected
+
+ def test_env_file_loading(self):
+ """Test that environment variables are loaded from tests/.env."""
+ self.assertIsNotNone(self.api_token, "ZENODO_API_TOKEN should be loaded from .env")
+ self.assertIsNotNone(self.deposition_id, "ZENODO_SANDBOX_DEPOSITION_ID should be loaded")
+ self.assertIn('zenodo.org', self.api_base, "ZENODO_API_BASE should contain zenodo.org")
+
+ def test_zenodo_api_connectivity(self):
+ """Test basic connectivity to Zenodo API."""
+ import requests
+
+ headers = {"Authorization": f"Bearer {self.api_token}"}
+ response = requests.get(f"{self.api_base}/deposit/depositions", headers=headers)
+
+ self.assertEqual(
+ response.status_code, 200,
+ f"Should be able to connect to Zenodo API. Status: {response.status_code}"
+ )
+
+ depositions = response.json()
+ self.assertIsInstance(depositions, list, "Depositions should be a list")
+
+
+@tag('integration', 'zenodo', 'slow')
+class ZenodoFullDepositTest(TestCase):
+ """
+ Full end-to-end deposit tests.
+
+ WARNING: These tests actually upload to Zenodo sandbox.
+ Use with caution and clean up manually if needed.
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ load_test_env()
+
+ cls.api_token = os.environ.get('ZENODO_API_TOKEN')
+ cls.deposition_id = os.environ.get('ZENODO_SANDBOX_DEPOSITION_ID')
+ cls.api_base = os.environ.get('ZENODO_API_BASE', 'https://sandbox.zenodo.org/api')
+
+ if not cls.api_token or not cls.deposition_id:
+ raise unittest.SkipTest(
+ "Full deposit tests require ZENODO_API_TOKEN and "
+ "ZENODO_SANDBOX_DEPOSITION_ID in tests/.env"
+ )
+
+ def setUp(self):
+ """Set up test data."""
+ Work.objects.create(title="Full Test Work", doi="10.test/full")
+ Source.objects.create(name="Full Test Source", url_field="https://test.example.com")
+
+ @tag('slow', 'upload')
+ def test_full_deposit_cycle(self):
+ """
+ Test full deposit cycle: render → deposit → verify.
+
+ This test actually uploads to Zenodo sandbox.
+ Run manually with: python manage.py test tests.test_zenodo_integration.ZenodoFullDepositTest --tag=upload
+ """
+ from works.models import ZenodoDepositionLog
+ import tempfile
+ from pathlib import Path
+
+ # Set up temporary data directory
+ with tempfile.TemporaryDirectory() as tmpdir:
+ data_dir = Path(tmpdir) / "data"
+ data_dir.mkdir(parents=True, exist_ok=True)
+
+ # Create required files
+ (data_dir / "README.md").write_text(
+ "# OPTIMAP Integration Test\\n\\nTest deposit cycle.",
+ encoding="utf-8"
+ )
+ (data_dir / "optimap-main.zip").write_bytes(b"TEST_ZIP_CONTENT_INTEGRATION")
+
+ # Create dynamic metadata
+ import json
+ (data_dir / "zenodo_dynamic.json").write_text(json.dumps({
+ "title": "OPTIMAP Integration Test Dataset",
+ "version": "v1.0.0-integration-test",
+ "description": "Integration test deposit",
+ "keywords": ["test", "integration"],
+ "related_identifiers": [
+ {
+ "relation": "describes",
+ "identifier": "https://optimap.science/test",
+ "scheme": "url"
+ }
+ ]
+ }), encoding="utf-8")
+
+ # Override settings to use temporary directory
+ with override_settings(
+ ZENODO_API_TOKEN=self.api_token,
+ ZENODO_SANDBOX_DEPOSITION_ID=self.deposition_id,
+ ZENODO_API_BASE=self.api_base,
+ PROJECT_ROOT=Path(tmpdir)
+ ):
+ # Get initial log count
+ initial_log_count = ZenodoDepositionLog.objects.count()
+
+ # Run deposit command
+ from io import StringIO
+ out = StringIO()
+ err = StringIO()
+
+ call_command(
+ 'deposit_zenodo',
+ '--deposition-id', self.deposition_id,
+ stdout=out,
+ stderr=err
+ )
+
+ # Verify log was created
+ self.assertEqual(
+ ZenodoDepositionLog.objects.count(),
+ initial_log_count + 1,
+ "A deposition log entry should be created"
+ )
+
+ # Get the most recent log entry
+ log_entry = ZenodoDepositionLog.objects.order_by('-deposition_date').first()
+
+ # Verify log entry details
+ self.assertIsNotNone(log_entry, "Log entry should exist")
+ self.assertEqual(log_entry.deposition_id, self.deposition_id)
+ self.assertEqual(log_entry.status, 'success',
+ f"Deposition should succeed. Error: {log_entry.error_message}")
+ self.assertEqual(log_entry.api_base, self.api_base)
+ self.assertEqual(log_entry.version, "v1.0.0-integration-test")
+ self.assertGreater(log_entry.works_count, 0, "Should track works count")
+ self.assertIsNotNone(log_entry.files_uploaded, "Should track uploaded files")
+ self.assertGreater(len(log_entry.files_uploaded), 0, "Should have uploaded files")
+ self.assertGreater(log_entry.total_size_bytes, 0, "Should track total size")
+ self.assertIsNotNone(log_entry.upload_duration_seconds, "Should track duration")
+ self.assertGreater(log_entry.upload_duration_seconds, 0, "Duration should be positive")
+ self.assertIsNotNone(log_entry.deposition_summary, "Should have summary")
+ self.assertIn("Successfully uploaded", log_entry.deposition_summary)
+
+ # Verify files were tracked
+ file_names = [f['name'] for f in log_entry.files_uploaded]
+ self.assertIn("README.md", file_names, "README.md should be uploaded")
+ self.assertIn("optimap-main.zip", file_names, "ZIP should be uploaded")
+
+ # Verify Zenodo response data (if available)
+ if log_entry.zenodo_url:
+ self.assertIn("zenodo.org", log_entry.zenodo_url, "Should have Zenodo URL")
+
+ # Verify command output
+ output = out.getvalue()
+ self.assertIn("Updated deposition", output, "Should report success")
+ self.assertIn("Deposition log saved", output, "Should confirm log was saved")
+
+ # Test API to verify deposition
+ import requests
+ headers = {"Authorization": f"Bearer {self.api_token}"}
+ response = requests.get(
+ f"{self.api_base}/deposit/depositions/{self.deposition_id}",
+ headers=headers
+ )
+ self.assertEqual(response.status_code, 200, "Should be able to fetch deposition")
+
+ dep_data = response.json()
+ self.assertEqual(
+ str(dep_data.get('id')),
+ self.deposition_id,
+ "Deposition ID should match"
+ )
+
+ # Verify files were actually uploaded to Zenodo
+ files = dep_data.get('files', [])
+ self.assertGreater(len(files), 0, "Deposition should have files")
+
+ zenodo_file_names = [f['filename'] for f in files]
+ self.assertIn("README.md", zenodo_file_names, "README.md should be on Zenodo")
+
+ # Print test success details (using print instead of self.stdout for TestCase)
+ print(
+ f"\n✅ Full deposit cycle test passed. "
+ f"Log ID: {log_entry.id}, "
+ f"Files uploaded: {len(log_entry.files_uploaded)}, "
+ f"Duration: {log_entry.upload_duration_seconds:.2f}s"
+ )
+
+
+import unittest
diff --git a/works/admin.py b/works/admin.py
index d86bebc8..757c6a3c 100644
--- a/works/admin.py
+++ b/works/admin.py
@@ -2,6 +2,8 @@
# SPDX-License-Identifier: GPL-3.0-or-later
import logging
+import os
+
logger = logging.getLogger(__name__)
from django.contrib import admin, messages
@@ -12,7 +14,7 @@
from leaflet.admin import LeafletGeoAdmin
from works.models import Work, Source, HarvestingEvent, BlockedEmail, BlockedDomain, GlobalRegion, Collection
from import_export.admin import ImportExportModelAdmin
-from works.models import Contribution, EmailLog, Subscription, UserProfile, WikidataExportLog
+from works.models import Contribution, EmailLog, Subscription, UserProfile, WikidataExportLog, ZenodoDepositionLog
from works.tasks import schedule_subscription_email_task, send_monthly_email, schedule_monthly_email_task, send_subscription_based_email
from django_q.models import Schedule
from django_q.tasks import async_task
@@ -22,6 +24,54 @@
from django.test import Client
from django.http import HttpResponse
from works.wikidata import export_works_to_wikidata, export_works_to_wikidata_dryrun
+from works.zenodo import render_zenodo_package, deposit_to_zenodo
+
+@admin.action(description="Trigger Zenodo Deposition")
+def trigger_zenodo_deposition(modeladmin, request, queryset):
+ """
+ Admin action to trigger a complete Zenodo deposition (render + upload).
+ Note: This action doesn't filter by queryset - it deposits ALL works.
+ """
+ try:
+ # Step 1: Render package
+ messages.info(request, "Step 1/2: Rendering Zenodo package...")
+ result = render_zenodo_package()
+ messages.success(request, f"✓ Rendered version {result['version']}")
+
+ # Step 2: Deposit to Zenodo
+ messages.info(request, "Step 2/2: Depositing to Zenodo...")
+
+ # Resolve deposition ID from settings — optional. When unset,
+ # deposit_to_zenodo() reuses the latest from the log or bootstraps
+ # a fresh draft via POST /deposit/depositions.
+ deposition_id = os.getenv("ZENODO_SANDBOX_DEPOSITION_ID") or getattr(
+ settings, "ZENODO_SANDBOX_DEPOSITION_ID", None
+ )
+
+ log_entry = deposit_to_zenodo(
+ deposition_id=str(deposition_id) if deposition_id else None
+ )
+
+ if log_entry.status == 'success':
+ messages.success(
+ request,
+ f"✓ Successfully deposited {log_entry.works_count} works to Zenodo (version {log_entry.version})"
+ )
+ if log_entry.zenodo_url:
+ messages.info(
+ request,
+ format_html(
+ 'Review draft deposition at: {} ',
+ log_entry.zenodo_url,
+ log_entry.zenodo_url
+ )
+ )
+ else:
+ messages.error(request, f"✗ Deposition failed: {log_entry.error_message}")
+
+ except Exception as ex:
+ messages.error(request, f"Deposition failed: {ex}")
+ logger.exception("Zenodo deposition failed from admin action")
@admin.action(description="Export selected works to Wikidata/Wikibase")
def export_to_wikidata(modeladmin, request, queryset):
@@ -229,7 +279,8 @@ class WorkAdmin(LeafletGeoAdmin, ImportExportModelAdmin):
readonly_fields = ("created_by", "updated_by", "openalex_link")
actions = [make_public, make_draft, regenerate_all_exports,
"export_permalinks_csv", "email_permalinks_preview",
- export_to_wikidata, export_to_wikidata_dryrun]
+ export_to_wikidata, export_to_wikidata_dryrun,
+ trigger_zenodo_deposition]
@admin.display(boolean=True, description="Has DOI")
def has_permalink(self, obj):
@@ -588,6 +639,160 @@ def error_message_display(self, obj):
)
return "—"
+
+@admin.register(ZenodoDepositionLog)
+class ZenodoDepositionLogAdmin(admin.ModelAdmin):
+ """Admin interface for Zenodo deposition logs."""
+ list_display = (
+ "id",
+ "deposition_date",
+ "status",
+ "deposition_id",
+ "version",
+ "works_count",
+ "total_size_display",
+ "duration_display",
+ "zenodo_link",
+ )
+ list_filter = ("status", "deposition_date", "api_base")
+ search_fields = (
+ "deposition_id",
+ "doi",
+ "version",
+ "deposition_summary",
+ "error_message",
+ )
+ readonly_fields = (
+ "deposition_date",
+ "status",
+ "deposition_id",
+ "doi",
+ "zenodo_link_display",
+ "api_base",
+ "version",
+ "files_uploaded_display",
+ "metadata_merged_display",
+ "works_count",
+ "total_size_bytes",
+ "upload_duration_seconds",
+ "error_message_display",
+ "error_details_display",
+ "deposition_summary",
+ "notes",
+ )
+ fields = (
+ "deposition_date",
+ "status",
+ "deposition_id",
+ "doi",
+ "zenodo_link_display",
+ "api_base",
+ "version",
+ "works_count",
+ "total_size_bytes",
+ "upload_duration_seconds",
+ "files_uploaded_display",
+ "metadata_merged_display",
+ "deposition_summary",
+ "notes",
+ "error_message_display",
+ "error_details_display",
+ )
+ ordering = ("-deposition_date",)
+ date_hierarchy = "deposition_date"
+
+ @admin.display(description="Zenodo")
+ def zenodo_link(self, obj):
+ if obj.zenodo_url:
+ return format_html(
+ ' {} ',
+ obj.zenodo_url,
+ obj.deposition_id
+ )
+ return obj.deposition_id
+
+ @admin.display(description="Zenodo Link")
+ def zenodo_link_display(self, obj):
+ if obj.zenodo_url:
+ return format_html(
+ '{} ',
+ obj.zenodo_url,
+ obj.zenodo_url
+ )
+ elif obj.deposition_id:
+ return format_html(
+ '{}/deposit/{} (view in Zenodo UI)',
+ obj.api_base.replace('/api', ''),
+ obj.deposition_id
+ )
+ return "—"
+
+ @admin.display(description="Size")
+ def total_size_display(self, obj):
+ if obj.total_size_bytes:
+ # Convert bytes to human-readable format
+ for unit in ['B', 'KB', 'MB', 'GB']:
+ if obj.total_size_bytes < 1024.0:
+ return f"{obj.total_size_bytes:.1f} {unit}"
+ obj.total_size_bytes /= 1024.0
+ return f"{obj.total_size_bytes:.1f} TB"
+ return "—"
+
+ @admin.display(description="Duration")
+ def duration_display(self, obj):
+ if obj.upload_duration_seconds:
+ minutes = int(obj.upload_duration_seconds // 60)
+ seconds = int(obj.upload_duration_seconds % 60)
+ if minutes > 0:
+ return f"{minutes}m {seconds}s"
+ return f"{seconds}s"
+ return "—"
+
+ @admin.display(description="Files Uploaded")
+ def files_uploaded_display(self, obj):
+ if obj.files_uploaded:
+ files_html = ""
+ for file_info in obj.files_uploaded:
+ if isinstance(file_info, dict):
+ name = file_info.get('name', '?')
+ size = file_info.get('size', 0)
+ files_html += f"{name} ({size:,} bytes) "
+ else:
+ files_html += f"{file_info} "
+ files_html += " "
+ return format_html(files_html)
+ return "—"
+
+ @admin.display(description="Metadata Merged")
+ def metadata_merged_display(self, obj):
+ if obj.metadata_merged:
+ import json
+ return format_html(
+ '{} ',
+ json.dumps(obj.metadata_merged, indent=2)
+ )
+ return "—"
+
+ @admin.display(description="Error Message")
+ def error_message_display(self, obj):
+ if obj.error_message:
+ return format_html(
+ '{} ',
+ obj.error_message
+ )
+ return "—"
+
+ @admin.display(description="Error Details")
+ def error_details_display(self, obj):
+ if obj.error_details:
+ import json
+ return format_html(
+ '{} ',
+ json.dumps(obj.error_details, indent=2)
+ )
+ return "—"
+
+
@admin.register(Subscription)
class SubscriptionAdmin(admin.ModelAdmin):
list_display = ("user", "region", "subscribed")
diff --git a/works/management/commands/deposit_zenodo.py b/works/management/commands/deposit_zenodo.py
new file mode 100644
index 00000000..b6e1b8e6
--- /dev/null
+++ b/works/management/commands/deposit_zenodo.py
@@ -0,0 +1,67 @@
+"""Management command wrapper for deposit_to_zenodo()."""
+import os
+
+from django.conf import settings
+from django.core.management.base import BaseCommand, CommandError
+
+from works.zenodo import deposit_to_zenodo
+
+
+class Command(BaseCommand):
+ help = "Update an existing Zenodo deposition draft with generated files and selectively patched metadata."
+
+ def add_arguments(self, parser):
+ parser.add_argument("--deposition-id", dest="deposition_id", help="Existing deposition (draft) ID on Zenodo.")
+ parser.add_argument(
+ "--patch",
+ dest="patch",
+ default=(
+ "description,version,keywords,related_identifiers,"
+ "additional_descriptions,grants,title,upload_type,"
+ "publication_date,creators"
+ ),
+ help="Comma-separated list of metadata fields to patch (others are preserved).",
+ )
+ parser.add_argument("--merge-keywords", action="store_true", help="Merge incoming keywords with existing.")
+ parser.add_argument("--merge-related", action="store_true", help="Merge incoming related_identifiers.")
+ parser.add_argument("--no-build", action="store_true", help="(Kept for compatibility; ignored here.)")
+ parser.add_argument("--token", dest="token", help="Zenodo API token (overrides env/settings).")
+
+ def handle(self, *args, **opts):
+ # Resolve deposition ID — optional. When unset, deposit_to_zenodo()
+ # falls back to the latest successful log row for this api_base, and
+ # if there is none, bootstraps a fresh draft via POST /deposit/depositions.
+ deposition_id = opts.get("deposition_id") or os.getenv("ZENODO_SANDBOX_DEPOSITION_ID") or getattr(
+ settings, "ZENODO_SANDBOX_DEPOSITION_ID", None
+ )
+
+ # Resolve API base
+ api_base = os.getenv("ZENODO_API_BASE") or getattr(settings, "ZENODO_API_BASE", "https://sandbox.zenodo.org/api")
+
+ self.stdout.write(f"Depositing OPTIMAP data dump to {api_base} (configured via settings/default)")
+ if deposition_id:
+ self.stdout.write(f"Using deposition ID {deposition_id}")
+ else:
+ self.stdout.write("No deposition ID configured — will reuse the latest from the log or bootstrap a new draft.")
+
+ try:
+ log_entry = deposit_to_zenodo(
+ deposition_id=str(deposition_id) if deposition_id else None,
+ api_base=api_base,
+ token=opts.get("token"),
+ patch_fields=opts.get("patch"),
+ merge_keywords=opts.get("merge_keywords", False),
+ merge_related=opts.get("merge_related", False),
+ stdout_callback=self.stdout.write,
+ )
+
+ if log_entry.status == 'success':
+ self.stdout.write(self.style.SUCCESS("✓ Deposit completed successfully"))
+ if log_entry.zenodo_url:
+ self.stdout.write(f"\nNote: This deposition is in DRAFT state and not yet published.")
+ self.stdout.write(f"Review at: {log_entry.zenodo_url}")
+ else:
+ raise CommandError(f"Deposition failed: {log_entry.error_message}")
+
+ except Exception as ex:
+ raise CommandError(f"Deposition failed: {ex}") from ex
diff --git a/works/management/commands/render_zenodo.py b/works/management/commands/render_zenodo.py
new file mode 100644
index 00000000..1cf2fb67
--- /dev/null
+++ b/works/management/commands/render_zenodo.py
@@ -0,0 +1,18 @@
+"""Management command wrapper for render_zenodo_package()."""
+from django.core.management.base import BaseCommand
+
+from works.zenodo import render_zenodo_package
+
+
+class Command(BaseCommand):
+ help = "Generate optimap-main.zip, data/README.md and data/zenodo_dynamic.json."
+
+ def handle(self, *args, **options):
+ result = render_zenodo_package(stdout_callback=self.stdout.write)
+
+ self.stdout.write(self.style.SUCCESS(
+ f"Generated assets in {result['data_dir']}:\n"
+ f" - {result['archive_path'].name}\n"
+ f" - {result['readme_path'].name}\n"
+ f" - {result['metadata_path'].name}"
+ ))
diff --git a/works/management/commands/schedule_zenodo_deposit.py b/works/management/commands/schedule_zenodo_deposit.py
new file mode 100644
index 00000000..91f394fc
--- /dev/null
+++ b/works/management/commands/schedule_zenodo_deposit.py
@@ -0,0 +1,55 @@
+# SPDX-FileCopyrightText: 2026 OPTIMETA and KOMET projects
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+"""Schedule the annual Zenodo deposition run.
+
+The deposit cycle (regenerate data dumps → render README/zip/metadata →
+update or bootstrap a Zenodo draft) is wrapped in
+``works.tasks.run_zenodo_deposition`` and registered with Django-Q as a
+yearly schedule. The first run lands on Dec 31 23:59 of the current year
+(local time); subsequent runs repeat annually. Publishing the resulting
+draft remains manual — admins receive an email with the draft link.
+
+This command is idempotent: re-running it will not add duplicate schedule
+entries.
+"""
+
+from datetime import datetime
+
+from django.core.management.base import BaseCommand
+from django_q.models import Schedule
+from django_q.tasks import schedule
+
+
+FUNC_NAME = "works.tasks.run_zenodo_deposition"
+
+
+class Command(BaseCommand):
+ help = (
+ "Schedule the annual Zenodo deposition run (Dec 31 23:59, yearly). "
+ "Idempotent."
+ )
+
+ def handle(self, *args, **options):
+ if Schedule.objects.filter(func=FUNC_NAME).exists():
+ self.stdout.write("Zenodo deposition is already scheduled.")
+ return
+
+ now = datetime.now()
+ next_run = now.replace(
+ month=12, day=31, hour=23, minute=59, second=0, microsecond=0
+ )
+ if next_run <= now:
+ next_run = next_run.replace(year=now.year + 1)
+
+ schedule(
+ FUNC_NAME,
+ schedule_type=Schedule.YEARLY,
+ repeats=-1,
+ next_run=next_run,
+ )
+ self.stdout.write(
+ self.style.SUCCESS(
+ f"Scheduled annual Zenodo deposition for {next_run.isoformat()}."
+ )
+ )
diff --git a/works/management/commands/zenodo_deposit.py b/works/management/commands/zenodo_deposit.py
new file mode 100644
index 00000000..f805f2eb
--- /dev/null
+++ b/works/management/commands/zenodo_deposit.py
@@ -0,0 +1,115 @@
+"""
+Management command to trigger a complete Zenodo deposition cycle.
+
+This command runs both render_zenodo and deposit_zenodo in sequence,
+making it easy to manually trigger a full deposition to Zenodo.
+
+Usage:
+ python manage.py zenodo_deposit
+ python manage.py zenodo_deposit --deposition-id 123456
+ python manage.py zenodo_deposit --token YOUR_TOKEN
+"""
+import os
+from django.conf import settings
+from django.core.management.base import BaseCommand, CommandError
+from django.core.management import call_command
+
+
+class Command(BaseCommand):
+ help = "Trigger a complete Zenodo deposition cycle (render + deposit)."
+
+ def add_arguments(self, parser):
+ parser.add_argument(
+ "--deposition-id",
+ dest="deposition_id",
+ help="Existing deposition (draft) ID on Zenodo. Uses ZENODO_SANDBOX_DEPOSITION_ID if not provided.",
+ )
+ parser.add_argument(
+ "--token",
+ dest="token",
+ help="Zenodo API token (overrides env/settings).",
+ )
+ parser.add_argument(
+ "--skip-render",
+ action="store_true",
+ help="Skip the render step and only run deposit (assumes files already exist).",
+ )
+ parser.add_argument(
+ "--patch",
+ dest="patch",
+ default="description,version,keywords,related_identifiers",
+ help="Comma-separated list of metadata fields to patch (default: description,version,keywords,related_identifiers).",
+ )
+ parser.add_argument(
+ "--merge-keywords",
+ action="store_true",
+ help="Merge incoming keywords with existing (don't replace).",
+ )
+ parser.add_argument(
+ "--merge-related",
+ action="store_true",
+ help="Merge incoming related_identifiers with existing (don't replace).",
+ )
+
+ def handle(self, *args, **opts):
+ deposition_id = opts.get("deposition_id") or os.getenv("ZENODO_SANDBOX_DEPOSITION_ID")
+ token = opts.get("token")
+
+ api_base = os.getenv("ZENODO_API_BASE") or getattr(
+ settings, "ZENODO_API_BASE", "https://sandbox.zenodo.org/api"
+ )
+
+ self.stdout.write(self.style.SUCCESS("\n" + "="*70))
+ self.stdout.write(self.style.SUCCESS(" Zenodo Deposition Manager"))
+ self.stdout.write(self.style.SUCCESS("="*70))
+ self.stdout.write(f"\nTarget: {api_base}")
+ if deposition_id:
+ self.stdout.write(f"Deposition ID: {deposition_id}\n")
+ else:
+ self.stdout.write(
+ "Deposition ID: (none configured — will reuse latest from log or bootstrap a new draft)\n"
+ )
+
+ # Step 1: Render (unless skipped)
+ if not opts.get("skip_render"):
+ self.stdout.write(self.style.WARNING("\n[Step 1/2] Rendering data files and metadata..."))
+ try:
+ call_command("render_zenodo", stdout=self.stdout, stderr=self.stderr)
+ self.stdout.write(self.style.SUCCESS("✓ Render completed successfully\n"))
+ except Exception as ex:
+ self.stdout.write(self.style.ERROR(f"✗ Render failed: {ex}"))
+ raise CommandError(f"Render step failed: {ex}") from ex
+ else:
+ self.stdout.write(self.style.WARNING("\n[Step 1/2] Skipping render step (--skip-render)\n"))
+
+ # Step 2: Deposit
+ self.stdout.write(self.style.WARNING("[Step 2/2] Uploading to Zenodo..."))
+ try:
+ deposit_opts = {
+ "patch": opts.get("patch"),
+ "merge_keywords": opts.get("merge_keywords", False),
+ "merge_related": opts.get("merge_related", False),
+ }
+ if deposition_id:
+ deposit_opts["deposition_id"] = deposition_id
+ if token:
+ deposit_opts["token"] = token
+
+ call_command("deposit_zenodo", **deposit_opts, stdout=self.stdout, stderr=self.stderr)
+ self.stdout.write(self.style.SUCCESS("✓ Deposit completed successfully\n"))
+ except Exception as ex:
+ self.stdout.write(self.style.ERROR(f"✗ Deposit failed: {ex}"))
+ raise CommandError(f"Deposit step failed: {ex}") from ex
+
+ # Summary
+ self.stdout.write(self.style.SUCCESS("\n" + "="*70))
+ self.stdout.write(self.style.SUCCESS(" Zenodo deposition completed successfully!"))
+ self.stdout.write(self.style.SUCCESS("="*70))
+ self.stdout.write("\nNext steps:")
+ if deposition_id:
+ self.stdout.write(" • Check the deposition at: " + api_base.replace("/api", f"/deposit/{deposition_id}"))
+ else:
+ self.stdout.write(" • Check the admin → Zenodo Deposition Logs for the new draft URL")
+ self.stdout.write(" • Review files and metadata")
+ self.stdout.write(" • Publish when ready (cannot be undone!)")
+ self.stdout.write(self.style.WARNING("\nNote: This deposition is in DRAFT state and not yet published.\n"))
diff --git a/works/migrations/0009_add_zenodo_deposition_log.py b/works/migrations/0009_add_zenodo_deposition_log.py
new file mode 100644
index 00000000..794a0c88
--- /dev/null
+++ b/works/migrations/0009_add_zenodo_deposition_log.py
@@ -0,0 +1,161 @@
+# Generated by Django 5.1.9 on 2026-05-11 12:30
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("works", "0008_add_bok_concepts_and_ontology_kind"),
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name="ZenodoDepositionLog",
+ fields=[
+ (
+ "id",
+ models.BigAutoField(
+ auto_created=True,
+ primary_key=True,
+ serialize=False,
+ verbose_name="ID",
+ ),
+ ),
+ (
+ "deposition_date",
+ models.DateTimeField(auto_now_add=True, db_index=True),
+ ),
+ (
+ "status",
+ models.CharField(
+ choices=[
+ ("success", "Success"),
+ ("partial", "Partial Success"),
+ ("failed", "Failed"),
+ ],
+ db_index=True,
+ max_length=20,
+ ),
+ ),
+ (
+ "deposition_id",
+ models.CharField(
+ db_index=True, help_text="Zenodo deposition ID", max_length=50
+ ),
+ ),
+ (
+ "doi",
+ models.CharField(
+ blank=True,
+ help_text="DOI assigned by Zenodo (if published)",
+ max_length=255,
+ null=True,
+ ),
+ ),
+ (
+ "zenodo_url",
+ models.URLField(
+ blank=True,
+ help_text="URL to Zenodo record",
+ max_length=512,
+ null=True,
+ ),
+ ),
+ (
+ "api_base",
+ models.URLField(
+ help_text="Zenodo API base URL (sandbox or production)",
+ max_length=512,
+ ),
+ ),
+ (
+ "version",
+ models.CharField(
+ blank=True,
+ help_text='Zenodo deposit version label (e.g. "v1", "v2"); next-version counter for this api_base.',
+ max_length=100,
+ null=True,
+ ),
+ ),
+ (
+ "files_uploaded",
+ models.JSONField(
+ blank=True,
+ help_text="List of files uploaded (names and sizes)",
+ null=True,
+ ),
+ ),
+ (
+ "metadata_merged",
+ models.JSONField(
+ blank=True,
+ help_text="Metadata fields that were updated",
+ null=True,
+ ),
+ ),
+ (
+ "works_count",
+ models.IntegerField(
+ default=0,
+ help_text="Number of works included in this deposition",
+ ),
+ ),
+ (
+ "total_size_bytes",
+ models.BigIntegerField(
+ default=0, help_text="Total size of uploaded files in bytes"
+ ),
+ ),
+ (
+ "upload_duration_seconds",
+ models.FloatField(
+ blank=True,
+ help_text="Time taken to upload all files",
+ null=True,
+ ),
+ ),
+ (
+ "error_message",
+ models.TextField(
+ blank=True,
+ help_text="Error message if deposition failed",
+ null=True,
+ ),
+ ),
+ (
+ "error_details",
+ models.JSONField(
+ blank=True,
+ help_text="Detailed error information (stack trace, API response, etc.)",
+ null=True,
+ ),
+ ),
+ (
+ "deposition_summary",
+ models.TextField(
+ blank=True,
+ help_text="Human-readable summary of the deposition",
+ null=True,
+ ),
+ ),
+ (
+ "notes",
+ models.TextField(
+ blank=True, help_text="Additional notes or comments", null=True
+ ),
+ ),
+ ],
+ options={
+ "verbose_name": "Zenodo Deposition Log",
+ "verbose_name_plural": "Zenodo Deposition Logs",
+ "ordering": ["-deposition_date"],
+ "indexes": [
+ models.Index(
+ fields=["deposition_id"], name="works_zenodo_dep_id_idx"
+ ),
+ models.Index(fields=["doi"], name="works_zenodo_doi_idx"),
+ ],
+ },
+ ),
+ ]
diff --git a/works/models.py b/works/models.py
index f32dcde3..413bde85 100644
--- a/works/models.py
+++ b/works/models.py
@@ -779,3 +779,111 @@ def __str__(self):
who = self.user.username if self.user else "(deleted)"
return f"{who} → {self.get_kind_display()} on {self.work_id}"
+class ZenodoDepositionLog(models.Model):
+ """
+ Log of Zenodo depositions.
+ Tracks when data was deposited to Zenodo, success/failure status,
+ file uploads, metadata updates, and any errors encountered.
+ """
+ STATUS_CHOICES = [
+ ('success', 'Success'),
+ ('partial', 'Partial Success'),
+ ('failed', 'Failed'),
+ ]
+
+ deposition_date = models.DateTimeField(auto_now_add=True, db_index=True)
+ status = models.CharField(max_length=20, choices=STATUS_CHOICES, db_index=True)
+
+ # Zenodo-specific identifiers
+ deposition_id = models.CharField(
+ max_length=50,
+ db_index=True,
+ help_text='Zenodo deposition ID'
+ )
+ doi = models.CharField(
+ max_length=255,
+ blank=True,
+ null=True,
+ help_text='DOI assigned by Zenodo (if published)'
+ )
+ zenodo_url = models.URLField(
+ max_length=512,
+ blank=True,
+ null=True,
+ help_text='URL to Zenodo record'
+ )
+
+ # API endpoint used
+ api_base = models.URLField(
+ max_length=512,
+ help_text='Zenodo API base URL (sandbox or production)'
+ )
+
+ # What was deposited
+ version = models.CharField(
+ max_length=100,
+ blank=True,
+ null=True,
+ help_text='Zenodo deposit version label (e.g. "v1", "v2"); next-version counter for this api_base.'
+ )
+ files_uploaded = models.JSONField(
+ blank=True,
+ null=True,
+ help_text='List of files uploaded (names and sizes)'
+ )
+ metadata_merged = models.JSONField(
+ blank=True,
+ null=True,
+ help_text='Metadata fields that were updated'
+ )
+
+ # Statistics
+ works_count = models.IntegerField(
+ default=0,
+ help_text='Number of works included in this deposition'
+ )
+ total_size_bytes = models.BigIntegerField(
+ default=0,
+ help_text='Total size of uploaded files in bytes'
+ )
+ upload_duration_seconds = models.FloatField(
+ blank=True,
+ null=True,
+ help_text='Time taken to upload all files'
+ )
+
+ # Error tracking
+ error_message = models.TextField(
+ blank=True,
+ null=True,
+ help_text='Error message if deposition failed'
+ )
+ error_details = models.JSONField(
+ blank=True,
+ null=True,
+ help_text='Detailed error information (stack trace, API response, etc.)'
+ )
+
+ # Summary and notes
+ deposition_summary = models.TextField(
+ blank=True,
+ null=True,
+ help_text='Human-readable summary of the deposition'
+ )
+ notes = models.TextField(
+ blank=True,
+ null=True,
+ help_text='Additional notes or comments'
+ )
+
+ class Meta:
+ ordering = ['-deposition_date']
+ verbose_name = 'Zenodo Deposition Log'
+ verbose_name_plural = 'Zenodo Deposition Logs'
+ indexes = [
+ models.Index(fields=['deposition_id'], name='works_zenodo_dep_id_idx'),
+ models.Index(fields=['doi'], name='works_zenodo_doi_idx'),
+ ]
+
+ def __str__(self):
+ return f"{self.status.capitalize()} deposition {self.deposition_id} on {self.deposition_date.strftime('%Y-%m-%d %H:%M')}"
diff --git a/works/tasks.py b/works/tasks.py
index 960da4ee..70da8720 100644
--- a/works/tasks.py
+++ b/works/tasks.py
@@ -486,3 +486,22 @@ def regenerate_all_data_dumps():
csv_path = convert_geojson_to_csv(geojson_path)
cleanup_old_data_dumps(cache_dir, settings.DATA_DUMP_RETENTION)
return {"geojson": geojson_path, "gpkg": gpkg_path, "csv": csv_path}
+
+
+# -----------------------------------------------------------------------------
+# Zenodo deposition.
+# -----------------------------------------------------------------------------
+
+def run_zenodo_deposition():
+ """Run the full Zenodo deposition cycle: regenerate dumps → render
+ README/zip/metadata → upload to (or bootstrap) a Zenodo draft.
+
+ Used as the scheduled Django-Q task (annual, last day of the year via
+ ``schedule_zenodo_deposit``). Publishing remains manual — admins receive
+ an email with the draft link.
+ """
+ from works.zenodo import deposit_to_zenodo, render_zenodo_package
+
+ regenerate_all_data_dumps()
+ render_zenodo_package()
+ return deposit_to_zenodo()
diff --git a/works/templates/README.md.j2 b/works/templates/README.md.j2
new file mode 100644
index 00000000..272ab023
--- /dev/null
+++ b/works/templates/README.md.j2
@@ -0,0 +1,71 @@
+# OPTIMAP FAIR Data Package
+
+**Version:** {{ version }}
+
+**Generated on:** {{ date }}
+
+
+## Dataset Summary
+
+- **Total articles:** {{ article_count }}
+- **Articles with spatial data:** {{ spatial_count }}
+- **Articles with temporal coverage:** {{ temporal_count }}
+- **Earliest publication date:** {{ earliest_date }}
+- **Latest publication date:** {{ latest_date }}
+
+
+## Sources
+
+{% for s in sources %}- [{{ s.name }}]({{ s.url }})
+{% endfor %}
+
+
+## Codebook
+
+The same field names appear verbatim across all three formats: as
+**GeoJSON `Feature.properties` keys**, as **CSV column headers**, and as
+**GeoPackage attribute columns**. CSV represents geometry as a `WKT`
+column ([OGC Simple Features](https://www.ogc.org/standard/sfa/));
+GeoJSON uses the standard `geometry` member; GeoPackage uses the
+default geometry column from the GeoPackage driver.
+
+| Field | Description |
+|-----------------------------|--------------------------------------------------------------------------|
+| `id` | Primary key of the work record |
+| `title` | Title of the work |
+| `type` | Work type (Crossref / OpenAlex vocabulary, e.g. `article`, `preprint`) |
+| `abstract` | Abstract or summary |
+| `doi` | Digital Object Identifier (if available) |
+| `url` | URL to the article or preprint |
+| `publicationDate` | Publication date (ISO 8601) |
+| `status` | Lifecycle code: `p` (Published) — only `p` works appear in the dumps |
+| `source` | Foreign-key reference to the harvested source (see Sources section) |
+| `volume` | Journal volume (where applicable) |
+| `issue` | Journal issue (where applicable) |
+| `first_page` / `last_page` | Pagination (where applicable) |
+| `authors` | Author names (list) |
+| `keywords` | Subject keywords (list, from source or OpenAlex) |
+| `topics` | Research topics (list, typically from OpenAlex) |
+| `bok_concepts` | EO4GEO Body of Knowledge concept codes (list, user-contributed) |
+| `geometry` (GeoJSON / GPKG) | Spatial extent — GeometryCollection in WGS 84 (EPSG:4326) |
+| `WKT` (CSV only) | Same spatial extent in OGC Well-Known Text |
+| `timeperiod_startdate` | Temporal coverage start dates (list, ISO 8601) |
+| `timeperiod_enddate` | Temporal coverage end dates (list, ISO 8601) |
+| `placename` | Reverse-geocoded placename for the geometry centroid (Nominatim) |
+| `country_code` | ISO 3166-1 alpha-2 country code (or 3166-2 subdivision) for the centroid |
+| `provenance` | Structured JSON: harvest origin, per-field sources, contributions |
+| `openalex_id` | OpenAlex Work identifier (`W…`) when matched |
+| `openalex_ids` | OpenAlex IDs object (DOI, PMID, etc.) |
+| `openalex_open_access_status` | OpenAlex open-access status (`gold`, `green`, `bronze`, `closed`, …) |
+| `creationDate` | Timestamp the record entered OPTIMAP |
+| `lastUpdate` | Timestamp of the last modification |
+
+
+## License
+
+This record includes:
+
+- **Data files** under **CC0-1.0** ( )
+- **optimap-main.zip** (code snapshot) under **GPL-3.0** ()
+
+**Note:** Data are CC0; the software snapshot is GPLv3.
diff --git a/works/templates/data.html b/works/templates/data.html
index 18cff1ba..09b501aa 100644
--- a/works/templates/data.html
+++ b/works/templates/data.html
@@ -94,6 +94,82 @@ Download Publication Data
{% endif %}
+ {% if latest_zenodo %}
+
+
+ Zenodo Archive
+
+ The OPTIMAP dataset is regularly archived on Zenodo for long-term preservation and citability.
+
+
+
+
+
+
+ Latest Deposition
+
+
+
+
+ Version: {{ latest_zenodo.version|default:"N/A" }}
+ Date: {{ latest_zenodo.deposition_date|date:"Y-m-d H:i" }} UTC
+ Works included: {{ latest_zenodo.works_count|intcomma }}
+ Files uploaded: {{ latest_zenodo.files_uploaded|length }}
+ Total size:
+ {% if latest_zenodo.total_size_bytes %}
+ {% load humanize %}
+ {{ latest_zenodo.total_size_bytes|filesizeformat }}
+ {% else %}
+ N/A
+ {% endif %}
+
+
+
+ {% if latest_zenodo.zenodo_url %}
+
+
+ View on Zenodo
+
+
+ {% endif %}
+ {% if latest_zenodo.doi %}
+
+ DOI:
+ {{ latest_zenodo.doi }}
+
+ {% endif %}
+ {% if latest_zenodo.deposition_summary %}
+
+ {{ latest_zenodo.deposition_summary|truncatewords:30 }}
+
+ {% endif %}
+
+
+
+
+
+ {% if latest_zenodo.doi %}
+
+
+
+ Citation
+
+
+ OPTIMAP Contributors. ({{ latest_zenodo.deposition_date.year }}).
+ OPTIMAP FAIR Data Package
+ {% if latest_zenodo.version %}({{ latest_zenodo.version }}){% endif %}.
+ Zenodo.
+ https://doi.org/{{ latest_zenodo.doi }}
+
+
+ Copy citation
+
+
+
+ {% endif %}
+
+ {% endif %}
+
{% endblock %}
diff --git a/works/views.py b/works/views.py
index f796515d..242470d6 100644
--- a/works/views.py
+++ b/works/views.py
@@ -281,6 +281,24 @@ def data(request):
else:
last_updated = None
+ # Get latest Zenodo deposition info
+ # In DEBUG mode, show sandbox depositions; in production, show only production depositions
+ from works.models import ZenodoDepositionLog
+
+ if settings.DEBUG:
+ # Debug mode: show sandbox depositions
+ latest_zenodo = ZenodoDepositionLog.objects.filter(
+ status='success',
+ api_base__icontains='sandbox.zenodo.org'
+ ).order_by('-deposition_date').first()
+ else:
+ # Production mode: show only production depositions (exclude sandbox)
+ latest_zenodo = ZenodoDepositionLog.objects.filter(
+ status='success'
+ ).exclude(
+ api_base__icontains='sandbox.zenodo.org'
+ ).order_by('-deposition_date').first()
+
return render(request, 'data.html', {
'geojson_size': geojson_size,
'geopackage_size': geopackage_size,
@@ -288,6 +306,7 @@ def data(request):
'last_updated': last_updated,
'last_geojson': last_geo.name if last_geo else None,
'last_gpkg': last_gpkg.name if last_gpkg else None,
+ 'latest_zenodo': latest_zenodo,
})
def confirmation_login(request):
diff --git a/works/zenodo.py b/works/zenodo.py
new file mode 100644
index 00000000..7545de55
--- /dev/null
+++ b/works/zenodo.py
@@ -0,0 +1,1086 @@
+"""
+Zenodo data archival functionality for OPTIMAP.
+
+This module handles rendering metadata and depositing data to Zenodo.
+"""
+import json
+import os
+import tempfile
+import time
+import traceback
+from datetime import date
+from pathlib import Path
+from typing import Iterable
+from urllib.parse import urlparse
+
+import markdown
+import requests
+from django.conf import settings
+from django.contrib.auth import get_user_model
+from django.core.mail import send_mail
+from django.urls import reverse
+from jinja2 import Environment, FileSystemLoader
+from zenodo_client import Zenodo
+
+from works.models import Work, Source, ZenodoDepositionLog
+
+User = get_user_model()
+
+
+# ================== URL/Domain Helpers ==================
+
+def _extract_domain(u: str | None) -> str | None:
+ """Extract domain from URL."""
+ if not u:
+ return None
+ try:
+ p = urlparse(u)
+ netloc = p.netloc or p.path
+ return (netloc or "").lower()
+ except Exception:
+ return None
+
+
+def _canonical_url(raw: str | None) -> str | None:
+ """Normalize URL to https:/// with lowercase host."""
+ if not raw:
+ return None
+ u = raw.strip()
+ if "://" not in u:
+ u = "https://" + u
+ p = urlparse(u)
+ host = (p.netloc or p.path).lower()
+ if not host:
+ return None
+ if host.startswith("www."):
+ host = host[4:]
+ path = p.path or ""
+ return f"https://{host}{path}"
+
+
+def _label_from_domain(domain: str) -> str:
+ """Return a cleaned label from a domain name."""
+ if domain.startswith("www."):
+ domain = domain[4:]
+ return domain.capitalize() if domain else "Source"
+
+
+def _clean_label(name: str | None, url: str | None) -> str:
+ """Clean source label."""
+ n = (name or "").strip()
+ domain = _extract_domain(url) or ""
+ if n.isdigit() and domain == "optimap.science":
+ return "OPTIMAP"
+ if n and not n.isdigit():
+ return n
+ return _label_from_domain(domain) if domain else "Source"
+
+
+def _resolve_api_base(api_base: str | None = None) -> str:
+ """Resolve the Zenodo API base URL with the same env/settings/default
+ cascade that `deposit_to_zenodo` uses, so render and deposit always
+ look at the same target when scoping per-target state (e.g. version).
+ """
+ if api_base is not None:
+ return api_base
+ return (
+ os.getenv("ZENODO_API_BASE")
+ or getattr(settings, "ZENODO_API_BASE", "https://sandbox.zenodo.org/api")
+ )
+
+
+def _next_version_for(api_base: str) -> str:
+ """
+ Compute the next `vN` label by reading the latest successful
+ `ZenodoDepositionLog.version` for `api_base`. Sandbox and production
+ have separate counters because they target different deposits; a
+ failed deposit doesn't burn a version number.
+ """
+ last = (
+ ZenodoDepositionLog.objects
+ .filter(status="success", api_base=api_base)
+ .exclude(version__isnull=True)
+ .exclude(version="")
+ .order_by("-deposition_date")
+ .values_list("version", flat=True)
+ .first()
+ )
+ last_n = 0
+ if last:
+ try:
+ last_n = int(last.lstrip("v") or 0)
+ except ValueError:
+ last_n = 0
+ return f"v{last_n + 1}"
+
+
+def _live_download_related_identifiers() -> list[dict]:
+ """
+ Build Zenodo `related_identifiers` entries pointing at the always-current
+ download endpoints on optimap.science. The Zenodo deposit is a frozen
+ snapshot; the live URLs serve the rolling release of the same dataset.
+ """
+ base = settings.BASE_URL.rstrip("/")
+ routes = [
+ ("optimap:download_geojson", "dataset"),
+ ("optimap:download_geopackage", "dataset"),
+ ("optimap:download_csv", "dataset"),
+ ]
+ return [
+ {
+ "scheme": "url",
+ "identifier": f"{base}{reverse(name)}",
+ "relation": "isSupplementTo",
+ "resource_type": resource_type,
+ }
+ for name, resource_type in routes
+ ]
+
+
+def _source_identifier(source: dict) -> tuple[str, str] | None:
+ """
+ Pick the best Zenodo `(scheme, identifier)` for a Source row.
+
+ Preference order: linking ISSN, then journal homepage URL, then the
+ harvest endpoint URL. Returns ``None`` for self-references to
+ optimap.science (the portal isn't a source it describes) and for
+ sources that expose no usable identifier.
+ """
+ issn = (source.get("issn_l") or "").strip()
+ if issn:
+ return ("issn", issn)
+ for raw in (source.get("homepage_url"), source.get("url_field")):
+ url = _canonical_url(raw)
+ if not url:
+ continue
+ if _extract_domain(url) == "optimap.science":
+ continue
+ return ("url", url)
+ return None
+
+
+# OPTIMAP's grants for the Zenodo deposit. Funder DOIs are Crossref-registered
+# IDs (BMBF 10.13039/501100002347; BMFTR uses the same Crossref entity until
+# the 2025 rename propagates — we still keep both labels for the free-text
+# fallback). The 2025-08-21 issue comment on #63 settled on KOMET + OPTIMETA
+# only; NFDI4Earth is intentionally excluded.
+#
+# Zenodo's legacy deposit API accepts grants as `[{"id": "::"}]`,
+# but it only resolves IDs that are in its curated grants vocabulary. If a
+# grant isn't there, the metadata PUT returns 400 — we catch that below and
+# fall back to a free-text `notes` entry so the funding info isn't lost.
+_FUNDING = [
+ {
+ "id": "10.13039/501100002347::16TOA028B",
+ "name": "OPTIMETA",
+ "funder": "BMBF",
+ "grant": "16TOA028B",
+ },
+ {
+ "id": "10.13039/501100002347::16KOA009A",
+ "name": "KOMET",
+ "funder": "BMFTR",
+ "grant": "16KOA009A",
+ },
+]
+
+
+def _grants_payload() -> list[dict]:
+ """Zenodo-compatible grants list — only the `id` key."""
+ return [{"id": g["id"]} for g in _FUNDING]
+
+
+def _funding_fallback_text() -> str:
+ """Human-readable funding statement for `metadata.notes` when Zenodo
+ can't resolve the structured grant IDs."""
+ parts = [f"{g['name']} ({g['funder']} grant {g['grant']})" for g in _FUNDING]
+ return "Funding: " + ", ".join(parts) + "."
+
+
+# Static "Note" description that documents the license split. Wording follows
+# the 2025-07-21 issue comment on #63 — both licenses are listed on the
+# Zenodo record, the data files are CC0 and only the software snapshot is
+# GPLv3, so harvesters and reusers can apply the correct terms per file.
+_LICENSE_NOTE_HTML = (
+ "Mixed licenses: this record bundles data files and a "
+ "snapshot of the OPTIMAP source code, which carry different licenses.
"
+ ""
+)
+
+
+def _license_additional_descriptions() -> list[dict]:
+ """
+ Build the Zenodo `additional_descriptions` entry that documents the
+ CC0 (data) / GPL-3.0 (code snapshot) license split.
+ """
+ return [{"type": "notes", "description": _LICENSE_NOTE_HTML}]
+
+
+def _describes_related_identifiers(sources: Iterable[dict]) -> list[dict]:
+ """
+ One Zenodo `related_identifiers` entry per harvested Source with
+ relation=describes, resource_type=publication — i.e. "this record
+ describes Journal X". Wording follows the 2025-07-14 issue comment
+ on #63.
+ """
+ seen: set[tuple[str, str]] = set()
+ out: list[dict] = []
+ for s in sources:
+ ident = _source_identifier(s)
+ if ident is None or ident in seen:
+ continue
+ seen.add(ident)
+ scheme, value = ident
+ out.append({
+ "scheme": scheme,
+ "identifier": value,
+ "relation": "describes",
+ "resource_type": "publication",
+ })
+ return out
+
+
+# ================== Rendering ==================
+
+def render_zenodo_package(
+ project_root: Path | None = None,
+ stdout_callback=None,
+ api_base: str | None = None,
+) -> dict:
+ """
+ Render Zenodo data package (README, metadata, archive).
+
+ Returns dict with paths to generated files.
+
+ `api_base` scopes the version counter so sandbox and production
+ increment independently. Defaults to the same env/settings cascade
+ that `deposit_to_zenodo` uses.
+ """
+ def log(msg):
+ if stdout_callback:
+ stdout_callback(msg)
+
+ # Determine project root
+ if project_root is None:
+ project_root = Path(
+ os.getenv("OPTIMAP_PROJECT_ROOT")
+ or getattr(settings, "PROJECT_ROOT", Path(__file__).resolve().parents[1])
+ )
+
+ data_dir = project_root / "data"
+ data_dir.mkdir(exist_ok=True)
+
+ # Version: source of truth is the latest successful ZenodoDepositionLog
+ # for this api_base. A tracked file would drift across environments and
+ # silently restart at v1 on a fresh checkout.
+ api_base = _resolve_api_base(api_base)
+ version = _next_version_for(api_base)
+
+ # Zip snapshot — the deposit must include a copy of the OPTIMAP source
+ # tree (issue #63, last checklist item). A silent empty-zip fallback
+ # would upload a 0-byte optimap-main.zip and look like a successful
+ # deposit, so failures here propagate as a CommandError-friendly
+ # RuntimeError instead.
+ archive_path = data_dir / "optimap-main.zip"
+ log(f"Generating {archive_path.name}...")
+ import subprocess
+ try:
+ result = subprocess.run(
+ ["git", "archive", "--format=zip", "HEAD", "-o", str(archive_path)],
+ cwd=str(project_root),
+ check=True,
+ capture_output=True,
+ text=True,
+ )
+ except FileNotFoundError as ex:
+ raise RuntimeError(
+ "Cannot produce optimap-main.zip: the `git` binary is not on PATH"
+ ) from ex
+ except subprocess.CalledProcessError as ex:
+ raise RuntimeError(
+ f"`git archive HEAD` failed (exit {ex.returncode}) in {project_root}: "
+ f"{(ex.stderr or '').strip()}"
+ ) from ex
+ if not archive_path.exists() or archive_path.stat().st_size == 0:
+ raise RuntimeError(
+ f"`git archive HEAD` produced no archive at {archive_path}; "
+ f"stderr={(result.stderr or '').strip()!r}"
+ )
+
+ # Gather statistics
+ article_count = Work.objects.count()
+ spatial_count = Work.objects.exclude(geometry=None).count()
+ temporal_count = Work.objects.exclude(timeperiod_startdate=None).count()
+ earliest_date = (
+ Work.objects.order_by("publicationDate").values_list("publicationDate", flat=True).first() or ""
+ )
+ latest_date = (
+ Work.objects.order_by("-publicationDate").values_list("publicationDate", flat=True).first() or ""
+ )
+
+ # Sources for the README — dedupe by canonical domain so the same
+ # publisher doesn't appear twice in the visible list.
+ source_rows = list(
+ Source.objects.all().values("name", "url_field", "homepage_url", "issn_l")
+ )
+ seen_domains: set[str] = set()
+ sources: list[dict] = []
+ for s in source_rows:
+ url = _canonical_url(s.get("url_field"))
+ dom = _extract_domain(url)
+ if not dom or dom in seen_domains:
+ continue
+ seen_domains.add(dom)
+ sources.append({"name": _clean_label(s.get("name"), url), "url": url})
+
+ # Render README.md
+ tmpl_dir = project_root / "works" / "templates"
+ env = Environment(loader=FileSystemLoader(str(tmpl_dir)), trim_blocks=True, lstrip_blocks=True)
+ template = env.get_template("README.md.j2")
+ rendered = template.render(
+ version=version,
+ date=date.today().isoformat(),
+ article_count=article_count,
+ sources=sources,
+ spatial_count=spatial_count,
+ temporal_count=temporal_count,
+ earliest_date=earliest_date,
+ latest_date=latest_date,
+ )
+ readme_path = data_dir / "README.md"
+ readme_path.write_text(rendered, encoding="utf-8")
+
+ # Dynamic metadata
+ dyn_path = data_dir / "zenodo_dynamic.json"
+ existing_dyn = {}
+ if dyn_path.exists():
+ try:
+ existing_dyn = json.loads(dyn_path.read_text(encoding="utf-8"))
+ except Exception:
+ existing_dyn = {}
+
+ # Final keyword list per nuest's 2025-07-14 comment on #63. "Open Research
+ # Information" and its short form "ORI" both appear so the record is
+ # discoverable under either label.
+ default_keywords = [
+ "Open Access",
+ "Open Science",
+ "Open Research Information",
+ "ORI",
+ "Open Data",
+ "FAIR",
+ ]
+ # Contributor-level attribution is deferred to #207; for now the deposit's
+ # creator is the project as a whole, matching the 2025-07-14 decision.
+ default_creators = existing_dyn.get("creators") or [
+ {"name": "OPTIMAP Contributors", "affiliation": "OPTIMAP Project"}
+ ]
+
+ # `related_identifiers` is always derived from current state — the live
+ # download URLs come from settings.BASE_URL + URL config, and the
+ # "describes" entries are recomputed from the Source table on every run.
+ # A stale zenodo_dynamic.json from another environment cannot leak in.
+ related_identifiers = [
+ *_live_download_related_identifiers(),
+ *_describes_related_identifiers(source_rows),
+ ]
+
+ dyn = {
+ **existing_dyn,
+ "title": existing_dyn.get("title") or "OPTIMAP FAIR Data Package",
+ "upload_type": existing_dyn.get("upload_type") or "dataset",
+ "publication_date": date.today().isoformat(),
+ "creators": default_creators,
+ "version": version,
+ "keywords": existing_dyn.get("keywords") or default_keywords,
+ "related_identifiers": related_identifiers,
+ "additional_descriptions": _license_additional_descriptions(),
+ "grants": _grants_payload(),
+ "description_markdown": readme_path.read_text(encoding="utf-8"),
+ }
+ dyn_path.write_text(json.dumps(dyn, indent=2), encoding="utf-8")
+
+ log(f"Generated: {archive_path.name}, {readme_path.name}, {dyn_path.name}")
+
+ return {
+ "version": version,
+ "archive_path": archive_path,
+ "readme_path": readme_path,
+ "metadata_path": dyn_path,
+ "data_dir": data_dir,
+ }
+
+
+# ================== Deposition ==================
+
+_REQ_PRESERVE = {"doi", "prereserve_doi"} # never overwrite
+
+
+def _markdown_to_html(markdown_text: str) -> str:
+ """Convert README.md markdown to HTML for Zenodo description."""
+ return markdown.markdown(markdown_text, extensions=["tables", "fenced_code"])
+
+
+def _merge_keywords(existing: Iterable[str] | None, incoming: Iterable[str] | None) -> list[str]:
+ """Merge keyword lists without duplicates."""
+ seen, out = set(), []
+ for x in (existing or []):
+ if x not in seen:
+ seen.add(x)
+ out.append(x)
+ for x in (incoming or []):
+ if x not in seen:
+ seen.add(x)
+ out.append(x)
+ return out
+
+
+def _merge_related(existing: Iterable[dict] | None, incoming: Iterable[dict] | None) -> list[dict]:
+ """Merge related_identifiers by (identifier, relation) pair."""
+ def key(d: dict) -> tuple[str, str]:
+ return (d.get("identifier", ""), d.get("relation", ""))
+
+ seen, out = set(), []
+ for d in (existing or []):
+ k = key(d)
+ if k not in seen:
+ seen.add(k)
+ out.append(d)
+ for d in (incoming or []):
+ k = key(d)
+ if k not in seen:
+ seen.add(k)
+ out.append(d)
+ return out
+
+
+def _get_deposition(api_base: str, token: str, deposition_id: str) -> dict:
+ """Fetch existing deposition from Zenodo API."""
+ r = requests.get(
+ f"{api_base}/deposit/depositions/{deposition_id}",
+ params={"access_token": token},
+ timeout=30,
+ )
+ try:
+ r.raise_for_status()
+ except Exception as ex:
+ raise Exception(f"Failed to fetch deposition {deposition_id}: {r.status_code} {r.text}") from ex
+ return r.json()
+
+
+def _is_published(dep: dict) -> bool:
+ """
+ Zenodo marks a published deposition with ``submitted=true`` and ``state="done"``.
+ Drafts (`unsubmitted` / `inprogress`) are still editable; published records
+ require a `newversion` call before we can change anything.
+ """
+ return bool(dep.get("submitted")) and dep.get("state") == "done"
+
+
+def _extract_id_from_url(url: str | None) -> str | None:
+ """Pull the trailing numeric ID off a Zenodo deposition URL."""
+ if not url:
+ return None
+ tail = url.rstrip("/").rsplit("/", 1)[-1]
+ return tail or None
+
+
+def _create_new_draft(api_base: str, token: str) -> str:
+ """
+ POST /deposit/depositions with an empty body — creates a fresh draft and
+ returns its numeric ID. Used to bootstrap the very first deposit when no
+ deposition_id is configured and no prior log exists for this api_base.
+ """
+ r = requests.post(
+ f"{api_base}/deposit/depositions",
+ params={"access_token": token},
+ headers={"Content-Type": "application/json"},
+ data=json.dumps({}),
+ timeout=30,
+ )
+ try:
+ r.raise_for_status()
+ except Exception as ex:
+ raise Exception(
+ f"Failed to create new Zenodo draft: {r.status_code} {r.text}"
+ ) from ex
+ payload = r.json()
+ new_id = payload.get("id") or _extract_id_from_url(
+ payload.get("links", {}).get("self")
+ )
+ if not new_id:
+ raise Exception(
+ f"Zenodo create-draft response did not include an id: {payload!r}"
+ )
+ return str(new_id)
+
+
+def _create_new_version(api_base: str, token: str, deposition_id: str) -> str:
+ """
+ POST /deposit/depositions/{id}/actions/newversion — fork a new editable
+ draft off a published deposition. The response carries the new draft URL
+ in `links.latest_draft` (Zenodo legacy API); the new ID is the trailing
+ numeric segment. The new draft inherits files and metadata from the
+ published version; the caller is expected to delete the inherited files
+ and re-PUT updated metadata, which the existing deposit flow already
+ does.
+ """
+ r = requests.post(
+ f"{api_base}/deposit/depositions/{deposition_id}/actions/newversion",
+ params={"access_token": token},
+ timeout=30,
+ )
+ try:
+ r.raise_for_status()
+ except Exception as ex:
+ raise Exception(
+ f"Failed to create new version of deposition {deposition_id}: "
+ f"{r.status_code} {r.text}"
+ ) from ex
+ payload = r.json()
+ new_url = payload.get("links", {}).get("latest_draft")
+ new_id = _extract_id_from_url(new_url)
+ if not new_id:
+ raise Exception(
+ f"newversion response for {deposition_id} did not include "
+ f"a latest_draft link: {payload!r}"
+ )
+ return str(new_id)
+
+
+def _latest_log_deposition_id(api_base: str) -> str | None:
+ """
+ Most-recent successful ZenodoDepositionLog deposition_id for the given
+ api_base. Used to recover the current draft / latest-published ID when
+ no explicit env/setting deposition_id is configured — so scheduled and
+ re-triggered runs land on the same record without manual env edits.
+ """
+ return (
+ ZenodoDepositionLog.objects
+ .filter(status="success", api_base=api_base)
+ .exclude(deposition_id__isnull=True)
+ .exclude(deposition_id="")
+ .order_by("-deposition_date")
+ .values_list("deposition_id", flat=True)
+ .first()
+ )
+
+
+_DUMP_PATTERNS = (
+ "optimap_data_dump_*.geojson",
+ "optimap_data_dump_*.geojson.gz",
+ "optimap_data_dump_*.gpkg",
+ "optimap_data_dump_*.csv",
+ "optimap_data_dump_*.csv.gz",
+)
+
+
+def _dump_timestamp(p: Path) -> str:
+ """
+ Extract the timestamp portion of an `optimap_data_dump_.` filename.
+ Returns "" for non-matching paths.
+ """
+ name = p.name
+ if not name.startswith("optimap_data_dump_"):
+ return ""
+ # Strip leading prefix and trailing suffix (everything from the first '.')
+ stem = name[len("optimap_data_dump_"):]
+ return stem.split(".", 1)[0]
+
+
+def _latest_dump_files(directory: Path) -> list[Path]:
+ """
+ Return all dump files belonging to the newest timestamp present in
+ `directory`, across geojson / geojson.gz / gpkg / csv / csv.gz. Old
+ cycles are ignored so a deposit never ships stale formats next to
+ fresh ones.
+ """
+ if not directory.exists():
+ return []
+ candidates: list[Path] = []
+ for pat in _DUMP_PATTERNS:
+ candidates.extend(directory.glob(pat))
+ if not candidates:
+ return []
+ latest = max(_dump_timestamp(p) for p in candidates)
+ return sorted(p for p in candidates if _dump_timestamp(p) == latest)
+
+
+def _build_upload_list(data_dir: Path, dump_dir: Path | None = None) -> list[Path]:
+ """
+ Build the file list for a Zenodo deposit.
+
+ - `README.md` and `optimap-main.zip` come from `data_dir` (where the
+ render step writes them).
+ - Data dumps come from `data_dir` first (covers tests and ad-hoc
+ single-directory layouts); falling back to `dump_dir`, which
+ defaults to the `optimap_cache` directory `regenerate_data_dumps`
+ writes to in production.
+ """
+ if dump_dir is None:
+ dump_dir = Path(tempfile.gettempdir()) / "optimap_cache"
+
+ paths: list[Path] = []
+ for name in ("README.md", "optimap-main.zip"):
+ p = data_dir / name
+ if p.exists():
+ paths.append(p)
+
+ dumps = _latest_dump_files(data_dir)
+ if not dumps and data_dir.resolve() != dump_dir.resolve():
+ dumps = _latest_dump_files(dump_dir)
+ paths.extend(dumps)
+ return paths
+
+
+def _send_admin_notification(log_entry: ZenodoDepositionLog, stdout_callback=None):
+ """Send email notification to all admin users."""
+ admin_emails = list(User.objects.filter(is_staff=True, is_active=True).values_list('email', flat=True))
+
+ if not admin_emails:
+ if stdout_callback:
+ stdout_callback("No admin users found to notify")
+ return
+
+ # Build email
+ if log_entry.status == 'success':
+ subject = f'✅ Zenodo Deposition Successful - {log_entry.version or log_entry.deposition_id}'
+ status_emoji = '✅'
+ status_text = 'SUCCESS'
+ else:
+ subject = f'❌ Zenodo Deposition Failed - {log_entry.deposition_id}'
+ status_emoji = '❌'
+ status_text = 'FAILED'
+
+ files_text = "\n".join([
+ f" • {f['name']} ({f['size']:,} bytes)"
+ for f in log_entry.files_uploaded
+ ]) if log_entry.files_uploaded else " (none)"
+
+ duration_text = "N/A"
+ if log_entry.upload_duration_seconds:
+ minutes = int(log_entry.upload_duration_seconds // 60)
+ seconds = int(log_entry.upload_duration_seconds % 60)
+ duration_text = f"{minutes}m {seconds}s" if minutes > 0 else f"{seconds}s"
+
+ message_parts = [
+ f"{status_emoji} ZENODO DEPOSITION {status_text}",
+ "=" * 70,
+ "",
+ f"Deposition ID: {log_entry.deposition_id}",
+ f"Version: {log_entry.version or 'N/A'}",
+ f"API Base: {log_entry.api_base}",
+ f"Date: {log_entry.deposition_date.strftime('%Y-%m-%d %H:%M:%S')} UTC",
+ f"Duration: {duration_text}",
+ "",
+ ]
+
+ if log_entry.status == 'success':
+ message_parts.extend([
+ f"Works Included: {log_entry.works_count:,}",
+ f"Files Uploaded: {len(log_entry.files_uploaded) if log_entry.files_uploaded else 0}",
+ f"Total Size: {log_entry.total_size_bytes:,} bytes",
+ "",
+ "Files:",
+ files_text,
+ "",
+ ])
+
+ if log_entry.zenodo_url:
+ message_parts.extend([
+ "⚠️ ACTION REQUIRED ⚠️",
+ "",
+ "The deposition is in DRAFT state and not yet published.",
+ "Please review and publish manually:",
+ "",
+ f" {log_entry.zenodo_url}",
+ "",
+ "⚠️ Publishing cannot be undone!",
+ "",
+ ])
+
+ if log_entry.doi:
+ message_parts.append(f"DOI: {log_entry.doi}")
+
+ if log_entry.deposition_summary:
+ message_parts.extend(["", "Summary:", f" {log_entry.deposition_summary}"])
+ else:
+ message_parts.extend([
+ "ERROR:",
+ f" {log_entry.error_message or 'Unknown error'}",
+ "",
+ ])
+
+ if log_entry.error_details:
+ message_parts.extend([
+ "Error Details:",
+ f" Type: {log_entry.error_details.get('exception_type', 'N/A')}",
+ "",
+ ])
+
+ if 'traceback' in log_entry.error_details:
+ message_parts.extend([
+ "Traceback:",
+ log_entry.error_details['traceback'],
+ ])
+
+ message_parts.extend([
+ "",
+ "=" * 70,
+ "",
+ ])
+
+ site_url = getattr(settings, 'SITE_URL', None)
+ if site_url:
+ message_parts.append(f"View full log: {site_url}/admin/works/zenododepositionlog/{log_entry.id}/change/")
+ else:
+ message_parts.append(f"View full log in admin: /admin/works/zenododepositionlog/{log_entry.id}/change/")
+
+ message_parts.extend([
+ "",
+ "This is an automated message from OPTIMAP.",
+ ])
+
+ message = "\n".join(message_parts)
+
+ try:
+ send_mail(
+ subject=subject,
+ message=message,
+ from_email=settings.DEFAULT_FROM_EMAIL,
+ recipient_list=admin_emails,
+ fail_silently=False,
+ )
+ if stdout_callback:
+ stdout_callback(f"Admin notification sent to {len(admin_emails)} admin(s)")
+ except Exception as ex:
+ if stdout_callback:
+ stdout_callback(f"Warning: Failed to send admin notification: {ex}")
+
+
+def deposit_to_zenodo(
+ deposition_id: str | None = None,
+ api_base: str | None = None,
+ token: str | None = None,
+ patch_fields: str | None = None,
+ merge_keywords: bool = False,
+ merge_related: bool = False,
+ project_root: Path | None = None,
+ stdout_callback=None,
+) -> ZenodoDepositionLog:
+ """
+ Deposit rendered files to Zenodo.
+
+ Resolution / bootstrap flow for ``deposition_id``:
+
+ 1. Explicit argument wins.
+ 2. Else fall back to the latest successful ZenodoDepositionLog for this
+ ``api_base`` — so scheduled and re-triggered runs find the same draft
+ (or the previously published record, see step 4) without manual env
+ edits.
+ 3. Else POST /deposit/depositions to bootstrap a fresh draft.
+ 4. After resolving the ID, GET the deposition. If it's already published
+ (``submitted=true`` AND ``state="done"``), POST .../actions/newversion
+ to fork an editable draft and target *that* instead — issue #63 only
+ requires manual *publication*, so the next deposit cycle should
+ silently start the next version.
+
+ Args:
+ deposition_id: Zenodo deposition ID (optional — resolved/bootstrapped
+ when omitted, per the flow above).
+ api_base: Zenodo API base URL (default: from settings)
+ token: Zenodo API token (default: from settings/env)
+ patch_fields: Comma-separated fields to update (default: description,version,keywords,related_identifiers)
+ merge_keywords: Merge keywords instead of replacing
+ merge_related: Merge related_identifiers instead of replacing
+ project_root: Project root directory
+ stdout_callback: Callback for logging messages
+
+ Returns:
+ ZenodoDepositionLog entry
+ """
+ def log(msg):
+ if stdout_callback:
+ stdout_callback(msg)
+
+ # Resolve API base
+ if api_base is None:
+ api_base = os.getenv("ZENODO_API_BASE") or getattr(settings, "ZENODO_API_BASE", "https://sandbox.zenodo.org/api")
+
+ if api_base.endswith("/"):
+ raise ValueError(f"ZENODO_API_BASE must not end with '/'. Got: {api_base!r}")
+
+ # Resolve token
+ if token is None:
+ token = (
+ os.getenv("ZENODO_API_TOKEN")
+ or os.getenv("ZENODO_SANDBOX_API_TOKEN")
+ or getattr(settings, "ZENODO_API_TOKEN", None)
+ or getattr(settings, "ZENODO_SANDBOX_API_TOKEN", None)
+ )
+
+ if not token:
+ raise ValueError("No Zenodo API token. Set ZENODO_API_TOKEN or provide token parameter.")
+
+ # Determine project root
+ if project_root is None:
+ project_root = Path(
+ os.getenv("OPTIMAP_PROJECT_ROOT")
+ or getattr(settings, "PROJECT_ROOT", Path(__file__).resolve().parents[1])
+ )
+
+ data_dir = project_root / "data"
+
+ # Resolve deposition_id: explicit arg → latest successful log for this
+ # api_base → bootstrap a fresh draft. Done before log_entry creation so
+ # the log row records the *actual* target ID even on bootstrap.
+ bootstrapped = False
+ deposition_id_str = str(deposition_id) if deposition_id else ""
+ if not deposition_id_str:
+ recovered = _latest_log_deposition_id(api_base)
+ if recovered:
+ log(f"No deposition_id supplied; reusing latest from log: {recovered}")
+ deposition_id_str = recovered
+ else:
+ log("No deposition_id supplied and no prior log; creating new draft...")
+ deposition_id_str = _create_new_draft(api_base, token)
+ bootstrapped = True
+ log(f"Created new draft {deposition_id_str}")
+
+ # Initialize log
+ log_entry = ZenodoDepositionLog(
+ deposition_id=deposition_id_str,
+ api_base=api_base,
+ status='failed',
+ )
+
+ log_entry.works_count = Work.objects.count()
+
+ upload_start = time.time()
+
+ try:
+ # Load metadata
+ dyn_path = data_dir / "zenodo_dynamic.json"
+ if not dyn_path.exists():
+ raise FileNotFoundError(f"{dyn_path} not found. Run render_zenodo_package() first.")
+
+ incoming = json.loads(dyn_path.read_text(encoding="utf-8"))
+
+ # Version: written into the rendered metadata by render_zenodo_package
+ # — the previous file-based tracker (data/last_version.txt) was
+ # removed in favour of ZenodoDepositionLog as source of truth.
+ version_str = (incoming.get("version") or "").strip()
+ if version_str:
+ log_entry.version = version_str
+
+ # Fetch existing deposition (skip when we just bootstrapped it — the
+ # POST response would already be a known-good empty draft, but the
+ # GET keeps the rest of the flow uniform).
+ dep = _get_deposition(api_base, token, deposition_id_str)
+
+ # New-version handoff: if the targeted record is already published,
+ # fork a new draft and switch to it before patching/uploading.
+ if _is_published(dep):
+ log(
+ f"Deposition {deposition_id_str} is already published; "
+ "creating a new version draft..."
+ )
+ deposition_id_str = _create_new_version(api_base, token, deposition_id_str)
+ log_entry.deposition_id = deposition_id_str
+ log(f"New version draft: {deposition_id_str}")
+ dep = _get_deposition(api_base, token, deposition_id_str)
+
+ existing_meta = dep.get("metadata", {}) or {}
+
+ # Determine fields to patch
+ if patch_fields is None:
+ patch_fields = (
+ "description,version,keywords,related_identifiers,"
+ "additional_descriptions,grants,title,upload_type,"
+ "publication_date,creators"
+ )
+
+ fields_to_patch = {x.strip() for x in patch_fields.split(",") if x.strip()}
+
+ merged = dict(existing_meta)
+
+ # Remove protected fields from incoming
+ for req in _REQ_PRESERVE:
+ if req in incoming and req not in fields_to_patch:
+ incoming.pop(req, None)
+
+ # Update description from README
+ if "description" in fields_to_patch:
+ readme_md = (data_dir / "README.md").read_text(encoding="utf-8")
+ merged["description"] = _markdown_to_html(readme_md)
+
+ # Update other fields
+ for key in fields_to_patch - {"description"}:
+ if key == "keywords":
+ if merge_keywords:
+ merged["keywords"] = _merge_keywords(existing_meta.get("keywords"), incoming.get("keywords"))
+ else:
+ merged["keywords"] = incoming.get("keywords", [])
+ elif key == "related_identifiers":
+ if merge_related:
+ merged["related_identifiers"] = _merge_related(
+ existing_meta.get("related_identifiers"), incoming.get("related_identifiers")
+ )
+ else:
+ merged["related_identifiers"] = incoming.get("related_identifiers", [])
+ else:
+ if key in incoming:
+ merged[key] = incoming[key]
+
+ # Track changes
+ changed = [k for k in merged.keys() if existing_meta.get(k) != merged.get(k)]
+ log(f"Metadata fields changed: {', '.join(changed) if changed else '(none)'}")
+
+ log_entry.metadata_merged = {k: merged[k] for k in changed} if changed else {}
+
+ # PUT metadata — with a one-shot fallback for the curated `grants`
+ # vocabulary. Zenodo only resolves grants in its preloaded list; if a
+ # specific BMBF/BMFTR ID isn't there yet, the API returns 400 and we
+ # retry once with `grants` removed and the funding info moved to a
+ # free-text `notes` paragraph so the deposit still succeeds.
+ put_url = f"{api_base}/deposit/depositions/{deposition_id_str}"
+
+ def _put(payload: dict):
+ return requests.put(
+ put_url,
+ params={"access_token": token},
+ headers={"Content-Type": "application/json"},
+ data=json.dumps({"metadata": payload}),
+ )
+
+ res = _put(merged)
+ if res.status_code == 400 and "grants" in merged and "grants" in res.text.lower():
+ fallback = _funding_fallback_text()
+ log(
+ "Zenodo rejected the structured grants metadata; "
+ "falling back to free-text in `notes`."
+ )
+ del merged["grants"]
+ existing_notes = (merged.get("notes") or "").strip()
+ merged["notes"] = (
+ f"{existing_notes}\n\n{fallback}".strip() if existing_notes else fallback
+ )
+ log_entry.notes = (
+ (log_entry.notes + "\n" if log_entry.notes else "")
+ + f"[fallback] {fallback}"
+ )
+ res = _put(merged)
+ res.raise_for_status()
+ log("Metadata updated.")
+
+ # Delete existing files
+ log("Deleting existing files...")
+ existing_files = dep.get("files", [])
+ for file_obj in existing_files:
+ file_id = file_obj.get("id")
+ if file_id:
+ delete_url = f"{api_base}/deposit/depositions/{deposition_id_str}/files/{file_id}"
+ del_res = requests.delete(delete_url, params={"access_token": token})
+ if del_res.status_code == 204:
+ log(f" - Deleted: {file_obj.get('filename')}")
+ else:
+ log(f" - Failed to delete {file_obj.get('filename')}: {del_res.status_code}")
+
+ # Upload files
+ log("Uploading files...")
+ paths = _build_upload_list(data_dir)
+
+ files_info = []
+ total_size = 0
+ for p in paths:
+ try:
+ size = p.stat().st_size
+ total_size += size
+ files_info.append({"name": p.name, "size": size})
+ except Exception:
+ size = 0
+ files_info.append({"name": p.name, "size": 0})
+ log(f" - {p.name} ({size} bytes)")
+
+ log_entry.files_uploaded = files_info
+ log_entry.total_size_bytes = total_size
+
+ # Use zenodo_client for upload
+ z = Zenodo(sandbox=("sandbox." in api_base))
+ z.access_token = token
+ resp = z.update(deposition_id=deposition_id_str, paths=[str(p) for p in paths], publish=False)
+
+ upload_duration = time.time() - upload_start
+ log_entry.upload_duration_seconds = upload_duration
+
+ # Extract response data
+ try:
+ resp_data = resp.json()
+ html = resp_data.get("links", {}).get("html")
+ doi = resp_data.get("doi")
+
+ if html:
+ log_entry.zenodo_url = html
+ if doi:
+ log_entry.doi = doi
+ except Exception:
+ html = None
+
+ # Mark success
+ log_entry.status = 'success'
+ bootstrap_note = " (bootstrapped a new draft)" if bootstrapped else ""
+ log_entry.deposition_summary = (
+ f"Successfully uploaded {len(files_info)} files "
+ f"({_format_bytes(total_size)}) to Zenodo deposition {deposition_id_str}{bootstrap_note}. "
+ f"Updated metadata fields: {', '.join(changed) if changed else '(none)'}. "
+ f"Upload duration: {upload_duration:.2f}s"
+ )
+
+ if html:
+ log(f"✅ Updated deposition {deposition_id_str} at {html}")
+ else:
+ log(f"✅ Updated deposition {deposition_id_str}")
+
+ except Exception as ex:
+ log_entry.status = 'failed'
+ log_entry.error_message = str(ex)
+ log_entry.error_details = {
+ "exception_type": type(ex).__name__,
+ "traceback": traceback.format_exc(),
+ }
+ log_entry.upload_duration_seconds = time.time() - upload_start
+ log_entry.deposition_summary = f"Failed to upload to Zenodo: {str(ex)}"
+
+ log_entry.save()
+ _send_admin_notification(log_entry, stdout_callback)
+ raise
+
+ # Save and notify
+ log_entry.save()
+ log(f"Deposition log saved (ID: {log_entry.id})")
+ _send_admin_notification(log_entry, stdout_callback)
+
+ return log_entry
+
+
+def _format_bytes(size_bytes: int) -> str:
+ """Format bytes in human-readable format."""
+ for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
+ if size_bytes < 1024.0:
+ return f"{size_bytes:.2f} {unit}"
+ size_bytes /= 1024.0
+ return f"{size_bytes:.2f} PB"