diff --git a/crates/catalog/loader/tests/schema_update_suite.rs b/crates/catalog/loader/tests/schema_update_suite.rs index 9421bbf0ee..14f54b807b 100644 --- a/crates/catalog/loader/tests/schema_update_suite.rs +++ b/crates/catalog/loader/tests/schema_update_suite.rs @@ -24,8 +24,10 @@ mod common; use std::collections::HashMap; use common::{CatalogKind, cleanup_namespace_dyn, load_catalog}; -use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type}; -use iceberg::transaction::{AddColumn, ApplyTransactionAction, Transaction}; +use iceberg::spec::{ + AddColumn, DeleteColumn, NestedField, PrimitiveType, Schema, SchemaOperation, StructType, Type, +}; +use iceberg::transaction::Transaction; use iceberg::{ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent}; use iceberg_test_utils::normalize_test_name_with_parts; use rstest::rstest; @@ -52,6 +54,8 @@ fn base_schema() -> Schema { #[case::memory_catalog(CatalogKind::Memory)] #[tokio::test] async fn test_catalog_schema_add_column(#[case] kind: CatalogKind) -> Result<()> { + use iceberg::transaction::ApplyTransactionAction; + let Some(harness) = load_catalog(kind).await else { return Ok(()); }; @@ -79,9 +83,11 @@ async fn test_catalog_schema_add_column(#[case] kind: CatalogKind) -> Result<()> let tx = Transaction::new(&table); let tx = tx .update_schema() - .add_column(AddColumn::optional( - "a", - Type::Primitive(PrimitiveType::Int), + .push_operation(SchemaOperation::Add( + AddColumn::builder() + .name("a") + .r#type(PrimitiveType::Int.into()) + .build(), )) .apply(tx)?; let updated = tx.commit(catalog.as_ref()).await?; @@ -105,6 +111,8 @@ async fn test_catalog_schema_add_column(#[case] kind: CatalogKind) -> Result<()> #[case::memory_catalog(CatalogKind::Memory)] #[tokio::test] async fn test_catalog_schema_add_nested_and_delete_column(#[case] kind: CatalogKind) -> Result<()> { + use iceberg::transaction::ApplyTransactionAction; + let Some(harness) = load_catalog(kind).await else { return Ok(()); }; @@ -136,12 +144,18 @@ async fn test_catalog_schema_add_nested_and_delete_column(#[case] kind: CatalogK let tx = Transaction::new(&table); let tx = tx .update_schema() - .add_column(AddColumn::optional( - "info", - Type::Struct(StructType::new(vec![ - NestedField::optional(0, "city", Type::Primitive(PrimitiveType::String)).into(), - ])), - )) + .push_operation( + AddColumn::builder() + .name("info") + .r#type( + StructType::new(vec![ + NestedField::optional(0, "city", PrimitiveType::String.into()).into(), + ]) + .into(), + ) + .build() + .into(), + ) .apply(tx)?; let table = tx.commit(catalog.as_ref()).await?; @@ -149,14 +163,15 @@ async fn test_catalog_schema_add_nested_and_delete_column(#[case] kind: CatalogK let tx = Transaction::new(&table); let tx = tx .update_schema() - .add_column( + .push_operation( AddColumn::builder() .name("zip") - .field_type(Type::Primitive(PrimitiveType::String)) - .parent("info") - .build(), + .r#type(PrimitiveType::String.into()) + .parent("info".into()) + .build() + .into(), ) - .delete_column("baz") + .push_operation(DeleteColumn::new("baz").into()) .apply(tx)?; let table = tx.commit(catalog.as_ref()).await?; @@ -179,6 +194,8 @@ async fn test_catalog_schema_add_nested_and_delete_column(#[case] kind: CatalogK #[case::memory_catalog(CatalogKind::Memory)] #[tokio::test] async fn test_catalog_schema_delete_invalid_column_errors(#[case] kind: CatalogKind) -> Result<()> { + use iceberg::transaction::ApplyTransactionAction; + let Some(harness) = load_catalog(kind).await else { return Ok(()); }; @@ -208,13 +225,19 @@ async fn test_catalog_schema_delete_invalid_column_errors(#[case] kind: CatalogK // Deleting an identifier field must fail. let tx = Transaction::new(&table); - let tx = tx.update_schema().delete_column("bar").apply(tx)?; + let tx = tx + .update_schema() + .push_operation(DeleteColumn::new("bar").into()) + .apply(tx)?; let err = tx.commit(catalog.as_ref()).await.unwrap_err(); assert_eq!(err.kind(), ErrorKind::PreconditionFailed); // Deleting a nonexistent field must fail. let tx = Transaction::new(&table); - let tx = tx.update_schema().delete_column("nonexistent").apply(tx)?; + let tx = tx + .update_schema() + .push_operation(DeleteColumn::new("nonexistent").into()) + .apply(tx)?; let err = tx.commit(catalog.as_ref()).await.unwrap_err(); assert_eq!(err.kind(), ErrorKind::PreconditionFailed); @@ -233,6 +256,8 @@ async fn test_catalog_schema_delete_invalid_column_errors(#[case] kind: CatalogK async fn test_catalog_schema_update_persisted_after_reload( #[case] kind: CatalogKind, ) -> Result<()> { + use iceberg::transaction::ApplyTransactionAction; + let Some(harness) = load_catalog(kind).await else { return Ok(()); }; @@ -264,10 +289,13 @@ async fn test_catalog_schema_update_persisted_after_reload( let tx = Transaction::new(&table); let tx = tx .update_schema() - .add_column(AddColumn::optional( - "new_field", - Type::Primitive(PrimitiveType::Long), - )) + .push_operation( + AddColumn::builder() + .name("new_field") + .r#type(PrimitiveType::Long.into()) + .build() + .into(), + ) .apply(tx)?; tx.commit(catalog.as_ref()).await?; diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 02c3eee8fc..7f2217eccf 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -469,6 +469,16 @@ macro_rules! ensure_data_valid { }; } +/// Helper macro to check preconditions. +#[macro_export] +macro_rules! ensure_precondition { + ($cond: expr, $fmt: literal, $($arg:tt)*) => { + if !$cond { + return Err($crate::error::Error::new($crate::error::ErrorKind::PreconditionFailed, format!($fmt, $($arg)*))) + } + }; +} + #[cfg(test)] mod tests { use anyhow::anyhow; diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index ad4aea758f..d280ac9bc3 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -116,6 +116,18 @@ impl Type { matches!(self, Type::Struct(_)) } + /// Whether the type is list type. + #[inline(always)] + pub fn is_list(&self) -> bool { + matches!(self, Type::List(_)) + } + + /// Whether the type is map type. + #[inline(always)] + pub fn is_map(&self) -> bool { + matches!(self, Type::Map(_)) + } + /// Whether the type is nested type. #[inline(always)] pub fn is_nested(&self) -> bool { @@ -160,7 +172,7 @@ impl Type { Ok(REQUIRED_LENGTH[precision as usize - 1]) } - /// Creates decimal type. + /// Creates decimal type. #[inline(always)] pub fn decimal(precision: u32, scale: u32) -> Result { ensure_data_valid!( @@ -634,6 +646,16 @@ impl NestedField { Self::new(id, LIST_FIELD_NAME, field_type, required) } + /// Construct required list type's element field. + pub fn list_required_element(id: i32, field_type: Type) -> Self { + Self::list_element(id, field_type, true) + } + + /// Construct optional list type's element field. + pub fn list_optional_element(id: i32, field_type: Type) -> Self { + Self::list_element(id, field_type, false) + } + /// Construct map type's key field. pub fn map_key_element(id: i32, field_type: Type) -> Self { Self::required(id, MAP_KEY_FIELD_NAME, field_type) @@ -667,6 +689,18 @@ impl NestedField { self.id = id; self } + + /// Set the type of the field + pub(crate) fn of_type(mut self, field_type: Box) -> Self { + self.field_type = field_type; + self + } + + /// Set the name of the field. + pub(crate) fn with_name(mut self, name: impl ToString) -> Self { + self.name = name.to_string(); + self + } } impl fmt::Display for NestedField { @@ -699,6 +733,20 @@ impl ListType { pub fn new(element_field: NestedFieldRef) -> Self { Self { element_field } } + + /// Construct an optional list type with the given element field. + pub fn optional(element_id: i32, element_type: Type) -> Self { + Self { + element_field: NestedField::list_element(element_id, element_type, false).into(), + } + } + + /// Construct a required list type with the given element field. + pub fn required(element_id: i32, element_type: Type) -> Self { + Self { + element_field: NestedField::list_element(element_id, element_type, true).into(), + } + } } /// Module for type serialization/deserialization. diff --git a/crates/iceberg/src/spec/schema/mod.rs b/crates/iceberg/src/spec/schema/mod.rs index 9109990e19..c1675d2509 100644 --- a/crates/iceberg/src/spec/schema/mod.rs +++ b/crates/iceberg/src/spec/schema/mod.rs @@ -21,8 +21,10 @@ use std::collections::{HashMap, HashSet}; use std::fmt::{Display, Formatter}; use std::sync::Arc; +mod update; mod utils; mod visitor; +pub use self::update::*; pub use self::visitor::*; pub(super) mod _serde; mod id_reassigner; diff --git a/crates/iceberg/src/spec/schema/update.rs b/crates/iceberg/src/spec/schema/update.rs new file mode 100644 index 0000000000..d4763f9e63 --- /dev/null +++ b/crates/iceberg/src/spec/schema/update.rs @@ -0,0 +1,1572 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use typed_builder::TypedBuilder; + +use crate::spec::schema::index::index_parents; +use crate::spec::{ + ListType, Literal, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, SchemaRef, + SchemaVisitor, StructType, Type, visit_schema, +}; +use crate::{Error, ErrorKind, Result, ensure_precondition}; + +const TABLE_ROOT_ID: i32 = -1; + +/// Operations that can be applied to a schema to produce a new schema. These are used in `UpdateSchemaAction` and are not intended to be used directly by end users. Instead, end users should use `UpdateSchema` which will be converted into a list of `SchemaOperation`s. +pub enum SchemaOperation { + /// Add a column to the schema + Add(AddColumn), + /// Update a column's type, doc, or default value + Update(UpdateColumn), + /// Rename a column + Rename(RenameColumn), + /// Delete a column + Delete(DeleteColumn), + /// Move a column + Move(MoveColumn), +} + +/// A column to be added to the schema. +#[derive(TypedBuilder)] +pub struct AddColumn { + #[builder(default, setter(strip_option))] + parent: Option, + #[builder(setter(into))] + name: String, + #[builder(default = true)] + is_optional: bool, + r#type: Type, + #[builder(default, setter(strip_option))] + doc: Option>, + #[builder(default, setter(strip_option))] + default_value: Option, +} + +impl From for SchemaOperation { + fn from(add: AddColumn) -> Self { + SchemaOperation::Add(add) + } +} + +/// A column to be deleted from the schema. +pub struct DeleteColumn { + name: String, +} + +impl DeleteColumn { + /// Create a new `DeleteColumn` with the given column name. + pub fn new(name: impl Into) -> Self { + Self { name: name.into() } + } +} + +impl From for SchemaOperation { + fn from(delete: DeleteColumn) -> Self { + SchemaOperation::Delete(delete) + } +} + +/// A column to be renamed in the schema. +#[derive(TypedBuilder)] +pub struct RenameColumn { + #[builder(setter(into))] + name: String, + #[builder(setter(into))] + new_name: String, +} + +impl From for SchemaOperation { + fn from(rename: RenameColumn) -> Self { + SchemaOperation::Rename(rename) + } +} + +/// A column to be updated in the schema. +#[derive(TypedBuilder)] +pub struct UpdateColumn { + #[builder(setter(into))] + name: String, + #[builder(default, setter(strip_option))] + new_type: Option, + #[builder(default, setter(strip_option))] + new_doc: Option>, + #[builder(default, setter(strip_option))] + new_default_value: Option>, +} + +impl From for SchemaOperation { + fn from(update: UpdateColumn) -> Self { + SchemaOperation::Update(update) + } +} + +/// A column to be moved in the schema. +pub struct MoveColumn { + name: String, + reference_name: String, + move_type: MoveType, +} + +impl From for SchemaOperation { + fn from(move_col: MoveColumn) -> Self { + SchemaOperation::Move(move_col) + } +} + +impl MoveColumn { + /// Move the column to the first position. + pub fn first(name: impl Into) -> Self { + Self { + name: name.into(), + reference_name: String::new(), + move_type: MoveType::First, + } + } + + /// Move the column before the reference column. + pub fn before(name: impl Into, reference: impl Into) -> Self { + Self { + name: name.into(), + reference_name: reference.into(), + move_type: MoveType::Before, + } + } + + /// Move the column after the reference column. + pub fn after(name: impl Into, reference: impl Into) -> Self { + Self { + name: name.into(), + reference_name: reference.into(), + move_type: MoveType::After, + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum MoveType { + First, + Before, + After, +} + +#[derive(Clone, Debug)] +struct Move { + field_id: i32, + reference_field_id: i32, + r#type: MoveType, +} + +impl Move { + fn first(field_id: i32) -> Self { + Move::new(field_id, TABLE_ROOT_ID, MoveType::First) + } + + fn before(field_id: i32, reference_field_id: i32) -> Self { + Move::new(field_id, reference_field_id, MoveType::Before) + } + + fn after(field_id: i32, reference_field_id: i32) -> Self { + Move::new(field_id, reference_field_id, MoveType::After) + } + + fn new(field_id: i32, reference_field_id: i32, r#type: MoveType) -> Self { + Move { + field_id, + reference_field_id, + r#type, + } + } + + fn field_id(&self) -> i32 { + self.field_id + } + + fn reference_field_id(&self) -> i32 { + self.reference_field_id + } + + fn r#type(&self) -> MoveType { + self.r#type + } +} + +/// Applies a list of `SchemaOperation`s to a `Schema` to produce a new `Schema`. This is used in `UpdateSchemaAction` to apply schema changes as part of a transaction commit. This function validates that the schema operations are valid (e.g. that added columns do not have duplicate names, that deleted columns exist, etc.) and returns an error if any invalid operations are found. If all operations are valid, it returns the updated schema. +pub fn schema_update(schema: SchemaRef, operations: &[SchemaOperation]) -> Result { + let mut updates: HashMap = HashMap::new(); + let mut deletes = Vec::new(); + let mut moves: HashMap<_, Vec<_>> = HashMap::new(); + let mut parent_to_added_ids: HashMap<_, Vec<_>> = HashMap::new(); + let mut id_to_parent = index_parents(&schema.r#struct).unwrap(); + let mut last_column_id = schema.highest_field_id; + let mut added_name_to_id = HashMap::new(); + let mut identifier_field_ids = schema.identifier_field_ids.clone(); + + for operation in operations { + match operation { + SchemaOperation::Add(add) => { + let (parent, name, is_optional, field_type, doc, default_value) = ( + &add.parent, + &add.name, + add.is_optional, + &add.r#type, + &add.doc, + &add.default_value, + ); + let mut parent_id = TABLE_ROOT_ID; + let full_name = if let Some(parent) = parent { + let parent_field = schema.field_by_name(parent).ok_or(Error::new( + ErrorKind::PreconditionFailed, + format!("Cannot find parent struct: {}", parent), + ))?; + let parent_field = if parent_field.field_type.is_nested() { + let parent_type = parent_field.field_type.as_ref(); + match parent_type { + Type::List(nested) => nested.element_field.as_ref(), // fields are added to the element type + Type::Map(nested) => nested.value_field.as_ref(), // fields are added to the map value type + _ => parent_field, + } + } else { + parent_field + }; + ensure_precondition!( + parent_field.field_type.is_struct(), + "Cannot add to non-struct column: {}: {}", + &parent, + parent_field.field_type + ); + parent_id = parent_field.id; + let full_name = format!("{}.{}", parent, name); + let current_field = schema.field_by_name(&full_name); + ensure_precondition!( + !deletes.contains(&parent_id), + "Can not add a column that will be deleted: {}", + name + ); + ensure_precondition!( + current_field.is_none() || deletes.contains(¤t_field.unwrap().id), + "Cannot add column, name already exists: {}", + &name + ); + full_name + } else { + let current_field = schema.field_by_name(name); + ensure_precondition!( + current_field.is_none() || deletes.contains(¤t_field.unwrap().id), + "Cannot add column, name already exists: {}", + &name + ); + name.clone() + }; + ensure_precondition!( + default_value.is_some() || is_optional, + "Incompatible change: cannot add required column without a default value: {}", + full_name + ); + last_column_id += 1; + let new_id = last_column_id; + added_name_to_id.insert(full_name, new_id); + + if parent_id != TABLE_ROOT_ID { + id_to_parent.insert(new_id, parent_id); + } + // TODO: Maybe we can use `ReassignFieldIds`? + let assigned_type = assign_fresh_ids(field_type.clone(), &mut last_column_id); + let mut new_field = NestedField::new(new_id, name, assigned_type, !is_optional); + if let Some(doc) = doc { + new_field.doc = doc.clone(); + } + new_field.write_default = default_value.clone(); + new_field.initial_default = default_value.clone(); + updates.insert(new_id, new_field.into()); + parent_to_added_ids + .entry(parent_id) + .or_default() + .push(new_id); + } + SchemaOperation::Delete(delete) => { + let field = schema.field_by_name(&delete.name).ok_or_else(|| { + Error::new( + ErrorKind::PreconditionFailed, + format!("Cannot delete missing column: {}", delete.name), + ) + })?; + ensure_precondition!( + !parent_to_added_ids.contains_key(&field.id), + "Cannot delete a column that has updates: {}", + delete.name + ); + ensure_precondition!( + !updates.contains_key(&field.id), + "Cannot delete a column that has updates: {}", + delete.name + ); + deletes.push(field.id); + } + SchemaOperation::Rename(rename) => { + let (name, new_name) = (&rename.name, &rename.new_name); + let field = schema.field_by_name(name).ok_or(Error::new( + ErrorKind::PreconditionFailed, + format!("Cannot rename missing column: {}", name), + ))?; + ensure_precondition!( + !deletes.contains(&field.id), + "Cannot rename a column that will be deleted: {}", + name + ); + // merge with an update, if present + let field_id = field.id; + let update = updates.get(&field_id); + let new_field = if let Some(update) = update { + Arc::unwrap_or_clone(update.clone()).with_name(new_name) + } else { + Arc::unwrap_or_clone(field.clone()).with_name(new_name) + }; + updates.insert(field_id, Arc::new(new_field)); + if identifier_field_ids.contains(&field_id) { + identifier_field_ids.remove(&field_id); + identifier_field_ids.insert(field_id); + } + } + SchemaOperation::Update(update) => { + let (name, new_type, new_doc, new_default_value) = ( + &update.name, + &update.new_type, + &update.new_doc, + &update.new_default_value, + ); + let field = find_for_update(name, schema.clone(), &updates, &added_name_to_id)? + .ok_or(Error::new( + ErrorKind::PreconditionFailed, + format!("Cannot update missing column: {}", name), + ))?; + ensure_precondition!( + !deletes.contains(&field.id), + "Cannot update column that will be deleted: {}", + name, + ); + let mut new_field = Arc::unwrap_or_clone(field.clone()); + if let Some(new_type) = new_type { + *new_field.field_type = new_type.clone(); + } + if let Some(new_doc) = new_doc { + new_field.doc = new_doc.clone(); + } + if let Some(new_default_value) = new_default_value { + new_field.write_default = new_default_value.clone(); + } + updates.insert(field.id, Arc::new(new_field)); + } + SchemaOperation::Move(r#move) => { + let (name, reference_name, move_type) = + (&r#move.name, &r#move.reference_name, &r#move.move_type); + let field_id = + find_for_move(name, schema.clone(), &added_name_to_id)?.ok_or(Error::new( + ErrorKind::PreconditionFailed, + format!("Cannot move missing column: {}", name), + ))?; + let r#move = if move_type == &MoveType::First { + Move::first(field_id) + } else { + let reference_field_id = find_for_move( + reference_name, + schema.clone(), + &added_name_to_id, + )? + .ok_or(Error::new( + ErrorKind::PreconditionFailed, + format!("Cannot move relative to missing column: {}", reference_name), + ))?; + match move_type { + MoveType::Before => Move::before(field_id, reference_field_id), + MoveType::After => Move::after(field_id, reference_field_id), + _ => unreachable!(), + } + }; + let parent_id = id_to_parent.get(&field_id); + if let Some(&parent_id) = parent_id { + let parent = schema.field_by_id(parent_id).unwrap(); + ensure_precondition!( + parent.field_type.is_struct(), + "Cannot move field in non-struct type: {}", + parent + ); + if r#move.r#type == MoveType::After || r#move.r#type == MoveType::Before { + ensure_precondition!( + parent_id == *id_to_parent.get(&r#move.reference_field_id).unwrap(), + "Cannot move field {} to a different struct", + name, + ); + } + moves.entry(parent_id).or_default().push(r#move); + } else { + if move_type == &MoveType::After || move_type == &MoveType::Before { + ensure_precondition!( + !id_to_parent.contains_key(&r#move.reference_field_id), + "Cannot move field {} to a different struct", + name, + ); + } + moves.entry(TABLE_ROOT_ID).or_default().push(r#move); + } + } + } + } + for &id in &identifier_field_ids { + let field = schema.field_by_id(id); + if let Some(field) = field { + // TODO: add support for `setIdentifierFields` to update identifier fields + ensure_precondition!( + !deletes.contains(&id), + "Cannot delete identifier field: {}.", + field.name + ); + let mut parent_id = id_to_parent.get(&id); + while let Some(p_id) = parent_id { + ensure_precondition!( + !deletes.contains(p_id), + "Cannot delete field {} as it will delete nested identifier field {}.", + p_id, + field.name + ); + parent_id = id_to_parent.get(p_id); + } + } + } + // apply schema changes + let mut visitor = ApplyChangesVisitor { + deletes, + updates, + parent_to_added_ids, + moves, + }; + let struct_type = visit_schema(schema.as_ref(), &mut visitor)? + .unwrap() + .to_struct_type() + .unwrap(); + // validate identifier requirements based on the latest schema + Ok(Schema::builder() + .with_fields(struct_type.fields().to_vec()) + .with_identifier_field_ids(identifier_field_ids) + .build()? + .into()) +} + +fn find_for_update( + name: &str, + schema: SchemaRef, + updates: &HashMap, + added_name_to_id: &HashMap, +) -> Result> { + let field = schema.field_by_name(name); + if let Some(field) = field { + let pending_update = updates.get(&field.id); + if let Some(pending_update) = pending_update { + Ok(Some(pending_update.clone())) + } else { + Ok(Some(field.clone())) + } + } else { + let added_id = added_name_to_id.get(name); + if let Some(added_id) = added_id { + Ok(updates.get(added_id).cloned()) + } else { + Ok(None) + } + } +} + +fn find_for_move( + name: &str, + schema: SchemaRef, + added_name_to_id: &HashMap, +) -> Result> { + let added_id = added_name_to_id.get(name); + if let Some(added_id) = added_id { + return Ok(Some(*added_id)); + } + let field = schema.field_by_name(name); + if let Some(field) = field { + return Ok(Some(field.id)); + } + Ok(None) +} + +fn assign_fresh_ids(field_type: Type, next_id: &mut i32) -> Type { + match field_type { + Type::Primitive(_) => field_type, + Type::Struct(s) => { + let new_fields = s + .fields() + .iter() + .map(|field| { + *next_id += 1; + let new_field_id = *next_id; + let new_type = assign_fresh_ids((*field.field_type).clone(), next_id); + Arc::new(NestedField::new( + new_field_id, + &field.name, + new_type, + field.required, + )) + }) + .collect(); + Type::Struct(StructType::new(new_fields)) + } + Type::List(list) => { + *next_id += 1; + let element_id = *next_id; + let element_type = assign_fresh_ids((*list.element_field.field_type).clone(), next_id); + Type::List(ListType::new(Arc::new(NestedField::new( + element_id, + &list.element_field.name, + element_type, + list.element_field.required, + )))) + } + Type::Map(map) => { + *next_id += 1; + let key_id = *next_id; + *next_id += 1; + let value_id = *next_id; + let key_type = assign_fresh_ids((*map.key_field.field_type).clone(), next_id); + let value_type = assign_fresh_ids((*map.value_field.field_type).clone(), next_id); + Type::Map(MapType::new( + Arc::new(NestedField::new( + key_id, + &map.key_field.name, + key_type, + true, + )), + Arc::new(NestedField::new( + value_id, + &map.value_field.name, + value_type, + map.value_field.required, + )), + )) + } + } +} + +struct ApplyChangesVisitor { + deletes: Vec, + updates: HashMap, + parent_to_added_ids: HashMap>, + moves: HashMap>, +} + +impl SchemaVisitor for ApplyChangesVisitor { + type T = Option; + + fn schema(&mut self, _schema: &Schema, value: Self::T) -> Result { + let added_fields: Vec = self + .parent_to_added_ids + .get(&TABLE_ROOT_ID) + .unwrap_or(&vec![]) + .iter() + .map(|id| self.updates.get(id).unwrap().clone()) + .collect(); + let fields = add_and_move_fields( + value.clone().unwrap().to_struct_type().unwrap().fields(), + &added_fields, + self.moves.get(&TABLE_ROOT_ID).unwrap_or(&vec![]), + ); + if !fields.is_empty() { + return Ok(Some(Type::Struct(StructType::new(fields)))); + } + Ok(value) + } + + fn r#struct(&mut self, r#struct: &StructType, results: Vec) -> Result { + let mut has_change = false; + let mut new_fields: Vec = Vec::with_capacity(results.len()); + for (result_type, field) in results.iter().zip(r#struct.fields()) { + if result_type.is_none() { + has_change = true; + continue; + } + let result_type = result_type.clone().unwrap(); + let update = self.updates.get(&field.id); + let updated = if let Some(update) = update { + Arc::unwrap_or_clone(update.clone()).of_type(Box::new(result_type)) + } else { + Arc::unwrap_or_clone(field.clone()).of_type(Box::new(result_type)) + }; + if field.as_ref() == &updated { + new_fields.push(field.clone()); + } else { + has_change = true; + new_fields.push(updated.into()); + } + } + if has_change { + return Ok(Some(Type::Struct(StructType::new(new_fields)))); + } + Ok(Some(Type::Struct(r#struct.clone()))) + } + + fn field(&mut self, field: &NestedFieldRef, value: Self::T) -> Result { + let field_id = field.id; + // handle deletes + if self.deletes.contains(&field_id) { + return Ok(None); + } + // handle updates + let update = self.updates.get(&field_id); + if let Some(update) = update + && update.field_type.as_ref() != field.field_type.as_ref() + { + return Ok(Some(*update.field_type.clone())); + } + // handle adds + let new_fields: Vec<_> = self + .parent_to_added_ids + .get(&field_id) + .unwrap_or(&vec![]) + .iter() + .filter_map(|id| self.updates.get(id)) + .cloned() + .collect(); + let columns_to_move = self.moves.get(&field_id).cloned().unwrap_or(vec![]); + if !new_fields.is_empty() || !columns_to_move.is_empty() { + let fields = add_and_move_fields( + value.clone().unwrap().to_struct_type().unwrap().fields(), + &new_fields, + &columns_to_move, + ); + if !fields.is_empty() { + return Ok(Some(Type::Struct(StructType::new(fields)))); + } + } + Ok(value) + } + + fn list(&mut self, list: &ListType, element_result: Self::T) -> Result { + let element_field = list.element_field.clone(); + let element_type = self + .field(&element_field, element_result)? + .ok_or(Error::new( + ErrorKind::PreconditionFailed, + format!("Cannot delete list element type from list: {:?}", list), + ))?; + let element_update = self.updates.get(&element_field.id); + let is_element_optional = if let Some(element_update) = element_update { + !element_update.required + } else { + !element_field.required + }; + let is_element_required = !is_element_optional; + if is_element_required == element_field.required + && &element_type == list.element_field.field_type.as_ref() + { + return Ok(Some(Type::List(list.clone()))); + } + if is_element_optional { + Ok(Some(Type::List(ListType::optional( + list.element_field.id, + element_type, + )))) + } else { + Ok(Some(Type::List(ListType::required( + list.element_field.id, + element_type, + )))) + } + } + + fn map( + &mut self, + map: &MapType, + key_result: Self::T, + value_result: Self::T, + ) -> Result { + let key_id = map.key_field.id; + if self.deletes.contains(&key_id) { + return Err(Error::new( + ErrorKind::PreconditionFailed, + format!("Cannot delete map keys: {:?}", map), + )); + } else if self.updates.contains_key(&key_id) { + return Err(Error::new( + ErrorKind::PreconditionFailed, + format!("Cannot update map keys: {:?}", map), + )); + } else if self.parent_to_added_ids.contains_key(&key_id) { + return Err(Error::new( + ErrorKind::PreconditionFailed, + format!("Cannot add fields to map keys: {:?}", map), + )); + } else if map.key_field.field_type.as_ref() != &key_result.unwrap() { + return Err(Error::new( + ErrorKind::PreconditionFailed, + format!("Cannot alter map keys: {:?}", map), + )); + } + let value_field = map.value_field.clone(); + let value_type = self.field(&value_field, value_result)?.ok_or(Error::new( + ErrorKind::PreconditionFailed, + format!("Cannot delete value type from map: {:?}", map), + ))?; + let value_update = self.updates.get(&value_field.id); + let is_value_required = if let Some(update) = value_update { + update.required + } else { + map.value_field.required + }; + if is_value_required == map.value_field.required + && map.value_field.field_type.as_ref() == &value_type + { + return Ok(Some(Type::Map(map.clone()))); + } + if is_value_required { + Ok(Some(Type::Map(MapType::required( + map.key_field.id, + *map.key_field.field_type.clone(), + map.value_field.id, + value_type, + )))) + } else { + Ok(Some(Type::Map(MapType::optional( + map.key_field.id, + *map.key_field.field_type.clone(), + map.value_field.id, + value_type, + )))) + } + } + + fn primitive(&mut self, p: &PrimitiveType) -> Result { + Ok(Some(Type::Primitive(p.clone()))) + } +} + +fn add_and_move_fields( + fields: &[NestedFieldRef], + adds: &[NestedFieldRef], + moves: &[Move], +) -> Vec { + if !adds.is_empty() { + if !moves.is_empty() { + return move_fields(&add_fields(fields, adds), moves); + } + return add_fields(fields, adds); + } else if !moves.is_empty() { + return move_fields(fields, moves); + } + vec![] +} + +fn add_fields(fields: &[NestedFieldRef], adds: &[NestedFieldRef]) -> Vec { + let mut new_fields = fields.to_owned(); + new_fields.extend(adds.iter().cloned()); + new_fields +} + +fn move_fields(fields: &[NestedFieldRef], moves: &[Move]) -> Vec { + let mut reordered = fields.to_vec(); + for r#move in moves { + let idx = reordered + .iter() + .position(|f| f.id == r#move.field_id()) + .unwrap(); + let to_move = reordered.remove(idx); + match r#move.r#type() { + MoveType::First => { + reordered.insert(0, to_move); + } + MoveType::Before => { + let before_idx = reordered + .iter() + .position(|f| f.id == r#move.reference_field_id()) + .unwrap(); + reordered.insert(before_idx, to_move); + } + MoveType::After => { + let after_idx = reordered + .iter() + .position(|f| f.id == r#move.reference_field_id()) + .unwrap(); + reordered.insert(after_idx + 1, to_move); + } + } + } + reordered +} + +#[cfg(test)] +mod tests { + use std::sync::{Arc, LazyLock}; + + use crate::spec::{ + AddColumn, ListType, Literal, MapType, NestedField, PrimitiveType, RenameColumn, Schema, + SchemaOperation, StructType, Type, UpdateColumn, schema_update, + }; + + static SCHEMA: LazyLock = LazyLock::new(|| { + Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", PrimitiveType::Int.into()).into(), + NestedField::optional(2, "data", PrimitiveType::String.into()).into(), + NestedField::optional( + 3, + "preferences", + StructType::new(vec![ + NestedField::required(8, "feature1", PrimitiveType::Boolean.into()).into(), + NestedField::optional(9, "feature2", PrimitiveType::Boolean.into()).into(), + ]) + .into(), + ) + .with_doc("struct of named boolean options") + .into(), + NestedField::required( + 4, + "locations", + MapType::required( + 10, + StructType::new(vec![ + NestedField::required(20, "address", PrimitiveType::String.into()) + .into(), + NestedField::required(21, "city", PrimitiveType::String.into()).into(), + NestedField::required(22, "state", PrimitiveType::String.into()).into(), + NestedField::required(23, "zip", PrimitiveType::Int.into()).into(), + ]) + .into(), + 11, + StructType::new(vec![ + NestedField::required(12, "lat", PrimitiveType::Float.into()).into(), + NestedField::required(13, "long", PrimitiveType::Float.into()).into(), + ]) + .into(), + ) + .into(), + ) + .with_doc("map of address to coordinate") + .into(), + NestedField::optional( + 5, + "points", + ListType::new( + NestedField::list_optional_element( + 14, + StructType::new(vec![ + NestedField::required(15, "x", PrimitiveType::Long.into()).into(), + NestedField::required(16, "y", PrimitiveType::Long.into()).into(), + ]) + .into(), + ) + .into(), + ) + .into(), + ) + .with_doc("2-D cartesian points") + .into(), + NestedField::required( + 6, + "doubles", + ListType::new( + NestedField::list_required_element(17, PrimitiveType::Double.into()).into(), + ) + .into(), + ) + .into(), + NestedField::optional( + 7, + "properties", + MapType::optional( + 18, + PrimitiveType::String.into(), + 19, + PrimitiveType::String.into(), + ) + .into(), + ) + .with_doc("string map of properties") + .into(), + ]) + .build() + .unwrap() + }); + + #[test] + fn no_changes() { + let base = SCHEMA.clone(); + let expected = SCHEMA.clone(); + let updated = schema_update(Arc::new(base), &[]).unwrap(); + assert_eq!(updated.as_ref(), &expected); + } + + #[test] + #[ignore = "not yet implemented"] + fn delete_fields() {} + + #[test] + #[ignore = "not yet implemented"] + fn delete_fields_case_sensitive_disabled() { + todo!() + } + + #[test] + fn update_types() { + let expected = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", PrimitiveType::Long.into()).into(), + NestedField::optional(2, "data", PrimitiveType::String.into()).into(), + NestedField::optional( + 3, + "preferences", + StructType::new(vec![ + NestedField::required(8, "feature1", PrimitiveType::Boolean.into()).into(), + NestedField::optional(9, "feature2", PrimitiveType::Boolean.into()).into(), + ]) + .into(), + ) + .with_doc("struct of named boolean options") + .into(), + NestedField::required( + 4, + "locations", + MapType::required( + 10, + StructType::new(vec![ + NestedField::required(20, "address", PrimitiveType::String.into()) + .into(), + NestedField::required(21, "city", PrimitiveType::String.into()).into(), + NestedField::required(22, "state", PrimitiveType::String.into()).into(), + NestedField::required(23, "zip", PrimitiveType::Int.into()).into(), + ]) + .into(), + 11, + StructType::new(vec![ + NestedField::required(12, "lat", PrimitiveType::Double.into()).into(), + NestedField::required(13, "long", PrimitiveType::Double.into()).into(), + ]) + .into(), + ) + .into(), + ) + .with_doc("map of address to coordinate") + .into(), + NestedField::optional( + 5, + "points", + ListType::new( + NestedField::list_optional_element( + 14, + StructType::new(vec![ + NestedField::required(15, "x", PrimitiveType::Long.into()).into(), + NestedField::required(16, "y", PrimitiveType::Long.into()).into(), + ]) + .into(), + ) + .into(), + ) + .into(), + ) + .with_doc("2-D cartesian points") + .into(), + NestedField::required( + 6, + "doubles", + ListType::new( + NestedField::list_required_element(17, PrimitiveType::Double.into()).into(), + ) + .into(), + ) + .into(), + NestedField::optional( + 7, + "properties", + MapType::optional( + 18, + PrimitiveType::String.into(), + 19, + PrimitiveType::String.into(), + ) + .into(), + ) + .with_doc("string map of properties") + .into(), + ]) + .build() + .unwrap(); + let updated = schema_update(Arc::new(SCHEMA.clone()), &[ + SchemaOperation::Update( + UpdateColumn::builder() + .name("id") + .new_type(PrimitiveType::Long.into()) + .build(), + ), + SchemaOperation::Update( + UpdateColumn::builder() + .name("locations.lat") + .new_type(PrimitiveType::Double.into()) + .build(), + ), + SchemaOperation::Update( + UpdateColumn::builder() + .name("locations.long") + .new_type(PrimitiveType::Double.into()) + .build(), + ), + ]) + .unwrap(); + assert_eq!(&expected, updated.as_ref()); + } + + #[test] + #[ignore = "not yet implemented"] + fn update_type_preserves_other_metadata() { + todo!() + } + + #[test] + #[ignore = "not yet implemented"] + fn update_doc_preserves_other_metadata() { + todo!() + } + + #[test] + #[ignore = "not yet implemented"] + fn update_default_preserves_other_metadata() { + todo!() + } + + #[test] + #[ignore = "not yet implemented"] + fn update_types_case_insensitive() { + todo!() + } + + #[test] + #[ignore = "not yet implemented"] + fn update_failure() { + todo!() + } + + #[test] + fn rename() { + let renamed = schema_update(Arc::new(SCHEMA.clone()), &[ + SchemaOperation::Rename( + RenameColumn::builder() + .name("data") + .new_name("json") + .build(), + ), + SchemaOperation::Rename( + RenameColumn::builder() + .name("preferences") + .new_name("options") + .build(), + ), + SchemaOperation::Rename( + RenameColumn::builder() + .name("preferences.feature2") + .new_name("newfeature") + .build(), + ), + SchemaOperation::Rename( + RenameColumn::builder() + .name("locations.lat") + .new_name("latitude") + .build(), + ), + SchemaOperation::Rename( + RenameColumn::builder() + .name("points.x") + .new_name("X") + .build(), + ), + SchemaOperation::Rename( + RenameColumn::builder() + .name("points.y") + .new_name("Y") + .build(), + ), + ]); + let expected = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", PrimitiveType::Int.into()).into(), + NestedField::optional(2, "json", PrimitiveType::String.into()).into(), + NestedField::optional( + 3, + "options", + StructType::new(vec![ + NestedField::required(8, "feature1", PrimitiveType::Boolean.into()).into(), + NestedField::optional(9, "newfeature", PrimitiveType::Boolean.into()) + .into(), + ]) + .into(), + ) + .with_doc("struct of named boolean options") + .into(), + NestedField::required( + 4, + "locations", + MapType::new( + NestedField::map_key_element( + 10, + StructType::new(vec![ + NestedField::required(20, "address", PrimitiveType::String.into()) + .into(), + NestedField::required(21, "city", PrimitiveType::String.into()) + .into(), + NestedField::required(22, "state", PrimitiveType::String.into()) + .into(), + NestedField::required(23, "zip", PrimitiveType::Int.into()).into(), + ]) + .into(), + ) + .into(), + NestedField::map_value_element( + 11, + StructType::new(vec![ + NestedField::required(12, "latitude", PrimitiveType::Float.into()) + .into(), + NestedField::required(13, "long", PrimitiveType::Float.into()) + .into(), + ]) + .into(), + true, + ) + .into(), + ) + .into(), + ) + .with_doc("map of address to coordinate") + .into(), + NestedField::optional( + 5, + "points", + ListType::new( + NestedField::list_element( + 14, + StructType::new(vec![ + NestedField::required(15, "X", PrimitiveType::Long.into()).into(), + NestedField::required(16, "Y", PrimitiveType::Long.into()).into(), + ]) + .into(), + false, + ) + .into(), + ) + .into(), + ) + .with_doc("2-D cartesian points") + .into(), + NestedField::required( + 6, + "doubles", + ListType::new( + NestedField::required(17, "element", PrimitiveType::Double.into()).into(), + ) + .into(), + ) + .into(), + NestedField::optional( + 7, + "properties", + MapType::optional( + 18, + PrimitiveType::String.into(), + 19, + PrimitiveType::String.into(), + ) + .into(), + ) + .with_doc("string map of properties") + .into(), + ]) + .build() + .unwrap(); + assert_eq!(renamed.unwrap().as_ref(), &expected); + } + + #[test] + #[ignore = "not yet implemented"] + fn rename_case_insensitive() {} + + #[test] + fn add_fields() { + let added = schema_update(Arc::new(SCHEMA.clone()), &[ + SchemaOperation::Add( + AddColumn::builder() + .name("topLevel") + .r#type(Type::Primitive(PrimitiveType::Decimal { + precision: 9, + scale: 2, + })) + .build(), + ), + SchemaOperation::Add( + AddColumn::builder() + .parent("locations".to_string()) + .name("alt") + .r#type(Type::Primitive(PrimitiveType::Float)) + .build(), + ), + SchemaOperation::Add( + AddColumn::builder() + .parent("points".to_string()) + .name("z") + .r#type(Type::Primitive(PrimitiveType::Long)) + .build(), + ), + SchemaOperation::Add( + AddColumn::builder() + .parent("points".to_string()) + .name("t.t") + .r#type(Type::Primitive(PrimitiveType::Long)) + .build(), + ), + ]) + .unwrap(); + + let expected = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", PrimitiveType::Int.into()).into(), + NestedField::optional(2, "data", PrimitiveType::String.into()).into(), + NestedField::optional( + 3, + "preferences", + StructType::new(vec![ + NestedField::required(8, "feature1", PrimitiveType::Boolean.into()).into(), + NestedField::optional(9, "feature2", PrimitiveType::Boolean.into()).into(), + ]) + .into(), + ) + .with_doc("struct of named boolean options") + .into(), + NestedField::required( + 4, + "locations", + MapType::new( + NestedField::map_key_element( + 10, + StructType::new(vec![ + NestedField::required(20, "address", PrimitiveType::String.into()) + .into(), + NestedField::required(21, "city", PrimitiveType::String.into()) + .into(), + NestedField::required(22, "state", PrimitiveType::String.into()) + .into(), + NestedField::required(23, "zip", PrimitiveType::Int.into()).into(), + ]) + .into(), + ) + .into(), + NestedField::map_value_element( + 11, + StructType::new(vec![ + NestedField::required(12, "lat", PrimitiveType::Float.into()) + .into(), + NestedField::required(13, "long", PrimitiveType::Float.into()) + .into(), + NestedField::optional(25, "alt", PrimitiveType::Float.into()) + .into(), + ]) + .into(), + true, + ) + .into(), + ) + .into(), + ) + .with_doc("map of address to coordinate") + .into(), + NestedField::optional( + 5, + "points", + ListType::optional( + 14, + StructType::new(vec![ + NestedField::required(15, "x", PrimitiveType::Long.into()).into(), + NestedField::required(16, "y", PrimitiveType::Long.into()).into(), + NestedField::optional(26, "z", PrimitiveType::Long.into()).into(), + NestedField::optional(27, "t.t", PrimitiveType::Long.into()).into(), + ]) + .into(), + ) + .into(), + ) + .with_doc("2-D cartesian points") + .into(), + NestedField::required( + 6, + "doubles", + ListType::new( + NestedField::required(17, "element", PrimitiveType::Double.into()).into(), + ) + .into(), + ) + .into(), + NestedField::optional( + 7, + "properties", + MapType::optional( + 18, + PrimitiveType::String.into(), + 19, + PrimitiveType::String.into(), + ) + .into(), + ) + .with_doc("string map of properties") + .into(), + NestedField::optional( + 24, + "topLevel", + PrimitiveType::Decimal { + precision: 9, + scale: 2, + } + .into(), + ) + .into(), + ]) + .build() + .unwrap(); + + assert_eq!(added.as_struct(), expected.as_struct()); + } + + #[test] + fn add_column_with_default() { + let schema: Arc = Arc::new( + Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "id", PrimitiveType::Int.into()).into(), + ]) + .build() + .unwrap(), + ); + let expected = Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "id", PrimitiveType::Int.into()).into(), + NestedField::optional(2, "data", PrimitiveType::String.into()) + .with_doc("description") + .with_initial_default(Literal::string("unknown")) + .with_write_default(Literal::string("unknown")) + .into(), + ]) + .build() + .unwrap(); + let result = schema_update(schema.clone(), &[SchemaOperation::Add( + AddColumn::builder() + .name("data") + .r#type(Type::Primitive(PrimitiveType::String)) + .doc(Some("description".into())) + .default_value(Literal::string("unknown")) + .build(), + )]) + .unwrap(); + assert_eq!(&expected, result.as_ref()); + } + + #[test] + fn add_column_with_update_column_default() { + let schema: Arc = Arc::new( + Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "id", PrimitiveType::Int.into()).into(), + ]) + .build() + .unwrap(), + ); + let expected = Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "id", PrimitiveType::Int.into()).into(), + NestedField::optional(2, "data", PrimitiveType::String.into()) + .with_write_default(Literal::string("unknown")) + .into(), + ]) + .build() + .unwrap(); + let result = schema_update(schema.clone(), &[ + SchemaOperation::Add( + AddColumn::builder() + .name("data") + .r#type(PrimitiveType::String.into()) + .build(), + ), + SchemaOperation::Update( + UpdateColumn::builder() + .name("data") + .new_default_value(Some(Literal::string("unknown"))) + .build(), + ), + ]) + .unwrap(); + assert_eq!(&expected, result.as_ref()); + } + + #[test] + fn add_nested_struct() { + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", PrimitiveType::Int.into()).into(), + ]) + .build() + .unwrap(), + ); + let struct_type = StructType::new(vec![ + NestedField::required(1, "lat", PrimitiveType::Int.into()).into(), + NestedField::optional(2, "long", PrimitiveType::Int.into()).into(), + ]); + let expected = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", PrimitiveType::Int.into()).into(), + NestedField::optional( + 2, + "location", + StructType::new(vec![ + NestedField::required(3, "lat", PrimitiveType::Int.into()).into(), + NestedField::optional(4, "long", PrimitiveType::Int.into()).into(), + ]) + .into(), + ) + .into(), + ]) + .build() + .unwrap(); + + let result = schema_update(schema.clone(), &[SchemaOperation::Add( + AddColumn::builder() + .name("location") + .r#type(Type::Struct(struct_type)) + .build(), + )]) + .unwrap(); + assert_eq!(&expected, result.as_ref()); + } + + #[test] + fn add_nested_map_of_structs() { + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", PrimitiveType::Int.into()).into(), + ]) + .build() + .unwrap(), + ); + let expected = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", PrimitiveType::Int.into()).into(), + NestedField::optional( + 2, + "locations", + MapType::optional( + 3, + StructType::new(vec![ + NestedField::required(5, "address", PrimitiveType::String.into()) + .into(), + NestedField::required(6, "city", PrimitiveType::String.into()).into(), + NestedField::required(7, "state", PrimitiveType::String.into()).into(), + NestedField::required(8, "zip", PrimitiveType::Int.into()).into(), + ]) + .into(), + 4, + StructType::new(vec![ + NestedField::required(9, "lat", PrimitiveType::Int.into()).into(), + NestedField::optional(10, "long", PrimitiveType::Int.into()).into(), + ]) + .into(), + ) + .into(), + ) + .into(), + ]) + .build() + .unwrap(); + let map = MapType::optional( + 1, + StructType::new(vec![ + NestedField::required(20, "address", PrimitiveType::String.into()).into(), + NestedField::required(21, "city", PrimitiveType::String.into()).into(), + NestedField::required(22, "state", PrimitiveType::String.into()).into(), + NestedField::required(23, "zip", PrimitiveType::Int.into()).into(), + ]) + .into(), + 2, + StructType::new(vec![ + NestedField::required(9, "lat", PrimitiveType::Int.into()).into(), + NestedField::optional(8, "long", PrimitiveType::Int.into()).into(), + ]) + .into(), + ); + let result = schema_update(schema, &[SchemaOperation::Add( + AddColumn::builder() + .name("locations") + .r#type(map.into()) + .build(), + )]) + .unwrap(); + assert_eq!(&expected, result.as_ref()) + } + + #[test] + #[ignore = "not yet implemented"] + fn add_nested_list_of_structs() { + todo!() + } + + #[test] + #[ignore = "not yet implemented"] + fn add_required_column_without_default() { + todo!() + } + + #[test] + #[ignore = "not yet implemented"] + fn add_required_column_with_default() { + todo!() + } + + #[test] + #[ignore = "not yet implemented"] + fn add_required_column_with_update_column_default() { + todo!() + } + + #[test] + #[ignore = "not yet implemented"] + fn add_required_column_case_insensitive() { + todo!() + } + + #[test] + #[ignore = "not yet implemented"] + fn add_multiple_required_column_case_insensitive() { + todo!() + } + + #[test] + #[ignore = "not yet implemented"] + fn make_column_optional() { + todo!() + } + + #[test] + #[ignore = "not yet implemented"] + fn require_column() { + todo!() + } +} diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 159021d9f2..2aecf8523e 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -66,7 +66,6 @@ use std::sync::Arc; use std::time::Duration; use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext}; -pub use update_schema::AddColumn; use crate::error::Result; use crate::spec::TableProperties; @@ -139,11 +138,6 @@ impl Transaction { UpdatePropertiesAction::new() } - /// Creates an update schema action. - pub fn update_schema(&self) -> UpdateSchemaAction { - UpdateSchemaAction::new() - } - /// Creates a fast append action. pub fn fast_append(&self) -> FastAppendAction { FastAppendAction::new() @@ -164,6 +158,11 @@ impl Transaction { UpdateStatisticsAction::new() } + /// Update the schema of table + pub fn update_schema(&self) -> UpdateSchemaAction { + UpdateSchemaAction::new() + } + /// Commit transaction. pub async fn commit(self, catalog: &dyn Catalog) -> Result { if self.actions.is_empty() { diff --git a/crates/iceberg/src/transaction/update_schema.rs b/crates/iceberg/src/transaction/update_schema.rs index 6ee37be2c9..20bf758185 100644 --- a/crates/iceberg/src/transaction/update_schema.rs +++ b/crates/iceberg/src/transaction/update_schema.rs @@ -15,1133 +15,227 @@ // specific language governing permissions and limitations // under the License. -use std::collections::{HashMap, HashSet}; use std::sync::Arc; use async_trait::async_trait; -use typed_builder::TypedBuilder; -use crate::spec::{ - ListType, Literal, MapType, NestedField, NestedFieldRef, SCHEMA_NAME_DELIMITER, Schema, - StructType, Type, -}; +use crate::spec::{SchemaOperation, schema_update}; use crate::table::Table; -use crate::transaction::action::{ActionCommit, TransactionAction}; -use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate}; +use crate::transaction::{ActionCommit, TransactionAction}; +use crate::{Result, TableRequirement, TableUpdate}; -// Default ID for a new column. This will be re-assigned to a fresh ID at commit time. -const DEFAULT_FIELD_ID: i32 = 0; - -/// Declarative specification for adding a column in [`UpdateSchemaAction`]. -/// -/// Use helper constructors such as [`AddColumn::optional`] and [`AddColumn::required`], -/// optionally combined with [`AddColumn::with_parent`] and [`AddColumn::with_doc`], then pass -/// the value to -/// [`UpdateSchemaAction::add_column`]. -#[derive(TypedBuilder)] -pub struct AddColumn { - #[builder(default = None, setter(strip_option, into))] - parent: Option, - #[builder(setter(into))] - name: String, - #[builder(default = false)] - required: bool, - field_type: Type, - #[builder(default = None, setter(strip_option, into))] - doc: Option, - #[builder(default = None, setter(strip_option))] - initial_default: Option, - #[builder(default = None, setter(strip_option))] - write_default: Option, -} - -impl AddColumn { - /// Create a root-level optional column specification. - pub fn optional(name: impl ToString, field_type: Type) -> Self { - Self::builder() - .name(name.to_string()) - .field_type(field_type) - .required(false) - .build() - } - - /// Create a root-level required column specification. - pub fn required(name: impl ToString, field_type: Type, initial_default: Literal) -> Self { - Self::builder() - .name(name.to_string()) - .field_type(field_type) - .required(true) - .initial_default(initial_default.clone()) - .write_default(initial_default) - .build() - } - - fn to_nested_field(&self) -> NestedFieldRef { - let mut field = NestedField::new( - DEFAULT_FIELD_ID, - self.name.clone(), - self.field_type.clone(), - self.required, - ); - - field.doc = self.doc.clone(); - field.initial_default = self.initial_default.clone(); - field.write_default = self.write_default.clone(); - Arc::new(field) - } -} - -/// Schema evolution API modeled after the Java `SchemaUpdate` implementation. -/// -/// This action accumulates schema modifications (column additions and deletions) -/// via builder methods. At commit time, it validates all operations against the -/// current table schema, auto-assigns field IDs from `table.metadata().last_column_id()`, -/// builds a new schema, and emits `AddSchema` + `SetCurrentSchema` updates with a -/// `CurrentSchemaIdMatch` requirement. -/// -/// # Example -/// -/// ```ignore -/// let tx = Transaction::new(&table); -/// let action = tx.update_schema() -/// .add_column(AddColumn::optional("new_col", Type::Primitive(PrimitiveType::Int))) -/// .add_column( -/// AddColumn::optional("email", Type::Primitive(PrimitiveType::String)) -/// .with_parent("person") -/// ) -/// .delete_column("old_col"); -/// let tx = action.apply(tx).unwrap(); -/// let table = tx.commit(&catalog).await.unwrap(); -/// ``` pub struct UpdateSchemaAction { - additions: Vec, - deletes: Vec, + operations: Vec, } impl UpdateSchemaAction { - /// Creates a new empty `UpdateSchemaAction`. - pub(crate) fn new() -> Self { + pub fn new() -> Self { Self { - additions: Vec::new(), - deletes: Vec::new(), + operations: Vec::new(), } } - // --- Root-level additions --- - - /// Add a column to the table schema. - /// - /// To add a root-level column, leave `AddColumn::parent` as `None`. - /// For nested additions, set a parent path (for example via [`AddColumn::with_parent`]). - /// If the parent resolves to a map/list, the column is added to map value/list element. - pub fn add_column(mut self, add_column: AddColumn) -> Self { - self.additions.push(add_column); - self - } - - // --- Other builder methods --- - - /// Record a column deletion by name. - /// - /// At commit time, the column must exist in the current schema. - pub fn delete_column(mut self, name: impl ToString) -> Self { - self.deletes.push(name.to_string()); + pub fn push_operation(mut self, op: SchemaOperation) -> Self { + self.operations.push(op); self } } -// --------------------------------------------------------------------------- -// ID assignment helpers -// --------------------------------------------------------------------------- - -/// Recursively assign fresh field IDs to a `NestedField` and all its nested sub-fields. -/// -/// This follows the same recursive pattern as `ReassignFieldIds::reassign_ids_visit_type` -/// from `crate::spec::schema::id_reassigner`, but operates on new fields with placeholder -/// IDs rather than reassigning an existing schema. `ReassignFieldIds` cannot be used -/// directly here because it rejects duplicate old IDs (all new fields share placeholder -/// ID `DEFAULT_FIELD_ID`). -fn assign_fresh_ids(field: &NestedField, next_id: &mut i32) -> NestedFieldRef { - *next_id += 1; - let new_id = *next_id; - let new_type = assign_fresh_ids_to_type(&field.field_type, next_id); - - Arc::new(NestedField { - id: new_id, - name: field.name.clone(), - required: field.required, - field_type: Box::new(new_type), - doc: field.doc.clone(), - initial_default: field.initial_default.clone(), - write_default: field.write_default.clone(), - }) -} - -/// Recursively assign fresh field IDs to all nested fields within a `Type`. -fn assign_fresh_ids_to_type(field_type: &Type, next_id: &mut i32) -> Type { - match field_type { - Type::Primitive(_) => field_type.clone(), - Type::Struct(struct_type) => { - let new_fields: Vec = struct_type - .fields() - .iter() - .map(|f| assign_fresh_ids(f, next_id)) - .collect(); - Type::Struct(StructType::new(new_fields)) - } - Type::List(list_type) => { - let new_element = assign_fresh_ids(&list_type.element_field, next_id); - Type::List(ListType { - element_field: new_element, - }) - } - Type::Map(map_type) => { - let new_key = assign_fresh_ids(&map_type.key_field, next_id); - let new_value = assign_fresh_ids(&map_type.value_field, next_id); - Type::Map(MapType { - key_field: new_key, - value_field: new_value, - }) - } - } -} - -// --------------------------------------------------------------------------- -// Parent path resolution -// --------------------------------------------------------------------------- - -/// Resolve a parent path to the target struct's parent field ID and a reference -/// to its `StructType`. -/// -/// If the parent is a map, navigates to the value field. If a list, navigates to -/// the element field. The final target must be a struct type. -fn resolve_parent_target<'a>( - base_schema: &'a Schema, - parent: &str, -) -> Result<(i32, &'a StructType)> { - base_schema - .field_by_name(parent) - .ok_or_else(|| { - Error::new( - ErrorKind::PreconditionFailed, - format!("Cannot add column: parent '{parent}' not found"), - ) - }) - .and_then(|parent_field| match parent_field.field_type.as_ref() { - Type::Struct(s) => Ok((parent_field.id, s)), - Type::Map(m) => match m.value_field.field_type.as_ref() { - Type::Struct(s) => Ok((m.value_field.id, s)), - _ => Err(Error::new( - ErrorKind::PreconditionFailed, - format!("Cannot add column: map value of '{parent}' is not a struct"), - )), - }, - Type::List(l) => match l.element_field.field_type.as_ref() { - Type::Struct(s) => Ok((l.element_field.id, s)), - _ => Err(Error::new( - ErrorKind::PreconditionFailed, - format!("Cannot add column: list element of '{parent}' is not a struct"), - )), - }, - _ => Err(Error::new( - ErrorKind::PreconditionFailed, - format!("Cannot add column: parent '{parent}' is not a struct, map, or list"), - )), - }) -} - -// --------------------------------------------------------------------------- -// Schema tree rebuild -// --------------------------------------------------------------------------- - -/// Rebuild a slice of fields, applying deletions and additions at every level, -/// plus any additions keyed by `parent_id` (`None` represents the table root). -fn rebuild_fields( - fields: &[NestedFieldRef], - adds: &HashMap, Vec>, - delete_ids: &HashSet, - parent_id: Option, -) -> Vec { - fields - .iter() - .filter(|f| !delete_ids.contains(&f.id)) - .map(|f| rebuild_field(f, adds, delete_ids)) - .chain(adds.get(&parent_id).into_iter().flatten().cloned()) - .collect() -} - -/// Recursively rebuild a single field. If the field (or any descendant) is a struct -/// that has pending additions, those additions are appended to the struct's fields. -/// Fields whose IDs appear in `delete_ids` are filtered out at every struct level. -fn rebuild_field( - field: &NestedFieldRef, - adds: &HashMap, Vec>, - delete_ids: &HashSet, -) -> NestedFieldRef { - match field.field_type.as_ref() { - Type::Primitive(_) => field.clone(), - Type::Struct(s) => { - let new_fields = rebuild_fields(s.fields(), adds, delete_ids, Some(field.id)); - Arc::new(NestedField { - id: field.id, - name: field.name.clone(), - required: field.required, - field_type: Box::new(Type::Struct(StructType::new(new_fields))), - doc: field.doc.clone(), - initial_default: field.initial_default.clone(), - write_default: field.write_default.clone(), - }) - } - Type::List(l) => { - let new_element = rebuild_field(&l.element_field, adds, delete_ids); - Arc::new(NestedField { - id: field.id, - name: field.name.clone(), - required: field.required, - field_type: Box::new(Type::List(ListType { - element_field: new_element, - })), - doc: field.doc.clone(), - initial_default: field.initial_default.clone(), - write_default: field.write_default.clone(), - }) - } - Type::Map(m) => { - let new_key = rebuild_field(&m.key_field, adds, delete_ids); - let new_value = rebuild_field(&m.value_field, adds, delete_ids); - Arc::new(NestedField { - id: field.id, - name: field.name.clone(), - required: field.required, - field_type: Box::new(Type::Map(MapType { - key_field: new_key, - value_field: new_value, - })), - doc: field.doc.clone(), - initial_default: field.initial_default.clone(), - write_default: field.write_default.clone(), - }) - } +impl Default for UpdateSchemaAction { + fn default() -> Self { + Self::new() } } -// --------------------------------------------------------------------------- -// TransactionAction implementation -// --------------------------------------------------------------------------- - #[async_trait] impl TransactionAction for UpdateSchemaAction { async fn commit(self: Arc, table: &Table) -> Result { - let base_schema = table.metadata().current_schema(); - let mut last_column_id = table.metadata().last_column_id(); - - // --- 1. Validate deletes --- - let delete_ids = self - .deletes - .iter() - .map(|name: &String| { - base_schema - .field_by_name(name) - .ok_or_else(|| { - Error::new( - ErrorKind::PreconditionFailed, - format!("Cannot delete missing column: {name}"), - ) - }) - .and_then(|field| { - match base_schema - .identifier_field_ids() - .find(|id| *id == field.id) - { - Some(_) => Err(Error::new( - ErrorKind::PreconditionFailed, - format!("Cannot delete identifier field: {name}"), - )), - None => Ok(field.id), - } - }) - }) - .collect::>>()?; - - // --- 2. Resolve parents, validate additions, assign IDs, and group by parent ID --- - // We assign IDs inline (before grouping) to preserve the caller's insertion order, - // since HashMap iteration order is non-deterministic. - let mut additions_by_parent: HashMap, Vec> = HashMap::new(); - - for add in &self.additions { - let pending_field = add.to_nested_field(); - - // Check that name does not contain `SCHEMA_NAME_DELIMITER`. - if pending_field.name.contains(SCHEMA_NAME_DELIMITER) { - return Err(Error::new( - ErrorKind::PreconditionFailed, - format!( - "Cannot add column with ambiguous name: {}. Use `AddColumn::with_parent` to add a column to a nested struct.", - pending_field.name - ), - )); - } - - // Required columns without an initial default need allow_incompatible_changes. - if pending_field.required && pending_field.initial_default.is_none() { - return Err(Error::new( - ErrorKind::PreconditionFailed, - format!( - "Incompatible change: cannot add required column without an initial default: {}", - pending_field.name - ), - )); - } - - let parent_id = match &add.parent { - None => { - // Root-level: check name conflict against root-level fields. - if let Some(existing) = base_schema.field_by_name(&pending_field.name) - && !delete_ids.contains(&existing.id) - { - return Err(Error::new( - ErrorKind::PreconditionFailed, - format!( - "Cannot add column, name already exists: {}", - pending_field.name - ), - )); - } - None - } - Some(parent_path) => { - // Nested: resolve parent, check name conflict within parent struct. - let (resolved_parent_id, parent_struct) = - resolve_parent_target(base_schema, parent_path)?; - - if parent_struct.fields().iter().any(|f| { - f.name == pending_field.name - && !delete_ids.contains(&f.id) - && !delete_ids.contains(&resolved_parent_id) - }) { - return Err(Error::new( - ErrorKind::PreconditionFailed, - format!( - "Cannot add column, name already exists in '{}': {}", - parent_path, pending_field.name - ), - )); - } - - Some(resolved_parent_id) - } - }; - - // Assign fresh IDs immediately, preserving insertion order. - let field = assign_fresh_ids(&pending_field, &mut last_column_id); - - additions_by_parent - .entry(parent_id) - .or_default() - .push(field); - } - - // --- 4. Rebuild the schema tree with additions and deletions --- - let new_fields = rebuild_fields( - base_schema.as_struct().fields(), - &additions_by_parent, - &delete_ids, - None, - ); - - // --- 5. Build the new schema --- - let schema = Schema::builder() - .with_fields(new_fields) - .with_identifier_field_ids(base_schema.identifier_field_ids()) - .build()?; - - let updates = vec![ - TableUpdate::AddSchema { schema }, - TableUpdate::SetCurrentSchema { schema_id: -1 }, - ]; - - let requirements = vec![TableRequirement::CurrentSchemaIdMatch { - current_schema_id: base_schema.schema_id(), - }]; - - Ok(ActionCommit::new(updates, requirements)) + let schema = schema_update(table.current_schema_ref(), &self.operations)?; + let current_schema_id = table.metadata().current_schema_id(); + let last_column_id = table.metadata().last_column_id(); + Ok(ActionCommit::new( + vec![ + TableUpdate::AddSchema { + schema: Arc::unwrap_or_clone(schema), + }, + TableUpdate::SetCurrentSchema { schema_id: -1 }, + ], + vec![ + TableRequirement::CurrentSchemaIdMatch { current_schema_id }, + TableRequirement::LastAssignedFieldIdMatch { + last_assigned_field_id: last_column_id, + }, + ], + )) } } #[cfg(test)] mod tests { - use std::io::BufReader; - use std::sync::Arc; - - use as_any::Downcast; + use std::sync::{Arc, LazyLock}; use crate::spec::{ - DEFAULT_SCHEMA_ID, Literal, NestedField, PrimitiveType, StructType, TableMetadata, Type, + AddColumn, DeleteColumn, ListType, MapType, MoveColumn, NestedField, PrimitiveType, + RenameColumn, Schema, SchemaOperation, StructType, UpdateColumn, schema_update, }; - use crate::table::Table; - use crate::transaction::Transaction; - use crate::transaction::action::{ApplyTransactionAction, TransactionAction}; - use crate::transaction::tests::make_v2_table; - use crate::transaction::update_schema::{AddColumn, DEFAULT_FIELD_ID, UpdateSchemaAction}; - use crate::{ErrorKind, TableIdent, TableRequirement, TableUpdate}; - - // The V2 test table has: - // last_column_id: 3 - // current schema (id=1): x(1, req, long), y(2, req, long), z(3, req, long) - // identifier_field_ids: [1, 2] - /// Build a V2 test table that includes nested types: + /// Build a schema with top-level and nested fields to exercise all operations. /// - /// last_column_id: 14 - /// current schema (id=0): - /// x(1, req, long) -- identifier - /// y(2, req, long) -- identifier - /// z(3, req, long) - /// person(4, opt, struct) - /// name(5, opt, string) - /// age(6, req, int) - /// tags(7, opt, list) - /// element(8, req, struct) - /// key(9, opt, string) - /// value(10, opt, string) - /// props(11, opt, map) - /// key(12, req, string) - /// value(13, req, struct) - /// data(14, opt, string) - fn make_v2_table_with_nested() -> Table { - let json = r#"{ - "format-version": 2, - "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c2", - "location": "s3://bucket/test/location", - "last-sequence-number": 0, - "last-updated-ms": 1602638573590, - "last-column-id": 14, - "current-schema-id": 0, - "schemas": [ - { - "type": "struct", - "schema-id": 0, - "identifier-field-ids": [1, 2], - "fields": [ - {"id": 1, "name": "x", "required": true, "type": "long"}, - {"id": 2, "name": "y", "required": true, "type": "long"}, - {"id": 3, "name": "z", "required": true, "type": "long"}, - {"id": 4, "name": "person", "required": false, "type": { - "type": "struct", - "fields": [ - {"id": 5, "name": "name", "required": false, "type": "string"}, - {"id": 6, "name": "age", "required": true, "type": "int"} - ] - }}, - {"id": 7, "name": "tags", "required": false, "type": { - "type": "list", - "element-id": 8, - "element": { - "type": "struct", - "fields": [ - {"id": 9, "name": "key", "required": false, "type": "string"}, - {"id": 10, "name": "value", "required": false, "type": "string"} - ] - }, - "element-required": true - }}, - {"id": 11, "name": "props", "required": false, "type": { - "type": "map", - "key-id": 12, - "key": "string", - "value-id": 13, - "value": { - "type": "struct", - "fields": [ - {"id": 14, "name": "data", "required": false, "type": "string"} - ] - }, - "value-required": true - }} - ] - } - ], - "default-spec-id": 0, - "partition-specs": [ - {"spec-id": 0, "fields": []} - ], - "last-partition-id": 999, - "default-sort-order-id": 0, - "sort-orders": [ - {"order-id": 0, "fields": []} - ], - "properties": {}, - "current-snapshot-id": -1, - "snapshots": [] - }"#; - - let reader = BufReader::new(json.as_bytes()); - let metadata = serde_json::from_reader::<_, TableMetadata>(reader).unwrap(); - - Table::builder() - .metadata(metadata) - .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) - .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) - .file_io(crate::io::FileIO::new_with_memory()) - .build() - .unwrap() - } - - // ----------------------------------------------------------------------- - // Existing root-level tests - // ----------------------------------------------------------------------- - - #[tokio::test] - async fn test_add_column() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - - let action = tx.update_schema().add_column(AddColumn::optional( - "new_col", - Type::Primitive(PrimitiveType::Int), - )); - - let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); - let updates = action_commit.take_updates(); - let requirements = action_commit.take_requirements(); - - assert_eq!(updates.len(), 2); - - // Extract the new schema from the AddSchema update. - let new_schema = match &updates[0] { - TableUpdate::AddSchema { schema } => schema, - other => panic!("expected AddSchema, got {other:?}"), - }; - - let expected_schema = table - .metadata() - .current_schema() - .as_ref() - .clone() - .into_builder() - .with_schema_id(DEFAULT_SCHEMA_ID) - .with_fields([ - NestedField::optional(4, "new_col", Type::Primitive(PrimitiveType::Int)).into(), + /// Schema: + /// 1 id required int + /// 2 name optional string + /// 3 address optional struct + /// 4 street required string + /// 5 city optional string + /// 6 zip optional int + /// 7 tags optional list (element id 8) + /// 9 metrics optional map (key 10, value 11) + static SCHEMA: LazyLock = LazyLock::new(|| { + Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", PrimitiveType::Int.into()).into(), + NestedField::optional(2, "name", PrimitiveType::String.into()).into(), + NestedField::optional( + 3, + "address", + StructType::new(vec![ + NestedField::required(4, "street", PrimitiveType::String.into()).into(), + NestedField::optional(5, "city", PrimitiveType::String.into()).into(), + NestedField::optional(6, "zip", PrimitiveType::Int.into()).into(), + ]) + .into(), + ) + .into(), + NestedField::optional( + 7, + "tags", + ListType::optional(8, PrimitiveType::String.into()).into(), + ) + .into(), + NestedField::optional( + 9, + "metrics", + MapType::optional( + 10, + PrimitiveType::String.into(), + 11, + PrimitiveType::Double.into(), + ) + .into(), + ) + .into(), ]) .build() - .unwrap(); - assert_eq!(new_schema, &expected_schema); - - assert_eq!(updates[1], TableUpdate::SetCurrentSchema { schema_id: -1 }); - - // Verify requirement. - assert_eq!(requirements.len(), 1); - assert_eq!(requirements[0], TableRequirement::CurrentSchemaIdMatch { - current_schema_id: table.metadata().current_schema().schema_id() - }); - } - - #[tokio::test] - async fn test_add_column_with_doc() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - - let action = tx.update_schema().add_column( - AddColumn::builder() - .name("documented_col") - .field_type(Type::Primitive(PrimitiveType::String)) - .doc("A documented column") - .build(), - ); - - let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); - let updates = action_commit.take_updates(); - - let new_schema = match &updates[0] { - TableUpdate::AddSchema { schema } => schema, - other => panic!("expected AddSchema, got {other:?}"), - }; - - let field = new_schema - .field_by_name("documented_col") - .expect("documented_col should exist"); - assert_eq!(field.id, 4); - assert!(!field.required); - assert_eq!(field.doc.as_deref(), Some("A documented column")); - } - - #[tokio::test] - async fn test_add_required_column_with_initial_default() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - - let action = tx.update_schema().add_column(AddColumn::required( - "req_col", - Type::Primitive(PrimitiveType::Int), - Literal::int(0), - )); - - let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); - let updates = action_commit.take_updates(); - - let new_schema = match &updates[0] { - TableUpdate::AddSchema { schema } => schema, - other => panic!("expected AddSchema, got {other:?}"), - }; - - let field = new_schema - .field_by_name("req_col") - .expect("req_col should exist"); - assert_eq!(field.id, 4); - assert!(field.required); - assert_eq!(field.initial_default, Some(Literal::int(0))); - assert_eq!(field.write_default, Some(Literal::int(0))); - } - - #[tokio::test] - async fn test_add_column_name_conflict_fails() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - - // "x" already exists in the V2 test schema. - let action = tx.update_schema().add_column(AddColumn::optional( - "x", - Type::Primitive(PrimitiveType::Int), - )); - - let result = Arc::new(action).commit(&table).await; - let err = match result { - Err(e) => e, - Ok(_) => panic!("should reject adding a column with an existing name"), - }; - assert_eq!(err.kind(), ErrorKind::PreconditionFailed); - assert!( - err.message().contains("already exists"), - "error should mention name conflict, got: {}", - err.message() - ); - } - - #[tokio::test] - async fn test_delete_column() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - - // z is not an identifier field, so we can delete it. - let action = tx.update_schema().delete_column("z"); - - let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); - let updates = action_commit.take_updates(); - - let new_schema = match &updates[0] { - TableUpdate::AddSchema { schema } => schema, - other => panic!("expected AddSchema, got {other:?}"), - }; - - assert!( - new_schema.field_by_name("z").is_none(), - "z should be deleted" - ); - assert!(new_schema.field_by_name("x").is_some()); - assert!(new_schema.field_by_name("y").is_some()); - } - - #[tokio::test] - async fn test_delete_missing_column_fails() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - - let action = tx.update_schema().delete_column("nonexistent"); - - let result = Arc::new(action).commit(&table).await; - let err = match result { - Err(e) => e, - Ok(_) => panic!("should reject deleting a non-existent column"), - }; - assert_eq!(err.kind(), ErrorKind::PreconditionFailed); - assert!( - err.message().contains("nonexistent"), - "error should mention the missing column, got: {}", - err.message() - ); - } - - #[tokio::test] - async fn test_add_and_delete_combined() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - - // Delete z, add a new column. - let action = tx - .update_schema() - .delete_column("z") - .add_column(AddColumn::optional( - "w", - Type::Primitive(PrimitiveType::Boolean), - )); - - let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); - let updates = action_commit.take_updates(); - - let new_schema = match &updates[0] { - TableUpdate::AddSchema { schema } => schema, - other => panic!("expected AddSchema, got {other:?}"), - }; - - assert!( - new_schema.field_by_name("z").is_none(), - "z should be deleted" - ); - let w = new_schema.field_by_name("w").expect("w should exist"); - assert_eq!(w.id, 4); - assert!(!w.required); - } - - #[tokio::test] - async fn test_delete_and_readd_same_name() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - - // Delete z, then add a new column named z -- should succeed. - let action = tx - .update_schema() - .delete_column("z") - .add_column(AddColumn::optional( - "z", - Type::Primitive(PrimitiveType::Boolean), - )); - - let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); - let updates = action_commit.take_updates(); - - let new_schema = match &updates[0] { - TableUpdate::AddSchema { schema } => schema, - other => panic!("expected AddSchema, got {other:?}"), - }; - - let z = new_schema - .field_by_name("z") - .expect("z should exist with new type"); - assert_eq!(z.id, 4); // new ID, not the old 3 - assert_eq!(*z.field_type, Type::Primitive(PrimitiveType::Boolean)); - } + .unwrap() + }); + /// A complex schema evolution test exercising add, rename, update, move, and delete + /// on both top-level and nested fields in a single schema_update call. + /// + /// Operations applied (in order): + /// 1. Add top-level "score" (optional float) + /// 2. Add nested "address.country" (optional string) + /// 3. Rename "name" → "full_name" + /// 4. Rename nested "address.city" → "town" + /// 5. Update "address.zip" doc to "postal code" + /// 6. Delete "tags" + /// 7. Move "id" after "name" (so order becomes: full_name, id, ...) + /// 8. Move nested "address.street" after "address.city" + /// + /// Expected result: + /// 2 full_name optional string + /// 1 id required int + /// 3 address optional struct + /// 5 town optional string + /// 4 street required string + /// 6 zip optional int doc="postal code" + /// 13 country optional string + /// 9 metrics optional map + /// 12 score optional float #[test] - fn test_apply() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - - let tx = tx - .update_schema() - .add_column(AddColumn::optional( - "new_col", - Type::Primitive(PrimitiveType::Int), - )) - .apply(tx) - .unwrap(); - - assert_eq!(tx.actions.len(), 1); - (*tx.actions[0]) - .downcast_ref::() - .expect("UpdateSchemaAction was not applied to Transaction!"); - } - - // ----------------------------------------------------------------------- - // Nested add tests - // ----------------------------------------------------------------------- - - #[tokio::test] - async fn test_add_column_to_struct() { - let table = make_v2_table_with_nested(); - let tx = Transaction::new(&table); - - // Add "email" to the "person" struct. - let action = tx.update_schema().add_column( - AddColumn::builder() - .name("email") - .field_type(Type::Primitive(PrimitiveType::String)) - .parent("person") - .build(), - ); - - let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); - let updates = action_commit.take_updates(); - - let new_schema = match &updates[0] { - TableUpdate::AddSchema { schema } => schema, - other => panic!("expected AddSchema, got {other:?}"), - }; - - // "email" should be nested under "person" with ID = last_column_id + 1 = 15. - let email = new_schema - .field_by_name("person.email") - .expect("person.email should exist"); - assert_eq!(email.id, 15); - assert!(!email.required); - assert_eq!(*email.field_type, Type::Primitive(PrimitiveType::String)); - - // Original nested fields should still be there. - assert!(new_schema.field_by_name("person.name").is_some()); - assert!(new_schema.field_by_name("person.age").is_some()); - } - - #[tokio::test] - async fn test_add_column_to_struct_with_doc() { - let table = make_v2_table_with_nested(); - let tx = Transaction::new(&table); - - let action = tx.update_schema().add_column( - AddColumn::builder() - .name("phone") - .field_type(Type::Primitive(PrimitiveType::String)) - .parent("person") - .doc("Phone number") - .build(), - ); - - let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); - let updates = action_commit.take_updates(); - - let new_schema = match &updates[0] { - TableUpdate::AddSchema { schema } => schema, - other => panic!("expected AddSchema, got {other:?}"), - }; - - let phone = new_schema - .field_by_name("person.phone") - .expect("person.phone should exist"); - assert_eq!(phone.id, 15); - assert_eq!(phone.doc.as_deref(), Some("Phone number")); - } - - #[tokio::test] - async fn test_add_column_to_list_element_struct() { - let table = make_v2_table_with_nested(); - let tx = Transaction::new(&table); - - // "tags" is a list. Adding to the list navigates to its - // element struct automatically. - let action = tx.update_schema().add_column( - AddColumn::builder() - .name("score") - .field_type(Type::Primitive(PrimitiveType::Double)) - .parent("tags") - .build(), - ); - - let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); - let updates = action_commit.take_updates(); - - let new_schema = match &updates[0] { - TableUpdate::AddSchema { schema } => schema, - other => panic!("expected AddSchema, got {other:?}"), - }; - - // The list element struct should now contain "score". - let score = new_schema - .field_by_name("tags.element.score") - .expect("tags.element.score should exist"); - assert_eq!(score.id, 15); - assert!(!score.required); - - // Existing fields preserved. - assert!(new_schema.field_by_name("tags.element.key").is_some()); - assert!(new_schema.field_by_name("tags.element.value").is_some()); - } - - #[tokio::test] - async fn test_add_column_to_map_value_struct() { - let table = make_v2_table_with_nested(); - let tx = Transaction::new(&table); - - // "props" is a map. Adding to the map navigates to its - // value struct automatically. - let action = tx.update_schema().add_column( - AddColumn::builder() - .name("version") - .field_type(Type::Primitive(PrimitiveType::Int)) - .parent("props") - .build(), - ); - - let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); - let updates = action_commit.take_updates(); - - let new_schema = match &updates[0] { - TableUpdate::AddSchema { schema } => schema, - other => panic!("expected AddSchema, got {other:?}"), - }; - - let version = new_schema - .field_by_name("props.value.version") - .expect("props.value.version should exist"); - assert_eq!(version.id, 15); - - // Existing map value fields preserved. - assert!(new_schema.field_by_name("props.value.data").is_some()); - } - - #[tokio::test] - async fn test_add_column_to_nonexistent_parent_fails() { - let table = make_v2_table_with_nested(); - let tx = Transaction::new(&table); - - let action = tx.update_schema().add_column( - AddColumn::builder() - .name("col") - .field_type(Type::Primitive(PrimitiveType::Int)) - .parent("nonexistent") - .build(), - ); - - let err = match Arc::new(action).commit(&table).await { - Err(e) => e, - Ok(_) => panic!("should reject adding to a nonexistent parent"), - }; - assert_eq!(err.kind(), ErrorKind::PreconditionFailed); - assert!( - err.message().contains("nonexistent"), - "error should mention the missing parent, got: {}", - err.message() - ); - } - - #[tokio::test] - async fn test_add_column_to_primitive_parent_fails() { - let table = make_v2_table_with_nested(); - let tx = Transaction::new(&table); - - // "x" is a primitive (long), not a struct. - let action = tx.update_schema().add_column( - AddColumn::builder() - .name("col") - .field_type(Type::Primitive(PrimitiveType::Int)) - .parent("x") - .build(), - ); - - let err = match Arc::new(action).commit(&table).await { - Err(e) => e, - Ok(_) => panic!("should reject adding to a primitive parent"), - }; - assert_eq!(err.kind(), ErrorKind::PreconditionFailed); - assert!( - err.message().contains("not a struct"), - "error should mention type mismatch, got: {}", - err.message() - ); - } - - #[tokio::test] - async fn test_add_column_to_nested_name_conflict_fails() { - let table = make_v2_table_with_nested(); - let tx = Transaction::new(&table); - - // "name" already exists in the "person" struct. - let action = tx.update_schema().add_column( - AddColumn::builder() - .name("name") - .field_type(Type::Primitive(PrimitiveType::String)) - .parent("person") - .build(), - ); - - let err = match Arc::new(action).commit(&table).await { - Err(e) => e, - Ok(_) => panic!("should reject adding a column with conflicting name"), - }; - assert_eq!(err.kind(), ErrorKind::PreconditionFailed); - assert!( - err.message().contains("already exists"), - "error should mention name conflict, got: {}", - err.message() - ); - } - - #[tokio::test] - async fn test_root_and_nested_add_combined() { - let table = make_v2_table_with_nested(); - let tx = Transaction::new(&table); - - // Add a root column and a nested column in the same action. - let action = tx - .update_schema() - .add_column(AddColumn::optional( - "root_col", - Type::Primitive(PrimitiveType::Boolean), - )) - .add_column( + fn complex_schema_evolution() { + let schema = Arc::new(SCHEMA.clone()); + let result = schema_update(schema, &[ + // 1. Add top-level "score" + SchemaOperation::Add( AddColumn::builder() - .name("email") - .field_type(Type::Primitive(PrimitiveType::String)) - .parent("person") + .name("score") + .r#type(PrimitiveType::Float.into()) .build(), - ); - - let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); - let updates = action_commit.take_updates(); - - let new_schema = match &updates[0] { - TableUpdate::AddSchema { schema } => schema, - other => panic!("expected AddSchema, got {other:?}"), - }; - - // Root column gets the first fresh ID. - let root_col = new_schema - .field_by_name("root_col") - .expect("root_col should exist"); - assert_eq!(root_col.id, 15); - - // Nested column gets the next ID. - let email = new_schema - .field_by_name("person.email") - .expect("person.email should exist"); - assert_eq!(email.id, 16); - } - - #[tokio::test] - async fn test_add_nested_struct_type_with_fresh_ids() { - // Adding a new column whose TYPE contains nested fields (e.g. a struct column). All sub-fields must receive - // fresh IDs, not placeholder `DEFAULT_FIELD_ID`. - let table = make_v2_table(); - let tx = Transaction::new(&table); - - let action = tx.update_schema().add_column(AddColumn::optional( - "address", - Type::Struct(StructType::new(vec![ + ), + // 2. Add nested "address.country" + SchemaOperation::Add( + AddColumn::builder() + .parent("address".to_string()) + .name("country") + .r#type(PrimitiveType::String.into()) + .build(), + ), + // 3. Rename "name" → "full_name" + SchemaOperation::Rename( + RenameColumn::builder() + .name("name") + .new_name("full_name") + .build(), + ), + // 4. Rename nested "address.city" → "town" + SchemaOperation::Rename( + RenameColumn::builder() + .name("address.city") + .new_name("town") + .build(), + ), + // 5. Update "address.zip" doc + SchemaOperation::Update( + UpdateColumn::builder() + .name("address.zip") + .new_doc(Some("postal code".into())) + .build(), + ), + // 6. Delete "tags" + SchemaOperation::Delete(DeleteColumn::new("tags")), + // 7. Move "id" after "name" + SchemaOperation::Move(MoveColumn::after("id", "name")), + // 8. Move nested "address.street" after "address.city" + SchemaOperation::Move(MoveColumn::after("address.street", "address.city")), + ]) + .unwrap(); + + let expected = Schema::builder() + .with_fields(vec![ + NestedField::optional(2, "full_name", PrimitiveType::String.into()).into(), + NestedField::required(1, "id", PrimitiveType::Int.into()).into(), NestedField::optional( - DEFAULT_FIELD_ID, - "street", - Type::Primitive(PrimitiveType::String), + 3, + "address", + StructType::new(vec![ + NestedField::optional(5, "town", PrimitiveType::String.into()).into(), + NestedField::required(4, "street", PrimitiveType::String.into()).into(), + NestedField::optional(6, "zip", PrimitiveType::Int.into()) + .with_doc("postal code") + .into(), + NestedField::optional(13, "country", PrimitiveType::String.into()).into(), + ]) + .into(), ) .into(), NestedField::optional( - DEFAULT_FIELD_ID, - "city", - Type::Primitive(PrimitiveType::String), + 9, + "metrics", + MapType::optional( + 10, + PrimitiveType::String.into(), + 11, + PrimitiveType::Double.into(), + ) + .into(), ) .into(), - ])), - )); - - let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); - let updates = action_commit.take_updates(); - - let new_schema = match &updates[0] { - TableUpdate::AddSchema { schema } => schema, - other => panic!("expected AddSchema, got {other:?}"), - }; - - // "address" gets ID 4 (last_column_id=3, +1). - let address = new_schema - .field_by_name("address") - .expect("address should exist"); - assert_eq!(address.id, 4); - - // Sub-fields get IDs 5 and 6. - let street = new_schema - .field_by_name("address.street") - .expect("address.street should exist"); - assert_eq!(street.id, 5); + NestedField::optional(12, "score", PrimitiveType::Float.into()).into(), + ]) + .build() + .unwrap(); - let city = new_schema - .field_by_name("address.city") - .expect("address.city should exist"); - assert_eq!(city.id, 6); + assert_eq!(result.as_struct(), expected.as_struct()); } }