From f5201b16a2f183782f08f87fd97f358212b7c53d Mon Sep 17 00:00:00 2001 From: Manu Zhang Date: Sat, 9 May 2026 08:39:43 +0800 Subject: [PATCH] fix(spec): validate schema types by format version Reject schema types that require a newer table format and update the DataFusion catalog registration path to request v3 only when the converted schema needs it. This keeps ordinary CREATE TABLE defaults on v2 while allowing timestamp_ns schemas to pass validation. Co-authored-by: Codex --- crates/iceberg/src/spec/table_metadata.rs | 168 +++++++++++++++++- .../src/spec/table_metadata_builder.rs | 29 ++- crates/integrations/datafusion/src/schema.rs | 52 +++++- 3 files changed, 236 insertions(+), 13 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 607fd98350..79f2f663d1 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -22,7 +22,7 @@ use std::cmp::Ordering; use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::hash::Hash; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use _serde::TableMetadataEnum; use chrono::{DateTime, Utc}; @@ -33,9 +33,10 @@ use uuid::Uuid; use super::snapshot::SnapshotReference; pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder}; use super::{ - DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef, + DEFAULT_PARTITION_SPEC_ID, ListType, MapType, NestedFieldRef, PartitionSpecRef, + PartitionStatisticsFile, PrimitiveType, Schema, SchemaId, SchemaRef, SchemaVisitor, SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType, - TableProperties, parse_metadata_file_compression, + TableProperties, parse_metadata_file_compression, visit_schema, }; use crate::catalog::MetadataLocation; use crate::compression::CompressionCodec; @@ -60,6 +61,14 @@ pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3; /// Reference to [`TableMetadata`]. pub type TableMetadataRef = Arc; +static PRIMITIVE_TYPE_MIN_FORMAT_VERSION: LazyLock> = + LazyLock::new(|| { + HashMap::from([ + (PrimitiveType::TimestampNs, FormatVersion::V3), + (PrimitiveType::TimestamptzNs, FormatVersion::V3), + ]) + }); + #[derive(Debug, PartialEq, Deserialize, Eq, Clone)] #[serde(try_from = "TableMetadataEnum")] /// Fields for the version 2 of the table metadata. @@ -1565,6 +1574,44 @@ impl Display for FormatVersion { } } +/// Returns the minimum table format version required by any type in a schema. +/// +/// Returns [`None`] when the schema contains no type with a specific minimum +/// table format version requirement. +pub fn min_format_version_for_schema(schema: &Schema) -> Result> { + visit_schema(schema, &mut MinFormatVersionVisitor) +} + +struct MinFormatVersionVisitor; + +impl SchemaVisitor for MinFormatVersionVisitor { + type T = Option; + + fn schema(&mut self, _schema: &Schema, value: Self::T) -> Result { + Ok(value) + } + + fn field(&mut self, _field: &NestedFieldRef, value: Self::T) -> Result { + Ok(value) + } + + fn r#struct(&mut self, _struct: &StructType, results: Vec) -> Result { + Ok(results.into_iter().flatten().max()) + } + + fn list(&mut self, _list: &ListType, value: Self::T) -> Result { + Ok(value) + } + + fn map(&mut self, _map: &MapType, key_value: Self::T, value: Self::T) -> Result { + Ok(key_value.into_iter().chain(value).max()) + } + + fn primitive(&mut self, primitive: &PrimitiveType) -> Result { + Ok(PRIMITIVE_TYPE_MIN_FORMAT_VERSION.get(primitive).copied()) + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(rename_all = "kebab-case")] /// Encodes changes to the previous metadata files for the table @@ -1616,10 +1663,10 @@ mod tests { use crate::io::FileIO; use crate::spec::table_metadata::TableMetadata; use crate::spec::{ - BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation, - PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot, - SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile, - Summary, TableProperties, Transform, Type, UnboundPartitionField, + BlobMetadata, EncryptedKey, INITIAL_ROW_ID, ListType, Literal, NestedField, NullOrder, + Operation, PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, + Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, + StatisticsFile, Summary, TableProperties, Transform, Type, UnboundPartitionField, }; use crate::{ErrorKind, TableCreation}; @@ -3541,6 +3588,28 @@ mod tests { ) } + fn schema_with_primitive_field(field_type: PrimitiveType) -> Schema { + Schema::builder() + .with_fields(vec![ + NestedField::required(1, "ts", Type::Primitive(field_type)).into(), + ]) + .build() + .unwrap() + } + + fn table_creation_with_format_version( + schema: Schema, + format_version: FormatVersion, + ) -> TableCreation { + TableCreation::builder() + .location("s3://db/table".to_string()) + .name("table".to_string()) + .properties(HashMap::new()) + .schema(schema) + .format_version(format_version) + .build() + } + #[test] fn test_table_metadata_builder_from_table_creation() { let table_creation = TableCreation::builder() @@ -3591,6 +3660,91 @@ mod tests { ); } + #[test] + fn test_table_metadata_builder_rejects_v1_v2_nanosecond_timestamp_tables() { + for (format_version, primitive_type) in [ + (FormatVersion::V1, PrimitiveType::TimestampNs), + (FormatVersion::V1, PrimitiveType::TimestamptzNs), + (FormatVersion::V2, PrimitiveType::TimestampNs), + (FormatVersion::V2, PrimitiveType::TimestamptzNs), + ] { + let table_creation = table_creation_with_format_version( + schema_with_primitive_field(primitive_type), + format_version, + ); + + let err = TableMetadataBuilder::from_table_creation(table_creation).unwrap_err(); + + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.message() + .contains("Cannot use types that require format v3"), + "expected error message to avoid naming specific types, got {}", + err.message() + ); + assert!( + err.message().contains("use format v3"), + "expected error message to explain v3 requirement, got {}", + err.message() + ); + } + } + + #[test] + fn test_table_metadata_builder_rejects_v2_list_element_requiring_v3() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required( + 1, + "ts_values", + Type::List(ListType::new( + NestedField::list_element( + 2, + Type::Primitive(PrimitiveType::TimestampNs), + false, + ) + .into(), + )), + ) + .into(), + ]) + .build() + .unwrap(); + let table_creation = table_creation_with_format_version(schema, FormatVersion::V2); + + let err = TableMetadataBuilder::from_table_creation(table_creation).unwrap_err(); + + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.message() + .contains("Cannot use types that require format v3"), + "expected error message to explain nested v3 requirement, got {}", + err.message() + ); + } + + #[test] + fn test_table_metadata_builder_allows_v3_nanosecond_timestamp_tables() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "ts_ns", Type::Primitive(PrimitiveType::TimestampNs)) + .into(), + NestedField::required(2, "tstz_ns", Type::Primitive(PrimitiveType::TimestamptzNs)) + .into(), + ]) + .build() + .unwrap(); + let table_creation = table_creation_with_format_version(schema, FormatVersion::V3); + + let table_metadata = TableMetadataBuilder::from_table_creation(table_creation) + .unwrap() + .build() + .unwrap() + .metadata; + + assert_eq!(table_metadata.format_version, FormatVersion::V3); + } + #[tokio::test] async fn test_table_metadata_read_write() { // Create a temporary directory for our test diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index 5754b5fe06..69f8365f55 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -28,7 +28,9 @@ use super::{ UnboundPartitionSpec, }; use crate::error::{Error, ErrorKind, Result}; -use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE}; +use crate::spec::{ + EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE, min_format_version_for_schema, +}; use crate::{TableCreation, TableUpdate}; pub(crate) const FIRST_FIELD_ID: i32 = 1; @@ -84,6 +86,8 @@ impl TableMetadataBuilder { format_version: FormatVersion, properties: HashMap, ) -> Result { + Self::validate_schema_compatible_with_format_version(format_version, &schema)?; + // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new table. let (fresh_schema, fresh_spec, fresh_sort_order) = Self::reassign_ids(schema, spec.into(), sort_order)?; @@ -196,6 +200,24 @@ impl TableMetadataBuilder { ) } + fn validate_schema_compatible_with_format_version( + format_version: FormatVersion, + schema: &Schema, + ) -> Result<()> { + if let Some(min_format_version) = min_format_version_for_schema(schema)? + && min_format_version > format_version + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot use types that require format {min_format_version} in {format_version} table schemas; use format {min_format_version}" + ), + )); + } + + Ok(()) + } + /// Changes uuid of table metadata. pub fn assign_uuid(mut self, uuid: Uuid) -> Self { if self.metadata.table_uuid != uuid { @@ -638,6 +660,11 @@ impl TableMetadataBuilder { /// Important: Use this method with caution. The builder does not check /// if the added schema is compatible with the current schema. pub fn add_schema(mut self, schema: Schema) -> Result { + Self::validate_schema_compatible_with_format_version( + self.metadata.format_version, + &schema, + )?; + // Validate that new schema fields don't conflict with existing partition field names self.validate_schema_field_names(&schema)?; diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 508aeb303b..e359c3a63e 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -29,6 +29,7 @@ use futures::StreamExt; use futures::future::try_join_all; use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids; use iceberg::inspect::MetadataTableType; +use iceberg::spec::min_format_version_for_schema; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent}; use crate::table::IcebergTableProvider; @@ -162,12 +163,21 @@ impl SchemaProvider for IcebergSchemaProvider { let df_schema = table.schema(); let iceberg_schema = arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref()) .map_err(to_datafusion_error)?; + let format_version = + min_format_version_for_schema(&iceberg_schema).map_err(to_datafusion_error)?; // Create the table in the Iceberg catalog - let table_creation = TableCreation::builder() - .name(name.clone()) - .schema(iceberg_schema) - .build(); + let table_creation = match format_version { + Some(format_version) => TableCreation::builder() + .name(name.clone()) + .format_version(format_version) + .schema(iceberg_schema) + .build(), + None => TableCreation::builder() + .name(name.clone()) + .schema(iceberg_schema) + .build(), + }; let catalog = self.catalog.clone(); let namespace = self.namespace.clone(); @@ -288,10 +298,11 @@ mod tests { use std::sync::Arc; use datafusion::arrow::array::{Int32Array, StringArray}; - use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::MemTable; use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; + use iceberg::spec::FormatVersion; use iceberg::{Catalog, CatalogBuilder, NamespaceIdent}; use tempfile::TempDir; @@ -375,6 +386,37 @@ mod tests { assert!(schema_provider.table_exist("empty_table")); } + #[tokio::test] + async fn test_register_timestamp_ns_table_uses_v3() { + let (schema_provider, _temp_dir) = create_test_schema_provider().await; + + let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new( + "ts", + DataType::Timestamp(TimeUnit::Nanosecond, None), + true, + )])); + + let empty_batch = RecordBatch::new_empty(arrow_schema.clone()); + let mem_table = MemTable::try_new(arrow_schema, vec![vec![empty_batch]]).unwrap(); + + let result = + schema_provider.register_table("timestamp_ns_table".to_string(), Arc::new(mem_table)); + + assert!(result.is_ok(), "Expected success, got: {result:?}"); + + let table_ident = TableIdent::new( + schema_provider.namespace.clone(), + "timestamp_ns_table".to_string(), + ); + let table = schema_provider + .catalog + .load_table(&table_ident) + .await + .unwrap(); + + assert_eq!(FormatVersion::V3, table.metadata().format_version()); + } + #[tokio::test] async fn test_register_duplicate_table_fails() { let (schema_provider, _temp_dir) = create_test_schema_provider().await;