
Commit 01e8ce2

Fokko and sungwy committed
Allow setting write.parquet.row-group-limit (#1016)
* Allow setting `write.parquet.row-group-limit` and update the docs
* Add test
* Make ruff happy

Co-authored-by: Sung Yun <107272191+sungwy@users.noreply.github.com>
1 parent fbf72f6 commit 01e8ce2

4 files changed: 49 additions & 11 deletions


mkdocs/docs/configuration.md

Lines changed: 10 additions & 8 deletions
@@ -28,14 +28,16 @@ Iceberg tables support table properties to configure table behavior.
 
 ### Write options
 
-| Key                               | Options                           | Default | Description                                                                                  |
-| --------------------------------- | --------------------------------- | ------- | -------------------------------------------------------------------------------------------- |
-| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd    | Sets the Parquet compression coddec.                                                         |
-| `write.parquet.compression-level` | Integer                           | null    | Parquet compression level for the codec. If not set, it is up to PyIceberg                   |
-| `write.parquet.page-size-bytes`   | Size in bytes                     | 1MB     | Set a target threshold for the approximate encoded size of data pages within a column chunk  |
-| `write.parquet.page-row-limit`    | Number of rows                    | 20000   | Set a target threshold for the approximate encoded size of data pages within a column chunk  |
-| `write.parquet.dict-size-bytes`   | Size in bytes                     | 2MB     | Set the dictionary page size limit per row group                                             |
-| `write.parquet.row-group-limit`   | Number of rows                    | 122880  | The Parquet row group limit                                                                  |
+| Key                                    | Options                           | Default | Description                                                                                  |
+| -------------------------------------- | --------------------------------- | ------- | --------------------------------------------------------------------------------------------- |
+| `write.parquet.compression-codec`      | `{uncompressed,zstd,gzip,snappy}` | zstd    | Sets the Parquet compression coddec.                                                         |
+| `write.parquet.compression-level`      | Integer                           | null    | Parquet compression level for the codec. If not set, it is up to PyIceberg                   |
+| `write.parquet.row-group-limit`        | Number of rows                    | 1048576 | The upper bound of the number of entries within a single row group                           |
+| `write.parquet.page-size-bytes`        | Size in bytes                     | 1MB     | Set a target threshold for the approximate encoded size of data pages within a column chunk  |
+| `write.parquet.page-row-limit`         | Number of rows                    | 20000   | Set a target threshold for the approximate encoded size of data pages within a column chunk  |
+| `write.parquet.dict-size-bytes`        | Size in bytes                     | 2MB     | Set the dictionary page size limit per row group                                             |
+| `write.parquet.row-group-limit`        | Number of rows                    | 122880  | The Parquet row group limit                                                                  |
+| `write.metadata.previous-versions-max` | Integer                           | 100     | The max number of previous version metadata files to keep before deleting after commit.     |
 
 ### Table behavior options
 
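For context on how these write options are applied, here is a minimal sketch (the catalog name, table name, and schema are hypothetical, not from this commit): create a table with the new property and subsequent appends will honor it.

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

catalog = load_catalog("default")  # assumes a catalog named "default" is configured

tbl = catalog.create_table(
    "default.events",  # hypothetical table name
    Schema(NestedField(1, "number", LongType())),
    # Cap each Parquet row group at 10000 rows instead of the default 1048576
    properties={"write.parquet.row-group-limit": "10000"},
)
```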

pyiceberg/io/pyarrow.py

Lines changed: 2 additions & 2 deletions
@@ -2137,8 +2137,8 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
     parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
     row_group_size = PropertyUtil.property_as_int(
         properties=table_metadata.properties,
-        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
-        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
+        property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
+        default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
     )
 
     def write_parquet(task: WriteTask) -> DataFile:
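The resolved `row_group_size` is handed to PyArrow's Parquet writer, so the property now bounds the number of rows per row group rather than a byte size. As a standalone illustration of that effect (plain pyarrow, not PyIceberg's actual writer code):

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"number": list(range(10))})

# row_group_size caps rows per row group; a limit of 1 forces one row group per row
with pq.ParquetWriter("/tmp/example.parquet", table.schema) as writer:
    writer.write_table(table, row_group_size=1)

print(pq.ParquetFile("/tmp/example.parquet").metadata.num_row_groups)  # -> 10
```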

pyiceberg/table/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -173,7 +173,7 @@ class TableProperties:
     PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024  # 128 MB
 
     PARQUET_ROW_GROUP_LIMIT = "write.parquet.row-group-limit"
-    PARQUET_ROW_GROUP_LIMIT_DEFAULT = 128 * 1024 * 1024  # 128 MB
+    PARQUET_ROW_GROUP_LIMIT_DEFAULT = 1048576
 
     PARQUET_PAGE_SIZE_BYTES = "write.parquet.page-size-bytes"
     PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024  # 1 MB
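The new default is a row count rather than a byte size: 1048576 = 1024 * 1024 rows. A minimal sketch of how such a property resolves (a hypothetical stand-in for `PropertyUtil.property_as_int`, not its exact code): string-valued table properties override the class default when present.

```python
from typing import Dict, Optional

def property_as_int(properties: Dict[str, str], property_name: str, default: Optional[int]) -> Optional[int]:
    # Table properties arrive as strings; fall back to the default when unset
    value = properties.get(property_name)
    return int(value) if value is not None else default

assert property_as_int({}, "write.parquet.row-group-limit", 1048576) == 1048576
assert property_as_int({"write.parquet.row-group-limit": "1"}, "write.parquet.row-group-limit", 1048576) == 1
```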

tests/integration/test_reads.py

Lines changed: 36 additions & 0 deletions
@@ -46,6 +46,7 @@
 from pyiceberg.types import (
     BooleanType,
     IntegerType,
+    LongType,
     NestedField,
     StringType,
     TimestampType,
@@ -665,6 +666,41 @@ def another_task() -> None:
     assert table.properties.get("lock") == "xxx"
 
 
+@pytest.mark.integration
+def test_configure_row_group_batch_size(session_catalog: Catalog) -> None:
+    from pyiceberg.table import TableProperties
+
+    table_name = "default.test_small_row_groups"
+    try:
+        session_catalog.drop_table(table_name)
+    except NoSuchTableError:
+        pass  # Just to make sure that the table doesn't exist
+
+    tbl = session_catalog.create_table(
+        table_name,
+        Schema(
+            NestedField(1, "number", LongType()),
+        ),
+        properties={TableProperties.PARQUET_ROW_GROUP_LIMIT: "1"},
+    )
+
+    # Write 10 row groups, that should end up as 10 batches
+    entries = 10
+    tbl.append(
+        pa.Table.from_pylist(
+            [
+                {
+                    "number": number,
+                }
+                for number in range(entries)
+            ],
+        )
+    )
+
+    batches = list(tbl.scan().to_arrow_batch_reader())
+    assert len(batches) == entries
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
 def test_empty_scan_ordered_str(catalog: Catalog) -> None:
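The test asserts the limit indirectly through batch counts; the row-group granularity can also be checked on the written file itself. A hedged sketch (assumes a local-filesystem warehouse; the path handling is illustrative):

```python
import pyarrow.parquet as pq

# Inspect the first data file written by the test table above
first_task = next(iter(tbl.scan().plan_files()))
local_path = first_task.file.file_path.replace("file://", "")  # assumes local storage
print(pq.ParquetFile(local_path).metadata.num_row_groups)  # expect one row group per row
```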
