Skip to content

Commit 7a3ed01

Browse files
authored
Merge pull request JanKaul#324 from JanKaul/materialized-view-spec
Materialized view spec
2 parents 7842a41 + 010ee1a commit 7a3ed01

11 files changed

Lines changed: 127 additions & 95 deletions

File tree

catalogs/iceberg-file-catalog/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use iceberg_rust::{
2222
materialized_view::MaterializedView,
2323
object_store::{store::IcebergStore, Bucket, ObjectStoreBuilder},
2424
spec::{
25-
identifier::FullIdentifier,
2625
materialized_view_metadata::MaterializedViewMetadata,
2726
table_metadata::{new_metadata_location, TableMetadata},
2827
tabular::TabularMetadata,
@@ -441,7 +440,7 @@ impl Catalog for FileCatalog {
441440
}
442441
async fn update_materialized_view(
443442
self: Arc<Self>,
444-
commit: CommitView<FullIdentifier>,
443+
commit: CommitView<Identifier>,
445444
) -> Result<MaterializedView, IcebergError> {
446445
let bucket = Bucket::from_path(&self.path)?;
447446
let object_store = self.object_store.build(bucket)?;

catalogs/iceberg-glue-catalog/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use iceberg_rust::{
2727
object_store::{store::IcebergStore, Bucket, ObjectStoreBuilder},
2828
spec::{
2929
self,
30-
identifier::FullIdentifier,
3130
materialized_view_metadata::MaterializedViewMetadata,
3231
table_metadata::{new_metadata_location, TableMetadata},
3332
tabular::TabularMetadata,
@@ -769,7 +768,7 @@ impl Catalog for GlueCatalog {
769768
}
770769
async fn update_materialized_view(
771770
self: Arc<Self>,
772-
commit: CommitView<FullIdentifier>,
771+
commit: CommitView<Identifier>,
773772
) -> Result<MaterializedView, IcebergError> {
774773
let identifier = commit.identifier;
775774
let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {

catalogs/iceberg-rest-catalog/src/catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ impl Catalog for RestCatalog {
526526
}
527527
async fn update_materialized_view(
528528
self: Arc<Self>,
529-
commit: CommitView<FullIdentifier>,
529+
commit: CommitView<Identifier>,
530530
) -> Result<MaterializedView, Error> {
531531
let configuration = self.get_configuration().await?;
532532
let identifier = commit.identifier.clone();

catalogs/iceberg-s3tables-catalog/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use iceberg_rust::{
2727
object_store::{store::IcebergStore, Bucket, ObjectStoreBuilder},
2828
spec::{
2929
self,
30-
identifier::FullIdentifier,
3130
materialized_view_metadata::MaterializedViewMetadata,
3231
table_metadata::{new_metadata_location, TableMetadata},
3332
tabular::TabularMetadata,
@@ -716,7 +715,7 @@ impl Catalog for S3TablesCatalog {
716715
}
717716
async fn update_materialized_view(
718717
self: Arc<Self>,
719-
commit: CommitView<FullIdentifier>,
718+
commit: CommitView<Identifier>,
720719
) -> Result<MaterializedView, IcebergError> {
721720
let identifier = commit.identifier;
722721
let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {

catalogs/iceberg-sql-catalog/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use iceberg_rust::{
2020
materialized_view::MaterializedView,
2121
object_store::{store::IcebergStore, Bucket, ObjectStoreBuilder},
2222
spec::{
23-
identifier::FullIdentifier,
2423
materialized_view_metadata::MaterializedViewMetadata,
2524
table_metadata::{new_metadata_location, TableMetadata},
2625
tabular::TabularMetadata,
@@ -579,7 +578,7 @@ impl Catalog for SqlCatalog {
579578
}
580579
async fn update_materialized_view(
581580
self: Arc<Self>,
582-
commit: CommitView<FullIdentifier>,
581+
commit: CommitView<Identifier>,
583582
) -> Result<MaterializedView, IcebergError> {
584583
let identifier = commit.identifier;
585584
let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {

datafusion_iceberg/src/materialized_view/mod.rs

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ use iceberg_rust::{
1414
arrow::write::{write_equality_deletes_parquet_partitioned, write_parquet_partitioned},
1515
catalog::{identifier::Identifier, tabular::Tabular, CatalogList},
1616
materialized_view::MaterializedView,
17-
spec::materialized_view_metadata::{SourceTables, SourceViews},
17+
spec::{
18+
identifier::FullIdentifier,
19+
materialized_view_metadata::{SourceState, SourceStates, SourceTable},
20+
},
1821
};
1922
use iceberg_rust::{
2023
error::Error,
@@ -85,10 +88,15 @@ pub async fn refresh_materialized_view(
8588

8689
let refresh_version_id = matview.metadata().current_version_id;
8790

91+
let refresh_start_timestamp_ms = std::time::SystemTime::now()
92+
.duration_since(std::time::UNIX_EPOCH)
93+
.map(|d| d.as_millis() as i64)
94+
.unwrap_or(0);
95+
8896
let refresh_state = RefreshState {
8997
refresh_version_id,
90-
source_table_states: SourceTables(source_table_states),
91-
source_view_states: SourceViews(HashMap::new()),
98+
source_states: SourceStates(source_table_states),
99+
refresh_start_timestamp_ms,
92100
};
93101

94102
let storage_table_provider = Arc::new(DataFusionTable::new(
@@ -219,7 +227,7 @@ async fn get_source_tables(
219227
branch: &Option<String>,
220228
old_refresh_state: Arc<Option<RefreshState>>,
221229
version: &iceberg_rust::spec::view_metadata::Version<
222-
iceberg_rust::spec::identifier::FullIdentifier,
230+
iceberg_rust::spec::identifier::Identifier,
223231
>,
224232
) -> Result<
225233
(
@@ -230,7 +238,7 @@ async fn get_source_tables(
230238
Option<Arc<dyn TableProvider>>,
231239
),
232240
>,
233-
HashMap<(uuid::Uuid, Option<String>), i64>,
241+
HashMap<FullIdentifier, SourceState>,
234242
),
235243
Error,
236244
> {
@@ -246,10 +254,8 @@ async fn get_source_tables(
246254
&version.default_namespace()[0],
247255
);
248256
let catalog_name = resolved_reference.catalog.to_string();
249-
let identifier = Identifier::new(
250-
&[resolved_reference.schema.to_string()],
251-
&resolved_reference.table,
252-
);
257+
let namespace = [resolved_reference.schema.to_string()];
258+
let identifier = Identifier::new(&namespace, &resolved_reference.table);
253259
let catalog = catalog_list
254260
.catalog(&catalog_name)
255261
.ok_or(Error::NotFound(format!("Catalog {catalog_name}")))?;
@@ -288,13 +294,19 @@ async fn get_source_tables(
288294
_ => Err(Error::InvalidFormat("storage table".to_string())),
289295
}?;
290296

297+
let full_identifier =
298+
FullIdentifier::new(Some(&catalog_name), &namespace, &resolved_reference.table);
299+
291300
#[allow(clippy::type_complexity)]
292301
let source_table_provider: (
293302
Option<Arc<dyn TableProvider>>,
294303
Option<Arc<dyn TableProvider>>,
295304
) = if let Some(old_refresh_state) = old_refresh_state.as_ref() {
296-
let revision_id = old_refresh_state.source_table_states.get(&(uuid, None));
297-
if Some(&current_snapshot_id) == revision_id {
305+
let revision_snapshot_id = old_refresh_state
306+
.source_states
307+
.get(&full_identifier)
308+
.and_then(|s| s.snapshot_id());
309+
if Some(current_snapshot_id) == revision_snapshot_id {
298310
// Fresh
299311
(
300312
Some(Arc::new(DataFusionTable::new(
@@ -305,7 +317,7 @@ async fn get_source_tables(
305317
))),
306318
None,
307319
)
308-
} else if Some(&-1) == revision_id {
320+
} else if Some(-1) == revision_snapshot_id {
309321
// Invalid
310322
(
311323
None,
@@ -316,18 +328,18 @@ async fn get_source_tables(
316328
branch.as_deref(),
317329
))),
318330
)
319-
} else if let Some(revision_id) = revision_id {
331+
} else if let Some(revision_id) = revision_snapshot_id {
320332
// Outdated
321333
(
322334
Some(Arc::new(DataFusionTable::new(
323335
tabular.clone(),
324336
None,
325-
Some(*revision_id),
337+
Some(revision_id),
326338
branch.as_deref(),
327339
))),
328340
Some(Arc::new(DataFusionTable::new(
329341
tabular,
330-
Some(*revision_id),
342+
Some(revision_id),
331343
None,
332344
branch.as_deref(),
333345
))),
@@ -357,9 +369,18 @@ async fn get_source_tables(
357369
)
358370
};
359371

372+
let source_state = SourceState::Table(SourceTable::new(
373+
Some(&catalog_name),
374+
&namespace,
375+
&resolved_reference.table,
376+
uuid,
377+
current_snapshot_id,
378+
branch.clone(),
379+
));
380+
360381
Ok((
361382
(reference, source_table_provider),
362-
((uuid, None), current_snapshot_id),
383+
(full_identifier, source_state),
363384
))
364385
}
365386
})

0 commit comments

Comments
 (0)