Skip to content

Commit 84476cc

Browse files
committed
use default_object_store for catalogs
1 parent bd9c72c commit 84476cc

7 files changed

Lines changed: 50 additions & 206 deletions

File tree

catalogs/iceberg-file-catalog/src/lib.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ impl Catalog for FileCatalog {
129129
.common_prefixes
130130
.into_iter()
131131
.map(|x| self.namespace(x.as_ref()))
132-
.collect::<Result<_, IcebergError>>()}
132+
.collect::<Result<_, IcebergError>>()
133+
}
133134
async fn tabular_exists(&self, identifier: &Identifier) -> Result<bool, IcebergError> {
134135
self.metadata_location(identifier)
135136
.await
@@ -199,7 +200,7 @@ impl Catalog for FileCatalog {
199200

200201
// Write metadata to object_store
201202
let bucket = Bucket::from_path(&location)?;
202-
let object_store = self.object_store(bucket);
203+
let object_store = self.default_object_store(bucket);
203204

204205
let metadata_location = location + "/metadata/v0.metadata.json";
205206

@@ -235,7 +236,7 @@ impl Catalog for FileCatalog {
235236

236237
// Write metadata to object_store
237238
let bucket = Bucket::from_path(&location)?;
238-
let object_store = self.object_store(bucket);
239+
let object_store = self.default_object_store(bucket);
239240

240241
let metadata_location = location + "/metadata/v0.metadata.json";
241242

@@ -278,7 +279,7 @@ impl Catalog for FileCatalog {
278279

279280
// Write metadata to object_store
280281
let bucket = Bucket::from_path(&location)?;
281-
let object_store = self.object_store(bucket);
282+
let object_store = self.default_object_store(bucket);
282283

283284
let metadata_location = location + "/metadata/v0.metadata.json";
284285

@@ -489,13 +490,12 @@ impl Catalog for FileCatalog {
489490
) -> Result<Table, IcebergError> {
490491
unimplemented!()
491492
}
492-
493-
fn object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
494-
Arc::new(self.object_store.build(bucket).unwrap())
495-
}
496493
}
497494

498495
impl FileCatalog {
496+
fn default_object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
497+
Arc::new(self.object_store.build(bucket).unwrap())
498+
}
499499
fn namespace_path(&self, namespace: &str) -> String {
500500
self.path.as_str().trim_end_matches('/').to_owned() + "/" + namespace
501501
}
@@ -634,7 +634,8 @@ impl CatalogList for FileCatalogList {
634634
.common_prefixes
635635
.into_iter()
636636
.map(|x| self.parse_catalog(x.as_ref()))
637-
.collect::<Result<_, IcebergError>>().unwrap()
637+
.collect::<Result<_, IcebergError>>()
638+
.unwrap()
638639
}
639640
}
640641

@@ -841,8 +842,8 @@ pub mod tests {
841842

842843
assert!(once);
843844

844-
let object_store =
845-
iceberg_catalog.object_store(iceberg_rust::object_store::Bucket::S3("warehouse"));
845+
let object_store = iceberg_catalog
846+
.default_object_store(iceberg_rust::object_store::Bucket::S3("warehouse"));
846847

847848
let version_hint = object_store
848849
.get(&strip_prefix("s3://warehouse/tpch/lineitem/metadata/version-hint.text").into())

catalogs/iceberg-glue-catalog/src/lib.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ impl GlueCatalog {
6767
cache: Arc::new(RwLock::new(HashMap::new())),
6868
})
6969
}
70+
fn default_object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
71+
Arc::new(self.object_store.build(bucket).unwrap())
72+
}
7073
}
7174

7275
#[async_trait]
@@ -257,7 +260,7 @@ impl Catalog for GlueCatalog {
257260
.to_string();
258261

259262
let bucket = Bucket::from_path(metadata_location)?;
260-
let object_store = self.object_store(bucket);
263+
let object_store = self.default_object_store(bucket);
261264

262265
let bytes = object_store
263266
.get(&strip_prefix(metadata_location).as_str().into())
@@ -295,7 +298,7 @@ impl Catalog for GlueCatalog {
295298

296299
// Write metadata to object_store
297300
let bucket = Bucket::from_path(&location)?;
298-
let object_store = self.object_store(bucket);
301+
let object_store = self.default_object_store(bucket);
299302

300303
let metadata_location = new_metadata_location(&metadata);
301304
object_store
@@ -365,7 +368,7 @@ impl Catalog for GlueCatalog {
365368

366369
// Write metadata to object_store
367370
let bucket = Bucket::from_path(&location)?;
368-
let object_store = self.object_store(bucket);
371+
let object_store = self.default_object_store(bucket);
369372

370373
let metadata_location = new_metadata_location(&metadata);
371374
object_store
@@ -437,7 +440,7 @@ impl Catalog for GlueCatalog {
437440

438441
// Write metadata to object_store
439442
let bucket = Bucket::from_path(&location)?;
440-
let object_store = self.object_store(bucket);
443+
let object_store = self.default_object_store(bucket);
441444

442445
let metadata_location = new_metadata_location(&metadata);
443446

@@ -557,7 +560,7 @@ impl Catalog for GlueCatalog {
557560
let metadata_location = new_metadata_location(&metadata);
558561

559562
let bucket = Bucket::from_path(&metadata_location)?;
560-
let object_store = self.object_store(bucket);
563+
let object_store = self.default_object_store(bucket);
561564

562565
object_store
563566
.put_metadata(&metadata_location, metadata.as_ref())
@@ -650,7 +653,7 @@ impl Catalog for GlueCatalog {
650653

651654
let metadata_ref = metadata.as_ref();
652655
let bucket = Bucket::from_path(metadata_ref.location())?;
653-
let object_store = self.object_store(bucket);
656+
let object_store = self.default_object_store(bucket);
654657

655658
let metadata_location = match &mut metadata {
656659
TabularMetadata::View(metadata) => {
@@ -759,7 +762,7 @@ impl Catalog for GlueCatalog {
759762

760763
let metadata_ref = metadata.as_ref();
761764
let bucket = Bucket::from_path(metadata_ref.location())?;
762-
let object_store = self.object_store(bucket);
765+
let object_store = self.default_object_store(bucket);
763766

764767
let metadata_location = match &mut metadata {
765768
TabularMetadata::MaterializedView(metadata) => {
@@ -860,7 +863,7 @@ impl Catalog for GlueCatalog {
860863
metadata_location: &str,
861864
) -> Result<Table, IcebergError> {
862865
let bucket = Bucket::from_path(metadata_location)?;
863-
let object_store = self.object_store(bucket);
866+
let object_store = self.default_object_store(bucket);
864867

865868
let metadata: TableMetadata = serde_json::from_slice(
866869
&object_store
@@ -919,10 +922,6 @@ impl Catalog for GlueCatalog {
919922
);
920923
Ok(Table::new(identifier.clone(), self.clone(), metadata).await?)
921924
}
922-
923-
fn object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
924-
Arc::new(self.object_store.build(bucket).unwrap())
925-
}
926925
}
927926

928927
#[cfg(test)]
@@ -1162,8 +1161,8 @@ pub mod tests {
11621161

11631162
assert!(once);
11641163

1165-
let object_store =
1166-
iceberg_catalog.object_store(iceberg_rust::object_store::Bucket::S3("warehouse"));
1164+
let object_store = iceberg_catalog
1165+
.default_object_store(iceberg_rust::object_store::Bucket::S3("warehouse"));
11671166

11681167
let version_hint = object_store
11691168
.get(&strip_prefix("s3://warehouse/tpch/lineitem/metadata/version-hint.text").into())

catalogs/iceberg-rest-catalog/src/catalog.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -483,10 +483,6 @@ impl Catalog for RestCatalog {
483483
})
484484
.await
485485
}
486-
/// Return an object store for the desired bucket
487-
fn object_store(&self, bucket: Bucket) -> Arc<dyn ObjectStore> {
488-
self.object_store_builder.build(bucket).unwrap()
489-
}
490486
}
491487

492488
#[derive(Debug, Clone)]

catalogs/iceberg-s3tables-catalog/src/lib.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@ use std::{
66
use async_trait::async_trait;
77
use aws_config::SdkConfig;
88

9-
use aws_sdk_s3tables::{
10-
types::OpenTableFormat,
11-
Client,
12-
};
9+
use aws_sdk_s3tables::{types::OpenTableFormat, Client};
1310
use iceberg_rust::{
1411
catalog::{
1512
commit::{
@@ -65,6 +62,9 @@ impl S3TablesCatalog {
6562
cache: Arc::new(RwLock::new(HashMap::new())),
6663
})
6764
}
65+
fn default_object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
66+
Arc::new(self.object_store.build(bucket).unwrap())
67+
}
6868
}
6969

7070
#[async_trait]
@@ -246,7 +246,7 @@ impl Catalog for S3TablesCatalog {
246246
let version_token = table.version_token;
247247

248248
let bucket = Bucket::from_path(&metadata_location)?;
249-
let object_store = self.object_store(bucket);
249+
let object_store = self.default_object_store(bucket);
250250

251251
let bytes = object_store
252252
.get(&strip_prefix(&metadata_location).as_str().into())
@@ -307,7 +307,7 @@ impl Catalog for S3TablesCatalog {
307307

308308
// Write metadata to object_store
309309
let bucket = Bucket::from_path(&location)?;
310-
let object_store = self.object_store(bucket);
310+
let object_store = self.default_object_store(bucket);
311311

312312
let metadata_location = new_metadata_location(&metadata);
313313
object_store
@@ -368,7 +368,7 @@ impl Catalog for S3TablesCatalog {
368368

369369
// Write metadata to object_store
370370
let bucket = Bucket::from_path(&location)?;
371-
let object_store = self.object_store(bucket);
371+
let object_store = self.default_object_store(bucket);
372372

373373
let metadata_location = new_metadata_location(&metadata);
374374
object_store
@@ -459,7 +459,7 @@ impl Catalog for S3TablesCatalog {
459459

460460
// Write metadata to object_store
461461
let bucket = Bucket::from_path(&location)?;
462-
let object_store = self.object_store(bucket);
462+
let object_store = self.default_object_store(bucket);
463463

464464
let metadata_location = new_metadata_location(&metadata);
465465

@@ -545,7 +545,7 @@ impl Catalog for S3TablesCatalog {
545545
let metadata_location = new_metadata_location(&metadata);
546546

547547
let bucket = Bucket::from_path(&metadata_location)?;
548-
let object_store = self.object_store(bucket);
548+
let object_store = self.default_object_store(bucket);
549549

550550
object_store
551551
.put_metadata(&metadata_location, metadata.as_ref())
@@ -603,7 +603,7 @@ impl Catalog for S3TablesCatalog {
603603

604604
let metadata_ref = metadata.as_ref();
605605
let bucket = Bucket::from_path(metadata_ref.location())?;
606-
let object_store = self.object_store(bucket);
606+
let object_store = self.default_object_store(bucket);
607607

608608
let metadata_location = match &mut metadata {
609609
TabularMetadata::View(metadata) => {
@@ -681,7 +681,7 @@ impl Catalog for S3TablesCatalog {
681681

682682
let metadata_ref = metadata.as_ref();
683683
let bucket = Bucket::from_path(metadata_ref.location())?;
684-
let object_store = self.object_store(bucket);
684+
let object_store = self.default_object_store(bucket);
685685

686686
let metadata_location = match &mut metadata {
687687
TabularMetadata::MaterializedView(metadata) => {
@@ -751,7 +751,7 @@ impl Catalog for S3TablesCatalog {
751751
metadata_location: &str,
752752
) -> Result<Table, IcebergError> {
753753
let bucket = Bucket::from_path(metadata_location)?;
754-
let object_store = self.object_store(bucket);
754+
let object_store = self.default_object_store(bucket);
755755

756756
let metadata: TableMetadata = serde_json::from_slice(
757757
&object_store
@@ -789,10 +789,6 @@ impl Catalog for S3TablesCatalog {
789789
);
790790
Ok(Table::new(identifier.clone(), self.clone(), metadata).await?)
791791
}
792-
793-
fn object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
794-
Arc::new(self.object_store.build(bucket).unwrap())
795-
}
796792
}
797793

798794
#[derive(Debug)]

catalogs/iceberg-sql-catalog/src/lib.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,9 @@ impl SqlCatalog {
107107
object_store: self.object_store.clone(),
108108
})
109109
}
110+
fn default_object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
111+
Arc::new(self.object_store.build(bucket).unwrap())
112+
}
110113
}
111114

112115
#[derive(Debug)]
@@ -281,7 +284,7 @@ impl Catalog for SqlCatalog {
281284
};
282285

283286
let bucket = Bucket::from_path(&path)?;
284-
let object_store = self.object_store(bucket);
287+
let object_store = self.default_object_store(bucket);
285288

286289
let bytes = object_store
287290
.get(&strip_prefix(&path).as_str().into())
@@ -317,7 +320,7 @@ impl Catalog for SqlCatalog {
317320

318321
// Write metadata to object_store
319322
let bucket = Bucket::from_path(&location)?;
320-
let object_store = self.object_store(bucket);
323+
let object_store = self.default_object_store(bucket);
321324

322325
let metadata_location = new_metadata_location(&metadata);
323326
object_store
@@ -351,7 +354,7 @@ impl Catalog for SqlCatalog {
351354

352355
// Write metadata to object_store
353356
let bucket = Bucket::from_path(&location)?;
354-
let object_store = self.object_store(bucket);
357+
let object_store = self.default_object_store(bucket);
355358

356359
let metadata_location = new_metadata_location(&metadata);
357360
object_store
@@ -387,7 +390,7 @@ impl Catalog for SqlCatalog {
387390

388391
// Write metadata to object_store
389392
let bucket = Bucket::from_path(&location)?;
390-
let object_store = self.object_store(bucket);
393+
let object_store = self.default_object_store(bucket);
391394

392395
let metadata_location = new_metadata_location(&metadata);
393396

@@ -443,7 +446,7 @@ impl Catalog for SqlCatalog {
443446
let (previous_metadata_location, metadata) = entry;
444447

445448
let bucket = Bucket::from_path(&previous_metadata_location)?;
446-
let object_store = self.object_store(bucket);
449+
let object_store = self.default_object_store(bucket);
447450

448451
let TabularMetadata::Table(mut metadata) = metadata else {
449452
return Err(IcebergError::InvalidFormat(
@@ -491,7 +494,7 @@ impl Catalog for SqlCatalog {
491494
let (previous_metadata_location, mut metadata) = entry;
492495

493496
let bucket = Bucket::from_path(&previous_metadata_location)?;
494-
let object_store = self.object_store(bucket);
497+
let object_store = self.default_object_store(bucket);
495498

496499
let metadata_location = match &mut metadata {
497500
TabularMetadata::View(metadata) => {
@@ -546,7 +549,7 @@ impl Catalog for SqlCatalog {
546549
let (previous_metadata_location, mut metadata) = entry;
547550

548551
let bucket = Bucket::from_path(&previous_metadata_location)?;
549-
let object_store = self.object_store(bucket);
552+
let object_store = self.default_object_store(bucket);
550553

551554
let metadata_location = match &mut metadata {
552555
TabularMetadata::MaterializedView(metadata) => {
@@ -596,7 +599,7 @@ impl Catalog for SqlCatalog {
596599
metadata_location: &str,
597600
) -> Result<Table, IcebergError> {
598601
let bucket = Bucket::from_path(metadata_location)?;
599-
let object_store = self.object_store(bucket);
602+
let object_store = self.default_object_store(bucket);
600603

601604
let metadata: TableMetadata = serde_json::from_slice(
602605
&object_store
@@ -620,10 +623,6 @@ impl Catalog for SqlCatalog {
620623
);
621624
Ok(Table::new(identifier.clone(), self.clone(), metadata).await?)
622625
}
623-
624-
fn object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
625-
Arc::new(self.object_store.build(bucket).unwrap())
626-
}
627626
}
628627

629628
impl SqlCatalog {
@@ -937,8 +936,8 @@ pub mod tests {
937936

938937
assert!(once);
939938

940-
let object_store =
941-
iceberg_catalog.object_store(iceberg_rust::object_store::Bucket::S3("warehouse"));
939+
let object_store = iceberg_catalog
940+
.default_object_store(iceberg_rust::object_store::Bucket::S3("warehouse"));
942941

943942
let version_hint = object_store
944943
.get(&strip_prefix("s3://warehouse/tpch/lineitem/metadata/version-hint.text").into())

0 commit comments

Comments
 (0)