Skip to content

Commit 1a4b3b5

Browse files
committed
Add support for serializing LazyUDF with DSL functions
1 parent 2fcebe6 commit 1a4b3b5

7 files changed

Lines changed: 277 additions & 31 deletions

File tree

doc/reference/batch_store.rst

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,18 @@ Structured reference-style Blosc2 objects currently supported:
4747

4848
- ``C2Array``
4949
- ``LazyExpr``
50+
- ``LazyUDF`` backed by ``@blosc2.dsl_kernel``
5051

51-
``LazyExpr`` values preserve reference semantics and are serialized as a recipe
52-
plus durable operand references. Supported operands are:
52+
``LazyExpr`` values and supported ``LazyUDF`` values preserve reference
53+
semantics and are serialized as a recipe plus durable operand references.
54+
Supported operands are:
5355

5456
- persistent local Blosc2 operands reopenable from ``urlpath``
5557
- remote ``C2Array`` operands
5658
- ``DictStore`` members reopenable from ``(.b2d|.b2z, key)``
5759

58-
Purely in-memory operands are intentionally not supported.
60+
Purely in-memory operands are intentionally not supported. Plain Python
61+
``LazyUDF`` callables are not serialized by msgpack.
5962

6063
Quick example
6164
-------------

examples/vlstore-lazyudf.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#######################################################################
2+
# Copyright (c) 2019-present, Blosc Development Team <blosc@blosc.org>
3+
# All rights reserved.
4+
#
5+
# SPDX-License-Identifier: BSD-3-Clause
6+
#######################################################################
7+
8+
import numpy as np
9+
10+
import blosc2
11+
12+
13+
def show(label, value):
14+
print(f"{label}: {value}")
15+
16+
17+
@blosc2.dsl_kernel
18+
def kernel_add_twice(x, y):
19+
return x + y * 2
20+
21+
22+
urlpath = "example_vlstore_lazyudf.b2frame"
23+
a_path = "example_vlstore_lazyudf_a.b2nd"
24+
b_path = "example_vlstore_lazyudf_b.b2nd"
25+
blosc2.remove_urlpath(urlpath)
26+
blosc2.remove_urlpath(a_path)
27+
blosc2.remove_urlpath(b_path)
28+
29+
a = blosc2.asarray(np.arange(5, dtype=np.float32), urlpath=a_path, mode="w")
30+
b = blosc2.asarray(np.arange(5, dtype=np.float32) * 2, urlpath=b_path, mode="w")
31+
lazy_udf = blosc2.lazyudf(kernel_add_twice, (a, b), dtype=a.dtype, shape=a.shape)
32+
33+
vla = blosc2.VLArray(urlpath=urlpath, mode="w", contiguous=True)
34+
vla.append({"kind": "lazyudf", "value": lazy_udf})
35+
36+
restored = vla[0]["value"]
37+
show("Stored type", type(vla[0]["value"]).__name__)
38+
show("Computed values", restored[:])
39+
40+
reopened = blosc2.open(urlpath, mode="r")
41+
restored_reopened = reopened[0]["value"]
42+
show("Reopened type", type(restored_reopened).__name__)
43+
show("Reopened values", restored_reopened[:])
44+
45+
blosc2.remove_urlpath(urlpath)
46+
blosc2.remove_urlpath(a_path)
47+
blosc2.remove_urlpath(b_path)

src/blosc2/_msgpack_utils.py

Lines changed: 113 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,17 @@
77

88
from __future__ import annotations
99

10+
import builtins
11+
import inspect
12+
import linecache
13+
import textwrap
14+
from dataclasses import asdict
15+
16+
import numpy as np
1017
from msgpack import ExtType, packb, unpackb
1118

1219
from blosc2 import blosc2_ext
20+
from blosc2.dsl_kernel import DSLKernel
1321

1422
# Msgpack extension type codes are application-defined. Reserve code 42 in
1523
# python-blosc2 for values serialized as Blosc2 CFrames via ``to_cframe()`` and
@@ -21,6 +29,7 @@
2129
# stable ``kind`` and ``version`` envelope.
2230
_BLOSC2_STRUCTURED_EXT_CODE = 43
2331
_BLOSC2_STRUCTURED_VERSION = 1
32+
_BLOSC2_DSL_VERSION = 1
2433

2534

2635
def _encode_operand_reference(obj):
@@ -74,6 +83,43 @@ def _encode_structured_reference(obj):
7483
"operands": {key: _encode_operand_reference(value) for key, value in operands.items()},
7584
}
7685
return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True))
86+
if isinstance(obj, blosc2.LazyUDF):
87+
if not isinstance(obj.func, DSLKernel):
88+
raise TypeError("Structured Blosc2 msgpack payload only supports LazyUDF backed by DSLKernel")
89+
udf_func = obj.func.func
90+
udf_name = getattr(udf_func, "__name__", obj.func.__name__)
91+
try:
92+
udf_source = textwrap.dedent(inspect.getsource(udf_func)).lstrip()
93+
except Exception:
94+
udf_source = obj.func.dsl_source
95+
if udf_source is None:
96+
raise ValueError("Structured LazyUDF msgpack payload requires recoverable DSL kernel source")
97+
kwargs = {}
98+
for key, value in obj.kwargs.items():
99+
if key in {"dtype", "shape"}:
100+
continue
101+
if isinstance(value, blosc2.CParams | blosc2.DParams):
102+
kwargs[key] = asdict(value)
103+
else:
104+
kwargs[key] = value
105+
# Keep both source forms:
106+
# - udf_source recreates the executable Python function object
107+
# - dsl_source preserves the DSLKernel's normalized DSL metadata so the
108+
# reconstructed function can keep its DSL identity and fast-path hints
109+
payload = {
110+
"kind": "lazyudf",
111+
"version": _BLOSC2_STRUCTURED_VERSION,
112+
"function_kind": "dsl",
113+
"dsl_version": _BLOSC2_DSL_VERSION,
114+
"name": udf_name,
115+
"udf_source": udf_source,
116+
"dsl_source": obj.func.dsl_source,
117+
"dtype": np.dtype(obj.dtype).str,
118+
"shape": list(obj.shape),
119+
"operands": {f"o{i}": _encode_operand_reference(value) for i, value in enumerate(obj.inputs)},
120+
"kwargs": kwargs,
121+
}
122+
return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True))
77123
return None
78124

79125

@@ -113,8 +159,6 @@ def _decode_operand_reference(payload):
113159

114160

115161
def _decode_structured_reference(data):
116-
import blosc2
117-
118162
payload = unpackb(data)
119163
if not isinstance(payload, dict):
120164
raise TypeError("Structured Blosc2 msgpack payload must decode to a mapping")
@@ -127,29 +171,80 @@ def _decode_structured_reference(data):
127171
if kind == "c2array":
128172
return _decode_operand_reference(payload)
129173
if kind == "lazyexpr":
130-
expression = payload.get("expression")
131-
if not isinstance(expression, str):
132-
raise TypeError("Structured LazyExpr msgpack payload requires a string 'expression'")
133-
operands_payload = payload.get("operands")
134-
if not isinstance(operands_payload, dict):
135-
raise TypeError("Structured LazyExpr msgpack payload requires a mapping 'operands'")
136-
operands = {key: _decode_operand_reference(value) for key, value in operands_payload.items()}
137-
return blosc2.lazyexpr(expression, operands=operands)
174+
return _decode_structured_lazyexpr(payload)
175+
if kind == "lazyudf":
176+
return _decode_structured_lazyudf(payload)
138177
raise ValueError(f"Unsupported structured Blosc2 msgpack payload kind: {kind!r}")
139178

140179

180+
def _decode_structured_lazyexpr(payload):
181+
import blosc2
182+
183+
expression = payload.get("expression")
184+
if not isinstance(expression, str):
185+
raise TypeError("Structured LazyExpr msgpack payload requires a string 'expression'")
186+
operands_payload = payload.get("operands")
187+
if not isinstance(operands_payload, dict):
188+
raise TypeError("Structured LazyExpr msgpack payload requires a mapping 'operands'")
189+
operands = {key: _decode_operand_reference(value) for key, value in operands_payload.items()}
190+
return blosc2.lazyexpr(expression, operands=operands)
191+
192+
193+
def _decode_structured_lazyudf(payload):
194+
import blosc2
195+
196+
function_kind = payload.get("function_kind")
197+
if function_kind != "dsl":
198+
raise ValueError(f"Unsupported structured LazyUDF function kind: {function_kind!r}")
199+
dsl_version = payload.get("dsl_version")
200+
if dsl_version != _BLOSC2_DSL_VERSION:
201+
raise ValueError(f"Unsupported structured LazyUDF DSL version: {dsl_version!r}")
202+
udf_source = payload.get("udf_source")
203+
if not isinstance(udf_source, str):
204+
raise TypeError("Structured LazyUDF msgpack payload requires a string 'udf_source'")
205+
name = payload.get("name")
206+
if not isinstance(name, str):
207+
raise TypeError("Structured LazyUDF msgpack payload requires a string 'name'")
208+
dtype = payload.get("dtype")
209+
if not isinstance(dtype, str):
210+
raise TypeError("Structured LazyUDF msgpack payload requires a string 'dtype'")
211+
shape_payload = payload.get("shape")
212+
if not isinstance(shape_payload, list):
213+
raise TypeError("Structured LazyUDF msgpack payload requires a list 'shape'")
214+
operands_payload = payload.get("operands")
215+
if not isinstance(operands_payload, dict):
216+
raise TypeError("Structured LazyUDF msgpack payload requires a mapping 'operands'")
217+
kwargs = payload.get("kwargs", {})
218+
if not isinstance(kwargs, dict):
219+
raise TypeError("Structured LazyUDF msgpack payload requires a mapping 'kwargs'")
220+
221+
local_ns = {}
222+
filename = f"<{name}>"
223+
safe_globals = {
224+
"__builtins__": {k: v for k, v in builtins.__dict__.items() if k != "__import__"},
225+
"np": np,
226+
"blosc2": blosc2,
227+
}
228+
linecache.cache[filename] = (len(udf_source), None, udf_source.splitlines(True), filename)
229+
exec(compile(udf_source, filename, "exec"), safe_globals, local_ns)
230+
func = local_ns[name]
231+
if not isinstance(func, DSLKernel):
232+
func = DSLKernel(func)
233+
dsl_source = payload.get("dsl_source")
234+
if dsl_source is not None and func.dsl_source is None:
235+
func.dsl_source = dsl_source
236+
237+
operands = tuple(
238+
_decode_operand_reference(operands_payload[f"o{n}"]) for n in range(len(operands_payload))
239+
)
240+
return blosc2.lazyudf(func, operands, dtype=np.dtype(dtype), shape=tuple(shape_payload), **kwargs)
241+
242+
141243
def _encode_msgpack_ext(obj):
142244
import blosc2
143245

144246
if isinstance(
145-
obj,
146-
(
147-
blosc2.NDArray,
148-
blosc2.SChunk,
149-
blosc2.VLArray,
150-
blosc2.BatchStore,
151-
blosc2.EmbedStore,
152-
),
247+
obj, blosc2.NDArray | blosc2.SChunk | blosc2.VLArray | blosc2.BatchStore | blosc2.EmbedStore
153248
):
154249
return ExtType(_BLOSC2_EXT_CODE, obj.to_cframe())
155250
structured = _encode_structured_reference(obj)

src/blosc2/batch_store.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,14 @@ class BatchStore:
173173
:class:`blosc2.EmbedStore`, which are serialized transparently via
174174
:meth:`to_cframe` / :func:`blosc2.from_cframe`. Msgpack also supports
175175
structured Blosc2 reference objects, currently
176-
:class:`blosc2.C2Array` and :class:`blosc2.LazyExpr`. Lazy expressions
177-
preserve reference semantics, so only persistent local operands,
178-
:class:`blosc2.C2Array` operands, and :class:`blosc2.DictStore`
179-
members are supported; purely in-memory operands are rejected.
180-
``"arrow"`` is optional and requires ``pyarrow``.
176+
:class:`blosc2.C2Array`, :class:`blosc2.LazyExpr`, and
177+
:class:`blosc2.LazyUDF` backed by :func:`blosc2.dsl_kernel`. These lazy
178+
objects preserve reference semantics, so only persistent local
179+
operands, :class:`blosc2.C2Array` operands, and
180+
:class:`blosc2.DictStore` members are supported; purely in-memory
181+
operands are rejected. Plain Python :class:`blosc2.LazyUDF` callables
182+
are not serialized by msgpack. ``"arrow"`` is optional and requires
183+
``pyarrow``.
181184
_from_schunk : blosc2.SChunk, optional
182185
Internal hook used when reopening an already-tagged BatchStore.
183186
**kwargs

src/blosc2/vlarray.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,13 @@ class VLArray:
3838
transparently via :meth:`to_cframe` / :func:`blosc2.from_cframe`.
3939
4040
Msgpack also supports structured Blosc2 reference objects. Currently this
41-
includes :class:`blosc2.C2Array` and :class:`blosc2.LazyExpr`. Lazy
42-
expressions are serialized as recipes plus durable operand references, so
43-
only persistent local operands, :class:`blosc2.C2Array` operands, and
44-
:class:`blosc2.DictStore` members are supported. Purely in-memory operands
45-
are intentionally rejected.
41+
includes :class:`blosc2.C2Array`, :class:`blosc2.LazyExpr`, and
42+
:class:`blosc2.LazyUDF` backed by :func:`blosc2.dsl_kernel`. Lazy
43+
expressions and supported lazy UDFs are serialized as recipes plus durable
44+
operand references, so only persistent local operands,
45+
:class:`blosc2.C2Array` operands, and :class:`blosc2.DictStore` members are
46+
supported. Purely in-memory operands are intentionally rejected. Plain
47+
Python :class:`blosc2.LazyUDF` callables are not serialized by msgpack.
4648
"""
4749

4850
@staticmethod

tests/test_batch_store.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,17 @@
1212
import blosc2.c2array as blosc2_c2array
1313
from blosc2._msgpack_utils import msgpack_packb, msgpack_unpackb
1414

15+
16+
@blosc2.dsl_kernel
17+
def _kernel_add_twice(x, y):
18+
return x + y * 2
19+
20+
21+
def _python_udf_add(inputs_tuple, output, offset):
22+
x, y = inputs_tuple
23+
output[:] = x + y
24+
25+
1526
BATCHES = [
1627
[b"bytes\x00payload", "plain text", 42],
1728
[{"nested": [1, 2]}, None, {"tail": True}],
@@ -69,6 +80,20 @@ def _make_in_memory_lazyexpr():
6980
return blosc2.lazyexpr("a + b", operands={"a": a, "b": b})
7081

7182

83+
def _make_persistent_lazyudf(tmp_path):
84+
a = blosc2.asarray(np.arange(5, dtype=np.float32), urlpath=tmp_path / "a_udf.b2nd", mode="w")
85+
b = blosc2.asarray(np.arange(5, dtype=np.float32) * 2, urlpath=tmp_path / "b_udf.b2nd", mode="w")
86+
udf = blosc2.lazyudf(_kernel_add_twice, (a, b), dtype=a.dtype, shape=a.shape)
87+
expected = a[:] + b[:] * 2
88+
return udf, expected
89+
90+
91+
def _make_persistent_python_lazyudf(tmp_path):
92+
a = blosc2.asarray(np.arange(5, dtype=np.float32), urlpath=tmp_path / "a_pyudf.b2nd", mode="w")
93+
b = blosc2.asarray(np.arange(5, dtype=np.float32) * 2, urlpath=tmp_path / "b_pyudf.b2nd", mode="w")
94+
return blosc2.lazyudf(_python_udf_add, (a, b), dtype=a.dtype, shape=a.shape)
95+
96+
7297
@pytest.mark.parametrize(
7398
("contiguous", "urlpath"),
7499
[
@@ -296,6 +321,33 @@ def test_batchstore_msgpack_supports_lazyexpr(tmp_path):
296321
np.testing.assert_array_equal(restored[:], expected)
297322

298323

324+
def test_msgpack_supports_lazyudf_dslkernel(tmp_path):
325+
udf, expected = _make_persistent_lazyudf(tmp_path)
326+
327+
restored = msgpack_unpackb(msgpack_packb({"udf": udf}))["udf"]
328+
329+
assert isinstance(restored, blosc2.LazyUDF)
330+
np.testing.assert_allclose(restored[:], expected)
331+
332+
333+
def test_batchstore_msgpack_supports_lazyudf_dslkernel(tmp_path):
334+
udf, expected = _make_persistent_lazyudf(tmp_path)
335+
336+
barray = blosc2.BatchStore(items_per_block=2)
337+
barray.append([udf])
338+
restored = barray[0][0]
339+
340+
assert isinstance(restored, blosc2.LazyUDF)
341+
np.testing.assert_allclose(restored[:], expected)
342+
343+
344+
def test_msgpack_rejects_plain_python_lazyudf(tmp_path):
345+
udf = _make_persistent_python_lazyudf(tmp_path)
346+
347+
with pytest.raises(TypeError, match="DSLKernel"):
348+
msgpack_packb({"udf": udf})
349+
350+
299351
def test_msgpack_rejects_lazyexpr_with_in_memory_operands():
300352
expr = _make_in_memory_lazyexpr()
301353

0 commit comments

Comments
 (0)