Skip to content

Commit 493c805

Browse files
committed
Add b2o carrier support for persisted LazyExpr objects
1 parent dc5174f commit 493c805

5 files changed

Lines changed: 72 additions & 41 deletions

File tree

src/blosc2/lazyexpr.py

Lines changed: 24 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
import blosc2
4444

45+
from .b2objects import _encode_b2object_payload, _make_b2object_carrier, _write_b2object_payload
4546
from .dsl_kernel import DSLKernel, DSLSyntaxError, _DSLValidator, specialize_miniexpr_inputs
4647

4748
if blosc2._HAS_NUMBA:
@@ -570,7 +571,7 @@ def convert_inputs(inputs):
570571
return []
571572
inputs_ = []
572573
for obj in inputs:
573-
if not isinstance(obj, (np.ndarray, blosc2.Operand)) and not np.isscalar(obj):
574+
if not isinstance(obj, np.ndarray | blosc2.Operand) and not np.isscalar(obj):
574575
try:
575576
obj = blosc2.SimpleProxy(obj)
576577
except Exception:
@@ -2855,7 +2856,7 @@ def result_type(
28552856
# Follow NumPy rules for scalar-array operations
28562857
# Create small arrays with the same dtypes and let NumPy's type promotion determine the result type
28572858
arrs = [
2858-
(np.array(value).dtype if isinstance(value, (str, bytes)) else value)
2859+
(np.array(value).dtype if isinstance(value, str | bytes) else value)
28592860
if (np.isscalar(value) or not hasattr(value, "dtype"))
28602861
else np.array([0], dtype=_convert_dtype(value.dtype))
28612862
for value in arrays_and_dtypes
@@ -2902,7 +2903,7 @@ def __init__(self, new_op): # noqa: C901
29022903
# Check that operands are proper Operands, LazyArray or scalars; if not, convert to NDArray objects
29032904
value1 = (
29042905
blosc2.SimpleProxy(value1)
2905-
if not (isinstance(value1, (blosc2.Operand, np.ndarray)) or np.isscalar(value1))
2906+
if not (isinstance(value1, blosc2.Operand | np.ndarray) or np.isscalar(value1))
29062907
else value1
29072908
)
29082909
# Reset values represented as np.int64 etc. to be set as Python natives
@@ -2926,7 +2927,7 @@ def __init__(self, new_op): # noqa: C901
29262927
return
29272928
value2 = (
29282929
blosc2.SimpleProxy(value2)
2929-
if not (isinstance(value2, (blosc2.Operand, np.ndarray)) or np.isscalar(value2))
2930+
if not (isinstance(value2, blosc2.Operand | np.ndarray) or np.isscalar(value2))
29302931
else value2
29312932
)
29322933
# Reset values represented as np.int64 etc. to be set as Python natives
@@ -3711,44 +3712,27 @@ def save(self, urlpath=None, **kwargs):
37113712
if urlpath is None:
37123713
raise ValueError("To save a LazyArray you must provide an urlpath")
37133714

3714-
expression = self.expression_tosave if hasattr(self, "expression_tosave") else self.expression
3715-
operands_ = self.operands_tosave if hasattr(self, "operands_tosave") else self.operands
3716-
# Validate expression
3717-
validate_expr(expression)
3718-
3719-
meta = kwargs.get("meta", {})
3720-
meta["LazyArray"] = LazyArrayEnum.Expr.value
37213715
kwargs["urlpath"] = urlpath
3722-
kwargs["meta"] = meta
37233716
kwargs["mode"] = "w" # always overwrite the file in urlpath
3717+
self._to_b2object_carrier(**kwargs)
37243718

3725-
# Create an empty array; useful for providing the shape and dtype of the outcome
3726-
array = blosc2.empty(shape=self.shape, dtype=self.dtype, **kwargs)
3727-
3728-
# Save the expression and operands in the metadata
3729-
operands = {}
3730-
for key, value in operands_.items():
3731-
if isinstance(value, blosc2.C2Array):
3732-
operands[key] = {
3733-
"path": str(value.path),
3734-
"urlbase": value.urlbase,
3735-
}
3736-
continue
3737-
if isinstance(value, blosc2.Proxy):
3738-
# Take the required info from the Proxy._cache container
3739-
value = value._cache
3740-
if not hasattr(value, "schunk"):
3741-
raise ValueError(
3742-
"To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects"
3743-
)
3744-
if value.schunk.urlpath is None:
3745-
raise ValueError("To save a LazyArray, all operands must be stored on disk/network")
3746-
operands[key] = value.schunk.urlpath
3747-
array.schunk.vlmeta["_LazyArray"] = {
3748-
"expression": expression,
3749-
"UDF": None,
3750-
"operands": operands,
3751-
}
3719+
def to_cframe(self) -> bytes:
    """Serialize this lazy expression into a contiguous frame.

    A b2object carrier is built through ``_to_b2object_carrier`` (called
    without extra keyword arguments) and the value of that carrier's own
    ``to_cframe()`` call is returned.

    Returns
    -------
    bytes
        The cframe produced by the carrier.
    """
    carrier = self._to_b2object_carrier()
    return carrier.to_cframe()
3721+
3722+
def _to_b2object_carrier(self, **kwargs):
    """Build the carrier array that holds this object's persisted payload.

    Parameters
    ----------
    **kwargs
        Forwarded unchanged to ``_make_b2object_carrier``.

    Returns
    -------
    The object returned by ``_make_b2object_carrier``, after
    ``_write_b2object_payload`` has been applied to it.

    Raises
    ------
    TypeError
        If ``_encode_b2object_payload`` returns ``None`` for this object.
    """
    encoded = _encode_b2object_payload(self)
    if encoded is None:
        raise TypeError("Unsupported persisted Blosc2 object")

    # The carrier takes its shape, dtype and partitioning from the expression itself.
    carrier = _make_b2object_carrier(
        "lazyexpr", self.shape, self.dtype, chunks=self.chunks, blocks=self.blocks, **kwargs
    )
    _write_b2object_payload(carrier, encoded)
    return carrier
37523736

37533737
@classmethod
37543738
def _new_expr(cls, expression, operands, guess, out=None, where=None, ne_args=None):
@@ -3764,7 +3748,7 @@ def _new_expr(cls, expression, operands, guess, out=None, where=None, ne_args=No
37643748
_operands = operands | local_vars
37653749
# Check that operands are proper Operands, LazyArray or scalars; if not, convert to NDArray objects
37663750
for op, val in _operands.items():
3767-
if not (isinstance(val, (blosc2.Operand, np.ndarray)) or np.isscalar(val)):
3751+
if not (isinstance(val, blosc2.Operand | np.ndarray) or np.isscalar(val)):
37683752
_operands[op] = blosc2.SimpleProxy(val)
37693753
# for scalars just return value (internally converts to () if necessary)
37703754
opshapes = {k: v if not hasattr(v, "shape") else v.shape for k, v in _operands.items()}
282 Bytes
Binary file not shown.
282 Bytes
Binary file not shown.
346 Bytes
Binary file not shown.

tests/test_b2objects.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
from __future__ import annotations
99

10+
from pathlib import Path
11+
1012
import numpy as np
1113

1214
import blosc2
@@ -69,7 +71,7 @@ def test_c2array_from_cframe_roundtrip(monkeypatch):
6971

7072
def test_c2array_open_roundtrip(tmp_path, monkeypatch):
7173
original = _make_c2array(monkeypatch, shape=(8,), chunks=(4,), blocks=(2,))
72-
urlpath = tmp_path / "remote-array.b2o"
74+
urlpath = tmp_path / "remote-array.b2nd"
7375

7476
original.save(urlpath)
7577
restored = blosc2.open(urlpath, mode="r")
@@ -81,3 +83,48 @@ def test_c2array_open_roundtrip(tmp_path, monkeypatch):
8183
assert restored.shape == original.shape
8284
assert restored.chunks == original.chunks
8385
assert restored.blocks == original.blocks
86+
87+
88+
def test_lazyexpr_from_cframe_roundtrip(tmp_path):
    """Check carrier metadata and evaluated values after a cframe round trip."""
    path_a = tmp_path / "a.b2nd"
    path_b = tmp_path / "b.b2nd"
    first = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=path_a, mode="w")
    second = blosc2.asarray(np.arange(5, dtype=np.int64) * 2, urlpath=path_b, mode="w")
    lazy = blosc2.lazyexpr("a + b", operands={"a": first, "b": second})
    carrier = blosc2.ndarray_from_cframe(lazy.to_cframe())

    def operand_ref(path):
        # Expected on-carrier description of an operand stored at ``path``.
        return {"kind": "urlpath", "version": 1, "urlpath": str(path)}

    header = {"kind": "lazyexpr", "version": 1}
    assert carrier.schunk.meta["b2o"] == header
    assert carrier.schunk.vlmeta["b2o"] == {
        **header,
        "expression": "a + b",
        "operands": {"a": operand_ref(path_a), "b": operand_ref(path_b)},
    }

    # Serialize again and decode through the generic entry point.
    restored = blosc2.from_cframe(lazy.to_cframe())

    assert isinstance(restored, blosc2.LazyExpr)
    np.testing.assert_array_equal(restored[:], np.arange(5, dtype=np.int64) * 3)
109+
110+
111+
def test_lazyexpr_open_roundtrip(tmp_path):
    """Save a LazyExpr to disk, reopen it read-only and check the evaluated values."""
    first = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=tmp_path / "a.b2nd", mode="w")
    second = blosc2.asarray(np.arange(5, dtype=np.int64) * 2, urlpath=tmp_path / "b.b2nd", mode="w")
    lazy = blosc2.lazyexpr("a + b", operands={"a": first, "b": second})
    target = tmp_path / "expr.b2nd"

    lazy.save(target)
    reopened = blosc2.open(target, mode="r")

    assert isinstance(reopened, blosc2.LazyExpr)
    np.testing.assert_array_equal(reopened[:], np.arange(5, dtype=np.int64) * 3)
122+
123+
124+
def test_legacy_lazyexpr_open_backward_compat():
    """Open the checked-in ``legacy_lazyexpr_v1`` fixture and check it evaluates as a LazyExpr."""
    data_dir = Path(__file__).parent / "data"
    legacy_file = data_dir / "legacy_lazyexpr_v1" / "expr.b2nd"

    reopened = blosc2.open(legacy_file, mode="r")

    assert isinstance(reopened, blosc2.LazyExpr)
    np.testing.assert_array_equal(reopened[:], np.arange(5, dtype=np.int64) * 3)

0 commit comments

Comments
 (0)