Skip to content

Commit 426d6c7

Browse files
committed
Harden full OOC index merge writes and add regression coverage
1 parent ff479be commit 426d6c7

2 files changed

Lines changed: 94 additions & 8 deletions

File tree

src/blosc2/indexing.py

Lines changed: 74 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2582,6 +2582,38 @@ def _read_ndarray_linear_span(array: blosc2.NDArray | np.ndarray, start: int, ou
25822582
out_cursor += take
25832583

25842584

2585+
def _write_ndarray_linear_span(array: blosc2.NDArray | np.ndarray, start: int, values: np.ndarray) -> None:
2586+
if len(values) == 0:
2587+
return
2588+
stop = int(start) + len(values)
2589+
if stop > int(array.shape[0]):
2590+
raise RuntimeError(
2591+
f"attempted to write past the end of temporary array: stop={stop}, length={int(array.shape[0])}"
2592+
)
2593+
if isinstance(array, np.ndarray):
2594+
array[start:stop] = values
2595+
return
2596+
chunk_len = int(array.chunks[0])
2597+
cursor = int(start)
2598+
in_cursor = 0
2599+
while in_cursor < len(values):
2600+
chunk_id = cursor // chunk_len
2601+
local_start = cursor % chunk_len
2602+
take = min(len(values) - in_cursor, chunk_len - local_start)
2603+
try:
2604+
array[cursor : cursor + take] = values[in_cursor : in_cursor + take]
2605+
except Exception as exc:
2606+
raise RuntimeError(
2607+
"failed temporary sidecar span write: "
2608+
f"array_len={int(array.shape[0])}, chunk_len={chunk_len}, "
2609+
f"write_start={cursor}, write_stop={cursor + take}, "
2610+
f"write_items={take}, local_start={local_start}, chunk_id={chunk_id}, "
2611+
f"input_offset={in_cursor}, input_len={len(values)}, dtype={np.dtype(array.dtype)}"
2612+
) from exc
2613+
cursor += take
2614+
in_cursor += take
2615+
2616+
25852617
def _read_sidecar_span(handle, start: int, stop: int) -> np.ndarray:
25862618
if stop <= start:
25872619
return np.empty(0, dtype=np.dtype(handle.dtype))
@@ -2608,8 +2640,8 @@ def _materialize_sorted_run(
26082640
run_positions = _create_blosc2_temp_array(
26092641
positions_path, length, np.dtype(np.int64), FULL_OOC_MERGE_BUFFER_ITEMS, cparams
26102642
)
2611-
run_values[:] = values
2612-
run_positions[:] = positions
2643+
_write_ndarray_linear_span(run_values, 0, values)
2644+
_write_ndarray_linear_span(run_positions, 0, positions)
26132645
del run_values, run_positions
26142646
_tracker_register_create(tracker, values_path, positions_path)
26152647
return SortedRun(values_path, positions_path, length)
@@ -2698,6 +2730,7 @@ def _merge_run_pair(
26982730
out_positions = _create_blosc2_temp_array(
26992731
out_positions_path, left.length + right.length, np.dtype(np.int64), buffer_items, cparams
27002732
)
2733+
out_total = left.length + right.length
27012734

27022735
left_cursor = 0
27032736
right_cursor = 0
@@ -2720,16 +2753,32 @@ def _merge_run_pair(
27202753
break
27212754
if left_values.size == 0:
27222755
take = right_values.size
2723-
out_values[out_cursor : out_cursor + take] = right_values
2724-
out_positions[out_cursor : out_cursor + take] = right_positions
2756+
try:
2757+
_write_ndarray_linear_span(out_values, out_cursor, right_values)
2758+
_write_ndarray_linear_span(out_positions, out_cursor, right_positions)
2759+
except Exception as exc:
2760+
raise RuntimeError(
2761+
"full index OOC merge write failed while flushing right run remainder: "
2762+
f"merge_id={merge_id}, left_len={left.length}, right_len={right.length}, "
2763+
f"out_total={out_total}, out_cursor={out_cursor}, take={take}, "
2764+
f"left_cursor={left_cursor}, right_cursor={right_cursor}, buffer_items={buffer_items}"
2765+
) from exc
27252766
out_cursor += take
27262767
right_values = np.empty(0, dtype=dtype)
27272768
right_positions = np.empty(0, dtype=np.int64)
27282769
continue
27292770
if right_values.size == 0:
27302771
take = left_values.size
2731-
out_values[out_cursor : out_cursor + take] = left_values
2732-
out_positions[out_cursor : out_cursor + take] = left_positions
2772+
try:
2773+
_write_ndarray_linear_span(out_values, out_cursor, left_values)
2774+
_write_ndarray_linear_span(out_positions, out_cursor, left_positions)
2775+
except Exception as exc:
2776+
raise RuntimeError(
2777+
"full index OOC merge write failed while flushing left run remainder: "
2778+
f"merge_id={merge_id}, left_len={left.length}, right_len={right.length}, "
2779+
f"out_total={out_total}, out_cursor={out_cursor}, take={take}, "
2780+
f"left_cursor={left_cursor}, right_cursor={right_cursor}, buffer_items={buffer_items}"
2781+
) from exc
27332782
out_cursor += take
27342783
left_values = np.empty(0, dtype=dtype)
27352784
left_positions = np.empty(0, dtype=np.int64)
@@ -2754,14 +2803,31 @@ def _merge_run_pair(
27542803
np.int64,
27552804
)
27562805
take = merged_values.size
2757-
out_values[out_cursor : out_cursor + take] = merged_values
2758-
out_positions[out_cursor : out_cursor + take] = merged_positions
2806+
try:
2807+
_write_ndarray_linear_span(out_values, out_cursor, merged_values)
2808+
_write_ndarray_linear_span(out_positions, out_cursor, merged_positions)
2809+
except Exception as exc:
2810+
raise RuntimeError(
2811+
"full index OOC merge write failed for merged batch: "
2812+
f"merge_id={merge_id}, left_len={left.length}, right_len={right.length}, "
2813+
f"out_total={out_total}, out_cursor={out_cursor}, take={take}, "
2814+
f"left_cursor={left_cursor}, right_cursor={right_cursor}, "
2815+
f"left_buffer={left_values.size}, right_buffer={right_values.size}, "
2816+
f"left_cut={left_cut}, right_cut={right_cut}, buffer_items={buffer_items}"
2817+
) from exc
27592818
out_cursor += take
27602819
left_values = left_values[left_cut:]
27612820
left_positions = left_positions[left_cut:]
27622821
right_values = right_values[right_cut:]
27632822
right_positions = right_positions[right_cut:]
27642823

2824+
if out_cursor != out_total:
2825+
raise RuntimeError(
2826+
"full index OOC merge produced an unexpected output length: "
2827+
f"merge_id={merge_id}, left_len={left.length}, right_len={right.length}, "
2828+
f"expected={out_total}, written={out_cursor}"
2829+
)
2830+
27652831
del out_values, out_positions
27662832
_tracker_register_create(tracker, out_values_path, out_positions_path)
27672833
del left_values_mm, left_positions_mm, right_values_mm, right_positions_mm

tests/ndarray/test_indexing.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,6 +1103,26 @@ def test_compact_full_expression_index_preserves_results():
11031103
np.testing.assert_array_equal(expr.compute()[:], expected[expected_mask])
11041104

11051105

1106+
def test_forced_ooc_full_index_merge_preserves_sorted_sidecars(monkeypatch, tmp_path):
1107+
path = tmp_path / "forced_ooc_full_merge.b2nd"
1108+
rng = np.random.default_rng(14)
1109+
data = np.arange(4096, dtype=np.int64)
1110+
rng.shuffle(data)
1111+
1112+
arr = blosc2.asarray(data, urlpath=path, mode="w", chunks=(256,), blocks=(64,))
1113+
indexing = __import__("blosc2.indexing", fromlist=["FULL_OOC_RUN_ITEMS", "FULL_OOC_MERGE_BUFFER_ITEMS"])
1114+
monkeypatch.setattr(indexing, "FULL_OOC_RUN_ITEMS", 512)
1115+
monkeypatch.setattr(indexing, "FULL_OOC_MERGE_BUFFER_ITEMS", 128)
1116+
1117+
descriptor = arr.create_index(kind="full")
1118+
meta = descriptor["full"]
1119+
values_sidecar = blosc2.open(meta["values_path"])
1120+
positions_sidecar = blosc2.open(meta["positions_path"])
1121+
1122+
np.testing.assert_array_equal(values_sidecar[:], np.sort(data, kind="stable"))
1123+
np.testing.assert_array_equal(values_sidecar[:], data[positions_sidecar[:]])
1124+
1125+
11061126
@pytest.mark.parametrize("persistent", [False, True])
11071127
def test_compact_full_index_rebuilds_navigation_without_whole_loading(monkeypatch, tmp_path, persistent):
11081128
dtype = np.dtype([("a", np.int64), ("b", np.int64)])

0 commit comments

Comments
 (0)