From 55d08e812f72f7b096360f8f71b597ad9f15647d Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Wed, 23 Apr 2025 23:48:07 -0400
Subject: [PATCH 01/30] Add support for partition by columns in create table.

---
 ice/src/main/java/com/altinity/ice/Main.java       | 12 +++++++++---
 .../altinity/ice/internal/cmd/CreateTable.java     | 15 +++++++++++++--
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/ice/src/main/java/com/altinity/ice/Main.java b/ice/src/main/java/com/altinity/ice/Main.java
index a26ed39f..1410ba98 100644
--- a/ice/src/main/java/com/altinity/ice/Main.java
+++ b/ice/src/main/java/com/altinity/ice/Main.java
@@ -104,7 +104,12 @@ void createTable(
               required = true,
               names = "--schema-from-parquet",
               description = "/path/to/file.parquet")
-          String schemaFile)
+          String schemaFile,
+      @CommandLine.Option(
+              names = {"--partition-by"},
+              description = "Comma-separated list of columns to partition by",
+              split = ",")
+          List<String> partitionColumns)
       throws IOException {
     try (RESTCatalog catalog = loadCatalog(this.configFile)) {
       CreateTable.run(
@@ -113,7 +118,8 @@ void createTable(
           schemaFile,
           location,
           createTableIfNotExists,
-          s3NoSignRequest);
+          s3NoSignRequest,
+          partitionColumns);
     }
   }

@@ -189,7 +195,7 @@ void insert(
       if (createTableIfNotExists) {
         // TODO: newCreateTableTransaction
         CreateTable.run(
-            catalog, tableId, dataFiles[0], null, createTableIfNotExists, s3NoSignRequest);
+            catalog, tableId, dataFiles[0], null, createTableIfNotExists, s3NoSignRequest, null);
       }
       Insert.run(
           catalog,
diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java b/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java
index 14a646a7..3db208bd 100644
--- a/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java
+++ b/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java
@@ -32,7 +32,8 @@ public static void run(
       String schemaFile,
       String location,
       boolean ignoreAlreadyExists,
-      boolean s3NoSignRequest)
+      boolean s3NoSignRequest,
+      List<String> partitionColumns)
       throws IOException {
     Lazy<S3Client> s3ClientLazy = new Lazy<>(() -> S3.newClient(s3NoSignRequest));
@@ -56,8 +57,18 @@ public static void run(
         String mappingJson = NameMappingParser.toJson(mapping);
         props = Map.of(TableProperties.DEFAULT_NAME_MAPPING, mappingJson);
       }
+
+      // Create partition spec based on provided partition columns
+      PartitionSpec.Builder partitionSpecBuilder = PartitionSpec.builderFor(fileSchema);
+      if (partitionColumns != null && !partitionColumns.isEmpty()) {
+        for (String column : partitionColumns) {
+          partitionSpecBuilder.identity(column);
+        }
+      }
+      PartitionSpec partitionSpec = partitionSpecBuilder.build();
+
       // if we don't set location, it's automatically set to $warehouse/$namespace/$table
-      catalog.createTable(nsTable, fileSchema, PartitionSpec.unpartitioned(), location, props);
+      catalog.createTable(nsTable, fileSchema, partitionSpec, location, props);
     } catch (AlreadyExistsException e) {
       if (ignoreAlreadyExists) {
         return;

From 1a9ca7fa34496b7838682434a3f6f01d953fafeb Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Thu, 24 Apr 2025 10:39:16 -0400
Subject: [PATCH 02/30] Updated README with example to create table with
 multiple partitioning columns.
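A reviewer note on the mechanics (not part of the diff below): each column listed in
`--partition-by` becomes an Iceberg identity partition field. A minimal sketch of the
equivalent Iceberg API calls, assuming a toy one-column schema in place of the schema
actually read from parquet:

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Sketch of what `ice create-table ... --partition-by=variety` builds internally.
// The one-column schema is an assumption; the real schema comes from the parquet file.
class PartitionBySketch {
  public static void main(String[] args) {
    Schema schema = new Schema(Types.NestedField.required(1, "variety", Types.StringType.get()));
    // CreateTable.run() adds one identity transform per requested column.
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("variety").build();
    System.out.println(spec); // prints the partition fields, e.g. variety: identity(1)
  }
}
```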
--- examples/scratch/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/scratch/README.md b/examples/scratch/README.md index f11e9f95..d8fe5919 100644 --- a/examples/scratch/README.md +++ b/examples/scratch/README.md @@ -36,6 +36,9 @@ date=2025-01-01/part-00000-33e8d075-2099-409b-a806-68dd17217d39-c000.snappy.parq # upload file to minio using local-mc, # then add file to the catalog without making a copy ice create-table flowers.iris_no_copy --schema-from-parquet=file://iris.parquet +# create table with partitioning columns +ice create-table flowers.irs_no_copy_partition --schema-from-parquet=file://iris.parquet --partition-by=variety,petal.width + local-mc cp iris.parquet local/bucket1/flowers/iris_no_copy/ ice insert flowers.iris_no_copy --no-copy s3://bucket1/flowers/iris_no_copy/iris.parquet From 5dfcdf952bc194bf646920e0b1b6577b92545c54 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Thu, 24 Apr 2025 10:42:46 -0400 Subject: [PATCH 03/30] Made partition variables final. --- .../main/java/com/altinity/ice/internal/cmd/CreateTable.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java b/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java index 3db208bd..bf2aa362 100644 --- a/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java @@ -59,13 +59,13 @@ public static void run( } // Create partition spec based on provided partition columns - PartitionSpec.Builder partitionSpecBuilder = PartitionSpec.builderFor(fileSchema); + final PartitionSpec.Builder partitionSpecBuilder = PartitionSpec.builderFor(fileSchema); if (partitionColumns != null && !partitionColumns.isEmpty()) { for (String column : partitionColumns) { partitionSpecBuilder.identity(column); } } - PartitionSpec partitionSpec = partitionSpecBuilder.build(); + final PartitionSpec partitionSpec = partitionSpecBuilder.build(); // if we don't set location, it's automatically set to $warehouse/$namespace/$table catalog.createTable(nsTable, fileSchema, partitionSpec, location, props); From 8b5463a44cdfc02321a0b81e7f38952def3fd463 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Fri, 25 Apr 2025 16:35:36 -0400 Subject: [PATCH 04/30] ice: Added support to pass sort-by columns. 
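A reviewer note (not part of the diff below): `--sort-by` builds an Iceberg SortOrder
with one ascending field per column and sets write.distribution-mode=range on the
table. A minimal sketch of the equivalent API call, assuming the same toy one-column
schema as in patch 01:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.types.Types;

// Sketch of the order `--sort-by=variety` requests; the schema is an assumption.
class SortBySketch {
  public static void main(String[] args) {
    Schema schema = new Schema(Types.NestedField.required(1, "variety", Types.StringType.get()));
    // CreateTable.run() adds one ascending field per requested column.
    SortOrder order = SortOrder.builderFor(schema).asc("variety").build();
    System.out.println(order); // e.g. [ variety ASC NULLS FIRST ]
  }
}
```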
--- examples/scratch/README.md | 4 +++ ice/src/main/java/com/altinity/ice/Main.java | 35 ++++++++++++++++--- .../ice/internal/cmd/CreateTable.java | 30 ++++++++++++++-- .../com/altinity/ice/internal/cmd/Insert.java | 31 ++++++++++++++-- 4 files changed, 91 insertions(+), 9 deletions(-) diff --git a/examples/scratch/README.md b/examples/scratch/README.md index d8fe5919..4d11eb5e 100644 --- a/examples/scratch/README.md +++ b/examples/scratch/README.md @@ -36,9 +36,13 @@ date=2025-01-01/part-00000-33e8d075-2099-409b-a806-68dd17217d39-c000.snappy.parq # upload file to minio using local-mc, # then add file to the catalog without making a copy ice create-table flowers.iris_no_copy --schema-from-parquet=file://iris.parquet + # create table with partitioning columns ice create-table flowers.irs_no_copy_partition --schema-from-parquet=file://iris.parquet --partition-by=variety,petal.width +# create table with sort columns +ice create-table flowers.irs_no_copy_sort --schema-from-parquet=file://iris.parquet --sort-by=variety + local-mc cp iris.parquet local/bucket1/flowers/iris_no_copy/ ice insert flowers.iris_no_copy --no-copy s3://bucket1/flowers/iris_no_copy/iris.parquet diff --git a/ice/src/main/java/com/altinity/ice/Main.java b/ice/src/main/java/com/altinity/ice/Main.java index 1410ba98..2c0119ad 100644 --- a/ice/src/main/java/com/altinity/ice/Main.java +++ b/ice/src/main/java/com/altinity/ice/Main.java @@ -109,7 +109,12 @@ void createTable( names = {"--partition-by"}, description = "Comma-separated list of columns to partition by", split = ",") - List partitionColumns) + List partitionColumns, + @CommandLine.Option( + names = {"--sort-by"}, + description = "Comma-separated list of columns to sort by", + split = ",") + List sortColumns) throws IOException { try (RESTCatalog catalog = loadCatalog(this.configFile)) { CreateTable.run( @@ -119,7 +124,8 @@ void createTable( location, createTableIfNotExists, s3NoSignRequest, - partitionColumns); + partitionColumns, + sortColumns); } } @@ -177,7 +183,17 @@ void insert( description = "/path/to/file where to save list of files to retry" + " (useful for retrying partially failed insert using `cat ice.retry | ice insert - --retry-list=ice.retry`)") - String retryList) + String retryList, + @CommandLine.Option( + names = {"--partition-by"}, + description = "Comma-separated list of columns to partition by", + split = ",") + List partitionColumns, + @CommandLine.Option( + names = {"--sort-by"}, + description = "Comma-separated list of columns to sort by", + split = ",") + List sortColumns) throws IOException { if (s3NoSignRequest && s3CopyObject) { throw new UnsupportedOperationException( @@ -195,7 +211,14 @@ void insert( if (createTableIfNotExists) { // TODO: newCreateTableTransaction CreateTable.run( - catalog, tableId, dataFiles[0], null, createTableIfNotExists, s3NoSignRequest, null); + catalog, + tableId, + dataFiles[0], + null, + createTableIfNotExists, + s3NoSignRequest, + partitionColumns, + sortColumns); } Insert.run( catalog, @@ -209,7 +232,9 @@ void insert( forceTableAuth, s3NoSignRequest, s3CopyObject, - retryList); + retryList, + partitionColumns, + sortColumns); } } diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java b/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java index bf2aa362..977bedca 100644 --- a/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java @@ -8,6 +8,7 @@ import java.util.Map; import 
org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; @@ -33,7 +34,8 @@ public static void run( String location, boolean ignoreAlreadyExists, boolean s3NoSignRequest, - List partitionColumns) + List partitionColumns, + List sortColumns) throws IOException { Lazy s3ClientLazy = new Lazy<>(() -> S3.newClient(s3NoSignRequest)); @@ -67,8 +69,32 @@ public static void run( } final PartitionSpec partitionSpec = partitionSpecBuilder.build(); + // Create sort order based on provided sort columns (z-order) + SortOrder sortOrder = null; + if (sortColumns != null && !sortColumns.isEmpty()) { + SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(fileSchema); + for (String column : sortColumns) { + sortOrderBuilder.asc(column); + } + sortOrder = sortOrderBuilder.build(); + } + // if we don't set location, it's automatically set to $warehouse/$namespace/$table - catalog.createTable(nsTable, fileSchema, partitionSpec, location, props); + var table = catalog.createTable(nsTable, fileSchema, partitionSpec, location, props); + // Apply the sort order to the table + if (sortOrder != null) { + table + .updateProperties() + .set( + TableProperties.WRITE_DISTRIBUTION_MODE, + TableProperties.WRITE_DISTRIBUTION_MODE_RANGE) + .commit(); + var updatedSortOrder = table.replaceSortOrder(); + for (String column : sortColumns) { + updatedSortOrder.asc(column); + } + updatedSortOrder.commit(); + } } catch (AlreadyExistsException e) { if (ignoreAlreadyExists) { return; diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java index 2a47fc28..7ae9578f 100644 --- a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; @@ -53,7 +54,6 @@ public final class Insert { private Insert() {} - // TODO: refactor public static void run( RESTCatalog catalog, TableIdentifier nsTable, @@ -66,7 +66,9 @@ public static void run( boolean forceTableAuth, boolean s3NoSignRequest, boolean s3CopyObject, - String retryListFile) + String retryListFile, + List partitionColumns, + List sortColumns) throws IOException { if (files.length == 0) { // no work to be done @@ -76,6 +78,31 @@ public static void run( noCopy = true; } Table table = catalog.loadTable(nsTable); + + // Update partition spec if provided + if (partitionColumns != null && !partitionColumns.isEmpty()) { + var updateSpec = table.updateSpec(); + for (String column : partitionColumns) { + updateSpec.addField(column); + } + updateSpec.commit(); + } + + // Update sort order if provided + if (sortColumns != null && !sortColumns.isEmpty()) { + table + .updateProperties() + .set( + TableProperties.WRITE_DISTRIBUTION_MODE, + TableProperties.WRITE_DISTRIBUTION_MODE_RANGE) + .commit(); + var updatedSortOrder = table.replaceSortOrder(); + for (String column : sortColumns) { + updatedSortOrder.asc(column); + } + updatedSortOrder.commit(); + } + try (FileIO tableIO = table.io()) { final Supplier s3ClientSupplier; if (forceTableAuth) { From 4a98c498a400ba88ad43225b604ab646c741a1bf Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Fri, 25 Apr 
2025 19:12:37 -0400
Subject: [PATCH 05/30] ice: Updated README.md to document insert with
 sort-key and partition-key

---
 examples/scratch/README.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/examples/scratch/README.md b/examples/scratch/README.md
index 4d11eb5e..caae1d12 100644
--- a/examples/scratch/README.md
+++ b/examples/scratch/README.md
@@ -26,6 +26,12 @@ ice insert flowers.iris -p \
 ice insert nyc.taxis -p \
   https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet

+# insert rows with a sort key
+ice insert --sort-by=VendorID nyc.taxis2 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
+
+# insert rows with a partition key
+ice insert --partition-by=Airport_fee nyc2.taxis3 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet

 # warning: each parquet file below is ~500mb. this may take a while
 AWS_REGION=us-east-2 ice insert btc.transactions -p --s3-no-sign-request \

From 6762798412a80f45b2eb4438b95320a4589d9764 Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Wed, 30 Apr 2025 11:23:32 -0400
Subject: [PATCH 06/30] Added standalone spark-iceberg docker compose

---
 .../docker-compose-spark-iceberg.yaml | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)
 create mode 100644 examples/docker-compose/docker-compose-spark-iceberg.yaml

diff --git a/examples/docker-compose/docker-compose-spark-iceberg.yaml b/examples/docker-compose/docker-compose-spark-iceberg.yaml
new file mode 100644
index 00000000..8a26671a
--- /dev/null
+++ b/examples/docker-compose/docker-compose-spark-iceberg.yaml
@@ -0,0 +1,18 @@
+services:
+  spark-iceberg:
+    image: tabulario/spark-iceberg
+    container_name: spark-iceberg
+    build: spark/
+    network_mode: host
+    volumes:
+      - ./warehouse:/home/iceberg/warehouse
+      - ./notebooks:/home/iceberg/notebooks/notebooks
+    environment:
+      - AWS_ACCESS_KEY_ID=miniouser
+      - AWS_SECRET_ACCESS_KEY=miniopassword
+      - AWS_REGION=minio
+    ports:
+      - 8888:8888
+      - 8080:8080
+      - 10000:10000
+      - 10001:10001

From 991593bf636fb05e706058305ce7bad977d5af13 Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Wed, 30 Apr 2025 12:55:25 -0400
Subject: [PATCH 07/30] ice-rest-catalog: Added README instructions to setup
 spark-iceberg to work with ice-rest-catalog.

---
 examples/docker-compose/README.md       | 34 +++++++++++++++++++++++++
 examples/scratch/.ice-rest-catalog.yaml |  2 +-
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/examples/docker-compose/README.md b/examples/docker-compose/README.md
index 9d4d7d35..7e72da13 100644
--- a/examples/docker-compose/README.md
+++ b/examples/docker-compose/README.md
@@ -19,3 +19,37 @@ ice insert nyc.taxis -p \

 clickhouse client --query 'select count(*) from ice.`nyc.taxis`;'
 ```
+
+#### Spark Iceberg
+A spark-iceberg container can be launched using the `docker-compose-spark-iceberg.yaml` file.
+
+The default configuration is located at `/opt/spark/conf/spark-defaults.conf`.
+
+For Spark to communicate with `ice-rest-catalog` and `minio`, the following configuration variables need to be updated:
+
+`spark.sql.catalog.demo.uri` - ice-rest-catalog URI \
+`spark.sql.catalog.demo.s3.endpoint` - minio server URL \
+`spark.sql.catalog.demo.s3.access-key` - minio access key \
+`spark.sql.catalog.demo.s3.secret-key` - minio password.
+ + +``` +spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions +spark.sql.catalog.demo org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog.demo.type rest +spark.sql.catalog.demo.uri http://localhost:5000 +spark.sql.catalog.demo.io-impl org.apache.iceberg.aws.s3.S3FileIO +spark.sql.catalog.demo.warehouse s3://warehouse/wh/ +spark.sql.catalog.demo.s3.endpoint http://localhost:9000 +spark.sql.catalog.demo.s3.access-key miniouser +spark.sql.catalog.demo.s3.secret-key miniopassword +spark.sql.catalog.demo.s3.path-style-access true +spark.sql.catalog.demo.s3.ssl-enabled false +spark.sql.defaultCatalog demo +spark.eventLog.enabled true +spark.eventLog.dir /home/iceberg/spark-events +spark.history.fs.logDirectory /home/iceberg/spark-events +spark.sql.catalogImplementation in-memory +``` \ No newline at end of file diff --git a/examples/scratch/.ice-rest-catalog.yaml b/examples/scratch/.ice-rest-catalog.yaml index f5c73af9..d22d8cab 100644 --- a/examples/scratch/.ice-rest-catalog.yaml +++ b/examples/scratch/.ice-rest-catalog.yaml @@ -5,4 +5,4 @@ s3.access-key-id: "miniouser" s3.secret-access-key: "miniopassword" ice.s3.region: minio ice.token: foo -ice.maintenance.snapshot.expiration.days: 20 +ice.maintenance.snapshot.expiration.days: "20" From 8898f26f6a6bd76ff6e19c64c75b1fe7838a5719 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Fri, 2 May 2025 09:37:45 -0400 Subject: [PATCH 08/30] ice: If partition columns are passed by the user, write the data as partitions. --- examples/docker-compose/docker-compose.yaml | 3 +- ice/pom.xml | 40 +++++ .../com/altinity/ice/internal/cmd/Insert.java | 161 +++++++++++++----- .../ice/internal/io/PartitionWriter.java | 88 ++++++++++ 4 files changed, 252 insertions(+), 40 deletions(-) create mode 100644 ice/src/main/java/com/altinity/ice/internal/io/PartitionWriter.java diff --git a/examples/docker-compose/docker-compose.yaml b/examples/docker-compose/docker-compose.yaml index 4072c178..82fa7c99 100644 --- a/examples/docker-compose/docker-compose.yaml +++ b/examples/docker-compose/docker-compose.yaml @@ -68,7 +68,8 @@ configs: SETTINGS catalog_type = 'rest', auth_header = 'Authorization: Bearer foo', storage_endpoint = 'http://minio:8999', - warehouse = 's3://bucket1/'; + warehouse = 's3://bucket1/', + sort_order = 'id'; " ice-rest-catalog-yaml: content: | diff --git a/ice/pom.xml b/ice/pom.xml index b734d21d..ad2b6334 100644 --- a/ice/pom.xml +++ b/ice/pom.xml @@ -22,6 +22,41 @@ iceberg-bundled-guava ${iceberg.version} + + org.apache.iceberg + iceberg-data + ${iceberg.version} + + + org.apache.parquet + parquet-avro + + + com.github.ben-manes.caffeine + caffeine + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + commons-codec + commons-codec + + + org.slf4j + slf4j-api + + + io.airlift + aircompressor + + + org.apache.iceberg iceberg-core @@ -31,6 +66,7 @@ com.github.ben-manes.caffeine caffeine + com.fasterxml.jackson.core jackson-core @@ -43,6 +79,10 @@ commons-codec commons-codec + + org.apache.commons + commons-lang3 + org.slf4j slf4j-api diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java index b88782b4..766ae7f1 100644 --- a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java @@ -7,13 +7,9 @@ import com.altinity.ice.internal.jvm.Stats; import com.altinity.ice.internal.parquet.Metadata; import 
java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -22,16 +18,13 @@ import org.apache.iceberg.*; import org.apache.iceberg.aws.s3.S3FileIO; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.BadRequestException; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.*; import org.apache.iceberg.mapping.MappedField; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.mapping.NameMappingParser; @@ -78,6 +71,7 @@ public static void run( // no work to be done return; } + InsertOptions options = InsertOptions.builder() .skipDuplicates(skipDuplicates) @@ -176,29 +170,38 @@ public static void run( retryListFile != null && !retryListFile.isEmpty() ? new RetryLog(retryListFile) : null) { - boolean atLeastOneFileAppended = false; + AtomicBoolean atLeastOneFileAppended = new AtomicBoolean(false); int numThreads = Math.min(finalOptions.threadCount(), filesExpanded.size()); ExecutorService executor = Executors.newFixedThreadPool(numThreads); try { - var futures = new ArrayList>(); + var futures = new ArrayList>>(); for (final String file : filesExpanded) { futures.add( executor.submit( () -> { try { - return processFile( - table, - catalog, - tableIO, - inputIO, - tableDataFiles, - finalOptions, - s3ClientLazy, - dstDataFileSource, - tableSchema, - dataFileNamingStrategy, - file); + List dataFiles = + processFile( + table, + catalog, + tableIO, + inputIO, + tableDataFiles, + finalOptions, + s3ClientLazy, + dstDataFileSource, + tableSchema, + dataFileNamingStrategy, + file, + partitionColumns); + if (dataFiles != null) { + for (DataFile df : dataFiles) { + atLeastOneFileAppended.set(true); + appendOp.appendFile(df); + } + } + return dataFiles; } catch (Exception e) { if (retryLog != null) { logger.error( @@ -214,10 +217,12 @@ public static void run( for (var future : futures) { try { - DataFile df = future.get(); - if (df != null) { - atLeastOneFileAppended = true; - appendOp.appendFile(df); + List dataFiles = future.get(); + if (dataFiles != null) { + for (DataFile df : dataFiles) { + atLeastOneFileAppended.set(true); + appendOp.appendFile(df); + } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -235,7 +240,7 @@ public static void run( if (!finalOptions.noCommit()) { // TODO: log - if (atLeastOneFileAppended) { + if (atLeastOneFileAppended.get()) { appendOp.commit(); } else { logger.warn("Table commit skipped (no files to append)"); @@ -255,7 +260,7 @@ public static void run( } } - private static DataFile processFile( + private static List processFile( Table table, RESTCatalog catalog, FileIO tableIO, @@ -266,7 +271,8 @@ private static DataFile processFile( DataFileNamingStrategy dstDataFileSource, Schema tableSchema, 
DataFileNamingStrategy.Name dataFileNamingStrategy, - String file) + String file, + List partitionColumns) throws IOException { logger.info("{}: processing", file); logger.info("{}: jvm: {}", file, Stats.gather()); @@ -327,6 +333,12 @@ private static DataFile processFile( s3ClientLazy.getValue().copyObject(copyReq); dataFileSizeInBytes = inputFile.getLength(); dataFile = dstDataFile; + } else if (partitionColumns != null && !partitionColumns.isEmpty()) { + String dstDataFile = dstDataFileSource.get(file); + if (checkNotExists.apply(dstDataFile)) { + return null; + } + return copyParquetWithPartition(file, dstDataFile, tableSchema, table, inputFile); } else { String dstDataFile = dstDataFileSource.get(file); if (checkNotExists.apply(dstDataFile)) { @@ -355,13 +367,84 @@ private static DataFile processFile( long recordCount = metadata.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum(); MetricsConfig metricsConfig = MetricsConfig.forTable(table); Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig); - return new DataFiles.Builder(table.spec()) - .withPath(dataFile) - .withFormat("PARQUET") - .withRecordCount(recordCount) - .withFileSizeInBytes(dataFileSizeInBytes) - .withMetrics(metrics) - .build(); + DataFile dataFileObj = + new DataFiles.Builder(table.spec()) + .withPath(dataFile) + .withFormat("PARQUET") + .withRecordCount(recordCount) + .withFileSizeInBytes(dataFileSizeInBytes) + .withMetrics(metrics) + .build(); + return Collections.singletonList(dataFileObj); + } + + private static List copyParquetWithPartition( + String file, String dstDataFile, Schema tableSchema, Table table, InputFile inputFile) + throws IOException { + + logger.info("{}: copying to partitions under {}", file, dstDataFile); + + // Partition writer setup + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 0).format(FileFormat.PARQUET).build(); + + GenericAppenderFactory appenderFactory = new GenericAppenderFactory(tableSchema, table.spec()); + + PartitionKey partitionKey = new PartitionKey(table.spec(), tableSchema); + Map> openAppenders = new HashMap<>(); + Map writtenFiles = new HashMap<>(); + + Parquet.ReadBuilder readBuilder = + Parquet.read(inputFile) + .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s)) + .project(tableSchema) + .reuseContainers(); + Map recordCounts = new HashMap<>(); + + try (CloseableIterable records = readBuilder.build()) { + for (Record record : records) { + partitionKey.partition(record); + PartitionKey keyCopy = partitionKey.copy(); + + FileAppender appender = openAppenders.get(keyCopy); + if (appender == null) { + OutputFile outFile = fileFactory.newOutputFile(keyCopy).encryptingOutputFile(); + appender = appenderFactory.newAppender(outFile, FileFormat.PARQUET); + openAppenders.put(keyCopy, appender); + writtenFiles.put(keyCopy, outFile); + recordCounts.put(keyCopy, 0L); + } + + appender.add(record); + recordCounts.put(keyCopy, recordCounts.get(keyCopy) + 1); + } + } + + List dataFiles = new ArrayList<>(); + + for (Map.Entry> entry : openAppenders.entrySet()) { + PartitionKey partKey = entry.getKey(); + FileAppender appender = entry.getValue(); + appender.close(); + + OutputFile outFile = writtenFiles.get(partKey); + InputFile inFile = outFile.toInputFile(); + + MetricsConfig metricsConfig = MetricsConfig.forTable(table); + Metrics metrics = ParquetUtil.fileMetrics(inFile, metricsConfig); + + dataFiles.add( + DataFiles.builder(table.spec()) + .withPath(outFile.location()) + .withFileSizeInBytes(inFile.getLength()) 
+ .withPartition(partKey) + .withFormat(FileFormat.PARQUET) + .withRecordCount(recordCounts.get(partKey)) + .withMetrics(metrics) + .build()); + } + + return dataFiles; } private static boolean sameSchema(Table table, Schema fileSchema) { diff --git a/ice/src/main/java/com/altinity/ice/internal/io/PartitionWriter.java b/ice/src/main/java/com/altinity/ice/internal/io/PartitionWriter.java new file mode 100644 index 00000000..ba0ba67d --- /dev/null +++ b/ice/src/main/java/com/altinity/ice/internal/io/PartitionWriter.java @@ -0,0 +1,88 @@ +package com.altinity.ice.internal.io; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.iceberg.*; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.*; + +public class PartitionWriter implements TaskWriter { + + private final PartitionSpec spec; + private final FileFormat format; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final Schema schema; + private final long targetFileSize; + private final Map> openAppenders; + private final Map writtenFiles; + private final GenericAppenderFactory appenderFactory; + + public PartitionWriter(Table table, FileFormat format, long targetFileSizeBytes) { + this.spec = table.spec(); + this.format = format; + this.io = table.io(); + this.schema = table.schema(); + this.targetFileSize = targetFileSizeBytes; + this.openAppenders = new HashMap<>(); + this.writtenFiles = new HashMap<>(); + + this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(format).build(); + + this.appenderFactory = new GenericAppenderFactory(schema); + } + + @Override + public void write(Record record) throws IOException { + PartitionKey partitionKey = new PartitionKey(spec, schema); + + partitionKey.partition(record); + + FileAppender appender = openAppenders.get(partitionKey); + + if (appender == null) { + OutputFile file = (OutputFile) fileFactory.newOutputFile(partitionKey); + appender = appenderFactory.newAppender(file, format); + openAppenders.put(partitionKey, appender); + } + + appender.add(record); + } + + @Override + public WriteResult complete() throws IOException { + for (Map.Entry> entry : openAppenders.entrySet()) { + FileAppender appender = entry.getValue(); + StructLike partition = entry.getKey(); + + appender.close(); + + DataFile dataFile = + DataFiles.builder(spec) + .withPath(appender.toString()) + .withFormat(format) + .withPartition(partition) + .withRecordCount(appender.length()) + .withFileSizeInBytes(appender.length()) // approximation + .build(); + + writtenFiles.put(partition, dataFile); + } + + DataFile[] dataFiles = writtenFiles.values().toArray(new DataFile[0]); + return WriteResult.builder().addDataFiles(dataFiles).build(); + } + + @Override + public void abort() throws IOException { + for (FileAppender appender : openAppenders.values()) { + appender.close(); + } + openAppenders.clear(); + } + + @Override + public void close() throws IOException {} +} From bbc9c10807e686366c047a66308495389ed4d2ad Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Fri, 2 May 2025 09:44:05 -0400 Subject: [PATCH 09/30] ice: Removed PartitionWriter --- .../ice/internal/io/PartitionWriter.java | 88 ------------------- 1 file changed, 88 deletions(-) delete mode 100644 ice/src/main/java/com/altinity/ice/internal/io/PartitionWriter.java diff --git a/ice/src/main/java/com/altinity/ice/internal/io/PartitionWriter.java 
b/ice/src/main/java/com/altinity/ice/internal/io/PartitionWriter.java deleted file mode 100644 index ba0ba67d..00000000 --- a/ice/src/main/java/com/altinity/ice/internal/io/PartitionWriter.java +++ /dev/null @@ -1,88 +0,0 @@ -package com.altinity.ice.internal.io; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import org.apache.iceberg.*; -import org.apache.iceberg.data.GenericAppenderFactory; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.io.*; - -public class PartitionWriter implements TaskWriter { - - private final PartitionSpec spec; - private final FileFormat format; - private final OutputFileFactory fileFactory; - private final FileIO io; - private final Schema schema; - private final long targetFileSize; - private final Map> openAppenders; - private final Map writtenFiles; - private final GenericAppenderFactory appenderFactory; - - public PartitionWriter(Table table, FileFormat format, long targetFileSizeBytes) { - this.spec = table.spec(); - this.format = format; - this.io = table.io(); - this.schema = table.schema(); - this.targetFileSize = targetFileSizeBytes; - this.openAppenders = new HashMap<>(); - this.writtenFiles = new HashMap<>(); - - this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(format).build(); - - this.appenderFactory = new GenericAppenderFactory(schema); - } - - @Override - public void write(Record record) throws IOException { - PartitionKey partitionKey = new PartitionKey(spec, schema); - - partitionKey.partition(record); - - FileAppender appender = openAppenders.get(partitionKey); - - if (appender == null) { - OutputFile file = (OutputFile) fileFactory.newOutputFile(partitionKey); - appender = appenderFactory.newAppender(file, format); - openAppenders.put(partitionKey, appender); - } - - appender.add(record); - } - - @Override - public WriteResult complete() throws IOException { - for (Map.Entry> entry : openAppenders.entrySet()) { - FileAppender appender = entry.getValue(); - StructLike partition = entry.getKey(); - - appender.close(); - - DataFile dataFile = - DataFiles.builder(spec) - .withPath(appender.toString()) - .withFormat(format) - .withPartition(partition) - .withRecordCount(appender.length()) - .withFileSizeInBytes(appender.length()) // approximation - .build(); - - writtenFiles.put(partition, dataFile); - } - - DataFile[] dataFiles = writtenFiles.values().toArray(new DataFile[0]); - return WriteResult.builder().addDataFiles(dataFiles).build(); - } - - @Override - public void abort() throws IOException { - for (FileAppender appender : openAppenders.values()) { - appender.close(); - } - openAppenders.clear(); - } - - @Override - public void close() throws IOException {} -} From 68ebd1d07a3fafe96671510640ee3dc441f5aa1a Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 5 May 2025 09:35:40 -0400 Subject: [PATCH 10/30] ice: Added logic to sort columns in insert. 
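A reviewer note (not part of the diff below): the RecordSortComparator added here
orders nulls first on ascending fields and last on descending ones. A self-contained
toy illustration of that rule using plain JDK comparators:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Toy illustration of the per-field null ordering RecordSortComparator applies.
class NullOrderSketch {
  public static void main(String[] args) {
    List<Integer> ids = Arrays.asList(2, null, 1);
    // Ascending: nulls compare lowest, so they come first.
    ids.sort(Comparator.nullsFirst(Comparator.<Integer>naturalOrder()));
    System.out.println(ids); // [null, 1, 2]
    // Descending: reverse the natural order, keeping nulls last.
    ids.sort(Comparator.nullsLast(Comparator.<Integer>reverseOrder()));
    System.out.println(ids); // [2, 1, null]
  }
}
```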
--- ice/src/main/java/com/altinity/ice/Main.java | 31 ++-- .../ice/internal/cmd/CreateTable.java | 38 ++++- .../com/altinity/ice/internal/cmd/Insert.java | 141 +++++++++++------- .../internal/cmd/RecordSortComparator.java | 39 +++++ 4 files changed, 177 insertions(+), 72 deletions(-) create mode 100644 ice/src/main/java/com/altinity/ice/internal/cmd/RecordSortComparator.java diff --git a/ice/src/main/java/com/altinity/ice/Main.java b/ice/src/main/java/com/altinity/ice/Main.java index 731dcf25..aecd062e 100644 --- a/ice/src/main/java/com/altinity/ice/Main.java +++ b/ice/src/main/java/com/altinity/ice/Main.java @@ -107,10 +107,15 @@ void createTable( split = ",") List partitionColumns, @CommandLine.Option( - names = {"--sort-by"}, - description = "Comma-separated list of columns to sort by", + names = {"--sort-ascending"}, + description = "Comma-separated list of columns to sort in ascending order", split = ",") - List sortColumns) + List sortAscendingColumns, + @CommandLine.Option( + names = {"--sort-descending"}, + description = "Comma-separated list of columns to sort in descending order", + split = ",") + List sortDescendingColumns) throws IOException { try (RESTCatalog catalog = loadCatalog(this.configFile)) { CreateTable.run( @@ -121,7 +126,8 @@ void createTable( createTableIfNotExists, s3NoSignRequest, partitionColumns, - sortColumns); + sortAscendingColumns, + sortDescendingColumns); } } @@ -186,10 +192,15 @@ void insert( split = ",") List partitionColumns, @CommandLine.Option( - names = {"--sort-by"}, - description = "Comma-separated list of columns to sort by", + names = {"--sort-ascending"}, + description = "Comma-separated list of columns to sort in ascending order", + split = ",") + List sortAscendingColumns, + @CommandLine.Option( + names = {"--sort-descending"}, + description = "Comma-separated list of columns to sort in descending order", split = ",") - List sortColumns, + List sortDescendingColumns, @CommandLine.Option( names = {"--thread-count"}, description = "Number of threads to use for inserting data", @@ -219,7 +230,8 @@ void insert( createTableIfNotExists, s3NoSignRequest, partitionColumns, - sortColumns); + sortAscendingColumns, + sortDescendingColumns); } Insert.run( catalog, @@ -235,7 +247,8 @@ void insert( s3CopyObject, retryList, partitionColumns, - sortColumns, + sortAscendingColumns, + sortDescendingColumns, threadCount < 1 ? 
Runtime.getRuntime().availableProcessors() : threadCount); } } diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java b/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java index 977bedca..ed7445f2 100644 --- a/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java @@ -35,7 +35,8 @@ public static void run( boolean ignoreAlreadyExists, boolean s3NoSignRequest, List partitionColumns, - List sortColumns) + List sortAscendingColumns, + List sortDescingColumns) throws IOException { Lazy s3ClientLazy = new Lazy<>(() -> S3.newClient(s3NoSignRequest)); @@ -71,11 +72,24 @@ public static void run( // Create sort order based on provided sort columns (z-order) SortOrder sortOrder = null; - if (sortColumns != null && !sortColumns.isEmpty()) { + if ((sortAscendingColumns != null && !sortAscendingColumns.isEmpty()) + || (sortDescingColumns != null && !sortDescingColumns.isEmpty())) { SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(fileSchema); - for (String column : sortColumns) { - sortOrderBuilder.asc(column); + + // Add ascending columns first + if (sortAscendingColumns != null) { + for (String column : sortAscendingColumns) { + sortOrderBuilder.asc(column); + } + } + + // Add descending columns + if (sortDescingColumns != null) { + for (String column : sortDescingColumns) { + sortOrderBuilder.desc(column); + } } + sortOrder = sortOrderBuilder.build(); } @@ -90,9 +104,21 @@ public static void run( TableProperties.WRITE_DISTRIBUTION_MODE_RANGE) .commit(); var updatedSortOrder = table.replaceSortOrder(); - for (String column : sortColumns) { - updatedSortOrder.asc(column); + + // Add ascending columns + if (sortAscendingColumns != null) { + for (String column : sortAscendingColumns) { + updatedSortOrder.asc(column); + } + } + + // Add descending columns + if (sortDescingColumns != null) { + for (String column : sortDescingColumns) { + updatedSortOrder.desc(column); + } } + updatedSortOrder.commit(); } } catch (AlreadyExistsException e) { diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java index 766ae7f1..a8d7b57b 100644 --- a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java @@ -64,7 +64,8 @@ public static void run( boolean s3CopyObject, String retryListFile, List partitionColumns, - List sortColumns, + List sortAscendingColumns, + List sortDescendingColumns, int threadCount) throws IOException, InterruptedException { if (files.length == 0) { @@ -88,29 +89,8 @@ public static void run( options.forceNoCopy() ? 
options.toBuilder().noCopy(true).build() : options; Table table = catalog.loadTable(nsTable); - // Update partition spec if provided - if (partitionColumns != null && !partitionColumns.isEmpty()) { - var updateSpec = table.updateSpec(); - for (String column : partitionColumns) { - updateSpec.addField(column); - } - updateSpec.commit(); - } - - // Update sort order if provided - if (sortColumns != null && !sortColumns.isEmpty()) { - table - .updateProperties() - .set( - TableProperties.WRITE_DISTRIBUTION_MODE, - TableProperties.WRITE_DISTRIBUTION_MODE_RANGE) - .commit(); - var updatedSortOrder = table.replaceSortOrder(); - for (String column : sortColumns) { - updatedSortOrder.asc(column); - } - updatedSortOrder.commit(); - } + updatePartitionAndSortOrderMetadata( + table, partitionColumns, sortAscendingColumns, sortDescendingColumns); try (FileIO tableIO = table.io()) { final Supplier s3ClientSupplier; @@ -260,6 +240,47 @@ public static void run( } } + private static void updatePartitionAndSortOrderMetadata( + Table table, + List partitionColumns, + List sortAscendingColumns, + List sortDescendingColumns) { + // Update partition spec if provided + if (partitionColumns != null && !partitionColumns.isEmpty()) { + var updateSpec = table.updateSpec(); + for (String column : partitionColumns) { + updateSpec.addField(column); + } + updateSpec.commit(); + } + + // Update sort order if provided + if ((sortAscendingColumns != null && !sortAscendingColumns.isEmpty()) + || (sortDescendingColumns != null && !sortDescendingColumns.isEmpty())) { + table + .updateProperties() + .set( + TableProperties.WRITE_DISTRIBUTION_MODE, + TableProperties.WRITE_DISTRIBUTION_MODE_RANGE) + .commit(); + var updatedSortOrder = table.replaceSortOrder(); + + if (sortAscendingColumns != null) { + for (String column : sortAscendingColumns) { + updatedSortOrder.asc(column); + } + } + + if (sortDescendingColumns != null) { + for (String column : sortDescendingColumns) { + updatedSortOrder.desc(column); + } + } + + updatedSortOrder.commit(); + } + } + private static List processFile( Table table, RESTCatalog catalog, @@ -338,7 +359,8 @@ private static List processFile( if (checkNotExists.apply(dstDataFile)) { return null; } - return copyParquetWithPartition(file, dstDataFile, tableSchema, table, inputFile); + return copyParquetWithPartition( + file, replacePrefix(dstDataFile, "s3://", "s3a://"), tableSchema, table, inputFile); } else { String dstDataFile = dstDataFileSource.get(file); if (checkNotExists.apply(dstDataFile)) { @@ -391,7 +413,7 @@ private static List copyParquetWithPartition( GenericAppenderFactory appenderFactory = new GenericAppenderFactory(tableSchema, table.spec()); PartitionKey partitionKey = new PartitionKey(table.spec(), tableSchema); - Map> openAppenders = new HashMap<>(); + Map> partitionedRecords = new HashMap<>(); Map writtenFiles = new HashMap<>(); Parquet.ReadBuilder readBuilder = @@ -399,49 +421,54 @@ private static List copyParquetWithPartition( .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s)) .project(tableSchema) .reuseContainers(); - Map recordCounts = new HashMap<>(); + // Read and group records by partition try (CloseableIterable records = readBuilder.build()) { for (Record record : records) { partitionKey.partition(record); PartitionKey keyCopy = partitionKey.copy(); - FileAppender appender = openAppenders.get(keyCopy); - if (appender == null) { - OutputFile outFile = fileFactory.newOutputFile(keyCopy).encryptingOutputFile(); - appender = 
appenderFactory.newAppender(outFile, FileFormat.PARQUET); - openAppenders.put(keyCopy, appender); - writtenFiles.put(keyCopy, outFile); - recordCounts.put(keyCopy, 0L); - } - - appender.add(record); - recordCounts.put(keyCopy, recordCounts.get(keyCopy) + 1); + partitionedRecords.computeIfAbsent(keyCopy, k -> new ArrayList<>()).add(record); } } List dataFiles = new ArrayList<>(); - for (Map.Entry> entry : openAppenders.entrySet()) { + // Create a comparator based on table.sortOrder() + RecordSortComparator comparator = new RecordSortComparator(table.sortOrder(), tableSchema); + + // Write sorted records for each partition + for (Map.Entry> entry : partitionedRecords.entrySet()) { PartitionKey partKey = entry.getKey(); - FileAppender appender = entry.getValue(); - appender.close(); - - OutputFile outFile = writtenFiles.get(partKey); - InputFile inFile = outFile.toInputFile(); - - MetricsConfig metricsConfig = MetricsConfig.forTable(table); - Metrics metrics = ParquetUtil.fileMetrics(inFile, metricsConfig); - - dataFiles.add( - DataFiles.builder(table.spec()) - .withPath(outFile.location()) - .withFileSizeInBytes(inFile.getLength()) - .withPartition(partKey) - .withFormat(FileFormat.PARQUET) - .withRecordCount(recordCounts.get(partKey)) - .withMetrics(metrics) - .build()); + List records = entry.getValue(); + + // Sort records within the partition + records.sort(comparator); + + OutputFile outFile = fileFactory.newOutputFile(partKey).encryptingOutputFile(); + writtenFiles.put(partKey, outFile); + + try (FileAppender appender = + appenderFactory.newAppender(outFile, FileFormat.PARQUET)) { + for (Record rec : records) { + appender.add(rec); + } + appender.close(); + + InputFile inFile = outFile.toInputFile(); + MetricsConfig metricsConfig = MetricsConfig.forTable(table); + Metrics metrics = ParquetUtil.fileMetrics(inFile, metricsConfig); + + dataFiles.add( + DataFiles.builder(table.spec()) + .withPath(outFile.location()) + .withFileSizeInBytes(inFile.getLength()) + .withPartition(partKey) + .withFormat(FileFormat.PARQUET) + .withRecordCount(records.size()) + .withMetrics(metrics) + .build()); + } } return dataFiles; diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/RecordSortComparator.java b/ice/src/main/java/com/altinity/ice/internal/cmd/RecordSortComparator.java new file mode 100644 index 00000000..01916bc2 --- /dev/null +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/RecordSortComparator.java @@ -0,0 +1,39 @@ +package com.altinity.ice.internal.cmd; + +import java.util.Comparator; +import java.util.List; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortField; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.data.Record; + +public class RecordSortComparator implements Comparator { + private final List sortFields; + private final Schema schema; + + public RecordSortComparator(SortOrder sortOrder, Schema schema) { + this.sortFields = sortOrder.fields(); + this.schema = schema; + } + + @SuppressWarnings("unchecked") + @Override + public int compare(Record r1, Record r2) { + for (SortField sf : sortFields) { + String fieldName = schema.findColumnName(sf.sourceId()); + Comparable v1 = (Comparable) r1.getField(fieldName); + Comparable v2 = (Comparable) r2.getField(fieldName); + + if (v1 == null && v2 == null) continue; + if (v1 == null) return sf.direction() == SortDirection.ASC ? -1 : 1; + if (v2 == null) return sf.direction() == SortDirection.ASC ? 
1 : -1; + + int cmp = v1.compareTo(v2); + if (cmp != 0) { + return sf.direction() == SortDirection.ASC ? cmp : -cmp; + } + } + return 0; + } +} From 3775b5ecebd8d8d860cbfbed9b130ccc5052442a Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 5 May 2025 12:04:17 -0400 Subject: [PATCH 11/30] ice: Merged changes from master. --- .../com/altinity/ice/cli/internal/cmd/Insert.java | 14 +++++++------- .../ice/cli/internal/cmd/RecordSortComparator.java | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java index 2c462253..462eb8f2 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java @@ -9,8 +9,6 @@ */ package com.altinity.ice.cli.internal.cmd; -import static com.altinity.ice.internal.strings.Strings.replacePrefix; - import com.altinity.ice.cli.internal.iceberg.io.Input; import com.altinity.ice.cli.internal.iceberg.parquet.Metadata; import com.altinity.ice.cli.internal.jvm.Stats; @@ -25,6 +23,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; + +import com.altinity.ice.internal.strings.Strings; import org.apache.commons.codec.digest.DigestUtils; import org.apache.iceberg.*; import org.apache.iceberg.aws.s3.S3FileIO; @@ -337,7 +337,7 @@ private static List processFile( file + " cannot be added to catalog without copy"); // TODO: explain } long dataFileSizeInBytes; - var dataFile = replacePrefix(file, "s3a://", "s3://"); + var dataFile = Strings.replacePrefix(file, "s3a://", "s3://"); var start = System.currentTimeMillis(); if (options.noCopy()) { if (checkNotExists.apply(dataFile)) { @@ -371,13 +371,13 @@ private static List processFile( return null; } return copyParquetWithPartition( - file, replacePrefix(dstDataFile, "s3://", "s3a://"), tableSchema, table, inputFile); + file, Strings.replacePrefix(dstDataFile, "s3://", "s3a://"), tableSchema, table, inputFile); } else { String dstDataFile = dstDataFileSource.get(file); if (checkNotExists.apply(dstDataFile)) { return null; } - OutputFile outputFile = tableIO.newOutputFile(replacePrefix(dstDataFile, "s3://", "s3a://")); + OutputFile outputFile = tableIO.newOutputFile(Strings.replacePrefix(dstDataFile, "s3://", "s3a://")); // TODO: support transferTo below (note that compression, etc. 
might be different) // try (var d = outputFile.create()) { try (var s = inputFile.newStream()) { // s.transferTo(d); }} @@ -446,8 +446,8 @@ private static List copyParquetWithPartition( List dataFiles = new ArrayList<>(); // Create a comparator based on table.sortOrder() - com.altinity.ice.internal.cmd.RecordSortComparator comparator = - new com.altinity.ice.internal.cmd.RecordSortComparator(table.sortOrder(), tableSchema); + RecordSortComparator comparator = + new RecordSortComparator(table.sortOrder(), tableSchema); // Write sorted records for each partition for (Map.Entry> entry : partitionedRecords.entrySet()) { diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/RecordSortComparator.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/RecordSortComparator.java index 66882abc..90b71a3b 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/RecordSortComparator.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/RecordSortComparator.java @@ -7,7 +7,7 @@ * * http://www.apache.org/licenses/LICENSE-2.0 */ -package com.altinity.ice.internal.cmd; +package com.altinity.ice.cli.internal.cmd; import java.util.Comparator; import java.util.List; From b28af691c7d5f00fd4f04e9cb39fd21c25552a06 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 5 May 2025 12:11:08 -0400 Subject: [PATCH 12/30] ice: Merged changes from master and fixed jar conflicts --- ice/pom.xml | 4 ++++ .../com/altinity/ice/cli/internal/cmd/Insert.java | 15 +++++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/ice/pom.xml b/ice/pom.xml index 8c592b09..4f5ea9fa 100644 --- a/ice/pom.xml +++ b/ice/pom.xml @@ -27,6 +27,10 @@ iceberg-data ${iceberg.version} + + org.apache.commons + commons-lang3 + org.apache.parquet parquet-avro diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java index 462eb8f2..cf2fc9ae 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java @@ -14,6 +14,7 @@ import com.altinity.ice.cli.internal.jvm.Stats; import com.altinity.ice.cli.internal.retry.RetryLog; import com.altinity.ice.cli.internal.s3.S3; +import com.altinity.ice.internal.strings.Strings; import java.io.IOException; import java.util.*; import java.util.concurrent.*; @@ -23,8 +24,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; - -import com.altinity.ice.internal.strings.Strings; import org.apache.commons.codec.digest.DigestUtils; import org.apache.iceberg.*; import org.apache.iceberg.aws.s3.S3FileIO; @@ -371,13 +370,18 @@ private static List processFile( return null; } return copyParquetWithPartition( - file, Strings.replacePrefix(dstDataFile, "s3://", "s3a://"), tableSchema, table, inputFile); + file, + Strings.replacePrefix(dstDataFile, "s3://", "s3a://"), + tableSchema, + table, + inputFile); } else { String dstDataFile = dstDataFileSource.get(file); if (checkNotExists.apply(dstDataFile)) { return null; } - OutputFile outputFile = tableIO.newOutputFile(Strings.replacePrefix(dstDataFile, "s3://", "s3a://")); + OutputFile outputFile = + tableIO.newOutputFile(Strings.replacePrefix(dstDataFile, "s3://", "s3a://")); // TODO: support transferTo below (note that compression, etc. 
might be different) // try (var d = outputFile.create()) { try (var s = inputFile.newStream()) { // s.transferTo(d); }} @@ -446,8 +450,7 @@ private static List copyParquetWithPartition( List dataFiles = new ArrayList<>(); // Create a comparator based on table.sortOrder() - RecordSortComparator comparator = - new RecordSortComparator(table.sortOrder(), tableSchema); + RecordSortComparator comparator = new RecordSortComparator(table.sortOrder(), tableSchema); // Write sorted records for each partition for (Map.Entry> entry : partitionedRecords.entrySet()) { From 2128640d6f8d8ec6fe62f67bf34e91701bb558a9 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 5 May 2025 17:03:07 -0400 Subject: [PATCH 13/30] ice: Fixed fileSizeBytes since spark was throwing an error. --- examples/scratch/README.md | 2 +- .../altinity/ice/cli/internal/cmd/Insert.java | 67 ++++++++++++++----- 2 files changed, 53 insertions(+), 16 deletions(-) diff --git a/examples/scratch/README.md b/examples/scratch/README.md index 8104bf3f..2779627a 100644 --- a/examples/scratch/README.md +++ b/examples/scratch/README.md @@ -28,7 +28,7 @@ ice insert nyc.taxis -p \ https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet # Insert rows with sort-key -ice insert --sort-by=VendorID nyc.taxis2 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet +ice insert --sort-descending=VendorID nyc.taxis2 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet # Insert with partition key ice insert --partition-by=Airport_fee nyc2.taxis3 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java index cf2fc9ae..9d46ffe4 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java @@ -335,9 +335,10 @@ private static List processFile( throw new BadRequestException( file + " cannot be added to catalog without copy"); // TODO: explain } - long dataFileSizeInBytes; - var dataFile = Strings.replacePrefix(file, "s3a://", "s3://"); + long dataFileSizeInBytes = 0; + var start = System.currentTimeMillis(); + var dataFile = Strings.replacePrefix(file, "s3a://", "s3://"); if (options.noCopy()) { if (checkNotExists.apply(dataFile)) { return null; @@ -362,7 +363,6 @@ private static List processFile( .destinationKey(dst.path()) .build(); s3ClientLazy.getValue().copyObject(copyReq); - dataFileSizeInBytes = inputFile.getLength(); dataFile = dstDataFile; } else if (partitionColumns != null && !partitionColumns.isEmpty()) { String dstDataFile = dstDataFileSource.get(file); @@ -371,39 +371,66 @@ private static List processFile( } return copyParquetWithPartition( file, - Strings.replacePrefix(dstDataFile, "s3://", "s3a://"), + Strings.replacePrefix(dstDataFileSource.get(file), "s3://", "s3a://"), tableSchema, table, - inputFile); + inputFile, + metadata); } else { String dstDataFile = dstDataFileSource.get(file); if (checkNotExists.apply(dstDataFile)) { return null; } + OutputFile outputFile = tableIO.newOutputFile(Strings.replacePrefix(dstDataFile, "s3://", "s3a://")); - // TODO: support transferTo below (note that compression, etc. 
might be different) - // try (var d = outputFile.create()) { try (var s = inputFile.newStream()) { - // s.transferTo(d); }} + Parquet.ReadBuilder readBuilder = Parquet.read(inputFile) .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s)) - .project(tableSchema); // TODO: ? - // TODO: reuseContainers? + .project(tableSchema); + + logger.info("{}: copying to {}", file, dstDataFile); + + // Read records into memory + List records = new ArrayList<>(); + try (CloseableIterable iterable = readBuilder.build()) { + for (Record r : iterable) { + records.add(r); + } + } + + // Sort records if sort order is defined and non-empty + SortOrder sortOrder = table.sortOrder(); + if (sortOrder != null && !sortOrder.isUnsorted()) { + records.sort(new RecordSortComparator(sortOrder, tableSchema)); + } + + // Write sorted records out Parquet.WriteBuilder writeBuilder = Parquet.write(outputFile) .overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.PRESERVE_ORIGINAL) .createWriterFunc(GenericParquetWriter::buildWriter) .schema(tableSchema); - logger.info("{}: copying to {}", file, dstDataFile); - // file size may have changed due to different compression, etc. - dataFileSizeInBytes = copy(readBuilder, writeBuilder); + + try (FileAppender appender = writeBuilder.build()) { + for (Record record : records) { + appender.add(record); + } + + // dataFileSizeInBytes = appender.length(); + } + + InputFile inFile = outputFile.toInputFile(); + dataFileSizeInBytes = inFile.getLength(); dataFile = dstDataFile; } logger.info( "{}: adding data file (copy took {}s)", file, (System.currentTimeMillis() - start) / 1000); MetricsConfig metricsConfig = MetricsConfig.forTable(table); Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig); + + // dataFileSizeInBytes = inputFile.getLength(); DataFile dataFileObj = new DataFiles.Builder(table.spec()) .withPath(dataFile) @@ -416,10 +443,16 @@ private static List processFile( } private static List copyParquetWithPartition( - String file, String dstDataFile, Schema tableSchema, Table table, InputFile inputFile) + String file, + String dstDataFile, + Schema tableSchema, + Table table, + InputFile inputFile, + ParquetMetadata metadata) throws IOException { logger.info("{}: copying to partitions under {}", file, dstDataFile); + var start = System.currentTimeMillis(); // Partition writer setup OutputFileFactory fileFactory = @@ -449,6 +482,7 @@ private static List copyParquetWithPartition( List dataFiles = new ArrayList<>(); + logger.info("Sort order: " + table.sortOrder().toString()); // Create a comparator based on table.sortOrder() RecordSortComparator comparator = new RecordSortComparator(table.sortOrder(), tableSchema); @@ -470,10 +504,13 @@ private static List copyParquetWithPartition( } appender.close(); + logger.info( + "{}: adding data file (copy took {}s)", + file, + (System.currentTimeMillis() - start) / 1000); InputFile inFile = outFile.toInputFile(); MetricsConfig metricsConfig = MetricsConfig.forTable(table); Metrics metrics = ParquetUtil.fileMetrics(inFile, metricsConfig); - dataFiles.add( DataFiles.builder(table.spec()) .withPath(outFile.location()) From 2eab476ae1380e3d8dfc3121efd644033742921d Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 5 May 2025 17:04:40 -0400 Subject: [PATCH 14/30] Fixed typo in variable name --- .../com/altinity/ice/cli/internal/cmd/CreateTable.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java 
b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java index 3002e449..bf02a1b8 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java @@ -47,7 +47,7 @@ public static void run( boolean s3NoSignRequest, List partitionColumns, List sortAscendingColumns, - List sortDescingColumns) + List sortDescendingColumns) throws IOException { Lazy s3ClientLazy = new Lazy<>(() -> S3.newClient(s3NoSignRequest)); @@ -84,7 +84,7 @@ public static void run( // Create sort order based on provided sort columns (z-order) SortOrder sortOrder = null; if ((sortAscendingColumns != null && !sortAscendingColumns.isEmpty()) - || (sortDescingColumns != null && !sortDescingColumns.isEmpty())) { + || (sortDescendingColumns != null && !sortDescendingColumns.isEmpty())) { SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(fileSchema); // Add ascending columns first @@ -95,8 +95,8 @@ public static void run( } // Add descending columns - if (sortDescingColumns != null) { - for (String column : sortDescingColumns) { + if (sortDescendingColumns != null) { + for (String column : sortDescendingColumns) { sortOrderBuilder.desc(column); } } From f7651b68fb0ffe23d2ec274e0567fce6e318277b Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Tue, 6 May 2025 22:56:33 -0400 Subject: [PATCH 15/30] ice: Replaced SortAscending/Descending with JSON that accepts sort order, nullOrder and column name. --- .../main/java/com/altinity/ice/cli/Main.java | 61 +++++++++++-------- .../ice/cli/internal/cmd/CreateTable.java | 46 ++++++-------- .../altinity/ice/cli/internal/cmd/Insert.java | 36 +++++------ 3 files changed, 68 insertions(+), 75 deletions(-) diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java index b0c117a6..d1147e47 100644 --- a/ice/src/main/java/com/altinity/ice/cli/Main.java +++ b/ice/src/main/java/com/altinity/ice/cli/Main.java @@ -18,8 +18,11 @@ import com.altinity.ice.cli.internal.config.Config; import com.altinity.ice.internal.picocli.VersionProvider; import com.altinity.ice.internal.strings.Strings; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Scanner; @@ -114,6 +117,11 @@ void describe( } } + public record IceSortOrder( + @JsonProperty("column") String column, + @JsonProperty("desc") boolean desc, + @JsonProperty("nullFirst") boolean nullFirst) {} + @CommandLine.Command(name = "create-table", description = "Create table.") void createTable( @CommandLine.Parameters( @@ -145,17 +153,20 @@ void createTable( split = ",") List partitionColumns, @CommandLine.Option( - names = {"--sort-ascending"}, - description = "Comma-separated list of columns to sort in ascending order", - split = ",") - List sortAscendingColumns, - @CommandLine.Option( - names = {"--sort-descending"}, - description = "Comma-separated list of columns to sort in descending order", - split = ",") - List sortDescendingColumns) + names = {"--sort-order"}, + description = + "JSON array of sort orders: [{\"column\":\"name\",\"desc\":true,\"nullFirst\":true}]") + String sortOrderJson) throws IOException { try (RESTCatalog catalog = loadCatalog(this.configFile())) { + List sortOrders = new ArrayList<>(); + + if (sortOrderJson != null && !sortOrderJson.isEmpty()) { + ObjectMapper 
mapper = new ObjectMapper(); + IceSortOrder[] orders = mapper.readValue(sortOrderJson, IceSortOrder[].class); + sortOrders = Arrays.asList(orders); + } + CreateTable.run( catalog, TableIdentifier.parse(name), @@ -164,8 +175,7 @@ void createTable( createTableIfNotExists, s3NoSignRequest, partitionColumns, - sortAscendingColumns, - sortDescendingColumns); + sortOrders); } } @@ -230,15 +240,10 @@ void insert( split = ",") List partitionColumns, @CommandLine.Option( - names = {"--sort-ascending"}, - description = "Comma-separated list of columns to sort in ascending order", - split = ",") - List sortAscendingColumns, - @CommandLine.Option( - names = {"--sort-descending"}, - description = "Comma-separated list of columns to sort in descending order", - split = ",") - List sortDescendingColumns, + names = {"--sort-order"}, + description = + "JSON array of sort orders: [{\"column\":\"name\",\"desc\":true,\"nullFirst\":true}]") + String sortOrderJson, @CommandLine.Option( names = {"--thread-count"}, description = "Number of threads to use for inserting data", @@ -257,9 +262,17 @@ void insert( return; } } + + List sortOrders = new ArrayList<>(); + + if (sortOrderJson != null && !sortOrderJson.isEmpty()) { + ObjectMapper mapper = new ObjectMapper(); + IceSortOrder[] orders = mapper.readValue(sortOrderJson, IceSortOrder[].class); + sortOrders = Arrays.asList(orders); + } + TableIdentifier tableId = TableIdentifier.parse(name); if (createTableIfNotExists) { - // TODO: newCreateTableTransaction CreateTable.run( catalog, tableId, @@ -268,8 +281,7 @@ void insert( createTableIfNotExists, s3NoSignRequest, partitionColumns, - sortAscendingColumns, - sortDescendingColumns); + sortOrders); } Insert.run( catalog, @@ -285,8 +297,7 @@ void insert( s3CopyObject, retryList, partitionColumns, - sortAscendingColumns, - sortDescendingColumns, + sortOrders, threadCount < 1 ? 
Runtime.getRuntime().availableProcessors() : threadCount); } } diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java index bf02a1b8..cfedbcbc 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java @@ -9,6 +9,7 @@ */ package com.altinity.ice.cli.internal.cmd; +import com.altinity.ice.cli.Main; import com.altinity.ice.cli.internal.iceberg.io.Input; import com.altinity.ice.cli.internal.iceberg.parquet.Metadata; import com.altinity.ice.cli.internal.s3.S3; @@ -16,10 +17,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.SortOrder; -import org.apache.iceberg.TableProperties; +import org.apache.iceberg.*; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; @@ -46,8 +44,7 @@ public static void run( boolean ignoreAlreadyExists, boolean s3NoSignRequest, List partitionColumns, - List sortAscendingColumns, - List sortDescendingColumns) + List sortOrders) throws IOException { Lazy s3ClientLazy = new Lazy<>(() -> S3.newClient(s3NoSignRequest)); @@ -81,32 +78,25 @@ public static void run( } final PartitionSpec partitionSpec = partitionSpecBuilder.build(); - // Create sort order based on provided sort columns (z-order) - SortOrder sortOrder = null; - if ((sortAscendingColumns != null && !sortAscendingColumns.isEmpty()) - || (sortDescendingColumns != null && !sortDescendingColumns.isEmpty())) { - SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(fileSchema); - - // Add ascending columns first - if (sortAscendingColumns != null) { - for (String column : sortAscendingColumns) { - sortOrderBuilder.asc(column); - } - } + // if we don't set location, it's automatically set to $warehouse/$namespace/$table + createNamespace(catalog, nsTable.namespace()); + Table table = catalog.createTable(nsTable, fileSchema, partitionSpec, location, props); + // ToDO: Move this shared code from Input and CreateTable to base class or common. + // Create sort order based on provided sort columns + if (sortOrders != null && !sortOrders.isEmpty()) { - // Add descending columns - if (sortDescendingColumns != null) { - for (String column : sortDescendingColumns) { - sortOrderBuilder.desc(column); + ReplaceSortOrder replaceSortOrder = table.replaceSortOrder(); + for (Main.IceSortOrder order : sortOrders) { + SortDirection dir = order.desc() ? SortDirection.DESC : SortDirection.ASC; + NullOrder nullOrd = order.nullFirst() ? 
NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST; + if (dir == SortDirection.ASC) { + replaceSortOrder.asc(order.column(), nullOrd); + } else { + replaceSortOrder.desc(order.column(), nullOrd); } } - - sortOrder = sortOrderBuilder.build(); + replaceSortOrder.commit(); } - - // if we don't set location, it's automatically set to $warehouse/$namespace/$table - createNamespace(catalog, nsTable.namespace()); - catalog.createTable(nsTable, fileSchema, PartitionSpec.unpartitioned(), location, props); } catch (AlreadyExistsException e) { if (ignoreAlreadyExists) { return; diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java index 9d46ffe4..33a018c0 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java @@ -9,6 +9,7 @@ */ package com.altinity.ice.cli.internal.cmd; +import com.altinity.ice.cli.Main; import com.altinity.ice.cli.internal.iceberg.io.Input; import com.altinity.ice.cli.internal.iceberg.parquet.Metadata; import com.altinity.ice.cli.internal.jvm.Stats; @@ -73,8 +74,7 @@ public static void run( boolean s3CopyObject, String retryListFile, List partitionColumns, - List sortAscendingColumns, - List sortDescendingColumns, + List sortOrders, int threadCount) throws IOException, InterruptedException { if (files.length == 0) { @@ -97,8 +97,7 @@ public static void run( options.forceNoCopy() ? options.toBuilder().noCopy(true).build() : options; Table table = catalog.loadTable(nsTable); - updatePartitionAndSortOrderMetadata( - table, partitionColumns, sortAscendingColumns, sortDescendingColumns); + updatePartitionAndSortOrderMetadata(table, partitionColumns, sortOrders); try (FileIO tableIO = table.io()) { final Supplier s3ClientSupplier; @@ -249,10 +248,7 @@ public static void run( } private static void updatePartitionAndSortOrderMetadata( - Table table, - List partitionColumns, - List sortAscendingColumns, - List sortDescendingColumns) { + Table table, List partitionColumns, List sortOrders) { // Update partition spec if provided if (partitionColumns != null && !partitionColumns.isEmpty()) { var updateSpec = table.updateSpec(); @@ -263,29 +259,25 @@ private static void updatePartitionAndSortOrderMetadata( } // Update sort order if provided - if ((sortAscendingColumns != null && !sortAscendingColumns.isEmpty()) - || (sortDescendingColumns != null && !sortDescendingColumns.isEmpty())) { + if (sortOrders != null && !sortOrders.isEmpty()) { table .updateProperties() .set( TableProperties.WRITE_DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE) .commit(); - var updatedSortOrder = table.replaceSortOrder(); - - if (sortAscendingColumns != null) { - for (String column : sortAscendingColumns) { - updatedSortOrder.asc(column); - } - } - if (sortDescendingColumns != null) { - for (String column : sortDescendingColumns) { - updatedSortOrder.desc(column); + ReplaceSortOrder replaceSortOrder = table.replaceSortOrder(); + for (Main.IceSortOrder order : sortOrders) { + SortDirection dir = order.desc() ? SortDirection.DESC : SortDirection.ASC; + NullOrder nullOrd = order.nullFirst() ? 
NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
+        if (dir == SortDirection.ASC) {
+          replaceSortOrder.asc(order.column(), nullOrd);
+        } else {
+          replaceSortOrder.desc(order.column(), nullOrd);
        }
      }
-
-      updatedSortOrder.commit();
+      replaceSortOrder.commit();
    }
  }

From 5a29eb4159cab7552042d4d6481693fbc2f69b06 Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Wed, 7 May 2025 15:16:23 -0400
Subject: [PATCH 16/30] ice: Change partition option to a json that accepts
 column name and transform function.

---
 examples/docker-compose/README.md             | 10 +-
 .../main/java/com/altinity/ice/cli/Main.java  | 39 +++++--
 .../ice/cli/internal/cmd/CreateTable.java     | 32 +++++-
 .../altinity/ice/cli/internal/cmd/Insert.java | 53 +++++++--
 .../java/com/altinity/ice/cli/MainTest.java   | 104 ++++++++++++++++++
 5 files changed, 213 insertions(+), 25 deletions(-)
 create mode 100644 ice/src/test/java/com/altinity/ice/cli/MainTest.java

diff --git a/examples/docker-compose/README.md b/examples/docker-compose/README.md
index f4883449..6bf77b61 100644
--- a/examples/docker-compose/README.md
+++ b/examples/docker-compose/README.md
@@ -33,7 +33,8 @@ A spark-iceberg container can be launched using the `docker-compose-spark-iceber
 The default configuration is located in the following path
 `/opt/spark/conf/spark-defaults.conf`
 
-For spark to communicate with `ice-rest-catalog` and `minio`, the following configuration variables need to be updated.\
+For spark to communicate with `ice-rest-catalog` and `minio`, the following configuration variables
+in `/opt/spark/conf/spark-defaults.conf` can be updated.\
 `spark.sql.catalog.demo.uri` - ice-rest-catalog URI \
 `spark.sql.catalog.demo.s3.endpoint` - minio server url.
 
@@ -59,3 +60,10 @@ spark.eventLog.dir /home/iceberg/spark-events
 spark.history.fs.logDirectory /home/iceberg/spark-events
 spark.sql.catalogImplementation in-memory
 ```
+
+The spark-sql shell can now query the tables directly
+
+```
+docker exec -it bash
+./spark-sql
+```

diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java
index d1147e47..92928377 100644
--- a/ice/src/main/java/com/altinity/ice/cli/Main.java
+++ b/ice/src/main/java/com/altinity/ice/cli/Main.java
@@ -122,6 +122,9 @@ public record IceSortOrder(
       @JsonProperty("desc") boolean desc,
       @JsonProperty("nullFirst") boolean nullFirst) {}
 
+  public record IcePartition(
+      @JsonProperty("column") String column, @JsonProperty("transform") String transform) {}
+
   @CommandLine.Command(name = "create-table", description = "Create table.")
   void createTable(
       @CommandLine.Parameters(
@@ -148,10 +151,10 @@ void createTable(
           description = "/path/to/file.parquet")
          String schemaFile,
       @CommandLine.Option(
-          names = {"--partition-by"},
-          description = "Comma-separated list of columns to partition by",
-          split = ",")
-          List partitionColumns,
+          names = {"--partition"},
+          description =
+              "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]")
+          String partitionJson,
       @CommandLine.Option(
           names = {"--sort-order"},
           description =
               "JSON array of sort orders: [{\"column\":\"name\",\"desc\":true,\"nullFirst\":true}]")
           String sortOrderJson)
       throws IOException {
     try (RESTCatalog catalog = loadCatalog(this.configFile())) {
       List sortOrders = new ArrayList<>();
+      List partitions = new ArrayList<>();
 
       if (sortOrderJson != null && !sortOrderJson.isEmpty()) {
         ObjectMapper mapper = new ObjectMapper();
         IceSortOrder[] orders = mapper.readValue(sortOrderJson, IceSortOrder[].class);
         sortOrders = Arrays.asList(orders);
       }
 
+      if (partitionJson != null && !partitionJson.isEmpty()) {
+        ObjectMapper mapper = new ObjectMapper();
+
IcePartition[] parts = mapper.readValue(partitionJson, IcePartition[].class); + partitions = Arrays.asList(parts); + } + CreateTable.run( catalog, TableIdentifier.parse(name), @@ -174,7 +184,7 @@ void createTable( location, createTableIfNotExists, s3NoSignRequest, - partitionColumns, + partitions, sortOrders); } } @@ -235,10 +245,10 @@ void insert( + " (useful for retrying partially failed insert using `cat ice.retry | ice insert - --retry-list=ice.retry`)") String retryList, @CommandLine.Option( - names = {"--partition-by"}, - description = "Comma-separated list of columns to partition by", - split = ",") - List partitionColumns, + names = {"--partition"}, + description = + "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]") + String partitionJson, @CommandLine.Option( names = {"--sort-order"}, description = @@ -264,6 +274,7 @@ void insert( } List sortOrders = new ArrayList<>(); + List partitions = new ArrayList<>(); if (sortOrderJson != null && !sortOrderJson.isEmpty()) { ObjectMapper mapper = new ObjectMapper(); @@ -271,6 +282,12 @@ void insert( sortOrders = Arrays.asList(orders); } + if (partitionJson != null && !partitionJson.isEmpty()) { + ObjectMapper mapper = new ObjectMapper(); + IcePartition[] parts = mapper.readValue(partitionJson, IcePartition[].class); + partitions = Arrays.asList(parts); + } + TableIdentifier tableId = TableIdentifier.parse(name); if (createTableIfNotExists) { CreateTable.run( @@ -280,7 +297,7 @@ void insert( null, createTableIfNotExists, s3NoSignRequest, - partitionColumns, + partitions, sortOrders); } Insert.run( @@ -296,7 +313,7 @@ void insert( s3NoSignRequest, s3CopyObject, retryList, - partitionColumns, + partitions, sortOrders, threadCount < 1 ? Runtime.getRuntime().availableProcessors() : threadCount); } diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java index cfedbcbc..e65745fd 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java @@ -43,7 +43,7 @@ public static void run( String location, boolean ignoreAlreadyExists, boolean s3NoSignRequest, - List partitionColumns, + List partitionColumns, List sortOrders) throws IOException { Lazy s3ClientLazy = new Lazy<>(() -> S3.newClient(s3NoSignRequest)); @@ -72,8 +72,34 @@ public static void run( // Create partition spec based on provided partition columns final PartitionSpec.Builder partitionSpecBuilder = PartitionSpec.builderFor(fileSchema); if (partitionColumns != null && !partitionColumns.isEmpty()) { - for (String column : partitionColumns) { - partitionSpecBuilder.identity(column); + for (Main.IcePartition partition : partitionColumns) { + String transform = partition.transform().toLowerCase(); + if (transform.startsWith("bucket[")) { + int numBuckets = Integer.parseInt(transform.substring(7, transform.length() - 1)); + partitionSpecBuilder.bucket(partition.column(), numBuckets); + } else if (transform.startsWith("truncate[")) { + int width = Integer.parseInt(transform.substring(9, transform.length() - 1)); + partitionSpecBuilder.truncate(partition.column(), width); + } else { + switch (transform) { + case "year": + partitionSpecBuilder.year(partition.column()); + break; + case "month": + partitionSpecBuilder.month(partition.column()); + break; + case "day": + partitionSpecBuilder.day(partition.column()); + break; + case "hour": + 
partitionSpecBuilder.hour(partition.column()); + break; + case "identity": + default: + partitionSpecBuilder.identity(partition.column()); + break; + } + } } } final PartitionSpec partitionSpec = partitionSpecBuilder.build(); diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java index 33a018c0..957f35ad 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java @@ -35,6 +35,7 @@ import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.BadRequestException; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.*; import org.apache.iceberg.mapping.MappedField; import org.apache.iceberg.mapping.NameMapping; @@ -73,7 +74,7 @@ public static void run( boolean s3NoSignRequest, boolean s3CopyObject, String retryListFile, - List partitionColumns, + List partitionColumns, List sortOrders, int threadCount) throws IOException, InterruptedException { @@ -248,26 +249,55 @@ public static void run( } private static void updatePartitionAndSortOrderMetadata( - Table table, List partitionColumns, List sortOrders) { - // Update partition spec if provided - if (partitionColumns != null && !partitionColumns.isEmpty()) { - var updateSpec = table.updateSpec(); - for (String column : partitionColumns) { - updateSpec.addField(column); + Table table, List partitions, List sortOrders) { + + // Create a new transaction. + Transaction txn = table.newTransaction(); + + if (partitions != null && !partitions.isEmpty()) { + var updateSpec = txn.updateSpec(); + for (Main.IcePartition partition : partitions) { + String transform = partition.transform().toLowerCase(); + if (transform.startsWith("bucket[")) { + int numBuckets = Integer.parseInt(transform.substring(7, transform.length() - 1)); + updateSpec.addField(Expressions.bucket(partition.column(), numBuckets)); + } else if (transform.startsWith("truncate[")) { + int width = Integer.parseInt(transform.substring(9, transform.length() - 1)); + updateSpec.addField(Expressions.truncate(partition.column(), width)); + } else { + switch (transform) { + case "year": + updateSpec.addField(Expressions.year(partition.column())); + break; + case "month": + updateSpec.addField(Expressions.month(partition.column())); + break; + case "day": + updateSpec.addField(Expressions.day(partition.column())); + break; + case "hour": + updateSpec.addField(Expressions.hour(partition.column())); + break; + case "identity": + default: + updateSpec.addField(partition.column()); + break; + } + } } updateSpec.commit(); } // Update sort order if provided if (sortOrders != null && !sortOrders.isEmpty()) { - table + txn .updateProperties() .set( TableProperties.WRITE_DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE) .commit(); - ReplaceSortOrder replaceSortOrder = table.replaceSortOrder(); + ReplaceSortOrder replaceSortOrder = txn.replaceSortOrder(); for (Main.IceSortOrder order : sortOrders) { SortDirection dir = order.desc() ? SortDirection.DESC : SortDirection.ASC; NullOrder nullOrd = order.nullFirst() ? NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST; @@ -279,6 +309,9 @@ private static void updatePartitionAndSortOrderMetadata( } replaceSortOrder.commit(); } + + // Commit transaction. 
+ txn.commitTransaction(); } private static List processFile( @@ -293,7 +326,7 @@ private static List processFile( Schema tableSchema, DataFileNamingStrategy.Name dataFileNamingStrategy, String file, - List partitionColumns) + List partitionColumns) throws IOException { logger.info("{}: processing", file); logger.info("{}: jvm: {}", file, Stats.gather()); diff --git a/ice/src/test/java/com/altinity/ice/cli/MainTest.java b/ice/src/test/java/com/altinity/ice/cli/MainTest.java new file mode 100644 index 00000000..edc5b3b1 --- /dev/null +++ b/ice/src/test/java/com/altinity/ice/cli/MainTest.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package com.altinity.ice.cli; + +import static org.testng.Assert.*; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.testng.annotations.Test; + +public class MainTest { + + @Test + public void testSortOrderJsonParsing() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + + // Test single sort order + String singleSortJson = + """ + { + "column": "name", + "desc": true, + "nullFirst": true + } + """; + + Main.IceSortOrder singleSort = mapper.readValue(singleSortJson, Main.IceSortOrder.class); + assertEquals(singleSort.column(), "name"); + assertTrue(singleSort.desc()); + assertTrue(singleSort.nullFirst()); + + // Test array of sort orders + String multipleSortJson = + """ + [ + { + "column": "name", + "desc": true, + "nullFirst": true + }, + { + "column": "age", + "desc": false, + "nullFirst": false + } + ] + """; + + Main.IceSortOrder[] multipleSorts = + mapper.readValue(multipleSortJson, Main.IceSortOrder[].class); + assertEquals(multipleSorts.length, 2); + + // Verify first sort order + assertEquals(multipleSorts[0].column(), "name"); + assertTrue(multipleSorts[0].desc()); + assertTrue(multipleSorts[0].nullFirst()); + + // Verify second sort order + assertEquals(multipleSorts[1].column(), "age"); + assertFalse(multipleSorts[1].desc()); + assertFalse(multipleSorts[1].nullFirst()); + } + + @Test + public void testSortOrderJsonParsingWithMissingFields() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + + // Test with missing nullFirst field (should default to false) + String json = + """ + { + "column": "name", + "desc": true + } + """; + + Main.IceSortOrder sort = mapper.readValue(json, Main.IceSortOrder.class); + assertEquals(sort.column(), "name"); + assertTrue(sort.desc()); + assertFalse(sort.nullFirst()); + } + + @Test(expectedExceptions = Exception.class) + public void testSortOrderJsonParsingWithInvalidJson() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + + // Test with invalid JSON + String invalidJson = + """ + { + "column": "name", + "desc": "not-a-boolean" + } + """; + + mapper.readValue(invalidJson, Main.IceSortOrder.class); + } +} From 913028576ed5a0aebefc82a44f66162ab808ec9c Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Wed, 7 May 2025 15:42:16 -0400 Subject: [PATCH 17/30] ice: Formatting updates --- examples/scratch/README.md | 9 ++++++--- .../java/com/altinity/ice/cli/internal/cmd/Insert.java | 3 +-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/examples/scratch/README.md b/examples/scratch/README.md index 3a2c95e7..891ee2e4 100644 --- 
a/examples/scratch/README.md
+++ b/examples/scratch/README.md
@@ -31,10 +31,13 @@ ice insert nyc.taxis -p \
   https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
 
 # Insert rows with sort-key
-ice insert --sort-descending=VendorID nyc.taxis2 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
+ice insert --sort-order='[{"column": "VendorID", "desc": true, "nullFirst": true}]' nyc.taxis2 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
+
+# Insert with sort-key (multiple columns)
+ice insert --sort-order='[{"column": "VendorID", "desc": true, "nullFirst": true}, {"column": "Airport_fee", "desc": false, "nullFirst": false}]' nyc.taxis24 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
 
 # Insert with partition key
-ice insert --partition-by=Airport_fee nyc2.taxis3 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
+ice insert --partition='[{"column": "RatecodeID", "transform": "identity"}]' nyc.taxis20 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
 
 # warning: each parquet file below is ~500mb. this may take a while
 AWS_REGION=us-east-2 ice insert btc.transactions -p --s3-no-sign-request \
@@ -50,7 +53,7 @@ ice create-table flowers.iris_no_copy --schema-from-parquet=file://iris.parquet
 ice create-table flowers.irs_no_copy_partition --schema-from-parquet=file://iris.parquet --partition-by=variety,petal.width
 
 # create table with sort columns
-ice create-table flowers.irs_no_copy_sort --schema-from-parquet=file://iris.parquet --sort-by=variety
+ice create-table flowers.irs_no_copy_sort --schema-from-parquet=file://iris.parquet --sort-order='[{"column": "variety", "desc": false}]'
 
 local-mc cp iris.parquet local/bucket1/flowers/iris_no_copy/
 ice insert flowers.iris_no_copy --no-copy s3://bucket1/flowers/iris_no_copy/iris.parquet
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
index 957f35ad..999af92d 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
@@ -290,8 +290,7 @@ private static void updatePartitionAndSortOrderMetadata(
 
     // Update sort order if provided
     if (sortOrders != null && !sortOrders.isEmpty()) {
-      txn
-          .updateProperties()
+      txn.updateProperties()
           .set(
               TableProperties.WRITE_DISTRIBUTION_MODE,
               TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)

From 8512ba58580b8d154cbb841b4a1bb7ebeda83521 Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Wed, 7 May 2025 15:52:57 -0400
Subject: [PATCH 18/30] ice: Replaced fileMetrics with footerMetrics.
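Why this helps: ParquetUtil.fileMetrics(inputFile, metricsConfig) opens the Parquet file and parses its footer again just to compute column metrics, while processFile already holds a parsed ParquetMetadata for the input. A minimal sketch of the before/after; the class and method names here are illustrative, not part of the series:

```java
import java.util.stream.Stream;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

class FooterMetricsSketch {
  static Metrics metricsFor(Table table, InputFile inputFile, ParquetMetadata metadata) {
    MetricsConfig config = MetricsConfig.forTable(table);
    // Before: re-reads and re-parses the footer from storage on every call.
    // Metrics metrics = ParquetUtil.fileMetrics(inputFile, config);
    // After: derives metrics from the footer that is already in memory;
    // Stream.empty() means no extra per-field metrics are supplied.
    return ParquetUtil.footerMetrics(metadata, Stream.empty(), config);
  }
}
```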
--- .../main/java/com/altinity/ice/cli/internal/cmd/Insert.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java index 999af92d..f9e796e8 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java @@ -452,7 +452,7 @@ private static List processFile( logger.info( "{}: adding data file (copy took {}s)", file, (System.currentTimeMillis() - start) / 1000); MetricsConfig metricsConfig = MetricsConfig.forTable(table); - Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig); + Metrics metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig); // dataFileSizeInBytes = inputFile.getLength(); DataFile dataFileObj = @@ -534,7 +534,7 @@ private static List copyParquetWithPartition( (System.currentTimeMillis() - start) / 1000); InputFile inFile = outFile.toInputFile(); MetricsConfig metricsConfig = MetricsConfig.forTable(table); - Metrics metrics = ParquetUtil.fileMetrics(inFile, metricsConfig); + Metrics metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig); dataFiles.add( DataFiles.builder(table.spec()) .withPath(outFile.location()) From 65847f834726743be4c51c6219a14be0d2213596 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Wed, 7 May 2025 20:49:58 -0400 Subject: [PATCH 19/30] ice: Reverted default of copy() when partitioning or sorting is not required. --- .../altinity/ice/cli/internal/cmd/Insert.java | 124 ++++++++++++------ 1 file changed, 87 insertions(+), 37 deletions(-) diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java index f9e796e8..1174a605 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java @@ -182,7 +182,8 @@ public static void run( tableSchema, dataFileNamingStrategy, file, - partitionColumns); + partitionColumns, + sortOrders); if (dataFiles != null) { for (DataFile df : dataFiles) { atLeastOneFileAppended.set(true); @@ -325,7 +326,8 @@ private static List processFile( Schema tableSchema, DataFileNamingStrategy.Name dataFileNamingStrategy, String file, - List partitionColumns) + List partitionColumns, + List sortOrders) throws IOException { logger.info("{}: processing", file); logger.info("{}: jvm: {}", file, Stats.gather()); @@ -400,66 +402,48 @@ private static List processFile( table, inputFile, metadata); + } else if (sortOrders != null && !sortOrders.isEmpty()) { + return copyParquetWithSortOrder( + file, + Strings.replacePrefix(dstDataFileSource.get(file), "s3://", "s3a://"), + tableSchema, + table, + inputFile, + metadata, + dataFileNamingStrategy); } else { String dstDataFile = dstDataFileSource.get(file); if (checkNotExists.apply(dstDataFile)) { return null; } - OutputFile outputFile = tableIO.newOutputFile(Strings.replacePrefix(dstDataFile, "s3://", "s3a://")); - + // TODO: support transferTo below (note that compression, etc. 
might be different) + // try (var d = outputFile.create()) { try (var s = inputFile.newStream()) { + // s.transferTo(d); }} Parquet.ReadBuilder readBuilder = Parquet.read(inputFile) .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s)) - .project(tableSchema); - - logger.info("{}: copying to {}", file, dstDataFile); - - // Read records into memory - List records = new ArrayList<>(); - try (CloseableIterable iterable = readBuilder.build()) { - for (Record r : iterable) { - records.add(r); - } - } - - // Sort records if sort order is defined and non-empty - SortOrder sortOrder = table.sortOrder(); - if (sortOrder != null && !sortOrder.isUnsorted()) { - records.sort(new RecordSortComparator(sortOrder, tableSchema)); - } - - // Write sorted records out + .project(tableSchema); // TODO: ? + // TODO: reuseContainers? Parquet.WriteBuilder writeBuilder = Parquet.write(outputFile) .overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.PRESERVE_ORIGINAL) .createWriterFunc(GenericParquetWriter::buildWriter) .schema(tableSchema); - - try (FileAppender appender = writeBuilder.build()) { - for (Record record : records) { - appender.add(record); - } - - // dataFileSizeInBytes = appender.length(); - } - - InputFile inFile = outputFile.toInputFile(); - dataFileSizeInBytes = inFile.getLength(); + logger.info("{}: copying to {}", file, dstDataFile); + // file size may have changed due to different compression, etc. + dataFileSizeInBytes = copy(readBuilder, writeBuilder); dataFile = dstDataFile; } logger.info( "{}: adding data file (copy took {}s)", file, (System.currentTimeMillis() - start) / 1000); MetricsConfig metricsConfig = MetricsConfig.forTable(table); Metrics metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig); - - // dataFileSizeInBytes = inputFile.getLength(); DataFile dataFileObj = new DataFiles.Builder(table.spec()) .withPath(dataFile) .withFormat("PARQUET") - // .withRecordCount(recordCount) .withFileSizeInBytes(dataFileSizeInBytes) .withMetrics(metrics) .build(); @@ -550,6 +534,72 @@ private static List copyParquetWithPartition( return dataFiles; } + private static List copyParquetWithSortOrder( + String file, + String dstDataFile, + Schema tableSchema, + Table table, + InputFile inputFile, + ParquetMetadata metadata, + DataFileNamingStrategy.Name dataFileNamingStrategy) + throws IOException { + Logger logger = LoggerFactory.getLogger(Insert.class); + logger.info("{}: copying with sort order to {}", file, dstDataFile); + long start = System.currentTimeMillis(); + + OutputFile outputFile = table.io().newOutputFile(dstDataFile); + + Parquet.ReadBuilder readBuilder = + Parquet.read(inputFile) + .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s)) + .project(tableSchema); + + // Read records into memory + List records = new ArrayList<>(); + try (CloseableIterable iterable = readBuilder.build()) { + for (Record r : iterable) { + records.add(r); + } + } + + // Sort records if sort order is defined and non-empty + SortOrder sortOrder = table.sortOrder(); + if (sortOrder != null && !sortOrder.isUnsorted()) { + records.sort(new RecordSortComparator(sortOrder, tableSchema)); + } + + // Write sorted records out + Parquet.WriteBuilder writeBuilder = + Parquet.write(outputFile) + .overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.PRESERVE_ORIGINAL) + .createWriterFunc(GenericParquetWriter::buildWriter) + .schema(tableSchema); + + try (FileAppender appender = writeBuilder.build()) { + for (Record record : records) { + 
appender.add(record); + } + } + + InputFile inFile = outputFile.toInputFile(); + MetricsConfig metricsConfig = MetricsConfig.forTable(table); + Metrics metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig); + + DataFile dataFileObj = + new DataFiles.Builder(table.spec()) + .withPath(dstDataFile) + .withFormat("PARQUET") + .withFileSizeInBytes(inFile.getLength()) + .withMetrics(metrics) + .build(); + + logger.info( + "{}: adding data file (copy with sort order took {}s)", + file, + (System.currentTimeMillis() - start) / 1000); + return Collections.singletonList(dataFileObj); + } + private static boolean sameSchema(Table table, Schema fileSchema) { boolean sameSchema; Schema tableSchema = table.schema(); From 5d25f55e8727e9cf0cba5a44452aef8a474ed0f3 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Wed, 7 May 2025 21:21:07 -0400 Subject: [PATCH 20/30] ice: Fixed appendOp used in main thread and the thread pool threads. --- .../altinity/ice/cli/internal/cmd/Insert.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java index 1174a605..15b92702 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java @@ -185,18 +185,15 @@ public static void run( partitionColumns, sortOrders); if (dataFiles != null) { - for (DataFile df : dataFiles) { - atLeastOneFileAppended.set(true); - appendOp.appendFile(df); - } + return dataFiles; } - return dataFiles; + return Collections.emptyList(); } catch (Exception e) { if (retryLog != null) { logger.error( "{}: error (adding to retry list and continuing)", file, e); retryLog.add(file); - return null; + return Collections.emptyList(); } else { throw e; } @@ -207,11 +204,9 @@ public static void run( for (var future : futures) { try { List dataFiles = future.get(); - if (dataFiles != null) { - for (DataFile df : dataFiles) { - atLeastOneFileAppended.set(true); - appendOp.appendFile(df); - } + for (DataFile df : dataFiles) { + atLeastOneFileAppended.set(true); + appendOp.appendFile(df); // ✅ Only main thread appends now } } catch (InterruptedException e) { Thread.currentThread().interrupt(); From 290dd80c6efe41c4cb81936351e57ea31323f611 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Thu, 8 May 2025 19:17:45 -0400 Subject: [PATCH 21/30] ice: Addressed PR review comments. 
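Context for the appendOp change in PATCH 20 above: AppendFiles is a mutable pending update and is not documented as thread-safe, so the workers now only build DataFile objects and the main thread is the only one that appends and commits. That is also why the AtomicBoolean in the diff below can become a plain boolean. A simplified sketch of the resulting pattern; class and method names are illustrative, and error handling plus the real processFile arguments are omitted:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

class AppendFromMainThreadSketch {
  static void insertAll(Table table, List<Callable<List<DataFile>>> workers, int threads)
      throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    try {
      List<Future<List<DataFile>>> futures = new ArrayList<>();
      for (Callable<List<DataFile>> worker : workers) {
        futures.add(executor.submit(worker)); // parallel: read/sort/copy data files
      }
      AppendFiles appendOp = table.newAppend();
      boolean atLeastOneFileAppended = false;
      for (Future<List<DataFile>> future : futures) {
        for (DataFile df : future.get()) { // only this thread touches appendOp
          appendOp.appendFile(df);
          atLeastOneFileAppended = true;
        }
      }
      if (atLeastOneFileAppended) {
        appendOp.commit(); // one snapshot for the whole batch
      }
    } finally {
      executor.shutdown();
    }
  }
}
```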
--- examples/docker-compose/README.md | 3 +-- .../main/java/com/altinity/ice/cli/Main.java | 2 ++ .../ice/cli/internal/cmd/CreateTable.java | 8 +++++++- .../altinity/ice/cli/internal/cmd/Insert.java | 18 +++++++++++------- 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/examples/docker-compose/README.md b/examples/docker-compose/README.md index 6bf77b61..5d57169b 100644 --- a/examples/docker-compose/README.md +++ b/examples/docker-compose/README.md @@ -64,6 +64,5 @@ spark.sql.catalogImplementation in-memory The spark-sql shell can now query the tables directory ``` -docker exec -it bash -./spark-sql +docker exec -it spark-iceberg ./spark-sql ``` diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java index 92928377..7c0f8eac 100644 --- a/ice/src/main/java/com/altinity/ice/cli/Main.java +++ b/ice/src/main/java/com/altinity/ice/cli/Main.java @@ -19,6 +19,7 @@ import com.altinity.ice.internal.picocli.VersionProvider; import com.altinity.ice.internal.strings.Strings; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.ArrayList; @@ -167,6 +168,7 @@ void createTable( if (sortOrderJson != null && !sortOrderJson.isEmpty()) { ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); IceSortOrder[] orders = mapper.readValue(sortOrderJson, IceSortOrder[].class); sortOrders = Arrays.asList(orders); } diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java index e65745fd..103aeeb5 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java @@ -17,7 +17,13 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import org.apache.iceberg.*; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.ReplaceSortOrder; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java index 15b92702..d2a83d93 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -158,7 +157,7 @@ public static void run( retryListFile != null && !retryListFile.isEmpty() ? 
new RetryLog(retryListFile) : null) { - AtomicBoolean atLeastOneFileAppended = new AtomicBoolean(false); + boolean atLeastOneFileAppended = false; int numThreads = Math.min(finalOptions.threadCount(), filesExpanded.size()); ExecutorService executor = Executors.newFixedThreadPool(numThreads); @@ -205,7 +204,7 @@ public static void run( try { List dataFiles = future.get(); for (DataFile df : dataFiles) { - atLeastOneFileAppended.set(true); + atLeastOneFileAppended = true; appendOp.appendFile(df); // ✅ Only main thread appends now } } catch (InterruptedException e) { @@ -224,7 +223,7 @@ public static void run( if (!finalOptions.noCommit()) { // TODO: log - if (atLeastOneFileAppended.get()) { + if (atLeastOneFileAppended) { appendOp.commit(); } else { logger.warn("Table commit skipped (no files to append)"); @@ -456,6 +455,7 @@ private static List copyParquetWithPartition( logger.info("{}: copying to partitions under {}", file, dstDataFile); var start = System.currentTimeMillis(); + long fileSizeInBytes = 0; // Partition writer setup OutputFileFactory fileFactory = @@ -505,6 +505,8 @@ private static List copyParquetWithPartition( for (Record rec : records) { appender.add(rec); } + + fileSizeInBytes = appender.length(); appender.close(); logger.info( @@ -517,10 +519,9 @@ private static List copyParquetWithPartition( dataFiles.add( DataFiles.builder(table.spec()) .withPath(outFile.location()) - .withFileSizeInBytes(inFile.getLength()) + .withFileSizeInBytes(fileSizeInBytes) .withPartition(partKey) .withFormat(FileFormat.PARQUET) - .withRecordCount(records.size()) .withMetrics(metrics) .build()); } @@ -541,6 +542,7 @@ private static List copyParquetWithSortOrder( Logger logger = LoggerFactory.getLogger(Insert.class); logger.info("{}: copying with sort order to {}", file, dstDataFile); long start = System.currentTimeMillis(); + long fileSizeInBytes = 0; OutputFile outputFile = table.io().newOutputFile(dstDataFile); @@ -574,6 +576,8 @@ private static List copyParquetWithSortOrder( for (Record record : records) { appender.add(record); } + fileSizeInBytes = appender.length(); + ; } InputFile inFile = outputFile.toInputFile(); @@ -584,7 +588,7 @@ private static List copyParquetWithSortOrder( new DataFiles.Builder(table.spec()) .withPath(dstDataFile) .withFormat("PARQUET") - .withFileSizeInBytes(inFile.getLength()) + .withFileSizeInBytes(fileSizeInBytes) .withMetrics(metrics) .build(); From 9b9ed9be144a00553364a4fe059682a0a4880741 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Thu, 8 May 2025 22:56:57 -0400 Subject: [PATCH 22/30] ice: Addressed PR review comments. 
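The substantive change in this patch is that the partition-spec update, the sort-order update, and the file append now run inside a single Transaction, so a failure mid-insert no longer leaves the table with updated metadata but no data. Rough shape of the flow, reduced to essentials; the column names are borrowed from the README examples and the spec/sort details are elided:

```java
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;

class TransactionalInsertSketch {
  static void run(Table table, Iterable<DataFile> dataFiles) {
    Transaction txn = table.newTransaction();
    txn.updateSpec().addField("RatecodeID").commit(); // staged spec change
    txn.replaceSortOrder().asc("VendorID").commit();  // staged sort-order change
    AppendFiles appendOp = txn.newAppend();           // append joins the same transaction
    for (DataFile df : dataFiles) {
      appendOp.appendFile(df);
    }
    appendOp.commit();       // still staged, not yet visible to readers
    txn.commitTransaction(); // everything becomes visible atomically
  }
}
```

Each commit() inside the transaction only stages its change; nothing is visible to readers until commitTransaction().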
--- .../altinity/ice/cli/internal/cmd/Insert.java | 63 ++++++++++++++----- 1 file changed, 46 insertions(+), 17 deletions(-) diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java index d2a83d93..ab484d2c 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java @@ -17,15 +17,40 @@ import com.altinity.ice.cli.internal.s3.S3; import com.altinity.ice.internal.strings.Strings; import java.io.IOException; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.iceberg.*; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.ReplaceSortOrder; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.Transaction; import org.apache.iceberg.aws.s3.S3FileIO; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.GenericAppenderFactory; @@ -35,7 +60,12 @@ import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.BadRequestException; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.io.*; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.mapping.MappedField; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.mapping.NameMappingParser; @@ -97,7 +127,9 @@ public static void run( options.forceNoCopy() ? options.toBuilder().noCopy(true).build() : options; Table table = catalog.loadTable(nsTable); - updatePartitionAndSortOrderMetadata(table, partitionColumns, sortOrders); + // Create transaction and pass it to updatePartitionAndSortOrderMetadata + Transaction txn = table.newTransaction(); + updatePartitionAndSortOrderMetadata(txn, partitionColumns, sortOrders); try (FileIO tableIO = table.io()) { final Supplier s3ClientSupplier; @@ -150,7 +182,8 @@ public static void run( case PRESERVE_ORIGINAL -> new DataFileNamingStrategy.InputFilename(dstPath); }; - AppendFiles appendOp = table.newAppend(); + // appendOp to use the same transaction. 
+ AppendFiles appendOp = txn.newAppend(); try (FileIO inputIO = Input.newIO(filesExpanded.getFirst(), table, s3ClientLazy); RetryLog retryLog = @@ -205,7 +238,7 @@ public static void run( List dataFiles = future.get(); for (DataFile df : dataFiles) { atLeastOneFileAppended = true; - appendOp.appendFile(df); // ✅ Only main thread appends now + appendOp.appendFile(df); // Only main thread appends now } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -231,6 +264,9 @@ public static void run( if (retryLog != null) { retryLog.commit(); } + + // Commit transaction. + txn.commitTransaction(); } else { logger.warn("Table commit skipped (--no-commit)"); } @@ -244,10 +280,7 @@ public static void run( } private static void updatePartitionAndSortOrderMetadata( - Table table, List partitions, List sortOrders) { - - // Create a new transaction. - Transaction txn = table.newTransaction(); + Transaction txn, List partitions, List sortOrders) { if (partitions != null && !partitions.isEmpty()) { var updateSpec = txn.updateSpec(); @@ -303,9 +336,6 @@ private static void updatePartitionAndSortOrderMetadata( } replaceSortOrder.commit(); } - - // Commit transaction. - txn.commitTransaction(); } private static List processFile( @@ -506,8 +536,8 @@ private static List copyParquetWithPartition( appender.add(rec); } - fileSizeInBytes = appender.length(); appender.close(); + fileSizeInBytes = appender.length(); logger.info( "{}: adding data file (copy took {}s)", @@ -576,11 +606,10 @@ private static List copyParquetWithSortOrder( for (Record record : records) { appender.add(record); } + appender.close(); fileSizeInBytes = appender.length(); - ; } - InputFile inFile = outputFile.toInputFile(); MetricsConfig metricsConfig = MetricsConfig.forTable(table); Metrics metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig); From 3329c323ec5652a6bf07709b72751ba33d33cc90 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Thu, 8 May 2025 22:57:05 -0400 Subject: [PATCH 23/30] ice: Addressed PR review comments. --- examples/docker-compose/docker-compose-spark-iceberg.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/docker-compose/docker-compose-spark-iceberg.yaml b/examples/docker-compose/docker-compose-spark-iceberg.yaml index 8a26671a..b7b2fa70 100644 --- a/examples/docker-compose/docker-compose-spark-iceberg.yaml +++ b/examples/docker-compose/docker-compose-spark-iceberg.yaml @@ -2,8 +2,7 @@ services: spark-iceberg: image: tabulario/spark-iceberg container_name: spark-iceberg - build: spark/ - network_mode: host + network_mode: host volumes: - ./warehouse:/home/iceberg/warehouse - ./notebooks:/home/iceberg/notebooks/notebooks From 070667223d22ecf3455260f51afb355c1d178b08 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Fri, 9 May 2025 09:47:28 -0400 Subject: [PATCH 24/30] ice: Added objectMapper fail settings. 
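What FAIL_ON_UNKNOWN_PROPERTIES buys for the --partition and --sort-order flags, in isolation: a misspelled key in the user-supplied JSON now fails loudly instead of being silently dropped (the feature is enabled by default in jackson-databind, so the configure call mainly pins the behavior explicitly). Self-contained example; the record mirrors Main.IceSortOrder:

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

class StrictJsonDemo {
  record IceSortOrder(
      @JsonProperty("column") String column,
      @JsonProperty("desc") boolean desc,
      @JsonProperty("nullFirst") boolean nullFirst) {}

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
    // "dsc" is a typo for "desc": this now throws UnrecognizedPropertyException
    // instead of deserializing with desc silently left as false.
    mapper.readValue("[{\"column\":\"VendorID\",\"dsc\":true}]", IceSortOrder[].class);
  }
}
```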
--- ice/src/main/java/com/altinity/ice/cli/Main.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java index 7c0f8eac..08b2e922 100644 --- a/ice/src/main/java/com/altinity/ice/cli/Main.java +++ b/ice/src/main/java/com/altinity/ice/cli/Main.java @@ -175,6 +175,7 @@ void createTable( if (partitionJson != null && !partitionJson.isEmpty()) { ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); IcePartition[] parts = mapper.readValue(partitionJson, IcePartition[].class); partitions = Arrays.asList(parts); } @@ -280,12 +281,14 @@ void insert( if (sortOrderJson != null && !sortOrderJson.isEmpty()) { ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); IceSortOrder[] orders = mapper.readValue(sortOrderJson, IceSortOrder[].class); sortOrders = Arrays.asList(orders); } if (partitionJson != null && !partitionJson.isEmpty()) { ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); IcePartition[] parts = mapper.readValue(partitionJson, IcePartition[].class); partitions = Arrays.asList(parts); } From fe371ad90f4f68e7ac4d62d21149f435766c8df9 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Fri, 9 May 2025 10:03:26 -0400 Subject: [PATCH 25/30] ice: Added supported transforms in partition argument. --- ice/src/main/java/com/altinity/ice/cli/Main.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java index 08b2e922..f36264da 100644 --- a/ice/src/main/java/com/altinity/ice/cli/Main.java +++ b/ice/src/main/java/com/altinity/ice/cli/Main.java @@ -154,7 +154,8 @@ void createTable( @CommandLine.Option( names = {"--partition"}, description = - "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]") + "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]," + + "Supported transforms: hour, day, month, year, identity") String partitionJson, @CommandLine.Option( names = {"--sort-order"}, From b0abb1a7c02256f20e6bdf6c597f0b6b8b54cf43 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Fri, 9 May 2025 10:08:33 -0400 Subject: [PATCH 26/30] ice: Fixed formatting errors. --- ice/src/main/java/com/altinity/ice/cli/Main.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java index f36264da..3b9f9655 100644 --- a/ice/src/main/java/com/altinity/ice/cli/Main.java +++ b/ice/src/main/java/com/altinity/ice/cli/Main.java @@ -154,8 +154,8 @@ void createTable( @CommandLine.Option( names = {"--partition"}, description = - "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]," + - "Supported transforms: hour, day, month, year, identity") + "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]," + + "Supported transforms: hour, day, month, year, identity") String partitionJson, @CommandLine.Option( names = {"--sort-order"}, From 827fd73f03cdde390d8b89f9067d2dd446330274 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Fri, 9 May 2025 12:42:23 -0400 Subject: [PATCH 27/30] ice: Added logic to explicitly convert datetime/timestamp to long for partitioning. 
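Background for the conversion this patch adds: Iceberg's hour/day/month/year partition transforms on timestamp columns operate on epoch microseconds (long), while the generic Parquet reader hands back java.time values such as LocalDateTime, hence the toMicros(...) helper in the diff below. A toy illustration with hypothetical values:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

class MicrosDemo {
  public static void main(String[] args) {
    // What toMicros(...) produces for a value like tpep_pickup_datetime:
    LocalDateTime pickup = LocalDateTime.parse("2025-01-01T10:15:30");
    long micros = pickup.toInstant(ZoneOffset.UTC).toEpochMilli() * 1000L;
    System.out.println(micros); // 1735726530000000
    // The "day" transform then effectively buckets by days since epoch:
    System.out.println(micros / 86_400_000_000L); // 20089 -> 2025-01-01
  }
}
```

Note that going through toEpochMilli() * 1000L truncates sub-millisecond precision; that is harmless for hour/day/month/year bucketing, though exact microseconds would need Instant.getEpochSecond() * 1_000_000 plus getNano() / 1_000.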
--- examples/scratch/README.md | 6 ++ .../main/java/com/altinity/ice/cli/Main.java | 5 +- .../altinity/ice/cli/internal/cmd/Insert.java | 74 ++++++++++++++----- 3 files changed, 66 insertions(+), 19 deletions(-) diff --git a/examples/scratch/README.md b/examples/scratch/README.md index 891ee2e4..6215541e 100644 --- a/examples/scratch/README.md +++ b/examples/scratch/README.md @@ -39,6 +39,12 @@ ice insert --sort-order='[{"column": "VendorID", "desc": true, "nullFirst": true # Insert with partition key ice insert --partition='[{"column": "RatecodeID", "transform": "identity"}]' nyc.taxis20 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet +# Partition by day +ice insert --partition='[{"column": "tpep_pickup_datetime", "transform": "day"}]' nyc.taxis20 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet + +# Partition by year +ice insert --partition='[{"column": "tpep_pickup_datetime", "transform": "year"}]' nyc44.taxis111 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet + # warning: each parquet file below is ~500mb. this may take a while AWS_REGION=us-east-2 ice insert btc.transactions -p --s3-no-sign-request \ s3://aws-public-blockchain/v1.0/btc/transactions/\ diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java index 3b9f9655..e7564dd2 100644 --- a/ice/src/main/java/com/altinity/ice/cli/Main.java +++ b/ice/src/main/java/com/altinity/ice/cli/Main.java @@ -155,7 +155,7 @@ void createTable( names = {"--partition"}, description = "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]," - + "Supported transforms: hour, day, month, year, identity") + + "Supported transforms: hour, day, month, year, identity(default)") String partitionJson, @CommandLine.Option( names = {"--sort-order"}, @@ -251,7 +251,8 @@ void insert( @CommandLine.Option( names = {"--partition"}, description = - "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]") + "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]," + + "Supported transforms: hour, day, month, year, identity(default)") String partitionJson, @CommandLine.Option( names = {"--sort-order"}, diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java index ab484d2c..15cf491b 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java @@ -17,6 +17,10 @@ import com.altinity.ice.cli.internal.s3.S3; import com.altinity.ice.internal.strings.Strings; import java.io.IOException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -36,24 +40,11 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.iceberg.AppendFiles; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataFiles; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Metrics; -import org.apache.iceberg.MetricsConfig; -import org.apache.iceberg.NullOrder; -import org.apache.iceberg.PartitionKey; -import org.apache.iceberg.ReplaceSortOrder; -import org.apache.iceberg.Schema; -import 
org.apache.iceberg.SortDirection; -import org.apache.iceberg.SortOrder; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; -import org.apache.iceberg.Transaction; +import org.apache.iceberg.*; import org.apache.iceberg.aws.s3.S3FileIO; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; @@ -73,6 +64,10 @@ import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.rest.RESTCatalog; +import org.apache.iceberg.transforms.Days; +import org.apache.iceberg.transforms.Hours; +import org.apache.iceberg.transforms.Months; +import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -502,13 +497,36 @@ private static List copyParquetWithPartition( .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s)) .project(tableSchema) .reuseContainers(); + PartitionSpec spec = table.spec(); + Schema schema = table.schema(); - // Read and group records by partition try (CloseableIterable records = readBuilder.build()) { for (Record record : records) { - partitionKey.partition(record); + + Record partitionRecord = GenericRecord.create(schema); + for (Types.NestedField field : schema.columns()) { + partitionRecord.setField(field.name(), record.getField(field.name())); + } + + for (PartitionField field : spec.fields()) { + String fieldName = schema.findField(field.sourceId()).name(); + Object value = partitionRecord.getField(fieldName); + String transformName = field.transform().toString(); + + if (value != null + && (transformName.equals("day") + || transformName.equals("month") + || transformName.equals("hour"))) { + long micros = toMicros(value); + partitionRecord.setField(fieldName, micros); + } + } + + // Partition based on converted values + partitionKey.partition(partitionRecord); PartitionKey keyCopy = partitionKey.copy(); + // Store the original record (without converted timestamp fields) partitionedRecords.computeIfAbsent(keyCopy, k -> new ArrayList<>()).add(record); } } @@ -560,6 +578,28 @@ private static List copyParquetWithPartition( return dataFiles; } + public static long toMicros(Object tsValue) { + if (tsValue instanceof Long) { + return (Long) tsValue; + + } else if (tsValue instanceof String) { + LocalDateTime ldt = LocalDateTime.parse((String) tsValue); + return ldt.toInstant(ZoneOffset.UTC).toEpochMilli() * 1000L; + + } else if (tsValue instanceof LocalDateTime) { + return ((LocalDateTime) tsValue).toInstant(ZoneOffset.UTC).toEpochMilli() * 1000L; + + } else if (tsValue instanceof OffsetDateTime) { + return ((OffsetDateTime) tsValue).toInstant().toEpochMilli() * 1000L; + + } else if (tsValue instanceof Instant) { + return ((Instant) tsValue).toEpochMilli() * 1000L; + + } else { + throw new IllegalArgumentException("Unsupported timestamp type: " + tsValue.getClass()); + } + } + private static List copyParquetWithSortOrder( String file, String dstDataFile, From d3e76480d6cb095d3d2ba5d0706c476d1046f983 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Fri, 9 May 2025 12:46:47 -0400 Subject: [PATCH 28/30] ice: Added logic to explicitly convert datetime/timestamp to long for partitioning. 
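One thing worth double-checking from PATCH 27 while cleaning it up here: the conversion guard matches the day, month, and hour transforms but not year, even though the README gains a year partitioning example in the same patch. If year partitioning of a timestamp column hits the same type mismatch, the fix would presumably be the same one-line extension (a sketch only, not part of this series; value, transformName, partitionRecord, fieldName, and toMicros are the names from that diff):

```java
// Hypothetical widening of the guard inside the PartitionField loop from PATCH 27:
if (value != null
    && (transformName.equals("day")
        || transformName.equals("month")
        || transformName.equals("hour")
        || transformName.equals("year"))) {
  long micros = toMicros(value);
  partitionRecord.setField(fieldName, micros);
}
```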
From d3e76480dc31db1d24b47960bea3295d1bc451e4 Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Fri, 9 May 2025 12:46:47 -0400
Subject: [PATCH 28/30] ice: Added logic to explicitly convert datetime/timestamp to long for partitioning.

---
 ice/src/main/java/com/altinity/ice/cli/Main.java            | 4 ++--
 .../main/java/com/altinity/ice/cli/internal/cmd/Insert.java | 4 ----
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java
index e7564dd2..acda67f4 100644
--- a/ice/src/main/java/com/altinity/ice/cli/Main.java
+++ b/ice/src/main/java/com/altinity/ice/cli/Main.java
@@ -251,8 +251,8 @@ void insert(
       @CommandLine.Option(
           names = {"--partition"},
           description =
-              "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}],"
-                  + "Supported transforms: hour, day, month, year, identity(default)")
+              "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}],"
+              + "Supported transforms: hour, day, month, year, identity(default)")
       String partitionJson,
       @CommandLine.Option(
           names = {"--sort-order"},
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
index 15cf491b..d94254f3 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
@@ -64,10 +64,6 @@ import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.rest.RESTCatalog;
-import org.apache.iceberg.transforms.Days;
-import org.apache.iceberg.transforms.Hours;
-import org.apache.iceberg.transforms.Months;
-import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;

From 701352e0dc31db1d24b47960bea3295d1bc451e4 Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Fri, 9 May 2025 13:28:02 -0400
Subject: [PATCH 29/30] ice: move initialization of dataFileSizeInBytes.

---
 .../main/java/com/altinity/ice/cli/internal/cmd/Insert.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
index d94254f3..dbab81d0 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
@@ -376,7 +376,7 @@ private static List<DataFile> processFile(
       throw new BadRequestException(
           file + " cannot be added to catalog without copy"); // TODO: explain
     }
-    long dataFileSizeInBytes = 0;
+    long dataFileSizeInBytes;

     var start = System.currentTimeMillis();
     var dataFile = Strings.replacePrefix(file, "s3a://", "s3://");
@@ -386,6 +386,7 @@
       }
       dataFileSizeInBytes = inputFile.getLength();
     } else if (options.s3CopyObject()) {
+      dataFileSizeInBytes = 0;
       if (!dataFile.startsWith("s3://") || !table.location().startsWith("s3://")) {
         throw new BadRequestException("--s3-copy-object is only supported between s3:// buckets");
       }
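The move above leans on Java's definite-assignment rule: declaring dataFileSizeInBytes without an initializer makes javac prove that every branch assigns it before it is read, surfacing branches that silently leave it at zero. The follow-up patch below shows why the initializer came back: one path still did not assign. A small self-contained illustration of the rule, with hypothetical sizes:

public class DefiniteAssignment {

  // javac's definite-assignment check: a local declared without an
  // initializer must be assigned on every path before it is read, or
  // compilation fails with "variable dataFileSizeInBytes might not have
  // been initialized".
  static long dataFileSize(boolean noCopy, boolean s3Copy) {
    long dataFileSizeInBytes; // no "= 0", as in the patch above
    if (noCopy) {
      dataFileSizeInBytes = 100L; // hypothetical size for illustration
    } else if (s3Copy) {
      dataFileSizeInBytes = 200L;
    } else {
      // Remove this assignment and the return below no longer compiles:
      // the compiler can prove nothing about this path.
      dataFileSizeInBytes = 0L;
    }
    return dataFileSizeInBytes;
  }

  public static void main(String[] args) {
    System.out.println(dataFileSize(true, false)); // 100
  }
}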
From 1b30ce0b29659e3649f7de60fa6f688638c86a32 Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Fri, 9 May 2025 13:47:30 -0400
Subject: [PATCH 30/30] ice: Reverted removal of the dataFileSizeInBytes assignment for s3 copy.

---
 .../main/java/com/altinity/ice/cli/internal/cmd/Insert.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
index dbab81d0..9775a2e5 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
@@ -376,7 +376,7 @@ private static List<DataFile> processFile(
       throw new BadRequestException(
           file + " cannot be added to catalog without copy"); // TODO: explain
     }
-    long dataFileSizeInBytes;
+    long dataFileSizeInBytes = 0;

     var start = System.currentTimeMillis();
     var dataFile = Strings.replacePrefix(file, "s3a://", "s3://");
@@ -386,7 +386,6 @@
       }
       dataFileSizeInBytes = inputFile.getLength();
     } else if (options.s3CopyObject()) {
-      dataFileSizeInBytes = 0;
       if (!dataFile.startsWith("s3://") || !table.location().startsWith("s3://")) {
         throw new BadRequestException("--s3-copy-object is only supported between s3:// buckets");
       }
@@ -405,6 +404,7 @@
               .destinationKey(dst.path())
               .build();
       s3ClientLazy.getValue().copyObject(copyReq);
+      dataFileSizeInBytes = inputFile.getLength();
       dataFile = dstDataFile;
     } else if (partitionColumns != null && !partitionColumns.isEmpty()) {
       String dstDataFile = dstDataFileSource.get(file);
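Why the size assignment returns after the copy: the byte count ends up in the DataFile metadata recorded in the catalog, and an S3 server-side copy response carries no object size, so it has to be obtained separately. Below is a hedged sketch of that flow, assuming the AWS SDK for Java v2 (which the CopyObjectRequest builder in the patch suggests) and hypothetical bucket/key names; unlike the patch, which reads inputFile.getLength(), this variant sizes the destination with a HeadObject call.

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;

public class S3CopySize {

  public static void main(String[] args) {
    // Hypothetical bucket/key names, for illustration only.
    String srcBucket = "bucket1";
    String srcKey = "flowers/iris.parquet";
    String dstBucket = "bucket1";
    String dstKey = "flowers/iris_copy/iris.parquet";

    try (S3Client s3 = S3Client.create()) {
      // Server-side copy: no object bytes flow through this client.
      s3.copyObject(
          CopyObjectRequest.builder()
              .sourceBucket(srcBucket)
              .sourceKey(srcKey)
              .destinationBucket(dstBucket)
              .destinationKey(dstKey)
              .build());

      // The copy response carries no content length, so fetch it explicitly.
      long dataFileSizeInBytes =
          s3.headObject(HeadObjectRequest.builder().bucket(dstBucket).key(dstKey).build())
              .contentLength();
      System.out.println("copied " + dataFileSizeInBytes + " bytes");
    }
  }
}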