@@ -14,7 +14,7 @@ use crate::{
1414 error:: Error ,
1515 table:: {
1616 delete_all_table_files,
17- transaction:: { operation:: Operation as TableOperation , APPEND_KEY , REPLACE_KEY } ,
17+ transaction:: { operation:: Operation as TableOperation , APPEND_INDEX , REPLACE_INDEX } ,
1818 } ,
1919 view:: transaction:: operation:: Operation as ViewOperation ,
2020} ;
@@ -25,7 +25,7 @@ use super::MaterializedView;
2525pub struct Transaction < ' view > {
2626 materialized_view : & ' view mut MaterializedView ,
2727 view_operations : Vec < ViewOperation > ,
28- storage_table_operations : HashMap < String , TableOperation > ,
28+ storage_table_operations : Vec < Option < TableOperation > > ,
2929 branch : Option < String > ,
3030}
3131
@@ -35,7 +35,7 @@ impl<'view> Transaction<'view> {
3535 Transaction {
3636 materialized_view : view,
3737 view_operations : vec ! [ ] ,
38- storage_table_operations : HashMap :: new ( ) ,
38+ storage_table_operations : ( 0 .. 6 ) . map ( |_| None ) . collect ( ) , // 6 operation types
3939 branch : branch. map ( ToString :: to_string) ,
4040 }
4141 }
@@ -69,30 +69,29 @@ impl<'view> Transaction<'view> {
6969 refresh_state : RefreshState ,
7070 ) -> Result < Self , Error > {
7171 let refresh_state = serde_json:: to_string ( & refresh_state) ?;
72- self . storage_table_operations
73- . entry ( REPLACE_KEY . to_owned ( ) )
74- . and_modify ( |mut x| {
75- if let TableOperation :: Replace {
76- branch : _,
77- files : old,
78- additional_summary : old_lineage,
79- } = & mut x
80- {
81- old. extend_from_slice ( & files) ;
82- * old_lineage = Some ( HashMap :: from_iter ( vec ! [ (
83- REFRESH_STATE . to_owned( ) ,
84- refresh_state. clone( ) ,
85- ) ] ) ) ;
86- }
87- } )
88- . or_insert ( TableOperation :: Replace {
72+ if let Some ( ref mut operation) = self . storage_table_operations [ REPLACE_INDEX ] {
73+ if let TableOperation :: Replace {
74+ branch : _,
75+ files : old,
76+ additional_summary : old_lineage,
77+ } = operation
78+ {
79+ old. extend_from_slice ( & files) ;
80+ * old_lineage = Some ( HashMap :: from_iter ( vec ! [ (
81+ REFRESH_STATE . to_owned( ) ,
82+ refresh_state. clone( ) ,
83+ ) ] ) ) ;
84+ }
85+ } else {
86+ self . storage_table_operations [ REPLACE_INDEX ] = Some ( TableOperation :: Replace {
8987 branch : self . branch . clone ( ) ,
9088 files,
9189 additional_summary : Some ( HashMap :: from_iter ( vec ! [ (
9290 REFRESH_STATE . to_owned( ) ,
9391 refresh_state,
9492 ) ] ) ) ,
9593 } ) ;
94+ }
9695 Ok ( self )
9796 }
9897
@@ -103,24 +102,22 @@ impl<'view> Transaction<'view> {
103102 refresh_state : RefreshState ,
104103 ) -> Result < Self , Error > {
105104 let refresh_state = serde_json:: to_string ( & refresh_state) ?;
106- self . storage_table_operations
107- . entry ( APPEND_KEY . to_owned ( ) )
108- . and_modify ( |mut x| {
109- if let TableOperation :: Append {
110- branch : _,
111- data_files : old,
112- delete_files : _,
113- additional_summary : old_lineage,
114- } = & mut x
115- {
116- old. extend_from_slice ( & files) ;
117- * old_lineage = Some ( HashMap :: from_iter ( vec ! [ (
118- REFRESH_STATE . to_owned( ) ,
119- refresh_state. clone( ) ,
120- ) ] ) ) ;
121- }
122- } )
123- . or_insert ( TableOperation :: Append {
105+ if let Some ( ref mut operation) = self . storage_table_operations [ APPEND_INDEX ] {
106+ if let TableOperation :: Append {
107+ branch : _,
108+ data_files : old,
109+ delete_files : _,
110+ additional_summary : old_lineage,
111+ } = operation
112+ {
113+ old. extend_from_slice ( & files) ;
114+ * old_lineage = Some ( HashMap :: from_iter ( vec ! [ (
115+ REFRESH_STATE . to_owned( ) ,
116+ refresh_state. clone( ) ,
117+ ) ] ) ) ;
118+ }
119+ } else {
120+ self . storage_table_operations [ APPEND_INDEX ] = Some ( TableOperation :: Append {
124121 branch : self . branch . clone ( ) ,
125122 data_files : files,
126123 delete_files : Vec :: new ( ) ,
@@ -129,6 +126,7 @@ impl<'view> Transaction<'view> {
129126 refresh_state,
130127 ) ] ) ) ,
131128 } ) ;
129+ }
132130 Ok ( self )
133131 }
134132
@@ -139,24 +137,22 @@ impl<'view> Transaction<'view> {
139137 refresh_state : RefreshState ,
140138 ) -> Result < Self , Error > {
141139 let refresh_state = serde_json:: to_string ( & refresh_state) ?;
142- self . storage_table_operations
143- . entry ( APPEND_KEY . to_owned ( ) )
144- . and_modify ( |mut x| {
145- if let TableOperation :: Append {
146- branch : _,
147- data_files : _,
148- delete_files : old,
149- additional_summary : old_lineage,
150- } = & mut x
151- {
152- old. extend_from_slice ( & files) ;
153- * old_lineage = Some ( HashMap :: from_iter ( vec ! [ (
154- REFRESH_STATE . to_owned( ) ,
155- refresh_state. clone( ) ,
156- ) ] ) ) ;
157- }
158- } )
159- . or_insert ( TableOperation :: Append {
140+ if let Some ( ref mut operation) = self . storage_table_operations [ APPEND_INDEX ] {
141+ if let TableOperation :: Append {
142+ branch : _,
143+ data_files : _,
144+ delete_files : old,
145+ additional_summary : old_lineage,
146+ } = operation
147+ {
148+ old. extend_from_slice ( & files) ;
149+ * old_lineage = Some ( HashMap :: from_iter ( vec ! [ (
150+ REFRESH_STATE . to_owned( ) ,
151+ refresh_state. clone( ) ,
152+ ) ] ) ) ;
153+ }
154+ } else {
155+ self . storage_table_operations [ APPEND_INDEX ] = Some ( TableOperation :: Append {
160156 branch : self . branch . clone ( ) ,
161157 data_files : Vec :: new ( ) ,
162158 delete_files : files,
@@ -165,6 +161,7 @@ impl<'view> Transaction<'view> {
165161 refresh_state,
166162 ) ] ) ) ,
167163 } ) ;
164+ }
168165 Ok ( self )
169166 }
170167
@@ -182,7 +179,8 @@ impl<'view> Transaction<'view> {
182179 // Save old metadata to be able to remove old data after a rewrite operation
183180 let delete_data = if self
184181 . storage_table_operations
185- . values ( )
182+ . iter ( )
183+ . flatten ( )
186184 . any ( |x| matches ! ( x, TableOperation :: Replace { .. } ) )
187185 {
188186 Some ( storage_table. metadata ( ) . clone ( ) )
@@ -191,7 +189,7 @@ impl<'view> Transaction<'view> {
191189 } ;
192190
193191 // Execute table operations
194- for operation in self . storage_table_operations . into_values ( ) {
192+ for operation in self . storage_table_operations . into_iter ( ) . flatten ( ) {
195193 let ( requirement, update) = operation
196194 . execute ( storage_table. metadata ( ) , storage_table. object_store ( ) )
197195 . await ?;
0 commit comments