Skip to content

Commit e0fd16e

Browse files
bvolpato and jonahgao authored
feat(substrait): support Placeholder <-> DynamicParameter in Substrait producer/consumer (#20977)
## Which issue does this PR close?

- Closes #20971.

## Rationale for this change

The Substrait producer previously returned `not_impl_err!` for `Expr::Placeholder`, meaning any query plan containing parameterized expressions (e.g. `$1`, `$2`) could not be serialized to Substrait. This gap prevented roundtripping prepared statement plans through the Substrait layer.

## What changes are included in this PR?

**Producer:**
- New `placeholder.rs` module with `from_placeholder` that converts `Expr::Placeholder` to `DynamicParameter`, mapping one-based `$N` DataFusion ids to zero-based Substrait `parameter_reference` values with optional type information.
- `handle_placeholder` added to the `SubstraitProducer` trait.

**Consumer:**
- `consume_dynamic_parameter` converts Substrait `DynamicParameter` back to `Expr::Placeholder`, reversing the index mapping and type conversion.

## Are these changes tested?

Yes. Five new integration tests and one unit test:
- `roundtrip_placeholder_sql_filter` — SQL-based, `WHERE a > $1`
- `roundtrip_placeholder_sql_projection` — SQL-based, `$1` in SELECT + `$2` in WHERE
- `roundtrip_placeholder_typed_int64` — typed Int64 placeholder with proto-level `DynamicParameter` verification
- `roundtrip_placeholder_multiple_typed` — two typed placeholders (Int64 + Decimal128)
- `roundtrip_placeholder_typed_utf8` — Utf8 typed placeholder
- `test_parse_placeholder_index` — unit test for index parsing edge cases

All 190 integration tests, 28 unit tests, and 3 doc-tests pass. Clippy and fmt are clean.

## Are there any user-facing changes?

Query plans containing `Placeholder` expressions can now be serialized to and deserialized from Substrait format. Previously this would return an error.

Co-authored-by: Jonah Gao <jonahgao@msn.com>
1 parent f1c643a commit e0fd16e

5 files changed

Lines changed: 232 additions & 8 deletions

File tree

datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -366,10 +366,22 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
366366

367367
async fn consume_dynamic_parameter(
368368
&self,
369-
_expr: &DynamicParameter,
369+
expr: &DynamicParameter,
370370
_input_schema: &DFSchema,
371371
) -> datafusion::common::Result<Expr> {
372-
not_impl_err!("Dynamic Parameter expression not supported")
372+
let id = format!("${}", expr.parameter_reference + 1);
373+
let field = expr
374+
.r#type
375+
.as_ref()
376+
.map(|t| {
377+
super::from_substrait_type_without_names(self, t).map(|dt| {
378+
Arc::new(datafusion::arrow::datatypes::Field::new(&id, dt, true))
379+
})
380+
})
381+
.transpose()?;
382+
Ok(Expr::Placeholder(
383+
datafusion::logical_expr::expr::Placeholder::new_with_field(id, field),
384+
))
373385
}
374386

375387
// Outer Schema Stack

datafusion/substrait/src/logical_plan/producer/expr/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ mod cast;
2020
mod field_reference;
2121
mod if_then;
2222
mod literal;
23+
mod placeholder;
2324
mod scalar_function;
2425
mod singular_or_list;
2526
mod subquery;
@@ -30,6 +31,7 @@ pub use cast::*;
3031
pub use field_reference::*;
3132
pub use if_then::*;
3233
pub use literal::*;
34+
pub use placeholder::*;
3335
pub use scalar_function::*;
3436
pub use singular_or_list::*;
3537
pub use subquery::*;
@@ -142,7 +144,7 @@ pub fn to_substrait_rex(
142144
#[expect(deprecated)]
143145
Expr::Wildcard { .. } => not_impl_err!("Cannot convert {expr:?} to Substrait"),
144146
Expr::GroupingSet(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"),
145-
Expr::Placeholder(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"),
147+
Expr::Placeholder(expr) => producer.handle_placeholder(expr, schema),
146148
Expr::OuterReferenceColumn(_, _) => {
147149
// OuterReferenceColumn requires tracking outer query schema context for correlated
148150
// subqueries. This is a complex feature that is not yet implemented.

datafusion/substrait/src/logical_plan/producer/expr/placeholder.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::logical_plan::producer::{SubstraitProducer, to_substrait_type};
19+
use datafusion::common::substrait_err;
20+
use datafusion::logical_expr::expr::Placeholder;
21+
use substrait::proto::expression::RexType;
22+
use substrait::proto::{DynamicParameter, Expression};
23+
24+
pub fn from_placeholder(
25+
producer: &mut impl SubstraitProducer,
26+
placeholder: &Placeholder,
27+
) -> datafusion::common::Result<Expression> {
28+
let parameter_reference = parse_placeholder_index(&placeholder.id)?;
29+
30+
let r#type = placeholder
31+
.field
32+
.as_ref()
33+
.map(|field| to_substrait_type(producer, field.data_type(), field.is_nullable()))
34+
.transpose()?;
35+
36+
Ok(Expression {
37+
rex_type: Some(RexType::DynamicParameter(DynamicParameter {
38+
r#type,
39+
parameter_reference,
40+
})),
41+
})
42+
}
43+
44+
/// Converts a placeholder id like "$1" into a zero-based parameter index.
45+
/// Substrait uses zero-based `parameter_reference` while DataFusion uses
46+
/// one-based `$N` placeholder ids.
47+
fn parse_placeholder_index(id: &str) -> datafusion::common::Result<u32> {
48+
let num_str = id.strip_prefix('$').unwrap_or(id);
49+
match num_str.parse::<u32>() {
50+
Ok(n) if n > 0 => Ok(n - 1),
51+
Ok(_) => substrait_err!("Placeholder index must be >= 1, got: {id}"),
52+
Err(_) => substrait_err!("Cannot parse placeholder id as numeric index: {id}"),
53+
}
54+
}
55+
56+
#[cfg(test)]
mod tests {
    use super::*;

    /// Valid one-based ids map to zero-based indexes; `$0` and non-numeric ids
    /// are rejected.
    #[test]
    fn test_parse_placeholder_index() {
        let accepted = [("$1", 0), ("$2", 1), ("$100", 99)];
        for (id, expected) in accepted {
            assert_eq!(parse_placeholder_index(id).unwrap(), expected);
        }

        let rejected = ["$0", "$name"];
        for id in rejected {
            assert!(parse_placeholder_index(id).is_err());
        }
    }
}

datafusion/substrait/src/logical_plan/producer/substrait_producer.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,17 @@ use crate::logical_plan::producer::{
2020
from_aggregate, from_aggregate_function, from_alias, from_between, from_binary_expr,
2121
from_case, from_cast, from_column, from_distinct, from_empty_relation, from_exists,
2222
from_filter, from_in_list, from_in_subquery, from_join, from_like, from_limit,
23-
from_literal, from_projection, from_repartition, from_scalar_function,
24-
from_scalar_subquery, from_set_comparison, from_sort, from_subquery_alias,
25-
from_table_scan, from_try_cast, from_unary_expr, from_union, from_values,
26-
from_window, from_window_function, to_substrait_rel, to_substrait_rex,
23+
from_literal, from_placeholder, from_projection, from_repartition,
24+
from_scalar_function, from_scalar_subquery, from_set_comparison, from_sort,
25+
from_subquery_alias, from_table_scan, from_try_cast, from_unary_expr, from_union,
26+
from_values, from_window, from_window_function, to_substrait_rel, to_substrait_rex,
2727
};
2828
use datafusion::common::{Column, DFSchemaRef, ScalarValue, substrait_err};
2929
use datafusion::execution::SessionState;
3030
use datafusion::execution::registry::SerializerRegistry;
3131
use datafusion::logical_expr::Subquery;
3232
use datafusion::logical_expr::expr::{
33-
Alias, Exists, InList, InSubquery, SetComparison, WindowFunction,
33+
Alias, Exists, InList, InSubquery, Placeholder, SetComparison, WindowFunction,
3434
};
3535
use datafusion::logical_expr::{
3636
Aggregate, Between, BinaryExpr, Case, Cast, Distinct, EmptyRelation, Expr, Extension,
@@ -388,6 +388,14 @@ pub trait SubstraitProducer: Send + Sync + Sized {
388388
) -> datafusion::common::Result<Expression> {
389389
from_exists(self, exists, schema)
390390
}
391+
392+
/// Converts a DataFusion `Placeholder` expression into a Substrait expression.
///
/// This default implementation delegates to `from_placeholder`; the schema
/// argument is accepted but not passed on.
fn handle_placeholder(
393+
&mut self,
394+
placeholder: &Placeholder,
395+
_schema: &DFSchemaRef,
396+
) -> datafusion::common::Result<Expression> {
397+
from_placeholder(self, placeholder)
398+
}
391399
}
392400

393401
pub struct DefaultSubstraitProducer<'a> {

datafusion/substrait/tests/cases/roundtrip_logical_plan.rs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1754,6 +1754,140 @@ async fn roundtrip_read_filter() -> Result<()> {
17541754
roundtrip_verify_read_filter_count("SELECT a FROM data where a < 5", 1).await
17551755
}
17561756

1757+
// Plans a SQL query with an untyped `$1` placeholder in the WHERE clause and
// snapshot-checks the plan returned by `generate_plan_from_sql`.
// NOTE(review): presumably `generate_plan_from_sql` performs the Substrait
// roundtrip itself — confirm against the helper.
#[tokio::test]
1758+
async fn roundtrip_placeholder_sql_filter() -> Result<()> {
1759+
let plan = generate_plan_from_sql("SELECT a, b FROM data WHERE a > $1", false, false)
1760+
.await?;
1761+
1762+
assert_snapshot!(
1763+
plan,
1764+
@r"
1765+
Projection: data.a, data.b
1766+
Filter: data.a > $1
1767+
TableScan: data
1768+
"
1769+
);
1770+
Ok(())
1771+
}
1772+
1773+
// Plans a SQL query with `$1` in the SELECT list and `$2` in the WHERE clause
// and snapshot-checks the plan returned by `generate_plan_from_sql`.
// NOTE(review): presumably `generate_plan_from_sql` performs the Substrait
// roundtrip itself — confirm against the helper.
#[tokio::test]
1774+
async fn roundtrip_placeholder_sql_projection() -> Result<()> {
1775+
let plan =
1776+
generate_plan_from_sql("SELECT a, $1 FROM data WHERE a > $2", false, false)
1777+
.await?;
1778+
1779+
assert_snapshot!(
1780+
plan,
1781+
@r"
1782+
Projection: data.a, $1
1783+
Filter: data.a > $2
1784+
TableScan: data
1785+
"
1786+
);
1787+
Ok(())
1788+
}
1789+
1790+
// Builds `a > $1` with an Int64-typed placeholder, converts the plan to
// Substrait, checks that the Debug rendering of the first relation mentions
// `DynamicParameter`, then converts back and compares the snapshot and schema.
// Note that `plan_json` holds `{:?}` Debug output, not JSON.
#[tokio::test]
1791+
async fn roundtrip_placeholder_typed_int64() -> Result<()> {
1792+
let ctx = create_context().await?;
1793+
1794+
let placeholder =
1795+
Expr::Placeholder(datafusion::logical_expr::expr::Placeholder::new_with_field(
1796+
"$1".into(),
1797+
Some(Arc::new(Field::new("$1", DataType::Int64, true))),
1798+
));
1799+
let scan_plan = ctx.table("data").await?.into_optimized_plan()?;
1800+
let plan = LogicalPlanBuilder::from(scan_plan)
1801+
.filter(col("a").gt(placeholder))?
1802+
.build()?;
1803+
1804+
let proto = to_substrait_plan(&plan, &ctx.state())?;
1805+
1806+
// Verify the producer emits a DynamicParameter in the Substrait proto
1807+
let plan_rel = proto.relations.first().unwrap();
1808+
let plan_json = format!("{plan_rel:?}");
1809+
assert!(
1810+
plan_json.contains("DynamicParameter"),
1811+
"Substrait proto should contain DynamicParameter, got: {plan_json}"
1812+
);
1813+
1814+
let plan2 = from_substrait_plan(&ctx.state(), &proto).await?;
1815+
1816+
assert_snapshot!(
1817+
plan2,
1818+
@r"
1819+
Filter: data.a > $1
1820+
TableScan: data
1821+
"
1822+
);
1823+
1824+
assert_eq!(plan.schema(), plan2.schema());
1825+
Ok(())
1826+
}
1827+
1828+
// Builds `a > $1 AND b < $2` with an Int64-typed `$1` and a
// Decimal128(5, 2)-typed `$2`, roundtrips the plan through Substrait, and
// compares the resulting plan snapshot and schema with the original.
#[tokio::test]
1829+
async fn roundtrip_placeholder_multiple_typed() -> Result<()> {
1830+
let ctx = create_context().await?;
1831+
1832+
let p1 =
1833+
Expr::Placeholder(datafusion::logical_expr::expr::Placeholder::new_with_field(
1834+
"$1".into(),
1835+
Some(Arc::new(Field::new("$1", DataType::Int64, true))),
1836+
));
1837+
let p2 =
1838+
Expr::Placeholder(datafusion::logical_expr::expr::Placeholder::new_with_field(
1839+
"$2".into(),
1840+
Some(Arc::new(Field::new("$2", DataType::Decimal128(5, 2), true))),
1841+
));
1842+
let scan_plan = ctx.table("data").await?.into_optimized_plan()?;
1843+
let plan = LogicalPlanBuilder::from(scan_plan)
1844+
.filter(col("a").gt(p1).and(col("b").lt(p2)))?
1845+
.build()?;
1846+
1847+
let proto = to_substrait_plan(&plan, &ctx.state())?;
1848+
let plan2 = from_substrait_plan(&ctx.state(), &proto).await?;
1849+
1850+
assert_snapshot!(
1851+
plan2,
1852+
@r"
1853+
Filter: data.a > $1 AND data.b < $2
1854+
TableScan: data
1855+
"
1856+
);
1857+
1858+
assert_eq!(plan.schema(), plan2.schema());
1859+
Ok(())
1860+
}
1861+
1862+
// Builds `f = $1` with a Utf8-typed placeholder, roundtrips the plan through
// Substrait, and compares the resulting plan snapshot and schema with the
// original.
#[tokio::test]
1863+
async fn roundtrip_placeholder_typed_utf8() -> Result<()> {
1864+
let ctx = create_context().await?;
1865+
1866+
let placeholder =
1867+
Expr::Placeholder(datafusion::logical_expr::expr::Placeholder::new_with_field(
1868+
"$1".into(),
1869+
Some(Arc::new(Field::new("$1", DataType::Utf8, true))),
1870+
));
1871+
let scan_plan = ctx.table("data").await?.into_optimized_plan()?;
1872+
let plan = LogicalPlanBuilder::from(scan_plan)
1873+
.filter(col("f").eq(placeholder))?
1874+
.build()?;
1875+
1876+
let proto = to_substrait_plan(&plan, &ctx.state())?;
1877+
let plan2 = from_substrait_plan(&ctx.state(), &proto).await?;
1878+
1879+
assert_snapshot!(
1880+
plan2,
1881+
@r"
1882+
Filter: data.f = $1
1883+
TableScan: data
1884+
"
1885+
);
1886+
1887+
assert_eq!(plan.schema(), plan2.schema());
1888+
Ok(())
1889+
}
1890+
17571891
fn check_post_join_filters(rel: &Rel) -> Result<()> {
17581892
// search for target_rel and field value in proto
17591893
match &rel.rel_type {

0 commit comments

Comments
 (0)