Skip to content

Commit 0d19145

Browse files
committed
Renaming remaining BatchStore -> BatchArray
1 parent c3696ef commit 0d19145

3 files changed

Lines changed: 562 additions & 6 deletions

File tree

bench/batch_array.py

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
#######################################################################
2+
# Copyright (c) 2019-present, Blosc Development Team <blosc@blosc.org>
3+
# All rights reserved.
4+
#
5+
# SPDX-License-Identifier: BSD-3-Clause
6+
#######################################################################
7+
8+
# This benchmarks BatchArray random single-item reads. It supports
9+
# msgpack or arrow, configurable codec/compression level, optional
10+
# dictionary compression, and in-memory vs persistent mode.
11+
12+
from __future__ import annotations
13+
14+
import argparse
15+
import random
16+
import statistics
17+
import time
18+
19+
import blosc2
20+
21+
22+
URLPATH = "bench_batch_array.b2b"
23+
NBATCHES = 10_000
24+
OBJECTS_PER_BATCH = 100
25+
TOTAL_OBJECTS = NBATCHES * OBJECTS_PER_BATCH
26+
ITEMS_PER_BLOCK = 32
27+
N_RANDOM_READS = 1_000
28+
29+
30+
def make_rgb(batch_index: int, item_index: int) -> dict[str, int]:
31+
global_index = batch_index * OBJECTS_PER_BATCH + item_index
32+
return {
33+
"red": batch_index,
34+
"green": item_index,
35+
"blue": global_index,
36+
}
37+
38+
39+
def make_batch(batch_index: int) -> list[dict[str, int]]:
40+
return [make_rgb(batch_index, item_index) for item_index in range(OBJECTS_PER_BATCH)]
41+
42+
43+
def expected_entry(batch_index: int, item_index: int) -> dict[str, int]:
44+
return {
45+
"red": batch_index,
46+
"green": item_index,
47+
"blue": batch_index * OBJECTS_PER_BATCH + item_index,
48+
}
49+
50+
51+
def build_parser() -> argparse.ArgumentParser:
52+
parser = argparse.ArgumentParser(
53+
description="Benchmark BatchArray single-entry reads.",
54+
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
55+
)
56+
parser.add_argument("--codec", type=str, default="ZSTD", choices=[codec.name for codec in blosc2.Codec])
57+
parser.add_argument("--clevel", type=int, default=5)
58+
parser.add_argument("--serializer", type=str, default="msgpack", choices=["msgpack", "arrow"])
59+
parser.add_argument("--use-dict", action="store_true", help="Enable dictionaries for ZSTD/LZ4/LZ4HC codecs.")
60+
parser.add_argument("--in-mem", action="store_true", help="Keep the BatchArray purely in memory.")
61+
return parser
62+
63+
64+
def build_array(
65+
codec: blosc2.Codec, clevel: int, use_dict: bool, serializer: str, in_mem: bool
66+
) -> blosc2.BatchArray | None:
67+
if in_mem:
68+
storage = blosc2.Storage(mode="w")
69+
barr = blosc2.BatchArray(
70+
storage=storage,
71+
items_per_block=ITEMS_PER_BLOCK,
72+
serializer=serializer,
73+
cparams={
74+
"codec": codec,
75+
"clevel": clevel,
76+
"use_dict": use_dict and codec in (blosc2.Codec.ZSTD, blosc2.Codec.LZ4, blosc2.Codec.LZ4HC),
77+
},
78+
)
79+
for batch_index in range(NBATCHES):
80+
barr.append(make_batch(batch_index))
81+
return barr
82+
83+
blosc2.remove_urlpath(URLPATH)
84+
storage = blosc2.Storage(urlpath=URLPATH, mode="w", contiguous=True)
85+
cparams = {
86+
"codec": codec,
87+
"clevel": clevel,
88+
"use_dict": use_dict and codec in (blosc2.Codec.ZSTD, blosc2.Codec.LZ4, blosc2.Codec.LZ4HC),
89+
}
90+
with blosc2.BatchArray(
91+
storage=storage, items_per_block=ITEMS_PER_BLOCK, serializer=serializer, cparams=cparams
92+
) as barr:
93+
for batch_index in range(NBATCHES):
94+
barr.append(make_batch(batch_index))
95+
return None
96+
97+
98+
def measure_random_reads(barr: blosc2.BatchArray) -> tuple[list[tuple[int, int, int, dict[str, int]]], list[int]]:
99+
rng = random.Random(2024)
100+
samples: list[tuple[int, int, int, dict[str, int]]] = []
101+
timings_ns: list[int] = []
102+
103+
for _ in range(N_RANDOM_READS):
104+
batch_index = rng.randrange(len(barr))
105+
item_index = rng.randrange(OBJECTS_PER_BATCH)
106+
t0 = time.perf_counter_ns()
107+
value = barr[batch_index][item_index]
108+
timings_ns.append(time.perf_counter_ns() - t0)
109+
if value != expected_entry(batch_index, item_index):
110+
raise RuntimeError(f"Value mismatch at batch={batch_index}, item={item_index}")
111+
samples.append((timings_ns[-1], batch_index, item_index, value))
112+
113+
return samples, timings_ns
114+
115+
116+
def main() -> None:
117+
parser = build_parser()
118+
args = parser.parse_args()
119+
codec = blosc2.Codec[args.codec]
120+
use_dict = args.use_dict and codec in (blosc2.Codec.ZSTD, blosc2.Codec.LZ4, blosc2.Codec.LZ4HC)
121+
122+
mode_label = "in-memory" if args.in_mem else "persistent"
123+
article = "an" if args.in_mem else "a"
124+
print(f"Building {article} {mode_label} BatchArray with 1,000,000 RGB dicts and timing 1,000 random scalar reads...")
125+
print(f" codec: {codec.name}")
126+
print(f" clevel: {args.clevel}")
127+
print(f" serializer: {args.serializer}")
128+
print(f" use_dict: {use_dict}")
129+
print(f" in_mem: {args.in_mem}")
130+
t0 = time.perf_counter()
131+
barr = build_array(
132+
codec=codec, clevel=args.clevel, use_dict=use_dict, serializer=args.serializer, in_mem=args.in_mem
133+
)
134+
build_time_s = time.perf_counter() - t0
135+
if args.in_mem:
136+
assert barr is not None
137+
read_array = barr
138+
else:
139+
read_array = blosc2.BatchArray(urlpath=URLPATH, mode="r", contiguous=True, items_per_block=ITEMS_PER_BLOCK)
140+
samples, timings_ns = measure_random_reads(read_array)
141+
t0 = time.perf_counter()
142+
checksum = 0
143+
nitems = 0
144+
for item in read_array.iter_items():
145+
checksum += item["blue"]
146+
nitems += 1
147+
iter_time_s = time.perf_counter() - t0
148+
149+
print()
150+
print("BatchArray benchmark")
151+
print(f" build time: {build_time_s:.3f} s")
152+
print(f" batches: {len(read_array)}")
153+
print(f" items: {TOTAL_OBJECTS}")
154+
print(f" items_per_block: {read_array.items_per_block}")
155+
print()
156+
print(read_array.info)
157+
print(f"Random scalar reads: {N_RANDOM_READS}")
158+
print(f" mean: {statistics.fmean(timings_ns) / 1_000:.2f} us")
159+
print(f" max: {max(timings_ns) / 1_000:.2f} us")
160+
print(f" min: {min(timings_ns) / 1_000:.2f} us")
161+
print(f"Item iteration via iter_items(): {iter_time_s:.3f} s")
162+
print(f" per item: {iter_time_s * 1_000_000 / nitems:.2f} us")
163+
print(f" checksum: {checksum}")
164+
print("Sample reads:")
165+
for timing_ns, batch_index, item_index, value in samples[:5]:
166+
print(f" {timing_ns / 1_000:.2f} us -> read_array[{batch_index}][{item_index}] = {value}")
167+
if args.in_mem:
168+
print("BatchArray kept in memory")
169+
else:
170+
print(f"BatchArray file at: {read_array.urlpath}")
171+
172+
173+
if __name__ == "__main__":
174+
main()

0 commit comments

Comments
 (0)