diff --git a/examples/scratch/README.md b/examples/scratch/README.md
index f11e9f95..78e7b373 100644
--- a/examples/scratch/README.md
+++ b/examples/scratch/README.md
@@ -19,10 +19,12 @@
 local-mc mb --ignore-existing local/bucket1
 
 # start Iceberg REST Catalog server
 ice-rest-catalog
+# or, using the jar file
+/ice/examples/scratch$ java -jar ../../ice-rest-catalog/target/ice-rest-catalog-jar-with-dependencies.jar &
+
 # insert data into catalog
-ice insert flowers.iris -p \
-  file://iris.parquet
+ice insert nyc.taxis -p \
+  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
diff --git a/ice/src/main/java/com/altinity/ice/Main.java b/ice/src/main/java/com/altinity/ice/Main.java
index a45eb5a8..a26ed39f 100644
--- a/ice/src/main/java/com/altinity/ice/Main.java
+++ b/ice/src/main/java/com/altinity/ice/Main.java
@@ -69,10 +69,14 @@ void describe(
       @CommandLine.Option(
           names = {"--json"},
           description = "Output JSON instead of YAML")
-          boolean json)
+          boolean json,
+      @CommandLine.Option(
+          names = {"--include-metrics"},
+          description = "Include table metrics in the output")
+          boolean includeMetrics)
       throws IOException {
     try (RESTCatalog catalog = loadCatalog(this.configFile)) {
-      Describe.run(catalog, target, json);
+      Describe.run(catalog, target, json, includeMetrics);
     }
   }
diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java
index 1676d8df..06fd03e9 100644
--- a/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java
+++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java
@@ -3,23 +3,28 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.Table;
+import org.apache.iceberg.*;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
 
 public final class Describe {
 
   private Describe() {}
 
   // TODO: refactor: the use of StringBuilder below is absolutely criminal
-  public static void run(RESTCatalog catalog, String target, boolean json) throws IOException {
+  public static void run(RESTCatalog catalog, String target, boolean json, boolean includeMetrics)
+      throws IOException {
     String targetNamespace = null;
     String targetTable = null;
     if (target != null && !target.isEmpty()) {
@@ -102,6 +107,10 @@ public static void run(RESTCatalog catalog, String target, boolean json) throws
         }
         sb.append("\t\t\t\tlocation: " + snapshot.manifestListLocation() + "\n");
       }
+
+      if (includeMetrics) {
+        printTableMetrics(table, sb);
+      }
     }
     if (!tableMatched) {
       sb.append("  []\n");
@@ -117,6 +126,49 @@ public static void run(RESTCatalog catalog, String target, boolean json) throws
     System.out.println(r);
   }
 
+  /**
+   * Appends per-file column metrics for the given table to the output buffer.
+   *
+   * @param table table whose data files are scanned for metrics
+   */
+  private static void printTableMetrics(Table table, StringBuilder buffer) throws IOException {
+    TableScan scan = table.newScan().includeColumnStats();
+    CloseableIterable<FileScanTask> tasks = scan.planFiles();
+
+    for (FileScanTask task : tasks) {
+      DataFile dataFile = task.file();
buffer.append("\t\t\tmetrics:\n"); + buffer.append("\t\t\t file: " + dataFile.path() + "\n"); + buffer.append("\t\t\t record_count: " + dataFile.recordCount() + "\n"); + + Map valueCounts = dataFile.valueCounts(); + Map nullCounts = dataFile.nullValueCounts(); + Map lowerBounds = dataFile.lowerBounds(); + Map upperBounds = dataFile.upperBounds(); + + buffer.append("\t\t\t columns:\n"); + for (Types.NestedField field : table.schema().columns()) { + int id = field.fieldId(); + buffer.append("\t\t\t " + field.name() + ":\n"); + buffer.append("\t\t\t value_count: " + valueCounts.get(id) + "\n"); + buffer.append("\t\t\t null_count: " + nullCounts.get(id) + "\n"); + + ByteBuffer lower = lowerBounds.get(id); + ByteBuffer upper = upperBounds.get(id); + + String lowerStr = + lower != null ? Conversions.fromByteBuffer(field.type(), lower).toString() : "null"; + String upperStr = + upper != null ? Conversions.fromByteBuffer(field.type(), upper).toString() : "null"; + + buffer.append("\t\t\t lower_bound: " + lowerStr + "\n"); + buffer.append("\t\t\t upper_bound: " + upperStr + "\n"); + } + } + + tasks.close(); + } + private static String convertYamlToJson(String yaml) throws IOException { ObjectMapper yamlReader = new ObjectMapper(new YAMLFactory()); Object obj = yamlReader.readValue(yaml, Object.class); diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java index 53ff4540..0f311695 100644 --- a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java @@ -16,12 +16,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import org.apache.iceberg.AppendFiles; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataFiles; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; +import org.apache.iceberg.*; import org.apache.iceberg.aws.s3.S3FileIO; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; @@ -39,6 +34,7 @@ import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.parquet.ParquetSchemaUtil; +import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; @@ -137,7 +133,7 @@ public static void run( RetryLog retryLog = retryListFile != null && !retryListFile.isEmpty() ? new RetryLog(retryListFile) - : null; ) { + : null) { boolean atLeastOneFileAppended = false; // TODO: parallel @@ -235,13 +231,15 @@ public static void run( logger.info("{}: adding data file", file); long recordCount = metadata.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum(); + MetricsConfig metricsConfig = MetricsConfig.forTable(table); + Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig); df = new DataFiles.Builder(table.spec()) .withPath(dataFile) .withFormat("PARQUET") .withRecordCount(recordCount) .withFileSizeInBytes(dataFileSizeInBytes) - // TODO: metrics + .withMetrics(metrics) .build(); } catch (Exception e) { // FIXME if (retryLog != null) {