Skip to content

Commit 46f46ee

Browse files
committed
Preserve legacy LazyExpr persistence behavior and support .b2z operands
1 parent 1eb3dd7 commit 46f46ee

3 files changed

Lines changed: 107 additions & 14 deletions

File tree

src/blosc2/b2objects.py

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import builtins
1111
import inspect
1212
import linecache
13+
import pathlib
1314
import textwrap
1415
from dataclasses import asdict
1516
from typing import Any
@@ -47,8 +48,11 @@ def encode_operand_reference(obj):
4748
return blosc2.Ref.from_object(obj).to_dict()
4849

4950

50-
def decode_operand_reference(payload, *, base_path=None):
    """Open the operand described by a serialized reference payload.

    Parameters
    ----------
    payload : dict
        Serialized reference; parsed with ``blosc2.Ref.from_dict``.
    base_path : str or os.PathLike, optional
        Directory against which a *relative* ``"urlpath"`` reference is
        resolved. When it is ``None`` (the default), or the reference is of
        another kind, or its ``urlpath`` is absolute, the reference is
        opened through ``Ref.open()`` instead.

    Returns
    -------
    object
        Whatever ``blosc2.open`` (relative urlpath case) or ``Ref.open``
        (every other case) returns.
    """
    ref = blosc2.Ref.from_dict(payload)
    if ref.kind != "urlpath" or base_path is None:
        return ref.open()
    if pathlib.Path(ref.urlpath).is_absolute():
        return ref.open()
    # Coerce base_path so a plain string is accepted too; joining two
    # strings with ``/`` would raise TypeError.
    return blosc2.open(pathlib.Path(base_path) / ref.urlpath, mode="r")
5256

5357

5458
def encode_b2object_payload(obj) -> dict[str, Any] | None:
@@ -98,7 +102,7 @@ def encode_b2object_payload(obj) -> dict[str, Any] | None:
98102
return None
99103

100104

101-
def decode_b2object_payload(payload: dict[str, Any]):
105+
def decode_b2object_payload(payload: dict[str, Any], *, carrier_path=None):
102106
kind = payload.get("kind")
103107
version = payload.get("version")
104108
if version != _B2OBJECT_VERSION:
@@ -107,24 +111,39 @@ def decode_b2object_payload(payload: dict[str, Any]):
107111
ref = blosc2.Ref.from_dict(payload)
108112
return ref.open()
109113
if kind == "lazyexpr":
110-
return decode_structured_lazyexpr(payload)
114+
return decode_structured_lazyexpr(payload, carrier_path=carrier_path)
111115
if kind == "lazyudf":
112-
return decode_structured_lazyudf(payload)
116+
return decode_structured_lazyudf(payload, carrier_path=carrier_path)
113117
raise ValueError(f"Unsupported persisted Blosc2 object kind: {kind!r}")
114118

115119

116-
def decode_structured_lazyexpr(payload, *, carrier_path=None):
    """Rebuild a lazy expression from its structured payload.

    Parameters
    ----------
    payload : dict
        Must provide a string under ``"expression"`` and a mapping under
        ``"operands"`` whose values are serialized operand references.
    carrier_path : path-like, optional
        Forwarded as ``base_path`` to ``decode_operand_reference`` for each
        operand.

    Returns
    -------
    object
        The result of ``blosc2.lazyexpr(expression, operands=...)``.

    Raises
    ------
    TypeError
        If ``"expression"`` is not a string or ``"operands"`` is not a dict.
    blosc2.exceptions.MissingOperands
        If opening one or more ``"urlpath"`` operands raised
        ``FileNotFoundError``.
    FileNotFoundError
        Re-raised unchanged when the failing operand is not a ``"urlpath"``
        reference.
    """
    expression = payload.get("expression")
    if not isinstance(expression, str):
        raise TypeError("Structured LazyExpr payload requires a string 'expression'")
    operands_payload = payload.get("operands")
    if not isinstance(operands_payload, dict):
        raise TypeError("Structured LazyExpr payload requires a mapping 'operands'")

    resolved = {}
    unresolved = {}
    for name, ref_payload in operands_payload.items():
        try:
            resolved[name] = decode_operand_reference(ref_payload, base_path=carrier_path)
        except FileNotFoundError:
            missing_ref = blosc2.Ref.from_dict(ref_payload)
            if missing_ref.kind != "urlpath":
                raise
            # Keep going so that every missing file is reported together.
            unresolved[name] = pathlib.Path(missing_ref.urlpath)

    if not unresolved:
        return blosc2.lazyexpr(expression, operands=resolved)

    error = blosc2.exceptions.MissingOperands(expression, unresolved)
    error.expr = expression
    error.missing_ops = unresolved
    raise error
125144

126145

127-
def decode_structured_lazyudf(payload):
146+
def decode_structured_lazyudf(payload, *, carrier_path=None):
128147
function_kind = payload.get("function_kind")
129148
if function_kind != "dsl":
130149
raise ValueError(f"Unsupported structured LazyUDF function kind: {function_kind!r}")
@@ -167,7 +186,8 @@ def decode_structured_lazyudf(payload):
167186
func.dsl_source = dsl_source
168187

169188
operands = tuple(
170-
decode_operand_reference(operands_payload[f"o{n}"]) for n in range(len(operands_payload))
189+
decode_operand_reference(operands_payload[f"o{n}"], base_path=carrier_path)
190+
for n in range(len(operands_payload))
171191
)
172192
return blosc2.lazyudf(func, operands, dtype=np.dtype(dtype), shape=tuple(shape_payload), **kwargs)
173193

@@ -194,4 +214,12 @@ def open_b2object(obj):
194214
raise ValueError(f"Unsupported persisted Blosc2 object version: {marker.get('version')!r}")
195215
if marker.get("kind") != payload.get("kind"):
196216
raise ValueError("Persisted Blosc2 object marker/payload kind mismatch")
197-
return decode_b2object_payload(payload)
217+
carrier_path = None
218+
schunk = getattr(obj, "schunk", obj)
219+
if getattr(schunk, "urlpath", None) is not None:
220+
carrier_path = pathlib.Path(schunk.urlpath).parent
221+
opened = decode_b2object_payload(payload, carrier_path=carrier_path)
222+
if isinstance(opened, blosc2.LazyExpr | blosc2.LazyUDF):
223+
opened.array = obj
224+
opened.schunk = schunk
225+
return opened

src/blosc2/lazyexpr.py

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3718,12 +3718,46 @@ def save(self, urlpath=None, **kwargs):
37183718
self._to_b2object_carrier(**kwargs)
37193719

37203720
def to_cframe(self) -> bytes:
    """Serialize this object to a cframe.

    The carrier built by ``_to_b2object_carrier()`` is serialized first. If
    building or serializing it raises ``TypeError`` or ``ValueError``, the
    result of ``compute()`` is serialized instead.
    """
    try:
        frame = self._to_b2object_carrier().to_cframe()
    except (TypeError, ValueError):
        frame = self.compute().to_cframe()
    return frame
37223725

37233726
def _to_b2object_carrier(self, **kwargs):
3724-
payload = encode_b2object_payload(self)
3725-
if payload is None:
3726-
raise TypeError("Unsupported persisted Blosc2 object")
3727+
expression = self.expression_tosave if hasattr(self, "expression_tosave") else self.expression
3728+
operands_ = self.operands_tosave if hasattr(self, "operands_tosave") else self.operands
3729+
validate_expr(expression)
3730+
3731+
payload = {"kind": "lazyexpr", "version": 1, "expression": expression, "operands": {}}
3732+
carrier_urlpath = kwargs.get("urlpath")
3733+
carrier_parent = Path(carrier_urlpath).parent if carrier_urlpath is not None else None
3734+
for key, value in operands_.items():
3735+
if isinstance(value, blosc2.C2Array):
3736+
payload["operands"][key] = encode_b2object_payload(value)
3737+
continue
3738+
if isinstance(value, blosc2.Proxy):
3739+
value = value._cache
3740+
ref = getattr(value, "_blosc2_ref", None)
3741+
if isinstance(ref, blosc2.Ref):
3742+
payload["operands"][key] = ref.to_dict()
3743+
continue
3744+
if not hasattr(value, "schunk"):
3745+
raise ValueError(
3746+
"To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects"
3747+
)
3748+
if value.schunk.urlpath is None:
3749+
raise ValueError("To save a LazyArray, all operands must be stored on disk/network")
3750+
operand_urlpath = Path(value.schunk.urlpath)
3751+
if carrier_parent is not None and not operand_urlpath.is_absolute():
3752+
ref_urlpath = operand_urlpath.as_posix()
3753+
elif carrier_parent is not None:
3754+
try:
3755+
ref_urlpath = operand_urlpath.relative_to(carrier_parent).as_posix()
3756+
except ValueError:
3757+
ref_urlpath = operand_urlpath.as_posix()
3758+
else:
3759+
ref_urlpath = operand_urlpath.as_posix()
3760+
payload["operands"][key] = {"kind": "urlpath", "version": 1, "urlpath": ref_urlpath}
37273761
array = make_b2object_carrier(
37283762
"lazyexpr",
37293763
self.shape,

tests/ndarray/test_lazyexpr.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1721,6 +1721,37 @@ def test_missing_operator():
17211721
blosc2.remove_urlpath("expr.b2nd")
17221722

17231723

1724+
def test_save_dictstore_operands(tmp_path):
    """A LazyExpr saved with DictStore-backed operands records ``dictstore_key`` refs and reopens."""
    # NOTE(review): block nesting was reconstructed from a flattened diff view;
    # confirm that the second ``with`` block ends right after ``expr.save``.
    base = np.arange(5, dtype=np.int64)
    expected = base * 3
    store_path = tmp_path / "operands.b2z"
    expr_path = tmp_path / "expr.b2nd"

    a = blosc2.asarray(base, urlpath=str(tmp_path / "a.b2nd"), mode="w")
    b = blosc2.asarray(base * 2, urlpath=str(tmp_path / "b.b2nd"), mode="w")
    with blosc2.DictStore(str(store_path), mode="w", threshold=None) as dstore:
        dstore["/a"] = a
        dstore["/b"] = b

    with blosc2.DictStore(str(store_path), mode="r") as dstore:
        # The expression is built without explicit operands, so the locals
        # presumably have to be named exactly ``a`` and ``b`` -- do not rename.
        a = dstore["/a"]
        b = dstore["/b"]
        expr = blosc2.lazyexpr("a + b")
        expr.save(expr_path)

    carrier = blosc2.open(expr_path, mode="r").array
    expected_refs = {
        name: {"kind": "dictstore_key", "version": 1, "urlpath": str(store_path), "key": f"/{name}"}
        for name in ("a", "b")
    }
    assert carrier.schunk.vlmeta["b2o"]["operands"] == expected_refs

    restored = blosc2.open(expr_path)

    assert isinstance(restored, blosc2.LazyExpr)
    np.testing.assert_array_equal(restored[:], expected)
1753+
1754+
17241755
# Test the chaining of multiple lazy expressions
17251756
def test_chain_expressions():
17261757
N = 1_000

0 commit comments

Comments
 (0)