ice: Add support for partition by and sort columns in create table and insert. #22

Merged
subkanthi merged 33 commits into master from 20-ice-support-partitioning-when-tables-are-created
May 9, 2025

Conversation

@subkanthi
Collaborator

closes: #20

@subkanthi subkanthi linked an issue Apr 24, 2025 that may be closed by this pull request
@subkanthi
Collaborator Author

subkanthi commented Apr 24, 2025

Testing:
java -jar ../../ice/target/ice-0.0.0-SNAPSHOT-shaded.jar create-table flowers.irs_no_copy_partition --schema-from-parquet=file://iris.parquet --partition-by=variety

  partition_spec_raw: |-
    [
      1000: variety: identity(5)
    ]

Multiple columns

java -jar ../../ice/target/ice-0.0.0-SNAPSHOT-shaded.jar create-table flowers.irs_no_copy_partition --schema-from-parquet=file://iris.parquet --partition-by=variety,petal.width

  partition_spec_raw: |-
    [
      1000: variety: identity(5)
      1001: petal.width: identity(4)
    ]
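The comma-separated `--partition-by` value above maps each listed column to an identity transform in the resulting partition spec. A minimal sketch of that argument parsing (class and method names here are hypothetical, not ice's actual code; the real implementation then feeds these columns into an Iceberg `PartitionSpec` builder):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not ice's actual code) of splitting a comma-separated
// --partition-by value into column names, each of which then becomes an
// identity partition field in the Iceberg PartitionSpec.
public class PartitionByParser {

    static List<String> parsePartitionBy(String arg) {
        List<String> columns = new ArrayList<>();
        if (arg == null || arg.isBlank()) {
            return columns; // no partitioning requested
        }
        for (String col : arg.split(",")) {
            String trimmed = col.trim();
            if (!trimmed.isEmpty()) {
                columns.add(trimmed); // e.g. "variety", "petal.width"
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        // mirrors --partition-by=variety,petal.width from the example above
        System.out.println(parsePartitionBy("variety,petal.width")); // [variety, petal.width]
    }
}
```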

@subkanthi subkanthi requested a review from shyiko April 24, 2025 13:48
@subkanthi subkanthi marked this pull request as ready for review April 24, 2025 13:48
@subkanthi subkanthi removed the request for review from shyiko April 24, 2025 14:00
@subkanthi subkanthi marked this pull request as draft April 24, 2025 14:00
@subkanthi
Collaborator Author

adding to README

@subkanthi subkanthi marked this pull request as ready for review April 24, 2025 14:43
@shyiko
Collaborator

shyiko commented Apr 24, 2025

What about ice insert? Does it need to be updated?

@subkanthi subkanthi changed the title Add support for partition by columns in create table. ice: Add support for partition by and sort columns in create table and insert. Apr 25, 2025
.commit();
var updatedSortOrder = table.replaceSortOrder();
for (String column : sortColumns) {
updatedSortOrder.asc(column);
Collaborator

what if order supposed to be desc?
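A minimal sketch of what honoring a per-column desc flag looks like. `IceSortOrder` here is an illustrative stand-in for the sort-order option record the PR later adds; the real code calls Iceberg's `ReplaceSortOrder` `asc()`/`desc()` methods rather than formatting a string:

```java
// Illustrative sketch only: IceSortOrder stands in for the PR's sort-order
// option record; the real code drives Iceberg's ReplaceSortOrder API
// instead of returning a description string.
public class SortDirectionSketch {

    record IceSortOrder(String column, boolean desc, boolean nullFirst) {}

    static String describe(IceSortOrder order) {
        String dir = order.desc() ? "DESC" : "ASC"; // honor desc instead of always asc()
        String nulls = order.nullFirst() ? "NULLS_FIRST" : "NULLS_LAST";
        return order.column() + " " + dir + " " + nulls;
    }
}
```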

@subkanthi
Collaborator Author

subkanthi commented May 2, 2025

Data with partition key.

The parquet file has 3475226 rows:

SELECT count(*)
FROM url('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet', 'parquet')

Query id: 2100837d-987e-4118-b1d4-6551c398bb80

   ┌─count()─┐
1. │ 3475226 │ -- 3.48 million
   └─────────┘

1 row in set. Elapsed: 1.717 sec. 


Spark(Data partitioned)

select * from taxis44 where vendorID=7;
**Time taken: 0.493 seconds, Fetched 1206 row(s)**

Spark(Data not partitioned)

select * from taxis where vendorID=7;

Time taken: 1.78 seconds, Fetched 2412 row(s)

@subkanthi subkanthi marked this pull request as draft May 2, 2025 13:39
@subkanthi subkanthi marked this pull request as ready for review May 5, 2025 21:03
@subkanthi subkanthi requested a review from shyiko May 8, 2025 01:24
Comment thread: examples/docker-compose/README.md (outdated)
The spark-sql shell can now query the tables directly

```
docker exec -it <container_id> bash
```
Collaborator

I'd suggest updating compose file to include container_name: spark so that this part could be simplified to docker exec -it spark ./spark-sql (or docker exec -it spark spark-sql if spark-sql is on the PATH)

Collaborator Author

added.

List<IcePartition> partitions = new ArrayList<>();

if (sortOrderJson != null && !sortOrderJson.isEmpty()) {
ObjectMapper mapper = new ObjectMapper();
Collaborator

Collaborator Author

Added.

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.*;
Collaborator

Collaborator Author

@subkanthi subkanthi May 8, 2025

fixed.

}
long dataFileSizeInBytes;
var dataFile = Strings.replacePrefix(file, "s3a://", "s3://");
long dataFileSizeInBytes = 0;
Collaborator

noCopy branch broken due to dataFileSizeInBytes being always 0

Collaborator

it also looks like we should fail the operation if noCopy is enabled and partitioning/sort-order is defined

Collaborator

this is still unresolved

Collaborator Author

added throw exception if noCopy is enabled and partitioning/sort-order is defined
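The guard discussed here could look roughly like the following. Names are illustrative, not ice's actual code; the real validation lives in ice's insert/create-table path:

```java
import java.util.List;

// Sketch of the guard discussed above (names are illustrative): fail fast
// when noCopy is combined with partitioning or sort-order, since applying
// either layout requires rewriting the data files.
public class NoCopyGuard {

    static void checkNoCopyCompatible(
            boolean noCopy, List<String> partitionColumns, List<String> sortColumns) {
        boolean hasLayoutSpec =
                (partitionColumns != null && !partitionColumns.isEmpty())
                        || (sortColumns != null && !sortColumns.isEmpty());
        if (noCopy && hasLayoutSpec) {
            throw new IllegalArgumentException(
                    "noCopy cannot be combined with partitioning/sort-order: "
                            + "data files would need to be rewritten");
        }
    }
}
```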

Collaborator

@shyiko shyiko May 9, 2025

noCopy branch broken due to dataFileSizeInBytes being always 0

still applies (line 317)

.withFileSizeInBytes(inFile.getLength())
.withPartition(partKey)
.withFormat(FileFormat.PARQUET)
.withRecordCount(records.size())
Collaborator

if I remember correctly recordCount is set implicitly from metrics

Collaborator Author

removed

new DataFiles.Builder(table.spec())
.withPath(dstDataFile)
.withFormat("PARQUET")
.withFileSizeInBytes(inFile.getLength())
Collaborator

extra requests just to get file length can be avoid by reading the value from fileappender, same as we do in https://github.com/Altinity/ice/pull/22/files#diff-efe5f830dfd30841f29c50a9f843a6c295aafb4bfd3d60202bb22f8680272686L390

Collaborator Author

changed to appender.length

Collaborator Author

done

}

// Commit transaction.
txn.commitTransaction();
Collaborator

we should probably merge file appends into the same tx; otherwise we may have multiple TXs per ice insert

Collaborator

bump

Collaborator Author

done

if (!finalOptions.noCommit()) {
// TODO: log
if (atLeastOneFileAppended) {
if (atLeastOneFileAppended.get()) {
Collaborator

it doesn't look like this needs to be an atomic
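A sketch of the simplified flow the reviewer is suggesting (names are illustrative; the `appendOp.appendFile` call is elided): since the flag is only touched from the main thread, a plain boolean suffices, and `AtomicBoolean` would only be needed if worker threads flipped it concurrently.

```java
import java.util.List;

// Illustrative sketch: appendFile(...) is only invoked from the main thread,
// so a plain local boolean is enough to track whether anything was appended;
// AtomicBoolean is only required for cross-thread mutation.
public class AppendTracker {

    static boolean appendAll(List<String> dataFiles) {
        boolean atLeastOneFileAppended = false; // plain flag: single-threaded access
        for (String df : dataFiles) {
            // appendOp.appendFile(df) would go here in the real code
            atLeastOneFileAppended = true;
        }
        return atLeastOneFileAppended;
    }
}
```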

Collaborator Author

changed

appender.add(rec);
}

fileSizeInBytes = appender.length();
Collaborator

@shyiko shyiko May 9, 2025

this is incorrect. length() is guaranteed to return correct value only after close(). see javadocs

Collaborator Author

Are you referring to the FileAppender Java class?
/** Returns the length of this file. */
long length();

Collaborator

Collaborator Author

yes, appender.length() before close() throws an error in Spark. changed.
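The pattern this thread converges on can be sketched with a mock standing in for Iceberg's `FileAppender` (the mock is illustrative, not Iceberg code): write records, `close()`, and only then read `length()` for `withFileSizeInBytes(...)`.

```java
import java.io.ByteArrayOutputStream;
import java.io.Closeable;

// MockAppender is an illustrative stand-in for Iceberg's FileAppender,
// demonstrating the contract discussed above: length() is only reliable
// after close(), because buffered data and file footers are flushed on close.
class MockAppender implements Closeable {

    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private boolean closed = false;
    private long finalLength = -1;

    void add(byte[] record) {
        if (closed) throw new IllegalStateException("appender already closed");
        out.writeBytes(record);
    }

    @Override
    public void close() {
        // a real writer flushes buffers and writes the file footer here;
        // only now is the file size final
        closed = true;
        finalLength = out.size();
    }

    long length() {
        if (!closed) {
            throw new IllegalStateException("length() before close() is undefined");
        }
        return finalLength;
    }
}
```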

spark-iceberg:
image: tabulario/spark-iceberg
container_name: spark-iceberg
build: spark/
Collaborator

there is no spark/ in this PR

Collaborator Author

it's from the iceberg repo; looks like build is ignored if image is defined. removed.

@CommandLine.Option(
names = {"--partition"},
description =
"JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]")
Collaborator

can you please include a list of supported transforms in the description
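The transform list the PR later documents (hour, day, month, year, identity, with identity as the default) could be validated roughly as below; class and method names are hypothetical, not ice's actual code:

```java
import java.util.Set;

// Hypothetical validation of the "transform" field of a --partition entry
// against the list this thread settles on (hour, day, month, year, identity),
// with identity as the default when no transform is given.
public class TransformValidator {

    private static final Set<String> SUPPORTED =
            Set.of("hour", "day", "month", "year", "identity");

    static String resolveTransform(String transform) {
        if (transform == null || transform.isBlank()) {
            return "identity"; // default, per the suggested option description
        }
        String t = transform.toLowerCase();
        if (!SUPPORTED.contains(t)) {
            throw new IllegalArgumentException("unsupported transform: " + transform);
        }
        return t;
    }
}
```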

for (Main.IceSortOrder order : sortOrders) {
SortDirection dir = order.desc() ? SortDirection.DESC : SortDirection.ASC;
NullOrder nullOrd = order.nullFirst() ? NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
if (dir == SortDirection.ASC) {
Collaborator

dir var appears to be unnecessary

Collaborator Author

the idea is that dir would default to DESC if it's not passed.

Collaborator

this is just a nit, so keep it if you want but what I meant was: you can remove dir by changing if statement to

if (!order.desc()) {

import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.*;
Collaborator

wildcards are back

Collaborator Author

fixed

for (DataFile df : dataFiles) {
atLeastOneFileAppended = true;
appendOp.appendFile(df);
appendOp.appendFile(df); // ✅ Only main thread appends now
Collaborator

✅ - ai assistant artifact?

Collaborator Author

removed

}

// Commit transaction.
txn.commitTransaction();
Collaborator

bump

}

// Commit transaction.
txn.commitTransaction();
Collaborator

noCommit flag ignored

names = {"--partition"},
description =
"JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}],"
+ "Supported transforms: hour, day, month, year, identity")
Collaborator

Suggested change
+ "Supported transforms: hour, day, month, year, identity")
+ "Supported transforms: hour, day, month, year, identity (default)")

Collaborator Author

fixed

@CommandLine.Option(
names = {"--partition"},
description =
"JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]")
Collaborator

description is out of sync with create-table same field description

Collaborator Author

fixed

replaceSortOrder.desc(order.column(), nullOrd);
}
}
replaceSortOrder.commit();
Collaborator

just to confirm: there is no way to pass sort-order as part of the createTable request? it looks like if there is an issue with replaceSortOrder.commit(); above, the table will be left with the wrong configuration and no way to update it without performing ice insert (because repeated createTable will throw AlreadyExistsException)

Collaborator Author

I couldn't find a way; this was in the test class for iceberg, but the regular createTable doesn't take sortOrder as a parameter

Snippet of test class

  @Test
  public void testUpdateSortOrder() {
    Schema schema = new Schema(Types.NestedField.required(10, "x", Types.StringType.get()));

    SortOrder order = SortOrder.builderFor(schema).asc("x").build();

    TableMetadata sortedByX =
        TableMetadata.newTableMetadata(
            schema, PartitionSpec.unpartitioned(), order, null, ImmutableMap.of());
    assertThat(sortedByX.sortOrders()).hasSize(1);
    assertThat(sortedByX.sortOrder().orderId()).isEqualTo(1);
    assertThat(sortedByX.sortOrder().fields()).hasSize(1);
    assertThat(sortedByX.sortOrder().fields().get(0).sourceId()).isEqualTo(1);
    assertThat(sortedByX.sortOrder().fields().get(0).direction()).isEqualTo(SortDirection.ASC);
    assertThat(sortedByX.sortOrder().fields().get(0).nullOrder()).isEqualTo(NullOrder.NULLS_FIRST);

    // build an equivalent order with the correct schema
    SortOrder newOrder = SortOrder.builderFor(sortedByX.schema()).asc("x").build();

    TableMetadata alsoSortedByX = sortedByX.replaceSortOrder(newOrder);
    assertThat(sortedByX)
        .as("Should detect current 

  @Override
  public Table createTable(
      TableIdentifier ident,
      Schema schema,
      PartitionSpec spec,
      String location,
      Map<String, String> props) {
    return delegate.createTable(ident, schema, spec, location, props);
  }

  @Override
  public Table createTable(
      TableIdentifier ident, Schema schema, PartitionSpec spec, Map<String, String> props) {
    return delegate.createTable(ident, schema, spec, props);
  }

  @Override
  public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec) {
    return delegate.createTable(ident, schema, spec);
  }

  @Override
  public Table createTable(TableIdentifier identifier, Schema schema) {
    return delegate.createTable(identifier, schema);
  }

Collaborator

@shyiko shyiko left a comment

almost there 👍 the only blocker is https://github.com/Altinity/ice/pull/22/files#r2081978804

@subkanthi subkanthi merged commit bac2d6a into master May 9, 2025
1 check passed
shyiko added a commit that referenced this pull request May 12, 2025
Changes:

- ice create-table: Fix createTable not including sortOrder as part of createTable transaction.
- ice insert: Fix create-table & insert generating different/incompatible partition specs.
- ice insert: Fix "java.lang.IllegalArgumentException: Cannot add duplicate partition field" when trying to insert into a table with partition spec already set by create-table (--partition had to be specified for `ice insert` to reproduce).
- ice insert: Fix "software.amazon.awssdk.services.s3.model.S3Exception: Object name contains unsupported characters" when trying to insert --partition data into an existing un-partitioned table (OutputFileFactory was missing tableSpec).
- ice insert: Fix insert mutating table partitioning/ordering specs + write.distribution-mode even when there are no changes;
- ice insert: Fix race condition resulting from multiple threads accessing the same Schema instance.
- ice insert: Fix partitioning activity logging invalid "took Ns" values.
- ice insert: Fix improper use of reuseContainer that could lead to invalid data being written to the catalog.
- ice insert: Fix insert accepting new partitioning/ordering specs without rewriting existing data.
- ice insert: Fix insert not following --data-file-naming-strategy=DEFAULT strategy
- ice create-table/insert: Fix NPE when --partition is specified without "transform".
- examples/docker-compose:spark: Fix invalid/incomplete spark configuration (spark conf was missing client.region, header.authorization, etc.)
- examples/docker-compose:spark: Pin tabulario/spark-iceberg tag to reduce the risk of it breaking in the future.
- examples/docker-compose:spark: Fix invalid `docker exec` command (./spark-sql didn't work).
- examples/docker-compose:spark: Remove the need to manually edit spark config just to try things.
- examples/docker-compose:spark: Explain what docker-compose-spark-iceberg.yaml is for and how to use it + spark-sql.
- examples/docker-compose:spark: Remove redundant/copy&paste parts from docker-compose-spark-iceberg.yaml.
- examples/scratch: Fix examples referencing non-existent options (like --partition-by).

Future work:
- Support --no-copy when partitioning
- Support --data-file-naming-strategy=PRESERVE_ORIGINAL when partitioning
@shyiko shyiko deleted the 20-ice-support-partitioning-when-tables-are-created branch June 3, 2025 18:17

Development

Successfully merging this pull request may close these issues.

ice: Support partitioning when tables are created

2 participants