diff --git a/examples/docker-compose/README.md b/examples/docker-compose/README.md
index 39a8cda8..5d57169b 100644
--- a/examples/docker-compose/README.md
+++ b/examples/docker-compose/README.md
@@ -25,3 +25,44 @@ clickhouse client --query 'select count(*) from ice.`nyc.taxis`;'
 1. `docker compose up` fails with `ERROR: Invalid interpolation format for "content" option in config "clickhouse-init": "#!/bin/bash`
 
    Solution: Upgrade docker/docker compose to v2.
+
+#### Spark Iceberg
+
+A spark-iceberg container can be launched using the `docker-compose-spark-iceberg.yaml` file.
+
+The default Spark configuration is located at `/opt/spark/conf/spark-defaults.conf`.
+
+For Spark to communicate with `ice-rest-catalog` and `minio`, the following configuration
+variables in `/opt/spark/conf/spark-defaults.conf` need to be updated:
+
+- `spark.sql.catalog.demo.uri` - ice-rest-catalog URI
+- `spark.sql.catalog.demo.s3.endpoint` - minio server URL
+- `spark.sql.catalog.demo.s3.access-key` - minio access key
+- `spark.sql.catalog.demo.s3.secret-key` - minio password
+
+```
+spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+spark.sql.catalog.demo org.apache.iceberg.spark.SparkCatalog
+spark.sql.catalog.demo.type rest
+spark.sql.catalog.demo.uri http://localhost:5000
+spark.sql.catalog.demo.io-impl org.apache.iceberg.aws.s3.S3FileIO
+spark.sql.catalog.demo.warehouse s3://warehouse/wh/
+spark.sql.catalog.demo.s3.endpoint http://localhost:9000
+spark.sql.catalog.demo.s3.access-key miniouser
+spark.sql.catalog.demo.s3.secret-key miniopassword
+spark.sql.catalog.demo.s3.path-style-access true
+spark.sql.catalog.demo.s3.ssl-enabled false
+spark.sql.defaultCatalog demo
+spark.eventLog.enabled true
+spark.eventLog.dir /home/iceberg/spark-events
+spark.history.fs.logDirectory /home/iceberg/spark-events
+spark.sql.catalogImplementation in-memory
+```
+
+The `spark-sql` shell can then be used to query the tables directly:
+
+```
+docker exec -it spark-iceberg ./spark-sql
+```
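+
+For example, assuming the `nyc.taxis` table from the ClickHouse example above exists in the
+catalog (catalog and table names here are illustrative):
+
+```
+SHOW TABLES IN demo.nyc;
+SELECT count(*) FROM demo.nyc.taxis;
+```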
--partition='[{"column": "RatecodeID", "transform": "identity"}]' nyc.taxis20 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet + +# Partition by day +ice insert --partition='[{"column": "tpep_pickup_datetime", "transform": "day"}]' nyc.taxis20 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet + +# Partition by year +ice insert --partition='[{"column": "tpep_pickup_datetime", "transform": "year"}]' nyc44.taxis111 -p https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet # warning: each parquet file below is ~500mb. this may take a while AWS_REGION=us-east-2 ice insert btc.transactions -p --s3-no-sign-request \ @@ -39,6 +54,13 @@ date=2025-01-01/part-00000-33e8d075-2099-409b-a806-68dd17217d39-c000.snappy.parq # upload file to minio using local-mc, # then add file to the catalog without making a copy ice create-table flowers.iris_no_copy --schema-from-parquet=file://iris.parquet + +# create table with partitioning columns +ice create-table flowers.irs_no_copy_partition --schema-from-parquet=file://iris.parquet --partition-by=variety,petal.width + +# create table with sort columns +ice create-table flowers.irs_no_copy_sort --schema-from-parquet=file://iris.parquet --sort-order='[{"column": "variety", "desc": false}]' + local-mc cp iris.parquet local/bucket1/flowers/iris_no_copy/ ice insert flowers.iris_no_copy --no-copy s3://bucket1/flowers/iris_no_copy/iris.parquet diff --git a/ice/pom.xml b/ice/pom.xml index 9903628a..4f5ea9fa 100644 --- a/ice/pom.xml +++ b/ice/pom.xml @@ -22,6 +22,45 @@ iceberg-bundled-guava ${iceberg.version} + + org.apache.iceberg + iceberg-data + ${iceberg.version} + + + org.apache.commons + commons-lang3 + + + org.apache.parquet + parquet-avro + + + com.github.ben-manes.caffeine + caffeine + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + commons-codec + commons-codec + + + org.slf4j + slf4j-api + + + io.airlift + aircompressor + + + org.apache.iceberg iceberg-core @@ -31,6 +70,7 @@ com.github.ben-manes.caffeine caffeine + com.fasterxml.jackson.core jackson-core @@ -43,6 +83,10 @@ commons-codec commons-codec + + org.apache.commons + commons-lang3 + org.slf4j slf4j-api diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java index 0b1fefa4..acda67f4 100644 --- a/ice/src/main/java/com/altinity/ice/cli/Main.java +++ b/ice/src/main/java/com/altinity/ice/cli/Main.java @@ -18,8 +18,12 @@ import com.altinity.ice.cli.internal.config.Config; import com.altinity.ice.internal.picocli.VersionProvider; import com.altinity.ice.internal.strings.Strings; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Scanner; @@ -114,6 +118,14 @@ void describe( } } + public record IceSortOrder( + @JsonProperty("column") String column, + @JsonProperty("desc") boolean desc, + @JsonProperty("nullFirst") boolean nullFirst) {} + + public record IcePartition( + @JsonProperty("column") String column, @JsonProperty("transform") String transform) {} + @CommandLine.Command(name = "create-table", description = "Create table.") void createTable( @CommandLine.Parameters( @@ -138,16 +150,46 @@ void createTable( required = true, names = 
"--schema-from-parquet", description = "/path/to/file.parquet") - String schemaFile) + String schemaFile, + @CommandLine.Option( + names = {"--partition"}, + description = + "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]," + + "Supported transforms: hour, day, month, year, identity(default)") + String partitionJson, + @CommandLine.Option( + names = {"--sort-order"}, + description = + "JSON array of sort orders: [{\"column\":\"name\",\"desc\":true,\"nullFirst\":true}]") + String sortOrderJson) throws IOException { try (RESTCatalog catalog = loadCatalog(this.configFile())) { + List sortOrders = new ArrayList<>(); + List partitions = new ArrayList<>(); + + if (sortOrderJson != null && !sortOrderJson.isEmpty()) { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); + IceSortOrder[] orders = mapper.readValue(sortOrderJson, IceSortOrder[].class); + sortOrders = Arrays.asList(orders); + } + + if (partitionJson != null && !partitionJson.isEmpty()) { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); + IcePartition[] parts = mapper.readValue(partitionJson, IcePartition[].class); + partitions = Arrays.asList(parts); + } + CreateTable.run( catalog, TableIdentifier.parse(name), schemaFile, location, createTableIfNotExists, - s3NoSignRequest); + s3NoSignRequest, + partitions, + sortOrders); } } @@ -206,6 +248,17 @@ void insert( "/path/to/file where to save list of files to retry" + " (useful for retrying partially failed insert using `cat ice.retry | ice insert - --retry-list=ice.retry`)") String retryList, + @CommandLine.Option( + names = {"--partition"}, + description = + "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]," + + "Supported transforms: hour, day, month, year, identity(default)") + String partitionJson, + @CommandLine.Option( + names = {"--sort-order"}, + description = + "JSON array of sort orders: [{\"column\":\"name\",\"desc\":true,\"nullFirst\":true}]") + String sortOrderJson, @CommandLine.Option( names = {"--thread-count"}, description = "Number of threads to use for inserting data", @@ -224,11 +277,35 @@ void insert( return; } } + + List sortOrders = new ArrayList<>(); + List partitions = new ArrayList<>(); + + if (sortOrderJson != null && !sortOrderJson.isEmpty()) { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); + IceSortOrder[] orders = mapper.readValue(sortOrderJson, IceSortOrder[].class); + sortOrders = Arrays.asList(orders); + } + + if (partitionJson != null && !partitionJson.isEmpty()) { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); + IcePartition[] parts = mapper.readValue(partitionJson, IcePartition[].class); + partitions = Arrays.asList(parts); + } + TableIdentifier tableId = TableIdentifier.parse(name); if (createTableIfNotExists) { - // TODO: newCreateTableTransaction CreateTable.run( - catalog, tableId, dataFiles[0], null, createTableIfNotExists, s3NoSignRequest); + catalog, + tableId, + dataFiles[0], + null, + createTableIfNotExists, + s3NoSignRequest, + partitions, + sortOrders); } Insert.run( catalog, @@ -243,6 +320,8 @@ void insert( s3NoSignRequest, s3CopyObject, retryList, + partitions, + sortOrders, threadCount < 1 ? 
          threadCount < 1 ? Runtime.getRuntime().availableProcessors() : threadCount);
     }
   }
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java
index e5561a13..103aeeb5 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java
@@ -9,6 +9,7 @@
  */
 package com.altinity.ice.cli.internal.cmd;
 
+import com.altinity.ice.cli.Main;
 import com.altinity.ice.cli.internal.iceberg.io.Input;
 import com.altinity.ice.cli.internal.iceberg.parquet.Metadata;
 import com.altinity.ice.cli.internal.s3.S3;
@@ -16,8 +17,12 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import org.apache.iceberg.NullOrder;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReplaceSortOrder;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -43,7 +48,9 @@ public static void run(
       String schemaFile,
       String location,
       boolean ignoreAlreadyExists,
-      boolean s3NoSignRequest)
+      boolean s3NoSignRequest,
+      List<Main.IcePartition> partitionColumns,
+      List<Main.IceSortOrder> sortOrders)
       throws IOException {
     Lazy<S3Client> s3ClientLazy = new Lazy<>(() -> S3.newClient(s3NoSignRequest));
@@ -67,9 +74,61 @@
         String mappingJson = NameMappingParser.toJson(mapping);
         props = Map.of(TableProperties.DEFAULT_NAME_MAPPING, mappingJson);
       }
+
+      // Create partition spec based on provided partition columns
+      final PartitionSpec.Builder partitionSpecBuilder = PartitionSpec.builderFor(fileSchema);
+      if (partitionColumns != null && !partitionColumns.isEmpty()) {
+        for (Main.IcePartition partition : partitionColumns) {
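+          // Transform forms handled below: identity (default), year, month, day, hour,
+          // bucket[N] (e.g. bucket[16]) and truncate[W] (e.g. truncate[4]).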
+          String transform = partition.transform().toLowerCase();
+          if (transform.startsWith("bucket[")) {
+            // "bucket[16]" -> 16
+            int numBuckets = Integer.parseInt(transform.substring(7, transform.length() - 1));
+            partitionSpecBuilder.bucket(partition.column(), numBuckets);
+          } else if (transform.startsWith("truncate[")) {
+            // "truncate[4]" -> 4
+            int width = Integer.parseInt(transform.substring(9, transform.length() - 1));
+            partitionSpecBuilder.truncate(partition.column(), width);
+          } else {
+            switch (transform) {
+              case "year":
+                partitionSpecBuilder.year(partition.column());
+                break;
+              case "month":
+                partitionSpecBuilder.month(partition.column());
+                break;
+              case "day":
+                partitionSpecBuilder.day(partition.column());
+                break;
+              case "hour":
+                partitionSpecBuilder.hour(partition.column());
+                break;
+              case "identity":
+              default:
+                partitionSpecBuilder.identity(partition.column());
+                break;
+            }
+          }
+        }
+      }
+      final PartitionSpec partitionSpec = partitionSpecBuilder.build();
+
       // if we don't set location, it's automatically set to $warehouse/$namespace/$table
       createNamespace(catalog, nsTable.namespace());
-      catalog.createTable(nsTable, fileSchema, PartitionSpec.unpartitioned(), location, props);
+      Table table = catalog.createTable(nsTable, fileSchema, partitionSpec, location, props);
+
+      // TODO: move this sort-order code, duplicated in Insert, into a common helper.
+      if (sortOrders != null && !sortOrders.isEmpty()) {
+        ReplaceSortOrder replaceSortOrder = table.replaceSortOrder();
+        for (Main.IceSortOrder order : sortOrders) {
+          SortDirection dir = order.desc() ? SortDirection.DESC : SortDirection.ASC;
+          NullOrder nullOrd = order.nullFirst() ? NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
+          if (dir == SortDirection.ASC) {
+            replaceSortOrder.asc(order.column(), nullOrd);
+          } else {
+            replaceSortOrder.desc(order.column(), nullOrd);
+          }
+        }
+        replaceSortOrder.commit();
+      }
     } catch (AlreadyExistsException e) {
       if (ignoreAlreadyExists) {
         return;
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
index 1decc152..9775a2e5 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
@@ -9,6 +9,7 @@
  */
 package com.altinity.ice.cli.internal.cmd;
 
+import com.altinity.ice.cli.Main;
 import com.altinity.ice.cli.internal.iceberg.io.Input;
 import com.altinity.ice.cli.internal.iceberg.parquet.Metadata;
 import com.altinity.ice.cli.internal.jvm.Stats;
@@ -16,32 +17,46 @@
 import com.altinity.ice.cli.internal.s3.S3;
 import com.altinity.ice.internal.strings.Strings;
 import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.util.NativeCodeLoader;
 import org.apache.iceberg.*;
 import org.apache.iceberg.aws.s3.S3FileIO;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.BadRequestException;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mapping.MappedField;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
@@ -65,11 +80,6 @@ public final class Insert {
 
   private Insert() {}
 
-  static {
-    // Force-init.
-    NativeCodeLoader.isNativeCodeLoaded();
-  }
-
   // TODO: refactor
   public static void run(
       RESTCatalog catalog,
@@ -84,6 +94,8 @@ public static void run(
       boolean s3NoSignRequest,
       boolean s3CopyObject,
       String retryListFile,
+      List<Main.IcePartition> partitionColumns,
+      List<Main.IceSortOrder> sortOrders,
       int threadCount)
       throws IOException, InterruptedException {
     if (files.length == 0) {
@@ -105,6 +117,11 @@
     final Options finalOptions =
         options.forceNoCopy() ? options.toBuilder().noCopy(true).build() : options;
 
-    Table table = catalog.loadTable(nsTable);
+    Table baseTable = catalog.loadTable(nsTable);
+
+    // Create the transaction up front and work against its view of the table, so the
+    // pending --partition/--sort-order metadata updates apply to the writes below.
+    Transaction txn = baseTable.newTransaction();
+    updatePartitionAndSortOrderMetadata(txn, partitionColumns, sortOrders);
+    Table table = txn.table();
+
     try (FileIO tableIO = table.io()) {
       final Supplier<S3Client> s3ClientSupplier;
       if (finalOptions.forceTableAuth()) {
@@ -156,7 +173,8 @@
             case PRESERVE_ORIGINAL -> new DataFileNamingStrategy.InputFilename(dstPath);
           };
 
-      AppendFiles appendOp = table.newAppend();
+      // Append through the same transaction so data and metadata commit together.
+      AppendFiles appendOp = txn.newAppend();
 
       try (FileIO inputIO = Input.newIO(filesExpanded.getFirst(), table, s3ClientLazy);
           RetryLog retryLog =
@@ -168,30 +186,37 @@
         int numThreads = Math.min(finalOptions.threadCount(), filesExpanded.size());
         ExecutorService executor = Executors.newFixedThreadPool(numThreads);
         try {
-          var futures = new ArrayList<Future<DataFile>>();
+          var futures = new ArrayList<Future<List<DataFile>>>();
           for (final String file : filesExpanded) {
             futures.add(
                 executor.submit(
                     () -> {
                       try {
-                        return processFile(
-                            table,
-                            catalog,
-                            tableIO,
-                            inputIO,
-                            tableDataFiles,
-                            finalOptions,
-                            s3ClientLazy,
-                            dstDataFileSource,
-                            tableSchema,
-                            dataFileNamingStrategy,
-                            file);
+                        List<DataFile> dataFiles =
+                            processFile(
+                                table,
+                                catalog,
+                                tableIO,
+                                inputIO,
+                                tableDataFiles,
+                                finalOptions,
+                                s3ClientLazy,
+                                dstDataFileSource,
+                                tableSchema,
+                                dataFileNamingStrategy,
+                                file,
+                                partitionColumns,
+                                sortOrders);
+                        if (dataFiles != null) {
+                          return dataFiles;
+                        }
+                        return Collections.emptyList();
                       } catch (Exception e) {
                         if (retryLog != null) {
                           logger.error(
                               "{}: error (adding to retry list and continuing)", file, e);
                           retryLog.add(file);
-                          return null;
+                          return Collections.emptyList();
                         } else {
                           throw e;
                         }
@@ -201,10 +226,10 @@
           for (var future : futures) {
             try {
-              DataFile df = future.get();
-              if (df != null) {
+              List<DataFile> dataFiles = future.get();
+              for (DataFile df : dataFiles) {
                 atLeastOneFileAppended = true;
-                appendOp.appendFile(df);
+                appendOp.appendFile(df); // only the main thread touches appendOp
               }
             } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
@@ -230,6 +255,9 @@
         if (retryLog != null) {
           retryLog.commit();
         }
+
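+        // Single commit point: the partition-spec/sort-order updates and the file appends
+        // above belong to one transaction and become visible to readers together.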
+        txn.commitTransaction();
       } else {
         logger.warn("Table commit skipped (--no-commit)");
       }
@@ -242,7 +270,66 @@
     }
   }
 
-  private static DataFile processFile(
+  private static void updatePartitionAndSortOrderMetadata(
+      Transaction txn, List<Main.IcePartition> partitions, List<Main.IceSortOrder> sortOrders) {
+
+    if (partitions != null && !partitions.isEmpty()) {
+      var updateSpec = txn.updateSpec();
+      for (Main.IcePartition partition : partitions) {
+        String transform = partition.transform().toLowerCase();
+        if (transform.startsWith("bucket[")) {
+          // "bucket[16]" -> 16
+          int numBuckets = Integer.parseInt(transform.substring(7, transform.length() - 1));
+          updateSpec.addField(Expressions.bucket(partition.column(), numBuckets));
+        } else if (transform.startsWith("truncate[")) {
+          // "truncate[4]" -> 4
+          int width = Integer.parseInt(transform.substring(9, transform.length() - 1));
+          updateSpec.addField(Expressions.truncate(partition.column(), width));
+        } else {
+          switch (transform) {
+            case "year":
+              updateSpec.addField(Expressions.year(partition.column()));
+              break;
+            case "month":
+              updateSpec.addField(Expressions.month(partition.column()));
+              break;
+            case "day":
+              updateSpec.addField(Expressions.day(partition.column()));
+              break;
+            case "hour":
+              updateSpec.addField(Expressions.hour(partition.column()));
+              break;
+            case "identity":
+            default:
+              updateSpec.addField(partition.column());
+              break;
+          }
+        }
+      }
+      updateSpec.commit();
+    }
+
+    // Update sort order if provided
+    if (sortOrders != null && !sortOrders.isEmpty()) {
+      txn.updateProperties()
+          .set(
+              TableProperties.WRITE_DISTRIBUTION_MODE,
+              TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)
+          .commit();
+
+      ReplaceSortOrder replaceSortOrder = txn.replaceSortOrder();
+      for (Main.IceSortOrder order : sortOrders) {
+        SortDirection dir = order.desc() ? SortDirection.DESC : SortDirection.ASC;
+        NullOrder nullOrd = order.nullFirst() ? NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
+        if (dir == SortDirection.ASC) {
+          replaceSortOrder.asc(order.column(), nullOrd);
+        } else {
+          replaceSortOrder.desc(order.column(), nullOrd);
+        }
+      }
+      replaceSortOrder.commit();
+    }
+  }
+
+  private static List<DataFile> processFile(
       Table table,
       RESTCatalog catalog,
       FileIO tableIO,
@@ -253,7 +340,9 @@
       DataFileNamingStrategy dstDataFileSource,
       Schema tableSchema,
       DataFileNamingStrategy.Name dataFileNamingStrategy,
-      String file)
+      String file,
+      List<Main.IcePartition> partitionColumns,
+      List<Main.IceSortOrder> sortOrders)
       throws IOException {
     logger.info("{}: processing", file);
     logger.info("{}: jvm: {}", file, Stats.gather());
@@ -287,9 +376,10 @@
         throw new BadRequestException(
             file + " cannot be added to catalog without copy"); // TODO: explain
       }
 
-      long dataFileSizeInBytes;
-      var dataFile = Strings.replacePrefix(file, "s3a://", "s3://");
+      long dataFileSizeInBytes = 0;
+      var start = System.currentTimeMillis();
 
+      var dataFile = Strings.replacePrefix(file, "s3a://", "s3://");
       if (options.noCopy()) {
         if (checkNotExists.apply(dataFile)) {
           return null;
         }
@@ -316,6 +406,27 @@
         s3ClientLazy.getValue().copyObject(copyReq);
         dataFileSizeInBytes = inputFile.getLength();
         dataFile = dstDataFile;
+      } else if (partitionColumns != null && !partitionColumns.isEmpty()) {
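+        // Partitioned copy: rows are grouped by the table's partition spec and, when a sort
+        // order is defined, sorted within each partition (see copyParquetWithPartition).
+        // The whole input file is buffered in memory, so it must fit in heap.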
+        String dstDataFile = dstDataFileSource.get(file);
+        if (checkNotExists.apply(dstDataFile)) {
+          return null;
+        }
+        return copyParquetWithPartition(
+            file,
+            Strings.replacePrefix(dstDataFile, "s3://", "s3a://"),
+            tableSchema,
+            table,
+            inputFile,
+            metadata);
+      } else if (sortOrders != null && !sortOrders.isEmpty()) {
+        String dstDataFile = dstDataFileSource.get(file);
+        if (checkNotExists.apply(dstDataFile)) {
+          return null;
+        }
+        return copyParquetWithSortOrder(
+            file,
+            Strings.replacePrefix(dstDataFile, "s3://", "s3a://"),
+            tableSchema,
+            table,
+            inputFile,
+            metadata,
+            dataFileNamingStrategy);
       } else {
         String dstDataFile = dstDataFileSource.get(file);
         if (checkNotExists.apply(dstDataFile)) {
@@ -345,12 +456,213 @@
       logger.info(
           "{}: adding data file (copy took {}s)", file, (System.currentTimeMillis() - start) / 1000);
       MetricsConfig metricsConfig = MetricsConfig.forTable(table);
       Metrics metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig);
-      return new DataFiles.Builder(table.spec())
-          .withPath(dataFile)
-          .withFormat("PARQUET")
-          .withFileSizeInBytes(dataFileSizeInBytes)
-          .withMetrics(metrics)
-          .build();
+      DataFile dataFileObj =
+          new DataFiles.Builder(table.spec())
+              .withPath(dataFile)
+              .withFormat("PARQUET")
+              .withFileSizeInBytes(dataFileSizeInBytes)
+              .withMetrics(metrics)
+              .build();
+      return Collections.singletonList(dataFileObj);
+  }
+
+  private static List<DataFile> copyParquetWithPartition(
+      String file,
+      String dstDataFile,
+      Schema tableSchema,
+      Table table,
+      InputFile inputFile,
+      ParquetMetadata metadata)
+      throws IOException {
+
+    logger.info("{}: copying to partitions under {}", file, dstDataFile);
+    var start = System.currentTimeMillis();
+
+    // Partition writer setup
+    OutputFileFactory fileFactory =
+        OutputFileFactory.builderFor(table, 1, 0).format(FileFormat.PARQUET).build();
+
+    GenericAppenderFactory appenderFactory = new GenericAppenderFactory(tableSchema, table.spec());
+
+    PartitionKey partitionKey = new PartitionKey(table.spec(), tableSchema);
+    Map<PartitionKey, List<Record>> partitionedRecords = new HashMap<>();
+
+    // No reuseContainers() here: the records are buffered in lists, and a reused
+    // container would leave every list entry pointing at the last row read.
+    Parquet.ReadBuilder readBuilder =
+        Parquet.read(inputFile)
+            .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
+            .project(tableSchema);
+
+    PartitionSpec spec = table.spec();
+    Schema schema = table.schema();
+
+    try (CloseableIterable<Record> records = readBuilder.build()) {
+      for (Record record : records) {
+
+        Record partitionRecord = GenericRecord.create(schema);
+        for (Types.NestedField field : schema.columns()) {
+          partitionRecord.setField(field.name(), record.getField(field.name()));
+        }
+
+        // Iceberg's timestamp transforms expect the internal representation (micros since
+        // epoch), so convert timestamp-typed partition source columns before partitioning.
+        for (PartitionField field : spec.fields()) {
+          String fieldName = schema.findField(field.sourceId()).name();
+          Object value = partitionRecord.getField(fieldName);
+          String transformName = field.transform().toString();
+
+          if (value != null
+              && (transformName.equals("year")
+                  || transformName.equals("month")
+                  || transformName.equals("day")
+                  || transformName.equals("hour"))) {
+            long micros = toMicros(value);
+            partitionRecord.setField(fieldName, micros);
+          }
+        }
+
+        // Partition based on converted values
+        partitionKey.partition(partitionRecord);
+        PartitionKey keyCopy = partitionKey.copy();
+
+        // Store the original record (without converted timestamp fields)
+        partitionedRecords.computeIfAbsent(keyCopy, k -> new ArrayList<>()).add(record);
+      }
+    }
+
+    List<DataFile> dataFiles = new ArrayList<>();
+
+    logger.info("Sort order: {}", table.sortOrder());
+    // Comparator based on table.sortOrder()
+    RecordSortComparator comparator = new RecordSortComparator(table.sortOrder(), tableSchema);
+
+    // Write sorted records for each partition
+    for (Map.Entry<PartitionKey, List<Record>> entry : partitionedRecords.entrySet()) {
+      PartitionKey partKey = entry.getKey();
+      List<Record> records = entry.getValue();
+
+      // Sort records within the partition
+      records.sort(comparator);
+
+      OutputFile outFile = fileFactory.newOutputFile(partKey).encryptingOutputFile();
+
+      long fileSizeInBytes;
+      Metrics metrics;
+      try (FileAppender<Record> appender =
+          appenderFactory.newAppender(outFile, FileFormat.PARQUET)) {
+        for (Record rec : records) {
+          appender.add(rec);
+        }
+        // close() before reading length/metrics; the try-with-resources close is a no-op
+        appender.close();
+        fileSizeInBytes = appender.length();
+        // Metrics must come from this appender, not from the source file's footer:
+        // each partition file holds only a subset of the input rows.
+        metrics = appender.metrics();
+      }
+
+      dataFiles.add(
+          DataFiles.builder(table.spec())
+              .withPath(outFile.location())
+              .withFileSizeInBytes(fileSizeInBytes)
+              .withPartition(partKey)
+              .withFormat(FileFormat.PARQUET)
+              .withMetrics(metrics)
+              .build());
+    }
+
+    logger.info(
+        "{}: adding {} data file(s) (partitioned copy took {}s)",
+        file,
+        dataFiles.size(),
+        (System.currentTimeMillis() - start) / 1000);
+    return dataFiles;
+  }
+
+  public static long toMicros(Object tsValue) {
+    if (tsValue instanceof Long) {
+      // already micros since epoch (Iceberg's internal timestamp representation)
+      return (Long) tsValue;
+    } else if (tsValue instanceof String) {
+      // ISO-8601 local date-time, e.g. "2025-01-01T00:00:00"
+      return instantToMicros(LocalDateTime.parse((String) tsValue).toInstant(ZoneOffset.UTC));
+    } else if (tsValue instanceof LocalDateTime) {
+      return instantToMicros(((LocalDateTime) tsValue).toInstant(ZoneOffset.UTC));
+    } else if (tsValue instanceof OffsetDateTime) {
+      return instantToMicros(((OffsetDateTime) tsValue).toInstant());
+    } else if (tsValue instanceof Instant) {
+      return instantToMicros((Instant) tsValue);
+    } else {
+      throw new IllegalArgumentException("Unsupported timestamp type: " + tsValue.getClass());
+    }
+  }
+
+  // micros since epoch, without the sub-millisecond truncation of toEpochMilli() * 1000
+  private static long instantToMicros(Instant instant) {
+    return Math.multiplyExact(instant.getEpochSecond(), 1_000_000L) + instant.getNano() / 1_000;
+  }
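+
+  // Sorted copy: buffers every record of the input file in memory, sorts with
+  // RecordSortComparator, and rewrites a single Parquet file. Assumes inputs fit in heap.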
+  private static List<DataFile> copyParquetWithSortOrder(
+      String file,
+      String dstDataFile,
+      Schema tableSchema,
+      Table table,
+      InputFile inputFile,
+      ParquetMetadata metadata,
+      DataFileNamingStrategy.Name dataFileNamingStrategy)
+      throws IOException {
+    logger.info("{}: copying with sort order to {}", file, dstDataFile);
+    long start = System.currentTimeMillis();
+
+    OutputFile outputFile = table.io().newOutputFile(dstDataFile);
+
+    Parquet.ReadBuilder readBuilder =
+        Parquet.read(inputFile)
+            .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
+            .project(tableSchema);
+
+    // Read records into memory
+    List<Record> records = new ArrayList<>();
+    try (CloseableIterable<Record> iterable = readBuilder.build()) {
+      for (Record r : iterable) {
+        records.add(r);
+      }
+    }
+
+    // Sort records if a sort order is defined and non-empty
+    SortOrder sortOrder = table.sortOrder();
+    if (sortOrder != null && !sortOrder.isUnsorted()) {
+      records.sort(new RecordSortComparator(sortOrder, tableSchema));
+    }
+
+    // Write sorted records out
+    Parquet.WriteBuilder writeBuilder =
+        Parquet.write(outputFile)
+            .overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.PRESERVE_ORIGINAL)
+            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .schema(tableSchema);
+
+    long fileSizeInBytes;
+    Metrics metrics;
+    try (FileAppender<Record> appender = writeBuilder.build()) {
+      for (Record record : records) {
+        appender.add(record);
+      }
+      // close() before reading length/metrics; the try-with-resources close is a no-op
+      appender.close();
+      fileSizeInBytes = appender.length();
+      // Taken from the rewritten file; the source footer would also be valid here,
+      // since sorting preserves the row set.
+      metrics = appender.metrics();
+    }
+
+    DataFile dataFileObj =
+        new DataFiles.Builder(table.spec())
+            .withPath(dstDataFile)
+            .withFormat("PARQUET")
+            .withFileSizeInBytes(fileSizeInBytes)
+            .withMetrics(metrics)
+            .build();
+
+    logger.info(
+        "{}: adding data file (copy with sort order took {}s)",
+        file,
+        (System.currentTimeMillis() - start) / 1000);
+    return Collections.singletonList(dataFileObj);
   }
 
   private static boolean sameSchema(Table table, Schema fileSchema) {
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/RecordSortComparator.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/RecordSortComparator.java
new file mode 100644
index 00000000..90b71a3b
--- /dev/null
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/RecordSortComparator.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.cli.internal.cmd;
+
+import java.util.Comparator;
+import java.util.List;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.data.Record;
+
+public class RecordSortComparator implements Comparator<Record> {
+  private final List<SortField> sortFields;
+  private final Schema schema;
+
+  public RecordSortComparator(SortOrder sortOrder, Schema schema) {
+    this.sortFields = sortOrder.fields();
+    this.schema = schema;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public int compare(Record r1, Record r2) {
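+    // Assumes sort columns hold Comparable values (numbers, strings, timestamps);
+    // nested or otherwise non-comparable types will throw ClassCastException.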
+    for (SortField sf : sortFields) {
+      String fieldName = schema.findColumnName(sf.sourceId());
+      Comparable v1 = (Comparable) r1.getField(fieldName);
+      Comparable v2 = (Comparable) r2.getField(fieldName);
+
+      if (v1 == null && v2 == null) continue;
+      // Honor the sort field's declared null ordering, not just its direction.
+      if (v1 == null) return sf.nullOrder() == NullOrder.NULLS_FIRST ? -1 : 1;
+      if (v2 == null) return sf.nullOrder() == NullOrder.NULLS_FIRST ? 1 : -1;
+
+      int cmp = v1.compareTo(v2);
+      if (cmp != 0) {
+        return sf.direction() == SortDirection.ASC ? cmp : -cmp;
+      }
+    }
+    return 0;
+  }
+}
diff --git a/ice/src/test/java/com/altinity/ice/cli/MainTest.java b/ice/src/test/java/com/altinity/ice/cli/MainTest.java
new file mode 100644
index 00000000..edc5b3b1
--- /dev/null
+++ b/ice/src/test/java/com/altinity/ice/cli/MainTest.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.cli;
+
+import static org.testng.Assert.*;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.testng.annotations.Test;
+
+public class MainTest {
+
+  @Test
+  public void testSortOrderJsonParsing() throws Exception {
+    ObjectMapper mapper = new ObjectMapper();
+
+    // Test single sort order
+    String singleSortJson =
+        """
+        {
+          "column": "name",
+          "desc": true,
+          "nullFirst": true
+        }
+        """;
+
+    Main.IceSortOrder singleSort = mapper.readValue(singleSortJson, Main.IceSortOrder.class);
+    assertEquals(singleSort.column(), "name");
+    assertTrue(singleSort.desc());
+    assertTrue(singleSort.nullFirst());
+
+    // Test array of sort orders
+    String multipleSortJson =
+        """
+        [
+          {
+            "column": "name",
+            "desc": true,
+            "nullFirst": true
+          },
+          {
+            "column": "age",
+            "desc": false,
+            "nullFirst": false
+          }
+        ]
+        """;
+
+    Main.IceSortOrder[] multipleSorts =
+        mapper.readValue(multipleSortJson, Main.IceSortOrder[].class);
+    assertEquals(multipleSorts.length, 2);
+
+    // Verify first sort order
+    assertEquals(multipleSorts[0].column(), "name");
+    assertTrue(multipleSorts[0].desc());
+    assertTrue(multipleSorts[0].nullFirst());
+
+    // Verify second sort order
+    assertEquals(multipleSorts[1].column(), "age");
+    assertFalse(multipleSorts[1].desc());
+    assertFalse(multipleSorts[1].nullFirst());
+  }
+
+  @Test
+  public void testSortOrderJsonParsingWithMissingFields() throws Exception {
+    ObjectMapper mapper = new ObjectMapper();
+
+    // Missing nullFirst defaults to false (Jackson leaves the primitive at its default)
+    String json =
+        """
+        {
+          "column": "name",
+          "desc": true
+        }
+        """;
+
+    Main.IceSortOrder sort = mapper.readValue(json, Main.IceSortOrder.class);
+    assertEquals(sort.column(), "name");
+    assertTrue(sort.desc());
+    assertFalse(sort.nullFirst());
+  }
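+
+  // A companion sketch for the partition flag: verifies --partition JSON binds to
+  // Main.IcePartition the same way (test name and values are illustrative).
+  @Test
+  public void testPartitionJsonParsing() throws Exception {
+    ObjectMapper mapper = new ObjectMapper();
+
+    String json =
+        """
+        [{"column": "tpep_pickup_datetime", "transform": "day"}]
+        """;
+
+    Main.IcePartition[] parts = mapper.readValue(json, Main.IcePartition[].class);
+    assertEquals(parts.length, 1);
+    assertEquals(parts[0].column(), "tpep_pickup_datetime");
+    assertEquals(parts[0].transform(), "day");
+  }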
assertEquals(sort.column(), "name"); + assertTrue(sort.desc()); + assertFalse(sort.nullFirst()); + } + + @Test(expectedExceptions = Exception.class) + public void testSortOrderJsonParsingWithInvalidJson() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + + // Test with invalid JSON + String invalidJson = + """ + { + "column": "name", + "desc": "not-a-boolean" + } + """; + + mapper.readValue(invalidJson, Main.IceSortOrder.class); + } +}