Skip to content

Commit b162972

Browse files
authored
Merge pull request JanKaul#172 from JanKaul/fix/commit-empty-recordbatch-stream
Fix/commit empty recordbatch stream
2 parents eba39e3 + 424b5b0 commit b162972

2 files changed

Lines changed: 19 additions & 17 deletions

File tree

iceberg-rust/src/table/transaction/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,10 @@ impl<'table> TableTransaction<'table> {
344344
updates.extend(update);
345345
}
346346

347+
if updates.is_empty() {
348+
return Ok(());
349+
}
350+
347351
let new_table = catalog
348352
.clone()
349353
.update_table(CommitTable {

iceberg-rust/src/table/transaction/operation.rs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,8 @@ impl Operation {
106106
let old_snapshot = table_metadata.current_snapshot(branch.as_deref())?;
107107

108108
let snapshot_operation = match (data_files.len(), delete_files.len()) {
109-
(0, 0) => Err(Error::InvalidFormat(
110-
"Empty data and delete files".to_string(),
111-
)),
112-
(_, 0) => Ok(SnapshotOperation::Append),
109+
(0, 0) => return Ok((None, Vec::new())),
110+
(_, 0) => Ok::<_, Error>(SnapshotOperation::Append),
113111
(0, _) => Ok(SnapshotOperation::Delete),
114112
(_, _) => Ok(SnapshotOperation::Overwrite),
115113
}?;
@@ -279,18 +277,17 @@ impl Operation {
279277
(selected_manifest_opt, selected_manifest_bytes_opt)
280278
{
281279
let manifest_bytes = manifest_bytes.await??;
282-
let manifest_reader = ManifestReader::new(&*manifest_bytes)?
283-
.map(|entry| {
284-
let mut entry = entry?;
285-
*entry.status_mut() = Status::Existing;
286-
if entry.sequence_number().is_none() {
287-
*entry.sequence_number_mut() = Some(manifest.sequence_number);
288-
}
289-
if entry.snapshot_id().is_none() {
290-
*entry.snapshot_id_mut() = Some(manifest.added_snapshot_id);
291-
}
292-
Ok(entry)
293-
});
280+
let manifest_reader = ManifestReader::new(&*manifest_bytes)?.map(|entry| {
281+
let mut entry = entry?;
282+
*entry.status_mut() = Status::Existing;
283+
if entry.sequence_number().is_none() {
284+
*entry.sequence_number_mut() = Some(manifest.sequence_number);
285+
}
286+
if entry.snapshot_id().is_none() {
287+
*entry.snapshot_id_mut() = Some(manifest.added_snapshot_id);
288+
}
289+
Ok(entry)
290+
});
294291

295292
split_datafiles(
296293
new_datafile_iter.chain(manifest_reader),
@@ -459,7 +456,8 @@ impl Operation {
459456
let bounding_partition_values = files
460457
.iter()
461458
.try_fold(None, |acc, x| {
462-
let node = partition_struct_to_vec(x.partition(), &partition_column_names)?;
459+
let node =
460+
partition_struct_to_vec(x.partition(), &partition_column_names)?;
463461
let Some(mut acc) = acc else {
464462
return Ok::<_, Error>(Some(Rectangle::new(node.clone(), node)));
465463
};

0 commit comments

Comments
 (0)