
Commit 1efcbf5

Add benchmark for struct field filter pushdown in Parquet (apache#20829)
## Which issue does this PR close?

- Closes apache#20828

## Rationale for this change

This PR adds a series of benchmarks that compare Parquet row-level filter pushdown for struct field predicates. This establishes a baseline so we can measure the impact of apache#20828.

To run, use:

```sh
cargo bench -p datafusion-datasource-parquet --bench parquet_struct_filter_pushdown
```
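To compare runs before and after a change (for example, once apache#20828 lands), criterion's baseline flags can be used. This is a suggested workflow assuming criterion's standard CLI options, not something this PR adds:

```sh
# Save a baseline from the current branch
cargo bench -p datafusion-datasource-parquet --bench parquet_struct_filter_pushdown -- --save-baseline before

# After applying the change, compare against the saved baseline
cargo bench -p datafusion-datasource-parquet --bench parquet_struct_filter_pushdown -- --baseline before
```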
1 parent ed793f0 commit 1efcbf5

2 files changed

Lines changed: 358 additions & 0 deletions

File tree

datafusion/datasource-parquet/Cargo.toml
datafusion/datasource-parquet/benches/parquet_struct_filter_pushdown.rs

datafusion/datasource-parquet/Cargo.toml
Lines changed: 5 additions & 0 deletions

```diff
@@ -58,6 +58,7 @@ tokio = { workspace = true }
 [dev-dependencies]
 chrono = { workspace = true }
 criterion = { workspace = true }
+datafusion-functions = { workspace = true }
 datafusion-functions-nested = { workspace = true }
 tempfile = { workspace = true }
 
@@ -81,3 +82,7 @@ parquet_encryption = [
 [[bench]]
 name = "parquet_nested_filter_pushdown"
 harness = false
+
+[[bench]]
+name = "parquet_struct_filter_pushdown"
+harness = false
```
datafusion/datasource-parquet/benches/parquet_struct_filter_pushdown.rs
Lines changed: 353 additions & 0 deletions
````rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Benchmarks for struct field filter pushdown in Parquet.
//!
//! Compares scanning with vs without row-level filter pushdown for
//! predicates on struct sub-fields (e.g. `get_field(s, 'id') = 42`).
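//! (In DataFusion SQL, this predicate can also be written with the bracket
//! syntax `s['id'] = 42`.)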
//!
//! The dataset schema (in SQL-like notation):
//!
//! ```sql
//! CREATE TABLE t (
//!     id INT,            -- top-level id, useful for correctness checks
//!     large_string TEXT, -- wide column so SELECT * is expensive
//!     s STRUCT<
//!         id: INT,            -- mirrors top-level id
//!         large_string: TEXT  -- wide sub-field; pushdown with proper projection
//!                             -- should avoid reading this when filtering on s.id
//!     >
//! );
//! ```
//!
//! Benchmark queries:
//!
//! 1. `SELECT * FROM t WHERE get_field(s, 'id') = 42`
//!    - no pushdown vs. row-level filter pushdown
//! 2. `SELECT * FROM t WHERE get_field(s, 'id') = id`
//!    - cross-column predicate; no pushdown vs. row-level filter pushdown
//! 3. `SELECT id FROM t WHERE get_field(s, 'id') = 42`
//!    - narrow projection; pushdown should avoid reading s.large_string

use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock};

use arrow::array::{BooleanArray, Int32Array, RecordBatch, StringBuilder, StructArray};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use criterion::{Criterion, Throughput, criterion_group, criterion_main};
use datafusion_common::ScalarValue;
use datafusion_datasource_parquet::{ParquetFileMetrics, build_row_filter};
use datafusion_expr::{Expr, col};
use datafusion_physical_expr::planner::logical2physical;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::{ArrowWriter, ProjectionMask};
use parquet::file::properties::WriterProperties;
use tempfile::TempDir;

const ROW_GROUP_ROW_COUNT: usize = 10_000;
const TOTAL_ROW_GROUPS: usize = 10;
const TOTAL_ROWS: usize = ROW_GROUP_ROW_COUNT * TOTAL_ROW_GROUPS;
/// Only one row group will contain the target value.
const TARGET_VALUE: i32 = 42;
const ID_COLUMN_NAME: &str = "id";
const LARGE_STRING_COLUMN_NAME: &str = "large_string";
const STRUCT_COLUMN_NAME: &str = "s";
// Large string payload to emphasize decoding overhead when pushdown is disabled.
const LARGE_STRING_LEN: usize = 8 * 1024;

struct BenchmarkDataset {
    _tempdir: TempDir,
    file_path: PathBuf,
}

impl BenchmarkDataset {
    fn path(&self) -> &Path {
        &self.file_path
    }
}

static DATASET: LazyLock<BenchmarkDataset> = LazyLock::new(|| {
    create_dataset().expect("failed to prepare parquet benchmark dataset")
});

fn parquet_struct_filter_pushdown(c: &mut Criterion) {
    let dataset_path = DATASET.path().to_owned();
    let mut group = c.benchmark_group("parquet_struct_filter_pushdown");
    group.throughput(Throughput::Elements(TOTAL_ROWS as u64));

    // Scenario 1: SELECT * FROM t WHERE get_field(s, 'id') = 42
    group.bench_function("select_star/no_pushdown", |b| {
        let file_schema = setup_reader(&dataset_path);
        let predicate = logical2physical(&struct_id_eq_literal(), &file_schema);
        b.iter(|| {
            let matched = scan(&dataset_path, &predicate, false, ProjectionMask::all())
                .expect("scan succeeded");
            assert_eq!(matched, ROW_GROUP_ROW_COUNT);
        });
    });

    group.bench_function("select_star/with_pushdown", |b| {
        let file_schema = setup_reader(&dataset_path);
        let predicate = logical2physical(&struct_id_eq_literal(), &file_schema);
        b.iter(|| {
            let matched = scan(&dataset_path, &predicate, true, ProjectionMask::all())
                .expect("scan succeeded");
            assert_eq!(matched, ROW_GROUP_ROW_COUNT);
        });
    });

    // Scenario 2: SELECT * FROM t WHERE get_field(s, 'id') = id
    group.bench_function("select_star_cross_col/no_pushdown", |b| {
        let file_schema = setup_reader(&dataset_path);
        let predicate = logical2physical(&struct_id_eq_top_id(), &file_schema);
        b.iter(|| {
            let matched = scan(&dataset_path, &predicate, false, ProjectionMask::all())
                .expect("scan succeeded");
            assert_eq!(matched, TOTAL_ROWS);
        });
    });

    group.bench_function("select_star_cross_col/with_pushdown", |b| {
        let file_schema = setup_reader(&dataset_path);
        let predicate = logical2physical(&struct_id_eq_top_id(), &file_schema);
        b.iter(|| {
            let matched = scan(&dataset_path, &predicate, true, ProjectionMask::all())
                .expect("scan succeeded");
            assert_eq!(matched, TOTAL_ROWS);
        });
    });

    // Scenario 3: SELECT id FROM t WHERE get_field(s, 'id') = 42
    group.bench_function("select_id/no_pushdown", |b| {
        let file_schema = setup_reader(&dataset_path);
        let predicate = logical2physical(&struct_id_eq_literal(), &file_schema);
        b.iter(|| {
            // Without pushdown we must read all columns to evaluate the predicate.
            let matched = scan(&dataset_path, &predicate, false, ProjectionMask::all())
                .expect("scan succeeded");
            assert_eq!(matched, ROW_GROUP_ROW_COUNT);
        });
    });

    group.bench_function("select_id/with_pushdown", |b| {
        let file_schema = setup_reader(&dataset_path);
        let predicate = logical2physical(&struct_id_eq_literal(), &file_schema);
        let id_only = id_projection(&dataset_path);
        b.iter(|| {
            // With pushdown the filter runs first, then we only project `id`.
            let matched = scan(&dataset_path, &predicate, true, id_only.clone())
                .expect("scan succeeded");
            assert_eq!(matched, ROW_GROUP_ROW_COUNT);
        });
    });

    group.finish();
}

fn setup_reader(path: &Path) -> SchemaRef {
    let file = std::fs::File::open(path).expect("failed to open file");
    let builder =
        ParquetRecordBatchReaderBuilder::try_new(file).expect("failed to build reader");
    Arc::clone(builder.schema())
}

/// `get_field(s, 'id') = TARGET_VALUE`
fn struct_id_eq_literal() -> Expr {
    let get_field_expr = datafusion_functions::core::get_field().call(vec![
        col(STRUCT_COLUMN_NAME),
        Expr::Literal(ScalarValue::Utf8(Some("id".to_string())), None),
    ]);
    get_field_expr.eq(Expr::Literal(ScalarValue::Int32(Some(TARGET_VALUE)), None))
}

/// `get_field(s, 'id') = id`
fn struct_id_eq_top_id() -> Expr {
    let get_field_expr = datafusion_functions::core::get_field().call(vec![
        col(STRUCT_COLUMN_NAME),
        Expr::Literal(ScalarValue::Utf8(Some("id".to_string())), None),
    ]);
    get_field_expr.eq(col(ID_COLUMN_NAME))
}

/// Build a [`ProjectionMask`] that only reads the top-level `id` leaf column.
fn id_projection(path: &Path) -> ProjectionMask {
    let file = std::fs::File::open(path).expect("failed to open file");
    let builder =
        ParquetRecordBatchReaderBuilder::try_new(file).expect("failed to build reader");
    let parquet_schema = builder.metadata().file_metadata().schema_descr_ptr();
    // Leaf index 0 corresponds to the top-level `id` column.
    ProjectionMask::leaves(&parquet_schema, [0])
}

fn scan(
    path: &Path,
    predicate: &Arc<dyn datafusion_physical_expr::PhysicalExpr>,
    pushdown: bool,
    projection: ProjectionMask,
) -> datafusion_common::Result<usize> {
    let file = std::fs::File::open(path)?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    let metadata = builder.metadata().clone();
    let file_schema = builder.schema();

    let metrics = ExecutionPlanMetricsSet::new();
    let file_metrics = ParquetFileMetrics::new(0, &path.display().to_string(), &metrics);

    let mut filter_applied = false;
    let builder = if pushdown {
        if let Some(row_filter) =
            build_row_filter(predicate, file_schema, &metadata, false, &file_metrics)?
        {
            filter_applied = true;
            builder.with_row_filter(row_filter)
        } else {
            builder
        }
    } else {
        builder
    };

    // Only apply a narrow projection when the filter was actually pushed down.
    // Otherwise we need all columns to evaluate the predicate manually.
    let output_projection = if filter_applied {
        projection
    } else {
        ProjectionMask::all()
    };
    let reader = builder.with_projection(output_projection).build()?;

    let mut matched_rows = 0usize;
    for batch in reader {
        let batch = batch?;
        if filter_applied {
            // When the row filter was applied, rows are already filtered.
            matched_rows += batch.num_rows();
        } else {
            matched_rows += count_matches(predicate, &batch)?;
        }
    }

    Ok(matched_rows)
}

fn count_matches(
    expr: &Arc<dyn datafusion_physical_expr::PhysicalExpr>,
    batch: &RecordBatch,
) -> datafusion_common::Result<usize> {
    let values = expr.evaluate(batch)?.into_array(batch.num_rows())?;
    let bools = values
        .as_any()
        .downcast_ref::<BooleanArray>()
        .expect("boolean filter result");

    Ok(bools.iter().filter(|v| matches!(v, Some(true))).count())
}

fn schema() -> SchemaRef {
    let struct_fields = Fields::from(vec![
        Field::new("id", DataType::Int32, false),
        Field::new(LARGE_STRING_COLUMN_NAME, DataType::Utf8, false),
    ]);
    Arc::new(Schema::new(vec![
        Field::new(ID_COLUMN_NAME, DataType::Int32, false),
        Field::new(LARGE_STRING_COLUMN_NAME, DataType::Utf8, false),
        Field::new(STRUCT_COLUMN_NAME, DataType::Struct(struct_fields), false),
    ]))
}

fn create_dataset() -> datafusion_common::Result<BenchmarkDataset> {
    let tempdir = TempDir::new()?;
    let file_path = tempdir.path().join("struct_filter.parquet");

    let schema = schema();
    let writer_props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(ROW_GROUP_ROW_COUNT))
        .build();

    let mut writer = ArrowWriter::try_new(
        std::fs::File::create(&file_path)?,
        Arc::clone(&schema),
        Some(writer_props),
    )?;

    // Each row group has a distinct `s.id` value. Only one row group
    // matches the target, so pushdown should prune 90% of rows.
    for rg_idx in 0..TOTAL_ROW_GROUPS {
        let id_value = if rg_idx == TOTAL_ROW_GROUPS - 1 {
            TARGET_VALUE
        } else {
            (rg_idx as i32 + 1) * 1000
        };
        let batch = build_struct_batch(&schema, id_value, ROW_GROUP_ROW_COUNT)?;
        writer.write(&batch)?;
    }

    writer.close()?;

    let reader =
        ParquetRecordBatchReaderBuilder::try_new(std::fs::File::open(&file_path)?)?;
    assert_eq!(reader.metadata().row_groups().len(), TOTAL_ROW_GROUPS);

    Ok(BenchmarkDataset {
        _tempdir: tempdir,
        file_path,
    })
}

fn build_struct_batch(
    schema: &SchemaRef,
    id_value: i32,
    len: usize,
) -> datafusion_common::Result<RecordBatch> {
    let large_string: String = "x".repeat(LARGE_STRING_LEN);

    // Top-level columns
    let top_id_array = Arc::new(Int32Array::from(vec![id_value; len]));
    let mut top_string_builder = StringBuilder::new();
    for _ in 0..len {
        top_string_builder.append_value(&large_string);
    }
    let top_string_array = Arc::new(top_string_builder.finish());

    // Struct sub-fields: s.id mirrors top-level id, s.large_string is the same payload
    let struct_id_array = Arc::new(Int32Array::from(vec![id_value; len]));
    let mut struct_string_builder = StringBuilder::new();
    for _ in 0..len {
        struct_string_builder.append_value(&large_string);
    }
    let struct_string_array = Arc::new(struct_string_builder.finish());

    let struct_array = StructArray::from(vec![
        (
            Arc::new(Field::new("id", DataType::Int32, false)),
            struct_id_array as Arc<dyn arrow::array::Array>,
        ),
        (
            Arc::new(Field::new(LARGE_STRING_COLUMN_NAME, DataType::Utf8, false)),
            struct_string_array as Arc<dyn arrow::array::Array>,
        ),
    ]);

    Ok(RecordBatch::try_new(
        Arc::clone(schema),
        vec![top_id_array, top_string_array, Arc::new(struct_array)],
    )?)
}

criterion_group!(benches, parquet_struct_filter_pushdown);
criterion_main!(benches);
````

0 commit comments
