Skip to content

Commit bb192f7

Browse files
committed
move object_store to table struct
1 parent 84476cc commit bb192f7

11 files changed

Lines changed: 362 additions & 100 deletions

File tree

catalogs/iceberg-file-catalog/src/lib.rs

Lines changed: 68 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -171,13 +171,31 @@ impl Catalog for FileCatalog {
171171

172172
match metadata {
173173
TabularMetadata::Table(metadata) => Ok(Tabular::Table(
174-
Table::new(identifier.clone(), self.clone(), metadata).await?,
174+
Table::new(
175+
identifier.clone(),
176+
self.clone(),
177+
object_store.clone(),
178+
metadata,
179+
)
180+
.await?,
175181
)),
176182
TabularMetadata::View(metadata) => Ok(Tabular::View(
177-
View::new(identifier.clone(), self.clone(), metadata).await?,
183+
View::new(
184+
identifier.clone(),
185+
self.clone(),
186+
object_store.clone(),
187+
metadata,
188+
)
189+
.await?,
178190
)),
179191
TabularMetadata::MaterializedView(metadata) => Ok(Tabular::MaterializedView(
180-
MaterializedView::new(identifier.clone(), self.clone(), metadata).await?,
192+
MaterializedView::new(
193+
identifier.clone(),
194+
self.clone(),
195+
object_store.clone(),
196+
metadata,
197+
)
198+
.await?,
181199
)),
182200
}
183201
}
@@ -214,7 +232,13 @@ impl Catalog for FileCatalog {
214232
identifier.clone(),
215233
(metadata_location.clone(), metadata.clone().into()),
216234
);
217-
Ok(Table::new(identifier.clone(), self.clone(), metadata).await?)
235+
Ok(Table::new(
236+
identifier.clone(),
237+
self.clone(),
238+
object_store.clone(),
239+
metadata,
240+
)
241+
.await?)
218242
}
219243

220244
async fn create_view(
@@ -250,7 +274,13 @@ impl Catalog for FileCatalog {
250274
identifier.clone(),
251275
(metadata_location.clone(), metadata.clone().into()),
252276
);
253-
Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
277+
Ok(View::new(
278+
identifier.clone(),
279+
self.clone(),
280+
object_store.clone(),
281+
metadata,
282+
)
283+
.await?)
254284
}
255285

256286
async fn create_materialized_view(
@@ -301,7 +331,13 @@ impl Catalog for FileCatalog {
301331
(metadata_location.clone(), metadata.clone().into()),
302332
);
303333

304-
Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
334+
Ok(MaterializedView::new(
335+
identifier.clone(),
336+
self.clone(),
337+
object_store.clone(),
338+
metadata,
339+
)
340+
.await?)
305341
}
306342

307343
async fn update_table(self: Arc<Self>, commit: CommitTable) -> Result<Table, IcebergError> {
@@ -357,7 +393,13 @@ impl Catalog for FileCatalog {
357393
(metadata_location.clone(), metadata.clone().into()),
358394
);
359395

360-
Ok(Table::new(identifier.clone(), self.clone(), metadata).await?)
396+
Ok(Table::new(
397+
identifier.clone(),
398+
self.clone(),
399+
object_store.clone(),
400+
metadata,
401+
)
402+
.await?)
361403
}
362404

363405
async fn update_view(
@@ -414,7 +456,13 @@ impl Catalog for FileCatalog {
414456
(metadata_location.clone(), metadata.clone()),
415457
);
416458
if let TabularMetadata::View(metadata) = metadata {
417-
Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
459+
Ok(View::new(
460+
identifier.clone(),
461+
self.clone(),
462+
object_store.clone(),
463+
metadata,
464+
)
465+
.await?)
418466
} else {
419467
Err(IcebergError::InvalidFormat(
420468
"Entity is not a view".to_owned(),
@@ -475,7 +523,13 @@ impl Catalog for FileCatalog {
475523
(metadata_location.clone(), metadata.clone()),
476524
);
477525
if let TabularMetadata::MaterializedView(metadata) = metadata {
478-
Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
526+
Ok(MaterializedView::new(
527+
identifier.clone(),
528+
self.clone(),
529+
object_store.clone(),
530+
metadata,
531+
)
532+
.await?)
479533
} else {
480534
Err(IcebergError::InvalidFormat(
481535
"Entity is not a materialized view".to_owned(),
@@ -653,7 +707,7 @@ pub mod tests {
653707
};
654708
use iceberg_rust::{
655709
catalog::{namespace::Namespace, Catalog},
656-
object_store::ObjectStoreBuilder,
710+
object_store::{Bucket, ObjectStoreBuilder},
657711
spec::util::strip_prefix,
658712
};
659713
use std::{sync::Arc, time::Duration};
@@ -705,7 +759,7 @@ pub mod tests {
705759
// let object_store = ObjectStoreBuilder::memory();
706760

707761
let iceberg_catalog: Arc<dyn Catalog> = Arc::new(
708-
FileCatalog::new("s3://warehouse", object_store)
762+
FileCatalog::new("s3://warehouse", object_store.clone())
709763
.await
710764
.unwrap(),
711765
);
@@ -842,8 +896,9 @@ pub mod tests {
842896

843897
assert!(once);
844898

845-
let object_store = iceberg_catalog
846-
.default_object_store(iceberg_rust::object_store::Bucket::S3("warehouse"));
899+
let object_store = object_store
900+
.build(Bucket::from_path("s3://warehouse").unwrap())
901+
.unwrap();
847902

848903
let version_hint = object_store
849904
.get(&strip_prefix("s3://warehouse/tpch/lineitem/metadata/version-hint.text").into())

catalogs/iceberg-glue-catalog/src/lib.rs

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -276,13 +276,31 @@ impl Catalog for GlueCatalog {
276276

277277
match metadata {
278278
TabularMetadata::Table(metadata) => Ok(Tabular::Table(
279-
Table::new(identifier.clone(), self.clone(), metadata).await?,
279+
Table::new(
280+
identifier.clone(),
281+
self.clone(),
282+
object_store.clone(),
283+
metadata,
284+
)
285+
.await?,
280286
)),
281287
TabularMetadata::View(metadata) => Ok(Tabular::View(
282-
View::new(identifier.clone(), self.clone(), metadata).await?,
288+
View::new(
289+
identifier.clone(),
290+
self.clone(),
291+
object_store.clone(),
292+
metadata,
293+
)
294+
.await?,
283295
)),
284296
TabularMetadata::MaterializedView(metadata) => Ok(Tabular::MaterializedView(
285-
MaterializedView::new(identifier.clone(), self.clone(), metadata).await?,
297+
MaterializedView::new(
298+
identifier.clone(),
299+
self.clone(),
300+
object_store.clone(),
301+
metadata,
302+
)
303+
.await?,
286304
)),
287305
}
288306
}
@@ -354,7 +372,13 @@ impl Catalog for GlueCatalog {
354372
metadata.clone().into(),
355373
),
356374
);
357-
Ok(Table::new(identifier.clone(), self.clone(), metadata).await?)
375+
Ok(Table::new(
376+
identifier.clone(),
377+
self.clone(),
378+
object_store.clone(),
379+
metadata,
380+
)
381+
.await?)
358382
}
359383

360384
async fn create_view(
@@ -424,7 +448,13 @@ impl Catalog for GlueCatalog {
424448
metadata.clone().into(),
425449
),
426450
);
427-
Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
451+
Ok(View::new(
452+
identifier.clone(),
453+
self.clone(),
454+
object_store.clone(),
455+
metadata,
456+
)
457+
.await?)
428458
}
429459

430460
async fn create_materialized_view(
@@ -527,7 +557,13 @@ impl Catalog for GlueCatalog {
527557
metadata.clone().into(),
528558
),
529559
);
530-
Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
560+
Ok(MaterializedView::new(
561+
identifier.clone(),
562+
self.clone(),
563+
object_store.clone(),
564+
metadata,
565+
)
566+
.await?)
531567
}
532568

533569
async fn update_table(self: Arc<Self>, commit: CommitTable) -> Result<Table, IcebergError> {
@@ -636,7 +672,13 @@ impl Catalog for GlueCatalog {
636672
.unwrap()
637673
.insert(identifier.clone(), (version_id, metadata.clone().into()));
638674

639-
Ok(Table::new(identifier.clone(), self.clone(), metadata).await?)
675+
Ok(Table::new(
676+
identifier.clone(),
677+
self.clone(),
678+
object_store.clone(),
679+
metadata,
680+
)
681+
.await?)
640682
}
641683

642684
async fn update_view(
@@ -741,7 +783,13 @@ impl Catalog for GlueCatalog {
741783
.insert(identifier.clone(), (version_id, metadata.clone()));
742784

743785
if let TabularMetadata::View(metadata) = metadata {
744-
Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
786+
Ok(View::new(
787+
identifier.clone(),
788+
self.clone(),
789+
object_store.clone(),
790+
metadata,
791+
)
792+
.await?)
745793
} else {
746794
Err(IcebergError::InvalidFormat(
747795
"Entity is not a view".to_owned(),
@@ -849,7 +897,13 @@ impl Catalog for GlueCatalog {
849897
.unwrap()
850898
.insert(identifier.clone(), (version_id, metadata.clone()));
851899
if let TabularMetadata::MaterializedView(metadata) = metadata {
852-
Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
900+
Ok(MaterializedView::new(
901+
identifier.clone(),
902+
self.clone(),
903+
object_store.clone(),
904+
metadata,
905+
)
906+
.await?)
853907
} else {
854908
Err(IcebergError::InvalidFormat(
855909
"Entity is not a materialized view".to_owned(),
@@ -920,7 +974,13 @@ impl Catalog for GlueCatalog {
920974
metadata.clone().into(),
921975
),
922976
);
923-
Ok(Table::new(identifier.clone(), self.clone(), metadata).await?)
977+
Ok(Table::new(
978+
identifier.clone(),
979+
self.clone(),
980+
object_store.clone(),
981+
metadata,
982+
)
983+
.await?)
924984
}
925985
}
926986

@@ -939,7 +999,7 @@ pub mod tests {
939999
};
9401000
use iceberg_rust::{
9411001
catalog::{namespace::Namespace, Catalog},
942-
object_store::ObjectStoreBuilder,
1002+
object_store::{Bucket, ObjectStoreBuilder},
9431003
spec::util::strip_prefix,
9441004
};
9451005
use testcontainers_modules::localstack::LocalStack;
@@ -1015,7 +1075,7 @@ pub mod tests {
10151075

10161076
// let object_store = ObjectStoreBuilder::memory();
10171077
let iceberg_catalog: Arc<dyn Catalog> =
1018-
Arc::new(GlueCatalog::new(&config, "warehouse", object_store).unwrap());
1078+
Arc::new(GlueCatalog::new(&config, "warehouse", object_store.clone()).unwrap());
10191079

10201080
iceberg_catalog
10211081
.create_namespace(&Namespace::try_new(&["tpch".to_owned()]).unwrap(), None)
@@ -1161,8 +1221,9 @@ pub mod tests {
11611221

11621222
assert!(once);
11631223

1164-
let object_store = iceberg_catalog
1165-
.default_object_store(iceberg_rust::object_store::Bucket::S3("warehouse"));
1224+
let object_store = object_store
1225+
.build(Bucket::from_path("s3://warehouse").unwrap())
1226+
.unwrap();
11661227

11671228
let version_hint = object_store
11681229
.get(&strip_prefix("s3://warehouse/tpch/lineitem/metadata/version-hint.text").into())

0 commit comments

Comments
 (0)