Skip to content

Commit 0753844

Browse files
committed
remove object_store from matview
1 parent 9f2dbb5 commit 0753844

7 files changed

Lines changed: 21 additions & 120 deletions

File tree

catalogs/iceberg-file-catalog/src/lib.rs

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -183,13 +183,7 @@ impl Catalog for FileCatalog {
183183
View::new(identifier.clone(), self.clone(), metadata).await?,
184184
)),
185185
TabularMetadata::MaterializedView(metadata) => Ok(Tabular::MaterializedView(
186-
MaterializedView::new(
187-
identifier.clone(),
188-
self.clone(),
189-
object_store.clone(),
190-
metadata,
191-
)
192-
.await?,
186+
MaterializedView::new(identifier.clone(), self.clone(), metadata).await?,
193187
)),
194188
}
195189
}
@@ -319,13 +313,7 @@ impl Catalog for FileCatalog {
319313
(metadata_location.clone(), metadata.clone().into()),
320314
);
321315

322-
Ok(MaterializedView::new(
323-
identifier.clone(),
324-
self.clone(),
325-
object_store.clone(),
326-
metadata,
327-
)
328-
.await?)
316+
Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
329317
}
330318

331319
async fn update_table(self: Arc<Self>, commit: CommitTable) -> Result<Table, IcebergError> {
@@ -505,13 +493,7 @@ impl Catalog for FileCatalog {
505493
(metadata_location.clone(), metadata.clone()),
506494
);
507495
if let TabularMetadata::MaterializedView(metadata) = metadata {
508-
Ok(MaterializedView::new(
509-
identifier.clone(),
510-
self.clone(),
511-
object_store.clone(),
512-
metadata,
513-
)
514-
.await?)
496+
Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
515497
} else {
516498
Err(IcebergError::InvalidFormat(
517499
"Entity is not a materialized view".to_owned(),

catalogs/iceberg-glue-catalog/src/lib.rs

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -288,13 +288,7 @@ impl Catalog for GlueCatalog {
288288
View::new(identifier.clone(), self.clone(), metadata).await?,
289289
)),
290290
TabularMetadata::MaterializedView(metadata) => Ok(Tabular::MaterializedView(
291-
MaterializedView::new(
292-
identifier.clone(),
293-
self.clone(),
294-
object_store.clone(),
295-
metadata,
296-
)
297-
.await?,
291+
MaterializedView::new(identifier.clone(), self.clone(), metadata).await?,
298292
)),
299293
}
300294
}
@@ -545,13 +539,7 @@ impl Catalog for GlueCatalog {
545539
metadata.clone().into(),
546540
),
547541
);
548-
Ok(MaterializedView::new(
549-
identifier.clone(),
550-
self.clone(),
551-
object_store.clone(),
552-
metadata,
553-
)
554-
.await?)
542+
Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
555543
}
556544

557545
async fn update_table(self: Arc<Self>, commit: CommitTable) -> Result<Table, IcebergError> {
@@ -879,13 +867,7 @@ impl Catalog for GlueCatalog {
879867
.unwrap()
880868
.insert(identifier.clone(), (version_id, metadata.clone()));
881869
if let TabularMetadata::MaterializedView(metadata) = metadata {
882-
Ok(MaterializedView::new(
883-
identifier.clone(),
884-
self.clone(),
885-
object_store.clone(),
886-
metadata,
887-
)
888-
.await?)
870+
Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
889871
} else {
890872
Err(IcebergError::InvalidFormat(
891873
"Entity is not a materialized view".to_owned(),

catalogs/iceberg-rest-catalog/src/catalog.rs

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -271,15 +271,9 @@ impl Catalog for RestCatalog {
271271
Ok(TabularMetadata::View(view)) => Ok(Tabular::View(
272272
View::new(identifier.clone(), self.clone(), view).await?,
273273
)),
274-
Ok(TabularMetadata::MaterializedView(matview)) => {
275-
let object_store = self
276-
.object_store_builder
277-
.build(Bucket::from_path(&matview.location)?)?;
278-
Ok(Tabular::MaterializedView(
279-
MaterializedView::new(identifier.clone(), self.clone(), object_store, matview)
280-
.await?,
281-
))
282-
}
274+
Ok(TabularMetadata::MaterializedView(matview)) => Ok(Tabular::MaterializedView(
275+
MaterializedView::new(identifier.clone(), self.clone(), matview).await?,
276+
)),
283277
Err(apis::Error::ResponseError(content)) => {
284278
if content.status == 404 {
285279
let table_metadata = catalog_api_api::load_table(
@@ -444,10 +438,7 @@ impl Catalog for RestCatalog {
444438
let clone = self.clone();
445439
async move {
446440
if let TabularMetadata::MaterializedView(metadata) = response.metadata {
447-
let object_store = clone
448-
.object_store_builder
449-
.build(Bucket::from_path(&metadata.location)?)?;
450-
MaterializedView::new(identifier.clone(), clone, object_store, metadata).await
441+
MaterializedView::new(identifier.clone(), clone, metadata).await
451442
} else {
452443
Err(Error::InvalidFormat(
453444
"Create materialzied view didn't return materialized view metadata."
@@ -476,10 +467,7 @@ impl Catalog for RestCatalog {
476467
let identifier = identifier.clone();
477468
async move {
478469
if let TabularMetadata::MaterializedView(metadata) = response.metadata {
479-
let object_store = clone
480-
.object_store_builder
481-
.build(Bucket::from_path(&metadata.location)?)?;
482-
MaterializedView::new(identifier.clone(), clone, object_store, metadata).await
470+
MaterializedView::new(identifier.clone(), clone, metadata).await
483471
} else {
484472
Err(Error::InvalidFormat(
485473
"Create materialzied view didn't return materialized view metadata."

catalogs/iceberg-s3tables-catalog/src/lib.rs

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -274,13 +274,7 @@ impl Catalog for S3TablesCatalog {
274274
View::new(identifier.clone(), self.clone(), metadata).await?,
275275
)),
276276
TabularMetadata::MaterializedView(metadata) => Ok(Tabular::MaterializedView(
277-
MaterializedView::new(
278-
identifier.clone(),
279-
self.clone(),
280-
object_store.clone(),
281-
metadata,
282-
)
283-
.await?,
277+
MaterializedView::new(identifier.clone(), self.clone(), metadata).await?,
284278
)),
285279
}
286280
}
@@ -530,13 +524,7 @@ impl Catalog for S3TablesCatalog {
530524
identifier.clone(),
531525
(view.version_token, metadata.clone().into()),
532526
);
533-
Ok(MaterializedView::new(
534-
identifier.clone(),
535-
self.clone(),
536-
object_store.clone(),
537-
metadata,
538-
)
539-
.await?)
527+
Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
540528
}
541529

542530
async fn update_table(self: Arc<Self>, commit: CommitTable) -> Result<Table, IcebergError> {
@@ -767,13 +755,7 @@ impl Catalog for S3TablesCatalog {
767755
.unwrap()
768756
.insert(identifier.clone(), (version_token, metadata.clone()));
769757
if let TabularMetadata::MaterializedView(metadata) = metadata {
770-
Ok(MaterializedView::new(
771-
identifier.clone(),
772-
self.clone(),
773-
object_store.clone(),
774-
metadata,
775-
)
776-
.await?)
758+
Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
777759
} else {
778760
Err(IcebergError::InvalidFormat(
779761
"Entity is not a materialized view".to_owned(),

catalogs/iceberg-sql-catalog/src/lib.rs

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -310,13 +310,7 @@ impl Catalog for SqlCatalog {
310310
View::new(identifier.clone(), self.clone(), metadata).await?,
311311
)),
312312
TabularMetadata::MaterializedView(metadata) => Ok(Tabular::MaterializedView(
313-
MaterializedView::new(
314-
identifier.clone(),
315-
self.clone(),
316-
object_store.clone(),
317-
metadata,
318-
)
319-
.await?,
313+
MaterializedView::new(identifier.clone(), self.clone(), metadata).await?,
320314
)),
321315
}
322316
}
@@ -444,13 +438,7 @@ impl Catalog for SqlCatalog {
444438
identifier.clone(),
445439
(metadata_location.clone(), metadata.clone().into()),
446440
);
447-
Ok(MaterializedView::new(
448-
identifier.clone(),
449-
self.clone(),
450-
object_store.clone(),
451-
metadata,
452-
)
453-
.await?)
441+
Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
454442
}
455443

456444
async fn update_table(self: Arc<Self>, commit: CommitTable) -> Result<Table, IcebergError> {
@@ -615,13 +603,7 @@ impl Catalog for SqlCatalog {
615603
(metadata_location.clone(), metadata.clone()),
616604
);
617605
if let TabularMetadata::MaterializedView(metadata) = metadata {
618-
Ok(MaterializedView::new(
619-
identifier.clone(),
620-
self.clone(),
621-
object_store.clone(),
622-
metadata,
623-
)
624-
.await?)
606+
Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
625607
} else {
626608
Err(IcebergError::InvalidFormat(
627609
"Entity is not a materialized view".to_owned(),

iceberg-rust/src/materialized_view/mod.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ use std::sync::Arc;
3434
use iceberg_rust_spec::spec::{
3535
materialized_view_metadata::MaterializedViewMetadata, schema::Schema,
3636
};
37-
use object_store::ObjectStore;
3837

3938
use crate::{
4039
catalog::{
@@ -69,8 +68,6 @@ pub struct MaterializedView {
6968
metadata: MaterializedViewMetadata,
7069
/// Catalog of the table
7170
catalog: Arc<dyn Catalog>,
72-
/// Object store
73-
object_store: Arc<dyn ObjectStore>,
7471
}
7572

7673
/// Storage table states
@@ -109,13 +106,11 @@ impl MaterializedView {
109106
pub async fn new(
110107
identifier: Identifier,
111108
catalog: Arc<dyn Catalog>,
112-
object_store: Arc<dyn ObjectStore>,
113109
metadata: MaterializedViewMetadata,
114110
) -> Result<Self, Error> {
115111
Ok(MaterializedView {
116112
identifier,
117113
metadata,
118-
object_store,
119114
catalog,
120115
})
121116
}
@@ -133,13 +128,6 @@ impl MaterializedView {
133128
pub fn catalog(&self) -> Arc<dyn Catalog> {
134129
self.catalog.clone()
135130
}
136-
/// Returns the object store used by this materialized view for data storage
137-
///
138-
/// The object store provides access to the underlying storage system (e.g. S3, local filesystem)
139-
/// where the view's data files are stored. The store is configured based on the view's location.
140-
pub fn object_store(&self) -> Arc<dyn ObjectStore> {
141-
self.object_store.clone()
142-
}
143131
/// Returns the current schema for this materialized view
144132
///
145133
/// # Arguments

iceberg-rust/src/materialized_view/transaction/mod.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,11 @@ impl<'view> Transaction<'view> {
174174

175175
let identifier = self.materialized_view.identifier().clone();
176176

177+
let storage_table = self.materialized_view.storage_table().await?;
178+
177179
let delete_data = if !self.storage_table_operations.is_empty() {
178180
let (mut table_requirements, mut table_updates) = (Vec::new(), Vec::new());
179181

180-
let storage_table = self.materialized_view.storage_table().await?;
181-
182182
// Save old metadata to be able to remove old data after a rewrite operation
183183
let delete_data = if self
184184
.storage_table_operations
@@ -193,10 +193,7 @@ impl<'view> Transaction<'view> {
193193
// Execute table operations
194194
for operation in self.storage_table_operations.into_values() {
195195
let (requirement, update) = operation
196-
.execute(
197-
storage_table.metadata(),
198-
self.materialized_view.object_store(),
199-
)
196+
.execute(storage_table.metadata(), storage_table.object_store())
200197
.await?;
201198

202199
if let Some(requirement) = requirement {
@@ -239,7 +236,7 @@ impl<'view> Transaction<'view> {
239236
.await?;
240237
// Delete data files in case of a rewrite operation
241238
if let Some(old_metadata) = delete_data {
242-
delete_all_table_files(&old_metadata, self.materialized_view.object_store()).await?;
239+
delete_all_table_files(&old_metadata, storage_table.object_store()).await?;
243240
}
244241
*self.materialized_view = new_matview;
245242
Ok(())

0 commit comments

Comments
 (0)