@@ -23,9 +23,9 @@ use crate::{
2323/// View operation
2424pub enum Operation {
2525 /// Update vresion
26- UpdateRepresentation {
26+ UpdateRepresentations {
2727 /// Representation to add
28- representation : ViewRepresentation ,
28+ representations : Vec < ViewRepresentation > ,
2929 /// Schema of the representation
3030 schema : StructType ,
3131 /// Branch where to add the representation
@@ -35,19 +35,60 @@ pub enum Operation {
3535 UpdateProperties ( Vec < ( String , String ) > ) ,
3636}
3737
38+ // Tries to preserve dialect order
39+ fn upsert_representation (
40+ current_representations : & [ ViewRepresentation ] ,
41+ new_representation : ViewRepresentation ,
42+ ) -> Vec < ViewRepresentation > {
43+ let ViewRepresentation :: Sql {
44+ dialect : new_dialect,
45+ ..
46+ } = & new_representation;
47+ let mut updated = false ;
48+ let mut representations: Vec < ViewRepresentation > = current_representations
49+ . iter ( )
50+ . map (
51+ |current_representation @ ViewRepresentation :: Sql { dialect, .. } | {
52+ if dialect == new_dialect {
53+ updated = true ;
54+ new_representation. clone ( )
55+ } else {
56+ current_representation. clone ( )
57+ }
58+ } ,
59+ )
60+ . collect ( ) ;
61+ if !updated {
62+ representations. push ( new_representation) ;
63+ }
64+ representations
65+ }
66+
67+ fn upsert_representations (
68+ current_representations : & [ ViewRepresentation ] ,
69+ new_representations : & [ ViewRepresentation ] ,
70+ ) -> Vec < ViewRepresentation > {
71+ let mut representations: Vec < ViewRepresentation > = current_representations. into ( ) ;
72+ for r in new_representations {
73+ representations = upsert_representation ( & representations, r. clone ( ) ) ;
74+ }
75+ representations
76+ }
77+
3878impl Operation {
3979 /// Execute operation
4080 pub async fn execute < T : Materialization > (
4181 self ,
4282 metadata : & GeneralViewMetadata < T > ,
4383 ) -> Result < ( Option < ViewRequirement > , Vec < ViewUpdate < T > > ) , Error > {
4484 match self {
45- Operation :: UpdateRepresentation {
46- representation ,
85+ Operation :: UpdateRepresentations {
86+ representations ,
4787 schema,
4888 branch,
4989 } => {
50- let schema_changed = metadata. current_schema ( branch. as_deref ( ) )
90+ let schema_changed = metadata
91+ . current_schema ( branch. as_deref ( ) )
5192 . map ( |s| schema != * s. fields ( ) )
5293 . unwrap_or ( true ) ;
5394
@@ -56,7 +97,10 @@ impl Operation {
5697 let schema_id = if schema_changed {
5798 metadata. schemas . keys ( ) . max ( ) . unwrap_or ( & 0 ) + 1
5899 } else {
59- * metadata. current_schema ( branch. as_deref ( ) ) . unwrap ( ) . schema_id ( )
100+ * metadata
101+ . current_schema ( branch. as_deref ( ) )
102+ . unwrap ( )
103+ . schema_id ( )
60104 } ;
61105 let last_column_id = schema. iter ( ) . map ( |x| x. id ) . max ( ) . unwrap_or ( 0 ) ;
62106
@@ -68,7 +112,10 @@ impl Operation {
68112 engine_name : None ,
69113 engine_version : None ,
70114 } ,
71- representations : vec ! [ representation] ,
115+ representations : upsert_representations (
116+ version. representations ( ) ,
117+ & representations,
118+ ) ,
72119 default_catalog : version. default_catalog . clone ( ) ,
73120 default_namespace : version. default_namespace . clone ( ) ,
74121 timestamp_ms : SystemTime :: now ( )
@@ -120,3 +167,48 @@ impl Operation {
120167 }
121168 }
122169}
170+
171+ #[ cfg( test) ]
172+ mod tests {
173+ use iceberg_rust_spec:: view_metadata:: ViewRepresentation ;
174+
175+ use crate :: view:: transaction:: operation:: upsert_representations;
176+
177+ #[ test]
178+ fn test_upsert_representations ( ) {
179+ assert_eq ! (
180+ upsert_representations(
181+ & [
182+ ViewRepresentation :: sql( "a1" , Some ( "a" ) ) ,
183+ ViewRepresentation :: sql( "b1" , Some ( "b" ) )
184+ ] ,
185+ & [
186+ ViewRepresentation :: sql( "b2" , Some ( "b" ) ) ,
187+ ViewRepresentation :: sql( "c2" , Some ( "c" ) )
188+ ]
189+ ) ,
190+ vec![
191+ ViewRepresentation :: sql( "a1" , Some ( "a" ) ) ,
192+ ViewRepresentation :: sql( "b2" , Some ( "b" ) ) ,
193+ ViewRepresentation :: sql( "c2" , Some ( "c" ) ) ,
194+ ]
195+ ) ;
196+ assert_eq ! (
197+ upsert_representations(
198+ & [
199+ ViewRepresentation :: sql( "a1" , Some ( "a" ) ) ,
200+ ViewRepresentation :: sql( "b1" , Some ( "b" ) )
201+ ] ,
202+ & [
203+ ViewRepresentation :: sql( "c2" , Some ( "c" ) ) ,
204+ ViewRepresentation :: sql( "a2" , Some ( "a" ) )
205+ ]
206+ ) ,
207+ vec![
208+ ViewRepresentation :: sql( "a2" , Some ( "a" ) ) ,
209+ ViewRepresentation :: sql( "b1" , Some ( "b" ) ) ,
210+ ViewRepresentation :: sql( "c2" , Some ( "c" ) ) ,
211+ ]
212+ ) ;
213+ }
214+ }
0 commit comments