Skip to content

Commit 918deb8

Browse files
committed
More work on reducing memory consumption during index creation
1 parent 53b717c commit 918deb8

3 files changed

Lines changed: 255 additions & 203 deletions

File tree

bench/indexing/index_query_bench.py

Lines changed: 56 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -355,7 +355,25 @@ def __exit__(self, exc_type, exc, tb):
355355
return _FullQueryModeScope()
356356

357357

358-
def index_sizes(descriptor: dict) -> tuple[int, int]:
358+
def _with_index_sidecar_mmap(no_mmap: bool):
359+
class _IndexSidecarMmapScope:
360+
def __enter__(self):
361+
self.previous = blosc2_indexing._INDEX_MMAP_MODE
362+
if no_mmap:
363+
blosc2_indexing._INDEX_MMAP_MODE = None
364+
365+
def __exit__(self, exc_type, exc, tb):
366+
blosc2_indexing._INDEX_MMAP_MODE = self.previous
367+
368+
return _IndexSidecarMmapScope()
369+
370+
371+
def _open_index_sidecar(path: str | os.PathLike[str], no_mmap: bool):
372+
mmap_mode = None if no_mmap else blosc2_indexing._INDEX_MMAP_MODE
373+
return blosc2.open(path, mmap_mode=mmap_mode)
374+
375+
376+
def index_sizes(descriptor: dict, *, no_mmap: bool) -> tuple[int, int]:
359377
logical = 0
360378
disk = 0
361379
for level_info in descriptor["levels"].values():
@@ -367,15 +385,15 @@ def index_sizes(descriptor: dict) -> tuple[int, int]:
367385
light = descriptor.get("light")
368386
if light is not None:
369387
for key in ("values_path", "bucket_positions_path", "offsets_path"):
370-
array = blosc2.open(light[key])
388+
array = _open_index_sidecar(light[key], no_mmap)
371389
logical += int(np.prod(array.shape)) * array.dtype.itemsize
372390
disk += os.path.getsize(light[key])
373391

374392
reduced = descriptor.get("reduced")
375393
if reduced is not None:
376-
values = blosc2.open(reduced["values_path"])
377-
positions = blosc2.open(reduced["positions_path"])
378-
offsets = blosc2.open(reduced["offsets_path"])
394+
values = _open_index_sidecar(reduced["values_path"], no_mmap)
395+
positions = _open_index_sidecar(reduced["positions_path"], no_mmap)
396+
offsets = _open_index_sidecar(reduced["offsets_path"], no_mmap)
379397
logical += values.shape[0] * values.dtype.itemsize
380398
logical += positions.shape[0] * positions.dtype.itemsize
381399
logical += offsets.shape[0] * offsets.dtype.itemsize
@@ -385,8 +403,8 @@ def index_sizes(descriptor: dict) -> tuple[int, int]:
385403

386404
full = descriptor.get("full")
387405
if full is not None:
388-
values = blosc2.open(full["values_path"])
389-
positions = blosc2.open(full["positions_path"])
406+
values = _open_index_sidecar(full["values_path"], no_mmap)
407+
positions = _open_index_sidecar(full["positions_path"], no_mmap)
390408
logical += values.shape[0] * values.dtype.itemsize
391409
logical += positions.shape[0] * positions.dtype.itemsize
392410
disk += os.path.getsize(full["values_path"])
@@ -462,6 +480,7 @@ def _open_or_build_indexed_array(
462480
codec: blosc2.Codec | None,
463481
clevel: int | None,
464482
nthreads: int | None,
483+
no_mmap: bool,
465484
) -> tuple[blosc2.NDArray, float]:
466485
if path.exists():
467486
arr = blosc2.open(path, mode="a")
@@ -483,7 +502,8 @@ def _open_or_build_indexed_array(
483502
cparams["nthreads"] = nthreads
484503
if cparams:
485504
kwargs["cparams"] = cparams
486-
arr.create_index(**kwargs)
505+
with _with_index_sidecar_mmap(no_mmap):
506+
arr.create_index(**kwargs)
487507
return arr, time.perf_counter() - build_start
488508

489509

@@ -503,6 +523,8 @@ def benchmark_size(
503523
clevel: int | None,
504524
nthreads: int | None,
505525
kinds: tuple[str, ...],
526+
repeats: int,
527+
no_mmap: bool,
506528
cold_row_callback=None,
507529
) -> list[dict]:
508530
arr = _open_or_build_persistent_array(
@@ -545,14 +567,21 @@ def benchmark_size(
545567
codec,
546568
clevel,
547569
nthreads,
570+
no_mmap,
548571
)
549572
idx_cond = blosc2.lazyexpr(condition_str, idx_arr.fields)
550573
idx_expr = idx_cond.where(idx_arr)
551-
with _with_full_query_mode(full_query_mode):
574+
with _with_full_query_mode(full_query_mode), _with_index_sidecar_mmap(no_mmap):
552575
explanation = idx_expr.explain()
553576
cold_time, index_len = benchmark_index_once(idx_arr, idx_cond)
577+
warm_ms = None
578+
warm_speedup = None
579+
if repeats > 0:
580+
index_runs = [benchmark_index_once(idx_arr, idx_cond)[0] for _ in range(repeats)]
581+
warm_ms = statistics.median(index_runs) * 1_000 if index_runs else None
582+
warm_speedup = None if warm_ms is None else scan_ms / warm_ms
554583
descriptor = idx_arr.indexes[0]
555-
logical_index_bytes, disk_index_bytes = index_sizes(descriptor)
584+
logical_index_bytes, disk_index_bytes = index_sizes(descriptor, no_mmap=no_mmap)
556585

557586
row = {
558587
"size": size,
@@ -566,38 +595,25 @@ def benchmark_size(
566595
"scan_ms": scan_ms,
567596
"cold_ms": cold_time * 1_000,
568597
"cold_speedup": scan_ms / (cold_time * 1_000),
569-
"warm_ms": None,
570-
"warm_speedup": None,
598+
"warm_ms": warm_ms,
599+
"warm_speedup": warm_speedup,
571600
"candidate_units": explanation["candidate_units"],
572601
"total_units": explanation["total_units"],
573602
"lookup_path": explanation.get("lookup_path"),
574603
"full_query_mode": full_query_mode,
604+
"no_mmap": no_mmap,
575605
"logical_index_bytes": logical_index_bytes,
576606
"disk_index_bytes": disk_index_bytes,
577607
"index_pct": logical_index_bytes / base_bytes * 100,
578608
"index_pct_disk": disk_index_bytes / compressed_base_bytes * 100,
579-
"_arr": idx_arr,
580-
"_cond": idx_cond,
581609
}
582610
rows.append(row)
583611
if cold_row_callback is not None:
584612
cold_row_callback(row)
613+
del idx_expr, idx_cond, idx_arr
585614
return rows
586615

587616

588-
def measure_warm_queries(rows: list[dict], repeats: int) -> None:
589-
if repeats <= 0:
590-
return
591-
for result in rows:
592-
arr = result["_arr"]
593-
cond = result["_cond"]
594-
with _with_full_query_mode(result["full_query_mode"]):
595-
index_runs = [benchmark_index_once(arr, cond)[0] for _ in range(repeats)]
596-
warm_ms = statistics.median(index_runs) * 1_000 if index_runs else None
597-
result["warm_ms"] = warm_ms
598-
result["warm_speedup"] = None if warm_ms is None else result["scan_ms"] / warm_ms
599-
600-
601617
def parse_human_size(value: str) -> int:
602618
value = value.strip()
603619
if not value:
@@ -728,6 +744,12 @@ def parse_args() -> argparse.Namespace:
728744
default=None,
729745
help="Number of threads to use for index creation. Default: use blosc2.nthreads.",
730746
)
747+
parser.add_argument(
748+
"--no-mmap",
749+
action="store_true",
750+
default=False,
751+
help="Disable mmap for index sidecar opens during benchmark planning/querying. Default: use mmap when supported.",
752+
)
731753
return parser.parse_args()
732754

733755

@@ -772,6 +794,7 @@ def main() -> None:
772794
codec,
773795
args.clevel,
774796
args.nthreads,
797+
args.no_mmap,
775798
)
776799
else:
777800
args.outdir.mkdir(parents=True, exist_ok=True)
@@ -793,6 +816,7 @@ def main() -> None:
793816
codec,
794817
args.clevel,
795818
args.nthreads,
819+
args.no_mmap,
796820
)
797821

798822

@@ -814,6 +838,7 @@ def run_benchmarks(
814838
codec: blosc2.Codec | None,
815839
clevel: int | None,
816840
nthreads: int | None,
841+
no_mmap: bool,
817842
) -> None:
818843
all_results = []
819844

@@ -831,7 +856,8 @@ def run_benchmarks(
831856
f"query_single_value={query_single_value}, "
832857
f"full_query_mode={full_query_mode}, index_codec={'auto' if codec is None else codec.name}, "
833858
f"index_clevel={'auto' if clevel is None else clevel}, "
834-
f"index_nthreads={'auto' if nthreads is None else nthreads}"
859+
f"index_nthreads={'auto' if nthreads is None else nthreads}, "
860+
f"index_mmap={'off' if no_mmap else 'on'}"
835861
)
836862
cold_widths = progress_widths(COLD_COLUMNS, sizes, dists, kinds, id_dtype)
837863
print()
@@ -860,11 +886,12 @@ def cold_progress_callback(row: dict) -> None:
860886
clevel,
861887
nthreads,
862888
kinds,
889+
repeats,
890+
no_mmap,
863891
cold_row_callback=cold_progress_callback,
864892
)
865893
all_results.extend(size_results)
866894
if repeats > 0:
867-
measure_warm_queries(all_results, repeats)
868895
warm_widths = table_widths(all_results, WARM_COLUMNS)
869896
shared_width_by_header = {}
870897
for (header, _), width in zip(COLD_COLUMNS, cold_widths, strict=True):

0 commit comments

Comments (0)