Commit 49bbc9a

Simplify query cache accounting and overflow policy
1 parent 5b82cf9 commit 49bbc9a

3 files changed: 68 additions & 123 deletions
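In short: when the persistent query-cache payload store would exceed its byte budget, the old code pruned the oldest FIFO entries and rewrote the store through a temp file; the new code drops the whole store, starts a fresh catalog that preserves the configured limits, and inserts only the incoming entry (the in-process hot cache is untouched). A minimal in-memory sketch of the new policy; the names `insert_entry`, `entries`, `used`, and `budget` are illustrative, not blosc2 API:

```python
def insert_entry(entries: dict[str, int], used: int, budget: int,
                 digest: str, nbytes: int) -> tuple[dict[str, int], int, bool]:
    """Admit one entry under a byte budget; on overflow, reset instead of pruning."""
    if used + nbytes > budget:
        if nbytes > budget:
            return entries, used, False  # a single oversized entry can never fit
        entries, used = {}, 0            # nuke the store, keep the limits
    entries[digest] = nbytes             # admit the new entry
    return entries, used + nbytes, True
```

The trade is retention for simplicity: one overflow evicts every persisted entry, but there is no temp-file rebuild and the byte accounting can never drift.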

bench/indexing/query_cache_store_bench.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -22,7 +22,7 @@
 import blosc2
 from blosc2 import indexing
 
-STRATEGIES = ("baseline", "cache_catalog", "skip_cbytes", "defer_vlmeta", "all")
+STRATEGIES = ("baseline", "cache_catalog", "skip_observer", "defer_vlmeta", "all")
 
 
 @dataclass
@@ -89,7 +89,7 @@ def _load_or_create_store(arr: blosc2.NDArray, state: InsertState | None, strate
 
 
 def _entry_nbytes(coords: np.ndarray, payload_mapping: dict, strategy: str) -> int:
-    if strategy in {"skip_cbytes", "all"}:
+    if strategy in {"skip_observer", "all"}:
         return len(payload_mapping["data"])
     return indexing._query_cache_entry_nbytes(coords)
 
```

src/blosc2/indexing.py

Lines changed: 26 additions & 110 deletions
```diff
@@ -59,8 +59,8 @@
 QUERY_CACHE_VLMETA_KEY = "_blosc2_query_cache"
 QUERY_CACHE_FORMAT_VERSION = 1
 QUERY_CACHE_MAX_ENTRY_NBYTES = 65_536  # 64 KB of logical int64 positions per persistent entry
-QUERY_CACHE_MAX_MEM_CBYTES = 131_072  # 128 KB for the in-process hot cache
-QUERY_CACHE_MAX_PERSISTENT_NBYTES = 2_147_483_648  # 2 GB of logical int64 positions in the payload store
+QUERY_CACHE_MAX_MEM_NBYTES = 131_072  # 128 KB for the in-process hot cache
+QUERY_CACHE_MAX_PERSISTENT_NBYTES = 4 * 1024 * 1024  # 4 MB of logical int64 positions in the payload store
 
 # In-process hot cache: (array-scope, digest) -> decoded np.ndarray of coordinates.
 _HOT_CACHE: dict[tuple[tuple[str, str | int], str], np.ndarray] = {}
@@ -325,7 +325,7 @@ def _default_query_cache_catalog(payload_path: str) -> dict:
         "version": QUERY_CACHE_FORMAT_VERSION,
         "payload_ref": {"kind": "urlpath", "version": 1, "urlpath": payload_path},
         "max_entry_nbytes": QUERY_CACHE_MAX_ENTRY_NBYTES,
-        "max_mem_cbytes": QUERY_CACHE_MAX_MEM_CBYTES,
+        "max_mem_nbytes": QUERY_CACHE_MAX_MEM_NBYTES,
         "max_persistent_nbytes": QUERY_CACHE_MAX_PERSISTENT_NBYTES,
         "persistent_nbytes": 0,
         "next_slot": 0,
@@ -334,41 +334,17 @@
 
 
 def _normalize_query_cache_catalog(catalog: dict) -> dict:
-    """Normalize legacy compressed-byte cache catalogs to logical-byte accounting."""
+    """Ensure the prototype query-cache catalog has the current nbytes schema."""
     if not isinstance(catalog, dict):
         return _default_query_cache_catalog("")
     catalog.setdefault("version", QUERY_CACHE_FORMAT_VERSION)
+    catalog.setdefault("payload_ref", {"kind": "urlpath", "version": 1, "urlpath": ""})
+    catalog.setdefault("max_entry_nbytes", QUERY_CACHE_MAX_ENTRY_NBYTES)
+    catalog.setdefault("max_mem_nbytes", QUERY_CACHE_MAX_MEM_NBYTES)
+    catalog.setdefault("max_persistent_nbytes", QUERY_CACHE_MAX_PERSISTENT_NBYTES)
+    catalog.setdefault("persistent_nbytes", 0)
+    catalog.setdefault("next_slot", 0)
     catalog.setdefault("entries", {})
-
-    if "max_entry_nbytes" not in catalog:
-        catalog["max_entry_nbytes"] = int(catalog.pop("max_entry_cbytes", QUERY_CACHE_MAX_ENTRY_NBYTES))
-    else:
-        catalog.pop("max_entry_cbytes", None)
-
-    catalog.setdefault("max_mem_cbytes", QUERY_CACHE_MAX_MEM_CBYTES)
-
-    if "max_persistent_nbytes" not in catalog:
-        catalog["max_persistent_nbytes"] = int(
-            catalog.pop("max_persistent_cbytes", QUERY_CACHE_MAX_PERSISTENT_NBYTES)
-        )
-    else:
-        catalog.pop("max_persistent_cbytes", None)
-
-    if "persistent_nbytes" not in catalog:
-        catalog["persistent_nbytes"] = int(catalog.pop("persistent_cbytes", 0))
-    else:
-        catalog.pop("persistent_cbytes", None)
-
-    total_nbytes = 0
-    for entry in catalog["entries"].values():
-        if "nbytes" not in entry:
-            entry["nbytes"] = int(entry.pop("cbytes", 0))
-        else:
-            entry.pop("cbytes", None)
-        total_nbytes += int(entry.get("nbytes", 0))
-
-    if catalog["entries"]:
-        catalog["persistent_nbytes"] = total_nbytes
     return catalog
 
 
@@ -498,7 +474,7 @@ def _hot_cache_put(digest: str, coords: np.ndarray, scope: tuple[str, str | int]
     global _HOT_CACHE_BYTES
     key = _hot_cache_key(digest, scope)
     entry_bytes = coords.nbytes
-    if entry_bytes > QUERY_CACHE_MAX_MEM_CBYTES:
+    if entry_bytes > QUERY_CACHE_MAX_MEM_NBYTES:
         # Single entry too large; skip.
         return
     # If already present, remove old accounting first.
@@ -507,7 +483,7 @@ def _hot_cache_put(digest: str, coords: np.ndarray, scope: tuple[str, str | int]
     with contextlib.suppress(ValueError):
         _HOT_CACHE_ORDER.remove(key)
     # Evict LRU entries until there is room.
-    while _HOT_CACHE_ORDER and _HOT_CACHE_BYTES + entry_bytes > QUERY_CACHE_MAX_MEM_CBYTES:
+    while _HOT_CACHE_ORDER and _HOT_CACHE_BYTES + entry_bytes > QUERY_CACHE_MAX_MEM_NBYTES:
         oldest = _HOT_CACHE_ORDER.pop(0)
         evicted = _HOT_CACHE.pop(oldest, None)
         if evicted is not None:
@@ -558,71 +534,21 @@ def _query_cache_entry_nbytes(coords: np.ndarray) -> int:
     return int(np.asarray(coords).size) * np.dtype(np.int64).itemsize
 
 
-def _query_cache_entries_fifo(catalog: dict) -> list[tuple[str, dict]]:
-    """Return catalog entries ordered from oldest to newest insertion."""
-    entries = catalog.get("entries", {})
-    return sorted(entries.items(), key=lambda item: int(item[1]["slot"]))
-
-
-def _query_cache_rebuild_store(
-    array: blosc2.NDArray,
-    catalog: dict,
-    retained_entries: list[tuple[str, dict]],
-    appended: tuple[str, dict, dict, int] | None = None,
-) -> bool:
-    """Rewrite the persistent store with retained FIFO entries and an optional appended entry."""
+def _reset_persistent_query_cache_catalog(array: blosc2.NDArray, catalog: dict | None = None) -> dict:
+    """Drop persistent cache storage and return a fresh empty catalog preserving limits."""
     payload_path = _query_cache_payload_path(array)
-    temp_path = f"{payload_path}.tmp"
     _close_query_cache_store(payload_path)
-    _close_query_cache_store(temp_path)
-    blosc2.remove_urlpath(temp_path)
-
-    old_store = _open_query_cache_store(array)
-    temp_store = blosc2.VLArray(storage=blosc2.Storage(urlpath=temp_path, mode="w"))
-    new_entries = {}
-    persistent_nbytes = 0
-    slot = 0
-
-    try:
-        for digest, entry in retained_entries:
-            if old_store is None or int(entry["slot"]) >= len(old_store):
-                continue
-            payload = old_store[int(entry["slot"])]
-            if not isinstance(payload, dict) or payload.get("version") != QUERY_CACHE_FORMAT_VERSION:
-                continue
-            temp_store.append(payload)
-            updated = entry.copy()
-            updated["slot"] = slot
-            new_entries[digest] = updated
-            persistent_nbytes += int(updated["nbytes"])
-            slot += 1
-
-        if appended is not None:
-            digest, payload_mapping, query_descriptor, nbytes = appended
-            temp_store.append(payload_mapping)
-            new_entries[digest] = {
-                "slot": slot,
-                "nbytes": nbytes,
-                "nrows": payload_mapping["nrows"],
-                "dtype": payload_mapping["dtype"],
-                "query": query_descriptor,
-            }
-            persistent_nbytes += nbytes
-            slot += 1
-    finally:
-        del temp_store
-        del old_store
-        _close_query_cache_store(payload_path)
-        _close_query_cache_store(temp_path)
-
     blosc2.remove_urlpath(payload_path)
-    os.replace(temp_path, payload_path)
 
-    catalog["entries"] = new_entries
-    catalog["persistent_nbytes"] = persistent_nbytes
-    catalog["next_slot"] = slot
-    _save_query_cache_catalog(array, catalog)
-    return True
+    fresh = _default_query_cache_catalog(payload_path)
+    if catalog is not None:
+        fresh["max_entry_nbytes"] = int(catalog.get("max_entry_nbytes", QUERY_CACHE_MAX_ENTRY_NBYTES))
+        fresh["max_mem_nbytes"] = int(catalog.get("max_mem_nbytes", QUERY_CACHE_MAX_MEM_NBYTES))
+        fresh["max_persistent_nbytes"] = int(
+            catalog.get("max_persistent_nbytes", QUERY_CACHE_MAX_PERSISTENT_NBYTES)
+        )
+    _save_query_cache_catalog(array, fresh)
+    return fresh
 
 
 def _persistent_cache_insert(
@@ -653,19 +579,10 @@ def _persistent_cache_insert(
     max_persistent = catalog.get("max_persistent_nbytes", QUERY_CACHE_MAX_PERSISTENT_NBYTES)
     current_persistent = int(catalog.get("persistent_nbytes", 0))
     if current_persistent + nbytes > max_persistent:
-        retained_entries = _query_cache_entries_fifo(catalog)
-        retained_nbytes = current_persistent
-        while retained_entries and retained_nbytes + nbytes > max_persistent:
-            _, oldest = retained_entries.pop(0)
-            retained_nbytes -= int(oldest["nbytes"])
-        if retained_nbytes + nbytes > max_persistent:
+        if nbytes > max_persistent:
             return False
-        return _query_cache_rebuild_store(
-            array,
-            catalog,
-            retained_entries,
-            appended=(digest, payload_mapping, query_descriptor, nbytes),
-        )
+        catalog = _reset_persistent_query_cache_catalog(array, catalog)
+        current_persistent = 0
 
     store = _open_query_cache_store(array, create=True)
     if store is None:
@@ -701,7 +618,6 @@ def _invalidate_query_cache(array: blosc2.NDArray) -> None:
     payload_path = _query_cache_payload_path(array)
     _close_query_cache_store(payload_path)
     blosc2.remove_urlpath(payload_path)
-    # Clear the catalog in vlmeta.
     with contextlib.suppress(KeyError, Exception):
         del array.schunk.vlmeta[QUERY_CACHE_VLMETA_KEY]
     _hot_cache_clear(scope=scope)
```
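Accounting is now uniformly in logical bytes: `_query_cache_entry_nbytes` charges a cached coordinate array as `size * 8` (decoded int64 positions), regardless of how well the payload compresses. A back-of-envelope check against the new constants (illustrative, not from the diff):

```python
import numpy as np

coords = np.arange(1_000, dtype=np.int64)
entry_nbytes = coords.size * np.dtype(np.int64).itemsize  # 8_000 logical bytes
assert entry_nbytes <= 65_536  # fits under QUERY_CACHE_MAX_ENTRY_NBYTES (64 KB)
print((4 * 1024 * 1024) // entry_nbytes)  # ~524 such entries before the 4 MB budget triggers a reset
```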

tests/ndarray/test_indexing.py

Lines changed: 40 additions & 11 deletions
```diff
@@ -1216,7 +1216,7 @@ def test_hot_cache_byte_limit_evicts_lru():
     assert indexing._hot_cache_get("key0") is None
     # Most recent keys should still be present.
     assert indexing._hot_cache_get("key164") is not None
-    assert indexing._HOT_CACHE_BYTES <= indexing.QUERY_CACHE_MAX_MEM_CBYTES
+    assert indexing._HOT_CACHE_BYTES <= indexing.QUERY_CACHE_MAX_MEM_NBYTES
 
 
 def test_hot_cache_clear():
@@ -1289,7 +1289,7 @@ def test_persistent_cache_not_created_for_non_persistent_array():
 
 
 # ---------------------------------------------------------------------------
-# Stage 3 – 4 KB per-entry size limit
+# Stage 3 – Per-entry logical-byte size limit
 # ---------------------------------------------------------------------------
 
 
@@ -1315,7 +1315,7 @@ def test_persistent_entry_size_limit_rejected(tmp_path):
     assert result is False, "oversized entry must be rejected"
 
 
-def test_persistent_cache_prunes_oldest_entries_and_rebuilds_slots(tmp_path, monkeypatch):
+def test_persistent_cache_overflow_nukes_persistent_entries_and_keeps_newest(tmp_path, monkeypatch):
     arr, urlpath = _make_persistent_array(tmp_path, n=8_000)
     _clear_caches()
 
@@ -1339,23 +1339,52 @@
     catalog = indexing._load_query_cache_catalog(arr)
     assert catalog is not None
     assert catalog["max_persistent_nbytes"] == budget
-    assert set(catalog["entries"]) == {payloads[1][0], payloads[2][0]}
-    assert catalog["entries"][payloads[1][0]]["slot"] == 0
-    assert catalog["entries"][payloads[2][0]]["slot"] == 1
-    assert catalog["next_slot"] == 2
-    assert catalog["persistent_nbytes"] == payloads[1][3] + payloads[2][3]
+    assert set(catalog["entries"]) == {payloads[2][0]}
+    assert catalog["entries"][payloads[2][0]]["slot"] == 0
+    assert catalog["next_slot"] == 1
+    assert catalog["persistent_nbytes"] == payloads[2][3]
 
     assert indexing._persistent_cache_lookup(arr, payloads[0][0]) is None
-    np.testing.assert_array_equal(indexing._persistent_cache_lookup(arr, payloads[1][0]), payloads[1][2])
+    assert indexing._persistent_cache_lookup(arr, payloads[1][0]) is None
     np.testing.assert_array_equal(indexing._persistent_cache_lookup(arr, payloads[2][0]), payloads[2][2])
 
     _clear_caches()
     reopened = blosc2.open(urlpath, mode="r")
+    assert indexing._persistent_cache_lookup(reopened, payloads[1][0]) is None
     np.testing.assert_array_equal(
-        indexing._persistent_cache_lookup(reopened, payloads[1][0]), payloads[1][2]
+        indexing._persistent_cache_lookup(reopened, payloads[2][0]), payloads[2][2]
+    )
+
+
+def test_persistent_cache_overflow_preserves_hot_cache(tmp_path, monkeypatch):
+    arr, _ = _make_persistent_array(tmp_path, n=8_000)
+    _clear_caches()
+
+    coords1 = np.arange(0, 256, dtype=np.int64)
+    coords2 = np.arange(256, 512, dtype=np.int64)
+    expr1 = "(id >= 0) & (id < 256)"
+    expr2 = "(id >= 256) & (id < 512)"
+
+    budget = indexing._query_cache_entry_nbytes(coords1)
+    monkeypatch.setattr(indexing, "QUERY_CACHE_MAX_PERSISTENT_NBYTES", budget)
+
+    indexing.store_cached_coords(arr, expr1, [indexing.SELF_TARGET_NAME], None, coords1)
+    indexing.store_cached_coords(arr, expr2, [indexing.SELF_TARGET_NAME], None, coords2)
+
+    assert (
+        indexing._persistent_cache_lookup(
+            arr,
+            indexing._query_cache_digest(
+                indexing._normalize_query_descriptor(expr1, [indexing.SELF_TARGET_NAME], None)
+            ),
+        )
+        is None
     )
     np.testing.assert_array_equal(
-        indexing._persistent_cache_lookup(reopened, payloads[2][0]), payloads[2][2]
+        indexing.get_cached_coords(arr, expr1, [indexing.SELF_TARGET_NAME], None), coords1
+    )
+    np.testing.assert_array_equal(
+        indexing.get_cached_coords(arr, expr2, [indexing.SELF_TARGET_NAME], None), coords2
     )
 
 
```
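For reference, the round trip the new tests drive looks roughly like this; a sketch that assumes `arr` is a persistent structured blosc2.NDArray with an `id` field, as built by the tests' `_make_persistent_array` helper:

```python
import numpy as np
from blosc2 import indexing

coords = np.arange(0, 256, dtype=np.int64)
expr = "(id >= 0) & (id < 256)"

# Store once; get_cached_coords consults the in-process hot cache first,
# so the result survives even after the persistent store is reset on overflow.
indexing.store_cached_coords(arr, expr, [indexing.SELF_TARGET_NAME], None, coords)
np.testing.assert_array_equal(
    indexing.get_cached_coords(arr, expr, [indexing.SELF_TARGET_NAME], None), coords
)
```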
