
Core, Spark 4.1: Add K-way merge rewrite strategy for pre-sorted data files #16305

Open

anuragmantri wants to merge 2 commits into apache:main from anuragmantri:spark-4.1-kway-merge

Conversation

@anuragmantri
Contributor

This PR adds a K-way merge compaction strategy to the RewriteDataFiles action. K-way merge rewrites pre-sorted data files by streaming and merging them in sort-key order without a shuffle, preserving the table's sort order in the output files.

K-way merge is intended for tables that are already sorted but have accumulated multiple overlapping files per partition (e.g., after daily ingestion into a previously sort-compacted table). It produces the same output as the SORT strategy but eliminates shuffle and spill entirely.

Implementation summary:

  1. API: New kWayMerge() default method on RewriteDataFiles interface. The table must have a defined sort order.
  2. Planner (KWayMergeRewriteFilePlanner): Extends SizeBasedFileRewritePlanner with sort-key-ordered file grouping. Files are sorted by their lower bounds on the first sort field before bin-packing, ensuring groups cover contiguous key ranges. Includes includeColumnStats() on the table scan to load file bounds for planning.
  3. Runner (SparkKWayMergeFileRewriteRunner): Opens all files in a group as streaming iterators, applies GenericDeleteFilter for position/equality deletes, then merges using SortedMerge (priority queue). Output is written via GenericAppenderFactory with size-based file rotation and partition-change detection.
  4. Range parallelism: For large groups, the runner splits files into ranges by total size and processes ranges in parallel via jsc.parallelize(). Range assignments are broadcast to executors. Controlled by range-parallelism-enabled (default true), ranges-per-group (default 25), and min-files-for-range-parallelism (default 10).
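The SortedMerge step in the runner (item 3) is at heart a priority-queue merge over the per-file sorted iterators. The following is a minimal, self-contained sketch of that pattern only; the class and method names are hypothetical and are not the PR's actual classes:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of a k-way streaming merge over pre-sorted inputs.
// Illustrative names; not taken from the PR's implementation.
public class KWayMergeSketch {

  // Pairs an iterator's current head element with the iterator it came from.
  private static class Entry<T> {
    final T head;
    final Iterator<T> source;

    Entry(T head, Iterator<T> source) {
      this.head = head;
      this.source = source;
    }
  }

  // Merges k sorted iterators into one sorted list using a priority queue
  // keyed on each iterator's current head element.
  static <T> List<T> merge(List<Iterator<T>> inputs, Comparator<T> cmp) {
    PriorityQueue<Entry<T>> heap =
        new PriorityQueue<>(Comparator.comparing((Entry<T> e) -> e.head, cmp));
    for (Iterator<T> it : inputs) {
      if (it.hasNext()) {
        heap.add(new Entry<>(it.next(), it));
      }
    }

    List<T> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      Entry<T> e = heap.poll();
      out.add(e.head); // smallest current head across all inputs
      if (e.source.hasNext()) {
        heap.add(new Entry<>(e.source.next(), e.source)); // refill from same input
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Iterator<Integer>> sortedFiles = List.of(
        List.of(1, 4, 7).iterator(),
        List.of(2, 5, 8).iterator(),
        List.of(3, 6, 9).iterator());
    System.out.println(merge(sortedFiles, Comparator.naturalOrder()));
    // prints [1, 2, 3, 4, 5, 6, 7, 8, 9]
  }
}
```

Because every input is already sorted, the heap only ever holds k elements, so the merge runs in O(n log k) time with O(k) memory and never needs to shuffle or spill.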

Constraints:

  1. All input files must have a valid sort_order_id > 0 matching the table's sort order. Files without sort metadata are rejected with a clear error.
  2. Uses Iceberg's generic reader/writer stack (row-by-row), not Spark's vectorized path. This means higher per-record overhead than sort, but zero shuffle I/O.
  3. Output files are individually sorted. When range parallelism is enabled, files from different ranges may have overlapping key ranges (consistent with how SORT behaves).
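The size-based range splitting from implementation item 4 (whose overlap behavior constraint 3 notes) amounts to greedily assigning consecutive, already sort-ordered files to ranges until each range reaches a target size. A rough sketch under that assumption; names and exact logic are illustrative, not the PR's code:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of splitting a sorted file group into roughly
// size-balanced ranges for parallel processing. Not the PR's actual code.
public class RangeSplitSketch {

  // Greedily assigns consecutive files (given in sort-key order) to ranges,
  // closing a range once it reaches the target size. Keeping assignment
  // consecutive means each range covers a contiguous slice of the group.
  static List<List<Long>> splitBySize(List<Long> fileSizes, int targetRanges) {
    long total = fileSizes.stream().mapToLong(Long::longValue).sum();
    long target = Math.max(1, total / targetRanges);

    List<List<Long>> ranges = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentSize = 0;
    for (long size : fileSizes) {
      current.add(size);
      currentSize += size;
      // Close this range once full, but never exceed the requested count.
      if (currentSize >= target && ranges.size() < targetRanges - 1) {
        ranges.add(current);
        current = new ArrayList<>();
        currentSize = 0;
      }
    }
    if (!current.isEmpty()) {
      ranges.add(current); // remainder goes into the final range
    }
    return ranges;
  }
}
```

Each range is then merged independently (e.g., as one task in a parallelized job), which is why output files from different ranges may overlap in key range, as constraint 3 points out.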

Usage:

```sql
-- Via procedure
CALL catalog.system.rewrite_data_files(
  table => 'db.my_table',
  strategy => 'k-way-merge',
  options => map('max-concurrent-file-group-rewrites', '100')
)
```

```java
// Via action API
SparkActions.get()
    .rewriteDataFiles(table)
    .kWayMerge()
    .option("max-concurrent-file-group-rewrites", "100")
    .execute();
```

AI Usage: I used Claude Opus 4.7 for code generation, test writing, and review. I manually reviewed and validated all generated code.

