Skip to content

Commit b288630

Browse files
authored
Merge pull request JanKaul#152 from JanKaul/feat/storage-credentials
load storage credentials from catalog response
2 parents ea1d685 + eaac940 commit b288630

2 files changed

Lines changed: 127 additions & 51 deletions

File tree

catalogs/iceberg-rest-catalog/src/catalog.rs

Lines changed: 126 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -25,35 +25,41 @@ use iceberg_rust::{
2525
table::Table,
2626
view::View,
2727
};
28-
use object_store::ObjectStore;
29-
use std::{collections::HashMap, path::Path, sync::Arc};
28+
use object_store::{aws::AmazonS3Builder, ObjectStore};
29+
use std::{
30+
collections::HashMap,
31+
path::Path,
32+
sync::{Arc, RwLock},
33+
};
3034

3135
use crate::{
3236
apis::{
3337
self,
3438
catalog_api_api::{self, NamespaceExistsError},
3539
configuration::Configuration,
3640
},
37-
models,
41+
models::{self, StorageCredential},
3842
};
3943

4044
#[derive(Debug)]
4145
pub struct RestCatalog {
4246
name: Option<String>,
4347
configuration: Configuration,
44-
object_store_builder: ObjectStoreBuilder,
48+
default_object_store_builder: Option<ObjectStoreBuilder>,
49+
cache: Arc<RwLock<HashMap<Identifier, Arc<dyn ObjectStore>>>>,
4550
}
4651

4752
impl RestCatalog {
4853
pub fn new(
4954
name: Option<&str>,
5055
configuration: Configuration,
51-
object_store_builder: ObjectStoreBuilder,
56+
default_object_store_builder: Option<ObjectStoreBuilder>,
5257
) -> Self {
5358
RestCatalog {
5459
name: name.map(ToString::to_string),
5560
configuration,
56-
object_store_builder,
61+
default_object_store_builder,
62+
cache: Arc::new(RwLock::new(HashMap::new())),
5763
}
5864
}
5965
}
@@ -276,7 +282,7 @@ impl Catalog for RestCatalog {
276282
)),
277283
Err(apis::Error::ResponseError(content)) => {
278284
if content.status == 404 {
279-
let table_metadata = catalog_api_api::load_table(
285+
let response = catalog_api_api::load_table(
280286
&self.configuration,
281287
self.name.as_deref(),
282288
&identifier.namespace().to_string(),
@@ -285,12 +291,27 @@ impl Catalog for RestCatalog {
285291
None,
286292
)
287293
.await
288-
.map(|x| x.metadata)
289294
.map_err(|_| Error::CatalogNotFound)?;
290295

291-
let object_store = self
292-
.object_store_builder
293-
.build(Bucket::from_path(&table_metadata.location)?)?;
296+
let object_store = object_store_from_response(&response)?
297+
.ok_or(Error::NotFound("Object store credentials".to_string()))
298+
.or_else(|_| {
299+
self.default_object_store_builder
300+
.as_ref()
301+
.ok_or(Error::NotFound("Default object store".to_string()))
302+
.and_then(|x| {
303+
let bucket = Bucket::from_path(&response.metadata.location)?;
304+
x.build(bucket)
305+
})
306+
})?;
307+
308+
self.cache
309+
.write()
310+
.unwrap()
311+
.insert(identifier.clone(), object_store.clone());
312+
313+
let table_metadata = response.metadata;
314+
294315
Ok(Tabular::Table(
295316
Table::new(
296317
identifier.clone(),
@@ -315,50 +336,54 @@ impl Catalog for RestCatalog {
315336
identifier: Identifier,
316337
create_table: CreateTable,
317338
) -> Result<Table, Error> {
318-
catalog_api_api::create_table(
339+
let response = catalog_api_api::create_table(
319340
&self.configuration,
320341
self.name.as_deref(),
321342
&identifier.namespace().to_string(),
322343
create_table,
323344
None,
324345
)
325346
.map_err(Into::<Error>::into)
326-
.and_then(|response| {
327-
let clone = self.clone();
328-
async move {
329-
let object_store = clone
330-
.object_store_builder
331-
.build(Bucket::from_path(&response.metadata.location)?)?;
332-
Table::new(identifier.clone(), clone, object_store, response.metadata).await
333-
}
334-
})
335-
.await
347+
.await?;
348+
349+
let object_store = object_store_from_response(&response)?
350+
.ok_or(Error::NotFound("Object store credentials".to_string()))
351+
.or_else(|_| {
352+
self.default_object_store_builder
353+
.as_ref()
354+
.ok_or(Error::NotFound("Default object store".to_string()))
355+
.and_then(|x| {
356+
let bucket = Bucket::from_path(&response.metadata.location)?;
357+
x.build(bucket)
358+
})
359+
})?;
360+
361+
Table::new(identifier.clone(), self, object_store, response.metadata).await
336362
}
337363
/// Update a table by atomically changing the pointer to the metadata file
338364
async fn update_table(
339365
self: Arc<Self>,
340366
commit: iceberg_rust::catalog::commit::CommitTable,
341367
) -> Result<Table, Error> {
342368
let identifier = commit.identifier.clone();
343-
catalog_api_api::update_table(
369+
let response = catalog_api_api::update_table(
344370
&self.configuration,
345371
self.name.as_deref(),
346372
&identifier.namespace().to_string(),
347373
identifier.name(),
348374
commit,
349375
)
350-
.map_err(Into::<Error>::into)
351-
.and_then(|response| {
352-
let clone = self.clone();
353-
let identifier = identifier.clone();
354-
async move {
355-
let object_store = clone
356-
.object_store_builder
357-
.build(Bucket::from_path(&response.metadata.location)?)?;
358-
Table::new(identifier, clone, object_store, response.metadata).await
359-
}
360-
})
361376
.await
377+
.map_err(Into::<Error>::into)?;
378+
379+
let Some(object_store) = self.cache.read().unwrap().get(&identifier).cloned() else {
380+
return Err(Error::NotFound(format!(
381+
"Object store for table {}",
382+
&identifier
383+
)));
384+
};
385+
386+
Table::new(identifier, self, object_store, response.metadata).await
362387
}
363388
async fn create_view(
364389
self: Arc<Self>,
@@ -489,34 +514,41 @@ impl Catalog for RestCatalog {
489514
metadata_location.to_owned(),
490515
);
491516

492-
catalog_api_api::register_table(
517+
let response = catalog_api_api::register_table(
493518
&self.configuration,
494519
self.name.as_deref(),
495520
&identifier.namespace().to_string(),
496521
request,
497522
)
498523
.map_err(Into::<Error>::into)
499-
.and_then(|response| {
500-
let clone = self.clone();
501-
async move {
502-
let object_store = clone
503-
.object_store_builder
504-
.build(Bucket::from_path(&response.metadata.location)?)?;
505-
Table::new(identifier.clone(), clone, object_store, response.metadata).await
506-
}
507-
})
508-
.await
524+
.await?;
525+
let object_store = object_store_from_response(&response)?
526+
.ok_or(Error::NotFound("Object store credentials".to_string()))
527+
.or_else(|_| {
528+
self.default_object_store_builder
529+
.as_ref()
530+
.ok_or(Error::NotFound("Default object store".to_string()))
531+
.and_then(|x| {
532+
let bucket = Bucket::from_path(&response.metadata.location)?;
533+
x.build(bucket)
534+
})
535+
})?;
536+
537+
Table::new(identifier.clone(), self, object_store, response.metadata).await
509538
}
510539
}
511540

512541
#[derive(Debug, Clone)]
513542
pub struct RestCatalogList {
514543
configuration: Configuration,
515-
object_store_builder: ObjectStoreBuilder,
544+
object_store_builder: Option<ObjectStoreBuilder>,
516545
}
517546

518547
impl RestCatalogList {
519-
pub fn new(configuration: Configuration, object_store_builder: ObjectStoreBuilder) -> Self {
548+
pub fn new(
549+
configuration: Configuration,
550+
object_store_builder: Option<ObjectStoreBuilder>,
551+
) -> Self {
520552
Self {
521553
configuration,
522554
object_store_builder,
@@ -542,14 +574,14 @@ impl CatalogList for RestCatalogList {
542574
pub struct RestNoPrefixCatalogList {
543575
name: String,
544576
configuration: Configuration,
545-
object_store_builder: ObjectStoreBuilder,
577+
object_store_builder: Option<ObjectStoreBuilder>,
546578
}
547579

548580
impl RestNoPrefixCatalogList {
549581
pub fn new(
550582
name: &str,
551583
configuration: Configuration,
552-
object_store_builder: ObjectStoreBuilder,
584+
object_store_builder: Option<ObjectStoreBuilder>,
553585
) -> Self {
554586
Self {
555587
name: name.to_owned(),
@@ -577,6 +609,50 @@ impl CatalogList for RestNoPrefixCatalogList {
577609
}
578610
}
579611

612+
const CLIENT_REGION: &str = "client.region";
613+
const AWS_ACCESS_KEY_ID: &str = "s3.access-key-id";
614+
const AWS_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
615+
const AWS_SESSION_TOKEN: &str = "s3.session-token";
616+
617+
fn object_store_from_response(
618+
response: &models::LoadTableResult,
619+
) -> Result<Option<Arc<dyn ObjectStore>>, Error> {
620+
let config = match (&response.storage_credentials, &response.config) {
621+
(Some(credentials), _) => Some(&credentials[0].config),
622+
(None, Some(config)) => Some(config),
623+
(None, None) => None,
624+
};
625+
626+
let Some(config) = config else {
627+
return Ok(None);
628+
};
629+
630+
let region = config.get(CLIENT_REGION);
631+
if config.contains_key(AWS_ACCESS_KEY_ID) {
632+
let access_key_id = config.get(AWS_ACCESS_KEY_ID);
633+
let secret_access_key = config.get(AWS_SECRET_ACCESS_KEY);
634+
let session_token = config.get(AWS_SESSION_TOKEN);
635+
let mut builder = AmazonS3Builder::new();
636+
637+
if let Some(region) = region {
638+
builder = builder.with_region(region)
639+
}
640+
if let Some(access_key_id) = access_key_id {
641+
builder = builder.with_access_key_id(access_key_id)
642+
}
643+
if let Some(secret_access_key) = secret_access_key {
644+
builder = builder.with_secret_access_key(secret_access_key)
645+
}
646+
if let Some(session_token) = session_token {
647+
builder = builder.with_token(session_token)
648+
}
649+
650+
Ok(Some(Arc::new(builder.build()?)))
651+
} else {
652+
Ok(None)
653+
}
654+
}
655+
580656
#[cfg(test)]
581657
pub mod tests {
582658
use datafusion::{
@@ -685,7 +761,7 @@ pub mod tests {
685761
let iceberg_catalog = Arc::new(RestCatalog::new(
686762
None,
687763
configuration(&format!("http://{rest_host}:{rest_port}")),
688-
object_store,
764+
Some(object_store),
689765
));
690766

691767
iceberg_catalog

datafusion_iceberg/tests/integration_trino.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ async fn integration_trino_rest() {
199199
let catalog = Arc::new(RestCatalog::new(
200200
None,
201201
configuration(&rest_host.to_string(), rest_port),
202-
object_store,
202+
Some(object_store),
203203
));
204204

205205
let tables = catalog

0 commit comments

Comments
 (0)