feat: add sorted_series column for DataFusion streaming aggregation #6290
```rust
// UInt8 columns (e.g., metric_type) are encoded as their string
// representation. This is rare in the sort schema but handled for
// completeness.
DataType::UInt8 => None,
```
From Claude:
The extract_string_value function returns None for DataType::UInt8, meaning any UInt8 column in the sort schema (e.g., metric_type if it were ever added before timeseries_id) would be silently skipped from the key, reducing discrimination. The comment says "handled for completeness" but the behavior is actually "silently ignored." If the intent is to skip UInt8 columns, a code comment explaining why they are excluded from the key would be more accurate.
i think we can ignore this comment, and remove this case, because tags are always strings.
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 652d128d5e
```rust
if col.is_null(row_idx) {
    continue;
}
```
Encode null tag positions in sorted_series keys
encode_row_key skips null sort columns entirely, so rows with a null in an earlier tag column can compare after rows with non-null values in that same column (because the next encoded ordinal is larger). The writer sorts input with nulls_first=true, so this makes sorted_series non-monotonic relative to the file’s physical sort order whenever nullable tags are present. If sorted_series is used as the streaming partition/sort key (the stated goal), streaming aggregate/window execution can receive keys in the wrong order.
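The hazard can be seen with a toy version of the encoding. This is a hypothetical stand-in for encode_row_key, not the PR's code: skipping a null column leaves the next column's larger ordinal byte at the front of the key, so a nulls-first physical order encodes to a key order with the non-null row first.

```rust
// Toy encoder illustrating the finding above; nulls are skipped
// entirely, as the diff does, so the next ordinal leads the key.
fn encode(tags: &[Option<&str>]) -> Vec<u8> {
    let mut key = Vec::new();
    for (ordinal, value) in tags.iter().enumerate() {
        if let Some(s) = value {
            key.push(ordinal as u8);             // ordinal prefix
            key.extend_from_slice(s.as_bytes()); // tag bytes
            key.push(0x00);                      // terminator
        }
        // None: component omitted from the key
    }
    key
}

fn main() {
    // Row A (host = NULL, dc = "eu") sorts before Row B (host = "a",
    // dc = "eu") under nulls_first, but A's key starts with ordinal 1
    // while B's starts with ordinal 0, so the keys order B before A.
    let row_a = encode(&[None, Some("eu")]);
    let row_b = encode(&[Some("a"), Some("eu")]);
    assert!(row_a > row_b); // key order inverts the physical sort order
}
```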
```rust
if let Some(kc) = ts_id_column {
    let col = batch.column(kc.batch_idx);
    if !col.is_null(row_idx) {
        let ts_id = extract_i64_value(col.as_ref(), row_idx);
```
Require timeseries_id when building sorted_series
The key appends timeseries_id only when that column exists, so accepted batches without timeseries_id silently produce keys that omit the only guaranteed discriminator for series identity. In that case, different series that share sort tags (e.g., same metric/tags but different metric_type) can collapse onto the same sorted_series value, which can merge groups incorrectly once this column is used for streaming aggregation.
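A toy illustration of the collapse (the key helper here is hypothetical, not the PR's code): when timeseries_id is omitted, two distinct series that share the same sort tags map to the same key.

```rust
// Hypothetical key builder: tags get (ordinal, bytes, terminator);
// timeseries_id is appended only when present, mirroring the diff.
fn key(tags: &[&str], ts_id: Option<i64>) -> Vec<u8> {
    let mut k = Vec::new();
    for (ordinal, tag) in tags.iter().enumerate() {
        k.push(ordinal as u8);
        k.extend_from_slice(tag.as_bytes());
        k.push(0x00);
    }
    if let Some(id) = ts_id {
        k.push(tags.len() as u8); // discriminator ordinal
        k.extend_from_slice(&id.to_be_bytes());
    }
    k
}

fn main() {
    // Two different series (ids 1 and 2) with identical sort tags:
    // with the id column absent, both produce the same bytes and the
    // groups would merge; with the id present, the keys differ.
    let series_a = key(&["cpu", "web"], None); // really id 1
    let series_b = key(&["cpu", "web"], None); // really id 2
    assert_eq!(series_a, series_b);
    assert_ne!(key(&["cpu", "web"], Some(1)), key(&["cpu", "web"], Some(2)));
}
```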
commit history is messed up, pushed existing history to https://github.com/quickwit-oss/quickwit/tree/matthew.kim/gtt/sorted-series-key just in case, going to cherry pick the commits we want
Compute a composite, lexicographically sortable binary column (sorted_series) at Parquet write time using storekey order-preserving encoding. For each row the key encodes:

1. Non-null sort schema tag columns as (ordinal: u8, value: str)
2. timeseries_id (i64) as final discriminator

Identical timeseries always produce identical byte keys regardless of timestamp or value, enabling DataFusion's streaming AggregateExec and BoundedWindowAggExec with O(1) memory instead of O(N) hash tables.

Also fixes create_nullable_dict_array, which used the original array index as dictionary key instead of the position in the unique values array, causing out-of-bounds panics for mixed null/non-null inputs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
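The encoding described in the commit above can be sketched as follows. This is a hypothetical hand-rolled version, not the PR's code: storekey is replaced by string bytes with a 0x00 terminator and a big-endian i64 with the sign bit flipped, both of which preserve order under plain byte comparison.

```rust
// Hypothetical encode_row_key sketch: (ordinal, tag) pairs for non-null
// sort columns, then timeseries_id as the final discriminator. Flipping
// the sign bit makes the i64's big-endian bytes compare in numeric order.
fn encode_row_key(tags: &[Option<&str>], timeseries_id: i64) -> Vec<u8> {
    let mut key = Vec::new();
    for (ordinal, value) in tags.iter().enumerate() {
        if let Some(s) = value {
            key.push(ordinal as u8);             // 1. ordinal prefix
            key.extend_from_slice(s.as_bytes()); //    tag bytes
            key.push(0x00);                      //    terminator
        }
    }
    key.push(tags.len() as u8);                  // 2. discriminator ordinal
    key.extend_from_slice(&((timeseries_id as u64) ^ (1 << 63)).to_be_bytes());
    key
}

fn main() {
    // Identical series -> identical keys, regardless of timestamp/value.
    let a = encode_row_key(&[Some("cpu"), Some("web")], 7);
    let b = encode_row_key(&[Some("cpu"), Some("web")], 7);
    assert_eq!(a, b);
    // Byte order follows timeseries_id order, including negatives.
    let neg = encode_row_key(&[Some("cpu"), Some("web")], -1);
    assert!(neg < a);
}
```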
Without the ordinal, the timeseries_id bytes could collide with a subsequent tag column's ordinal+string encoding. Every component in the key now consistently gets an ordinal prefix from its sort schema position.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
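A toy byte-level demonstration of that collision (hypothetical helper, not the PR's code; ids are 2 bytes here for readability, real keys use i64): an unprefixed id's bytes can exactly match a later tag column's ordinal+string bytes.

```rust
// tag(ordinal, s) builds a toy ordinal+string component.
fn tag(ordinal: u8, s: &str) -> Vec<u8> {
    let mut v = vec![ordinal];
    v.extend_from_slice(s.as_bytes());
    v
}

fn main() {
    // Series 1: tag 0 = "a", raw (unprefixed) id bytes [0x01, b'x'].
    // Series 2: tag 0 = "a", tag 1 = "x", no id.
    let unprefixed = [tag(0, "a"), vec![0x01, b'x']].concat();
    let other_series = [tag(0, "a"), tag(1, "x")].concat();
    assert_eq!(unprefixed, other_series); // byte-identical: collision

    // With an ordinal prefix on the id (2 = its sort schema position
    // after two tags), the id component can't mimic a tag component.
    let prefixed = [tag(0, "a"), vec![2], vec![0x01, b'x']].concat();
    assert_ne!(prefixed, other_series);
}
```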
Add tests that assert:
- timeseries_id gets ordinal 6 prefix (its sort schema position)
- key length is exact: ordinal(1) + str(2) + ordinal(1) + i64(8) = 12
- when timeseries_id is absent, no trailing ordinal appears

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Writes a 6-row batch with 4 distinct series (including null tags) through the ParquetWriter pipeline, reads back, and verifies:
- 4 distinct keys produced (series identity)
- series with 3 rows produces 3 identical keys
- null host differs from present host (ordinal skipping)
- all-null tags differ from partial-null tags
- ordinal bytes are correct (0x00 for metric_name, 0x01 for service, 0x06 for timeseries_id) even when intermediate tags are null
- equal keys are contiguous after sort (streaming aggregation ready)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Regenerate storekey entry via dd-rust-license-tool (correct authors)
- Fix 4 rustfmt nightly formatting diffs in sorted_series tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
💡 Codex Review
Reviewed commit: 069a5a1871
```rust
crate::sorted_series::append_sorted_series_column(&self.sort_fields_string, batch)
    .map_err(|e| ParquetWriteError::SchemaValidation(e.to_string()))?;
```
Keep writes tolerant of malformed sort_fields
ParquetWriter::new already treats an unparseable sort_fields string as non-fatal (it logs and uses empty sort order), but prepare_write now always calls append_sorted_series_column, which reparses and propagates parse errors as SchemaValidation. That turns a previously degraded-but-working path into a hard write failure for malformed or legacy sort_fields values, which can stop ingestion instead of just disabling sorting behavior.
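A sketch of the degraded-but-working behavior the review asks to preserve. All names here (parse_sort_fields, append_sorted_series_column, prepare_write) are illustrative stubs, not the crate's API: a parse failure logs and disables the column instead of aborting the write.

```rust
// Stub parser: an empty string stands in for a malformed sort_fields
// value; the real parser is more involved.
fn parse_sort_fields(s: &str) -> Result<Vec<String>, String> {
    if s.is_empty() {
        return Err("empty sort_fields".to_string());
    }
    Ok(s.split(',').map(str::to_owned).collect())
}

// Stub column append: returns how many sort fields were encoded.
fn append_sorted_series_column(fields: &[String]) -> Result<usize, String> {
    Ok(fields.len())
}

// Tolerant path: a parse error degrades to "no sorted_series column"
// (mirroring ParquetWriter::new) instead of failing the write.
fn prepare_write(sort_fields: &str) -> Result<Option<usize>, String> {
    match parse_sort_fields(sort_fields) {
        Ok(fields) => Ok(Some(append_sorted_series_column(&fields)?)),
        Err(e) => {
            eprintln!("invalid sort_fields {sort_fields:?}, disabling sorted_series: {e}");
            Ok(None)
        }
    }
}

fn main() {
    assert_eq!(prepare_write("metric,host"), Ok(Some(2)));
    assert_eq!(prepare_write(""), Ok(None)); // degraded, not a hard failure
}
```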
```rust
DataType::UInt8 => None,
_ => None,
```
Encode non-string sort fields in sorted_series keys
This branch drops UInt8 (and other non-string) sort columns by returning None, so encode_row_key silently omits those dimensions from sorted_series. For schemas that include numeric pre-timestamp sort columns (supported by the sort-fields parser), rows can get keys that no longer reflect the configured sort order, which undermines correctness when sorted_series is used as the streaming grouping/order key.
Summary
- Computes a sorted_series binary column at Parquet write time using storekey order-preserving encoding
- Encodes non-null sort schema tag columns as (ordinal: u8, value: str) pairs, then appends timeseries_id (i64) as the final discriminator
- Enables DataFusion's streaming AggregateExec and BoundedWindowAggExec with O(1) memory instead of O(N) hash tables
- Integrates with column reordering (reorder_columns) for optimal streaming read
- Fixes a create_nullable_dict_array bug: dictionary keys now correctly index into unique values (was using the original array index, causing panics for mixed null/non-null inputs)

Stacked on top of #6287 (column ordering) and timeseries_id work.
Design
Based on the Sorted Series Column design doc:
Null columns are skipped. The ordinal prefix prevents cross-column byte collisions for sparse schemas.
Test plan
- quickwit-parquet-engine tests pass

🤖 Generated with Claude Code