Commit c5bbc40

Add persistent query-result cache for indexed lookups
Implements a two-level cache for range/value queries on indexed NDArrays:

- Hot cache: in-process LRU (≤128 KB) keyed by a BLAKE2b digest of the
  normalized query descriptor (expression, tokens, order).
- Persistent cache: results stored as a VLArray in a sidecar
  .__query_cache__.b2frame payload store, with the catalog kept in the
  array's vlmeta, surviving process restarts.

The cache is invalidated automatically on append, drop_index, compact_index,
and mark_indexes_stale. Only persistent (on-disk) arrays use the persistent
layer; in-memory arrays use the hot cache only.

Wire-up in lazyexpr.slices_eval:

- Value path (arr[cond][:]): check/store persistent + hot cache.
- Indices path (.indices().compute()): check/store hot cache.
- A cache hit short-circuits plan_query via a minimal IndexPlan.

Add 38 new tests in tests/ndarray/test_indexing.py covering:

- Cold/warm hits for both in-memory and on-disk arrays.
- Cross-array isolation (no cache poisoning between arrays).
- Invalidation on append and index rebuild.
- Persistence across process-simulated reopens (clearing the hot cache).
- LRU eviction and entry-size cap enforcement.
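A minimal sketch of the intended two-level lookup flow, using the get_cached_coords / store_cached_coords helpers this commit adds. The blosc2.indexing import path, the data.b2nd file, and the stand-in coordinate array are illustrative assumptions; in the real wire-up, lazyexpr.slices_eval computes the coordinates via plan_query on a miss:

import numpy as np
import blosc2
from blosc2 import indexing  # assumed import path for src/blosc2/indexing.py

arr = blosc2.open("data.b2nd")  # hypothetical persistent, indexed NDArray
expr, tokens, order = "a > 3", ["a"], None

coords = indexing.get_cached_coords(arr, expr, tokens, order)
if coords is None:
    # Miss: the lazyexpr integration would run plan_query here; a stand-in
    # result is used so the sketch stays self-contained.
    coords = np.arange(4, 7, dtype=np.uint64)
    indexing.store_cached_coords(arr, expr, tokens, order, coords)

# Same descriptor again: served from the in-process hot cache. After a
# process restart (hot cache empty), the persistent VLArray store answers.
coords_again = indexing.get_cached_coords(arr, expr, tokens, order)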
1 parent 40fc4ab commit c5bbc40

3 files changed

Lines changed: 982 additions & 2 deletions

File tree

src/blosc2/indexing.py

Lines changed: 328 additions & 0 deletions
@@ -8,6 +8,7 @@
 from __future__ import annotations

 import ast
+import contextlib
 import enum
 import hashlib
 import math
@@ -51,6 +52,25 @@
 _PERSISTENT_INDEXES: dict[tuple[str, str | int], dict] = {}
 _DATA_CACHE: dict[tuple[int, str | None, str, str], np.ndarray] = {}
 _SIDECAR_HANDLE_CACHE: dict[tuple[int, str | None, str, str], object] = {}
+
+# ---------------------------------------------------------------------------
+# Query-result cache constants and global state
+# ---------------------------------------------------------------------------
+QUERY_CACHE_VLMETA_KEY = "_blosc2_query_cache"
+QUERY_CACHE_FORMAT_VERSION = 1
+QUERY_CACHE_MAX_ENTRY_CBYTES = 4096  # 4 KB per persistent entry
+QUERY_CACHE_MAX_MEM_CBYTES = 131_072  # 128 KB for the in-process hot cache
+QUERY_CACHE_MAX_PERSISTENT_CBYTES = 2_147_483_648  # 2 GB for the payload store
+
+# In-process hot cache: digest -> decoded np.ndarray of coordinates.
+_HOT_CACHE: dict[str, np.ndarray] = {}
+# Insertion-order list for LRU eviction.
+_HOT_CACHE_ORDER: list[str] = []
+# Total bytes of arrays currently in the hot cache.
+_HOT_CACHE_BYTES: int = 0
+# Persistent VLArray handles: resolved urlpath -> open VLArray object.
+_QUERY_CACHE_STORE_HANDLES: dict[str, object] = {}
+
 FULL_OOC_RUN_ITEMS = 2_000_000
 FULL_OOC_MERGE_BUFFER_ITEMS = 500_000
 FULL_SELECTIVE_OOC_MAX_SPANS = 128
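For scale, a quick back-of-envelope on these budgets (the constants are copied from the hunk above; the arithmetic itself is editorial illustration). Note the two caps are measured differently: the hot cache counts uncompressed nbytes, while the per-entry persistent cap counts compressed bytes, so a well-compressing coordinate run can store far more than 1,024 uint32 values in 4 KB:

import numpy as np

QUERY_CACHE_MAX_MEM_CBYTES = 131_072  # 128 KB (hot cache, uncompressed nbytes)
QUERY_CACHE_MAX_ENTRY_CBYTES = 4096   # 4 KB (persistent entry, compressed)

coord_size = np.dtype("<u4").itemsize              # 4 bytes per uint32 coordinate
print(QUERY_CACHE_MAX_MEM_CBYTES // coord_size)    # 32768 coords fit in the hot cache
print(QUERY_CACHE_MAX_ENTRY_CBYTES // coord_size)  # 1024 coords per entry, before compression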
@@ -267,6 +287,309 @@ def _save_store(array: blosc2.NDArray, store: dict) -> None:
     _IN_MEMORY_INDEX_FINALIZERS.setdefault(key, weakref.finalize(array, _cleanup_in_memory_store, key))


+# ---------------------------------------------------------------------------
+# Stage 1 – Query cache: metadata helpers and container plumbing
+# ---------------------------------------------------------------------------
+
+
+def _query_cache_payload_path(array: blosc2.NDArray) -> str:
+    """Return the path for the persistent query-cache VLArray payload store."""
+    path, root = _sanitize_sidecar_root(array.urlpath)
+    return str(path.with_name(f"{root}.__query_cache__.b2frame"))
+
+
+def _default_query_cache_catalog(payload_path: str) -> dict:
+    return {
+        "version": QUERY_CACHE_FORMAT_VERSION,
+        "payload_ref": {"kind": "urlpath", "version": 1, "urlpath": payload_path},
+        "max_entry_cbytes": QUERY_CACHE_MAX_ENTRY_CBYTES,
+        "max_mem_cbytes": QUERY_CACHE_MAX_MEM_CBYTES,
+        "max_persistent_cbytes": QUERY_CACHE_MAX_PERSISTENT_CBYTES,
+        "persistent_cbytes": 0,
+        "next_slot": 0,
+        "entries": {},
+    }
+
+
+def _load_query_cache_catalog(array: blosc2.NDArray) -> dict | None:
+    """Read the query-cache catalog from *array* vlmeta, or return None."""
+    if not _is_persistent_array(array):
+        return None
+    try:
+        cat = array.schunk.vlmeta[QUERY_CACHE_VLMETA_KEY]
+    except KeyError:
+        return None
+    if not isinstance(cat, dict) or cat.get("version") != QUERY_CACHE_FORMAT_VERSION:
+        return None
+    return cat
+
+
+def _save_query_cache_catalog(array: blosc2.NDArray, catalog: dict) -> None:
+    """Write *catalog* back to *array* vlmeta."""
+    array.schunk.vlmeta[QUERY_CACHE_VLMETA_KEY] = catalog
+
+
+def _open_query_cache_store(array: blosc2.NDArray, *, create: bool = False):
+    """Return an open (writable) VLArray for the persistent payload store.
+
+    Returns ``None`` if the array is not persistent. When *create* is True the
+    store is created if it does not yet exist.
+    """
+    if not _is_persistent_array(array):
+        return None
+    path = _query_cache_payload_path(array)
+    cached = _QUERY_CACHE_STORE_HANDLES.get(path)
+    if cached is not None:
+        return cached
+    if Path(path).exists():
+        vla = blosc2.VLArray(storage=blosc2.Storage(urlpath=path, mode="a"))
+        _QUERY_CACHE_STORE_HANDLES[path] = vla
+        return vla
+    if not create:
+        return None
+    vla = blosc2.VLArray(storage=blosc2.Storage(urlpath=path, mode="w"))
+    _QUERY_CACHE_STORE_HANDLES[path] = vla
+    return vla
+
+
+def _close_query_cache_store(path: str) -> None:
+    """Drop a cached VLArray handle for *path*."""
+    _QUERY_CACHE_STORE_HANDLES.pop(path, None)
+
+
+# ---------------------------------------------------------------------------
+# Stage 2 – Cache key normalization
+# ---------------------------------------------------------------------------
+
+
+def _normalize_query_descriptor(
+    expression: str,
+    tokens: list[str],
+    order: list[str] | None,
+) -> dict:
+    """Build a canonical, order-stable query descriptor for cache keying."""
+    try:
+        normalized_expr = ast.unparse(ast.parse(expression, mode="eval"))
+    except Exception:
+        normalized_expr = expression
+    return {
+        "version": QUERY_CACHE_FORMAT_VERSION,
+        "kind": "indices",
+        "tokens": sorted(tokens),
+        "expr": normalized_expr,
+        "order": sorted(order) if order is not None else None,
+    }
+
+
+def _query_cache_digest(descriptor: dict) -> str:
+    """Return a 32-character hex digest for *descriptor*."""
+    import json
+
+    canonical = json.dumps(descriptor, sort_keys=True, separators=(",", ":"))
+    return hashlib.blake2b(canonical.encode(), digest_size=16).hexdigest()
+
+
+# ---------------------------------------------------------------------------
+# Stage 3 – Payload encode/decode and hot/persistent cache helpers
+# ---------------------------------------------------------------------------
+
+
+def _encode_coords_payload(coords: np.ndarray) -> dict:
+    """Encode a coordinate array as a compact msgpack-safe mapping."""
+    dtype = np.dtype("<u4") if coords.max() <= np.iinfo(np.uint32).max else np.dtype("<u8")
+    return {
+        "version": QUERY_CACHE_FORMAT_VERSION,
+        "dtype": dtype.str,
+        "nrows": len(coords),
+        "data": coords.astype(dtype).tobytes(),
+    }
+
+
+def _decode_coords_payload(payload: dict) -> np.ndarray:
+    """Reconstruct a coordinate array from a cached payload mapping."""
+    return np.frombuffer(payload["data"], dtype=np.dtype(payload["dtype"])).copy()
+
+
+def _hot_cache_get(digest: str) -> np.ndarray | None:
+    """Return the cached coordinate array for *digest*, or ``None``."""
+    arr = _HOT_CACHE.get(digest)
+    if arr is None:
+        return None
+    # Move to most-recently-used position.
+    with contextlib.suppress(ValueError):
+        _HOT_CACHE_ORDER.remove(digest)
+    _HOT_CACHE_ORDER.append(digest)
+    return arr
+
+
+def _hot_cache_put(digest: str, coords: np.ndarray) -> None:
+    """Insert *coords* into the hot cache, evicting LRU entries if needed."""
+    global _HOT_CACHE_BYTES
+    entry_bytes = coords.nbytes
+    if entry_bytes > QUERY_CACHE_MAX_MEM_CBYTES:
+        # Single entry too large; skip.
+        return
+    # If already present, remove old accounting first.
+    if digest in _HOT_CACHE:
+        _HOT_CACHE_BYTES -= _HOT_CACHE[digest].nbytes
+        with contextlib.suppress(ValueError):
+            _HOT_CACHE_ORDER.remove(digest)
+    # Evict LRU entries until there is room.
+    while _HOT_CACHE_ORDER and _HOT_CACHE_BYTES + entry_bytes > QUERY_CACHE_MAX_MEM_CBYTES:
+        oldest = _HOT_CACHE_ORDER.pop(0)
+        evicted = _HOT_CACHE.pop(oldest, None)
+        if evicted is not None:
+            _HOT_CACHE_BYTES -= evicted.nbytes
+    _HOT_CACHE[digest] = coords
+    _HOT_CACHE_ORDER.append(digest)
+    _HOT_CACHE_BYTES += entry_bytes
+
+
+def _hot_cache_clear() -> None:
+    """Clear all in-process hot cache entries."""
+    global _HOT_CACHE_BYTES
+    _HOT_CACHE.clear()
+    _HOT_CACHE_ORDER.clear()
+    _HOT_CACHE_BYTES = 0
+
+
+def _persistent_cache_lookup(array: blosc2.NDArray, digest: str) -> np.ndarray | None:
+    """Return coordinates from the persistent cache for *digest*, or ``None``."""
+    catalog = _load_query_cache_catalog(array)
+    if catalog is None:
+        return None
+    entry = catalog.get("entries", {}).get(digest)
+    if entry is None:
+        return None
+    slot = entry["slot"]
+    store = _open_query_cache_store(array)
+    if store is None or slot >= len(store):
+        return None
+    payload = store[slot]
+    if not isinstance(payload, dict) or payload.get("version") != QUERY_CACHE_FORMAT_VERSION:
+        return None
+    try:
+        coords = _decode_coords_payload(payload)
+    except Exception:
+        return None
+    return coords
+
+
+def _persistent_cache_insert(
+    array: blosc2.NDArray,
+    digest: str,
+    coords: np.ndarray,
+    query_descriptor: dict,
+) -> bool:
+    """Append *coords* to the persistent cache and update the catalog.
+
+    Returns ``True`` on success, ``False`` if the entry is too large or the
+    persistent budget is exceeded.
+    """
+    catalog = _load_query_cache_catalog(array)
+    payload_path = _query_cache_payload_path(array)
+    if catalog is None:
+        catalog = _default_query_cache_catalog(payload_path)
+
+    payload_mapping = _encode_coords_payload(coords)
+    raw_data = payload_mapping["data"]
+
+    # Measure the compressed size of the coordinate bytes directly so the
+    # per-entry limit is independent of VLArray/msgpack encoding overhead.
+    coord_dtype = np.dtype(payload_mapping["dtype"])
+    compressed_coords = blosc2.compress2(raw_data, cparams=blosc2.CParams(typesize=coord_dtype.itemsize))
+    cbytes = len(compressed_coords)
+
+    max_entry = catalog.get("max_entry_cbytes", QUERY_CACHE_MAX_ENTRY_CBYTES)
+    if cbytes > max_entry:
+        return False
+
+    max_persistent = catalog.get("max_persistent_cbytes", QUERY_CACHE_MAX_PERSISTENT_CBYTES)
+    current_persistent = int(catalog.get("persistent_cbytes", 0))
+    if current_persistent + cbytes > max_persistent:
+        return False
+
+    store = _open_query_cache_store(array, create=True)
+    if store is None:
+        return False
+
+    slot = len(store)
+    store.append(payload_mapping)
+
+    catalog["entries"][digest] = {
+        "slot": slot,
+        "cbytes": cbytes,
+        "nrows": len(coords),
+        "dtype": payload_mapping["dtype"],
+        "query": query_descriptor,
+    }
+    catalog["persistent_cbytes"] = current_persistent + cbytes
+    catalog["next_slot"] = slot + 1
+    _save_query_cache_catalog(array, catalog)
+    return True
+
+
+# ---------------------------------------------------------------------------
+# Stage 5 – Query cache invalidation
+# ---------------------------------------------------------------------------
+
+
+def _invalidate_query_cache(array: blosc2.NDArray) -> None:
+    """Drop the entire query cache for *array* (persistent file + hot cache)."""
+    if not _is_persistent_array(array):
+        _hot_cache_clear()
+        return
+    payload_path = _query_cache_payload_path(array)
+    _close_query_cache_store(payload_path)
+    blosc2.remove_urlpath(payload_path)
+    # Clear the catalog in vlmeta.
+    with contextlib.suppress(KeyError, Exception):
+        del array.schunk.vlmeta[QUERY_CACHE_VLMETA_KEY]
+    _hot_cache_clear()


+# ---------------------------------------------------------------------------
+# Public helper: cached coordinate lookup (used by lazyexpr.py integration)
+# ---------------------------------------------------------------------------
+
+
+def get_cached_coords(
+    array: blosc2.NDArray,
+    expression: str,
+    tokens: list[str],
+    order: list[str] | None,
+) -> np.ndarray | None:
+    """Return cached coordinates for *expression*/*tokens*/*order*, or ``None``."""
+    descriptor = _normalize_query_descriptor(expression, tokens, order)
+    digest = _query_cache_digest(descriptor)
+    # 1. In-process hot cache.
+    coords = _hot_cache_get(digest)
+    if coords is not None:
+        return coords
+    # 2. Persistent cache (persistent arrays only).
+    if _is_persistent_array(array):
+        coords = _persistent_cache_lookup(array, digest)
+        if coords is not None:
+            _hot_cache_put(digest, coords)
+            return coords
+    return None
+
+
+def store_cached_coords(
+    array: blosc2.NDArray,
+    expression: str,
+    tokens: list[str],
+    order: list[str] | None,
+    coords: np.ndarray,
+) -> None:
+    """Store *coords* in both the hot cache and (if persistent) the payload store."""
+    descriptor = _normalize_query_descriptor(expression, tokens, order)
+    digest = _query_cache_digest(descriptor)
+    _hot_cache_put(digest, coords)
+    if _is_persistent_array(array):
+        _persistent_cache_insert(array, digest, coords, descriptor)
+
+
 def _supported_index_dtype(dtype: np.dtype) -> bool:
     return np.dtype(dtype).kind in {"b", "i", "u", "f", "m", "M"}

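Because the descriptor sorts tokens and round-trips the expression through ast.parse/ast.unparse before hashing, cosmetically different spellings of the same query share one cache slot. A standalone sketch that mirrors _normalize_query_descriptor and _query_cache_digest (reimplemented here so it runs outside blosc2):

import ast
import hashlib
import json

def digest(expression: str, tokens: list[str], order: list[str] | None) -> str:
    # Mirror of the descriptor normalization + BLAKE2b keying above.
    try:
        expr = ast.unparse(ast.parse(expression, mode="eval"))
    except Exception:
        expr = expression
    descriptor = {
        "version": 1,
        "kind": "indices",
        "tokens": sorted(tokens),
        "expr": expr,
        "order": sorted(order) if order is not None else None,
    }
    canonical = json.dumps(descriptor, sort_keys=True, separators=(",", ":"))
    return hashlib.blake2b(canonical.encode(), digest_size=16).hexdigest()

# Token order and extra whitespace do not change the key...
assert digest("(a > 3)  &  (b < 5)", ["b", "a"], None) == digest("(a > 3) & (b < 5)", ["a", "b"], None)
# ...but a semantically different expression does.
assert digest("a > 3", ["a"], None) != digest("a >= 3", ["a"], None)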
@@ -2983,6 +3306,7 @@ def append_to_indexes(array: blosc2.NDArray, old_size: int, appended_values: np.
     descriptor["blocks"] = tuple(array.blocks)
     descriptor["stale"] = False
     _save_store(array, store)
+    _invalidate_query_cache(array)


 def drop_index(array: blosc2.NDArray, field: str | None = None, name: str | None = None) -> None:
@@ -2993,6 +3317,7 @@ def drop_index(array: blosc2.NDArray, field: str | None = None, name: str | None
     descriptor = store["indexes"].pop(token)
     _save_store(array, store)
     _drop_descriptor_sidecars(descriptor)
+    _invalidate_query_cache(array)


 def rebuild_index(array: blosc2.NDArray, field: str | None = None, name: str | None = None) -> dict:
@@ -3077,6 +3402,7 @@ def compact_index(array: blosc2.NDArray, field: str | None = None, name: str | N
         _replace_full_descriptor(array, descriptor, sorted_values, positions, descriptor["persistent"])
         _clear_full_merge_cache(array, descriptor["token"])
         _save_store(array, store)
+        _invalidate_query_cache(array)
         return _copy_descriptor(descriptor)

     dtype = np.dtype(descriptor["dtype"])
@@ -3112,6 +3438,7 @@ def compact_index(array: blosc2.NDArray, field: str | None = None, name: str | N

     _clear_full_merge_cache(array, descriptor["token"])
     _save_store(array, store)
+    _invalidate_query_cache(array)
     return _copy_descriptor(descriptor)


@@ -3137,6 +3464,7 @@ def mark_indexes_stale(array: blosc2.NDArray) -> None:
            changed = True
     if changed:
         _save_store(array, store)
+        _invalidate_query_cache(array)


 def _descriptor_for(array: blosc2.NDArray, field: str | None) -> dict | None:
