Skip to content

Commit 65637e5

Browse files
author
Jan Kaul
committed
write documentation
1 parent 8807ee7 commit 65637e5

2 files changed

Lines changed: 258 additions & 0 deletions

File tree

iceberg-rust/src/table/manifest.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,41 @@ impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
365365
})
366366
}
367367

368+
/// Creates a ManifestWriter from an existing manifest file with selective filtering of entries.
369+
///
370+
/// This method reads an existing manifest file and creates a new writer that includes
371+
/// only the entries whose file paths are NOT in the provided filter set. Entries that
372+
/// are retained have their status updated to "Existing" and their sequence numbers
373+
/// and snapshot IDs updated as needed.
374+
///
375+
/// This is particularly useful for overwrite operations where specific files need to be
376+
/// excluded from the new manifest while preserving other existing entries.
377+
///
378+
/// # Arguments
379+
/// * `bytes` - The raw bytes of the existing manifest file
380+
/// * `manifest` - The manifest list entry describing the existing manifest
381+
/// * `filter` - A set of file paths to exclude from the new manifest
382+
/// * `schema` - The Avro schema used for serializing manifest entries
383+
/// * `table_metadata` - The table metadata containing schema and partition information
384+
/// * `branch` - Optional branch name to get the current schema from
385+
///
386+
/// # Returns
387+
/// * `Result<Self, Error>` - A new ManifestWriter instance or an error if initialization fails
388+
///
389+
/// # Errors
390+
/// Returns an error if:
391+
/// * The existing manifest cannot be read
392+
/// * The Avro writer cannot be created
393+
/// * Required metadata fields cannot be serialized
394+
/// * The partition spec ID is not found in table metadata
395+
///
396+
/// # Behavior
397+
/// - Entries whose file paths are in the `filter` set are excluded from the new manifest
398+
/// - Remaining entries have their status set to `Status::Existing`
399+
/// - Sequence numbers are updated for entries that don't have them
400+
/// - Snapshot IDs are updated for entries that don't have them
401+
/// - The manifest's sequence number is incremented
402+
/// - File counts are updated to reflect the filtered entries
368403
pub(crate) fn from_existing_with_filter(
369404
bytes: &[u8],
370405
mut manifest: ManifestListEntry,

iceberg-rust/src/table/manifest_list.rs

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,53 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
334334
})
335335
}
336336

337+
/// Creates a ManifestListWriter from an existing manifest list, excluding manifests scheduled for overwriting.
338+
///
339+
/// This constructor is specifically designed for overwrite operations where certain manifests
340+
/// need to be replaced while preserving others. It analyzes an existing manifest list and:
341+
/// 1. Identifies manifests that should be overwritten (excluded from the new manifest list)
342+
/// 2. Selects compatible manifests that can be reused for appending new data
343+
/// 3. Copies non-selected, non-overwritten manifests to the new manifest list
344+
/// 4. Returns both the writer and the list of manifests that will be overwritten
345+
///
346+
/// This approach optimizes overwrite operations by:
347+
/// - Avoiding unnecessary rewrites of unaffected manifests
348+
/// - Providing efficient append capabilities for new data
349+
/// - Returning metadata about what will be overwritten for cleanup operations
350+
///
351+
/// # Arguments
352+
/// * `bytes` - The raw bytes of the existing manifest list file
353+
/// * `data_files` - Iterator over new data files to be appended
354+
/// * `manifests_to_overwrite` - Set of manifest paths that should be excluded/overwritten
355+
/// * `schema` - The Avro schema to use for manifest list serialization
356+
/// * `table_metadata` - Reference to the table metadata for partition field information
357+
/// * `branch` - Optional branch name for multi-branch table operations
358+
///
359+
/// # Returns
360+
/// * `Result<(Self, Vec<ManifestListEntry>), Error>` - A tuple containing:
361+
/// - A new ManifestListWriter instance holding the manifest selected for appends
362+
/// - A vector of ManifestListEntry objects that will be overwritten
363+
///
364+
/// # Errors
365+
/// Returns an error if:
366+
/// * The existing manifest list cannot be parsed
367+
/// * Partition fields cannot be retrieved from table metadata
368+
/// * Partition boundary computation fails
369+
/// * Manifest selection logic fails
370+
/// * The Avro writer cannot be initialized
371+
///
372+
/// # Example Usage
373+
/// ```ignore
374+
/// let manifests_to_overwrite = HashSet::from(["manifest1.avro", "manifest2.avro"]);
375+
/// let (writer, overwritten_manifests) = ManifestListWriter::from_existing_without_overwrites(
376+
/// &existing_manifest_list_bytes,
377+
/// new_data_files.iter(),
378+
/// &manifests_to_overwrite,
379+
/// &manifest_list_schema,
380+
/// &table_metadata,
381+
/// Some("main"),
382+
/// )?;
383+
/// ```
337384
pub(crate) fn from_existing_without_overwrites<'datafiles>(
338385
bytes: &[u8],
339386
data_files: impl Iterator<Item = &'datafiles DataFile>,
@@ -494,6 +541,60 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
494541
.await
495542
}
496543

544+
/// Appends data files to a single manifest with optional filtering and finalizes the manifest list.
545+
///
546+
/// This method extends the basic `append_and_finish` functionality by providing the ability to
547+
/// filter data files during the append process. It creates a single manifest file containing
548+
/// the provided data files (after filtering), either by appending to an existing reusable
549+
/// manifest or creating a new one.
550+
///
551+
/// The filtering capability is particularly useful for:
552+
/// - Excluding certain files from being included in the manifest
553+
/// - Conditional processing based on file properties or metadata
554+
/// - Implementing custom business logic during manifest creation
555+
/// - Selective processing of existing manifest entries when reusing manifests
556+
///
557+
/// This approach is optimal for:
558+
/// - Small to medium append operations with conditional logic
559+
/// - Cases where certain files need to be excluded or processed differently
560+
/// - Operations requiring custom filtering logic during manifest creation
561+
///
562+
/// The process:
563+
/// 1. Determines whether to reuse an existing manifest or create a new one
564+
/// 2. If reusing, applies the filter when reading existing manifest entries
565+
/// 3. Creates/updates a manifest writer with the selected manifest
566+
/// 4. Appends all provided data files to the manifest
567+
/// 5. Finalizes the manifest and writes it to storage
568+
/// 6. Adds the manifest entry to the manifest list
569+
/// 7. Writes the complete manifest list to storage
570+
///
571+
/// # Arguments
572+
/// * `data_files` - Iterator over fallible manifest entries (`Result<ManifestEntry, Error>`) to append
573+
/// * `snapshot_id` - The snapshot ID for the new manifest
574+
/// * `filter` - Optional filter function to apply to existing manifest entries when reusing
575+
/// * `object_store` - The object store for writing files
576+
///
577+
/// # Returns
578+
/// * `Result<String, Error>` - The location of the new manifest list file or an error
579+
///
580+
/// # Errors
581+
/// Returns an error if:
582+
/// * Partition field retrieval fails
583+
/// * Manifest schema creation fails
584+
/// * Manifest writer creation or operation fails
585+
/// * Object storage operations fail
586+
/// * Avro serialization fails
587+
/// * Filter function encounters an error
588+
///
589+
/// # Example Usage
590+
/// ```ignore
591+
/// let manifest_list_location = writer.append_filtered_and_finish(
592+
/// data_files_iter,
593+
/// snapshot_id,
594+
/// Some(|entry| entry.as_ref().map(|e| e.status() == &Status::Added).unwrap_or(false)),
595+
/// object_store,
596+
/// ).await?;
597+
/// ```
497598
pub(crate) async fn append_filtered_and_finish(
498599
mut self,
499600
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
@@ -644,6 +745,67 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
644745
.await
645746
}
646747

748+
/// Appends data files across multiple manifests with optional filtering and finalizes the manifest list.
749+
///
750+
/// This method extends the `append_multiple_and_finish` functionality by providing the ability to
751+
/// filter data files during the append and splitting process. It distributes the data files
752+
/// (after filtering) across the specified number of splits based on partition boundaries,
753+
/// optimizing for large operations that require conditional processing.
754+
///
755+
/// The filtering capability is particularly useful for:
756+
/// - Excluding certain files from being included in any manifest
757+
/// - Conditional processing based on file properties, status, or metadata
758+
/// - Implementing custom business logic during large-scale manifest operations
759+
/// - Selective processing of existing manifest entries when reusing manifests
760+
/// - Complex overwrite scenarios where certain entries need special handling
761+
///
762+
/// This approach is optimal for:
763+
/// - Large append operations with hundreds or thousands of files requiring filtering
764+
/// - Partitioned tables where files need both splitting and filtering
765+
/// - Complex operations combining append, overwrite, and conditional logic
766+
/// - Cases requiring high query parallelism with selective data inclusion
767+
///
768+
/// The process:
769+
/// 1. Computes optimal partition boundaries for splitting
770+
/// 2. If reusing an existing manifest, applies filter when reading existing entries
771+
/// 3. Merges new data files with filtered existing files from selected manifest
772+
/// 4. Splits all files across the specified number of manifest files
773+
/// 5. Creates and writes multiple manifest files concurrently
774+
/// 6. Adds all manifest entries to the manifest list
775+
/// 7. Writes the complete manifest list to storage
776+
///
777+
/// # Arguments
778+
/// * `data_files` - Iterator over fallible manifest entries (`Result<ManifestEntry, Error>`) to append and split
779+
/// * `snapshot_id` - The snapshot ID for the new manifests
780+
/// * `n_splits` - The number of manifest files to create (should match `n_splits()` result)
781+
/// * `filter` - Optional filter function to apply to existing manifest entries when reusing
782+
/// * `object_store` - The object store for writing files
783+
///
784+
/// # Returns
785+
/// * `Result<String, Error>` - The location of the new manifest list file or an error
786+
///
787+
/// # Errors
788+
/// Returns an error if:
789+
/// * Partition field retrieval fails
790+
/// * Manifest schema creation fails
791+
/// * File splitting logic fails
792+
/// * Manifest writer creation or operation fails
793+
/// * Concurrent manifest writing fails
794+
/// * Object storage operations fail
795+
/// * Avro serialization fails
796+
/// * Filter function encounters an error
797+
///
798+
/// # Example Usage
799+
/// ```ignore
800+
/// let n_splits = writer.n_splits(data_files.len());
801+
/// let manifest_list_location = writer.append_multiple_filtered_and_finish(
802+
/// data_files_iter,
803+
/// snapshot_id,
804+
/// n_splits,
805+
/// Some(|entry| entry.as_ref().map(|e| e.status() != &Status::Deleted).unwrap_or(false)),
806+
/// object_store,
807+
/// ).await?;
808+
/// ```
647809
pub(crate) async fn append_multiple_filtered_and_finish(
648810
mut self,
649811
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
@@ -764,6 +926,67 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
764926
Ok(new_manifest_list_location)
765927
}
766928

929+
/// Processes manifests for overwrite operations by filtering out specific data files.
930+
///
931+
/// This method is specifically designed for complex overwrite scenarios where certain data files
932+
/// within existing manifests need to be removed while preserving others. It processes a list of
933+
/// manifests, filters out specified data files from each one, and adds the filtered manifests
934+
/// to the manifest list being constructed.
935+
///
936+
/// This operation is essential for:
937+
/// - **Overwrite operations**: Removing specific files that are being replaced by new data
938+
/// - **Partial table updates**: Selectively removing files while keeping others
939+
/// - **Data deduplication**: Filtering out duplicate or obsolete data files
940+
/// - **Complex merge operations**: Managing file-level changes during table merges
941+
///
942+
/// The method operates at the manifest level rather than the manifest list level, providing
943+
/// fine-grained control over which data files are included in the final table state.
944+
///
945+
/// The process:
946+
/// 1. Processes each manifest in the provided list concurrently
947+
/// 2. For each manifest, retrieves the list of data files to filter out
948+
/// 3. Loads the manifest content from object storage
949+
/// 4. Creates a new manifest location and updates the manifest path
950+
/// 5. Uses `ManifestWriter::from_existing_with_filter` to exclude specified files
951+
/// 6. Writes the filtered manifest to storage with a new location
952+
/// 7. Adds the new manifest entry to the manifest list being constructed
953+
///
954+
/// # Arguments
955+
/// * `manifests_to_overwrite` - Vector of manifest list entries to process and filter
956+
/// * `data_files_to_filter` - Map from manifest path to list of data file paths to exclude
957+
/// * `object_store` - The object store for reading existing and writing new manifest files
958+
///
959+
/// # Returns
960+
/// * `Result<(), Error>` - Ok if all manifests were successfully processed and filtered
961+
///
962+
/// # Errors
963+
/// Returns an error if:
964+
/// * A manifest path is not found in the `data_files_to_filter` map
965+
/// * Object storage operations fail (reading existing or writing new manifests)
966+
/// * Manifest parsing or writing operations fail
967+
/// * Avro serialization fails
968+
/// * Concurrent processing encounters errors
969+
///
970+
/// # Example Usage
971+
/// ```ignore
972+
/// let mut manifest_list_writer = ManifestListWriter::new(...)?;
973+
/// let data_files_to_filter = HashMap::from([
974+
/// ("manifest1.avro".to_string(), vec!["file1.parquet".to_string(), "file2.parquet".to_string()]),
975+
/// ("manifest2.avro".to_string(), vec!["file3.parquet".to_string()]),
976+
/// ]);
977+
///
978+
/// manifest_list_writer.append_and_filter(
979+
/// manifests_to_overwrite,
980+
/// &data_files_to_filter,
981+
/// object_store,
982+
/// ).await?;
983+
/// ```
984+
///
985+
/// # Implementation Notes
986+
/// - Manifests are processed concurrently for optimal performance
987+
/// - Each filtered manifest gets a new location to avoid conflicts
988+
/// - The method modifies the manifest list writer's internal state by adding filtered manifests
989+
/// - This method is typically called as part of a larger overwrite operation workflow
767990
pub(crate) async fn append_and_filter(
768991
&mut self,
769992
manifests_to_overwrite: Vec<ManifestListEntry>,

0 commit comments

Comments
 (0)