Skip to content

Commit a4be384

Browse files
author
Jan Kaul
committed
add data-file-path to projection
1 parent 7a5903b commit a4be384

1 file changed

Lines changed: 40 additions & 24 deletions

File tree

datafusion_iceberg/src/table.rs

Lines changed: 40 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -378,8 +378,38 @@ async fn table_scan(
378378
.collect::<Result<Vec<_>, DataFusionError>>()
379379
.map_err(DataFusionIcebergError::from)?;
380380

381+
let file_schema: SchemaRef = Arc::new((schema.fields()).try_into().unwrap());
382+
383+
let mut projection = projection
384+
.cloned()
385+
.or_else(|| Some(schema.iter().enumerate().map(|(i, _)| i).collect()));
386+
387+
let mut projection_expr: Option<Vec<_>> = projection.as_ref().map(|projection| {
388+
projection
389+
.iter()
390+
.enumerate()
391+
.map(|(i, id)| {
392+
let name = file_schema.fields[*id].name();
393+
(
394+
Arc::new(Column::new(name, i)) as Arc<dyn PhysicalExpr>,
395+
name.to_owned(),
396+
)
397+
})
398+
.collect()
399+
});
400+
381401
if enable_data_file_path_column {
382402
table_partition_cols.push(Field::new(DATA_FILE_PATH_COLUMN, DataType::Utf8, false));
403+
let index = file_schema.fields().len() + table_partition_cols.len() - 1;
404+
if let Some(projection) = &mut projection {
405+
projection.push(index);
406+
}
407+
if let Some(projection_expr) = &mut projection_expr {
408+
projection_expr.push((
409+
Arc::new(Column::new(DATA_FILE_PATH_COLUMN, index)) as Arc<dyn PhysicalExpr>,
410+
DATA_FILE_PATH_COLUMN.to_owned(),
411+
));
412+
}
383413
}
384414

385415
// All files have to be grouped according to their partition values. This is done by using a HashMap with the partition values as the key.
@@ -513,26 +543,6 @@ async fn table_scan(
513543
});
514544
};
515545

516-
let file_schema: SchemaRef = Arc::new((schema.fields()).try_into().unwrap());
517-
518-
let projection = projection
519-
.cloned()
520-
.or_else(|| Some(schema.iter().enumerate().map(|(i, _)| i).collect()));
521-
522-
let projection_expr: Option<Vec<_>> = projection.as_ref().map(|projection| {
523-
projection
524-
.iter()
525-
.enumerate()
526-
.map(|(i, id)| {
527-
let name = file_schema.fields[*id].name();
528-
(
529-
Arc::new(Column::new(name, i)) as Arc<dyn PhysicalExpr>,
530-
name.to_owned(),
531-
)
532-
})
533-
.collect()
534-
});
535-
536546
let file_source = Arc::new(
537547
if let Some(physical_predicate) = physical_predicate.clone() {
538548
ParquetSource::default()
@@ -1833,20 +1843,26 @@ mod tests {
18331843

18341844
for batch in batches {
18351845
if batch.num_rows() != 0 {
1836-
assert!(batch.schema().column_with_name("__data_file_path").is_some());
1837-
1846+
assert!(batch
1847+
.schema()
1848+
.column_with_name("__data_file_path")
1849+
.is_some());
1850+
18381851
let data_file_path_column = batch
18391852
.column_by_name("__data_file_path")
18401853
.expect("Data file path column should exist");
1841-
1854+
18421855
for i in 0..batch.num_rows() {
18431856
let value = data_file_path_column
18441857
.as_any()
18451858
.downcast_ref::<datafusion::arrow::array::StringArray>()
18461859
.unwrap()
18471860
.value(i);
18481861
assert!(!value.is_empty(), "Data file path should not be empty");
1849-
assert!(value.contains(".parquet"), "Data file path should contain .parquet");
1862+
assert!(
1863+
value.contains(".parquet"),
1864+
"Data file path should contain .parquet"
1865+
);
18501866
}
18511867
}
18521868
}

0 commit comments

Comments
 (0)