Skip to content

Commit b6764a5

Browse files
committed
load storage credentials from catalog response
1 parent ea1d685 commit b6764a5

1 file changed

Lines changed: 101 additions & 30 deletions

File tree

catalogs/iceberg-rest-catalog/src/catalog.rs

Lines changed: 101 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -25,35 +25,41 @@ use iceberg_rust::{
2525
table::Table,
2626
view::View,
2727
};
28-
use object_store::ObjectStore;
29-
use std::{collections::HashMap, path::Path, sync::Arc};
28+
use object_store::{aws::AmazonS3Builder, ObjectStore};
29+
use std::{
30+
collections::HashMap,
31+
path::Path,
32+
sync::{Arc, RwLock},
33+
};
3034

3135
use crate::{
3236
apis::{
3337
self,
3438
catalog_api_api::{self, NamespaceExistsError},
3539
configuration::Configuration,
3640
},
37-
models,
41+
models::{self, StorageCredential},
3842
};
3943

4044
#[derive(Debug)]
4145
pub struct RestCatalog {
4246
name: Option<String>,
4347
configuration: Configuration,
44-
object_store_builder: ObjectStoreBuilder,
48+
default_object_store_builder: Option<ObjectStoreBuilder>,
49+
cache: Arc<RwLock<HashMap<Identifier, Arc<dyn ObjectStore>>>>,
4550
}
4651

4752
impl RestCatalog {
4853
pub fn new(
4954
name: Option<&str>,
5055
configuration: Configuration,
51-
object_store_builder: ObjectStoreBuilder,
56+
default_object_store_builder: Option<ObjectStoreBuilder>,
5257
) -> Self {
5358
RestCatalog {
5459
name: name.map(ToString::to_string),
5560
configuration,
56-
object_store_builder,
61+
default_object_store_builder,
62+
cache: Arc::new(RwLock::new(HashMap::new())),
5763
}
5864
}
5965
}
@@ -276,7 +282,7 @@ impl Catalog for RestCatalog {
276282
)),
277283
Err(apis::Error::ResponseError(content)) => {
278284
if content.status == 404 {
279-
let table_metadata = catalog_api_api::load_table(
285+
let response = catalog_api_api::load_table(
280286
&self.configuration,
281287
self.name.as_deref(),
282288
&identifier.namespace().to_string(),
@@ -285,12 +291,27 @@ impl Catalog for RestCatalog {
285291
None,
286292
)
287293
.await
288-
.map(|x| x.metadata)
289294
.map_err(|_| Error::CatalogNotFound)?;
290295

291-
let object_store = self
292-
.object_store_builder
293-
.build(Bucket::from_path(&table_metadata.location)?)?;
296+
let object_store = object_store_from_response(&response)
297+
.or_else(|_| {
298+
self.default_object_store_builder
299+
.as_ref()
300+
.map(|x| {
301+
let bucket = Bucket::from_path(&response.metadata.location)?;
302+
x.build(bucket)
303+
})
304+
.transpose()
305+
})?
306+
.ok_or(Error::NotFound("Object store credentials".to_string()))?;
307+
308+
self.cache
309+
.write()
310+
.unwrap()
311+
.insert(identifier.clone(), object_store.clone());
312+
313+
let table_metadata = response.metadata;
314+
294315
Ok(Tabular::Table(
295316
Table::new(
296317
identifier.clone(),
@@ -327,7 +348,9 @@ impl Catalog for RestCatalog {
327348
let clone = self.clone();
328349
async move {
329350
let object_store = clone
330-
.object_store_builder
351+
.default_object_store_builder
352+
.as_ref()
353+
.ok_or(Error::NotFound("Default object store".to_string()))?
331354
.build(Bucket::from_path(&response.metadata.location)?)?;
332355
Table::new(identifier.clone(), clone, object_store, response.metadata).await
333356
}
@@ -340,25 +363,24 @@ impl Catalog for RestCatalog {
340363
commit: iceberg_rust::catalog::commit::CommitTable,
341364
) -> Result<Table, Error> {
342365
let identifier = commit.identifier.clone();
343-
catalog_api_api::update_table(
366+
let response = catalog_api_api::update_table(
344367
&self.configuration,
345368
self.name.as_deref(),
346369
&identifier.namespace().to_string(),
347370
identifier.name(),
348371
commit,
349372
)
350-
.map_err(Into::<Error>::into)
351-
.and_then(|response| {
352-
let clone = self.clone();
353-
let identifier = identifier.clone();
354-
async move {
355-
let object_store = clone
356-
.object_store_builder
357-
.build(Bucket::from_path(&response.metadata.location)?)?;
358-
Table::new(identifier, clone, object_store, response.metadata).await
359-
}
360-
})
361373
.await
374+
.map_err(Into::<Error>::into)?;
375+
376+
let Some(object_store) = self.cache.read().unwrap().get(&identifier).cloned() else {
377+
return Err(Error::NotFound(format!(
378+
"Object store for table {}",
379+
&identifier
380+
)));
381+
};
382+
383+
Table::new(identifier, self, object_store, response.metadata).await
362384
}
363385
async fn create_view(
364386
self: Arc<Self>,
@@ -500,7 +522,9 @@ impl Catalog for RestCatalog {
500522
let clone = self.clone();
501523
async move {
502524
let object_store = clone
503-
.object_store_builder
525+
.default_object_store_builder
526+
.as_ref()
527+
.ok_or(Error::NotFound("Default object store".to_string()))?
504528
.build(Bucket::from_path(&response.metadata.location)?)?;
505529
Table::new(identifier.clone(), clone, object_store, response.metadata).await
506530
}
@@ -512,11 +536,14 @@ impl Catalog for RestCatalog {
512536
#[derive(Debug, Clone)]
513537
pub struct RestCatalogList {
514538
configuration: Configuration,
515-
object_store_builder: ObjectStoreBuilder,
539+
object_store_builder: Option<ObjectStoreBuilder>,
516540
}
517541

518542
impl RestCatalogList {
519-
pub fn new(configuration: Configuration, object_store_builder: ObjectStoreBuilder) -> Self {
543+
pub fn new(
544+
configuration: Configuration,
545+
object_store_builder: Option<ObjectStoreBuilder>,
546+
) -> Self {
520547
Self {
521548
configuration,
522549
object_store_builder,
@@ -542,14 +569,14 @@ impl CatalogList for RestCatalogList {
542569
pub struct RestNoPrefixCatalogList {
543570
name: String,
544571
configuration: Configuration,
545-
object_store_builder: ObjectStoreBuilder,
572+
object_store_builder: Option<ObjectStoreBuilder>,
546573
}
547574

548575
impl RestNoPrefixCatalogList {
549576
pub fn new(
550577
name: &str,
551578
configuration: Configuration,
552-
object_store_builder: ObjectStoreBuilder,
579+
object_store_builder: Option<ObjectStoreBuilder>,
553580
) -> Self {
554581
Self {
555582
name: name.to_owned(),
@@ -577,6 +604,50 @@ impl CatalogList for RestNoPrefixCatalogList {
577604
}
578605
}
579606

607+
const CLIENT_REGION: &str = "client.region";
608+
const AWS_ACCESS_KEY_ID: &str = "s3.access-key-id";
609+
const AWS_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
610+
const AWS_SESSION_TOKEN: &str = "s3.session-token";
611+
612+
fn object_store_from_response(
613+
response: &models::LoadTableResult,
614+
) -> Result<Option<Arc<dyn ObjectStore>>, Error> {
615+
let config = match (&response.storage_credentials, &response.config) {
616+
(Some(credentials), _) => Some(&credentials[0].config),
617+
(None, Some(config)) => Some(config),
618+
(None, None) => None,
619+
};
620+
621+
let Some(config) = config else {
622+
return Ok(None);
623+
};
624+
625+
let region = config.get(CLIENT_REGION);
626+
if config.contains_key(AWS_ACCESS_KEY_ID) {
627+
let access_key_id = config.get(AWS_ACCESS_KEY_ID);
628+
let secret_access_key = config.get(AWS_SECRET_ACCESS_KEY);
629+
let session_token = config.get(AWS_SESSION_TOKEN);
630+
let mut builder = AmazonS3Builder::new();
631+
632+
if let Some(region) = region {
633+
builder = builder.with_region(region)
634+
}
635+
if let Some(access_key_id) = access_key_id {
636+
builder = builder.with_access_key_id(access_key_id)
637+
}
638+
if let Some(secret_access_key) = secret_access_key {
639+
builder = builder.with_access_key_id(secret_access_key)
640+
}
641+
if let Some(session_token) = session_token {
642+
builder = builder.with_access_key_id(session_token)
643+
}
644+
645+
Ok(Some(Arc::new(builder.build()?)))
646+
} else {
647+
Ok(None)
648+
}
649+
}
650+
580651
#[cfg(test)]
581652
pub mod tests {
582653
use datafusion::{
@@ -685,7 +756,7 @@ pub mod tests {
685756
let iceberg_catalog = Arc::new(RestCatalog::new(
686757
None,
687758
configuration(&format!("http://{rest_host}:{rest_port}")),
688-
object_store,
759+
Some(object_store),
689760
));
690761

691762
iceberg_catalog

0 commit comments

Comments
 (0)