Skip to content

Commit cde5cf1

Browse files
authored
Merge pull request JanKaul#149 from JanKaul/refactor/object_store_in_table
Refactor/object store in table
2 parents bd9c72c + bb192f7 commit cde5cf1

11 files changed

Lines changed: 408 additions & 302 deletions

File tree

catalogs/iceberg-file-catalog/src/lib.rs

Lines changed: 78 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ impl Catalog for FileCatalog {
129129
.common_prefixes
130130
.into_iter()
131131
.map(|x| self.namespace(x.as_ref()))
132-
.collect::<Result<_, IcebergError>>()}
132+
.collect::<Result<_, IcebergError>>()
133+
}
133134
async fn tabular_exists(&self, identifier: &Identifier) -> Result<bool, IcebergError> {
134135
self.metadata_location(identifier)
135136
.await
@@ -170,13 +171,31 @@ impl Catalog for FileCatalog {
170171

171172
match metadata {
172173
TabularMetadata::Table(metadata) => Ok(Tabular::Table(
173-
Table::new(identifier.clone(), self.clone(), metadata).await?,
174+
Table::new(
175+
identifier.clone(),
176+
self.clone(),
177+
object_store.clone(),
178+
metadata,
179+
)
180+
.await?,
174181
)),
175182
TabularMetadata::View(metadata) => Ok(Tabular::View(
176-
View::new(identifier.clone(), self.clone(), metadata).await?,
183+
View::new(
184+
identifier.clone(),
185+
self.clone(),
186+
object_store.clone(),
187+
metadata,
188+
)
189+
.await?,
177190
)),
178191
TabularMetadata::MaterializedView(metadata) => Ok(Tabular::MaterializedView(
179-
MaterializedView::new(identifier.clone(), self.clone(), metadata).await?,
192+
MaterializedView::new(
193+
identifier.clone(),
194+
self.clone(),
195+
object_store.clone(),
196+
metadata,
197+
)
198+
.await?,
180199
)),
181200
}
182201
}
@@ -199,7 +218,7 @@ impl Catalog for FileCatalog {
199218

200219
// Write metadata to object_store
201220
let bucket = Bucket::from_path(&location)?;
202-
let object_store = self.object_store(bucket);
221+
let object_store = self.default_object_store(bucket);
203222

204223
let metadata_location = location + "/metadata/v0.metadata.json";
205224

@@ -213,7 +232,13 @@ impl Catalog for FileCatalog {
213232
identifier.clone(),
214233
(metadata_location.clone(), metadata.clone().into()),
215234
);
216-
Ok(Table::new(identifier.clone(), self.clone(), metadata).await?)
235+
Ok(Table::new(
236+
identifier.clone(),
237+
self.clone(),
238+
object_store.clone(),
239+
metadata,
240+
)
241+
.await?)
217242
}
218243

219244
async fn create_view(
@@ -235,7 +260,7 @@ impl Catalog for FileCatalog {
235260

236261
// Write metadata to object_store
237262
let bucket = Bucket::from_path(&location)?;
238-
let object_store = self.object_store(bucket);
263+
let object_store = self.default_object_store(bucket);
239264

240265
let metadata_location = location + "/metadata/v0.metadata.json";
241266

@@ -249,7 +274,13 @@ impl Catalog for FileCatalog {
249274
identifier.clone(),
250275
(metadata_location.clone(), metadata.clone().into()),
251276
);
252-
Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
277+
Ok(View::new(
278+
identifier.clone(),
279+
self.clone(),
280+
object_store.clone(),
281+
metadata,
282+
)
283+
.await?)
253284
}
254285

255286
async fn create_materialized_view(
@@ -278,7 +309,7 @@ impl Catalog for FileCatalog {
278309

279310
// Write metadata to object_store
280311
let bucket = Bucket::from_path(&location)?;
281-
let object_store = self.object_store(bucket);
312+
let object_store = self.default_object_store(bucket);
282313

283314
let metadata_location = location + "/metadata/v0.metadata.json";
284315

@@ -300,7 +331,13 @@ impl Catalog for FileCatalog {
300331
(metadata_location.clone(), metadata.clone().into()),
301332
);
302333

303-
Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
334+
Ok(MaterializedView::new(
335+
identifier.clone(),
336+
self.clone(),
337+
object_store.clone(),
338+
metadata,
339+
)
340+
.await?)
304341
}
305342

306343
async fn update_table(self: Arc<Self>, commit: CommitTable) -> Result<Table, IcebergError> {
@@ -356,7 +393,13 @@ impl Catalog for FileCatalog {
356393
(metadata_location.clone(), metadata.clone().into()),
357394
);
358395

359-
Ok(Table::new(identifier.clone(), self.clone(), metadata).await?)
396+
Ok(Table::new(
397+
identifier.clone(),
398+
self.clone(),
399+
object_store.clone(),
400+
metadata,
401+
)
402+
.await?)
360403
}
361404

362405
async fn update_view(
@@ -413,7 +456,13 @@ impl Catalog for FileCatalog {
413456
(metadata_location.clone(), metadata.clone()),
414457
);
415458
if let TabularMetadata::View(metadata) = metadata {
416-
Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
459+
Ok(View::new(
460+
identifier.clone(),
461+
self.clone(),
462+
object_store.clone(),
463+
metadata,
464+
)
465+
.await?)
417466
} else {
418467
Err(IcebergError::InvalidFormat(
419468
"Entity is not a view".to_owned(),
@@ -474,7 +523,13 @@ impl Catalog for FileCatalog {
474523
(metadata_location.clone(), metadata.clone()),
475524
);
476525
if let TabularMetadata::MaterializedView(metadata) = metadata {
477-
Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
526+
Ok(MaterializedView::new(
527+
identifier.clone(),
528+
self.clone(),
529+
object_store.clone(),
530+
metadata,
531+
)
532+
.await?)
478533
} else {
479534
Err(IcebergError::InvalidFormat(
480535
"Entity is not a materialized view".to_owned(),
@@ -489,13 +544,12 @@ impl Catalog for FileCatalog {
489544
) -> Result<Table, IcebergError> {
490545
unimplemented!()
491546
}
492-
493-
fn object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
494-
Arc::new(self.object_store.build(bucket).unwrap())
495-
}
496547
}
497548

498549
impl FileCatalog {
550+
fn default_object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
551+
Arc::new(self.object_store.build(bucket).unwrap())
552+
}
499553
fn namespace_path(&self, namespace: &str) -> String {
500554
self.path.as_str().trim_end_matches('/').to_owned() + "/" + namespace
501555
}
@@ -634,7 +688,8 @@ impl CatalogList for FileCatalogList {
634688
.common_prefixes
635689
.into_iter()
636690
.map(|x| self.parse_catalog(x.as_ref()))
637-
.collect::<Result<_, IcebergError>>().unwrap()
691+
.collect::<Result<_, IcebergError>>()
692+
.unwrap()
638693
}
639694
}
640695

@@ -652,7 +707,7 @@ pub mod tests {
652707
};
653708
use iceberg_rust::{
654709
catalog::{namespace::Namespace, Catalog},
655-
object_store::ObjectStoreBuilder,
710+
object_store::{Bucket, ObjectStoreBuilder},
656711
spec::util::strip_prefix,
657712
};
658713
use std::{sync::Arc, time::Duration};
@@ -704,7 +759,7 @@ pub mod tests {
704759
// let object_store = ObjectStoreBuilder::memory();
705760

706761
let iceberg_catalog: Arc<dyn Catalog> = Arc::new(
707-
FileCatalog::new("s3://warehouse", object_store)
762+
FileCatalog::new("s3://warehouse", object_store.clone())
708763
.await
709764
.unwrap(),
710765
);
@@ -841,8 +896,9 @@ pub mod tests {
841896

842897
assert!(once);
843898

844-
let object_store =
845-
iceberg_catalog.object_store(iceberg_rust::object_store::Bucket::S3("warehouse"));
899+
let object_store = object_store
900+
.build(Bucket::from_path("s3://warehouse").unwrap())
901+
.unwrap();
846902

847903
let version_hint = object_store
848904
.get(&strip_prefix("s3://warehouse/tpch/lineitem/metadata/version-hint.text").into())

0 commit comments

Comments
 (0)