Skip to content

Commit c6fa4e4

Browse files
committed
Add b2o-backed persistence for DSL LazyUDF objects
1 parent 3b8409b commit c6fa4e4

8 files changed

Lines changed: 196 additions & 64 deletions

File tree

src/blosc2/b2objects.py

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,14 @@ def encode_operand_reference(obj):
4949

5050

5151
def decode_operand_reference(payload, *, base_path=None):
52+
if (
53+
payload.get("kind") in {"urlpath", "dictstore_key"}
54+
and base_path is not None
55+
and not pathlib.Path(payload["urlpath"]).is_absolute()
56+
):
57+
payload = dict(payload)
58+
payload["urlpath"] = (base_path / payload["urlpath"]).as_posix()
5259
ref = blosc2.Ref.from_dict(payload)
53-
if ref.kind == "urlpath" and base_path is not None and not pathlib.Path(ref.urlpath).is_absolute():
54-
return blosc2.open(base_path / ref.urlpath, mode="r")
5560
return ref.open()
5661

5762

@@ -123,23 +128,28 @@ def decode_structured_lazyexpr(payload, *, carrier_path=None):
123128
operands_payload = payload.get("operands")
124129
if not isinstance(operands_payload, dict):
125130
raise TypeError("Structured LazyExpr payload requires a mapping 'operands'")
131+
operands, missing_ops = decode_operand_mapping(operands_payload, base_path=carrier_path)
132+
if missing_ops:
133+
exc = blosc2.exceptions.MissingOperands(expression, missing_ops)
134+
exc.expr = expression
135+
exc.missing_ops = missing_ops
136+
raise exc
137+
return blosc2.lazyexpr(expression, operands=operands)
138+
139+
140+
def decode_operand_mapping(operands_payload, *, base_path=None):
126141
operands = {}
127142
missing_ops = {}
128143
for key, value in operands_payload.items():
129144
try:
130-
operands[key] = decode_operand_reference(value, base_path=carrier_path)
145+
operands[key] = decode_operand_reference(value, base_path=base_path)
131146
except FileNotFoundError:
132147
ref = blosc2.Ref.from_dict(value)
133-
if ref.kind == "urlpath":
148+
if ref.kind in {"urlpath", "dictstore_key"}:
134149
missing_ops[key] = pathlib.Path(ref.urlpath)
135150
else:
136151
raise
137-
if missing_ops:
138-
exc = blosc2.exceptions.MissingOperands(expression, missing_ops)
139-
exc.expr = expression
140-
exc.missing_ops = missing_ops
141-
raise exc
142-
return blosc2.lazyexpr(expression, operands=operands)
152+
return operands, missing_ops
143153

144154

145155
def decode_structured_lazyudf(payload, *, carrier_path=None):
@@ -180,12 +190,16 @@ def decode_structured_lazyudf(payload, *, carrier_path=None):
180190
func = local_ns[name]
181191
if not isinstance(func, DSLKernel):
182192
func = DSLKernel(func)
183-
184-
operands = tuple(
185-
decode_operand_reference(operands_payload[f"o{n}"], base_path=carrier_path)
186-
for n in range(len(operands_payload))
193+
ordered_operands_payload = {f"o{n}": operands_payload[f"o{n}"] for n in range(len(operands_payload))}
194+
operands, missing_ops = decode_operand_mapping(ordered_operands_payload, base_path=carrier_path)
195+
if missing_ops:
196+
exc = blosc2.exceptions.MissingOperands(name, missing_ops)
197+
exc.expr = name
198+
exc.missing_ops = missing_ops
199+
raise exc
200+
return blosc2.lazyudf(
201+
func, tuple(operands.values()), dtype=np.dtype(dtype), shape=tuple(shape_payload), **kwargs
187202
)
188-
return blosc2.lazyudf(func, operands, dtype=np.dtype(dtype), shape=tuple(shape_payload), **kwargs)
189203

190204

191205
def read_b2object_marker(obj) -> dict[str, Any] | None:

src/blosc2/lazyexpr.py

Lines changed: 81 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3718,10 +3718,7 @@ def save(self, urlpath=None, **kwargs):
37183718
self._to_b2object_carrier(**kwargs)
37193719

37203720
def to_cframe(self) -> bytes:
3721-
try:
3722-
return self._to_b2object_carrier().to_cframe()
3723-
except (TypeError, ValueError):
3724-
return self.compute().to_cframe()
3721+
return self._to_b2object_carrier().to_cframe()
37253722

37263723
def _to_b2object_carrier(self, **kwargs):
37273724
expression = self.expression_tosave if hasattr(self, "expression_tosave") else self.expression
@@ -4079,50 +4076,91 @@ def save(self, urlpath=None, **kwargs):
40794076
if urlpath is None:
40804077
raise ValueError("To save a LazyArray you must provide an urlpath")
40814078

4082-
meta = kwargs.get("meta", {})
4083-
meta["LazyArray"] = LazyArrayEnum.UDF.value
40844079
kwargs["urlpath"] = urlpath
4085-
kwargs["meta"] = meta
40864080
kwargs["mode"] = "w" # always overwrite the file in urlpath
4081+
try:
4082+
self._to_b2object_carrier(**kwargs)
4083+
except (TypeError, ValueError):
4084+
meta = kwargs.get("meta", {})
4085+
meta["LazyArray"] = LazyArrayEnum.UDF.value
4086+
kwargs["meta"] = meta
40874087

4088-
# Create an empty array; useful for providing the shape and dtype of the outcome
4089-
array = blosc2.empty(shape=self.shape, dtype=self.dtype, **kwargs)
4088+
# Create an empty array; useful for providing the shape and dtype of the outcome
4089+
array = blosc2.empty(shape=self.shape, dtype=self.dtype, **kwargs)
40904090

4091-
# Save the expression and operands in the metadata
4092-
operands = {}
4093-
operands_ = self.inputs_dict
4094-
for i, (_key, value) in enumerate(operands_.items()):
4095-
pos_key = f"o{i}" # always use positional keys for consistent loading
4096-
if isinstance(value, blosc2.C2Array):
4097-
operands[pos_key] = {
4098-
"path": str(value.path),
4099-
"urlbase": value.urlbase,
4100-
}
4091+
# Save the expression and operands in the metadata
4092+
operands = {}
4093+
operands_ = self.inputs_dict
4094+
for i, (_key, value) in enumerate(operands_.items()):
4095+
pos_key = f"o{i}" # always use positional keys for consistent loading
4096+
if isinstance(value, blosc2.C2Array):
4097+
operands[pos_key] = {
4098+
"path": str(value.path),
4099+
"urlbase": value.urlbase,
4100+
}
4101+
continue
4102+
if isinstance(value, blosc2.Proxy):
4103+
# Take the required info from the Proxy._cache container
4104+
value = value._cache
4105+
if not hasattr(value, "schunk"):
4106+
raise ValueError(
4107+
"To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects"
4108+
) from None
4109+
if value.schunk.urlpath is None:
4110+
raise ValueError(
4111+
"To save a LazyArray, all operands must be stored on disk/network"
4112+
) from None
4113+
operands[pos_key] = value.schunk.urlpath
4114+
udf_func = self.func.func if isinstance(self.func, DSLKernel) else self.func
4115+
udf_name = getattr(udf_func, "__name__", self.func.__name__)
4116+
try:
4117+
udf_source = textwrap.dedent(inspect.getsource(udf_func)).lstrip()
4118+
except Exception:
4119+
udf_source = None
4120+
meta = {
4121+
"UDF": udf_source,
4122+
"operands": operands,
4123+
"name": udf_name,
4124+
}
4125+
if isinstance(self.func, DSLKernel) and self.func.dsl_source is not None:
4126+
meta["dsl_source"] = self.func.dsl_source
4127+
array.schunk.vlmeta["_LazyArray"] = meta
4128+
4129+
def to_cframe(self) -> bytes:
4130+
return self._to_b2object_carrier().to_cframe()
4131+
4132+
def _to_b2object_carrier(self, **kwargs):
4133+
payload = encode_b2object_payload(self)
4134+
if payload is None:
4135+
raise TypeError("Persistent Blosc2 object payload is not supported for this LazyUDF")
4136+
4137+
carrier_urlpath = kwargs.get("urlpath")
4138+
carrier_parent = Path(carrier_urlpath).parent if carrier_urlpath is not None else None
4139+
for operand_payload in payload["operands"].values():
4140+
if operand_payload["kind"] not in {"urlpath", "dictstore_key"}:
41014141
continue
4102-
if isinstance(value, blosc2.Proxy):
4103-
# Take the required info from the Proxy._cache container
4104-
value = value._cache
4105-
if not hasattr(value, "schunk"):
4106-
raise ValueError(
4107-
"To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects"
4108-
)
4109-
if value.schunk.urlpath is None:
4110-
raise ValueError("To save a LazyArray, all operands must be stored on disk/network")
4111-
operands[pos_key] = value.schunk.urlpath
4112-
udf_func = self.func.func if isinstance(self.func, DSLKernel) else self.func
4113-
udf_name = getattr(udf_func, "__name__", self.func.__name__)
4114-
try:
4115-
udf_source = textwrap.dedent(inspect.getsource(udf_func)).lstrip()
4116-
except Exception:
4117-
udf_source = None
4118-
meta = {
4119-
"UDF": udf_source,
4120-
"operands": operands,
4121-
"name": udf_name,
4122-
}
4123-
if isinstance(self.func, DSLKernel) and self.func.dsl_source is not None:
4124-
meta["dsl_source"] = self.func.dsl_source
4125-
array.schunk.vlmeta["_LazyArray"] = meta
4142+
operand_urlpath = Path(operand_payload["urlpath"])
4143+
if carrier_parent is not None and not operand_urlpath.is_absolute():
4144+
ref_urlpath = operand_urlpath.as_posix()
4145+
elif carrier_parent is not None:
4146+
try:
4147+
ref_urlpath = operand_urlpath.relative_to(carrier_parent).as_posix()
4148+
except ValueError:
4149+
ref_urlpath = operand_urlpath.as_posix()
4150+
else:
4151+
ref_urlpath = operand_urlpath.as_posix()
4152+
operand_payload["urlpath"] = ref_urlpath
4153+
4154+
array = make_b2object_carrier(
4155+
"lazyudf",
4156+
self.shape,
4157+
self.dtype,
4158+
chunks=self.chunks,
4159+
blocks=self.blocks,
4160+
**kwargs,
4161+
)
4162+
write_b2object_payload(array, payload)
4163+
return array
41264164

41274165

41284166
def _numpy_eval_expr(expression, operands, prefer_blosc=False):
293 Bytes
Binary file not shown.
293 Bytes
Binary file not shown.
434 Bytes
Binary file not shown.

tests/ndarray/test_dsl_kernels.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -975,3 +975,33 @@ def test_dsl_save_input_names_match(tmp_path):
975975
assert isinstance(reloaded.func, DSLKernel)
976976
assert reloaded.func.input_names == ["x", "y"]
977977
assert list(reloaded.inputs_dict.keys()) == ["x", "y"]
978+
979+
980+
def test_dsl_save_dictstore_operands(tmp_path):
981+
shape = (10,)
982+
store_path = tmp_path / "ops.b2z"
983+
ext_a = tmp_path / "a.b2nd"
984+
ext_b = tmp_path / "b.b2nd"
985+
expr_path = tmp_path / "lazy.b2nd"
986+
987+
a = blosc2.asarray(np.arange(shape[0], dtype=np.float64), urlpath=str(ext_a), mode="w")
988+
b = blosc2.asarray(np.arange(shape[0], dtype=np.float64) * 2, urlpath=str(ext_b), mode="w")
989+
with blosc2.DictStore(str(store_path), mode="w", threshold=None) as dstore:
990+
dstore["/a"] = a
991+
dstore["/b"] = b
992+
993+
with blosc2.DictStore(str(store_path), mode="r") as dstore:
994+
a = dstore["/a"]
995+
b = dstore["/b"]
996+
lazy = blosc2.lazyudf(kernel_save_simple, (a, b), dtype=np.float64)
997+
lazy.save(urlpath=str(expr_path))
998+
999+
carrier = blosc2.open(str(expr_path), mode="r").array
1000+
assert carrier.schunk.vlmeta["b2o"]["operands"] == {
1001+
"o0": {"kind": "dictstore_key", "version": 1, "urlpath": "ops.b2z", "key": "/a"},
1002+
"o1": {"kind": "dictstore_key", "version": 1, "urlpath": "ops.b2z", "key": "/b"},
1003+
}
1004+
1005+
reloaded = blosc2.open(str(expr_path), mode="r")
1006+
expected = (np.arange(shape[0], dtype=np.float64) * 3) ** 2
1007+
np.testing.assert_allclose(reloaded.compute()[...], expected, rtol=1e-5, atol=1e-6)

tests/ndarray/test_lazyexpr.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1874,12 +1874,8 @@ def test_to_cframe():
18741874
dtype = "float64"
18751875
a = blosc2.linspace(0, 1, N * N, dtype=dtype, shape=(N, N))
18761876
expr = a**3 + blosc2.sin(a**2)
1877-
cframe = expr.to_cframe()
1878-
assert len(cframe) > 0
1879-
arr = blosc2.ndarray_from_cframe(cframe)
1880-
assert arr.shape == expr.shape
1881-
assert arr.dtype == expr.dtype
1882-
assert np.allclose(arr[:], expr[:])
1877+
with pytest.raises(ValueError, match="stored on disk/network"):
1878+
expr.to_cframe()
18831879

18841880

18851881
# Test for the bug where multiplying two complex lazy expressions would fail with:

tests/test_b2objects.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
import blosc2.c2array as blosc2_c2array
1616

1717

18+
@blosc2.dsl_kernel
19+
def kernel_add_square(x, y):
20+
return x * x + y * y + 2 * x * y
21+
22+
1823
def _make_c2array(
1924
monkeypatch,
2025
path="@public/examples/ds-1d.b2nd",
@@ -128,3 +133,52 @@ def test_legacy_lazyexpr_open_backward_compat():
128133

129134
assert isinstance(restored, blosc2.LazyExpr)
130135
np.testing.assert_array_equal(restored[:], np.arange(5, dtype=np.int64) * 3)
136+
137+
138+
def test_legacy_lazyudf_open_backward_compat():
139+
fixture = Path(__file__).parent / "data" / "legacy_lazyudf_v1" / "expr.b2nd"
140+
141+
restored = blosc2.open(fixture, mode="r")
142+
143+
assert isinstance(restored, blosc2.LazyUDF)
144+
np.testing.assert_allclose(restored.compute()[:], (np.arange(5, dtype=np.float64) * 3) ** 2)
145+
146+
147+
def test_lazyudf_from_cframe_roundtrip(tmp_path):
148+
a = blosc2.asarray(np.arange(5, dtype=np.float64), urlpath=tmp_path / "a.b2nd", mode="w")
149+
b = blosc2.asarray(np.arange(5, dtype=np.float64) * 2, urlpath=tmp_path / "b.b2nd", mode="w")
150+
expr = blosc2.lazyudf(kernel_add_square, (a, b), dtype=np.float64)
151+
carrier = blosc2.ndarray_from_cframe(expr.to_cframe())
152+
153+
assert carrier.schunk.meta["b2o"] == {"kind": "lazyudf", "version": 1}
154+
payload = carrier.schunk.vlmeta["b2o"]
155+
assert payload["kind"] == "lazyudf"
156+
assert payload["version"] == 1
157+
assert payload["function_kind"] == "dsl"
158+
assert payload["dsl_version"] == 1
159+
assert payload["name"] == "kernel_add_square"
160+
assert "kernel_add_square" in payload["udf_source"]
161+
assert payload["dtype"] == np.dtype(np.float64).str
162+
assert payload["shape"] == [5]
163+
assert payload["operands"] == {
164+
"o0": {"kind": "urlpath", "version": 1, "urlpath": str(tmp_path / "a.b2nd")},
165+
"o1": {"kind": "urlpath", "version": 1, "urlpath": str(tmp_path / "b.b2nd")},
166+
}
167+
168+
restored = blosc2.from_cframe(expr.to_cframe())
169+
170+
assert isinstance(restored, blosc2.LazyUDF)
171+
np.testing.assert_allclose(restored[:], (np.arange(5, dtype=np.float64) * 3) ** 2)
172+
173+
174+
def test_lazyudf_open_roundtrip(tmp_path):
175+
a = blosc2.asarray(np.arange(5, dtype=np.float64), urlpath=tmp_path / "a.b2nd", mode="w")
176+
b = blosc2.asarray(np.arange(5, dtype=np.float64) * 2, urlpath=tmp_path / "b.b2nd", mode="w")
177+
expr = blosc2.lazyudf(kernel_add_square, (a, b), dtype=np.float64)
178+
urlpath = tmp_path / "expr.b2nd"
179+
180+
expr.save(urlpath)
181+
restored = blosc2.open(urlpath, mode="r")
182+
183+
assert isinstance(restored, blosc2.LazyUDF)
184+
np.testing.assert_allclose(restored[:], (np.arange(5, dtype=np.float64) * 3) ** 2)

0 commit comments

Comments
 (0)