Skip to content

Commit dc5174f

Browse files
committed
Add b2o carrier support for persisted C2Array objects
1 parent 98b0c10 commit dc5174f

10 files changed

Lines changed: 289 additions & 99 deletions

File tree

src/blosc2/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,7 @@ def _raise(exc):
538538
from .batch_store import Batch, BatchStore
539539
from .vlarray import VLArray, vlarray_from_cframe
540540
from .ref import Ref
541+
from .b2objects import _open_b2object
541542

542543
from .c2array import c2context, C2Array, URLPath
543544

Lines changed: 69 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -12,50 +12,57 @@
1212
import linecache
1313
import textwrap
1414
from dataclasses import asdict
15+
from typing import Any
1516

1617
import numpy as np
17-
from msgpack import ExtType, packb, unpackb
1818

19-
from blosc2 import blosc2_ext
19+
import blosc2
2020
from blosc2.dsl_kernel import DSLKernel
21-
from blosc2.ref import Ref
2221

23-
# Msgpack extension type codes are application-defined. Reserve code 42 in
24-
# python-blosc2 for values serialized as Blosc2 CFrames via ``to_cframe()`` and
25-
# reconstructed with ``blosc2.from_cframe()``. Keep this stable for backward
26-
# compatibility with persisted msgpack payloads produced by this package.
27-
_BLOSC2_EXT_CODE = 42
28-
# Reserve code 43 for structured Blosc2 reference objects that are not naturally
29-
# serialized as CFrames. The payload is a msgpack-encoded mapping with a
30-
# stable ``kind`` and ``version`` envelope.
31-
_BLOSC2_STRUCTURED_EXT_CODE = 43
32-
_BLOSC2_STRUCTURED_VERSION = 1
33-
_BLOSC2_DSL_VERSION = 1
22+
_B2OBJECT_META_KEY = "b2o"
23+
_B2OBJECT_VERSION = 1
24+
_B2OBJECT_DSL_VERSION = 1
25+
26+
27+
def _make_b2object_carrier(
    kind: str,
    shape,
    dtype,
    *,
    chunks=None,
    blocks=None,
    **kwargs,
):
    """Create the array that carries a persisted Blosc2 object.

    The array is built with :func:`blosc2.empty` and tagged, in its ``meta``
    layer, with a ``"b2o"`` entry recording the object ``kind`` and the
    carrier format version.

    Parameters
    ----------
    kind : str
        Tag stored in the marker (for instance ``"c2array"``).
    shape, dtype
        Forwarded unchanged to :func:`blosc2.empty`.
    chunks, blocks : optional
        Forwarded unchanged to :func:`blosc2.empty`.
    **kwargs
        Extra keyword arguments for :func:`blosc2.empty`.  A ``meta`` mapping,
        when present, is copied and extended with the marker; an explicit
        ``meta=None`` is treated the same as not passing ``meta`` at all.

    Returns
    -------
    The object returned by :func:`blosc2.empty`.
    """
    # Copy so the caller's mapping is never mutated, and tolerate an explicit
    # ``meta=None`` (``dict(None)`` would raise TypeError).
    meta = dict(kwargs.pop("meta", None) or {})
    meta[_B2OBJECT_META_KEY] = {"kind": kind, "version": _B2OBJECT_VERSION}
    return blosc2.empty(shape=shape, dtype=dtype, chunks=chunks, blocks=blocks, meta=meta, **kwargs)
40+
41+
42+
def _write_b2object_payload(array, payload: dict[str, Any]) -> None:
    """Store ``payload`` in ``array.schunk.vlmeta`` under the ``"b2o"`` key."""
    vlmeta = array.schunk.vlmeta
    vlmeta[_B2OBJECT_META_KEY] = payload
3444

3545

3646
def _encode_operand_reference(obj):
    """Return the dictionary form of the ``blosc2.Ref`` built from ``obj``."""
    reference = blosc2.Ref.from_object(obj)
    return reference.to_dict()
48+
3849

50+
def _decode_operand_reference(payload):
    """Rebuild a ``blosc2.Ref`` from ``payload`` and return the result of opening it."""
    reference = blosc2.Ref.from_dict(payload)
    return reference.open()
3952

40-
def _encode_structured_reference(obj):
41-
import blosc2
4253

43-
if isinstance(obj, blosc2.Ref):
44-
payload = {"kind": "ref", "version": _BLOSC2_STRUCTURED_VERSION, "ref": obj.to_dict()}
45-
return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True))
54+
def _encode_b2object_payload(obj) -> dict[str, Any] | None:
4655
if isinstance(obj, blosc2.C2Array):
47-
payload = _encode_operand_reference(obj)
48-
return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True))
56+
return blosc2.Ref.c2array_ref(obj.path, obj.urlbase).to_dict()
4957
if isinstance(obj, blosc2.LazyExpr):
5058
expression = obj.expression_tosave if hasattr(obj, "expression_tosave") else obj.expression
5159
operands = obj.operands_tosave if hasattr(obj, "operands_tosave") else obj.operands
52-
payload = {
60+
return {
5361
"kind": "lazyexpr",
54-
"version": _BLOSC2_STRUCTURED_VERSION,
62+
"version": _B2OBJECT_VERSION,
5563
"expression": expression,
5664
"operands": {key: _encode_operand_reference(value) for key, value in operands.items()},
5765
}
58-
return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True))
5966
if isinstance(obj, blosc2.LazyUDF):
6067
if not isinstance(obj.func, DSLKernel):
6168
raise TypeError("Structured Blosc2 msgpack payload only supports LazyUDF backed by DSLKernel")
@@ -75,15 +82,11 @@ def _encode_structured_reference(obj):
7582
kwargs[key] = asdict(value)
7683
else:
7784
kwargs[key] = value
78-
# Keep both source forms:
79-
# - udf_source recreates the executable Python function object
80-
# - dsl_source preserves the DSLKernel's normalized DSL metadata so the
81-
# reconstructed function can keep its DSL identity and fast-path hints
82-
payload = {
85+
return {
8386
"kind": "lazyudf",
84-
"version": _BLOSC2_STRUCTURED_VERSION,
87+
"version": _B2OBJECT_VERSION,
8588
"function_kind": "dsl",
86-
"dsl_version": _BLOSC2_DSL_VERSION,
89+
"dsl_version": _B2OBJECT_DSL_VERSION,
8790
"name": udf_name,
8891
"udf_source": udf_source,
8992
"dsl_source": obj.func.dsl_source,
@@ -92,76 +95,60 @@ def _encode_structured_reference(obj):
9295
"operands": {f"o{i}": _encode_operand_reference(value) for i, value in enumerate(obj.inputs)},
9396
"kwargs": kwargs,
9497
}
95-
return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True))
9698
return None
9799

98100

99-
def _decode_operand_reference(payload):
100-
return Ref.from_dict(payload).open()
101-
102-
103-
def _decode_structured_reference(data):
104-
payload = unpackb(data)
105-
if not isinstance(payload, dict):
106-
raise TypeError("Structured Blosc2 msgpack payload must decode to a mapping")
107-
108-
version = payload.get("version")
109-
if version != _BLOSC2_STRUCTURED_VERSION:
110-
raise ValueError(f"Unsupported structured Blosc2 msgpack payload version: {version!r}")
111-
101+
def _decode_b2object_payload(payload: dict[str, Any]):
    """Reconstruct the object described by a persisted ``b2o`` payload.

    Parameters
    ----------
    payload : dict
        Mapping with at least ``"kind"`` and ``"version"`` entries.

    Returns
    -------
    The decoded object: the opened reference for ``"c2array"``, or whatever
    the ``"lazyexpr"`` / ``"lazyudf"`` decoders return.

    Raises
    ------
    TypeError
        If ``payload`` is not a mapping.
    ValueError
        If the payload version or kind is not supported.
    """
    # Fail with a clear message instead of an AttributeError on ``.get`` when
    # the stored payload is not a mapping.
    if not isinstance(payload, dict):
        raise TypeError("Persisted Blosc2 object payload must decode to a mapping")

    version = payload.get("version")
    if version != _B2OBJECT_VERSION:
        raise ValueError(f"Unsupported persisted Blosc2 object version: {version!r}")

    kind = payload.get("kind")
    if kind == "c2array":
        return blosc2.Ref.from_dict(payload).open()
    if kind == "lazyexpr":
        return _decode_structured_lazyexpr(payload)
    if kind == "lazyudf":
        return _decode_structured_lazyudf(payload)
    raise ValueError(f"Unsupported persisted Blosc2 object kind: {kind!r}")
123114

124115

125116
def _decode_structured_lazyexpr(payload):
    """Build a lazy expression from its persisted payload.

    ``payload["expression"]`` must be a string and ``payload["operands"]`` a
    mapping of operand name to encoded reference; a ``TypeError`` is raised
    otherwise.  Each operand reference is decoded before the expression is
    handed to ``blosc2.lazyexpr``.
    """
    expr_text = payload.get("expression")
    if not isinstance(expr_text, str):
        raise TypeError("Structured LazyExpr payload requires a string 'expression'")

    encoded_operands = payload.get("operands")
    if not isinstance(encoded_operands, dict):
        raise TypeError("Structured LazyExpr payload requires a mapping 'operands'")

    opened = {}
    for name, reference in encoded_operands.items():
        opened[name] = _decode_operand_reference(reference)
    return blosc2.lazyexpr(expr_text, operands=opened)
136125

137126

138127
def _decode_structured_lazyudf(payload):
139-
import blosc2
140-
141128
function_kind = payload.get("function_kind")
142129
if function_kind != "dsl":
143130
raise ValueError(f"Unsupported structured LazyUDF function kind: {function_kind!r}")
144131
dsl_version = payload.get("dsl_version")
145-
if dsl_version != _BLOSC2_DSL_VERSION:
132+
if dsl_version != _B2OBJECT_DSL_VERSION:
146133
raise ValueError(f"Unsupported structured LazyUDF DSL version: {dsl_version!r}")
147134
udf_source = payload.get("udf_source")
148135
if not isinstance(udf_source, str):
149-
raise TypeError("Structured LazyUDF msgpack payload requires a string 'udf_source'")
136+
raise TypeError("Structured LazyUDF payload requires a string 'udf_source'")
150137
name = payload.get("name")
151138
if not isinstance(name, str):
152-
raise TypeError("Structured LazyUDF msgpack payload requires a string 'name'")
139+
raise TypeError("Structured LazyUDF payload requires a string 'name'")
153140
dtype = payload.get("dtype")
154141
if not isinstance(dtype, str):
155-
raise TypeError("Structured LazyUDF msgpack payload requires a string 'dtype'")
142+
raise TypeError("Structured LazyUDF payload requires a string 'dtype'")
156143
shape_payload = payload.get("shape")
157144
if not isinstance(shape_payload, list):
158-
raise TypeError("Structured LazyUDF msgpack payload requires a list 'shape'")
145+
raise TypeError("Structured LazyUDF payload requires a list 'shape'")
159146
operands_payload = payload.get("operands")
160147
if not isinstance(operands_payload, dict):
161-
raise TypeError("Structured LazyUDF msgpack payload requires a mapping 'operands'")
148+
raise TypeError("Structured LazyUDF payload requires a mapping 'operands'")
162149
kwargs = payload.get("kwargs", {})
163150
if not isinstance(kwargs, dict):
164-
raise TypeError("Structured LazyUDF msgpack payload requires a mapping 'kwargs'")
151+
raise TypeError("Structured LazyUDF payload requires a mapping 'kwargs'")
165152

166153
local_ns = {}
167154
filename = f"<{name}>"
@@ -185,38 +172,26 @@ def _decode_structured_lazyudf(payload):
185172
return blosc2.lazyudf(func, operands, dtype=np.dtype(dtype), shape=tuple(shape_payload), **kwargs)
186173

187174

188-
def _encode_msgpack_ext(obj):
189-
import blosc2
190-
191-
if isinstance(
192-
obj, blosc2.NDArray | blosc2.SChunk | blosc2.VLArray | blosc2.BatchStore | blosc2.EmbedStore
193-
):
194-
return ExtType(_BLOSC2_EXT_CODE, obj.to_cframe())
195-
structured = _encode_structured_reference(obj)
196-
if structured is not None:
197-
return structured
198-
return blosc2_ext.encode_tuple(obj)
199-
200-
201-
def msgpack_packb(value):
202-
return packb(value, default=_encode_msgpack_ext, strict_types=True, use_bin_type=True)
203-
204-
205-
def decode_tuple_list_hook(obj):
206-
if obj and isinstance(obj[0], str) and obj[0] == "__tuple__":
207-
return tuple(obj[1:])
208-
return obj
175+
def _read_b2object_marker(obj) -> dict[str, Any] | None:
    """Return the ``"b2o"`` entry of the object's ``meta``, or ``None`` if absent.

    ``obj`` may expose a ``schunk`` attribute; otherwise ``obj`` itself is
    used as the container whose ``meta`` is inspected.
    """
    container = getattr(obj, "schunk", obj)
    meta = container.meta
    return meta[_B2OBJECT_META_KEY] if _B2OBJECT_META_KEY in meta else None
209180

210181

211-
def _decode_msgpack_ext(code, data):
212-
import blosc2
182+
def _read_b2object_payload(obj) -> dict[str, Any]:
    """Return the ``"b2o"`` entry stored in the object's ``vlmeta``.

    ``obj.schunk`` is used when that attribute exists; otherwise ``obj``
    itself is treated as the container.
    """
    vlmeta = getattr(obj, "schunk", obj).vlmeta
    return vlmeta[_B2OBJECT_META_KEY]
213185

214-
if code == _BLOSC2_EXT_CODE:
215-
return blosc2.from_cframe(data, copy=True)
216-
if code == _BLOSC2_STRUCTURED_EXT_CODE:
217-
return _decode_structured_reference(data)
218-
return ExtType(code, data)
219186

187+
def _open_b2object(obj):
    """Open the persisted Blosc2 object carried by ``obj``, if any.

    Returns
    -------
    ``None`` when ``obj`` has no ``"b2o"`` marker; otherwise the object
    decoded from the carrier's payload.

    Raises
    ------
    ValueError
        If the marker version is unsupported, or the marker and payload
        disagree on ``kind``.
    TypeError
        If the stored payload is not a mapping.
    """
    marker = _read_b2object_marker(obj)
    if marker is None:
        return None

    # Validate the marker before touching the payload: a carrier written with
    # an unsupported version should be reported as such, not as a failure to
    # read or interpret the payload.
    marker_version = marker.get("version")
    if marker_version != _B2OBJECT_VERSION:
        raise ValueError(f"Unsupported persisted Blosc2 object version: {marker_version!r}")

    payload = _read_b2object_payload(obj)
    if not isinstance(payload, dict):
        raise TypeError("Persisted Blosc2 object payload must decode to a mapping")
    if marker.get("kind") != payload.get("kind"):
        raise ValueError("Persisted Blosc2 object marker/payload kind mismatch")
    return _decode_b2object_payload(payload)

src/blosc2/batch_store.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
import numpy as np
1919

2020
import blosc2
21-
from blosc2._msgpack_utils import msgpack_packb, msgpack_unpackb
2221
from blosc2.info import InfoReporter, format_nbytes_info
22+
from blosc2.msgpack_utils import msgpack_packb, msgpack_unpackb
2323

2424
_BATCHSTORE_META = {"version": 1, "serializer": "msgpack", "items_per_block": None, "arrow_schema": None}
2525
_SUPPORTED_SERIALIZERS = {"msgpack", "arrow"}

src/blosc2/c2array.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import requests
1919

2020
import blosc2
21+
from blosc2.b2objects import _encode_b2object_payload, _make_b2object_carrier, _write_b2object_payload
2122
from blosc2.info import InfoReporter, format_nbytes_info
2223

2324
_subscriber_data = {
@@ -238,6 +239,37 @@ def __init__(self, path: str, /, urlbase: str | None = None, auth_token: str | N
238239
cparams.pop("filters, meta", None)
239240
self._cparams = blosc2.CParams(**cparams)
240241

242+
def _to_b2object_payload(self) -> dict:
    """Return the persisted-object payload for this instance.

    Raises ``TypeError`` when the encoder does not produce a payload.
    """
    encoded = _encode_b2object_payload(self)
    if encoded is not None:
        return encoded
    raise TypeError("Unsupported persisted Blosc2 object")
247+
248+
def _to_b2object_carrier(self, **kwargs):
    """Build the carrier array for this instance and write its payload.

    Parameters
    ----------
    **kwargs
        Extra keyword arguments forwarded to the carrier factory.  By
        default the carrier uses this object's ``chunks``, ``blocks`` and
        ``cparams``; values supplied by the caller take precedence.

    Returns
    -------
    The carrier array, with the payload already written to it.
    """
    # Merge defaults with caller overrides up front.  Passing these as
    # explicit keywords next to ``**kwargs`` raised "got multiple values for
    # keyword argument" whenever the caller supplied one of them.
    options = {"chunks": self.chunks, "blocks": self.blocks, "cparams": self.cparams, **kwargs}
    array = _make_b2object_carrier("c2array", self.shape, self.dtype, **options)
    _write_b2object_payload(array, self._to_b2object_payload())
    return array
260+
261+
def to_cframe(self) -> bytes:
    """Return the CFrame of a freshly built carrier for this remote array reference."""
    carrier = self._to_b2object_carrier()
    return carrier.to_cframe()
264+
265+
def save(self, urlpath: str, contiguous: bool = True, **kwargs) -> None:
    """Write a carrier for this remote array reference to ``urlpath``.

    Parameters
    ----------
    urlpath : str
        Destination path; it is checked for write access first.
    contiguous : bool
        Forwarded to the carrier constructor.
    **kwargs
        Additional carrier options.  ``urlpath``, ``contiguous`` and
        ``mode`` are always overwritten (``mode`` is forced to ``"w"``).
    """
    blosc2.blosc2_ext.check_access_mode(urlpath, "w")
    kwargs.update(urlpath=urlpath, contiguous=contiguous, mode="w")
    self._to_b2object_carrier(**kwargs)
272+
241273
def __getitem__(self, slice_: int | slice | Sequence[slice]) -> np.ndarray:
242274
"""
243275
Get a slice of the array (returning NumPy array).

src/blosc2/core.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1918,7 +1918,9 @@ def ndarray_from_cframe(cframe: bytes | str, copy: bool = False) -> blosc2.NDArr
19181918

19191919
def from_cframe(
19201920
cframe: bytes | str, copy: bool = True
1921-
) -> blosc2.EmbedStore | blosc2.NDArray | blosc2.SChunk | blosc2.BatchStore | blosc2.VLArray:
1921+
) -> (
1922+
blosc2.EmbedStore | blosc2.NDArray | blosc2.SChunk | blosc2.BatchStore | blosc2.VLArray | blosc2.C2Array
1923+
):
19221924
"""Create a :ref:`EmbedStore <EmbedStore>`, :ref:`NDArray <NDArray>`, :ref:`SChunk <SChunk>`,
19231925
:ref:`BatchStore <BatchStore>` or :ref:`VLArray <VLArray>` instance
19241926
from a contiguous frame buffer.
@@ -1956,6 +1958,8 @@ def from_cframe(
19561958
return blosc2.BatchStore(_from_schunk=schunk_from_cframe(cframe, copy=copy))
19571959
if "vlarray" in schunk.meta:
19581960
return blosc2.vlarray_from_cframe(cframe, copy=copy)
1961+
if "b2o" in schunk.meta:
1962+
return blosc2._open_b2object(ndarray_from_cframe(cframe, copy=copy))
19591963
if "b2nd" in schunk.meta:
19601964
return ndarray_from_cframe(cframe, copy=copy)
19611965
return schunk_from_cframe(cframe, copy=copy)

0 commit comments

Comments
 (0)