diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index 607fd98350..79f2f663d1 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -22,7 +22,7 @@ use std::cmp::Ordering;
 use std::collections::HashMap;
 use std::fmt::{Display, Formatter};
 use std::hash::Hash;
-use std::sync::Arc;
+use std::sync::{Arc, LazyLock};
 
 use _serde::TableMetadataEnum;
 use chrono::{DateTime, Utc};
@@ -33,9 +33,10 @@ use uuid::Uuid;
 use super::snapshot::SnapshotReference;
 pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
 use super::{
-    DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
+    DEFAULT_PARTITION_SPEC_ID, ListType, MapType, NestedFieldRef, PartitionSpecRef,
+    PartitionStatisticsFile, PrimitiveType, Schema, SchemaId, SchemaRef, SchemaVisitor,
     SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
-    TableProperties, parse_metadata_file_compression,
+    TableProperties, parse_metadata_file_compression, visit_schema,
 };
 use crate::catalog::MetadataLocation;
 use crate::compression::CompressionCodec;
@@ -60,6 +61,14 @@ pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
 /// Reference to [`TableMetadata`].
 pub type TableMetadataRef = Arc<TableMetadata>;
 
+static PRIMITIVE_TYPE_MIN_FORMAT_VERSION: LazyLock<HashMap<PrimitiveType, FormatVersion>> =
+    LazyLock::new(|| {
+        HashMap::from([
+            (PrimitiveType::TimestampNs, FormatVersion::V3),
+            (PrimitiveType::TimestamptzNs, FormatVersion::V3),
+        ])
+    });
+
 #[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
 #[serde(try_from = "TableMetadataEnum")]
 /// Fields for the version 2 of the table metadata.
@@ -1565,6 +1574,44 @@ impl Display for FormatVersion {
     }
 }
 
+/// Returns the minimum table format version required by any type in a schema.
+///
+/// Returns [`None`] when the schema contains no type with a specific minimum
+/// table format version requirement.
+pub fn min_format_version_for_schema(schema: &Schema) -> Result<Option<FormatVersion>> {
+    visit_schema(schema, &mut MinFormatVersionVisitor)
+}
+
+struct MinFormatVersionVisitor;
+
+impl SchemaVisitor for MinFormatVersionVisitor {
+    type T = Option<FormatVersion>;
+
+    fn schema(&mut self, _schema: &Schema, value: Self::T) -> Result<Self::T> {
+        Ok(value)
+    }
+
+    fn field(&mut self, _field: &NestedFieldRef, value: Self::T) -> Result<Self::T> {
+        Ok(value)
+    }
+
+    fn r#struct(&mut self, _struct: &StructType, results: Vec<Self::T>) -> Result<Self::T> {
+        Ok(results.into_iter().flatten().max())
+    }
+
+    fn list(&mut self, _list: &ListType, value: Self::T) -> Result<Self::T> {
+        Ok(value)
+    }
+
+    fn map(&mut self, _map: &MapType, key_value: Self::T, value: Self::T) -> Result<Self::T> {
+        Ok(key_value.into_iter().chain(value).max())
+    }
+
+    fn primitive(&mut self, primitive: &PrimitiveType) -> Result<Self::T> {
+        Ok(PRIMITIVE_TYPE_MIN_FORMAT_VERSION.get(primitive).copied())
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
 #[serde(rename_all = "kebab-case")]
 /// Encodes changes to the previous metadata files for the table
@@ -1616,10 +1663,10 @@ mod tests {
     use crate::io::FileIO;
     use crate::spec::table_metadata::TableMetadata;
     use crate::spec::{
-        BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
-        PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
-        SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
-        Summary, TableProperties, Transform, Type, UnboundPartitionField,
+        BlobMetadata, EncryptedKey, INITIAL_ROW_ID, ListType, Literal, NestedField, NullOrder,
+        Operation, PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema,
+        Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder,
+        StatisticsFile, Summary, TableProperties, Transform, Type, UnboundPartitionField,
     };
     use crate::{ErrorKind, TableCreation};
 
@@ -3541,6 +3588,28 @@ mod tests {
         )
     }
 
+    fn schema_with_primitive_field(field_type: PrimitiveType) -> Schema {
+        Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "ts", Type::Primitive(field_type)).into(),
+            ])
+            .build()
+            .unwrap()
+    }
+
+    fn table_creation_with_format_version(
+        schema: Schema,
+        format_version: FormatVersion,
+    ) -> TableCreation {
+        TableCreation::builder()
+            .location("s3://db/table".to_string())
+            .name("table".to_string())
+            .properties(HashMap::new())
+            .schema(schema)
+            .format_version(format_version)
+            .build()
+    }
+
     #[test]
     fn test_table_metadata_builder_from_table_creation() {
         let table_creation = TableCreation::builder()
@@ -3591,6 +3660,91 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_table_metadata_builder_rejects_v1_v2_nanosecond_timestamp_tables() {
+        for (format_version, primitive_type) in [
+            (FormatVersion::V1, PrimitiveType::TimestampNs),
+            (FormatVersion::V1, PrimitiveType::TimestamptzNs),
+            (FormatVersion::V2, PrimitiveType::TimestampNs),
+            (FormatVersion::V2, PrimitiveType::TimestamptzNs),
+        ] {
+            let table_creation = table_creation_with_format_version(
+                schema_with_primitive_field(primitive_type),
+                format_version,
+            );
+
+            let err = TableMetadataBuilder::from_table_creation(table_creation).unwrap_err();
+
+            assert_eq!(err.kind(), ErrorKind::DataInvalid);
+            assert!(
+                err.message()
+                    .contains("Cannot use types that require format v3"),
+                "expected error message to avoid naming specific types, got {}",
+                err.message()
+            );
+            assert!(
+                err.message().contains("use format v3"),
+                "expected error message to explain v3 requirement, got {}",
+                err.message()
+            );
+        }
+    }
+
+    #[test]
+    fn test_table_metadata_builder_rejects_v2_list_element_requiring_v3() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(
+                    1,
+                    "ts_values",
+                    Type::List(ListType::new(
+                        NestedField::list_element(
+                            2,
+                            Type::Primitive(PrimitiveType::TimestampNs),
+                            false,
+                        )
+                        .into(),
+                    )),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+        let table_creation = table_creation_with_format_version(schema, FormatVersion::V2);
+
+        let err = TableMetadataBuilder::from_table_creation(table_creation).unwrap_err();
+
+        assert_eq!(err.kind(), ErrorKind::DataInvalid);
+        assert!(
+            err.message()
+                .contains("Cannot use types that require format v3"),
+            "expected error message to explain nested v3 requirement, got {}",
+            err.message()
+        );
+    }
+
+    #[test]
+    fn test_table_metadata_builder_allows_v3_nanosecond_timestamp_tables() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "ts_ns", Type::Primitive(PrimitiveType::TimestampNs))
+                    .into(),
+                NestedField::required(2, "tstz_ns", Type::Primitive(PrimitiveType::TimestamptzNs))
+                    .into(),
+            ])
+            .build()
+            .unwrap();
+        let table_creation = table_creation_with_format_version(schema, FormatVersion::V3);
+
+        let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+
+        assert_eq!(table_metadata.format_version, FormatVersion::V3);
+    }
+
     #[tokio::test]
     async fn test_table_metadata_read_write() {
         // Create a temporary directory for our test
diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs
index 5754b5fe06..69f8365f55 100644
--- a/crates/iceberg/src/spec/table_metadata_builder.rs
+++ b/crates/iceberg/src/spec/table_metadata_builder.rs
@@ -28,7 +28,9 @@ use super::{
     UnboundPartitionSpec,
 };
 use crate::error::{Error, ErrorKind, Result};
-use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE};
+use crate::spec::{
+    EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE, min_format_version_for_schema,
+};
 use crate::{TableCreation, TableUpdate};
 
 pub(crate) const FIRST_FIELD_ID: i32 = 1;
@@ -84,6 +86,8 @@ impl TableMetadataBuilder {
         format_version: FormatVersion,
         properties: HashMap<String, String>,
     ) -> Result<Self> {
+        Self::validate_schema_compatible_with_format_version(format_version, &schema)?;
+
         // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new table.
         let (fresh_schema, fresh_spec, fresh_sort_order) =
             Self::reassign_ids(schema, spec.into(), sort_order)?;
@@ -196,6 +200,24 @@ impl TableMetadataBuilder {
         )
     }
 
+    fn validate_schema_compatible_with_format_version(
+        format_version: FormatVersion,
+        schema: &Schema,
+    ) -> Result<()> {
+        if let Some(min_format_version) = min_format_version_for_schema(schema)?
+            && min_format_version > format_version
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot use types that require format {min_format_version} in {format_version} table schemas; use format {min_format_version}"
+                ),
+            ));
+        }
+
+        Ok(())
+    }
+
     /// Changes uuid of table metadata.
     pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
         if self.metadata.table_uuid != uuid {
@@ -638,6 +660,11 @@ impl TableMetadataBuilder {
     /// Important: Use this method with caution. The builder does not check
     /// if the added schema is compatible with the current schema.
     pub fn add_schema(mut self, schema: Schema) -> Result<Self> {
+        Self::validate_schema_compatible_with_format_version(
+            self.metadata.format_version,
+            &schema,
+        )?;
+
         // Validate that new schema fields don't conflict with existing partition field names
         self.validate_schema_field_names(&schema)?;
 
diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs
index 508aeb303b..e359c3a63e 100644
--- a/crates/integrations/datafusion/src/schema.rs
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -29,6 +29,7 @@ use futures::StreamExt;
 use futures::future::try_join_all;
 use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids;
 use iceberg::inspect::MetadataTableType;
+use iceberg::spec::min_format_version_for_schema;
 use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent};
 
 use crate::table::IcebergTableProvider;
@@ -162,12 +163,21 @@ impl SchemaProvider for IcebergSchemaProvider {
         let df_schema = table.schema();
         let iceberg_schema = arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref())
             .map_err(to_datafusion_error)?;
+        let format_version =
+            min_format_version_for_schema(&iceberg_schema).map_err(to_datafusion_error)?;
 
         // Create the table in the Iceberg catalog
-        let table_creation = TableCreation::builder()
-            .name(name.clone())
-            .schema(iceberg_schema)
-            .build();
+        let table_creation = match format_version {
+            Some(format_version) => TableCreation::builder()
+                .name(name.clone())
+                .format_version(format_version)
+                .schema(iceberg_schema)
+                .build(),
+            None => TableCreation::builder()
+                .name(name.clone())
+                .schema(iceberg_schema)
+                .build(),
+        };
 
         let catalog = self.catalog.clone();
         let namespace = self.namespace.clone();
@@ -288,10 +298,11 @@ mod tests {
     use std::sync::Arc;
 
     use datafusion::arrow::array::{Int32Array, StringArray};
-    use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+    use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
     use datafusion::arrow::record_batch::RecordBatch;
     use datafusion::datasource::MemTable;
     use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+    use iceberg::spec::FormatVersion;
     use iceberg::{Catalog, CatalogBuilder, NamespaceIdent};
     use tempfile::TempDir;
 
@@ -375,6 +386,37 @@ mod tests {
         assert!(schema_provider.table_exist("empty_table"));
     }
 
+    #[tokio::test]
+    async fn test_register_timestamp_ns_table_uses_v3() {
+        let (schema_provider, _temp_dir) = create_test_schema_provider().await;
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "ts",
+            DataType::Timestamp(TimeUnit::Nanosecond, None),
+            true,
+        )]));
+
+        let empty_batch = RecordBatch::new_empty(arrow_schema.clone());
+        let mem_table = MemTable::try_new(arrow_schema, vec![vec![empty_batch]]).unwrap();
+
+        let result =
+            schema_provider.register_table("timestamp_ns_table".to_string(), Arc::new(mem_table));
+
+        assert!(result.is_ok(), "Expected success, got: {result:?}");
+
+        let table_ident = TableIdent::new(
+            schema_provider.namespace.clone(),
+            "timestamp_ns_table".to_string(),
+        );
+        let table = schema_provider
+            .catalog
+            .load_table(&table_ident)
+            .await
+            .unwrap();
+
+        assert_eq!(FormatVersion::V3, table.metadata().format_version());
+    }
+
     #[tokio::test]
     async fn test_register_duplicate_table_fails() {
         let (schema_provider, _temp_dir) = create_test_schema_provider().await;