Skip to content

Commit 7a5903b

Browse files
author
Jan Kaul
committed
introduce data_file_path column
1 parent 60ff613 commit 7a5903b

1 file changed

Lines changed: 160 additions & 9 deletions

File tree

datafusion_iceberg/src/table.rs

Lines changed: 160 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::{
 use tokio::sync::{RwLock, RwLockWriteGuard};

 use datafusion::{
-    arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef},
+    arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef},
     catalog::Session,
     common::{not_impl_err, plan_err, DataFusionError, SchemaExt},
     datasource::{
@@ -65,7 +65,8 @@ use iceberg_rust::{
     arrow::write::write_parquet_partitioned, catalog::tabular::Tabular, error::Error,
     materialized_view::MaterializedView, table::Table, view::View,
 };
-// mod value;
+
+static DATA_FILE_PATH_COLUMN: &str = "__data_file_path";

 #[derive(Debug, Clone)]
 /// Iceberg table for datafusion
@@ -217,6 +218,7 @@ impl TableProvider for DataFusionTable {
                 table,
                 &self.snapshot_range,
                 schema,
+                self.config.as_ref(),
                 statistics,
                 session_state,
                 projection,
@@ -239,6 +241,7 @@ impl TableProvider for DataFusionTable {
                 &table,
                 &self.snapshot_range,
                 schema,
+                self.config.as_ref(),
                 statistics,
                 session_state,
                 projection,
@@ -308,6 +311,7 @@ async fn table_scan(
     table: &Table,
     snapshot_range: &(Option<i64>, Option<i64>),
     arrow_schema: SchemaRef,
+    config: Option<&DataFusionTableConfig>,
     statistics: Statistics,
     session: &SessionState,
     projection: Option<&Vec<usize>>,
@@ -326,6 +330,10 @@ async fn table_scan(
         .runtime_env()
         .register_object_store(object_store_url.as_ref(), table.object_store());

+    let enable_data_file_path_column = config
+        .map(|x| x.enable_data_file_path_column)
+        .unwrap_or_default();
+
     let partition_fields = &snapshot_range
         .1
         .and_then(|snapshot_id| table.metadata().partition_fields(snapshot_id).ok())
@@ -349,7 +357,7 @@ async fn table_scan(
     };

     // Get all partition columns
-    let table_partition_cols: Vec<Field> = partition_fields
+    let mut table_partition_cols: Vec<Field> = partition_fields
         .iter()
         .map(|partition_field| {
             Ok(Field::new(
@@ -370,6 +378,10 @@ async fn table_scan(
         .collect::<Result<Vec<_>, DataFusionError>>()
         .map_err(DataFusionIcebergError::from)?;

+    if enable_data_file_path_column {
+        table_partition_cols.push(Field::new(DATA_FILE_PATH_COLUMN, DataType::Utf8, false));
+    }
+
     // All files have to be grouped according to their partition values. This is done by using a HashMap with the partition values as the key.
     // This way data files with the same partition value are mapped to the same vector.
     let mut data_file_groups: HashMap<Struct, Vec<ManifestEntry>> = HashMap::new();
@@ -578,9 +590,13 @@ async fn table_scan(
                         < delete_manifest.sequence_number().unwrap()
                 }) {
                     let last_updated_ms = table.metadata().last_updated_ms;
-                    let data_file =
-                        generate_partitioned_file(schema, &data_manifest, last_updated_ms)
-                            .unwrap();
+                    let data_file = generate_partitioned_file(
+                        schema,
+                        &data_manifest,
+                        last_updated_ms,
+                        enable_data_file_path_column,
+                    )
+                    .unwrap();
                     data_files.push(data_file);
                 }
                 async move {
@@ -615,6 +631,7 @@ async fn table_scan(
                     &delete_schema,
                     delete_manifest,
                     last_updated_ms,
+                    enable_data_file_path_column,
                 )?;

                 let delete_file_source = Arc::new(
@@ -716,7 +733,12 @@ async fn table_scan(
         let additional_data_files = data_file_iter
             .map(|x| {
                 let last_updated_ms = table.metadata().last_updated_ms;
-                generate_partitioned_file(schema, &x, last_updated_ms)
+                generate_partitioned_file(
+                    schema,
+                    &x,
+                    last_updated_ms,
+                    enable_data_file_path_column,
+                )
             })
             .collect::<Result<Vec<_>, _>>()?;

@@ -764,7 +786,13 @@ async fn table_scan(
                 x.into_iter()
                     .map(|x| {
                         let last_updated_ms = table.metadata().last_updated_ms;
-                        generate_partitioned_file(&schema, &x, last_updated_ms).unwrap()
+                        generate_partitioned_file(
+                            &schema,
+                            &x,
+                            last_updated_ms,
+                            enable_data_file_path_column,
+                        )
+                        .unwrap()
                     })
                     .collect()
             })
@@ -860,9 +888,10 @@ fn generate_partitioned_file(
     schema: &Schema,
     manifest: &ManifestEntry,
     last_updated_ms: i64,
+    enable_data_file_path: bool,
 ) -> Result<PartitionedFile, DataFusionError> {
     let manifest_statistics = manifest_statistics(schema, manifest);
-    let partition_values = manifest
+    let mut partition_values = manifest
         .data_file()
         .partition()
         .iter()
@@ -872,6 +901,13 @@ fn generate_partitioned_file(
                 .unwrap_or(Ok(ScalarValue::Null))
         })
         .collect::<Result<Vec<ScalarValue>, _>>()?;
+
+    if enable_data_file_path {
+        partition_values.push(ScalarValue::Utf8(Some(
+            manifest.data_file().file_path().clone(),
+        )));
+    }
+
     let object_meta = ObjectMeta {
         location: util::strip_prefix(manifest.data_file().file_path()).into(),
         size: *manifest.data_file().file_size_in_bytes() as u64,
@@ -1701,6 +1737,121 @@ mod tests {
         }
     }

+    #[tokio::test]
+    pub async fn test_datafusion_table_insert_with_data_file_path() {
+        let object_store = ObjectStoreBuilder::memory();
+
+        let catalog: Arc<dyn Catalog> = Arc::new(
+            SqlCatalog::new("sqlite://", "test", object_store)
+                .await
+                .unwrap(),
+        );
+
+        let schema = Schema::builder()
+            .with_struct_field(StructField {
+                id: 1,
+                name: "id".to_string(),
+                required: true,
+                field_type: Type::Primitive(PrimitiveType::Long),
+                doc: None,
+            })
+            .with_struct_field(StructField {
+                id: 2,
+                name: "customer_id".to_string(),
+                required: true,
+                field_type: Type::Primitive(PrimitiveType::Long),
+                doc: None,
+            })
+            .with_struct_field(StructField {
+                id: 3,
+                name: "product_id".to_string(),
+                required: true,
+                field_type: Type::Primitive(PrimitiveType::Long),
+                doc: None,
+            })
+            .with_struct_field(StructField {
+                id: 4,
+                name: "date".to_string(),
+                required: true,
+                field_type: Type::Primitive(PrimitiveType::Date),
+                doc: None,
+            })
+            .with_struct_field(StructField {
+                id: 5,
+                name: "amount".to_string(),
+                required: true,
+                field_type: Type::Primitive(PrimitiveType::Int),
+                doc: None,
+            })
+            .build()
+            .unwrap();
+
+        let table = Table::builder()
+            .with_name("orders")
+            .with_location("/test/orders")
+            .with_schema(schema)
+            .build(&["test".to_owned()], catalog)
+            .await
+            .expect("Failed to create table");
+
+        let config = crate::table::DataFusionTableConfigBuilder::default()
+            .enable_data_file_path_column(true)
+            .build()
+            .unwrap();
+
+        let table = Arc::new(DataFusionTable::new_with_config(
+            Tabular::Table(table),
+            None,
+            None,
+            None,
+            Some(config),
+        ));
+
+        let ctx = SessionContext::new();
+
+        ctx.register_table("orders", table.clone()).unwrap();
+
+        ctx.sql(
+            "INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES
+                (1, 1, 1, '2020-01-01', 1),
+                (2, 2, 1, '2020-01-01', 1),
+                (3, 3, 1, '2020-01-01', 3);",
+        )
+        .await
+        .expect("Failed to create query plan for insert")
+        .collect()
+        .await
+        .expect("Failed to insert values into table");
+
+        let batches = ctx
+            .sql("select * from orders;")
+            .await
+            .expect("Failed to create plan for select")
+            .collect()
+            .await
+            .expect("Failed to execute select query");
+
+        for batch in batches {
+            if batch.num_rows() != 0 {
+                assert!(batch.schema().column_with_name("__data_file_path").is_some());
+
+                let data_file_path_column = batch
+                    .column_by_name("__data_file_path")
+                    .expect("Data file path column should exist");
+
+                for i in 0..batch.num_rows() {
+                    let value = data_file_path_column
+                        .as_any()
+                        .downcast_ref::<datafusion::arrow::array::StringArray>()
+                        .unwrap()
+                        .value(i);
+                    assert!(!value.is_empty(), "Data file path should not be empty");
+                    assert!(value.contains(".parquet"), "Data file path should contain .parquet");
+                }
+            }
+        }
+    }
+
     #[test]
     fn test_fake_object_store_url() {
         assert_eq!(

0 commit comments

Comments
 (0)