
Spark: Add session configs for adaptive split sizing and read parallelism #16088

Open
karuppayya wants to merge 2 commits into apache:main from karuppayya:ICEBERG-15988

Conversation

@karuppayya Contributor

Summary

  • Add session configs to control adaptive split sizing and allow configurable read parallelism
  • Resolves ICEBERG-15988

Changes

  • Added configs to enable/disable adaptive split sizing at session level
  • Added a config to set the read split parallelism count (see the usage sketch below)
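
For illustration, a minimal sketch of setting the new configs on a SparkSession in Java (the adaptive-split-size key appears later in this thread; the string value behind READ_SPLIT_PARALLELISM is not shown, so it is referenced via its constant):

// Sketch only: both keys are session-level, so they can be toggled at runtime.
spark.conf().set(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_ENABLED, "false");
spark.conf().set(SparkSQLProperties.READ_SPLIT_PARALLELISM, "64");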

Test plan

  • Unit tests added

@github-actions bot added the spark label Apr 23, 2026
@karuppayya Contributor Author

Tagging reviewers from the PR that introduced the change: @rdblue @ConeyLiu @aokolnychyi
cc: @RussellSpitzer


public Integer splitParallelism() {
  Integer parallelism =
      confParser.intConf().sessionConf(SparkSQLProperties.READ_SPLIT_PARALLELISM).parseOptional();
Member

Why only session conf? ADAPTIVE_SPLIT_SIZE gets table versions?

Contributor Author

I think this is a runtime property (for Spark it depends on the cores and memory of the application).
Do we want to make it part of the table properties?
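
For context, a sketch of what a table-property variant could look like, assuming the chaining style of Iceberg's SparkConfParser (the table property name below is invented for illustration):

// Hypothetical sketch: adding a table-property fallback to the parser chain,
// in the style of other Iceberg read configs. The property name
// "read.split.parallelism" is not a real Iceberg table property.
public Integer splitParallelism() {
  return confParser
      .intConf()
      .sessionConf(SparkSQLProperties.READ_SPLIT_PARALLELISM)
      .tableProperty("read.split.parallelism") // hypothetical name
      .parseOptional();
}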

Comment thread spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java Outdated
Comment thread spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java Outdated
public static final String READ_ADAPTIVE_SPLIT_SIZE_ENABLED =
    "spark.sql.iceberg.read.adaptive-split-size.enabled";

// Controls the parallelism used for adaptive split sizing
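The declaration this comment introduces is cut off in the excerpt; a hypothetical completion (the constant name appears earlier in this thread, but its string value is an assumption, not confirmed by the PR):

public static final String READ_SPLIT_PARALLELISM =
    "spark.sql.iceberg.read.split-parallelism"; // value assumed, not confirmed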
Member

Currently this masks the fact that this parameter overrides parallelism(); keeping the default here would, I think, let you have clearer docs too.
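
A minimal sketch of that suggestion, assuming the defaultValue(...) step on Iceberg's conf parser chain (the fallback expression is illustrative):

// Sketch: keep the fallback in the parser chain so the override of
// parallelism() is visible at the call site. defaultValue(...) is assumed
// from the style of other Iceberg read configs.
public int splitParallelism() {
  return confParser
      .intConf()
      .sessionConf(SparkSQLProperties.READ_SPLIT_PARALLELISM)
      .defaultValue(parallelism()) // falls back to the existing parallelism()
      .parse();
}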

assertThat(description).contains("endSnapshotId=" + endSnapshotId);
});
}

Member @RussellSpitzer May 12, 2026

A few notes on the tests here:

  1. These don't seem to be in the right place. This class is for testing the scan object, and neither of these tests actually touches the Spark scan; they are both essentially just parsing checks. It may be time to start a TestSparkReadConf file for these sorts of tests, or actually invoke the scan here and show how the properties change its behavior.

  2. We should break out the assertions; currently multiple parse cases are handled in the same test. There should probably be something like (see the sketch after this list):

     Test adaptive enabled
     Test invalid parallelism
     Test ...

  3. Make sure we cover both the positive and negative cases. If we say a value is invalid, we should have a test throwing an error when that value is passed.
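
A hedged sketch of such a TestSparkReadConf layout (the class, the table field, the option map, and the expected exception type are illustrative assumptions, not the PR's actual code):

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;

// Illustrative only: one parse case per test, with positive and negative
// coverage. SparkReadConf(spark, table, readOptions) matches Iceberg's
// constructor shape; adaptiveSplitSizeEnabled() and NumberFormatException
// are assumptions about this PR's code.
public class TestSparkReadConf extends SparkTestBase {

  private Table table; // assume setup code creates this table

  @Test
  public void testAdaptiveSplitSizeEnabled() {
    spark.conf().set(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_ENABLED, "true");
    SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
    assertThat(readConf.adaptiveSplitSizeEnabled()).isTrue();
  }

  @Test
  public void testInvalidSplitParallelism() {
    spark.conf().set(SparkSQLProperties.READ_SPLIT_PARALLELISM, "not-a-number");
    SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
    assertThatThrownBy(readConf::splitParallelism)
        .isInstanceOf(NumberFormatException.class);
  }
}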

Contributor Author

  1. Created a separate test file
  2. and 3 handled


Labels

spark

Development

Successfully merging this pull request may close these issues.

Adaptive split sizing using shuffle partitions for parallelism causes aggressive scaling

2 participants