
Commit c3696ef
Optimize full (and light/medium) index creation
Three independent optimizations, all in indexing.py.

Stage 1: replace the per-run np.lexsort in _build_full_descriptor_ooc() with the native Cython intra_chunk_sort_run kernel. This eliminates the np.arange allocation, the O(N log N) lexsort, and two gather passes in favour of a GIL-free stable mergesort.

Stage 2: replace _merge_sorted_slices (concat + lexsort, O(N log N)) inside _merge_run_pair() with the native intra_chunk_merge_sorted_slices kernel (linear merge, GIL-free).

Stage 3: rewrite _build_levels_descriptor_ooc() with a single-pass strategy. Previously the function issued one array[start:stop] decompression call per segment across all levels (e.g. 9008 calls for 10M rows on a Mac M4: 8 chunk + 1000 block + 8000 subblock). The new code decompresses each array chunk exactly once and computes all level summaries in a single vectorized pass via _fill_summaries_from_2d(), falling back to the original per-segment path only when segment sizes do not divide the chunk length evenly.

Measured on 10M random float64:

Mac mini M4 (chunks=1.25M, blocks=10K):
  light:   940 ms ->  431 ms  (2.2x)
  medium: 4696 ms ->  394 ms (11.9x)
  full:   9650 ms -> 1584 ms  (6.1x)

AMD 7800X3D Linux (chunks=2M, blocks=20K):
  light:   603 ms ->  430 ms (1.4x)
  medium: 2050 ms ->  395 ms (5.2x)
  full:   6856 ms -> 1582 ms (4.3x)

Query latency and index sizes are unchanged.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
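As a sanity check on the stage 3 call counts, the figures in the message can be reproduced from the stated shapes. A sketch only: the 1250-row subblock length is inferred from the 8000-subblock figure, not read from the code.

    import math

    # Shapes from the commit message (10M float64, Mac M4 defaults); the
    # subblock segment length of 1250 is an inferred illustration value.
    size, chunk_len = 10_000_000, 1_250_000
    segment_lens = {"chunk": 1_250_000, "block": 10_000, "subblock": 1_250}

    calls_old = sum(math.ceil(size / s) for s in segment_lens.values())
    calls_new = math.ceil(size / chunk_len)
    print(calls_old, calls_new)  # 9008 decompression calls before, 8 after

    # The fast path applies because every segment length divides the chunk length.
    assert all(chunk_len % s == 0 for s in segment_lens.values())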
1 parent 2b93a71 commit c3696ef

1 file changed: src/blosc2/indexing.py (83 additions, 18 deletions)

@@ -897,6 +897,36 @@ def _compute_segment_summaries(values: np.ndarray, dtype: np.dtype, segment_len:
     return summaries
 
 
+def _fill_summaries_from_2d(
+    data_2d: np.ndarray,
+    summaries_arr: np.ndarray,
+    offset: int,
+    dtype: np.dtype,
+) -> None:
+    """Fill summaries_arr[offset:offset+n] from data_2d (shape n×segment_len) with vectorized ops."""
+    n = data_2d.shape[0]
+    if n == 0:
+        return
+    if dtype.kind == "f":
+        with np.errstate(all="ignore"):
+            has_nan = np.any(np.isnan(data_2d), axis=1)
+            all_nan = np.all(np.isnan(data_2d), axis=1)
+            mins = np.nanmin(data_2d, axis=1)
+            maxs = np.nanmax(data_2d, axis=1)
+        flags = np.where(has_nan, FLAG_HAS_NAN, np.uint8(0)).astype(np.uint8)
+        flags = np.where(all_nan, np.uint8(FLAG_ALL_NAN | FLAG_HAS_NAN), flags)
+        zero = dtype.type(0)
+        mins = np.where(all_nan, zero, mins).astype(dtype)
+        maxs = np.where(all_nan, zero, maxs).astype(dtype)
+    else:
+        mins = data_2d.min(axis=1)
+        maxs = data_2d.max(axis=1)
+        flags = np.zeros(n, dtype=np.uint8)
+    summaries_arr["min"][offset : offset + n] = mins
+    summaries_arr["max"][offset : offset + n] = maxs
+    summaries_arr["flags"][offset : offset + n] = flags
+
+
 def _compute_sorted_boundaries(values: np.ndarray, dtype: np.dtype, segment_len: int) -> np.ndarray:
     nsegments = math.ceil(values.shape[0] / segment_len)
     boundaries = np.empty(nsegments, dtype=_boundary_dtype(dtype))
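A toy run of the new helper, assuming _fill_summaries_from_2d from the hunk above is in scope, that the summary dtype is (min, max, flags), and that the flag constants are single-bit masks. The bit values here are illustrative; the real constants live in indexing.py.

    import numpy as np

    FLAG_HAS_NAN = np.uint8(1)  # assumed bit values, for illustration only
    FLAG_ALL_NAN = np.uint8(2)
    summary_dtype = np.dtype([("min", "f8"), ("max", "f8"), ("flags", "u1")])

    chunk = np.array([1.0, 5.0, np.nan, 2.0, np.nan, np.nan])
    data_2d = chunk.reshape(-1, 2)            # one row per 2-element segment
    summaries = np.empty(data_2d.shape[0], dtype=summary_dtype)
    _fill_summaries_from_2d(data_2d, summaries, 0, chunk.dtype)
    # summaries["min"]   -> [1.0, 2.0, 0.0]  (all-NaN segment falls back to 0)
    # summaries["flags"] -> [0, HAS_NAN, ALL_NAN | HAS_NAN]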
@@ -1067,23 +1097,61 @@ def _build_levels_descriptor_ooc(
     persistent: bool,
     cparams: dict | None = None,
 ) -> dict:
-    levels = {}
     size = int(array.shape[0])
     summary_dtype = _summary_dtype(dtype)
-    for level in SEGMENT_LEVELS_BY_KIND[kind]:
-        segment_len = _segment_len(array, level)
-        nsegments = math.ceil(size / segment_len)
-        summaries = np.empty(nsegments, dtype=summary_dtype)
-        for idx in range(nsegments):
-            start = idx * segment_len
-            stop = min(start + segment_len, size)
-            summaries[idx] = _segment_summary(_slice_values_for_target(array, target, start, stop), dtype)
+    chunk_len = int(array.chunks[0])
+    levels_to_build = SEGMENT_LEVELS_BY_KIND[kind]
+    segment_lens = {level: _segment_len(array, level) for level in levels_to_build}
+    nsegments_total = {level: math.ceil(size / slen) for level, slen in segment_lens.items()}
+    all_summaries = {level: np.empty(n, dtype=summary_dtype) for level, n in nsegments_total.items()}
+
+    # Fast path: all segment sizes are ≤ chunk_len and divide it evenly, so no segment
+    # spans a chunk boundary. A single decompression pass over the data suffices.
+    can_fast = all(slen <= chunk_len and chunk_len % slen == 0 for slen in segment_lens.values())
+
+    if can_fast:
+        seg_offsets = dict.fromkeys(levels_to_build, 0)
+        nchunks = math.ceil(size / chunk_len)
+        for chunk_id in range(nchunks):
+            chunk_start = chunk_id * chunk_len
+            chunk_stop = min(chunk_start + chunk_len, size)
+            chunk_values = _slice_values_for_target(array, target, chunk_start, chunk_stop)
+            chunk_size = chunk_stop - chunk_start
+            for level in levels_to_build:
+                slen = segment_lens[level]
+                summaries_arr = all_summaries[level]
+                offset = seg_offsets[level]
+                n_complete = chunk_size // slen
+                remainder = chunk_size % slen
+                if n_complete > 0:
+                    data_2d = chunk_values[: n_complete * slen].reshape(n_complete, slen)
+                    _fill_summaries_from_2d(data_2d, summaries_arr, offset, dtype)
+                if remainder > 0:
+                    summaries_arr[offset + n_complete] = _segment_summary(
+                        chunk_values[n_complete * slen :], dtype
+                    )
+                    seg_offsets[level] = offset + n_complete + 1
+                else:
+                    seg_offsets[level] = offset + n_complete
+    else:
+        # Fallback: original segment-by-segment approach
+        for level in levels_to_build:
+            slen = segment_lens[level]
+            for idx in range(nsegments_total[level]):
+                start = idx * slen
+                stop = min(start + slen, size)
+                all_summaries[level][idx] = _segment_summary(
+                    _slice_values_for_target(array, target, start, stop), dtype
+                )
+
+    levels = {}
+    for level in levels_to_build:
         sidecar = _store_array_sidecar(
-            array, token, kind, "summary", level, summaries, persistent, cparams=cparams
+            array, token, kind, "summary", level, all_summaries[level], persistent, cparams=cparams
         )
         levels[level] = {
-            "segment_len": segment_len,
-            "nsegments": len(summaries),
+            "segment_len": segment_lens[level],
+            "nsegments": nsegments_total[level],
             "path": sidecar["path"],
             "dtype": sidecar["dtype"],
         }
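The reshape trick the fast path relies on can be checked against the per-segment reference on plain NumPy data; no blosc2 array is needed to exercise the logic. A minimal sketch, assuming sizes where every segment length divides the chunk length:

    import numpy as np

    rng = np.random.default_rng(0)
    values = rng.random(10_000)
    chunk_len = 1_000
    segment_lens = {"block": 100, "subblock": 25}  # both divide chunk_len

    for slen in segment_lens.values():
        # per-segment reference path: one slice per segment
        ref = [values[i : i + slen].min() for i in range(0, values.size, slen)]
        # single-pass path: reshape each chunk into (n_segments, slen) rows
        fast = []
        for start in range(0, values.size, chunk_len):
            chunk = values[start : start + chunk_len]
            fast.extend(chunk.reshape(-1, slen).min(axis=1))
        assert np.allclose(ref, fast)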
@@ -2540,12 +2608,12 @@ def _merge_run_pair(
         )
         right_cut = right_values.size
 
-    merged_values, merged_positions = _merge_sorted_slices(
+    merged_values, merged_positions = indexing_ext.intra_chunk_merge_sorted_slices(
        left_values[:left_cut],
        left_positions[:left_cut],
        right_values[:right_cut],
        right_positions[:right_cut],
-        dtype,
+        np.int64,
     )
     take = merged_values.size
     out_values[out_cursor : out_cursor + take] = merged_values
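For reference, the replaced _merge_sorted_slices took the concatenate-and-lexsort route. A pure-Python sketch of the linear merge the Cython kernel is described as performing (stable on (value, position) ties; NaN ordering omitted for brevity):

    import numpy as np

    def merge_sorted_runs_reference(lv, lp, rv, rp):
        # Both inputs are already sorted by (value, position); one linear pass
        # produces the merged order, with positions breaking value ties.
        n, m = lv.size, rv.size
        out_v = np.empty(n + m, dtype=lv.dtype)
        out_p = np.empty(n + m, dtype=np.int64)
        i = j = 0
        for k in range(n + m):
            take_left = j >= m or (
                i < n and (lv[i], lp[i]) <= (rv[j], rp[j])
            )
            if take_left:
                out_v[k], out_p[k] = lv[i], lp[i]
                i += 1
            else:
                out_v[k], out_p[k] = rv[j], rp[j]
                j += 1
        return out_v, out_p

    # Old path, for comparison (O((n+m) log(n+m)) instead of O(n+m)):
    #   order = np.lexsort((np.concatenate([lp, rp]), np.concatenate([lv, rv])))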
@@ -2603,10 +2671,7 @@ def _build_full_descriptor_ooc(
     for run_id, start in enumerate(range(0, size, run_items)):
         stop = min(start + run_items, size)
         values = _slice_values_for_target(array, target, start, stop)
-        positions = np.arange(start, stop, dtype=np.int64)
-        order = np.lexsort((positions, values))
-        sorted_values = values[order]
-        sorted_positions = positions[order]
+        sorted_values, sorted_positions = indexing_ext.intra_chunk_sort_run(values, start, np.int64)
         runs.append(
             _materialize_sorted_run(
                 sorted_values,
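The four removed lines double as a NumPy reference for what intra_chunk_sort_run is assumed to return; the Cython kernel itself is not part of this diff, so this is a sketch of the contract rather than the implementation:

    import numpy as np

    def sort_run_reference(values, start):
        # Positions are the global row numbers of this run. lexsort's last key
        # (values) dominates and positions break ties, so the sort is stable.
        positions = np.arange(start, start + values.size, dtype=np.int64)
        order = np.lexsort((positions, values))
        return values[order], positions[order]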
