Skip to content

Commit fc1cb7f

Browse files
authored
Merge pull request JanKaul#198 from JanKaul/data-file-column-in-schema
add data-file-column and manifest-column to datafusion schema
2 parents 09d5e96 + 9f623b4 commit fc1cb7f

1 file changed

Lines changed: 50 additions & 27 deletions

File tree

datafusion_iceberg/src/table.rs

Lines changed: 50 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::{
1919
use tokio::sync::{RwLock, RwLockWriteGuard};
2020

2121
use datafusion::{
22-
arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef},
22+
arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaBuilder, SchemaRef},
2323
catalog::Session,
2424
common::{not_impl_err, plan_err, DataFusionError, SchemaExt},
2525
datasource::{
@@ -137,7 +137,23 @@ impl DataFusionTable {
137137
let schema = end
138138
.and_then(|snapshot_id| table.metadata().schema(snapshot_id).ok().cloned())
139139
.unwrap_or_else(|| table.current_schema(None).unwrap().clone());
140-
Arc::new((schema.fields()).try_into().unwrap())
140+
let mut builder =
141+
SchemaBuilder::from(TryInto::<ArrowSchema>::try_into(schema.fields()).unwrap());
142+
if config
143+
.as_ref()
144+
.map(|x| x.enable_data_file_path_column)
145+
.unwrap_or_default()
146+
{
147+
builder.push(Field::new(DATA_FILE_PATH_COLUMN, DataType::Utf8, true));
148+
}
149+
if config
150+
.as_ref()
151+
.map(|x| x.enable_manifest_file_path_column)
152+
.unwrap_or_default()
153+
{
154+
builder.push(Field::new(MANIFEST_FILE_PATH_COLUMN, DataType::Utf8, true));
155+
}
156+
Arc::new(builder.finish())
141157
}
142158
Tabular::View(view) => {
143159
let schema = end
@@ -391,16 +407,23 @@ async fn table_scan(
391407

392408
let file_schema: SchemaRef = Arc::new((schema.fields()).try_into().unwrap());
393409

394-
let mut projection = projection
395-
.cloned()
396-
.or_else(|| Some(schema.iter().enumerate().map(|(i, _)| i).collect()));
410+
let projection = projection.cloned().or_else(|| {
411+
Some(
412+
arrow_schema
413+
.fields()
414+
.iter()
415+
.enumerate()
416+
.map(|(i, _)| i)
417+
.collect(),
418+
)
419+
});
397420

398-
let mut projection_expr: Option<Vec<_>> = projection.as_ref().map(|projection| {
421+
let projection_expr: Option<Vec<_>> = projection.as_ref().map(|projection| {
399422
projection
400423
.iter()
401424
.enumerate()
402425
.map(|(i, id)| {
403-
let name = file_schema.fields[*id].name();
426+
let name = arrow_schema.fields[*id].name();
404427
(
405428
Arc::new(Column::new(name, i)) as Arc<dyn PhysicalExpr>,
406429
name.to_owned(),
@@ -411,30 +434,30 @@ async fn table_scan(
411434

412435
if enable_data_file_path_column {
413436
table_partition_cols.push(Field::new(DATA_FILE_PATH_COLUMN, DataType::Utf8, false));
414-
let index = file_schema.fields().len() + table_partition_cols.len() - 1;
415-
if let Some(projection) = &mut projection {
416-
projection.push(index);
417-
}
418-
if let Some(projection_expr) = &mut projection_expr {
419-
projection_expr.push((
420-
Arc::new(Column::new(DATA_FILE_PATH_COLUMN, index)) as Arc<dyn PhysicalExpr>,
421-
DATA_FILE_PATH_COLUMN.to_owned(),
422-
));
423-
}
437+
// let index = file_schema.fields().len() + table_partition_cols.len() - 1;
438+
// if let Some(projection) = &mut projection {
439+
// projection.push(index);
440+
// }
441+
// if let Some(projection_expr) = &mut projection_expr {
442+
// projection_expr.push((
443+
// Arc::new(Column::new(DATA_FILE_PATH_COLUMN, index)) as Arc<dyn PhysicalExpr>,
444+
// DATA_FILE_PATH_COLUMN.to_owned(),
445+
// ));
446+
// }
424447
}
425448

426449
if enable_manifest_file_path_column {
427450
table_partition_cols.push(Field::new(MANIFEST_FILE_PATH_COLUMN, DataType::Utf8, false));
428-
let index = file_schema.fields().len() + table_partition_cols.len() - 1;
429-
if let Some(projection) = &mut projection {
430-
projection.push(index);
431-
}
432-
if let Some(projection_expr) = &mut projection_expr {
433-
projection_expr.push((
434-
Arc::new(Column::new(MANIFEST_FILE_PATH_COLUMN, index)) as Arc<dyn PhysicalExpr>,
435-
MANIFEST_FILE_PATH_COLUMN.to_owned(),
436-
));
437-
}
451+
// let index = file_schema.fields().len() + table_partition_cols.len() - 1;
452+
// if let Some(projection) = &mut projection {
453+
// projection.push(index);
454+
// }
455+
// if let Some(projection_expr) = &mut projection_expr {
456+
// projection_expr.push((
457+
// Arc::new(Column::new(MANIFEST_FILE_PATH_COLUMN, index)) as Arc<dyn PhysicalExpr>,
458+
// MANIFEST_FILE_PATH_COLUMN.to_owned(),
459+
// ));
460+
// }
438461
}
439462

440463
// All files have to be grouped according to their partition values. This is done by using a HashMap with the partition values as the key.

0 commit comments

Comments
 (0)