Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ itertools = "0.13"
libtest-mimic = "0.8.1"
linkedbytes = "0.1.8"
log = "0.4.28"
lz4_flex = "0.13"
metainfo = "0.7.14"
mimalloc = "0.1.46"
minijinja = "2.12.0"
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ flate2 = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
lz4_flex = { workspace = true }
moka = { version = "0.12.10", features = ["future"] }
murmur3 = { workspace = true }
once_cell = { workspace = true }
Expand Down
81 changes: 58 additions & 23 deletions crates/iceberg/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::io::{Read, Write};
use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use lz4_flex::frame::{FrameDecoder, FrameEncoder, FrameInfo};
use serde::{Deserialize, Deserializer, Serialize, Serializer};

use crate::{Error, ErrorKind, Result};
Expand Down Expand Up @@ -118,10 +119,12 @@ impl CompressionCodec {
pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
match self {
CompressionCodec::None => Ok(bytes),
CompressionCodec::Lz4 => Err(Error::new(
ErrorKind::FeatureUnsupported,
"LZ4 decompression is not supported currently",
)),
CompressionCodec::Lz4 => {
let mut decoder = FrameDecoder::new(&bytes[..]);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed)?;
Ok(decompressed)
}
CompressionCodec::Zstd(_) => Ok(zstd::stream::decode_all(&bytes[..])?),
CompressionCodec::Gzip(_) => {
let mut decoder = GzDecoder::new(&bytes[..]);
Expand All @@ -139,10 +142,17 @@ impl CompressionCodec {
pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
match self {
CompressionCodec::None => Ok(bytes),
CompressionCodec::Lz4 => Err(Error::new(
ErrorKind::FeatureUnsupported,
"LZ4 compression is not supported currently",
)),
CompressionCodec::Lz4 => {
// The Puffin spec requires "LZ4 single compression frame with content size
// present" for footer payloads, so we set content_size on the frame header.
// See https://iceberg.apache.org/puffin-spec/#footer-payload
let frame_info = FrameInfo::new().content_size(Some(bytes.len() as u64));
let mut encoder = FrameEncoder::with_frame_info(frame_info, Vec::new());
encoder.write_all(&bytes)?;
encoder.finish().map_err(|e| {
Error::new(ErrorKind::Unexpected, "Failed to finish LZ4 frame").with_source(e)
})
}
CompressionCodec::Zstd(level) => {
let writer = Vec::<u8>::new();
let mut encoder = zstd::stream::Encoder::new(writer, *level as i32)?;
Expand Down Expand Up @@ -173,7 +183,10 @@ impl CompressionCodec {
///
/// # Errors
///
/// Returns an error for Lz4 and Zstd as they are not fully supported.
/// Returns an error for codecs without a canonical file-extension convention
/// (Lz4, Zstd, Snappy). LZ4 is fully supported for compression and decompression,
/// but is used in framed form (e.g., inside Puffin footers) where no separate
/// file suffix is required.
pub fn suffix(&self) -> Result<&'static str> {
match self {
CompressionCodec::None => Ok(""),
Expand Down Expand Up @@ -208,6 +221,7 @@ mod tests {
let bytes_vec = [0_u8; 100].to_vec();

let compression_codecs = [
CompressionCodec::Lz4,
CompressionCodec::zstd_default(),
CompressionCodec::gzip_default(),
];
Expand All @@ -220,25 +234,46 @@ mod tests {
}
}

/// Checks that the LZ4 codec round-trips empty and non-trivial payloads, and
/// that its output starts with the LZ4 frame magic number.
#[tokio::test]
async fn test_compression_codec_lz4_roundtrip() {
    let lz4 = CompressionCodec::Lz4;

    // A zero-length buffer has to survive compress -> decompress unchanged.
    let no_bytes: Vec<u8> = Vec::new();
    let framed_empty = lz4.compress(no_bytes.clone()).unwrap();
    let restored_empty = lz4.decompress(framed_empty).unwrap();
    assert_eq!(restored_empty, no_bytes);

    // A cycling byte pattern (less compressible than all-zeros) must also
    // come back byte-for-byte.
    let cycling: Vec<u8> = (0..10_000).map(|i| (i % 251) as u8).collect();
    let framed_cycling = lz4.compress(cycling.clone()).unwrap();
    let restored_cycling = lz4.decompress(framed_cycling).unwrap();
    assert_eq!(restored_cycling, cycling);

    // The output must open with the LZ4 frame magic number 0x184D2204, stored
    // little-endian, per https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md.
    let zeros = vec![0u8; 10_000];
    let framed_zeros = lz4.compress(zeros).unwrap();
    let magic = &framed_zeros[..4];
    assert_eq!(magic, &[0x04, 0x22, 0x4D, 0x18]);
}

/// Verifies that Snappy, which has no implementation here, reports
/// `FeatureUnsupported` from both `compress` and `decompress`.
///
/// LZ4 is deliberately not listed: it is exercised as a working codec by the
/// round-trip tests, so asserting that it is unsupported would contradict them.
#[tokio::test]
async fn test_compression_codec_unsupported() {
    let bytes_vec = [0_u8; 100].to_vec();

    assert_eq!(
        CompressionCodec::Snappy
            .compress(bytes_vec.clone())
            .unwrap_err()
            .to_string(),
        "FeatureUnsupported => Snappy compression is not supported currently",
    );

    // Last use of the buffer, so it is moved rather than cloned.
    assert_eq!(
        CompressionCodec::Snappy
            .decompress(bytes_vec)
            .unwrap_err()
            .to_string(),
        "FeatureUnsupported => Snappy decompression is not supported currently",
    );
}

#[test]
Expand Down
23 changes: 14 additions & 9 deletions crates/iceberg/src/puffin/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,26 +548,31 @@ mod tests {
}

#[tokio::test]
async fn test_lz4_compressed_footer_returns_error() {
async fn test_lz4_compressed_footer_is_decoded() {
// Hand-built file with FooterPayloadCompressed=1 and a real LZ4 frame in place
// of the raw JSON. This confirms the reader honors the flag rather than
// assuming the payload is always uncompressed.
let temp_dir = TempDir::new().unwrap();

let compressed_payload = CompressionCodec::Lz4
.compress(empty_footer_payload_bytes())
.unwrap();

let mut bytes = vec![];
bytes.extend(FileMetadata::MAGIC.to_vec());
bytes.extend(FileMetadata::MAGIC.to_vec());
bytes.extend(empty_footer_payload_bytes());
bytes.extend(empty_footer_payload_bytes_length_bytes());
bytes.extend(&compressed_payload);
bytes.extend(u32::to_le_bytes(compressed_payload.len() as u32));
// FooterPayloadCompressed bit set in the first flag byte.
bytes.extend(vec![0b00000001, 0, 0, 0]);
bytes.extend(FileMetadata::MAGIC.to_vec());

let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

assert_eq!(
FileMetadata::read(&input_file)
.await
.unwrap_err()
.to_string(),
"FeatureUnsupported => LZ4 decompression is not supported currently",
)
FileMetadata::read(&input_file).await.unwrap(),
empty_footer_payload()
);
}

#[tokio::test]
Expand Down
66 changes: 59 additions & 7 deletions crates/iceberg/src/puffin/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,65 @@ mod tests {
let blobs = vec![blob_0(), blob_1()];
let blobs_with_compression = blobs_with_compression(blobs.clone(), CompressionCodec::Lz4);

assert_eq!(
write_puffin_file(&temp_dir, blobs_with_compression, file_properties())
.await
.unwrap_err()
.to_string(),
"FeatureUnsupported => LZ4 compression is not supported currently"
);
let input_file = write_puffin_file(&temp_dir, blobs_with_compression, file_properties())
.await
.unwrap()
.to_input_file();

// Blob round-trip must yield the original bytes after LZ4 framed decompression.
assert_eq!(read_all_blobs_from_puffin_file(input_file).await, blobs);
}

/// Regression test for https://github.com/apache/iceberg-rust/issues/2419.
///
/// Before the fix, `PuffinWriter` set the FooterPayloadCompressed flag (claiming
/// LZ4) while emitting the raw, uncompressed JSON footer, producing files that
/// could not be read back. With `compress_footer=true` the footer is now really
/// LZ4-encoded, and this test checks that the reader round-trips it.
#[tokio::test]
async fn test_compress_footer_lz4_round_trips() {
    let temp_dir = TempDir::new().unwrap();
    let fs_io = FileIO::new_with_fs();
    let footer_path = temp_dir.path().join("compressed_footer.bin");
    let output_file = fs_io.new_output(footer_path.to_str().unwrap()).unwrap();

    // Passing `true` for compress_footer selects LZ4 as the footer codec.
    let mut puffin = PuffinWriter::new(&output_file, file_properties(), true)
        .await
        .unwrap();
    puffin.add(blob_0(), CompressionCodec::None).await.unwrap();
    puffin.close().await.unwrap();

    // The reader has to LZ4-decode the footer to recover the file metadata...
    let input_file = output_file.to_input_file();
    let footer = FileMetadata::read(&input_file).await.unwrap();
    assert_eq!(footer.properties, file_properties());
    assert_eq!(footer.blobs.len(), 1);

    // ...and the single blob payload that was written.
    let recovered = read_all_blobs_from_puffin_file(input_file).await;
    assert_eq!(recovered, vec![blob_0()]);
}

/// Adapted directly from the reproducer in
/// https://github.com/apache/iceberg-rust/issues/2419: with
/// `compress_footer=true`, `close` has to succeed even when no blob was added.
#[tokio::test]
async fn test_compress_empty_footer_lz4_succeeds() {
    let temp_dir = TempDir::new().unwrap();
    let fs_io = FileIO::new_with_fs();
    let footer_path = temp_dir.path().join("compressed_empty_footer.bin");
    let output_file = fs_io.new_output(footer_path.to_str().unwrap()).unwrap();

    // Open a writer with footer compression enabled and close it immediately.
    let empty_writer = PuffinWriter::new(&output_file, HashMap::new(), true)
        .await
        .unwrap();
    empty_writer.close().await.unwrap();

    // Reading the compressed, empty footer must yield empty file metadata.
    let input_file = output_file.to_input_file();
    let footer = FileMetadata::read(&input_file).await.unwrap();
    assert!(footer.blobs.is_empty());
    assert!(footer.properties.is_empty());
}

async fn get_file_as_byte_vec(input_file: InputFile) -> Vec<u8> {
Expand Down
Loading