From 243ee366eafc94bb3973b46990003dc4023bf8b8 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 1 May 2026 04:55:44 +0000 Subject: [PATCH] Merge pull request #100404 from ClickHouse/fix-iceberg-date-partition-year Fix bad cast exception when inserting into Iceberg table partitioned by year/month/day on Date column --- .../DataLakes/Iceberg/ChunkPartitioner.cpp | 3 +- .../ObjectStorage/DataLakes/Iceberg/Utils.cpp | 3 ++ .../test_writes_create_partitioned_table.py | 28 +++++++++++++- .../test_writes_with_partitioned_table.py | 38 +++++++++++++++++++ 4 files changed, 69 insertions(+), 3 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ChunkPartitioner.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ChunkPartitioner.cpp index c61c566c1744..386845d9629c 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ChunkPartitioner.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ChunkPartitioner.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include #include @@ -116,7 +115,7 @@ ChunkPartitioner::partitionChunk(const Chunk & chunk) arguments.push_back(ColumnWithTypeAndName(const_column->clone(), type, "PartitioningTimezone")); } auto result - = functions[transform_ind]->build(arguments)->execute(arguments, std::make_shared(), chunk.getNumRows(), false); + = functions[transform_ind]->build(arguments)->execute(arguments, result_data_types[transform_ind], chunk.getNumRows(), false); functions_columns.push_back(result); raw_columns.push_back(result.get()); for (size_t i = 0; i < chunk.getNumRows(); ++i) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index 6f79801514e6..6a4b2c143c2a 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -551,7 +551,10 @@ Poco::Dynamic::Var getAvroType(DataTypePtr type) { switch (type->getTypeId()) { + case TypeIndex::UInt8: + case TypeIndex::Int8: case TypeIndex::UInt16: + case TypeIndex::Int16: case TypeIndex::UInt32: case TypeIndex::Int32: case TypeIndex::Date: diff --git a/tests/integration/test_storage_iceberg_with_spark/test_writes_create_partitioned_table.py b/tests/integration/test_storage_iceberg_with_spark/test_writes_create_partitioned_table.py index f5de0a43f803..3b1b1e80e090 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_writes_create_partitioned_table.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_writes_create_partitioned_table.py @@ -36,4 +36,30 @@ def test_writes_create_partitioned_table(started_cluster_iceberg_with_spark, for f.write(b"2") df = spark.read.format("iceberg").load(f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}").collect() - assert len(df) == 1 \ No newline at end of file + assert len(df) == 1 + + +@pytest.mark.parametrize("storage_type", ["s3", "local"]) +@pytest.mark.parametrize("partition_type", ["toYearNumSinceEpoch(d)", "toMonthNumSinceEpoch(d)", "toRelativeDayNum(d)"]) +def test_writes_date_column_with_time_transforms(started_cluster_iceberg_with_spark, storage_type, partition_type): + """Test that Date columns work with year/month/day partition transforms (issue #86337).""" + instance = started_cluster_iceberg_with_spark.instances["node1"] + spark = started_cluster_iceberg_with_spark.spark_session + TABLE_NAME = "test_writes_date_time_transforms_" + storage_type + "_" + get_uuid_str() + + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark, "(id Int64, d Date)", 2, partition_type) + + assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == '' + + instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, '2025-08-28');", settings={"allow_insert_into_iceberg": 1}) + assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == '1\t2025-08-28\n' + + default_download_directory( + started_cluster_iceberg_with_spark, + storage_type, + f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}/", + f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}/", + ) + + df = spark.read.format("iceberg").load(f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}").collect() + assert len(df) == 1 diff --git a/tests/integration/test_storage_iceberg_with_spark/test_writes_with_partitioned_table.py b/tests/integration/test_storage_iceberg_with_spark/test_writes_with_partitioned_table.py index ac54c5d66534..858ea1bae4f2 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_writes_with_partitioned_table.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_writes_with_partitioned_table.py @@ -117,3 +117,41 @@ def test_writes_with_partitioned_table_count_partitions(started_cluster_iceberg_ num_pq_files += 1 assert num_pq_files == 16 + + +@pytest.mark.parametrize("storage_type", ["s3", "local"]) +@pytest.mark.parametrize("partition_transform", ["year", "month", "day"]) +def test_writes_spark_date_partition_by_time_transform(started_cluster_iceberg_with_spark, storage_type, partition_transform): + """Regression test for issue #86337: inserting into Spark-created Iceberg table + partitioned by year/month/day on a Date column caused a bad cast exception.""" + instance = started_cluster_iceberg_with_spark.instances["node1"] + spark = started_cluster_iceberg_with_spark.spark_session + TABLE_NAME = "test_date_partition_" + partition_transform + "_" + storage_type + "_" + get_uuid_str() + + def execute_spark_query(query: str): + spark.sql(query) + default_upload_directory( + started_cluster_iceberg_with_spark, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME} ( + c0 DATE + ) + USING iceberg + PARTITIONED BY ({partition_transform}(c0)) + OPTIONS('format-version'='2') + """ + ) + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark) + + instance.query( + f"INSERT INTO {TABLE_NAME} (c0) VALUES ('2025-08-28');", + settings={"allow_insert_into_iceberg": 1} + ) + + assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == '2025-08-28\n'