Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 50 additions & 22 deletions crates/catalog/loader/tests/schema_update_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ mod common;
use std::collections::HashMap;

use common::{CatalogKind, cleanup_namespace_dyn, load_catalog};
use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type};
use iceberg::transaction::{AddColumn, ApplyTransactionAction, Transaction};
use iceberg::spec::{
AddColumn, DeleteColumn, NestedField, PrimitiveType, Schema, SchemaOperation, StructType, Type,
};
use iceberg::transaction::Transaction;
use iceberg::{ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent};
use iceberg_test_utils::normalize_test_name_with_parts;
use rstest::rstest;
Expand All @@ -52,6 +54,8 @@ fn base_schema() -> Schema {
#[case::memory_catalog(CatalogKind::Memory)]
#[tokio::test]
async fn test_catalog_schema_add_column(#[case] kind: CatalogKind) -> Result<()> {
use iceberg::transaction::ApplyTransactionAction;

let Some(harness) = load_catalog(kind).await else {
return Ok(());
};
Expand Down Expand Up @@ -79,9 +83,11 @@ async fn test_catalog_schema_add_column(#[case] kind: CatalogKind) -> Result<()>
let tx = Transaction::new(&table);
let tx = tx
.update_schema()
.add_column(AddColumn::optional(
"a",
Type::Primitive(PrimitiveType::Int),
.push_operation(SchemaOperation::Add(
AddColumn::builder()
.name("a")
.r#type(PrimitiveType::Int.into())
.build(),
))
.apply(tx)?;
let updated = tx.commit(catalog.as_ref()).await?;
Expand All @@ -105,6 +111,8 @@ async fn test_catalog_schema_add_column(#[case] kind: CatalogKind) -> Result<()>
#[case::memory_catalog(CatalogKind::Memory)]
#[tokio::test]
async fn test_catalog_schema_add_nested_and_delete_column(#[case] kind: CatalogKind) -> Result<()> {
use iceberg::transaction::ApplyTransactionAction;

let Some(harness) = load_catalog(kind).await else {
return Ok(());
};
Expand Down Expand Up @@ -136,27 +144,34 @@ async fn test_catalog_schema_add_nested_and_delete_column(#[case] kind: CatalogK
let tx = Transaction::new(&table);
let tx = tx
.update_schema()
.add_column(AddColumn::optional(
"info",
Type::Struct(StructType::new(vec![
NestedField::optional(0, "city", Type::Primitive(PrimitiveType::String)).into(),
])),
))
.push_operation(
AddColumn::builder()
.name("info")
.r#type(
StructType::new(vec![
NestedField::optional(0, "city", PrimitiveType::String.into()).into(),
])
.into(),
)
.build()
.into(),
)
.apply(tx)?;
let table = tx.commit(catalog.as_ref()).await?;

// Second transaction: add a sub-field to the nested struct and delete a top-level column.
let tx = Transaction::new(&table);
let tx = tx
.update_schema()
.add_column(
.push_operation(
AddColumn::builder()
.name("zip")
.field_type(Type::Primitive(PrimitiveType::String))
.parent("info")
.build(),
.r#type(PrimitiveType::String.into())
.parent("info".into())
.build()
.into(),
)
.delete_column("baz")
.push_operation(DeleteColumn::new("baz").into())
.apply(tx)?;
let table = tx.commit(catalog.as_ref()).await?;

Expand All @@ -179,6 +194,8 @@ async fn test_catalog_schema_add_nested_and_delete_column(#[case] kind: CatalogK
#[case::memory_catalog(CatalogKind::Memory)]
#[tokio::test]
async fn test_catalog_schema_delete_invalid_column_errors(#[case] kind: CatalogKind) -> Result<()> {
use iceberg::transaction::ApplyTransactionAction;

let Some(harness) = load_catalog(kind).await else {
return Ok(());
};
Expand Down Expand Up @@ -208,13 +225,19 @@ async fn test_catalog_schema_delete_invalid_column_errors(#[case] kind: CatalogK

// Deleting an identifier field must fail.
let tx = Transaction::new(&table);
let tx = tx.update_schema().delete_column("bar").apply(tx)?;
let tx = tx
.update_schema()
.push_operation(DeleteColumn::new("bar").into())
.apply(tx)?;
let err = tx.commit(catalog.as_ref()).await.unwrap_err();
assert_eq!(err.kind(), ErrorKind::PreconditionFailed);

// Deleting a nonexistent field must fail.
let tx = Transaction::new(&table);
let tx = tx.update_schema().delete_column("nonexistent").apply(tx)?;
let tx = tx
.update_schema()
.push_operation(DeleteColumn::new("nonexistent").into())
.apply(tx)?;
let err = tx.commit(catalog.as_ref()).await.unwrap_err();
assert_eq!(err.kind(), ErrorKind::PreconditionFailed);

Expand All @@ -233,6 +256,8 @@ async fn test_catalog_schema_delete_invalid_column_errors(#[case] kind: CatalogK
async fn test_catalog_schema_update_persisted_after_reload(
#[case] kind: CatalogKind,
) -> Result<()> {
use iceberg::transaction::ApplyTransactionAction;

let Some(harness) = load_catalog(kind).await else {
return Ok(());
};
Expand Down Expand Up @@ -264,10 +289,13 @@ async fn test_catalog_schema_update_persisted_after_reload(
let tx = Transaction::new(&table);
let tx = tx
.update_schema()
.add_column(AddColumn::optional(
"new_field",
Type::Primitive(PrimitiveType::Long),
))
.push_operation(
AddColumn::builder()
.name("new_field")
.r#type(PrimitiveType::Long.into())
.build()
.into(),
)
.apply(tx)?;
tx.commit(catalog.as_ref()).await?;

Expand Down
10 changes: 10 additions & 0 deletions crates/iceberg/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,16 @@ macro_rules! ensure_data_valid {
};
}

/// Helper macro to check preconditions.
#[macro_export]
macro_rules! ensure_precondition {
($cond: expr, $fmt: literal, $($arg:tt)*) => {
if !$cond {
return Err($crate::error::Error::new($crate::error::ErrorKind::PreconditionFailed, format!($fmt, $($arg)*)))
}
};
}

#[cfg(test)]
mod tests {
use anyhow::anyhow;
Expand Down
50 changes: 49 additions & 1 deletion crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@ impl Type {
matches!(self, Type::Struct(_))
}

/// Whether the type is list type.
#[inline(always)]
pub fn is_list(&self) -> bool {
matches!(self, Type::List(_))
}

/// Whether the type is map type.
#[inline(always)]
pub fn is_map(&self) -> bool {
matches!(self, Type::Map(_))
}

/// Whether the type is nested type.
#[inline(always)]
pub fn is_nested(&self) -> bool {
Expand Down Expand Up @@ -160,7 +172,7 @@ impl Type {
Ok(REQUIRED_LENGTH[precision as usize - 1])
}

/// Creates decimal type.
/// Creates decimal type.
#[inline(always)]
pub fn decimal(precision: u32, scale: u32) -> Result<Self> {
ensure_data_valid!(
Expand Down Expand Up @@ -634,6 +646,16 @@ impl NestedField {
Self::new(id, LIST_FIELD_NAME, field_type, required)
}

/// Construct required list type's element field.
pub fn list_required_element(id: i32, field_type: Type) -> Self {
Self::list_element(id, field_type, true)
}

/// Construct optional list type's element field.
pub fn list_optional_element(id: i32, field_type: Type) -> Self {
Self::list_element(id, field_type, false)
}

/// Construct map type's key field.
pub fn map_key_element(id: i32, field_type: Type) -> Self {
Self::required(id, MAP_KEY_FIELD_NAME, field_type)
Expand Down Expand Up @@ -667,6 +689,18 @@ impl NestedField {
self.id = id;
self
}

/// Set the type of the field
pub(crate) fn of_type(mut self, field_type: Box<Type>) -> Self {
self.field_type = field_type;
self
}

/// Set the name of the field.
pub(crate) fn with_name(mut self, name: impl ToString) -> Self {
self.name = name.to_string();
self
}
}

impl fmt::Display for NestedField {
Expand Down Expand Up @@ -699,6 +733,20 @@ impl ListType {
pub fn new(element_field: NestedFieldRef) -> Self {
Self { element_field }
}

/// Construct an optional list type with the given element field.
pub fn optional(element_id: i32, element_type: Type) -> Self {
Self {
element_field: NestedField::list_element(element_id, element_type, false).into(),
}
}

/// Construct a required list type with the given element field.
pub fn required(element_id: i32, element_type: Type) -> Self {
Self {
element_field: NestedField::list_element(element_id, element_type, true).into(),
}
}
}

/// Module for type serialization/deserialization.
Expand Down
2 changes: 2 additions & 0 deletions crates/iceberg/src/spec/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::sync::Arc;

mod update;
mod utils;
mod visitor;
pub use self::update::*;
pub use self::visitor::*;
pub(super) mod _serde;
mod id_reassigner;
Expand Down
Loading
Loading