
Commit 038b6f6
Implement FIFO pruning for persistent query cache
Add persistent-cache pruning when a new query result would exceed the configured max_persistent_cbytes budget:

- Rebuild the query-cache VLArray with FIFO retention of the newest entries, append the new payload, and rewrite the vlmeta slot map with updated persistent_cbytes and next_slot.
- Skip duplicate persistent inserts for digests that are already cached.
- Add regression coverage that forces a tiny cache budget to exercise the rebuild path without allocating large files.
- Update the query-cache plan document to reflect the shipped behavior: scoped per-array hot cache, non-exact warm reuse on value/indices paths, and completed Stage 6 FIFO pruning.
1 parent b112f3a commit 038b6f6
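
Before the diff, a note on the policy itself: eviction is oldest-first against a compressed-byte budget, and an insert is refused only when even an empty cache could not hold the new entry. A minimal standalone sketch of that decision logic (the fifo_retain name and plain-tuple bookkeeping are illustrative, not the module's actual helpers):

def fifo_retain(entries, new_cbytes, budget):
    # Illustrative sketch of the FIFO budget policy; not the blosc2 helpers.
    # entries: (digest, cbytes) pairs in insertion (slot) order.
    retained = list(entries)
    total = sum(cbytes for _, cbytes in retained)
    # Drop oldest entries until the new payload fits under the budget.
    while retained and total + new_cbytes > budget:
        _, evicted_cbytes = retained.pop(0)
        total -= evicted_cbytes
    if total + new_cbytes > budget:
        return None  # the entry alone exceeds the budget; caller gives up
    return retained

The shipped code performs the same walk inside _persistent_cache_insert and then hands the surviving entries to _query_cache_rebuild_store.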

2 files changed: 137 additions & 8 deletions
src/blosc2/indexing.py (92 additions & 8 deletions)
@@ -512,6 +512,82 @@ def _persistent_cache_lookup(array: blosc2.NDArray, digest: str) -> np.ndarray | None:
     return coords


+def _query_cache_entry_cbytes(payload_mapping: dict) -> int:
+    """Return the compressed coordinate payload size used for budget accounting."""
+    coord_dtype = np.dtype(payload_mapping["dtype"])
+    compressed_coords = blosc2.compress2(
+        payload_mapping["data"], cparams=blosc2.CParams(typesize=coord_dtype.itemsize)
+    )
+    return len(compressed_coords)
+
+
+def _query_cache_entries_fifo(catalog: dict) -> list[tuple[str, dict]]:
+    """Return catalog entries ordered from oldest to newest insertion."""
+    entries = catalog.get("entries", {})
+    return sorted(entries.items(), key=lambda item: int(item[1]["slot"]))
+
+
+def _query_cache_rebuild_store(
+    array: blosc2.NDArray,
+    catalog: dict,
+    retained_entries: list[tuple[str, dict]],
+    appended: tuple[str, dict, dict, int] | None = None,
+) -> bool:
+    """Rewrite the persistent store with retained FIFO entries and an optional appended entry."""
+    payload_path = _query_cache_payload_path(array)
+    temp_path = f"{payload_path}.tmp"
+    _close_query_cache_store(payload_path)
+    _close_query_cache_store(temp_path)
+    blosc2.remove_urlpath(temp_path)
+
+    old_store = _open_query_cache_store(array)
+    temp_store = blosc2.VLArray(storage=blosc2.Storage(urlpath=temp_path, mode="w"))
+    new_entries = {}
+    persistent_cbytes = 0
+    slot = 0
+
+    try:
+        for digest, entry in retained_entries:
+            if old_store is None or int(entry["slot"]) >= len(old_store):
+                continue
+            payload = old_store[int(entry["slot"])]
+            if not isinstance(payload, dict) or payload.get("version") != QUERY_CACHE_FORMAT_VERSION:
+                continue
+            temp_store.append(payload)
+            updated = entry.copy()
+            updated["slot"] = slot
+            new_entries[digest] = updated
+            persistent_cbytes += int(updated["cbytes"])
+            slot += 1
+
+        if appended is not None:
+            digest, payload_mapping, query_descriptor, cbytes = appended
+            temp_store.append(payload_mapping)
+            new_entries[digest] = {
+                "slot": slot,
+                "cbytes": cbytes,
+                "nrows": payload_mapping["nrows"],
+                "dtype": payload_mapping["dtype"],
+                "query": query_descriptor,
+            }
+            persistent_cbytes += cbytes
+            slot += 1
+    finally:
+        del temp_store
+        del old_store
+        _close_query_cache_store(payload_path)
+        _close_query_cache_store(temp_path)
+
+    blosc2.remove_urlpath(payload_path)
+    os.replace(temp_path, payload_path)
+
+    catalog["entries"] = new_entries
+    catalog["persistent_cbytes"] = persistent_cbytes
+    catalog["next_slot"] = slot
+    _save_query_cache_catalog(array, catalog)
+    return True
+
+
 def _persistent_cache_insert(
     array: blosc2.NDArray,
     digest: str,
@@ -527,15 +603,11 @@ def _persistent_cache_insert(
     payload_path = _query_cache_payload_path(array)
     if catalog is None:
         catalog = _default_query_cache_catalog(payload_path)
+    elif digest in catalog.get("entries", {}):
+        return True

     payload_mapping = _encode_coords_payload(coords)
-    raw_data = payload_mapping["data"]
-
-    # Measure the compressed size of the coordinate bytes directly so the
-    # per-entry limit is independent of VLArray/msgpack encoding overhead.
-    coord_dtype = np.dtype(payload_mapping["dtype"])
-    compressed_coords = blosc2.compress2(raw_data, cparams=blosc2.CParams(typesize=coord_dtype.itemsize))
-    cbytes = len(compressed_coords)
+    cbytes = _query_cache_entry_cbytes(payload_mapping)

     max_entry = catalog.get("max_entry_cbytes", QUERY_CACHE_MAX_ENTRY_CBYTES)
     if cbytes > max_entry:
@@ -544,7 +616,19 @@
     max_persistent = catalog.get("max_persistent_cbytes", QUERY_CACHE_MAX_PERSISTENT_CBYTES)
     current_persistent = int(catalog.get("persistent_cbytes", 0))
     if current_persistent + cbytes > max_persistent:
-        return False
+        retained_entries = _query_cache_entries_fifo(catalog)
+        retained_cbytes = current_persistent
+        while retained_entries and retained_cbytes + cbytes > max_persistent:
+            _, oldest = retained_entries.pop(0)
+            retained_cbytes -= int(oldest["cbytes"])
+        if retained_cbytes + cbytes > max_persistent:
+            return False
+        return _query_cache_rebuild_store(
+            array,
+            catalog,
+            retained_entries,
+            appended=(digest, payload_mapping, query_descriptor, cbytes),
+        )

     store = _open_query_cache_store(array, create=True)
     if store is None:
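
The rebuild stages a full replacement store in a .tmp sibling and only then swaps it over the original (after removing the old store via blosc2.remove_urlpath), so a failure mid-rebuild never leaves a half-written payload file in place. A generic sketch of that stage-then-swap pattern, with plain bytes standing in for the VLArray contents (function name and payload are illustrative):

import os

def rewrite_via_temp(path: str, payload: bytes) -> None:
    # Generic stage-then-swap sketch; path and payload are illustrative.
    temp_path = f"{path}.tmp"
    # Stage the complete replacement next to the target first, so a
    # failure mid-write never corrupts the existing file.
    with open(temp_path, "wb") as fh:
        fh.write(payload)
    # os.replace overwrites the destination in one step on both POSIX
    # and Windows when source and destination share a filesystem.
    os.replace(temp_path, path)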

tests/ndarray/test_indexing.py (45 additions & 0 deletions)
@@ -1319,6 +1319,51 @@ def test_persistent_entry_size_limit_rejected(tmp_path):
     assert result is False, "oversized entry must be rejected"


+def test_persistent_cache_prunes_oldest_entries_and_rebuilds_slots(tmp_path, monkeypatch):
+    arr, urlpath = _make_persistent_array(tmp_path, n=8_000)
+    _clear_caches()
+
+    rng = np.random.default_rng(123)
+    payloads = []
+    for i in range(3):
+        coords = np.sort(rng.choice(8_000, size=256, replace=False)).astype(np.int64)
+        descriptor = indexing._normalize_query_descriptor(
+            f"(id >= {i}) & (id < {i + 1})", ["__self__"], None
+        )
+        digest = indexing._query_cache_digest(descriptor)
+        payload_mapping = indexing._encode_coords_payload(coords)
+        cbytes = indexing._query_cache_entry_cbytes(payload_mapping)
+        payloads.append((digest, descriptor, coords, cbytes))
+
+    budget = max(payloads[0][3] + payloads[1][3], payloads[1][3] + payloads[2][3])
+    monkeypatch.setattr(indexing, "QUERY_CACHE_MAX_PERSISTENT_CBYTES", budget)
+
+    for digest, descriptor, coords, _ in payloads:
+        assert indexing._persistent_cache_insert(arr, digest, coords, descriptor) is True
+
+    catalog = indexing._load_query_cache_catalog(arr)
+    assert catalog is not None
+    assert catalog["max_persistent_cbytes"] == budget
+    assert set(catalog["entries"]) == {payloads[1][0], payloads[2][0]}
+    assert catalog["entries"][payloads[1][0]]["slot"] == 0
+    assert catalog["entries"][payloads[2][0]]["slot"] == 1
+    assert catalog["next_slot"] == 2
+    assert catalog["persistent_cbytes"] == payloads[1][3] + payloads[2][3]
+
+    assert indexing._persistent_cache_lookup(arr, payloads[0][0]) is None
+    np.testing.assert_array_equal(indexing._persistent_cache_lookup(arr, payloads[1][0]), payloads[1][2])
+    np.testing.assert_array_equal(indexing._persistent_cache_lookup(arr, payloads[2][0]), payloads[2][2])
+
+    _clear_caches()
+    reopened = blosc2.open(urlpath, mode="r")
+    np.testing.assert_array_equal(
+        indexing._persistent_cache_lookup(reopened, payloads[1][0]), payloads[1][2]
+    )
+    np.testing.assert_array_equal(
+        indexing._persistent_cache_lookup(reopened, payloads[2][0]), payloads[2][2]
+    )
+
+
 # ---------------------------------------------------------------------------
 # Stage 5 – Invalidation
 # ---------------------------------------------------------------------------
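
A note on the budget chosen in the test above: taking the max of the two adjacent pair sums guarantees that either consecutive pair of payloads fits while all three together never do, so the third insert must evict exactly the oldest entry. A standalone check of that arithmetic, with made-up sizes standing in for the payloads' cbytes:

# Stand-in compressed sizes for the three payloads' cbytes (illustrative).
s0, s1, s2 = 900, 1_000, 950
budget = max(s0 + s1, s1 + s2)
assert s0 + s1 <= budget and s1 + s2 <= budget  # either adjacent pair fits
# The full set always overshoots: the surplus is exactly min(s0, s2) > 0.
assert s0 + s1 + s2 > budget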
