Skip to content

Commit 2ccef90

Browse files
committed
Support for serializing LazyArray as well
1 parent cb0bd6a commit 2ccef90

3 files changed

Lines changed: 203 additions & 4 deletions

File tree

src/blosc2/_msgpack_utils.py

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,24 +23,54 @@
2323
_BLOSC2_STRUCTURED_VERSION = 1
2424

2525

26-
def _encode_structured_reference(obj):
26+
def _encode_operand_reference(obj):
2727
import blosc2
2828

2929
if isinstance(obj, blosc2.C2Array):
30-
payload = {
30+
return {
3131
"kind": "c2array",
3232
"version": _BLOSC2_STRUCTURED_VERSION,
3333
"path": obj.path,
3434
"urlbase": obj.urlbase,
3535
}
36+
if isinstance(obj, blosc2.Proxy):
37+
obj = obj._cache
38+
if hasattr(obj, "schunk"):
39+
urlpath = obj.schunk.urlpath
40+
if urlpath is None:
41+
raise ValueError(
42+
"Structured Blosc2 msgpack payload requires operands to be stored on disk/network"
43+
)
44+
return {
45+
"kind": "urlpath",
46+
"version": _BLOSC2_STRUCTURED_VERSION,
47+
"urlpath": urlpath,
48+
}
49+
raise TypeError("Structured Blosc2 msgpack payload requires NDArray, C2Array, or Proxy operands")
50+
51+
52+
def _encode_structured_reference(obj):
53+
import blosc2
54+
55+
if isinstance(obj, blosc2.C2Array):
56+
payload = _encode_operand_reference(obj)
57+
return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True))
58+
if isinstance(obj, blosc2.LazyExpr):
59+
expression = obj.expression_tosave if hasattr(obj, "expression_tosave") else obj.expression
60+
operands = obj.operands_tosave if hasattr(obj, "operands_tosave") else obj.operands
61+
payload = {
62+
"kind": "lazyexpr",
63+
"version": _BLOSC2_STRUCTURED_VERSION,
64+
"expression": expression,
65+
"operands": {key: _encode_operand_reference(value) for key, value in operands.items()},
66+
}
3667
return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True))
3768
return None
3869

3970

40-
def _decode_structured_reference(data):
71+
def _decode_operand_reference(payload):
4172
import blosc2
4273

43-
payload = unpackb(data)
4474
if not isinstance(payload, dict):
4575
raise TypeError("Structured Blosc2 msgpack payload must decode to a mapping")
4676

@@ -57,6 +87,37 @@ def _decode_structured_reference(data):
5787
if urlbase is not None and not isinstance(urlbase, str):
5888
raise TypeError("Structured C2Array msgpack payload requires 'urlbase' to be a string or None")
5989
return blosc2.C2Array(path, urlbase=urlbase)
90+
if kind == "urlpath":
91+
urlpath = payload.get("urlpath")
92+
if not isinstance(urlpath, str):
93+
raise TypeError("Structured urlpath msgpack payload requires a string 'urlpath'")
94+
return blosc2.open(urlpath, mode="r")
95+
raise ValueError(f"Unsupported structured Blosc2 msgpack payload operand kind: {kind!r}")
96+
97+
98+
def _decode_structured_reference(data):
99+
import blosc2
100+
101+
payload = unpackb(data)
102+
if not isinstance(payload, dict):
103+
raise TypeError("Structured Blosc2 msgpack payload must decode to a mapping")
104+
105+
version = payload.get("version")
106+
if version != _BLOSC2_STRUCTURED_VERSION:
107+
raise ValueError(f"Unsupported structured Blosc2 msgpack payload version: {version!r}")
108+
109+
kind = payload.get("kind")
110+
if kind == "c2array":
111+
return _decode_operand_reference(payload)
112+
if kind == "lazyexpr":
113+
expression = payload.get("expression")
114+
if not isinstance(expression, str):
115+
raise TypeError("Structured LazyExpr msgpack payload requires a string 'expression'")
116+
operands_payload = payload.get("operands")
117+
if not isinstance(operands_payload, dict):
118+
raise TypeError("Structured LazyExpr msgpack payload requires a mapping 'operands'")
119+
operands = {key: _decode_operand_reference(value) for key, value in operands_payload.items()}
120+
return blosc2.lazyexpr(expression, operands=operands)
60121
raise ValueError(f"Unsupported structured Blosc2 msgpack payload kind: {kind!r}")
61122

62123

tests/test_batch_store.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,20 @@ def fake_info(path_, urlbase_, params=None, headers=None, model=None, auth_token
5555
return blosc2.C2Array(path, urlbase=urlbase)
5656

5757

58+
def _make_persistent_lazyexpr(tmp_path):
59+
a = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=tmp_path / "a.b2nd", mode="w")
60+
b = blosc2.asarray(np.arange(5, dtype=np.int64) * 2, urlpath=tmp_path / "b.b2nd", mode="w")
61+
expr = blosc2.lazyexpr("a + b", operands={"a": a, "b": b})
62+
expected = np.arange(5, dtype=np.int64) * 3
63+
return expr, expected
64+
65+
66+
def _make_in_memory_lazyexpr():
67+
a = blosc2.asarray(np.arange(5, dtype=np.int64))
68+
b = blosc2.asarray(np.arange(5, dtype=np.int64) * 2)
69+
return blosc2.lazyexpr("a + b", operands={"a": a, "b": b})
70+
71+
5872
@pytest.mark.parametrize(
5973
("contiguous", "urlpath"),
6074
[
@@ -260,6 +274,96 @@ def test_batchstore_msgpack_supports_c2array(monkeypatch):
260274
assert restored.auth_token is None
261275

262276

277+
def test_msgpack_supports_lazyexpr(tmp_path):
278+
expr, expected = _make_persistent_lazyexpr(tmp_path)
279+
280+
payload = msgpack_packb({"expr": expr})
281+
restored = msgpack_unpackb(payload)["expr"]
282+
283+
assert isinstance(restored, blosc2.LazyExpr)
284+
np.testing.assert_array_equal(restored[:], expected)
285+
286+
287+
def test_batchstore_msgpack_supports_lazyexpr(tmp_path):
288+
expr, expected = _make_persistent_lazyexpr(tmp_path)
289+
290+
barray = blosc2.BatchStore(items_per_block=2)
291+
barray.append([expr])
292+
293+
restored = barray[0][0]
294+
295+
assert isinstance(restored, blosc2.LazyExpr)
296+
np.testing.assert_array_equal(restored[:], expected)
297+
298+
299+
def test_msgpack_rejects_lazyexpr_with_in_memory_operands():
300+
expr = _make_in_memory_lazyexpr()
301+
302+
with pytest.raises(ValueError, match="stored on disk/network"):
303+
msgpack_packb({"expr": expr})
304+
305+
306+
def test_batchstore_msgpack_rejects_lazyexpr_with_in_memory_operands():
307+
expr = _make_in_memory_lazyexpr()
308+
309+
barray = blosc2.BatchStore(items_per_block=2)
310+
with pytest.raises(ValueError, match="stored on disk/network"):
311+
barray.append([expr])
312+
313+
314+
@pytest.mark.network
315+
def test_msgpack_supports_lazyexpr_with_c2array_operand(cat2_context, tmp_path):
316+
path = "@public/expr/ds-1-2-linspace-float64-b2-(5,)d.b2nd"
317+
a = blosc2.C2Array(path)
318+
a_values = np.asarray(a[:])
319+
b = blosc2.asarray(a_values * 2, urlpath=tmp_path / "b.b2nd", mode="w")
320+
expr = blosc2.lazyexpr("a + b", operands={"a": a, "b": b})
321+
322+
restored = msgpack_unpackb(msgpack_packb({"expr": expr}))["expr"]
323+
324+
assert isinstance(restored, blosc2.LazyExpr)
325+
np.testing.assert_allclose(restored[:], a_values + b[:])
326+
327+
328+
@pytest.mark.network
329+
def test_batchstore_msgpack_supports_lazyexpr_with_c2array_operand(cat2_context, tmp_path):
330+
path = "@public/expr/ds-1-2-linspace-float64-b2-(5,)d.b2nd"
331+
a = blosc2.C2Array(path)
332+
a_values = np.asarray(a[:])
333+
b = blosc2.asarray(a_values * 2, urlpath=tmp_path / "b.b2nd", mode="w")
334+
expr = blosc2.lazyexpr("a + b", operands={"a": a, "b": b})
335+
336+
barray = blosc2.BatchStore(items_per_block=2)
337+
barray.append([expr])
338+
restored = barray[0][0]
339+
340+
assert isinstance(restored, blosc2.LazyExpr)
341+
np.testing.assert_allclose(restored[:], a_values + b[:])
342+
343+
344+
@pytest.mark.xfail(
345+
strict=True, reason="Structured LazyExpr urlpath operands do not preserve .b2z member offsets"
346+
)
347+
def test_msgpack_lazyexpr_with_b2z_operands(tmp_path):
348+
store_path = tmp_path / "operands.b2z"
349+
ext_a = tmp_path / "a.b2nd"
350+
ext_b = tmp_path / "b.b2nd"
351+
expected = np.arange(5, dtype=np.int64) * 3
352+
353+
a = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=str(ext_a), mode="w")
354+
b = blosc2.asarray(np.arange(5, dtype=np.int64) * 2, urlpath=str(ext_b), mode="w")
355+
with blosc2.DictStore(str(store_path), mode="w", threshold=None) as dstore:
356+
dstore["/a"] = a
357+
dstore["/b"] = b
358+
359+
with blosc2.DictStore(str(store_path), mode="r") as dstore:
360+
expr = blosc2.lazyexpr("a + b", operands={"a": dstore["/a"], "b": dstore["/b"]})
361+
restored = msgpack_unpackb(msgpack_packb({"expr": expr}))["expr"]
362+
363+
assert isinstance(restored, blosc2.LazyExpr)
364+
np.testing.assert_array_equal(restored[:], expected)
365+
366+
263367
@pytest.mark.network
264368
def test_msgpack_roundtrip_c2array_network(cat2_context):
265369
path = "@public/expr/ds-1-2-linspace-float64-b2-(5,)d.b2nd"

tests/test_vlarray.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,20 @@ def fake_info(path_, urlbase_, params=None, headers=None, model=None, auth_token
5454
return blosc2.C2Array(path, urlbase=urlbase)
5555

5656

57+
def _make_persistent_lazyexpr(tmp_path):
58+
a = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=tmp_path / "a.b2nd", mode="w")
59+
b = blosc2.asarray(np.arange(5, dtype=np.int64) * 2, urlpath=tmp_path / "b.b2nd", mode="w")
60+
expr = blosc2.lazyexpr("a + b", operands={"a": a, "b": b})
61+
expected = np.arange(5, dtype=np.int64) * 3
62+
return expr, expected
63+
64+
65+
def _make_in_memory_lazyexpr():
66+
a = blosc2.asarray(np.arange(5, dtype=np.int64))
67+
b = blosc2.asarray(np.arange(5, dtype=np.int64) * 2)
68+
return blosc2.lazyexpr("a + b", operands={"a": a, "b": b})
69+
70+
5771
@pytest.mark.parametrize(
5872
("contiguous", "urlpath"),
5973
[
@@ -192,6 +206,26 @@ def test_vlarray_msgpack_supports_c2array(monkeypatch):
192206
assert restored.auth_token is None
193207

194208

209+
def test_vlarray_msgpack_supports_lazyexpr(tmp_path):
210+
expr, expected = _make_persistent_lazyexpr(tmp_path)
211+
212+
vlarray = blosc2.VLArray()
213+
vlarray.append(expr)
214+
215+
restored = vlarray[0]
216+
217+
assert isinstance(restored, blosc2.LazyExpr)
218+
np.testing.assert_array_equal(restored[:], expected)
219+
220+
221+
def test_vlarray_msgpack_rejects_lazyexpr_with_in_memory_operands():
222+
expr = _make_in_memory_lazyexpr()
223+
224+
vlarray = blosc2.VLArray()
225+
with pytest.raises(ValueError, match="stored on disk/network"):
226+
vlarray.append(expr)
227+
228+
195229
@pytest.mark.network
196230
def test_vlarray_msgpack_roundtrip_c2array_network(cat2_context):
197231
path = "@public/expr/ds-1-2-linspace-float64-b2-(5,)d.b2nd"

0 commit comments

Comments
 (0)