Skip to content

Commit 07b6f6c

Browse files
authored
feat: Allow reading compressed metadata files
To support compressed metadata files, we need to decompress them before converting the data into json.
2 parents be5f609 + 0c2d540 commit 07b6f6c

4 files changed

Lines changed: 328 additions & 1 deletion

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iceberg-rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ async-trait = { workspace = true }
1616
bytes = { workspace = true }
1717
derive-getters = { workspace = true }
1818
derive_builder = { workspace = true }
19+
flate2 = { version = "1.1", features = ["zlib-rs"], default-features = false }
1920
futures = { workspace = true }
2021
getrandom = { workspace = true }
2122
iceberg-rust-spec = { path = "../iceberg-rust-spec", version = "0.7.0" }

iceberg-rust/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ pub enum Error {
2020
/// Conversion error
2121
#[error("Failed to convert {0} to {1}.")]
2222
Conversion(String, String),
23+
/// Failed to decompress gzip data
24+
#[error("Failed to decompress gzip data: {0}")]
25+
Decompress(String),
2326
/// Not found
2427
#[error("{0} not found.")]
2528
NotFound(String),

iceberg-rust/src/object_store/store.rs

Lines changed: 323 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use iceberg_rust_spec::{
88
use object_store::{Attributes, ObjectStore, PutOptions, TagSet};
99

1010
use crate::error::Error;
11+
use flate2::read::GzDecoder;
12+
use std::io::Read;
1113

1214
/// Simplify interaction with iceberg files
1315
#[async_trait]
@@ -32,7 +34,8 @@ impl<T: ObjectStore> IcebergStore for T {
3234
.await?
3335
.bytes()
3436
.await?;
35-
serde_json::from_slice(&bytes).map_err(Error::from)
37+
38+
parse_metadata(location, &bytes)
3639
}
3740

3841
async fn put_metadata(
@@ -80,9 +83,23 @@ fn version_hint_path(original: &str) -> Option<String> {
8083
)
8184
}
8285

86+
fn parse_metadata(location: &str, bytes: &[u8]) -> Result<TabularMetadata, Error> {
87+
if location.ends_with(".gz.metadata.json") {
88+
let mut decoder = GzDecoder::new(bytes);
89+
let mut decompressed_data = Vec::new();
90+
decoder
91+
.read_to_end(&mut decompressed_data)
92+
.map_err(|e| Error::Decompress(e.to_string()))?;
93+
serde_json::from_slice(&decompressed_data).map_err(Error::from)
94+
} else {
95+
serde_json::from_slice(bytes).map_err(Error::from)
96+
}
97+
}
98+
8399
#[cfg(test)]
84100
mod tests {
85101
use super::*;
102+
use std::io::Write;
86103

87104
#[test]
88105
fn test_version_hint_path_normal_case() {
@@ -124,4 +141,309 @@ mod tests {
124141
let expected = "/path/to/version-hint.text";
125142
assert_eq!(version_hint_path(input), Some(expected.to_string()));
126143
}
144+
145+
#[test]
146+
fn test_parse_metadata_table_plain_json() {
147+
let location = "/path/to/metadata/v1.metadata.json";
148+
let json_data = r#"
149+
{
150+
"format-version" : 2,
151+
"table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
152+
"location": "s3://b/wh/data.db/table",
153+
"last-sequence-number" : 1,
154+
"last-updated-ms": 1515100955770,
155+
"last-column-id": 1,
156+
"schemas": [
157+
{
158+
"schema-id" : 1,
159+
"type" : "struct",
160+
"fields" :[
161+
{
162+
"id": 1,
163+
"name": "struct_name",
164+
"required": true,
165+
"type": "fixed[1]"
166+
}
167+
]
168+
}
169+
],
170+
"current-schema-id" : 1,
171+
"partition-specs": [
172+
{
173+
"spec-id": 1,
174+
"fields": [
175+
{
176+
"source-id": 4,
177+
"field-id": 1000,
178+
"name": "ts_day",
179+
"transform": "day"
180+
}
181+
]
182+
}
183+
],
184+
"default-spec-id": 1,
185+
"last-partition-id": 1,
186+
"properties": {
187+
"commit.retry.num-retries": "1"
188+
},
189+
"metadata-log": [
190+
{
191+
"metadata-file": "s3://bucket/.../v1.json",
192+
"timestamp-ms": 1515100
193+
}
194+
],
195+
"sort-orders": [],
196+
"default-sort-order-id": 0
197+
}
198+
"#;
199+
let bytes = json_data.as_bytes();
200+
201+
let result = parse_metadata(location, bytes);
202+
assert!(result.is_ok());
203+
let metadata = result.unwrap();
204+
if let TabularMetadata::Table(table_metadata) = metadata {
205+
// Add specific checks for `table_metadata` fields if needed
206+
assert_eq!(table_metadata.table_uuid.to_string(), "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94");
207+
} else {
208+
panic!("Expected TabularMetadata::Table variant");
209+
}
210+
}
211+
212+
#[test]
213+
fn test_parse_metadata_table_gzipped_json() {
214+
let location = "/path/to/metadata/v1.gz.metadata.json";
215+
let json_data = r#"
216+
{
217+
"format-version" : 2,
218+
"table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
219+
"location": "s3://b/wh/data.db/table",
220+
"last-sequence-number" : 1,
221+
"last-updated-ms": 1515100955770,
222+
"last-column-id": 1,
223+
"schemas": [
224+
{
225+
"schema-id" : 1,
226+
"type" : "struct",
227+
"fields" :[
228+
{
229+
"id": 1,
230+
"name": "struct_name",
231+
"required": true,
232+
"type": "fixed[1]"
233+
}
234+
]
235+
}
236+
],
237+
"current-schema-id" : 1,
238+
"partition-specs": [
239+
{
240+
"spec-id": 1,
241+
"fields": [
242+
{
243+
"source-id": 4,
244+
"field-id": 1000,
245+
"name": "ts_day",
246+
"transform": "day"
247+
}
248+
]
249+
}
250+
],
251+
"default-spec-id": 1,
252+
"last-partition-id": 1,
253+
"properties": {
254+
"commit.retry.num-retries": "1"
255+
},
256+
"metadata-log": [
257+
{
258+
"metadata-file": "s3://bucket/.../v1.json",
259+
"timestamp-ms": 1515100
260+
}
261+
],
262+
"sort-orders": [],
263+
"default-sort-order-id": 0
264+
}
265+
"#;
266+
267+
let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
268+
encoder.write_all(json_data.as_bytes()).unwrap();
269+
let compressed_data = encoder.finish().unwrap();
270+
271+
let result = parse_metadata(location, &compressed_data);
272+
assert!(result.is_ok());
273+
let metadata = result.unwrap();
274+
if let TabularMetadata::Table(table_metadata) = metadata {
275+
// Add specific checks for `table_metadata` fields if needed
276+
assert_eq!(table_metadata.table_uuid.to_string(), "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94");
277+
} else {
278+
panic!("Expected TabularMetadata::Table variant");
279+
}
280+
}
281+
282+
#[test]
283+
fn test_parse_metadata_view_plain_json() {
284+
let location = "/path/to/metadata/v1.metadata.json";
285+
let json_data = r#"
286+
{
287+
"view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
288+
"format-version" : 1,
289+
"location" : "s3://bucket/warehouse/default.db/event_agg",
290+
"current-version-id" : 1,
291+
"properties" : {
292+
"comment" : "Daily event counts"
293+
},
294+
"versions" : [ {
295+
"version-id" : 1,
296+
"timestamp-ms" : 1573518431292,
297+
"schema-id" : 1,
298+
"default-catalog" : "prod",
299+
"default-namespace" : [ "default" ],
300+
"summary" : {
301+
"operation" : "create",
302+
"engine-name" : "Spark",
303+
"engineVersion" : "3.3.2"
304+
},
305+
"representations" : [ {
306+
"type" : "sql",
307+
"sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
308+
"dialect" : "spark"
309+
} ]
310+
} ],
311+
"schemas": [ {
312+
"schema-id": 1,
313+
"type" : "struct",
314+
"fields" : [ {
315+
"id" : 1,
316+
"name" : "event_count",
317+
"required" : false,
318+
"type" : "int",
319+
"doc" : "Count of events"
320+
}, {
321+
"id" : 2,
322+
"name" : "event_date",
323+
"required" : false,
324+
"type" : "date"
325+
} ]
326+
} ],
327+
"version-log" : [ {
328+
"timestamp-ms" : 1573518431292,
329+
"version-id" : 1
330+
} ]
331+
}
332+
"#;
333+
let bytes = json_data.as_bytes();
334+
335+
let result = parse_metadata(location, bytes);
336+
assert!(result.is_ok());
337+
let metadata = result.unwrap();
338+
if let TabularMetadata::View(view_metadata) = metadata {
339+
// Add specific checks for `view_metadata` fields if needed
340+
assert_eq!(view_metadata.view_uuid.to_string(), "fa6506c3-7681-40c8-86dc-e36561f83385");
341+
} else {
342+
panic!("Expected TabularMetadata::View variant");
343+
}
344+
}
345+
346+
#[test]
347+
fn test_parse_metadata_view_gzipped_json() {
348+
let location = "/path/to/metadata/v1.gz.metadata.json";
349+
let json_data = r#"
350+
{
351+
"view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
352+
"format-version" : 1,
353+
"location" : "s3://bucket/warehouse/default.db/event_agg",
354+
"current-version-id" : 1,
355+
"properties" : {
356+
"comment" : "Daily event counts"
357+
},
358+
"versions" : [ {
359+
"version-id" : 1,
360+
"timestamp-ms" : 1573518431292,
361+
"schema-id" : 1,
362+
"default-catalog" : "prod",
363+
"default-namespace" : [ "default" ],
364+
"summary" : {
365+
"operation" : "create",
366+
"engine-name" : "Spark",
367+
"engineVersion" : "3.3.2"
368+
},
369+
"representations" : [ {
370+
"type" : "sql",
371+
"sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
372+
"dialect" : "spark"
373+
} ]
374+
} ],
375+
"schemas": [ {
376+
"schema-id": 1,
377+
"type" : "struct",
378+
"fields" : [ {
379+
"id" : 1,
380+
"name" : "event_count",
381+
"required" : false,
382+
"type" : "int",
383+
"doc" : "Count of events"
384+
}, {
385+
"id" : 2,
386+
"name" : "event_date",
387+
"required" : false,
388+
"type" : "date"
389+
} ]
390+
} ],
391+
"version-log" : [ {
392+
"timestamp-ms" : 1573518431292,
393+
"version-id" : 1
394+
} ]
395+
}
396+
"#;
397+
398+
let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
399+
encoder.write_all(json_data.as_bytes()).unwrap();
400+
let compressed_data = encoder.finish().unwrap();
401+
402+
let result = parse_metadata(location, &compressed_data);
403+
assert!(result.is_ok());
404+
let metadata = result.unwrap();
405+
if let TabularMetadata::View(view_metadata) = metadata {
406+
// Add specific checks for `view_metadata` fields if needed
407+
assert_eq!(view_metadata.view_uuid.to_string(), "fa6506c3-7681-40c8-86dc-e36561f83385");
408+
} else {
409+
panic!("Expected TabularMetadata::View variant");
410+
}
411+
}
412+
413+
#[test]
414+
fn test_parse_metadata_invalid_json() {
415+
let location = "/path/to/metadata/v1.metadata.json";
416+
let invalid_json_data = r#"{"key": "value""#; // Missing closing brace
417+
let bytes = invalid_json_data.as_bytes();
418+
419+
let result = parse_metadata(location, bytes);
420+
assert!(result.is_err());
421+
}
422+
423+
#[test]
424+
fn test_parse_metadata_invalid_gzipped_data() {
425+
let location = "/path/to/metadata/v1.gz.metadata.json";
426+
let invalid_gzipped_data = b"not a valid gzip";
427+
428+
let result = parse_metadata(location, invalid_gzipped_data);
429+
assert!(result.is_err());
430+
}
431+
432+
#[test]
433+
fn test_parse_metadata_empty_bytes() {
434+
let location = "/path/to/metadata/v1.metadata.json";
435+
let empty_bytes: &[u8] = &[];
436+
437+
let result = parse_metadata(location, empty_bytes);
438+
assert!(result.is_err());
439+
}
440+
441+
#[test]
442+
fn test_parse_metadata_gzipped_empty_bytes() {
443+
let location = "/path/to/metadata/v1.gz.metadata.json";
444+
let empty_gzipped_bytes: &[u8] = &[];
445+
446+
let result = parse_metadata(location, empty_gzipped_bytes);
447+
assert!(result.is_err());
448+
}
127449
}

0 commit comments

Comments
 (0)