Skip to content

Commit 010ee1a

Browse files
committed
update refresh-state
1 parent ef302fc commit 010ee1a

2 files changed

Lines changed: 109 additions & 71 deletions

File tree

datafusion_iceberg/src/materialized_view/mod.rs

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ use iceberg_rust::{
1414
arrow::write::{write_equality_deletes_parquet_partitioned, write_parquet_partitioned},
1515
catalog::{identifier::Identifier, tabular::Tabular, CatalogList},
1616
materialized_view::MaterializedView,
17-
spec::materialized_view_metadata::{SourceTables, SourceViews},
17+
spec::{
18+
identifier::FullIdentifier,
19+
materialized_view_metadata::{SourceState, SourceStates, SourceTable},
20+
},
1821
};
1922
use iceberg_rust::{
2023
error::Error,
@@ -85,10 +88,15 @@ pub async fn refresh_materialized_view(
8588

8689
let refresh_version_id = matview.metadata().current_version_id;
8790

91+
let refresh_start_timestamp_ms = std::time::SystemTime::now()
92+
.duration_since(std::time::UNIX_EPOCH)
93+
.map(|d| d.as_millis() as i64)
94+
.unwrap_or(0);
95+
8896
let refresh_state = RefreshState {
8997
refresh_version_id,
90-
source_table_states: SourceTables(source_table_states),
91-
source_view_states: SourceViews(HashMap::new()),
98+
source_states: SourceStates(source_table_states),
99+
refresh_start_timestamp_ms,
92100
};
93101

94102
let storage_table_provider = Arc::new(DataFusionTable::new(
@@ -230,7 +238,7 @@ async fn get_source_tables(
230238
Option<Arc<dyn TableProvider>>,
231239
),
232240
>,
233-
HashMap<(uuid::Uuid, Option<String>), i64>,
241+
HashMap<FullIdentifier, SourceState>,
234242
),
235243
Error,
236244
> {
@@ -246,10 +254,8 @@ async fn get_source_tables(
246254
&version.default_namespace()[0],
247255
);
248256
let catalog_name = resolved_reference.catalog.to_string();
249-
let identifier = Identifier::new(
250-
&[resolved_reference.schema.to_string()],
251-
&resolved_reference.table,
252-
);
257+
let namespace = [resolved_reference.schema.to_string()];
258+
let identifier = Identifier::new(&namespace, &resolved_reference.table);
253259
let catalog = catalog_list
254260
.catalog(&catalog_name)
255261
.ok_or(Error::NotFound(format!("Catalog {catalog_name}")))?;
@@ -288,13 +294,19 @@ async fn get_source_tables(
288294
_ => Err(Error::InvalidFormat("storage table".to_string())),
289295
}?;
290296

297+
let full_identifier =
298+
FullIdentifier::new(Some(&catalog_name), &namespace, &resolved_reference.table);
299+
291300
#[allow(clippy::type_complexity)]
292301
let source_table_provider: (
293302
Option<Arc<dyn TableProvider>>,
294303
Option<Arc<dyn TableProvider>>,
295304
) = if let Some(old_refresh_state) = old_refresh_state.as_ref() {
296-
let revision_id = old_refresh_state.source_table_states.get(&(uuid, None));
297-
if Some(&current_snapshot_id) == revision_id {
305+
let revision_snapshot_id = old_refresh_state
306+
.source_states
307+
.get(&full_identifier)
308+
.and_then(|s| s.snapshot_id());
309+
if Some(current_snapshot_id) == revision_snapshot_id {
298310
// Fresh
299311
(
300312
Some(Arc::new(DataFusionTable::new(
@@ -305,7 +317,7 @@ async fn get_source_tables(
305317
))),
306318
None,
307319
)
308-
} else if Some(&-1) == revision_id {
320+
} else if Some(-1) == revision_snapshot_id {
309321
// Invalid
310322
(
311323
None,
@@ -316,18 +328,18 @@ async fn get_source_tables(
316328
branch.as_deref(),
317329
))),
318330
)
319-
} else if let Some(revision_id) = revision_id {
331+
} else if let Some(revision_id) = revision_snapshot_id {
320332
// Outdated
321333
(
322334
Some(Arc::new(DataFusionTable::new(
323335
tabular.clone(),
324336
None,
325-
Some(*revision_id),
337+
Some(revision_id),
326338
branch.as_deref(),
327339
))),
328340
Some(Arc::new(DataFusionTable::new(
329341
tabular,
330-
Some(*revision_id),
342+
Some(revision_id),
331343
None,
332344
branch.as_deref(),
333345
))),
@@ -357,9 +369,18 @@ async fn get_source_tables(
357369
)
358370
};
359371

372+
let source_state = SourceState::Table(SourceTable::new(
373+
Some(&catalog_name),
374+
&namespace,
375+
&resolved_reference.table,
376+
uuid,
377+
current_snapshot_id,
378+
branch.clone(),
379+
));
380+
360381
Ok((
361382
(reference, source_table_provider),
362-
((uuid, None), current_snapshot_id),
383+
(full_identifier, source_state),
363384
))
364385
}
365386
})

iceberg-rust-spec/src/spec/materialized_view_metadata.rs

Lines changed: 73 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,17 @@
66
//! The main types are:
77
//! - [`MaterializedViewMetadata`]: The top-level metadata for a materialized view
88
//! - [`RefreshState`]: Information about the last refresh operation
9-
//! - [`SourceTables`]: Collection of source table states
10-
//! - [`SourceViews`]: Collection of source view states
9+
//! - [`SourceStates`]: Collection of source states
1110
1211
use std::{collections::HashMap, ops::Deref};
1312

1413
use serde::{Deserialize, Serialize};
1514
use uuid::Uuid;
1615

17-
use crate::identifier::Identifier;
16+
use crate::{
17+
identifier::{FullIdentifier, Identifier},
18+
namespace::Namespace,
19+
};
1820

1921
use super::{
2022
tabular::TabularMetadataRef,
@@ -41,26 +43,25 @@ pub struct RefreshState {
4143
/// The version-id of the materialized view when the refresh operation was performed.
4244
pub refresh_version_id: i64,
4345
/// A map from sequence-id (as defined in the view lineage) to the source tables’ snapshot-id of when the last refresh operation was performed.
44-
pub source_table_states: SourceTables,
45-
/// A map from sequence-id (as defined in the view lineage) to the source views’ version-id of when the last refresh operation was performed.
46-
pub source_view_states: SourceViews,
46+
pub source_states: SourceStates,
47+
// A timestamp of when the refresh operation was started
48+
pub refresh_start_timestamp_ms: i64,
4749
}
4850

49-
/// Represents a collection of source table states in a materialized view refresh
50-
///
51-
/// # Fields
52-
/// * `0` - A HashMap mapping (table UUID, optional reference) pairs to snapshot IDs
53-
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
54-
#[serde(from = "Vec<SourceTable>", into = "Vec<SourceTable>")]
55-
pub struct SourceTables(pub HashMap<(Uuid, Option<String>), i64>);
56-
5751
/// Represents a collection of source view states in a materialized view refresh
5852
///
5953
/// # Fields
6054
/// * `0` - A HashMap mapping (table UUID, optional reference) pairs to version IDs
6155
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
62-
#[serde(from = "Vec<SourceView>", into = "Vec<SourceView>")]
63-
pub struct SourceViews(pub HashMap<(Uuid, Option<String>), i64>);
56+
#[serde(from = "Vec<SourceState>", into = "Vec<SourceState>")]
57+
pub struct SourceStates(pub HashMap<FullIdentifier, SourceState>);
58+
59+
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
60+
#[serde(rename_all = "kebab-case", tag = "type")]
61+
pub enum SourceState {
62+
Table(SourceTable),
63+
View(SourceView),
64+
}
6465

6566
/// Represents a source table state in a materialized view refresh
6667
///
@@ -71,8 +72,13 @@ pub struct SourceViews(pub HashMap<(Uuid, Option<String>), i64>);
7172
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
7273
#[serde(rename_all = "kebab-case")]
7374
pub struct SourceTable {
75+
name: String,
76+
namespace: Namespace,
77+
#[serde(skip_serializing_if = "Option::is_none")]
78+
catalog: Option<String>,
7479
uuid: Uuid,
7580
snapshot_id: i64,
81+
#[serde(skip_serializing_if = "Option::is_none")]
7682
r#ref: Option<String>,
7783
}
7884

@@ -84,65 +90,77 @@ pub struct SourceTable {
8490
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
8591
#[serde(rename_all = "kebab-case")]
8692
pub struct SourceView {
93+
name: String,
94+
namespace: Namespace,
95+
#[serde(skip_serializing_if = "Option::is_none")]
96+
catalog: Option<String>,
8797
uuid: Uuid,
8898
version_id: i64,
8999
}
90100

91-
impl From<Vec<SourceTable>> for SourceTables {
92-
fn from(value: Vec<SourceTable>) -> Self {
93-
SourceTables(
94-
value
95-
.into_iter()
96-
.map(|x| ((x.uuid, x.r#ref), x.snapshot_id))
97-
.collect(),
98-
)
101+
impl SourceTable {
102+
/// Create a new SourceTable
103+
pub fn new(
104+
catalog: Option<&str>,
105+
namespace: &[String],
106+
name: &str,
107+
uuid: Uuid,
108+
snapshot_id: i64,
109+
r#ref: Option<String>,
110+
) -> Self {
111+
Self {
112+
catalog: catalog.map(ToString::to_string),
113+
namespace: Namespace(namespace.to_owned()),
114+
name: name.to_owned(),
115+
uuid,
116+
snapshot_id,
117+
r#ref,
118+
}
99119
}
100120
}
101121

102-
impl From<SourceTables> for Vec<SourceTable> {
103-
fn from(value: SourceTables) -> Self {
104-
value
105-
.0
106-
.into_iter()
107-
.map(|((uuid, r#ref), snapshot_id)| SourceTable {
108-
uuid,
109-
snapshot_id,
110-
r#ref,
111-
})
112-
.collect()
122+
impl SourceState {
123+
/// Returns the snapshot_id for Table states, None for View states
124+
pub fn snapshot_id(&self) -> Option<i64> {
125+
match self {
126+
SourceState::Table(t) => Some(t.snapshot_id),
127+
SourceState::View(_) => None,
128+
}
113129
}
114130
}
115131

116-
impl From<Vec<SourceView>> for SourceViews {
117-
fn from(value: Vec<SourceView>) -> Self {
118-
SourceViews(
132+
impl From<Vec<SourceState>> for SourceStates {
133+
fn from(value: Vec<SourceState>) -> Self {
134+
SourceStates(
119135
value
120136
.into_iter()
121-
.map(|x| ((x.uuid, None), x.version_id))
137+
.map(|x| match &x {
138+
SourceState::Table(table) => (
139+
FullIdentifier::new(
140+
table.catalog.as_deref(),
141+
&table.namespace,
142+
&table.name,
143+
),
144+
x,
145+
),
146+
SourceState::View(view) => (
147+
FullIdentifier::new(view.catalog.as_deref(), &view.namespace, &view.name),
148+
x,
149+
),
150+
})
122151
.collect(),
123152
)
124153
}
125154
}
126155

127-
impl From<SourceViews> for Vec<SourceView> {
128-
fn from(value: SourceViews) -> Self {
129-
value
130-
.0
131-
.into_iter()
132-
.map(|((uuid, _), version_id)| SourceView { uuid, version_id })
133-
.collect()
134-
}
135-
}
136-
137-
impl Deref for SourceTables {
138-
type Target = HashMap<(Uuid, Option<String>), i64>;
139-
fn deref(&self) -> &Self::Target {
140-
&self.0
156+
impl From<SourceStates> for Vec<SourceState> {
157+
fn from(value: SourceStates) -> Self {
158+
value.0.into_values().collect()
141159
}
142160
}
143161

144-
impl Deref for SourceViews {
145-
type Target = HashMap<(Uuid, Option<String>), i64>;
162+
impl Deref for SourceStates {
163+
type Target = HashMap<FullIdentifier, SourceState>;
146164
fn deref(&self) -> &Self::Target {
147165
&self.0
148166
}
@@ -181,7 +199,6 @@ mod tests {
181199
"dialect" : "spark"
182200
} ],
183201
"storage-table": {
184-
"catalog": "prod",
185202
"namespace": ["default"],
186203
"name": "event_agg_storage"
187204
}

0 commit comments

Comments
 (0)