Skip to content

Commit f0b928f

Browse files
Merge pull request #603 from Blosc/batch-store
Batch store
2 parents 3f30f7e + 80f2b5f commit f0b928f

26 files changed

Lines changed: 3283 additions & 75 deletions

bench/batch_store.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
#######################################################################
2+
# Copyright (c) 2019-present, Blosc Development Team <blosc@blosc.org>
3+
# All rights reserved.
4+
#
5+
# SPDX-License-Identifier: BSD-3-Clause
6+
#######################################################################
7+
8+
from __future__ import annotations
9+
10+
import argparse
11+
import random
12+
import statistics
13+
import time
14+
15+
import blosc2
16+
17+
18+
URLPATH = "bench_batch_store.b2b"
19+
NBATCHES = 10_000
20+
OBJECTS_PER_BATCH = 100
21+
TOTAL_OBJECTS = NBATCHES * OBJECTS_PER_BATCH
22+
BLOCKSIZE_MAX = 32
23+
N_RANDOM_READS = 1_000
24+
25+
26+
def make_rgb(batch_index: int, item_index: int) -> dict[str, int]:
27+
global_index = batch_index * OBJECTS_PER_BATCH + item_index
28+
return {
29+
"red": batch_index,
30+
"green": item_index,
31+
"blue": global_index,
32+
}
33+
34+
35+
def make_batch(batch_index: int) -> list[dict[str, int]]:
36+
return [make_rgb(batch_index, item_index) for item_index in range(OBJECTS_PER_BATCH)]
37+
38+
39+
def expected_entry(batch_index: int, item_index: int) -> dict[str, int]:
40+
return {
41+
"red": batch_index,
42+
"green": item_index,
43+
"blue": batch_index * OBJECTS_PER_BATCH + item_index,
44+
}
45+
46+
47+
def build_parser() -> argparse.ArgumentParser:
48+
parser = argparse.ArgumentParser(
49+
description="Benchmark BatchStore single-entry reads.",
50+
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
51+
)
52+
parser.add_argument("--codec", type=str, default="ZSTD", choices=[codec.name for codec in blosc2.Codec])
53+
parser.add_argument("--clevel", type=int, default=5)
54+
parser.add_argument("--serializer", type=str, default="msgpack", choices=["msgpack", "arrow"])
55+
parser.add_argument("--use-dict", action="store_true", help="Enable dictionaries for ZSTD/LZ4/LZ4HC codecs.")
56+
parser.add_argument("--in-mem", action="store_true", help="Keep the BatchStore purely in memory.")
57+
return parser
58+
59+
60+
def build_store(
61+
codec: blosc2.Codec, clevel: int, use_dict: bool, serializer: str, in_mem: bool
62+
) -> blosc2.BatchStore | None:
63+
if in_mem:
64+
storage = blosc2.Storage(mode="w")
65+
store = blosc2.BatchStore(
66+
storage=storage,
67+
max_blocksize=BLOCKSIZE_MAX,
68+
serializer=serializer,
69+
cparams={
70+
"codec": codec,
71+
"clevel": clevel,
72+
"use_dict": use_dict and codec in (blosc2.Codec.ZSTD, blosc2.Codec.LZ4, blosc2.Codec.LZ4HC),
73+
},
74+
)
75+
for batch_index in range(NBATCHES):
76+
store.append(make_batch(batch_index))
77+
return store
78+
79+
blosc2.remove_urlpath(URLPATH)
80+
storage = blosc2.Storage(urlpath=URLPATH, mode="w", contiguous=True)
81+
cparams = {
82+
"codec": codec,
83+
"clevel": clevel,
84+
"use_dict": use_dict and codec in (blosc2.Codec.ZSTD, blosc2.Codec.LZ4, blosc2.Codec.LZ4HC),
85+
}
86+
with blosc2.BatchStore(
87+
storage=storage, max_blocksize=BLOCKSIZE_MAX, serializer=serializer, cparams=cparams
88+
) as store:
89+
for batch_index in range(NBATCHES):
90+
store.append(make_batch(batch_index))
91+
return None
92+
93+
94+
def measure_random_reads(store: blosc2.BatchStore) -> tuple[list[tuple[int, int, int, dict[str, int]]], list[int]]:
95+
rng = random.Random(2024)
96+
samples: list[tuple[int, int, int, dict[str, int]]] = []
97+
timings_ns: list[int] = []
98+
99+
for _ in range(N_RANDOM_READS):
100+
batch_index = rng.randrange(len(store))
101+
item_index = rng.randrange(OBJECTS_PER_BATCH)
102+
t0 = time.perf_counter_ns()
103+
value = store[batch_index][item_index]
104+
timings_ns.append(time.perf_counter_ns() - t0)
105+
if value != expected_entry(batch_index, item_index):
106+
raise RuntimeError(f"Value mismatch at batch={batch_index}, item={item_index}")
107+
samples.append((timings_ns[-1], batch_index, item_index, value))
108+
109+
return samples, timings_ns
110+
111+
112+
def main() -> None:
113+
parser = build_parser()
114+
args = parser.parse_args()
115+
codec = blosc2.Codec[args.codec]
116+
use_dict = args.use_dict and codec in (blosc2.Codec.ZSTD, blosc2.Codec.LZ4, blosc2.Codec.LZ4HC)
117+
118+
mode_label = "in-memory" if args.in_mem else "persistent"
119+
article = "an" if args.in_mem else "a"
120+
print(f"Building {article} {mode_label} BatchStore with 1,000,000 RGB dicts and timing 1,000 random scalar reads...")
121+
print(f" codec: {codec.name}")
122+
print(f" clevel: {args.clevel}")
123+
print(f" serializer: {args.serializer}")
124+
print(f" use_dict: {use_dict}")
125+
print(f" in_mem: {args.in_mem}")
126+
t0 = time.perf_counter()
127+
store = build_store(
128+
codec=codec, clevel=args.clevel, use_dict=use_dict, serializer=args.serializer, in_mem=args.in_mem
129+
)
130+
build_time_s = time.perf_counter() - t0
131+
if args.in_mem:
132+
assert store is not None
133+
read_store = store
134+
else:
135+
read_store = blosc2.BatchStore(urlpath=URLPATH, mode="r", contiguous=True, max_blocksize=BLOCKSIZE_MAX)
136+
samples, timings_ns = measure_random_reads(read_store)
137+
t0 = time.perf_counter()
138+
checksum = 0
139+
nitems = 0
140+
for item in read_store.iter_items():
141+
checksum += item["blue"]
142+
nitems += 1
143+
iter_time_s = time.perf_counter() - t0
144+
145+
print()
146+
print("BatchStore benchmark")
147+
print(f" build time: {build_time_s:.3f} s")
148+
print(f" batches: {len(read_store)}")
149+
print(f" items: {TOTAL_OBJECTS}")
150+
print(f" max_blocksize: {read_store.max_blocksize}")
151+
print()
152+
print(read_store.info)
153+
print(f"Random scalar reads: {N_RANDOM_READS}")
154+
print(f" mean: {statistics.fmean(timings_ns) / 1_000:.2f} us")
155+
print(f" max: {max(timings_ns) / 1_000:.2f} us")
156+
print(f" min: {min(timings_ns) / 1_000:.2f} us")
157+
print(f"Item iteration via iter_items(): {iter_time_s:.3f} s")
158+
print(f" per item: {iter_time_s * 1_000_000 / nitems:.2f} us")
159+
print(f" checksum: {checksum}")
160+
print("Sample reads:")
161+
for timing_ns, batch_index, item_index, value in samples[:5]:
162+
print(f" {timing_ns / 1_000:.2f} us -> read_store[{batch_index}][{item_index}] = {value}")
163+
if args.in_mem:
164+
print("BatchStore kept in memory")
165+
else:
166+
print(f"BatchStore file at: {read_store.urlpath}")
167+
168+
169+
if __name__ == "__main__":
170+
main()

0 commit comments

Comments
 (0)