Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 161 additions & 7 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;
use std::sync::{Arc, LazyLock};

use _serde::TableMetadataEnum;
use chrono::{DateTime, Utc};
Expand All @@ -33,9 +33,10 @@ use uuid::Uuid;
use super::snapshot::SnapshotReference;
pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
use super::{
DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
DEFAULT_PARTITION_SPEC_ID, ListType, MapType, NestedFieldRef, PartitionSpecRef,
PartitionStatisticsFile, PrimitiveType, Schema, SchemaId, SchemaRef, SchemaVisitor,
SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
TableProperties, parse_metadata_file_compression,
TableProperties, parse_metadata_file_compression, visit_schema,
};
use crate::catalog::MetadataLocation;
use crate::compression::CompressionCodec;
Expand All @@ -60,6 +61,14 @@ pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;

/// Minimum table format version needed to store each listed primitive type.
///
/// A primitive type without an entry here imposes no minimum format version.
static PRIMITIVE_TYPE_MIN_FORMAT_VERSION: LazyLock<HashMap<PrimitiveType, FormatVersion>> =
    LazyLock::new(|| {
        // Both nanosecond-precision timestamp types map to format v3.
        [PrimitiveType::TimestampNs, PrimitiveType::TimestamptzNs]
            .into_iter()
            .map(|primitive| (primitive, FormatVersion::V3))
            .collect::<HashMap<_, _>>()
    });

#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "TableMetadataEnum")]
/// Fields for the version 2 of the table metadata.
Expand Down Expand Up @@ -1565,6 +1574,44 @@ impl Display for FormatVersion {
}
}

/// Computes the highest minimum table format version demanded by any type in `schema`.
///
/// The result is [`None`] if no type in the schema carries a minimum table
/// format version requirement.
pub fn min_format_version_for_schema(schema: &Schema) -> Result<Option<FormatVersion>> {
    let mut visitor = MinFormatVersionVisitor;
    visit_schema(schema, &mut visitor)
}

/// Stateless [`SchemaVisitor`] that reduces a schema to the highest minimum
/// [`FormatVersion`] required by any primitive type it contains.
struct MinFormatVersionVisitor;

impl SchemaVisitor for MinFormatVersionVisitor {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, I think iterating through schema.id_to_field.values() would be even simpler.

/// Highest minimum format version found in the visited subtree, or `None`
/// when nothing in it requires a particular version.
type T = Option<FormatVersion>;

/// Forwards the requirement handed in for the schema without modification.
fn schema(&mut self, _schema: &Schema, requirement: Self::T) -> Result<Self::T> {
    Ok(requirement)
}

/// Forwards the requirement handed in for the field without modification.
fn field(&mut self, _field: &NestedFieldRef, requirement: Self::T) -> Result<Self::T> {
    Ok(requirement)
}

/// Keeps the highest requirement among the struct's per-field results.
///
/// Yields `None` when `results` is empty or every entry is `None`.
fn r#struct(&mut self, _struct: &StructType, results: Vec<Self::T>) -> Result<Self::T> {
    // `None` orders below every `Some(_)`, so the maximum `Option` is the
    // highest present requirement; flattening drops the outer "empty input" layer.
    let strictest = results.into_iter().max().flatten();
    Ok(strictest)
}

/// Forwards the requirement handed in for the list without modification.
fn list(&mut self, _list: &ListType, requirement: Self::T) -> Result<Self::T> {
    Ok(requirement)
}

/// Keeps the higher of the key's and the value's requirements.
fn map(&mut self, _map: &MapType, key_value: Self::T, value: Self::T) -> Result<Self::T> {
    // `None` orders below every `Some(_)`, so comparing the two options
    // directly selects the highest requirement that is present, if any.
    Ok(Ord::max(key_value, value))
}

/// Looks up the minimum format version registered for `primitive`, if any.
fn primitive(&mut self, primitive: &PrimitiveType) -> Result<Self::T> {
    let required = PRIMITIVE_TYPE_MIN_FORMAT_VERSION.get(primitive);
    Ok(required.copied())
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// Encodes changes to the previous metadata files for the table
Expand Down Expand Up @@ -1616,10 +1663,10 @@ mod tests {
use crate::io::FileIO;
use crate::spec::table_metadata::TableMetadata;
use crate::spec::{
BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
Summary, TableProperties, Transform, Type, UnboundPartitionField,
BlobMetadata, EncryptedKey, INITIAL_ROW_ID, ListType, Literal, NestedField, NullOrder,
Operation, PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema,
Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder,
StatisticsFile, Summary, TableProperties, Transform, Type, UnboundPartitionField,
};
use crate::{ErrorKind, TableCreation};

Expand Down Expand Up @@ -3541,6 +3588,28 @@ mod tests {
)
}

/// Builds a schema holding one required field named `ts` (id 1) of the given primitive type.
fn schema_with_primitive_field(field_type: PrimitiveType) -> Schema {
    let ts_field = NestedField::required(1, "ts", Type::Primitive(field_type));

    Schema::builder()
        .with_fields(vec![ts_field.into()])
        .build()
        .unwrap()
}

/// Builds a [`TableCreation`] for `schema` at `format_version`, using a fixed
/// name and location and no table properties.
fn table_creation_with_format_version(
    schema: Schema,
    format_version: FormatVersion,
) -> TableCreation {
    let location = "s3://db/table".to_string();
    let name = "table".to_string();

    TableCreation::builder()
        .location(location)
        .name(name)
        .properties(HashMap::new())
        .schema(schema)
        .format_version(format_version)
        .build()
}

#[test]
fn test_table_metadata_builder_from_table_creation() {
let table_creation = TableCreation::builder()
Expand Down Expand Up @@ -3591,6 +3660,91 @@ mod tests {
);
}

#[test]
fn test_table_metadata_builder_rejects_v1_v2_nanosecond_timestamp_tables() {
    // Every combination of a pre-v3 format version and a nanosecond timestamp
    // type must be rejected when creating a table.
    for format_version in [FormatVersion::V1, FormatVersion::V2] {
        for primitive_type in [PrimitiveType::TimestampNs, PrimitiveType::TimestamptzNs] {
            let schema = schema_with_primitive_field(primitive_type);
            let table_creation = table_creation_with_format_version(schema, format_version);

            let err = TableMetadataBuilder::from_table_creation(table_creation).unwrap_err();

            assert_eq!(err.kind(), ErrorKind::DataInvalid);

            let has_generic_wording = err
                .message()
                .contains("Cannot use types that require format v3");
            assert!(
                has_generic_wording,
                "expected error message to avoid naming specific types, got {}",
                err.message()
            );

            let names_required_version = err.message().contains("use format v3");
            assert!(
                names_required_version,
                "expected error message to explain v3 requirement, got {}",
                err.message()
            );
        }
    }
}

#[test]
fn test_table_metadata_builder_rejects_v2_list_element_requiring_v3() {
    // The nanosecond timestamp is nested as a list element rather than being
    // a top-level column.
    let element = NestedField::list_element(
        2,
        Type::Primitive(PrimitiveType::TimestampNs),
        false,
    );
    let list_field = NestedField::required(
        1,
        "ts_values",
        Type::List(ListType::new(element.into())),
    );
    let schema = Schema::builder()
        .with_fields(vec![list_field.into()])
        .build()
        .unwrap();

    let table_creation = table_creation_with_format_version(schema, FormatVersion::V2);
    let err = TableMetadataBuilder::from_table_creation(table_creation).unwrap_err();

    assert_eq!(err.kind(), ErrorKind::DataInvalid);

    let has_generic_wording = err
        .message()
        .contains("Cannot use types that require format v3");
    assert!(
        has_generic_wording,
        "expected error message to explain nested v3 requirement, got {}",
        err.message()
    );
}

#[test]
fn test_table_metadata_builder_allows_v3_nanosecond_timestamp_tables() {
    // One column for each nanosecond timestamp flavour.
    let ts_ns = NestedField::required(1, "ts_ns", Type::Primitive(PrimitiveType::TimestampNs));
    let tstz_ns =
        NestedField::required(2, "tstz_ns", Type::Primitive(PrimitiveType::TimestamptzNs));
    let schema = Schema::builder()
        .with_fields(vec![ts_ns.into(), tstz_ns.into()])
        .build()
        .unwrap();

    let table_creation = table_creation_with_format_version(schema, FormatVersion::V3);
    let build_result = TableMetadataBuilder::from_table_creation(table_creation)
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(build_result.metadata.format_version, FormatVersion::V3);
}

#[tokio::test]
async fn test_table_metadata_read_write() {
// Create a temporary directory for our test
Expand Down
29 changes: 28 additions & 1 deletion crates/iceberg/src/spec/table_metadata_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use super::{
UnboundPartitionSpec,
};
use crate::error::{Error, ErrorKind, Result};
use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE};
use crate::spec::{
EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE, min_format_version_for_schema,
};
use crate::{TableCreation, TableUpdate};

pub(crate) const FIRST_FIELD_ID: i32 = 1;
Expand Down Expand Up @@ -84,6 +86,8 @@ impl TableMetadataBuilder {
format_version: FormatVersion,
properties: HashMap<String, String>,
) -> Result<Self> {
Self::validate_schema_compatible_with_format_version(format_version, &schema)?;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a good place to validate the schema. How about adding format_version to TableMetadataBuilder and validating the schema in add_schema?


// Re-assign field_ids, schema.id, sort_order.id and spec.id for a new table.
let (fresh_schema, fresh_spec, fresh_sort_order) =
Self::reassign_ids(schema, spec.into(), sort_order)?;
Expand Down Expand Up @@ -196,6 +200,24 @@ impl TableMetadataBuilder {
)
}

fn validate_schema_compatible_with_format_version(
format_version: FormatVersion,
schema: &Schema,
) -> Result<()> {
if let Some(min_format_version) = min_format_version_for_schema(schema)?
&& min_format_version > format_version
{
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot use types that require format {min_format_version} in {format_version} table schemas; use format {min_format_version}"
),
));
}

Ok(())
}

/// Changes uuid of table metadata.
pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
if self.metadata.table_uuid != uuid {
Expand Down Expand Up @@ -638,6 +660,11 @@ impl TableMetadataBuilder {
/// Important: Use this method with caution. The builder does not check
/// if the added schema is compatible with the current schema.
pub fn add_schema(mut self, schema: Schema) -> Result<Self> {
Self::validate_schema_compatible_with_format_version(
self.metadata.format_version,
&schema,
)?;

// Validate that new schema fields don't conflict with existing partition field names
self.validate_schema_field_names(&schema)?;

Expand Down
52 changes: 47 additions & 5 deletions crates/integrations/datafusion/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use futures::StreamExt;
use futures::future::try_join_all;
use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids;
use iceberg::inspect::MetadataTableType;
use iceberg::spec::min_format_version_for_schema;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent};

use crate::table::IcebergTableProvider;
Expand Down Expand Up @@ -162,12 +163,21 @@ impl SchemaProvider for IcebergSchemaProvider {
let df_schema = table.schema();
let iceberg_schema = arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref())
.map_err(to_datafusion_error)?;
let format_version =
min_format_version_for_schema(&iceberg_schema).map_err(to_datafusion_error)?;

// Create the table in the Iceberg catalog
let table_creation = TableCreation::builder()
.name(name.clone())
.schema(iceberg_schema)
.build();
let table_creation = match format_version {
Some(format_version) => TableCreation::builder()
.name(name.clone())
.format_version(format_version)
.schema(iceberg_schema)
.build(),
None => TableCreation::builder()
.name(name.clone())
.schema(iceberg_schema)
.build(),
};

let catalog = self.catalog.clone();
let namespace = self.namespace.clone();
Expand Down Expand Up @@ -288,10 +298,11 @@ mod tests {
use std::sync::Arc;

use datafusion::arrow::array::{Int32Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
use iceberg::spec::FormatVersion;
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent};
use tempfile::TempDir;

Expand Down Expand Up @@ -375,6 +386,37 @@ mod tests {
assert!(schema_provider.table_exist("empty_table"));
}

#[tokio::test]
async fn test_register_timestamp_ns_table_uses_v3() {
    let (schema_provider, _temp_dir) = create_test_schema_provider().await;

    // A single nullable nanosecond-precision timestamp column without a time zone.
    let ts_field = Field::new(
        "ts",
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        true,
    );
    let arrow_schema = Arc::new(ArrowSchema::new(vec![ts_field]));

    let partitions = vec![vec![RecordBatch::new_empty(arrow_schema.clone())]];
    let mem_table = MemTable::try_new(arrow_schema, partitions).unwrap();

    let result =
        schema_provider.register_table("timestamp_ns_table".to_string(), Arc::new(mem_table));
    assert!(result.is_ok(), "Expected success, got: {result:?}");

    // Load the table back from the catalog to inspect the format version it
    // was created with.
    let namespace = schema_provider.namespace.clone();
    let table_ident = TableIdent::new(namespace, "timestamp_ns_table".to_string());
    let table = schema_provider
        .catalog
        .load_table(&table_ident)
        .await
        .unwrap();

    assert_eq!(FormatVersion::V3, table.metadata().format_version());
}

#[tokio::test]
async fn test_register_duplicate_table_fails() {
let (schema_provider, _temp_dir) = create_test_schema_provider().await;
Expand Down
Loading