ice: Add support for partition-by and sort columns in create table and insert (#22)
Conversation
Testing with multiple columns:

```
java -jar ../../ice/target/ice-0.0.0-SNAPSHOT-shaded.jar create-table flowers.irs_no_copy_partition --schema-from-parquet=file://iris.parquet --partition-by=variety,petal.width
```
Adding to README.

What about
```java
    .commit();

var updatedSortOrder = table.replaceSortOrder();
for (String column : sortColumns) {
  updatedSortOrder.asc(column);
```
What if the order is supposed to be desc?
… work with ice-rest-catalog.
The parquet file size is 3475226 bytes.

[Table: Spark (data partitioned) vs. Spark (data not partitioned); values not recoverable]
The spark-sql shell can now query the tables directly.
```
docker exec -it <container_id> bash
```
I'd suggest updating the compose file to include `container_name: spark` so that this part could be simplified to `docker exec -it spark ./spark-sql` (or `docker exec -it spark spark-sql` if spark-sql is on the PATH).
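A minimal sketch of that suggestion against the compose file (the service layout here is illustrative, not copied from the PR):

```yaml
services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    # Naming the container explicitly lets you run
    # `docker exec -it spark spark-sql` without looking up the container id.
    container_name: spark
```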
```java
List<IcePartition> partitions = new ArrayList<>();
```

```java
if (sortOrderJson != null && !sortOrderJson.isEmpty()) {
  ObjectMapper mapper = new ObjectMapper();
```
it's best to set https://github.com/Altinity/ice/blob/master/ice/src/main/java/com/altinity/ice/internal/config/Config.java#L37; otherwise any typos are silently ignored
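The silently-ignored-typos point can be illustrated without Jackson. The hedged sketch below (all names hypothetical, not from the PR) shows the difference between dropping unknown keys and failing on them, which is what enabling strict deserialization in the config parser achieves:

```java
import java.util.Map;
import java.util.Set;

public class StrictOptions {
    // Hypothetical stand-in for strict JSON deserialization: a fixed set of
    // recognized keys, and a loud failure for anything else.
    static final Set<String> KNOWN_KEYS = Set.of("column", "transform");

    static void validate(Map<String, String> spec) {
        for (String key : spec.keySet()) {
            if (!KNOWN_KEYS.contains(key)) {
                // Without this check a typo like "colunm" would be silently dropped.
                throw new IllegalArgumentException("unknown option: " + key);
            }
        }
    }

    public static void main(String[] args) {
        validate(Map.of("column", "date", "transform", "year")); // accepted
        try {
            validate(Map.of("colunm", "date"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```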
```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.*;
```
let's try to avoid wildcard imports: https://google.github.io/styleguide/javaguide.html#s3.3.1-wildcard-imports
```java
}
long dataFileSizeInBytes;
var dataFile = Strings.replacePrefix(file, "s3a://", "s3://");
long dataFileSizeInBytes = 0;
```
noCopy branch is broken due to dataFileSizeInBytes always being 0
it also looks like we should fail the operation if noCopy is enabled and partitioning/sort-order is defined
Added: throw an exception if noCopy is enabled and partitioning/sort-order is defined.
noCopy branch is broken due to dataFileSizeInBytes always being 0

still applies (line 317)
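The bug flagged here follows a common pattern: a variable re-declared with a `= 0` default that wins on one branch. A minimal sketch of the fix (all names and branches hypothetical simplifications of the PR code): declare the size once and assign it on every path, so no branch can fall through to 0.

```java
public class FileSize {
    // Sketch of the fix pattern: a single declaration with no "= 0" default,
    // forcing every branch to assign a real value.
    static long dataFileSize(boolean noCopy, long copiedLength, long statedLength) {
        long dataFileSizeInBytes; // declared once; the compiler rejects unassigned paths
        if (noCopy) {
            dataFileSizeInBytes = statedLength; // e.g. taken from existing file metadata
        } else {
            dataFileSizeInBytes = copiedLength; // e.g. taken from the appender after close()
        }
        return dataFileSizeInBytes;
    }

    public static void main(String[] args) {
        System.out.println(dataFileSize(true, 0L, 3475226L)); // prints 3475226
        System.out.println(dataFileSize(false, 1024L, 0L)); // prints 1024
    }
}
```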
```java
    .withFileSizeInBytes(inFile.getLength())
    .withPartition(partKey)
    .withFormat(FileFormat.PARQUET)
    .withRecordCount(records.size())
```
if I remember correctly, `recordCount` is set implicitly from metrics
```java
new DataFiles.Builder(table.spec())
    .withPath(dstDataFile)
    .withFormat("PARQUET")
    .withFileSizeInBytes(inFile.getLength())
```
extra requests just to get the file length can be avoided by reading the value from the fileappender, same as we do in https://github.com/Altinity/ice/pull/22/files#diff-efe5f830dfd30841f29c50a9f843a6c295aafb4bfd3d60202bb22f8680272686L390
Changed to `appender.length()`.
```java
}

// Commit transaction.
txn.commitTransaction();
```
we should probably merge file appends into the same tx; otherwise we may have multiple TXs per `ice insert`
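The suggestion can be sketched with a toy append operation (the interface and names are hypothetical stand-ins, not the Iceberg API): stage every data file first, then commit once, so a single `ice insert` produces a single transaction rather than one per file.

```java
import java.util.List;

public class SingleCommit {
    // Toy stand-in for an Iceberg append operation.
    interface AppendOp {
        void appendFile(String dataFile);
        void commit();
    }

    static int runInsert(List<String> dataFiles) {
        int[] commits = {0};
        AppendOp appendOp = new AppendOp() {
            public void appendFile(String dataFile) { /* stage only, no commit */ }
            public void commit() { commits[0]++; }
        };
        for (String df : dataFiles) {
            appendOp.appendFile(df); // no per-file commit here
        }
        appendOp.commit(); // one commit for the whole insert
        return commits[0];
    }

    public static void main(String[] args) {
        System.out.println(runInsert(List.of("a.parquet", "b.parquet", "c.parquet"))); // prints 1
    }
}
```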
```diff
 if (!finalOptions.noCommit()) {
   // TODO: log
-  if (atLeastOneFileAppended) {
+  if (atLeastOneFileAppended.get()) {
```
it doesn't look like this needs to be an AtomicBoolean
```java
  appender.add(rec);
}

fileSizeInBytes = appender.length();
```
this is incorrect: `length()` is guaranteed to return the correct value only after `close()`. see the javadocs
Are you referring to the FileAppender java class?

```java
/** Returns the length of this file. */
long length();
```
Yes. Calling `appender.length()` before `close()` throws an error in Spark. Changed.
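The measure-after-close rule can be demonstrated with any buffered writer. The sketch below uses `GZIPOutputStream` purely as a stand-in for a Parquet appender: buffered/compressed writers flush remaining buffers and footers on close, so a size observed mid-write undercounts the final file size.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class LengthAfterClose {
    // Returns {size before close(), size after close()} of a compressed stream.
    static long[] sizes() throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        GZIPOutputStream out = new GZIPOutputStream(sink);
        out.write(new byte[64 * 1024]);
        long before = sink.size(); // mid-write: compressed data still buffered
        out.close();
        long after = sink.size(); // final size, including flushed data and trailer
        return new long[] {before, after};
    }

    public static void main(String[] args) throws IOException {
        long[] s = sizes();
        System.out.println(s[0] + " < " + s[1]);
    }
}
```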
```yaml
spark-iceberg:
  image: tabulario/spark-iceberg
  container_name: spark-iceberg
  build: spark/
```
there is no spark/ in this PR
It's from the iceberg repo; looks like `build` is ignored if `image` is defined. Removed.
```java
@CommandLine.Option(
    names = {"--partition"},
    description =
        "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]")
```
can you please include a list of supported transforms in the description
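A hedged sketch of what a matching validator could look like (the transform list comes from this review thread; the class and method names are hypothetical, not from the PR):

```java
import java.util.Locale;
import java.util.Set;

public class PartitionTransforms {
    // Transforms named in the review: hour, day, month, year, identity.
    static final Set<String> SUPPORTED = Set.of("hour", "day", "month", "year", "identity");

    // Resolve a transform name, defaulting to identity when none is given,
    // and failing loudly on anything unrecognized.
    static String resolve(String transform) {
        String t = (transform == null || transform.isBlank())
            ? "identity"
            : transform.toLowerCase(Locale.ROOT);
        if (!SUPPORTED.contains(t)) {
            throw new IllegalArgumentException("unsupported transform: " + transform);
        }
        return t;
    }

    public static void main(String[] args) {
        System.out.println(resolve("year")); // prints year
        System.out.println(resolve(null)); // prints identity
    }
}
```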
```java
for (Main.IceSortOrder order : sortOrders) {
  SortDirection dir = order.desc() ? SortDirection.DESC : SortDirection.ASC;
  NullOrder nullOrd = order.nullFirst() ? NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
  if (dir == SortDirection.ASC) {
```
the `dir` var appears to be unnecessary
The idea is that `dir` would default to DESC if it's not passed.
This is just a nit, so keep it if you want, but what I meant was: you can remove `dir` by changing the if statement to

```java
if (!order.desc()) {
```
```java
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.*;
```
```java
for (DataFile df : dataFiles) {
  atLeastOneFileAppended = true;
  appendOp.appendFile(df); // ✅ Only main thread appends now
}

// Commit transaction.
txn.commitTransaction();
```
```java
names = {"--partition"},
description =
    "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}],"
        + "Supported transforms: hour, day, month, year, identity")
```

Suggested change:

```diff
-        + "Supported transforms: hour, day, month, year, identity")
+        + "Supported transforms: hour, day, month, year, identity (default)")
```
```java
@CommandLine.Option(
    names = {"--partition"},
    description =
        "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]")
```
the description is out of sync with the same field's description in create-table
```java
    replaceSortOrder.desc(order.column(), nullOrd);
  }
}
replaceSortOrder.commit();
```
Just to confirm: is there no way to pass sort-order as part of the createTable request? It looks like if there is an issue with `replaceSortOrder.commit();` above, the table will be left with the wrong configuration and no way to update it without performing `ice insert` (because a repeated createTable will throw AlreadyExistsException).
I couldn't find a way. This was in the test class for iceberg, but the regular createTable doesn't take sortOrder as a parameter.

Snippet of the test class:
```java
@Test
public void testUpdateSortOrder() {
  Schema schema = new Schema(Types.NestedField.required(10, "x", Types.StringType.get()));
  SortOrder order = SortOrder.builderFor(schema).asc("x").build();
  TableMetadata sortedByX =
      TableMetadata.newTableMetadata(
          schema, PartitionSpec.unpartitioned(), order, null, ImmutableMap.of());
  assertThat(sortedByX.sortOrders()).hasSize(1);
  assertThat(sortedByX.sortOrder().orderId()).isEqualTo(1);
  assertThat(sortedByX.sortOrder().fields()).hasSize(1);
  assertThat(sortedByX.sortOrder().fields().get(0).sourceId()).isEqualTo(1);
  assertThat(sortedByX.sortOrder().fields().get(0).direction()).isEqualTo(SortDirection.ASC);
  assertThat(sortedByX.sortOrder().fields().get(0).nullOrder()).isEqualTo(NullOrder.NULLS_FIRST);
  // build an equivalent order with the correct schema
  SortOrder newOrder = SortOrder.builderFor(sortedByX.schema()).asc("x").build();
  TableMetadata alsoSortedByX = sortedByX.replaceSortOrder(newOrder);
  assertThat(sortedByX)
      .as("Should detect current
```
```java
@Override
public Table createTable(
    TableIdentifier ident,
    Schema schema,
    PartitionSpec spec,
    String location,
    Map<String, String> props) {
  return delegate.createTable(ident, schema, spec, location, props);
}

@Override
public Table createTable(
    TableIdentifier ident, Schema schema, PartitionSpec spec, Map<String, String> props) {
  return delegate.createTable(ident, schema, spec, props);
}

@Override
public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec) {
  return delegate.createTable(ident, schema, spec);
}

@Override
public Table createTable(TableIdentifier identifier, Schema schema) {
  return delegate.createTable(identifier, schema);
}
```
shyiko left a comment:
almost there 👍 the only blocker is https://github.com/Altinity/ice/pull/22/files#r2081978804
Changes:

- ice create-table: Fix createTable not including sortOrder as part of the createTable transaction.
- ice insert: Fix create-table & insert generating different/incompatible partition specs.
- ice insert: Fix "java.lang.IllegalArgumentException: Cannot add duplicate partition field" when trying to insert into a table with a partition spec already set by create-table (--partition had to be specified for `ice insert` to reproduce).
- ice insert: Fix "software.amazon.awssdk.services.s3.model.S3Exception: Object name contains unsupported characters" when trying to insert --partition data into an existing un-partitioned table (OutputFileFactory was missing tableSpec).
- ice insert: Fix insert mutating table partitioning/ordering specs + write.distribution-mode even when there are no changes.
- ice insert: Fix race condition resulting from multiple threads accessing the same Schema instance.
- ice insert: Fix partitioning activity logging invalid "took Ns" values.
- ice insert: Fix improper use of reuseContainer that could lead to invalid data being written to the catalog.
- ice insert: Fix insert accepting new partitioning/ordering specs without rewriting existing data.
- ice insert: Fix insert not following the --data-file-naming-strategy=DEFAULT strategy.
- ice create-table/insert: Fix NPE when --partition is specified without "transform".
- examples/docker-compose:spark: Fix invalid/incomplete spark configuration (spark conf was missing client.region, header.authorization, etc.).
- examples/docker-compose:spark: Pin the tabulario/spark-iceberg tag to reduce the risk of it breaking in the future.
- examples/docker-compose:spark: Fix invalid `docker exec` command (./spark-sql didn't work).
- examples/docker-compose:spark: Remove the need to manually edit spark config just to try things.
- examples/docker-compose:spark: Explain what docker-compose-spark-iceberg.yaml is for and how to use it + spark-sql.
- examples/docker-compose:spark: Remove redundant/copy&paste parts from docker-compose-spark-iceberg.yaml.
- examples/scratch: Fix examples referencing non-existent options (like --partition-by).

Future work:

- Support --no-copy when partitioning
- Support --data-file-naming-strategy=PRESERVE_ORIGINAL when partitioning

closes: #20