
feat: add sorted_series column for DataFusion streaming aggregation#6290

Open
g-talbot wants to merge 8 commits into main from gtt/sorted-series-key

Conversation

@g-talbot
Contributor

Summary

  • Compute a composite, lexicographically sortable sorted_series binary column at Parquet write time using storekey order-preserving encoding
  • For each row, the key encodes the non-null sort schema tag columns as (ordinal: u8, value: str) pairs, then appends timeseries_id (i64) as the final discriminator
  • Identical timeseries always produce identical byte keys regardless of timestamp or value, enabling DataFusion's streaming AggregateExec and BoundedWindowAggExec with O(1) memory instead of O(N) hash tables
  • Column is placed after sort columns in physical layout (Phase 1b in reorder_columns) for optimal streaming read
  • Fixes create_nullable_dict_array bug: dictionary keys now correctly index into unique values (was using original array index, causing panics for mixed null/non-null inputs)

Stacked on top of #6287 (column ordering) and timeseries_id work.

Design

Based on the Sorted Series Column design doc:

Key structure for sort schema [metric_name(0), service(1), ..., host(5)]:

┌──────────┬────────────────┬──────────┬──────────────┬─────────────────┐
│ ordinal 0│ "cpu.usage"    │ ordinal 1│ "api"        │ timeseries_id   │
│ (u8)     │ (storekey str) │ (u8)     │ (storekey)   │ (storekey i64)  │
└──────────┴────────────────┴──────────┴──────────────┴─────────────────┘

Null columns are skipped. The ordinal prefix prevents cross-column byte collisions for sparse schemas.
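The layout above can be sketched with a simplified, order-preserving encoder. This is a hypothetical stand-in for the storekey crate, not the PR's actual code: strings are written as raw bytes plus a 0x00 terminator (assuming tag values contain no NUL byte), and i64 is written big-endian with the sign bit flipped so byte order matches numeric order.

```rust
// Hypothetical sketch of the ordinal-prefixed key layout (not the real
// storekey encoding). Assumptions: tag values contain no NUL byte; the
// timeseries_id ordinal follows the tag ordinals in the sort schema.
fn encode_str(key: &mut Vec<u8>, ordinal: u8, value: &str) {
    key.push(ordinal);
    key.extend_from_slice(value.as_bytes());
    key.push(0x00); // terminator keeps "ab" < "abc" in byte order
}

fn encode_i64(key: &mut Vec<u8>, ordinal: u8, value: i64) {
    key.push(ordinal);
    // Flip the sign bit so negative values sort before positive ones.
    key.extend_from_slice(&((value as u64) ^ (1u64 << 63)).to_be_bytes());
}

/// Build a key from (ordinal, tag) pairs plus the trailing timeseries_id.
fn encode_row_key(tags: &[(u8, Option<&str>)], ts_ordinal: u8, ts_id: i64) -> Vec<u8> {
    let mut key = Vec::new();
    for &(ordinal, value) in tags {
        if let Some(v) = value {
            encode_str(&mut key, ordinal, v); // null tags are skipped
        }
    }
    encode_i64(&mut key, ts_ordinal, ts_id);
    key
}

fn main() {
    // Identical series -> identical keys, regardless of timestamp/value.
    let a = encode_row_key(&[(0, Some("cpu.usage")), (1, Some("api"))], 6, 42);
    let b = encode_row_key(&[(0, Some("cpu.usage")), (1, Some("api"))], 6, 42);
    assert_eq!(a, b);
    // Different timeseries_id -> different key even with equal tags.
    let c = encode_row_key(&[(0, Some("cpu.usage")), (1, Some("api"))], 6, 43);
    assert_ne!(a, c);
    // Byte comparison follows tag order: "api" < "web".
    let web = encode_row_key(&[(0, Some("cpu.usage")), (1, Some("web"))], 6, 42);
    assert!(a < web);
    println!("ok");
}
```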

Test plan

  • 19 tests (identity, discrimination, sort-order, null handling, stability, Parquet round-trip, structural ordinal verification, 2 proptests)
  • All 195 quickwit-parquet-engine tests pass
  • Clippy clean, formatted, no unused deps
  • License headers pass
  • Docs compile

🤖 Generated with Claude Code

@g-talbot g-talbot changed the base branch from main to gtt/sorted-series-column April 10, 2026 14:12
@g-talbot g-talbot changed the base branch from gtt/sorted-series-column to main April 10, 2026 14:12
@g-talbot g-talbot changed the base branch from main to gtt/sorted-series-column April 10, 2026 14:14
@g-talbot g-talbot force-pushed the gtt/sorted-series-column branch from 60d859c to 9522326 Compare April 10, 2026 14:17
@g-talbot g-talbot force-pushed the gtt/sorted-series-key branch from 53bc3e0 to cb1b4d2 Compare April 10, 2026 14:25
@g-talbot g-talbot force-pushed the gtt/sorted-series-column branch from 9522326 to b0344ba Compare April 10, 2026 14:42
@g-talbot g-talbot force-pushed the gtt/sorted-series-key branch from 9a8b9a5 to 58f4810 Compare April 10, 2026 14:42
@g-talbot g-talbot requested a review from mattmkim April 10, 2026 16:05
// UInt8 columns (e.g., metric_type) are encoded as their string
// representation. This is rare in the sort schema but handled for
// completeness.
DataType::UInt8 => None,
Collaborator


From Claude:

The extract_string_value function returns None for DataType::UInt8, meaning any UInt8 column in the sort schema (e.g., metric_type if it were ever added before timeseries_id) would be silently skipped from the key, reducing discrimination. The comment says "handled for completeness" but the behavior is actually "silently ignored." If the intent is to skip UInt8 columns, a code comment explaining why they are excluded from the key would be more accurate.

Contributor


I think we can ignore this comment and remove this case, because tags are always strings.

@mattmkim
Contributor

@codex review


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 652d128d5e

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +211 to +213
if col.is_null(row_idx) {
continue;
}

P1: Encode null tag positions in sorted_series keys

encode_row_key skips null sort columns entirely, so rows with a null in an earlier tag column can compare after rows with non-null values in that same column (because the next encoded ordinal is larger). The writer sorts input with nulls_first=true, so this makes sorted_series non-monotonic relative to the file’s physical sort order whenever nullable tags are present. If sorted_series is used as the streaming partition/sort key (the stated goal), streaming aggregate/window execution can receive keys in the wrong order.
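The ordering problem can be reproduced in a few lines. This is a hypothetical encoder mirroring only the skip-nulls behavior described above (ordinal byte, tag bytes, 0x00 terminator), not the PR's actual code:

```rust
// Minimal repro of the P1: skipping a null tag means the key starts with
// the NEXT column's (larger) ordinal, so null-first physical sort order
// and byte order of the key disagree.
fn key(tags: &[(u8, Option<&str>)]) -> Vec<u8> {
    let mut k = Vec::new();
    for &(ord, v) in tags {
        if let Some(v) = v {
            k.push(ord);
            k.extend_from_slice(v.as_bytes());
            k.push(0x00);
        }
    }
    k
}

fn main() {
    // Physical sort with nulls_first places row_null BEFORE row_a...
    let row_null = key(&[(0, None), (1, Some("x"))]); // key starts 0x01
    let row_a = key(&[(0, Some("a")), (1, Some("x"))]); // key starts 0x00
    // ...but the byte keys compare the other way round:
    assert!(row_null > row_a); // non-monotonic vs. nulls_first order
    println!("row_null={:02x?} > row_a={:02x?}", row_null, row_a);
}
```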


Comment on lines +223 to +226
if let Some(kc) = ts_id_column {
let col = batch.column(kc.batch_idx);
if !col.is_null(row_idx) {
let ts_id = extract_i64_value(col.as_ref(), row_idx);

P2: Require timeseries_id when building sorted_series

The key appends timeseries_id only when that column exists, so accepted batches without timeseries_id silently produce keys that omit the only guaranteed discriminator for series identity. In that case, different series that share sort tags (e.g., same metric/tags but different metric_type) can collapse onto the same sorted_series value, which can merge groups incorrectly once this column is used for streaming aggregation.
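The collapse is easy to see with a toy encoder (hypothetical, mirroring only the "append timeseries_id if present" behavior; ordinal + bytes + 0x00 terminator for tags, ordinal 6 for timeseries_id):

```rust
// Sketch of the P2: without timeseries_id, two different series that
// share their sort tags encode to the same sorted_series bytes.
fn key(tags: &[(u8, &str)], ts_id: Option<i64>) -> Vec<u8> {
    let mut k = Vec::new();
    for &(ord, v) in tags {
        k.push(ord);
        k.extend_from_slice(v.as_bytes());
        k.push(0x00);
    }
    if let Some(id) = ts_id {
        k.push(6); // assumed timeseries_id ordinal
        k.extend_from_slice(&((id as u64) ^ (1u64 << 63)).to_be_bytes());
    }
    k
}

fn main() {
    let tags = [(0u8, "cpu.usage"), (1u8, "api")];
    // With timeseries_id, distinct series stay distinct:
    assert_ne!(key(&tags, Some(1)), key(&tags, Some(2)));
    // Without it, series 1 and series 2 both map to the same key:
    assert_eq!(key(&tags, None), key(&tags, None));
    println!("ok");
}
```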


@mattmkim mattmkim force-pushed the gtt/sorted-series-column branch 2 times, most recently from 0a0adb5 to 720f498 Compare April 15, 2026 21:27
@mattmkim
Contributor

commit history is messed up, pushed existing history to https://github.com/quickwit-oss/quickwit/tree/matthew.kim/gtt/sorted-series-key just in case, going to cherry pick the commits we want

Base automatically changed from gtt/sorted-series-column to main April 15, 2026 22:27
g-talbot and others added 8 commits April 15, 2026 15:30
Compute a composite, lexicographically sortable binary column
(sorted_series) at Parquet write time using storekey order-preserving
encoding. For each row the key encodes:

  1. Non-null sort schema tag columns as (ordinal: u8, value: str)
  2. timeseries_id (i64) as final discriminator

Identical timeseries always produce identical byte keys regardless of
timestamp or value, enabling DataFusion's streaming AggregateExec and
BoundedWindowAggExec with O(1) memory instead of O(N) hash tables.

Also fixes create_nullable_dict_array which used the original array
index as dictionary key instead of the position in the unique values
array, causing out-of-bounds panics for mixed null/non-null inputs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Without the ordinal, the timeseries_id bytes could collide with a
subsequent tag column's ordinal+string encoding. Every component in
the key now consistently gets an ordinal prefix from its sort schema
position.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add tests that assert:
- timeseries_id gets ordinal 6 prefix (its sort schema position)
- key length is exact: ordinal(1) + str(2) + ordinal(1) + i64(8) = 12
- when timeseries_id is absent, no trailing ordinal appears

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Writes a 6-row batch with 4 distinct series (including null tags)
through the ParquetWriter pipeline, reads back, and verifies:

- 4 distinct keys produced (series identity)
- series with 3 rows produces 3 identical keys
- null host differs from present host (ordinal skipping)
- all-null tags differ from partial-null tags
- ordinal bytes are correct (0x00 for metric_name, 0x01 for service,
  0x06 for timeseries_id) even when intermediate tags are null
- equal keys are contiguous after sort (streaming aggregation ready)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Regenerate storekey entry via dd-rust-license-tool (correct authors)
- Fix 4 rustfmt nightly formatting diffs in sorted_series tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@mattmkim mattmkim force-pushed the gtt/sorted-series-key branch from 652d128 to 069a5a1 Compare April 15, 2026 22:30

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 069a5a1871


Comment on lines +378 to +379
crate::sorted_series::append_sorted_series_column(&self.sort_fields_string, batch)
.map_err(|e| ParquetWriteError::SchemaValidation(e.to_string()))?;

P2: Keep writes tolerant of malformed sort_fields

ParquetWriter::new already treats an unparseable sort_fields string as non-fatal (it logs and uses empty sort order), but prepare_write now always calls append_sorted_series_column, which reparses and propagates parse errors as SchemaValidation. That turns a previously degraded-but-working path into a hard write failure for malformed or legacy sort_fields values, which can stop ingestion instead of just disabling sorting behavior.


Comment on lines +264 to +265
DataType::UInt8 => None,
_ => None,

P2: Encode non-string sort fields in sorted_series keys

This branch drops UInt8 (and other non-string) sort columns by returning None, so encode_row_key silently omits those dimensions from sorted_series. For schemas that include numeric pre-timestamp sort columns (supported by the sort-fields parser), rows can get keys that no longer reflect the configured sort order, which undermines correctness when sorted_series is used as the streaming grouping/order key.
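One way to close this gap, sketched here as a hypothetical extension rather than the PR's code (and moot if the UInt8 arm is simply removed, as suggested earlier in the thread): give numeric sort columns their own order-preserving byte form instead of dropping them. A u8's single byte already sorts correctly, so it can follow the ordinal directly.

```rust
// Hypothetical: encode a u8 sort column instead of skipping it.
// A u8 byte is already order-preserving, so no transformation is needed.
fn encode_u8(key: &mut Vec<u8>, ordinal: u8, value: u8) {
    key.push(ordinal);
    key.push(value);
}

fn main() {
    let (mut lo, mut hi) = (Vec::new(), Vec::new());
    encode_u8(&mut lo, 2, 1);
    encode_u8(&mut hi, 2, 200);
    assert!(lo < hi); // byte order matches numeric order
    println!("ok");
}
```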

