diff --git a/Cargo.lock b/Cargo.lock index ca7bde27a9..5c33d58d34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3410,6 +3410,7 @@ dependencies = [ "futures", "iceberg_test_utils", "itertools 0.13.0", + "lz4_flex", "minijinja", "mockall", "moka", diff --git a/Cargo.toml b/Cargo.toml index 793bb49d86..0d10e44f28 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,6 +95,7 @@ itertools = "0.13" libtest-mimic = "0.8.1" linkedbytes = "0.1.8" log = "0.4.28" +lz4_flex = "0.13" metainfo = "0.7.14" mimalloc = "0.1.46" minijinja = "2.12.0" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 18729176dc..a754f69bb7 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -58,6 +58,7 @@ flate2 = { workspace = true } fnv = { workspace = true } futures = { workspace = true } itertools = { workspace = true } +lz4_flex = { workspace = true } moka = { version = "0.12.10", features = ["future"] } murmur3 = { workspace = true } once_cell = { workspace = true } diff --git a/crates/iceberg/src/compression.rs b/crates/iceberg/src/compression.rs index 929d9226e7..097847e7bf 100644 --- a/crates/iceberg/src/compression.rs +++ b/crates/iceberg/src/compression.rs @@ -23,6 +23,7 @@ use std::io::{Read, Write}; use flate2::Compression; use flate2::read::GzDecoder; use flate2::write::GzEncoder; +use lz4_flex::frame::{FrameDecoder, FrameEncoder, FrameInfo}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::{Error, ErrorKind, Result}; @@ -118,10 +119,12 @@ impl CompressionCodec { pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> { match self { CompressionCodec::None => Ok(bytes), - CompressionCodec::Lz4 => Err(Error::new( - ErrorKind::FeatureUnsupported, - "LZ4 decompression is not supported currently", - )), + CompressionCodec::Lz4 => { + let mut decoder = FrameDecoder::new(&bytes[..]); + let mut decompressed = Vec::new(); + decoder.read_to_end(&mut decompressed)?; + Ok(decompressed) + } CompressionCodec::Zstd(_) => 
Ok(zstd::stream::decode_all(&bytes[..])?), CompressionCodec::Gzip(_) => { let mut decoder = GzDecoder::new(&bytes[..]); @@ -139,10 +142,17 @@ impl CompressionCodec { pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> { match self { CompressionCodec::None => Ok(bytes), - CompressionCodec::Lz4 => Err(Error::new( - ErrorKind::FeatureUnsupported, - "LZ4 compression is not supported currently", - )), + CompressionCodec::Lz4 => { + // The Puffin spec requires "LZ4 single compression frame with content size + // present" for footer payloads, so we set content_size on the frame header. + // See https://iceberg.apache.org/puffin-spec/#footer-payload + let frame_info = FrameInfo::new().content_size(Some(bytes.len() as u64)); + let mut encoder = FrameEncoder::with_frame_info(frame_info, Vec::new()); + encoder.write_all(&bytes)?; + encoder.finish().map_err(|e| { + Error::new(ErrorKind::Unexpected, "Failed to finish LZ4 frame").with_source(e) + }) + } CompressionCodec::Zstd(level) => { let writer = Vec::<u8>::new(); let mut encoder = zstd::stream::Encoder::new(writer, *level as i32)?; @@ -173,7 +183,10 @@ impl CompressionCodec { /// /// # Errors /// - /// Returns an error for Lz4 and Zstd as they are not fully supported. + /// Returns an error for codecs without a canonical file-extension convention + /// (Lz4, Zstd, Snappy). LZ4 is fully supported for compression and decompression, + /// but is used in framed form (e.g., inside Puffin footers) where no separate + /// file suffix is required. pub fn suffix(&self) -> Result<&'static str> { match self { CompressionCodec::None => Ok(""), @@ -208,6 +221,7 @@ mod tests { let bytes_vec = [0_u8; 100].to_vec(); let compression_codecs = [ + CompressionCodec::Lz4, CompressionCodec::zstd_default(), CompressionCodec::gzip_default(), ]; @@ -220,25 +234,46 @@ mod tests { } } + #[tokio::test] + async fn test_compression_codec_lz4_roundtrip() { + let codec = CompressionCodec::Lz4; + + // Empty input must round-trip cleanly. 
+ let empty: Vec<u8> = vec![]; + let compressed_empty = codec.compress(empty.clone()).unwrap(); + assert_eq!(codec.decompress(compressed_empty).unwrap(), empty); + + // Mixed-byte payload (less compressible than all-zeros) round-trips. + let payload: Vec<u8> = (0..10_000).map(|i| (i % 251) as u8).collect(); + let compressed = codec.compress(payload.clone()).unwrap(); + assert_eq!(codec.decompress(compressed).unwrap(), payload); + + // Frame must begin with the LZ4 frame magic number 0x184D2204 (little-endian) + // per https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md. + let highly_compressible = vec![0u8; 10_000]; + let compressed = codec.compress(highly_compressible).unwrap(); + assert_eq!(&compressed[..4], &[0x04, 0x22, 0x4D, 0x18]); + } + #[tokio::test] async fn test_compression_codec_unsupported() { - let unsupported_codecs = [ - (CompressionCodec::Lz4, "LZ4"), - (CompressionCodec::Snappy, "Snappy"), - ]; let bytes_vec = [0_u8; 100].to_vec(); - for (codec, name) in unsupported_codecs { - assert_eq!( - codec.compress(bytes_vec.clone()).unwrap_err().to_string(), - format!("FeatureUnsupported => {name} compression is not supported currently"), - ); + assert_eq!( + CompressionCodec::Snappy + .compress(bytes_vec.clone()) + .unwrap_err() + .to_string(), + "FeatureUnsupported => Snappy compression is not supported currently", + ); - assert_eq!( - codec.decompress(bytes_vec.clone()).unwrap_err().to_string(), - format!("FeatureUnsupported => {name} decompression is not supported currently"), - ); - } + assert_eq!( + CompressionCodec::Snappy + .decompress(bytes_vec) + .unwrap_err() + .to_string(), + "FeatureUnsupported => Snappy decompression is not supported currently", + ); } #[test] diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index e2dfc10c23..06b4cf5028 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -548,26 +548,31 @@ mod tests { } #[tokio::test] - async fn 
test_lz4_compressed_footer_returns_error() { + async fn test_lz4_compressed_footer_is_decoded() { + // Hand-built file with FooterPayloadCompressed=1 and a real LZ4 frame in place + // of the raw JSON. This confirms the reader honors the flag rather than + // assuming the payload is always uncompressed. let temp_dir = TempDir::new().unwrap(); + let compressed_payload = CompressionCodec::Lz4 + .compress(empty_footer_payload_bytes()) + .unwrap(); + let mut bytes = vec![]; bytes.extend(FileMetadata::MAGIC.to_vec()); bytes.extend(FileMetadata::MAGIC.to_vec()); - bytes.extend(empty_footer_payload_bytes()); - bytes.extend(empty_footer_payload_bytes_length_bytes()); + bytes.extend(&compressed_payload); + bytes.extend(u32::to_le_bytes(compressed_payload.len() as u32)); + // FooterPayloadCompressed bit set in the first flag byte. bytes.extend(vec![0b00000001, 0, 0, 0]); bytes.extend(FileMetadata::MAGIC.to_vec()); let input_file = input_file_with_bytes(&temp_dir, &bytes).await; assert_eq!( - FileMetadata::read(&input_file) - .await - .unwrap_err() - .to_string(), - "FeatureUnsupported => LZ4 decompression is not supported currently", - ) + FileMetadata::read(&input_file).await.unwrap(), + empty_footer_payload() + ); } #[tokio::test] diff --git a/crates/iceberg/src/puffin/writer.rs b/crates/iceberg/src/puffin/writer.rs index 4af4970b04..a2dd367423 100644 --- a/crates/iceberg/src/puffin/writer.rs +++ b/crates/iceberg/src/puffin/writer.rs @@ -273,13 +273,65 @@ mod tests { let blobs = vec![blob_0(), blob_1()]; let blobs_with_compression = blobs_with_compression(blobs.clone(), CompressionCodec::Lz4); - assert_eq!( - write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) - .await - .unwrap_err() - .to_string(), - "FeatureUnsupported => LZ4 compression is not supported currently" - ); + let input_file = write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) + .await + .unwrap() + .to_input_file(); + + // Blob round-trip must yield the original 
bytes after LZ4 framed decompression. + assert_eq!(read_all_blobs_from_puffin_file(input_file).await, blobs); + } + + /// Regression for https://github.com/apache/iceberg-rust/issues/2419 — + /// the PuffinWriter previously claimed LZ4 compression on the footer (by setting + /// the FooterPayloadCompressed flag) but wrote the raw uncompressed JSON, which + /// produced unreadable files. The writer now actually LZ4-encodes the footer when + /// compress_footer=true, and the reader round-trips it. + #[tokio::test] + async fn test_compress_footer_lz4_round_trips() { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIO::new_with_fs(); + let path = temp_dir.path().join("compressed_footer.bin"); + let output_file = file_io.new_output(path.to_str().unwrap()).unwrap(); + + // compress_footer=true sets the footer codec to LZ4. + let mut writer = PuffinWriter::new(&output_file, file_properties(), true) + .await + .unwrap(); + writer.add(blob_0(), CompressionCodec::None).await.unwrap(); + writer.close().await.unwrap(); + + // Reader must be able to LZ4-decompress the footer and recover both the + // file metadata and the blob payload. + let input_file = output_file.to_input_file(); + let metadata = FileMetadata::read(&input_file).await.unwrap(); + assert_eq!(metadata.properties, file_properties()); + assert_eq!(metadata.blobs.len(), 1); + assert_eq!(read_all_blobs_from_puffin_file(input_file).await, vec![ + blob_0() + ]); + } + + /// Direct adaptation of the reproducer from + /// https://github.com/apache/iceberg-rust/issues/2419 — close must succeed when + /// compress_footer=true even with no blobs written. 
+ #[tokio::test] + async fn test_compress_empty_footer_lz4_succeeds() { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIO::new_with_fs(); + let path = temp_dir.path().join("compressed_empty_footer.bin"); + let output_file = file_io.new_output(path.to_str().unwrap()).unwrap(); + + let writer = PuffinWriter::new(&output_file, HashMap::new(), true) + .await + .unwrap(); + writer.close().await.unwrap(); + + // The compressed empty footer must still parse back to an empty FileMetadata. + let input_file = output_file.to_input_file(); + let metadata = FileMetadata::read(&input_file).await.unwrap(); + assert!(metadata.blobs.is_empty()); + assert!(metadata.properties.is_empty()); } async fn get_file_as_byte_vec(input_file: InputFile) -> Vec<u8> {