diff --git a/examples/docker-compose/README.md b/examples/docker-compose/README.md
index 39a8cda8..5d57169b 100644
--- a/examples/docker-compose/README.md
+++ b/examples/docker-compose/README.md
@@ -25,3 +25,44 @@ clickhouse client --query 'select count(*) from ice.`nyc.taxis`;'
1. `docker compose up` fails with `ERROR: Invalid interpolation format for "content" option in config "clickhouse-init": "#!/bin/bash`
Solution: Upgrade docker/docker compose to v2.
+
+#### Spark Iceberg
+A `spark-iceberg` container can be launched using the `docker-compose-spark-iceberg.yaml` file.
+
+The default Spark configuration is located at `/opt/spark/conf/spark-defaults.conf`.
+
+For Spark to communicate with `ice-rest-catalog` and `minio`, the following configuration variables
+in `/opt/spark/conf/spark-defaults.conf` may need to be updated:
+
+- `spark.sql.catalog.demo.uri` - ice-rest-catalog URI
+- `spark.sql.catalog.demo.s3.endpoint` - minio server URL
+- `spark.sql.catalog.demo.s3.access-key` - minio access key
+- `spark.sql.catalog.demo.s3.secret-key` - minio password
+
+
+```
+spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+spark.sql.catalog.demo org.apache.iceberg.spark.SparkCatalog
+spark.sql.catalog.demo.type rest
+spark.sql.catalog.demo.uri http://localhost:5000
+spark.sql.catalog.demo.io-impl org.apache.iceberg.aws.s3.S3FileIO
+spark.sql.catalog.demo.warehouse s3://warehouse/wh/
+spark.sql.catalog.demo.s3.endpoint http://localhost:9000
+spark.sql.catalog.demo.s3.access-key miniouser
+spark.sql.catalog.demo.s3.secret-key miniopassword
+spark.sql.catalog.demo.s3.path-style-access true
+spark.sql.catalog.demo.s3.ssl-enabled false
+spark.sql.defaultCatalog demo
+spark.eventLog.enabled true
+spark.eventLog.dir /home/iceberg/spark-events
+spark.history.fs.logDirectory /home/iceberg/spark-events
+spark.sql.catalogImplementation in-memory
+```
+
+The `spark-sql` shell can now be used to query the tables directly:
+
+```
+docker exec -it spark-iceberg ./spark-sql
+```
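+
+For example, assuming the `nyc.taxis` table from the example above has already been loaded into the catalog, a quick check (using `spark-sql -e` to run a single statement) might look like:
+
+```
+docker exec -it spark-iceberg ./spark-sql -e 'select count(*) from nyc.taxis;'
+```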
diff --git a/examples/docker-compose/docker-compose-spark-iceberg.yaml b/examples/docker-compose/docker-compose-spark-iceberg.yaml
new file mode 100644
index 00000000..b7b2fa70
--- /dev/null
+++ b/examples/docker-compose/docker-compose-spark-iceberg.yaml
@@ -0,0 +1,17 @@
+services:
+ spark-iceberg:
+ image: tabulario/spark-iceberg
+ container_name: spark-iceberg
+ network_mode: host
+ volumes:
+ - ./warehouse:/home/iceberg/warehouse
+ - ./notebooks:/home/iceberg/notebooks/notebooks
+ environment:
+ - AWS_ACCESS_KEY_ID=miniouser
+ - AWS_SECRET_ACCESS_KEY=miniopassword
+ - AWS_REGION=minio
+ ports:
+ - 8888:8888
+ - 8080:8080
+ - 10000:10000
+ - 10001:10001
diff --git a/examples/scratch/README.md b/examples/scratch/README.md
index 5abc0bf4..6215541e 100644
--- a/examples/scratch/README.md
+++ b/examples/scratch/README.md
@@ -29,6 +29,21 @@ ice insert flowers.iris -p \
ice insert nyc.taxis -p \
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
+
+# Insert rows with a sort order
+ice insert --sort-order='[{"column": "VendorID", "desc": true, "nullFirst": true}]' nyc.taxis2 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
+
+# Insert with a multi-column sort order
+ice insert --sort-order='[{"column": "VendorID", "desc": true, "nullFirst": true}, {"column": "Airport_fee", "desc": false, "nullFirst": false}]' nyc.taxis24 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
+
+# Insert with a partition key
+ice insert --partition='[{"column": "RatecodeID", "transform": "identity"}]' nyc.taxis20 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
+
+# Partition by day
+ice insert --partition='[{"column": "tpep_pickup_datetime", "transform": "day"}]' nyc.taxis20 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
+
+# Partition by year
+ice insert --partition='[{"column": "tpep_pickup_datetime", "transform": "year"}]' nyc44.taxis111 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
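+
+# Partitioning and sort order can be combined in a single insert (table name below is illustrative)
+ice insert --partition='[{"column": "tpep_pickup_datetime", "transform": "month"}]' --sort-order='[{"column": "VendorID", "desc": false, "nullFirst": false}]' nyc.taxis_month_sorted -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet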
# warning: each parquet file below is ~500mb. this may take a while
AWS_REGION=us-east-2 ice insert btc.transactions -p --s3-no-sign-request \
@@ -39,6 +54,13 @@ date=2025-01-01/part-00000-33e8d075-2099-409b-a806-68dd17217d39-c000.snappy.parq
# upload file to minio using local-mc,
# then add file to the catalog without making a copy
ice create-table flowers.iris_no_copy --schema-from-parquet=file://iris.parquet
+
+# create table with partitioning columns
+ice create-table flowers.iris_no_copy_partition --schema-from-parquet=file://iris.parquet --partition='[{"column": "variety", "transform": "identity"}, {"column": "petal.width", "transform": "identity"}]'
+
+# create table with sort columns
+ice create-table flowers.iris_no_copy_sort --schema-from-parquet=file://iris.parquet --sort-order='[{"column": "variety", "desc": false}]'
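+
+# create table with both partitioning and sort columns (table name is illustrative)
+ice create-table flowers.iris_part_sort --schema-from-parquet=file://iris.parquet --partition='[{"column": "variety", "transform": "identity"}]' --sort-order='[{"column": "petal.width", "desc": true, "nullFirst": false}]'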
+
local-mc cp iris.parquet local/bucket1/flowers/iris_no_copy/
ice insert flowers.iris_no_copy --no-copy s3://bucket1/flowers/iris_no_copy/iris.parquet
diff --git a/ice/pom.xml b/ice/pom.xml
index 9903628a..4f5ea9fa 100644
--- a/ice/pom.xml
+++ b/ice/pom.xml
@@ -22,6 +22,45 @@
       <artifactId>iceberg-bundled-guava</artifactId>
       <version>${iceberg.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-data</artifactId>
+      <version>${iceberg.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-lang3</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.parquet</groupId>
+          <artifactId>parquet-avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.github.ben-manes.caffeine</groupId>
+          <artifactId>caffeine</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-codec</groupId>
+          <artifactId>commons-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.airlift</groupId>
+          <artifactId>aircompressor</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.iceberg</groupId>
       <artifactId>iceberg-core</artifactId>
@@ -31,6 +70,7 @@
com.github.ben-manes.caffeine
caffeine
+
com.fasterxml.jackson.core
jackson-core
@@ -43,6 +83,10 @@
          <groupId>commons-codec</groupId>
          <artifactId>commons-codec</artifactId>
        </exclusion>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-lang3</artifactId>
+        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java
index 0b1fefa4..acda67f4 100644
--- a/ice/src/main/java/com/altinity/ice/cli/Main.java
+++ b/ice/src/main/java/com/altinity/ice/cli/Main.java
@@ -18,8 +18,12 @@
import com.altinity.ice.cli.internal.config.Config;
import com.altinity.ice.internal.picocli.VersionProvider;
import com.altinity.ice.internal.strings.Strings;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Scanner;
@@ -114,6 +118,14 @@ void describe(
}
}
+ public record IceSortOrder(
+ @JsonProperty("column") String column,
+ @JsonProperty("desc") boolean desc,
+ @JsonProperty("nullFirst") boolean nullFirst) {}
+
+ public record IcePartition(
+ @JsonProperty("column") String column, @JsonProperty("transform") String transform) {}
+
@CommandLine.Command(name = "create-table", description = "Create table.")
void createTable(
@CommandLine.Parameters(
@@ -138,16 +150,46 @@ void createTable(
required = true,
names = "--schema-from-parquet",
description = "/path/to/file.parquet")
- String schemaFile)
+ String schemaFile,
+ @CommandLine.Option(
+ names = {"--partition"},
+ description =
+ "JSON array of partition specifications, e.g. [{\"column\":\"date\",\"transform\":\"year\"}]. "
+ + "Supported transforms: hour, day, month, year, bucket[N], truncate[W], identity (default)")
+ String partitionJson,
+ @CommandLine.Option(
+ names = {"--sort-order"},
+ description =
+ "JSON array of sort orders: [{\"column\":\"name\",\"desc\":true,\"nullFirst\":true}]")
+ String sortOrderJson)
throws IOException {
try (RESTCatalog catalog = loadCatalog(this.configFile())) {
+ List<IceSortOrder> sortOrders = new ArrayList<>();
+ List<IcePartition> partitions = new ArrayList<>();
+
+ if (sortOrderJson != null && !sortOrderJson.isEmpty()) {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
+ IceSortOrder[] orders = mapper.readValue(sortOrderJson, IceSortOrder[].class);
+ sortOrders = Arrays.asList(orders);
+ }
+
+ if (partitionJson != null && !partitionJson.isEmpty()) {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
+ IcePartition[] parts = mapper.readValue(partitionJson, IcePartition[].class);
+ partitions = Arrays.asList(parts);
+ }
+
CreateTable.run(
catalog,
TableIdentifier.parse(name),
schemaFile,
location,
createTableIfNotExists,
- s3NoSignRequest);
+ s3NoSignRequest,
+ partitions,
+ sortOrders);
}
}
@@ -206,6 +248,17 @@ void insert(
"/path/to/file where to save list of files to retry"
+ " (useful for retrying partially failed insert using `cat ice.retry | ice insert - --retry-list=ice.retry`)")
String retryList,
+ @CommandLine.Option(
+ names = {"--partition"},
+ description =
+ "JSON array of partition specifications, e.g. [{\"column\":\"date\",\"transform\":\"year\"}]. "
+ + "Supported transforms: hour, day, month, year, bucket[N], truncate[W], identity (default)")
+ String partitionJson,
+ @CommandLine.Option(
+ names = {"--sort-order"},
+ description =
+ "JSON array of sort orders: [{\"column\":\"name\",\"desc\":true,\"nullFirst\":true}]")
+ String sortOrderJson,
@CommandLine.Option(
names = {"--thread-count"},
description = "Number of threads to use for inserting data",
@@ -224,11 +277,35 @@ void insert(
return;
}
}
+
+ List<IceSortOrder> sortOrders = new ArrayList<>();
+ List<IcePartition> partitions = new ArrayList<>();
+
+ if (sortOrderJson != null && !sortOrderJson.isEmpty()) {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
+ IceSortOrder[] orders = mapper.readValue(sortOrderJson, IceSortOrder[].class);
+ sortOrders = Arrays.asList(orders);
+ }
+
+ if (partitionJson != null && !partitionJson.isEmpty()) {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
+ IcePartition[] parts = mapper.readValue(partitionJson, IcePartition[].class);
+ partitions = Arrays.asList(parts);
+ }
+
TableIdentifier tableId = TableIdentifier.parse(name);
if (createTableIfNotExists) {
- // TODO: newCreateTableTransaction
CreateTable.run(
- catalog, tableId, dataFiles[0], null, createTableIfNotExists, s3NoSignRequest);
+ catalog,
+ tableId,
+ dataFiles[0],
+ null,
+ createTableIfNotExists,
+ s3NoSignRequest,
+ partitions,
+ sortOrders);
}
Insert.run(
catalog,
@@ -243,6 +320,8 @@ void insert(
s3NoSignRequest,
s3CopyObject,
retryList,
+ partitions,
+ sortOrders,
threadCount < 1 ? Runtime.getRuntime().availableProcessors() : threadCount);
}
}
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java
index e5561a13..103aeeb5 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java
@@ -9,6 +9,7 @@
*/
package com.altinity.ice.cli.internal.cmd;
+import com.altinity.ice.cli.Main;
import com.altinity.ice.cli.internal.iceberg.io.Input;
import com.altinity.ice.cli.internal.iceberg.parquet.Metadata;
import com.altinity.ice.cli.internal.s3.S3;
@@ -16,8 +17,12 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReplaceSortOrder;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -43,7 +48,9 @@ public static void run(
String schemaFile,
String location,
boolean ignoreAlreadyExists,
- boolean s3NoSignRequest)
+ boolean s3NoSignRequest,
+ List<Main.IcePartition> partitionColumns,
+ List<Main.IceSortOrder> sortOrders)
throws IOException {
Lazy<S3Client> s3ClientLazy = new Lazy<>(() -> S3.newClient(s3NoSignRequest));
@@ -67,9 +74,61 @@ public static void run(
String mappingJson = NameMappingParser.toJson(mapping);
props = Map.of(TableProperties.DEFAULT_NAME_MAPPING, mappingJson);
}
+
+ // Create partition spec based on provided partition columns
+ final PartitionSpec.Builder partitionSpecBuilder = PartitionSpec.builderFor(fileSchema);
+ if (partitionColumns != null && !partitionColumns.isEmpty()) {
+ for (Main.IcePartition partition : partitionColumns) {
+ String transform = partition.transform().toLowerCase();
+ if (transform.startsWith("bucket[")) {
+ int numBuckets = Integer.parseInt(transform.substring(7, transform.length() - 1));
+ partitionSpecBuilder.bucket(partition.column(), numBuckets);
+ } else if (transform.startsWith("truncate[")) {
+ int width = Integer.parseInt(transform.substring(9, transform.length() - 1));
+ partitionSpecBuilder.truncate(partition.column(), width);
+ } else {
+ switch (transform) {
+ case "year":
+ partitionSpecBuilder.year(partition.column());
+ break;
+ case "month":
+ partitionSpecBuilder.month(partition.column());
+ break;
+ case "day":
+ partitionSpecBuilder.day(partition.column());
+ break;
+ case "hour":
+ partitionSpecBuilder.hour(partition.column());
+ break;
+ case "identity":
+ default:
+ partitionSpecBuilder.identity(partition.column());
+ break;
+ }
+ }
+ }
+ }
+ final PartitionSpec partitionSpec = partitionSpecBuilder.build();
+
// if we don't set location, it's automatically set to $warehouse/$namespace/$table
createNamespace(catalog, nsTable.namespace());
- catalog.createTable(nsTable, fileSchema, PartitionSpec.unpartitioned(), location, props);
+ Table table = catalog.createTable(nsTable, fileSchema, partitionSpec, location, props);
+ // TODO: move this sort-order setup, duplicated between CreateTable and Insert, into a shared helper.
+ // Create sort order based on provided sort columns
+ if (sortOrders != null && !sortOrders.isEmpty()) {
+
+ ReplaceSortOrder replaceSortOrder = table.replaceSortOrder();
+ for (Main.IceSortOrder order : sortOrders) {
+ SortDirection dir = order.desc() ? SortDirection.DESC : SortDirection.ASC;
+ NullOrder nullOrd = order.nullFirst() ? NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
+ if (dir == SortDirection.ASC) {
+ replaceSortOrder.asc(order.column(), nullOrd);
+ } else {
+ replaceSortOrder.desc(order.column(), nullOrd);
+ }
+ }
+ replaceSortOrder.commit();
+ }
} catch (AlreadyExistsException e) {
if (ignoreAlreadyExists) {
return;
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
index 1decc152..9775a2e5 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
@@ -9,6 +9,7 @@
*/
package com.altinity.ice.cli.internal.cmd;
+import com.altinity.ice.cli.Main;
import com.altinity.ice.cli.internal.iceberg.io.Input;
import com.altinity.ice.cli.internal.iceberg.parquet.Metadata;
import com.altinity.ice.cli.internal.jvm.Stats;
@@ -16,32 +17,46 @@
import com.altinity.ice.cli.internal.s3.S3;
import com.altinity.ice.internal.strings.Strings;
import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.iceberg.*;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.BadRequestException;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mapping.MappedField;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
@@ -65,11 +80,6 @@ public final class Insert {
private Insert() {}
- static {
- // Force-init.
- NativeCodeLoader.isNativeCodeLoaded();
- }
-
// TODO: refactor
public static void run(
RESTCatalog catalog,
@@ -84,6 +94,8 @@ public static void run(
boolean s3NoSignRequest,
boolean s3CopyObject,
String retryListFile,
+ List<Main.IcePartition> partitionColumns,
+ List<Main.IceSortOrder> sortOrders,
int threadCount)
throws IOException, InterruptedException {
if (files.length == 0) {
@@ -105,6 +117,11 @@ public static void run(
final Options finalOptions =
options.forceNoCopy() ? options.toBuilder().noCopy(true).build() : options;
Table table = catalog.loadTable(nsTable);
+
+ // Create transaction and pass it to updatePartitionAndSortOrderMetadata
+ Transaction txn = table.newTransaction();
+ updatePartitionAndSortOrderMetadata(txn, partitionColumns, sortOrders);
+
try (FileIO tableIO = table.io()) {
final Supplier<S3Client> s3ClientSupplier;
if (finalOptions.forceTableAuth()) {
@@ -156,7 +173,8 @@ public static void run(
case PRESERVE_ORIGINAL -> new DataFileNamingStrategy.InputFilename(dstPath);
};
- AppendFiles appendOp = table.newAppend();
+ // Create the append operation on the same transaction.
+ AppendFiles appendOp = txn.newAppend();
try (FileIO inputIO = Input.newIO(filesExpanded.getFirst(), table, s3ClientLazy);
RetryLog retryLog =
@@ -168,30 +186,37 @@ public static void run(
int numThreads = Math.min(finalOptions.threadCount(), filesExpanded.size());
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
try {
- var futures = new ArrayList<Future<DataFile>>();
+ var futures = new ArrayList<Future<List<DataFile>>>();
for (final String file : filesExpanded) {
futures.add(
executor.submit(
() -> {
try {
- return processFile(
- table,
- catalog,
- tableIO,
- inputIO,
- tableDataFiles,
- finalOptions,
- s3ClientLazy,
- dstDataFileSource,
- tableSchema,
- dataFileNamingStrategy,
- file);
+ List<DataFile> dataFiles =
+ processFile(
+ table,
+ catalog,
+ tableIO,
+ inputIO,
+ tableDataFiles,
+ finalOptions,
+ s3ClientLazy,
+ dstDataFileSource,
+ tableSchema,
+ dataFileNamingStrategy,
+ file,
+ partitionColumns,
+ sortOrders);
+ if (dataFiles != null) {
+ return dataFiles;
+ }
+ return Collections.emptyList();
} catch (Exception e) {
if (retryLog != null) {
logger.error(
"{}: error (adding to retry list and continuing)", file, e);
retryLog.add(file);
- return null;
+ return Collections.emptyList();
} else {
throw e;
}
@@ -201,10 +226,10 @@ public static void run(
for (var future : futures) {
try {
- DataFile df = future.get();
- if (df != null) {
+ List<DataFile> dataFiles = future.get();
+ for (DataFile df : dataFiles) {
atLeastOneFileAppended = true;
- appendOp.appendFile(df);
+ appendOp.appendFile(df); // Only main thread appends now
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -230,6 +255,9 @@ public static void run(
if (retryLog != null) {
retryLog.commit();
}
+
+ // Commit transaction.
+ txn.commitTransaction();
} else {
logger.warn("Table commit skipped (--no-commit)");
}
@@ -242,7 +270,66 @@ public static void run(
}
}
- private static DataFile processFile(
+ private static void updatePartitionAndSortOrderMetadata(
+ Transaction txn, List<Main.IcePartition> partitions, List<Main.IceSortOrder> sortOrders) {
+
+ if (partitions != null && !partitions.isEmpty()) {
+ var updateSpec = txn.updateSpec();
+ for (Main.IcePartition partition : partitions) {
+ String transform = partition.transform().toLowerCase();
+ if (transform.startsWith("bucket[")) {
+ int numBuckets = Integer.parseInt(transform.substring(7, transform.length() - 1));
+ updateSpec.addField(Expressions.bucket(partition.column(), numBuckets));
+ } else if (transform.startsWith("truncate[")) {
+ int width = Integer.parseInt(transform.substring(9, transform.length() - 1));
+ updateSpec.addField(Expressions.truncate(partition.column(), width));
+ } else {
+ switch (transform) {
+ case "year":
+ updateSpec.addField(Expressions.year(partition.column()));
+ break;
+ case "month":
+ updateSpec.addField(Expressions.month(partition.column()));
+ break;
+ case "day":
+ updateSpec.addField(Expressions.day(partition.column()));
+ break;
+ case "hour":
+ updateSpec.addField(Expressions.hour(partition.column()));
+ break;
+ case "identity":
+ default:
+ updateSpec.addField(partition.column());
+ break;
+ }
+ }
+ }
+ updateSpec.commit();
+ }
+
+ // Update sort order if provided
+ if (sortOrders != null && !sortOrders.isEmpty()) {
+ txn.updateProperties()
+ .set(
+ TableProperties.WRITE_DISTRIBUTION_MODE,
+ TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)
+ .commit();
+
+ ReplaceSortOrder replaceSortOrder = txn.replaceSortOrder();
+ for (Main.IceSortOrder order : sortOrders) {
+ SortDirection dir = order.desc() ? SortDirection.DESC : SortDirection.ASC;
+ NullOrder nullOrd = order.nullFirst() ? NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
+ if (dir == SortDirection.ASC) {
+ replaceSortOrder.asc(order.column(), nullOrd);
+ } else {
+ replaceSortOrder.desc(order.column(), nullOrd);
+ }
+ }
+ replaceSortOrder.commit();
+ }
+ }
+
+ private static List<DataFile> processFile(
Table table,
RESTCatalog catalog,
FileIO tableIO,
@@ -253,7 +340,9 @@ private static DataFile processFile(
DataFileNamingStrategy dstDataFileSource,
Schema tableSchema,
DataFileNamingStrategy.Name dataFileNamingStrategy,
- String file)
+ String file,
+ List<Main.IcePartition> partitionColumns,
+ List<Main.IceSortOrder> sortOrders)
throws IOException {
logger.info("{}: processing", file);
logger.info("{}: jvm: {}", file, Stats.gather());
@@ -287,9 +376,10 @@ private static DataFile processFile(
throw new BadRequestException(
file + " cannot be added to catalog without copy"); // TODO: explain
}
- long dataFileSizeInBytes;
- var dataFile = Strings.replacePrefix(file, "s3a://", "s3://");
+ long dataFileSizeInBytes = 0;
+
var start = System.currentTimeMillis();
+ var dataFile = Strings.replacePrefix(file, "s3a://", "s3://");
if (options.noCopy()) {
if (checkNotExists.apply(dataFile)) {
return null;
@@ -316,6 +406,27 @@ private static DataFile processFile(
s3ClientLazy.getValue().copyObject(copyReq);
dataFileSizeInBytes = inputFile.getLength();
dataFile = dstDataFile;
+ } else if (partitionColumns != null && !partitionColumns.isEmpty()) {
+ String dstDataFile = dstDataFileSource.get(file);
+ if (checkNotExists.apply(dstDataFile)) {
+ return null;
+ }
+ return copyParquetWithPartition(
+ file,
+ Strings.replacePrefix(dstDataFileSource.get(file), "s3://", "s3a://"),
+ tableSchema,
+ table,
+ inputFile,
+ metadata);
+ } else if (sortOrders != null && !sortOrders.isEmpty()) {
+ return copyParquetWithSortOrder(
+ file,
+ Strings.replacePrefix(dstDataFileSource.get(file), "s3://", "s3a://"),
+ tableSchema,
+ table,
+ inputFile,
+ metadata,
+ dataFileNamingStrategy);
} else {
String dstDataFile = dstDataFileSource.get(file);
if (checkNotExists.apply(dstDataFile)) {
@@ -345,12 +456,213 @@ private static DataFile processFile(
"{}: adding data file (copy took {}s)", file, (System.currentTimeMillis() - start) / 1000);
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
Metrics metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig);
- return new DataFiles.Builder(table.spec())
- .withPath(dataFile)
- .withFormat("PARQUET")
- .withFileSizeInBytes(dataFileSizeInBytes)
- .withMetrics(metrics)
- .build();
+ DataFile dataFileObj =
+ new DataFiles.Builder(table.spec())
+ .withPath(dataFile)
+ .withFormat("PARQUET")
+ .withFileSizeInBytes(dataFileSizeInBytes)
+ .withMetrics(metrics)
+ .build();
+ return Collections.singletonList(dataFileObj);
+ }
+
+ private static List<DataFile> copyParquetWithPartition(
+ String file,
+ String dstDataFile,
+ Schema tableSchema,
+ Table table,
+ InputFile inputFile,
+ ParquetMetadata metadata)
+ throws IOException {
+
+ logger.info("{}: copying to partitions under {}", file, dstDataFile);
+ var start = System.currentTimeMillis();
+ long fileSizeInBytes = 0;
+
+ // Partition writer setup
+ OutputFileFactory fileFactory =
+ OutputFileFactory.builderFor(table, 1, 0).format(FileFormat.PARQUET).build();
+
+ GenericAppenderFactory appenderFactory = new GenericAppenderFactory(tableSchema, table.spec());
+
+ PartitionKey partitionKey = new PartitionKey(table.spec(), tableSchema);
+ Map<PartitionKey, List<Record>> partitionedRecords = new HashMap<>();
+ Map<PartitionKey, OutputFile> writtenFiles = new HashMap<>();
+
+ Parquet.ReadBuilder readBuilder =
+ Parquet.read(inputFile)
+ .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
+ .project(tableSchema)
+ .reuseContainers();
+ PartitionSpec spec = table.spec();
+ Schema schema = table.schema();
+
+ try (CloseableIterable<Record> records = readBuilder.build()) {
+ for (Record record : records) {
+
+ Record partitionRecord = GenericRecord.create(schema);
+ for (Types.NestedField field : schema.columns()) {
+ partitionRecord.setField(field.name(), record.getField(field.name()));
+ }
+
+ for (PartitionField field : spec.fields()) {
+ String fieldName = schema.findField(field.sourceId()).name();
+ Object value = partitionRecord.getField(fieldName);
+ String transformName = field.transform().toString();
+
+ if (value != null
+ && (transformName.equals("day")
+ || transformName.equals("month")
+ || transformName.equals("hour"))) {
+ long micros = toMicros(value);
+ partitionRecord.setField(fieldName, micros);
+ }
+ }
+
+ // Partition based on converted values
+ partitionKey.partition(partitionRecord);
+ PartitionKey keyCopy = partitionKey.copy();
+
+ // Store the original record (without converted timestamp fields)
+ partitionedRecords.computeIfAbsent(keyCopy, k -> new ArrayList<>()).add(record);
+ }
+ }
+
+ List<DataFile> dataFiles = new ArrayList<>();
+
+ logger.info("Sort order: {}", table.sortOrder());
+ // Create a comparator based on table.sortOrder()
+ RecordSortComparator comparator = new RecordSortComparator(table.sortOrder(), tableSchema);
+
+ // Write sorted records for each partition
+ for (Map.Entry<PartitionKey, List<Record>> entry : partitionedRecords.entrySet()) {
+ PartitionKey partKey = entry.getKey();
+ List<Record> records = entry.getValue();
+
+ // Sort records within the partition
+ records.sort(comparator);
+
+ OutputFile outFile = fileFactory.newOutputFile(partKey).encryptingOutputFile();
+ writtenFiles.put(partKey, outFile);
+
+ try (FileAppender<Record> appender =
+ appenderFactory.newAppender(outFile, FileFormat.PARQUET)) {
+ for (Record rec : records) {
+ appender.add(rec);
+ }
+
+ appender.close();
+ fileSizeInBytes = appender.length();
+
+ logger.info(
+ "{}: adding data file (copy took {}s)",
+ file,
+ (System.currentTimeMillis() - start) / 1000);
+ InputFile inFile = outFile.toInputFile();
+ MetricsConfig metricsConfig = MetricsConfig.forTable(table);
+ Metrics metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig);
+ dataFiles.add(
+ DataFiles.builder(table.spec())
+ .withPath(outFile.location())
+ .withFileSizeInBytes(fileSizeInBytes)
+ .withPartition(partKey)
+ .withFormat(FileFormat.PARQUET)
+ .withMetrics(metrics)
+ .build());
+ }
+ }
+
+ return dataFiles;
+ }
+
+ public static long toMicros(Object tsValue) {
+ if (tsValue instanceof Long) {
+ return (Long) tsValue;
+
+ } else if (tsValue instanceof String) {
+ LocalDateTime ldt = LocalDateTime.parse((String) tsValue);
+ return ldt.toInstant(ZoneOffset.UTC).toEpochMilli() * 1000L;
+
+ } else if (tsValue instanceof LocalDateTime) {
+ return ((LocalDateTime) tsValue).toInstant(ZoneOffset.UTC).toEpochMilli() * 1000L;
+
+ } else if (tsValue instanceof OffsetDateTime) {
+ return ((OffsetDateTime) tsValue).toInstant().toEpochMilli() * 1000L;
+
+ } else if (tsValue instanceof Instant) {
+ return ((Instant) tsValue).toEpochMilli() * 1000L;
+
+ } else {
+ throw new IllegalArgumentException("Unsupported timestamp type: " + tsValue.getClass());
+ }
+ }
+
+ private static List<DataFile> copyParquetWithSortOrder(
+ String file,
+ String dstDataFile,
+ Schema tableSchema,
+ Table table,
+ InputFile inputFile,
+ ParquetMetadata metadata,
+ DataFileNamingStrategy.Name dataFileNamingStrategy)
+ throws IOException {
+ logger.info("{}: copying with sort order to {}", file, dstDataFile);
+ long start = System.currentTimeMillis();
+ long fileSizeInBytes = 0;
+
+ OutputFile outputFile = table.io().newOutputFile(dstDataFile);
+
+ Parquet.ReadBuilder readBuilder =
+ Parquet.read(inputFile)
+ .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
+ .project(tableSchema);
+
+ // Read records into memory
+ List<Record> records = new ArrayList<>();
+ try (CloseableIterable<Record> iterable = readBuilder.build()) {
+ for (Record r : iterable) {
+ records.add(r);
+ }
+ }
+
+ // Sort records if sort order is defined and non-empty
+ SortOrder sortOrder = table.sortOrder();
+ if (sortOrder != null && !sortOrder.isUnsorted()) {
+ records.sort(new RecordSortComparator(sortOrder, tableSchema));
+ }
+
+ // Write sorted records out
+ Parquet.WriteBuilder writeBuilder =
+ Parquet.write(outputFile)
+ .overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.PRESERVE_ORIGINAL)
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .schema(tableSchema);
+
+ try (FileAppender<Record> appender = writeBuilder.build()) {
+ for (Record record : records) {
+ appender.add(record);
+ }
+ appender.close();
+ fileSizeInBytes = appender.length();
+ }
+
+ MetricsConfig metricsConfig = MetricsConfig.forTable(table);
+ Metrics metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig);
+
+ DataFile dataFileObj =
+ new DataFiles.Builder(table.spec())
+ .withPath(dstDataFile)
+ .withFormat("PARQUET")
+ .withFileSizeInBytes(fileSizeInBytes)
+ .withMetrics(metrics)
+ .build();
+
+ logger.info(
+ "{}: adding data file (copy with sort order took {}s)",
+ file,
+ (System.currentTimeMillis() - start) / 1000);
+ return Collections.singletonList(dataFileObj);
}
private static boolean sameSchema(Table table, Schema fileSchema) {
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/RecordSortComparator.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/RecordSortComparator.java
new file mode 100644
index 00000000..90b71a3b
--- /dev/null
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/RecordSortComparator.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.cli.internal.cmd;
+
+import java.util.Comparator;
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.data.Record;
+
+public class RecordSortComparator implements Comparator<Record> {
+ private final List<SortField> sortFields;
+ private final Schema schema;
+
+ public RecordSortComparator(SortOrder sortOrder, Schema schema) {
+ this.sortFields = sortOrder.fields();
+ this.schema = schema;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public int compare(Record r1, Record r2) {
+ for (SortField sf : sortFields) {
+ String fieldName = schema.findColumnName(sf.sourceId());
+ Comparable