-
Notifications
You must be signed in to change notification settings - Fork 6
ice: Add support for partition by and sort columns in create table and insert. #22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
55d08e8
1a9ca7f
5dfcdf9
8b5463a
4a98c49
6762798
c07e50b
991593b
8898f26
bbc9c10
68ebd1d
bc0c075
3775b5e
b28af69
2128640
2eab476
174b420
f7651b6
5a29eb4
9130285
8512ba5
65847f8
5d25f55
290dd80
9b9ed9b
3329c32
0706672
fe371ad
b0abb1a
827fd73
d3e7648
701352e
1b30ce0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| services: | ||
| spark-iceberg: | ||
| image: tabulario/spark-iceberg | ||
| container_name: spark-iceberg | ||
| network_mode: host | ||
| volumes: | ||
| - ./warehouse:/home/iceberg/warehouse | ||
| - ./notebooks:/home/iceberg/notebooks/notebooks | ||
| environment: | ||
| - AWS_ACCESS_KEY_ID=miniouser | ||
| - AWS_SECRET_ACCESS_KEY=miniopassword | ||
| - AWS_REGION=minio | ||
| ports: | ||
| - 8888:8888 | ||
| - 8080:8080 | ||
| - 10000:10000 | ||
| - 10001:10001 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,8 +18,12 @@ | |
| import com.altinity.ice.cli.internal.config.Config; | ||
| import com.altinity.ice.internal.picocli.VersionProvider; | ||
| import com.altinity.ice.internal.strings.Strings; | ||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||
| import com.fasterxml.jackson.databind.DeserializationFeature; | ||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Scanner; | ||
|
|
@@ -114,6 +118,14 @@ void describe( | |
| } | ||
| } | ||
|
|
||
| /** One element of the {@code --sort-order} JSON array, bound from the JSON keys {@code column}, {@code desc} and {@code nullFirst}. */ public record IceSortOrder( | ||
| @JsonProperty("column") String column, | ||
| @JsonProperty("desc") boolean desc, | ||
| @JsonProperty("nullFirst") boolean nullFirst) {} | ||
|
|
||
| /** One element of the {@code --partition} JSON array, bound from the JSON keys {@code column} and {@code transform}. */ public record IcePartition( | ||
| @JsonProperty("column") String column, @JsonProperty("transform") String transform) {} | ||
|
|
||
| @CommandLine.Command(name = "create-table", description = "Create table.") | ||
| void createTable( | ||
| @CommandLine.Parameters( | ||
|
|
@@ -138,16 +150,46 @@ void createTable( | |
| required = true, | ||
| names = "--schema-from-parquet", | ||
| description = "/path/to/file.parquet") | ||
| String schemaFile) | ||
| String schemaFile, | ||
| @CommandLine.Option( | ||
| names = {"--partition"}, | ||
| description = | ||
| "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]," | ||
| + "Supported transforms: hour, day, month, year, identity(default)") | ||
| String partitionJson, | ||
| @CommandLine.Option( | ||
| names = {"--sort-order"}, | ||
| description = | ||
| "JSON array of sort orders: [{\"column\":\"name\",\"desc\":true,\"nullFirst\":true}]") | ||
| String sortOrderJson) | ||
| throws IOException { | ||
| try (RESTCatalog catalog = loadCatalog(this.configFile())) { | ||
| List<IceSortOrder> sortOrders = new ArrayList<>(); | ||
| List<IcePartition> partitions = new ArrayList<>(); | ||
|
|
||
| if (sortOrderJson != null && !sortOrderJson.isEmpty()) { | ||
| ObjectMapper mapper = new ObjectMapper(); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's best to set the option used at https://github.com/Altinity/ice/blob/master/ice/src/main/java/com/altinity/ice/internal/config/Config.java#L37 on this mapper; otherwise, any typos are silently ignored.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added. |
||
| mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); | ||
| IceSortOrder[] orders = mapper.readValue(sortOrderJson, IceSortOrder[].class); | ||
| sortOrders = Arrays.asList(orders); | ||
| } | ||
|
|
||
| if (partitionJson != null && !partitionJson.isEmpty()) { | ||
| ObjectMapper mapper = new ObjectMapper(); | ||
| mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); | ||
| IcePartition[] parts = mapper.readValue(partitionJson, IcePartition[].class); | ||
| partitions = Arrays.asList(parts); | ||
| } | ||
|
|
||
| CreateTable.run( | ||
| catalog, | ||
| TableIdentifier.parse(name), | ||
| schemaFile, | ||
| location, | ||
| createTableIfNotExists, | ||
| s3NoSignRequest); | ||
| s3NoSignRequest, | ||
| partitions, | ||
| sortOrders); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -206,6 +248,17 @@ void insert( | |
| "/path/to/file where to save list of files to retry" | ||
| + " (useful for retrying partially failed insert using `cat ice.retry | ice insert - --retry-list=ice.retry`)") | ||
| String retryList, | ||
| @CommandLine.Option( | ||
| names = {"--partition"}, | ||
| description = | ||
| "JSON array of partition specifications: [{\"column\":\"date\",\"transform\":\"year\"}]," | ||
| + "Supported transforms: hour, day, month, year, identity(default)") | ||
| String partitionJson, | ||
| @CommandLine.Option( | ||
| names = {"--sort-order"}, | ||
| description = | ||
| "JSON array of sort orders: [{\"column\":\"name\",\"desc\":true,\"nullFirst\":true}]") | ||
| String sortOrderJson, | ||
| @CommandLine.Option( | ||
| names = {"--thread-count"}, | ||
| description = "Number of threads to use for inserting data", | ||
|
|
@@ -224,11 +277,35 @@ void insert( | |
| return; | ||
| } | ||
| } | ||
|
|
||
| List<IceSortOrder> sortOrders = new ArrayList<>(); | ||
| List<IcePartition> partitions = new ArrayList<>(); | ||
|
|
||
| if (sortOrderJson != null && !sortOrderJson.isEmpty()) { | ||
| ObjectMapper mapper = new ObjectMapper(); | ||
| mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); | ||
| IceSortOrder[] orders = mapper.readValue(sortOrderJson, IceSortOrder[].class); | ||
| sortOrders = Arrays.asList(orders); | ||
| } | ||
|
|
||
| if (partitionJson != null && !partitionJson.isEmpty()) { | ||
| ObjectMapper mapper = new ObjectMapper(); | ||
| mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); | ||
| IcePartition[] parts = mapper.readValue(partitionJson, IcePartition[].class); | ||
| partitions = Arrays.asList(parts); | ||
| } | ||
|
|
||
| TableIdentifier tableId = TableIdentifier.parse(name); | ||
| if (createTableIfNotExists) { | ||
| // TODO: newCreateTableTransaction | ||
| CreateTable.run( | ||
| catalog, tableId, dataFiles[0], null, createTableIfNotExists, s3NoSignRequest); | ||
| catalog, | ||
| tableId, | ||
| dataFiles[0], | ||
| null, | ||
| createTableIfNotExists, | ||
| s3NoSignRequest, | ||
| partitions, | ||
| sortOrders); | ||
| } | ||
| Insert.run( | ||
| catalog, | ||
|
|
@@ -243,6 +320,8 @@ void insert( | |
| s3NoSignRequest, | ||
| s3CopyObject, | ||
| retryList, | ||
| partitions, | ||
| sortOrders, | ||
| threadCount < 1 ? Runtime.getRuntime().availableProcessors() : threadCount); | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing instructions on how to actually use any of this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added