Skip to content

Commit 3256632

Browse files
authored
Merge pull request JanKaul#321 from splitgraph/fix-eq-del-predicates
fix: ensure we only push down filters to delete files that are compatible with their schema
2 parents ae1b89e + 3159646 commit 3256632

2 files changed

Lines changed: 81 additions & 76 deletions

File tree

datafusion_iceberg/src/table.rs

Lines changed: 44 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -424,15 +424,15 @@ async fn table_scan(
424424
.unwrap();
425425

426426
// If there is a filter expression the manifests to read are pruned based on the pruning statistics available in the manifest_list file.
427-
let physical_predicate = if let Some(predicate) = conjunction(filters.iter().cloned()) {
428-
Some(create_physical_expr(
429-
&predicate,
430-
&arrow_schema.as_ref().clone().try_into()?,
431-
session.execution_props(),
432-
)?)
433-
} else {
434-
None
435-
};
427+
let physical_predicate = conjunction(filters.iter().cloned())
428+
.map(|predicate| {
429+
create_physical_expr(
430+
&predicate,
431+
&arrow_schema.as_ref().clone().try_into()?,
432+
session.execution_props(),
433+
)
434+
})
435+
.transpose()?;
436436

437437
let mut table_partition_cols = datafusion_partition_columns(partition_fields)?;
438438

@@ -628,32 +628,24 @@ async fn table_scan(
628628
}
629629

630630
let file_source = {
631-
let physical_predicate = physical_predicate.clone();
632631
let table_schema = TableSchema::new(
633632
file_schema.clone(),
634633
table_partition_cols.iter().cloned().map(Arc::new).collect(),
635634
);
636-
async move {
637-
Arc::new(
638-
if let Some(physical_predicate) = physical_predicate.clone() {
639-
ParquetSource::new(table_schema)
640-
.with_predicate(physical_predicate)
641-
.with_pushdown_filters(true)
642-
} else {
643-
ParquetSource::new(table_schema)
644-
},
645-
)
635+
let mut source = ParquetSource::new(table_schema);
636+
if let Some(ref predicate) = physical_predicate {
637+
source = source
638+
.with_predicate(predicate.clone())
639+
.with_pushdown_filters(true);
646640
}
647-
.instrument(tracing::debug_span!("datafusion_iceberg::file_source"))
648-
.await
641+
Arc::new(source)
649642
};
650643

651644
// Create plan for every partition with delete files
652645
let mut plans = stream::iter(equality_delete_file_groups.into_iter())
653646
.then(|(partition_value, mut delete_files)| {
654647
let object_store_url = object_store_url.clone();
655648
let statistics = statistics.clone();
656-
let physical_predicate = physical_predicate.clone();
657649
let schema = &schema;
658650
let file_source = file_source.clone();
659651
let projection_expr = projection_expr.clone();
@@ -707,7 +699,6 @@ async fn table_scan(
707699
.try_fold(None, |acc, delete_manifest| {
708700
let object_store_url = object_store_url.clone();
709701
let statistics = statistics.clone();
710-
let physical_predicate = physical_predicate.clone();
711702
let schema = &schema;
712703
let file_source = file_source.clone();
713704
let mut data_files = Vec::new();
@@ -733,6 +724,7 @@ async fn table_scan(
733724
.unwrap();
734725
data_files.push(data_file);
735726
}
727+
736728
async move {
737729
let delete_schema = schema.project(
738730
delete_manifest
@@ -745,6 +737,27 @@ async fn table_scan(
745737
let delete_file_schema: SchemaRef =
746738
Arc::new((delete_schema.fields()).try_into().unwrap());
747739

740+
// Scope down the filter expressions used in the query to only those
741+
// which are completely contained in the delete file schema.
742+
let delete_physical_predicate = conjunction(
743+
filters
744+
.iter()
745+
.filter(|expr| {
746+
expr.column_refs().iter().all(|col| {
747+
delete_file_schema.field_with_name(col.name()).is_ok()
748+
})
749+
})
750+
.cloned(),
751+
)
752+
.map(|predicate| {
753+
create_physical_expr(
754+
&predicate,
755+
&delete_file_schema.as_ref().clone().try_into()?,
756+
session.execution_props(),
757+
)
758+
})
759+
.transpose()?;
760+
748761
let last_updated_ms = table.metadata().last_updated_ms;
749762
let manifest_path = if enable_manifest_file_path_column {
750763
Some(delete_manifest.0.clone())
@@ -759,15 +772,13 @@ async fn table_scan(
759772
manifest_path,
760773
)?;
761774

762-
let delete_file_source = Arc::new(
763-
if let Some(physical_predicate) = physical_predicate.clone() {
764-
ParquetSource::new(delete_file_schema)
765-
.with_predicate(physical_predicate)
766-
.with_pushdown_filters(true)
767-
} else {
768-
ParquetSource::new(delete_file_schema)
769-
},
770-
);
775+
let mut delete_source = ParquetSource::new(delete_file_schema);
776+
if let Some(pred) = delete_physical_predicate {
777+
delete_source = delete_source
778+
.with_predicate(pred)
779+
.with_pushdown_filters(true);
780+
}
781+
let delete_file_source = Arc::new(delete_source);
771782

772783
let delete_file_scan_config = FileScanConfigBuilder::new(
773784
object_store_url.clone(),

datafusion_iceberg/tests/equality_delete.rs

Lines changed: 37 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use datafusion::common::test_util::batches_to_string;
12
use datafusion::{
23
arrow::{error::ArrowError, record_batch::RecordBatch},
34
assert_batches_eq,
@@ -25,6 +26,15 @@ use object_store::local::LocalFileSystem;
2526
use std::sync::Arc;
2627
use tempfile::TempDir;
2728

29+
async fn run_query(query: &str, ctx: &SessionContext) -> Vec<RecordBatch> {
30+
ctx.sql(query)
31+
.await
32+
.expect("Failed to create plan for query")
33+
.collect()
34+
.await
35+
.expect("Failed to execute query")
36+
}
37+
2838
/// Convert DuckDB's Arrow RecordBatch to DataFusion's Arrow RecordBatch
2939
/// using Arrow IPC format as an interchange format.
3040
/// This allows compatibility between different Arrow versions.
@@ -120,28 +130,19 @@ pub async fn test_equality_delete() {
120130

121131
ctx.register_catalog("warehouse", datafusion_catalog);
122132

123-
ctx.sql(
124-
"INSERT INTO warehouse.test.orders (id, customer_id, product_id, date, amount) VALUES
133+
run_query(
134+
"INSERT INTO warehouse.test.orders (id, customer_id, product_id, date, amount) VALUES
125135
(1, 1, 1, '2020-01-01', 1),
126136
(2, 2, 1, '2020-01-01', 1),
127137
(3, 3, 1, '2020-01-01', 3),
128138
(4, 1, 2, '2020-02-02', 1),
129139
(5, 1, 1, '2020-02-02', 2),
130140
(6, 3, 3, '2020-02-02', 3);",
141+
&ctx,
131142
)
132-
.await
133-
.expect("Failed to create query plan for insert")
134-
.collect()
135-
.await
136-
.expect("Failed to insert values into table");
143+
.await;
137144

138-
let batches = ctx
139-
.sql("select * from warehouse.test.orders order by id")
140-
.await
141-
.expect("Failed to create plan for select")
142-
.collect()
143-
.await
144-
.expect("Failed to execute select query");
145+
let batches = run_query("select * from warehouse.test.orders order by id", &ctx).await;
145146

146147
let expected = [
147148
"+----+-------------+------------+------------+--------+",
@@ -157,15 +158,7 @@ pub async fn test_equality_delete() {
157158
];
158159
assert_batches_eq!(expected, &batches);
159160

160-
let batches = ctx
161-
.sql(
162-
"SELECT id, customer_id, product_id, date FROM warehouse.test.orders WHERE customer_id = 1 order by id",
163-
)
164-
.await
165-
.expect("Failed to create query plan for insert")
166-
.collect()
167-
.await
168-
.expect("Failed to insert values into table");
161+
let batches = run_query("SELECT id, customer_id, product_id, date FROM warehouse.test.orders WHERE customer_id = 1 order by id", &ctx).await;
169162

170163
let expected = [
171164
"+----+-------------+------------+------------+",
@@ -204,13 +197,7 @@ pub async fn test_equality_delete() {
204197
.await
205198
.unwrap();
206199

207-
let batches = ctx
208-
.sql("select * from warehouse.test.orders order by id")
209-
.await
210-
.expect("Failed to create plan for select")
211-
.collect()
212-
.await
213-
.expect("Failed to execute select query");
200+
let batches = run_query("select * from warehouse.test.orders order by id", &ctx).await;
214201

215202
let expected = [
216203
"+----+-------------+------------+------------+--------+",
@@ -237,25 +224,16 @@ pub async fn test_equality_delete() {
237224
assert_batches_eq!(expected, &duckdb_batches);
238225

239226
// Test that projecting a column that is not included in equality deletes works
240-
ctx.sql(
227+
run_query(
241228
"INSERT INTO warehouse.test.orders (id, customer_id, product_id, date, amount) VALUES
242229
(7, 3, 2, '2020-01-01', 2),
243230
(8, 2, 1, '2020-02-02', 3),
244231
(9, 1, 3, '2020-01-01', 1);",
232+
&ctx,
245233
)
246-
.await
247-
.expect("Failed to create query plan for insert")
248-
.collect()
249-
.await
250-
.expect("Failed to insert values into table");
234+
.await;
251235

252-
let batches = ctx
253-
.sql("select sum(amount) from warehouse.test.orders")
254-
.await
255-
.expect("Failed to create plan for select")
256-
.collect()
257-
.await
258-
.expect("Failed to execute select query");
236+
let batches = run_query("select sum(amount) from warehouse.test.orders", &ctx).await;
259237

260238
let expected = [
261239
"+-----------------------------------+",
@@ -265,4 +243,20 @@ pub async fn test_equality_delete() {
265243
"+-----------------------------------+",
266244
];
267245
assert_batches_eq!(expected, &batches);
246+
247+
// Test that using a filter on a column that is not included in equality deletes works
248+
let query = "select count(*) from warehouse.test.orders where product_id = 1 and (amount = 1 or customer_id = 3)";
249+
let batches = run_query(query, &ctx).await;
250+
let expected = [
251+
"+----------+",
252+
"| count(*) |",
253+
"+----------+",
254+
"| 2 |",
255+
"+----------+",
256+
];
257+
assert_batches_eq!(expected, &batches);
258+
259+
// Ensure we only pushed down predicates whose referenced columns all exist in the delete file schema (i.e. the `amount` filter was not pushed down).
260+
let batches = run_query(&format!("explain {query}"), &ctx).await;
261+
assert!(batches_to_string(&batches).contains("projection=[id, customer_id, product_id, date], file_type=parquet, predicate=product_id@2 = 1,"));
268262
}

0 commit comments

Comments
 (0)