Skip to content

Commit b159444

Browse files
author
Jan Kaul
committed
introduce ManifestListWriter
1 parent 0c1a026 commit b159444

2 files changed

Lines changed: 356 additions & 209 deletions

File tree

iceberg-rust/src/table/manifest_list.rs

Lines changed: 309 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@ use std::{
88
sync::Arc,
99
};
1010

11-
use apache_avro::{types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema};
11+
use apache_avro::{
12+
types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema, Writer as AvroWriter,
13+
};
1214
use iceberg_rust_spec::{
15+
manifest::{partition_value_schema, DataFile, ManifestEntry, Status},
1316
manifest_list::{
1417
avro_value_to_manifest_list_entry, manifest_list_schema_v1, manifest_list_schema_v2,
1518
ManifestListEntry,
@@ -19,8 +22,26 @@ use iceberg_rust_spec::{
1922
util::strip_prefix,
2023
};
2124
use object_store::ObjectStore;
25+
use smallvec::SmallVec;
26+
27+
use crate::{
28+
error::Error,
29+
util::{summary_to_rectangle, Rectangle},
30+
};
2231

23-
use crate::error::Error;
32+
use super::{
33+
manifest::{ManifestReader, ManifestWriter},
34+
transaction::{
35+
append::{
36+
select_manifest_partitioned, select_manifest_unpartitioned, split_datafiles,
37+
SelectedManifest,
38+
},
39+
operation::{
40+
bounding_partition_values, compute_n_splits, new_manifest_list_location,
41+
new_manifest_location, prefetch_manifest,
42+
},
43+
},
44+
};
2445

2546
type ReaderZip<'a, 'metadata, R> = Zip<AvroReader<'a, R>, Repeat<&'metadata TableMetadata>>;
2647
type ReaderMap<'a, 'metadata, R> = Map<
@@ -114,4 +135,289 @@ pub(crate) async fn read_snapshot<'metadata>(
114135
.await?
115136
.into(),
116137
);
117-
ManifestListReader::new(bytes, table_metadata)}
138+
ManifestListReader::new(bytes, table_metadata)
139+
}
140+
141+
pub(crate) struct ManifestListWriter<'schema, 'metadata> {
142+
table_metadata: &'metadata TableMetadata,
143+
writer: AvroWriter<'schema, Vec<u8>>,
144+
selected_manifest: Option<ManifestListEntry>,
145+
bounding_partition_values: Rectangle,
146+
n_existing_files: usize,
147+
branch: Option<String>,
148+
}
149+
150+
impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
151+
pub(crate) fn new<'datafiles>(
152+
data_files: impl Iterator<Item = &'datafiles DataFile>,
153+
schema: &'schema AvroSchema,
154+
table_metadata: &'metadata TableMetadata,
155+
branch: Option<&str>,
156+
) -> Result<Self, Error> {
157+
let partition_fields = table_metadata.current_partition_fields(branch)?;
158+
159+
let partition_column_names = partition_fields
160+
.iter()
161+
.map(|x| x.name())
162+
.collect::<SmallVec<[_; 4]>>();
163+
164+
let bounding_partition_values =
165+
bounding_partition_values(data_files, &partition_column_names)?;
166+
167+
let writer = AvroWriter::new(schema, Vec::new());
168+
169+
Ok(Self {
170+
table_metadata,
171+
writer,
172+
selected_manifest: None,
173+
bounding_partition_values,
174+
n_existing_files: 0,
175+
branch: branch.map(ToOwned::to_owned),
176+
})
177+
}
178+
179+
pub(crate) fn from_existing<'datafiles>(
180+
bytes: &[u8],
181+
data_files: impl Iterator<Item = &'datafiles DataFile>,
182+
schema: &'schema AvroSchema,
183+
table_metadata: &'metadata TableMetadata,
184+
branch: Option<&str>,
185+
) -> Result<Self, Error> {
186+
let partition_fields = table_metadata.current_partition_fields(branch)?;
187+
188+
let partition_column_names = partition_fields
189+
.iter()
190+
.map(|x| x.name())
191+
.collect::<SmallVec<[_; 4]>>();
192+
193+
let bounding_partition_values =
194+
bounding_partition_values(data_files, &partition_column_names)?;
195+
196+
let manifest_list_reader = ManifestListReader::new(bytes, table_metadata)?;
197+
198+
let mut writer = AvroWriter::new(schema, Vec::new());
199+
200+
let SelectedManifest {
201+
manifest,
202+
file_count_all_entries,
203+
} = if partition_column_names.is_empty() {
204+
select_manifest_unpartitioned(manifest_list_reader, &mut writer)?
205+
} else {
206+
select_manifest_partitioned(
207+
manifest_list_reader,
208+
&mut writer,
209+
&bounding_partition_values,
210+
)?
211+
};
212+
213+
Ok(Self {
214+
table_metadata,
215+
writer,
216+
selected_manifest: Some(manifest),
217+
bounding_partition_values,
218+
n_existing_files: file_count_all_entries,
219+
branch: branch.map(ToOwned::to_owned),
220+
})
221+
}
222+
223+
pub(crate) fn n_splits(&self, n_data_files: usize) -> u32 {
224+
let selected_manifest_file_count = self
225+
.selected_manifest
226+
.as_ref()
227+
.and_then(|selected_manifest| {
228+
match (
229+
selected_manifest.existing_files_count,
230+
selected_manifest.added_files_count,
231+
) {
232+
(Some(x), Some(y)) => Some(x + y),
233+
(Some(x), None) => Some(x),
234+
(None, Some(y)) => Some(y),
235+
(None, None) => None,
236+
}
237+
})
238+
.unwrap_or(0) as usize;
239+
240+
compute_n_splits(
241+
self.n_existing_files,
242+
n_data_files,
243+
selected_manifest_file_count,
244+
)
245+
}
246+
247+
pub(crate) async fn append_and_finish(
248+
mut self,
249+
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
250+
snapshot_id: i64,
251+
object_store: Arc<dyn ObjectStore>,
252+
) -> Result<String, Error> {
253+
let selected_manifest_bytes_opt = prefetch_manifest(&self.selected_manifest, &object_store);
254+
255+
let partition_fields = self
256+
.table_metadata
257+
.current_partition_fields(self.branch.as_deref())?;
258+
259+
let manifest_schema = ManifestEntry::schema(
260+
&partition_value_schema(&partition_fields)?,
261+
&self.table_metadata.format_version,
262+
)?;
263+
264+
let commit_uuid = &uuid::Uuid::new_v4().to_string();
265+
266+
let mut manifest_writer = if let (Some(mut manifest), Some(manifest_bytes)) =
267+
(self.selected_manifest, selected_manifest_bytes_opt)
268+
{
269+
let manifest_bytes = manifest_bytes.await??;
270+
271+
manifest.manifest_path =
272+
new_manifest_location(&self.table_metadata.location, commit_uuid, 0);
273+
274+
ManifestWriter::from_existing(
275+
&manifest_bytes,
276+
manifest,
277+
&manifest_schema,
278+
self.table_metadata,
279+
self.branch.as_deref(),
280+
)?
281+
} else {
282+
let manifest_location =
283+
new_manifest_location(&self.table_metadata.location, commit_uuid, 0);
284+
285+
ManifestWriter::new(
286+
&manifest_location,
287+
snapshot_id,
288+
&manifest_schema,
289+
self.table_metadata,
290+
self.branch.as_deref(),
291+
)?
292+
};
293+
294+
for manifest_entry in data_files {
295+
manifest_writer.append(manifest_entry?)?;
296+
}
297+
298+
let manifest = manifest_writer.finish(object_store.clone()).await?;
299+
300+
self.writer.append_ser(manifest)?;
301+
302+
let new_manifest_list_location =
303+
new_manifest_list_location(&self.table_metadata.location, snapshot_id, 0, commit_uuid);
304+
305+
let manifest_list_bytes = self.writer.into_inner()?;
306+
307+
object_store
308+
.put(
309+
&strip_prefix(&new_manifest_list_location).into(),
310+
manifest_list_bytes.into(),
311+
)
312+
.await?;
313+
314+
Ok(new_manifest_list_location)
315+
}
316+
317+
pub(crate) async fn append_split_and_finish(
318+
mut self,
319+
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
320+
snapshot_id: i64,
321+
n_splits: u32,
322+
object_store: Arc<dyn ObjectStore>,
323+
) -> Result<String, Error> {
324+
let partition_fields = self
325+
.table_metadata
326+
.current_partition_fields(self.branch.as_deref())?;
327+
328+
let partition_column_names = partition_fields
329+
.iter()
330+
.map(|x| x.name())
331+
.collect::<SmallVec<[_; 4]>>();
332+
333+
let manifest_schema = ManifestEntry::schema(
334+
&partition_value_schema(&partition_fields)?,
335+
&self.table_metadata.format_version,
336+
)?;
337+
338+
let bounds = self
339+
.selected_manifest
340+
.as_ref()
341+
.and_then(|x| x.partitions.as_deref())
342+
.map(summary_to_rectangle)
343+
.transpose()?
344+
.map(|mut x| {
345+
x.expand(&self.bounding_partition_values);
346+
x
347+
})
348+
.unwrap_or(self.bounding_partition_values);
349+
350+
let selected_manifest_bytes_opt = prefetch_manifest(&self.selected_manifest, &object_store);
351+
352+
let commit_uuid = &uuid::Uuid::new_v4().to_string();
353+
// Split datafiles
354+
let splits = if let (Some(manifest), Some(manifest_bytes)) =
355+
(self.selected_manifest, selected_manifest_bytes_opt)
356+
{
357+
let manifest_bytes = manifest_bytes.await??;
358+
let manifest_reader = ManifestReader::new(&*manifest_bytes)?.map(|entry| {
359+
let mut entry = entry?;
360+
*entry.status_mut() = Status::Existing;
361+
if entry.sequence_number().is_none() {
362+
*entry.sequence_number_mut() = Some(manifest.sequence_number);
363+
}
364+
if entry.snapshot_id().is_none() {
365+
*entry.snapshot_id_mut() = Some(manifest.added_snapshot_id);
366+
}
367+
Ok(entry)
368+
});
369+
370+
split_datafiles(
371+
data_files.chain(manifest_reader),
372+
bounds,
373+
&partition_column_names,
374+
n_splits,
375+
)?
376+
} else {
377+
split_datafiles(data_files, bounds, &partition_column_names, n_splits)?
378+
};
379+
380+
let manifest_futures = splits
381+
.into_iter()
382+
.enumerate()
383+
.map(|(i, entries)| {
384+
let manifest_location =
385+
new_manifest_location(&self.table_metadata.location, commit_uuid, i);
386+
387+
let mut manifest_writer = ManifestWriter::new(
388+
&manifest_location,
389+
snapshot_id,
390+
&manifest_schema,
391+
self.table_metadata,
392+
self.branch.as_deref(),
393+
)?;
394+
395+
for manifest_entry in entries {
396+
manifest_writer.append(manifest_entry)?;
397+
}
398+
399+
Ok::<_, Error>(manifest_writer.finish(object_store.clone()))
400+
})
401+
.collect::<Result<Vec<_>, _>>()?;
402+
403+
let manifests = futures::future::try_join_all(manifest_futures).await?;
404+
405+
for manifest in manifests {
406+
self.writer.append_ser(manifest)?;
407+
}
408+
409+
let new_manifest_list_location =
410+
new_manifest_list_location(&self.table_metadata.location, snapshot_id, 0, commit_uuid);
411+
412+
let manifest_list_bytes = self.writer.into_inner()?;
413+
414+
object_store
415+
.put(
416+
&strip_prefix(&new_manifest_list_location).into(),
417+
manifest_list_bytes.into(),
418+
)
419+
.await?;
420+
421+
Ok(new_manifest_list_location)
422+
}
423+
}

0 commit comments

Comments
 (0)