From db104dba4248c757e49baeb947a3a0afe5715c6f Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Thu, 17 Apr 2025 10:53:12 -0400
Subject: [PATCH 1/6] [12] Added metrics when writing files to Iceberg from Parquet.

---
 .../java/com/altinity/ice/internal/cmd/Insert.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java
index 6f022227..a0e53ed4 100644
--- a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java
+++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java
@@ -14,12 +14,8 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
-import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
+
+import org.apache.iceberg.*;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -36,6 +32,7 @@
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.rest.RESTCatalog;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
@@ -233,12 +230,15 @@ public static void run(
         logger.info("{}: adding data file", file);
         long recordCount =
             metadata.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum();
+        MetricsConfig metricsConfig = MetricsConfig.forTable(table);
+        Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig);
         df =
             new DataFiles.Builder(table.spec())
                 .withPath(dataFile)
                 .withFormat("PARQUET")
                 .withRecordCount(recordCount)
                 .withFileSizeInBytes(dataFileSizeInBytes)
+                .withMetrics(metrics)
                 // TODO: metrics
                 .build();
       } catch (Exception e) {
         // FIXME

From c0b6c29c377533c4941ac865d4726cbb3324fcef Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Sun, 20 Apr 2025 21:43:18 -0400
Subject: [PATCH 2/6] [12] Removed blank line in imports.

---
 ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java
index a0e53ed4..c519a64d 100644
--- a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java
+++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java
@@ -14,7 +14,6 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
-
 import org.apache.iceberg.*;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;

From e6178be218113a917e3bc2f86949162ec664063a Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Tue, 22 Apr 2025 13:50:33 -0400
Subject: [PATCH 3/6] Fixed formatting.

--- .../com/altinity/ice/internal/cmd/Insert.java | 167 +++++++++--------- 1 file changed, 83 insertions(+), 84 deletions(-) diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java index 867e88b8..7d123e4f 100644 --- a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java @@ -16,7 +16,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; - import org.apache.iceberg.*; import org.apache.iceberg.aws.s3.S3FileIO; import org.apache.iceberg.catalog.TableIdentifier; @@ -56,19 +55,19 @@ private Insert() {} // TODO: refactor public static void run( - RESTCatalog catalog, - TableIdentifier nsTable, - String[] files, - DataFileNamingStrategy.Name dataFileNamingStrategy, - boolean skipDuplicates, - boolean noCommit, - boolean noCopy, - boolean forceNoCopy, - boolean forceTableAuth, - boolean s3NoSignRequest, - boolean s3CopyObject, - String retryListFile) - throws IOException { + RESTCatalog catalog, + TableIdentifier nsTable, + String[] files, + DataFileNamingStrategy.Name dataFileNamingStrategy, + boolean skipDuplicates, + boolean noCommit, + boolean noCopy, + boolean forceNoCopy, + boolean forceTableAuth, + boolean s3NoSignRequest, + boolean s3CopyObject, + String retryListFile) + throws IOException { if (files.length == 0) { // no work to be done return; @@ -82,7 +81,7 @@ public static void run( if (forceTableAuth) { if (!(tableIO instanceof S3FileIO)) { throw new UnsupportedOperationException( - "--force-table-auth is currently only supported for s3:// tables"); + "--force-table-auth is currently only supported for s3:// tables"); } s3ClientSupplier = ((S3FileIO) tableIO)::client; } else { @@ -91,18 +90,18 @@ public static void run( Lazy s3ClientLazy = new Lazy<>(s3ClientSupplier); try { var filesExpanded = - Arrays.stream(files) - .flatMap( - s -> { - if (s.startsWith("s3://") && s.contains("*")) { - var b = S3.bucketPath(s); - return S3 - .listWildcard(s3ClientLazy.getValue(), b.bucket(), b.path(), -1) - .stream(); - } - return Stream.of(s); - }) - .toList(); + Arrays.stream(files) + .flatMap( + s -> { + if (s.startsWith("s3://") && s.contains("*")) { + var b = S3.bucketPath(s); + return S3 + .listWildcard(s3ClientLazy.getValue(), b.bucket(), b.path(), -1) + .stream(); + } + return Stream.of(s); + }) + .toList(); if (filesExpanded.isEmpty()) { throw new BadRequestException("No matching files found"); } @@ -115,26 +114,26 @@ public static void run( Set tableDataFiles; try (var plan = table.newScan().planFiles()) { tableDataFiles = - StreamSupport.stream(plan.spliterator(), false) - .map(f -> f.file().location()) - .collect(Collectors.toSet()); + StreamSupport.stream(plan.spliterator(), false) + .map(f -> f.file().location()) + .collect(Collectors.toSet()); } String dstPath = DataFileNamingStrategy.defaultDataLocation(table); DataFileNamingStrategy dstDataFileSource = - switch (dataFileNamingStrategy) { - case DEFAULT -> - new DataFileNamingStrategy.Default(dstPath, System.currentTimeMillis() + "-"); - case INPUT_FILENAME -> new DataFileNamingStrategy.InputFilename(dstPath); - }; + switch (dataFileNamingStrategy) { + case DEFAULT -> + new DataFileNamingStrategy.Default(dstPath, System.currentTimeMillis() + "-"); + case INPUT_FILENAME -> new DataFileNamingStrategy.InputFilename(dstPath); + }; AppendFiles appendOp = table.newAppend(); try (FileIO inputIO = 
Input.newIO(filesExpanded.getFirst(), table, s3ClientLazy); - RetryLog retryLog = - retryListFile != null && !retryListFile.isEmpty() - ? new RetryLog(retryListFile) - : null; ) { + RetryLog retryLog = + retryListFile != null && !retryListFile.isEmpty() + ? new RetryLog(retryListFile) + : null ) { boolean atLeastOneFileAppended = false; // TODO: parallel @@ -145,34 +144,34 @@ public static void run( logger.info("{}: jvm: {}", file, Stats.gather()); Function checkNotExists = - dataFile -> { - if (tableDataFiles.contains(dataFile)) { - if (skipDuplicates) { - logger.info("{}: duplicate (skipping)", file); - return true; + dataFile -> { + if (tableDataFiles.contains(dataFile)) { + if (skipDuplicates) { + logger.info("{}: duplicate (skipping)", file); + return true; + } + throw new AlreadyExistsException( + String.format("%s is already referenced by the table", dataFile)); } - throw new AlreadyExistsException( - String.format("%s is already referenced by the table", dataFile)); - } - return false; - }; + return false; + }; InputFile inputFile = - Input.newFile(file, catalog, inputIO == null ? tableIO : inputIO); + Input.newFile(file, catalog, inputIO == null ? tableIO : inputIO); ParquetMetadata metadata = Metadata.read(inputFile); MessageType type = metadata.getFileMetaData().getSchema(); Schema fileSchema = - ParquetSchemaUtil.convert(type); // nameMapping applied (when present) + ParquetSchemaUtil.convert(type); // nameMapping applied (when present) if (!sameSchema(table, fileSchema)) { throw new BadRequestException( - String.format("%s's schema doesn't match table's schema", file)); + String.format("%s's schema doesn't match table's schema", file)); } // assuming datafiles can be anywhere when table.location() is empty var noCopyPossible = file.startsWith(table.location()) || forceNoCopy; // TODO: check before uploading anything if (noCopy && !noCopyPossible) { throw new BadRequestException( - file + " cannot be added to catalog without copy"); // TODO: explain + file + " cannot be added to catalog without copy"); // TODO: explain } long dataFileSizeInBytes; var dataFile = replacePrefix(file, "s3a://", "s3://"); @@ -184,7 +183,7 @@ public static void run( } else if (s3CopyObject) { if (!dataFile.startsWith("s3://") || !table.location().startsWith("s3://")) { throw new BadRequestException( - "--s3-copy-object is only supported between s3:// buckets"); + "--s3-copy-object is only supported between s3:// buckets"); } String dstDataFile = dstDataFileSource.get(file); if (checkNotExists.apply(dstDataFile)) { @@ -194,12 +193,12 @@ public static void run( S3.BucketPath dst = S3.bucketPath(dstDataFile); logger.info("{}: fast copying to {}", file, dstDataFile); CopyObjectRequest copyReq = - CopyObjectRequest.builder() - .sourceBucket(src.bucket()) - .sourceKey(src.path()) - .destinationBucket(dst.bucket()) - .destinationKey(dst.path()) - .build(); + CopyObjectRequest.builder() + .sourceBucket(src.bucket()) + .sourceKey(src.path()) + .destinationBucket(dst.bucket()) + .destinationKey(dst.path()) + .build(); s3ClientLazy.getValue().copyObject(copyReq); dataFileSizeInBytes = inputFile.getLength(); dataFile = dstDataFile; @@ -209,21 +208,21 @@ public static void run( continue; } OutputFile outputFile = - tableIO.newOutputFile(replacePrefix(dstDataFile, "s3://", "s3a://")); + tableIO.newOutputFile(replacePrefix(dstDataFile, "s3://", "s3a://")); // TODO: support transferTo below (note that compression, etc. 
might be different) // try (var d = outputFile.create()) { try (var s = inputFile.newStream()) { // s.transferTo(d); }} Parquet.ReadBuilder readBuilder = - Parquet.read(inputFile) - .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s)) - .project(tableSchema); // TODO: ? + Parquet.read(inputFile) + .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s)) + .project(tableSchema); // TODO: ? // TODO: reuseContainers? Parquet.WriteBuilder writeBuilder = - Parquet.write(outputFile) - .overwrite( - dataFileNamingStrategy == DataFileNamingStrategy.Name.INPUT_FILENAME) - .createWriterFunc(GenericParquetWriter::buildWriter) - .schema(tableSchema); + Parquet.write(outputFile) + .overwrite( + dataFileNamingStrategy == DataFileNamingStrategy.Name.INPUT_FILENAME) + .createWriterFunc(GenericParquetWriter::buildWriter) + .schema(tableSchema); logger.info("{}: copying to {}", file, dstDataFile); // file size may have changed due to different compression, etc. dataFileSizeInBytes = copy(readBuilder, writeBuilder); @@ -231,17 +230,17 @@ public static void run( } logger.info("{}: adding data file", file); long recordCount = - metadata.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum(); + metadata.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum(); MetricsConfig metricsConfig = MetricsConfig.forTable(table); Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig); df = - new DataFiles.Builder(table.spec()) - .withPath(dataFile) - .withFormat("PARQUET") - .withRecordCount(recordCount) - .withFileSizeInBytes(dataFileSizeInBytes) - .withMetrics(metrics) - .build(); + new DataFiles.Builder(table.spec()) + .withPath(dataFile) + .withFormat("PARQUET") + .withRecordCount(recordCount) + .withFileSizeInBytes(dataFileSizeInBytes) + .withMetrics(metrics) + .build(); } catch (Exception e) { // FIXME if (retryLog != null) { logger.error("{}: error (adding to retry list and continuing)", file, e); @@ -285,13 +284,13 @@ private static boolean sameSchema(Table table, Schema fileSchema) { NameMapping mapping = NameMappingParser.fromJson(nameMapping); Map tableSchemaIdToName = tableSchema.idToName(); var tableSchemaWithNameMappingApplied = - TypeUtil.assignIds( - Types.StructType.of(tableSchema.columns()), - oldId -> { - var fieldName = tableSchemaIdToName.get(oldId); - MappedField mappedField = mapping.find(fieldName); - return mappedField.id(); - }); + TypeUtil.assignIds( + Types.StructType.of(tableSchema.columns()), + oldId -> { + var fieldName = tableSchemaIdToName.get(oldId); + MappedField mappedField = mapping.find(fieldName); + return mappedField.id(); + }); sameSchema = tableSchemaWithNameMappingApplied.asStructType().equals(fileSchema.asStruct()); } else { sameSchema = tableSchema.sameSchema(fileSchema); From c065ff0aeed2366353a81e32623ad2613616d3c0 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Tue, 22 Apr 2025 15:42:22 -0400 Subject: [PATCH 4/6] Added metrics in describe command. --- examples/scratch/README.md | 6 +- .../altinity/ice/internal/cmd/Describe.java | 96 ++++++++++++++++++- .../com/altinity/ice/internal/cmd/Insert.java | 2 +- 3 files changed, 99 insertions(+), 5 deletions(-) diff --git a/examples/scratch/README.md b/examples/scratch/README.md index f11e9f95..78e7b373 100644 --- a/examples/scratch/README.md +++ b/examples/scratch/README.md @@ -19,10 +19,12 @@ local-mc mb --ignore-existing local/bucket1 # start Iceberg REST Catalog server ice-rest-catalog + or using the jar file. 
+/ice/examples/scratch$ java -jar ../../ice-rest-catalog/target/ice-rest-catalog-jar-with-dependencies.jar &
+
 # insert data into catalog
-ice insert flowers.iris -p \
-  file://iris.parquet
+ice insert nyc.taxis -p \
+  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java
index 1676d8df..5b05fa46 100644
--- a/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java
+++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java
@@ -3,16 +3,23 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.Table;
+import org.apache.iceberg.*;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 
 public final class Describe {
 
@@ -102,6 +109,8 @@ public static void run(RESTCatalog catalog, String target, boolean json) throws
           }
           sb.append("\t\t\t\tlocation: " + snapshot.manifestListLocation() + "\n");
         }
+
+        printTableMetrics(table, sb);
       }
       if (!tableMatched) {
         sb.append(" []\n");
@@ -117,6 +126,89 @@ public static void run(RESTCatalog catalog, String target, boolean json) throws
     System.out.println(r);
   }
 
+  /**
+   * Prints per-data-file metrics (record count, value/null counts, and column bounds) for the
+   * given table.
+   *
+   * @param table table whose data files are scanned
+   * @param buffer buffer the report is appended to
+   */
+  private static void printTableMetrics(Table table, StringBuilder buffer) throws IOException {
+    TableScan scan = table.newScan().includeColumnStats();
+    CloseableIterable<FileScanTask> tasks = scan.planFiles();
+
+    for (FileScanTask task : tasks) {
+      DataFile dataFile = task.file();
+      buffer.append("\tMetrics:\n");
+      buffer.append("\t File: " + dataFile.path() + "\n");
+      buffer.append("\t Record Count: " + dataFile.recordCount() + "\n");
+
+      Map<Integer, Long> valueCounts = dataFile.valueCounts();
+      Map<Integer, Long> nullCounts = dataFile.nullValueCounts();
+      Map<Integer, ByteBuffer> lowerBounds = dataFile.lowerBounds();
+      Map<Integer, ByteBuffer> upperBounds = dataFile.upperBounds();
+
+      for (Types.NestedField field : table.schema().columns()) {
+        int id = field.fieldId();
+        buffer.append("\n\t Column: " + field.name() + "\n");
+        buffer.append("\t valueCount = " + valueCounts.get(id) + "\n");
+        buffer.append("\t nullCount = " + nullCounts.get(id) + "\n");
+
+        ByteBuffer lower = lowerBounds.get(id);
+        ByteBuffer upper = upperBounds.get(id);
+
+        String lowerStr =
+            lower != null ? (String) convertByteBufferToNumber(lower, field.type()) : "null";
+        String upperStr =
+            upper != null ? (String) convertByteBufferToNumber(upper, field.type()) : "null";
+
+        buffer.append("\t lowerBound = " + lowerStr + "\n");
+        buffer.append("\t upperBound = " + upperStr + "\n");
+      }
+    }
+
+    tasks.close();
+  }
+
+  public static Object convertByteBufferToNumber(ByteBuffer buffer, Type type) {
+    ByteBuffer copy = buffer.duplicate(); // Don't mutate original
+
+    if (type.isPrimitiveType()) {
+      switch (type.asPrimitiveType().typeId()) {
+        case INTEGER:
+          return copy.getInt();
+        case LONG:
+          return copy.getLong();
+        case FLOAT:
+          return copy.getFloat();
+        case DOUBLE:
+          return copy.getDouble();
+        case BOOLEAN:
+          return copy.get() != 0;
+        case DATE:
+          return copy.getInt(); // Often encoded as int days since epoch
+        case TIME:
+          return copy.getLong(); // microseconds since midnight
+        case TIMESTAMP:
+          return copy.getLong(); // microseconds since epoch
+        case STRING:
+          byte[] strBytes = new byte[copy.remaining()];
+          copy.get(strBytes);
+          return new String(strBytes, StandardCharsets.UTF_8);
+        case FIXED:
+        case BINARY:
+          byte[] binBytes = new byte[copy.remaining()];
+          copy.get(binBytes);
+          return binBytes;
+        case DECIMAL:
+          // Example assumes precision/scale = 10,2; adjust for your schema
+          BigInteger unscaled = new BigInteger(copy.array());
+          return new BigDecimal(unscaled, 2);
+        default:
+          return "Unsupported type: " + type;
+      }
+    }
+    return "Non-primitive type: " + type;
+  }
+
   private static String convertYamlToJson(String yaml) throws IOException {
     ObjectMapper yamlReader = new ObjectMapper(new YAMLFactory());
     Object obj = yamlReader.readValue(yaml, Object.class);
diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java
index 7d123e4f..0f311695 100644
--- a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java
+++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java
@@ -133,7 +133,7 @@ public static void run(
         RetryLog retryLog =
             retryListFile != null && !retryListFile.isEmpty()
                 ? new RetryLog(retryListFile)
-                : null ) {
+                : null) {
 
       boolean atLeastOneFileAppended = false;
       // TODO: parallel

From a2f16471451e88e9ec3e67d05b384d516e45ba80 Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Tue, 22 Apr 2025 20:53:54 -0400
Subject: [PATCH 5/6] Used Iceberg conversions to get lower bound and upper bound values.

---
 .../main/java/com/altinity/ice/internal/cmd/Describe.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java
index 5b05fa46..f75ab444 100644
--- a/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java
+++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java
@@ -18,6 +18,7 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
@@ -156,9 +157,9 @@ private static void printTableMetrics(Table table, StringBuilder buffer) throws
         ByteBuffer lower = lowerBounds.get(id);
         ByteBuffer upper = upperBounds.get(id);
 
         String lowerStr =
-            lower != null ? (String) convertByteBufferToNumber(lower, field.type()) : "null";
+            lower != null ? Conversions.fromByteBuffer(field.type(), lower).toString() : "null";
         String upperStr =
-            upper != null ? (String) convertByteBufferToNumber(upper, field.type()) : "null";
+            upper != null ? Conversions.fromByteBuffer(field.type(), upper).toString() : "null";
 
         buffer.append("\t lowerBound = " + lowerStr + "\n");
         buffer.append("\t upperBound = " + upperStr + "\n");

From 8a047b88d9ed3a223c241b339ae2ab1d2373e15e Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Tue, 22 Apr 2025 21:07:09 -0400
Subject: [PATCH 6/6] Added command-line argument to include metrics in the describe command.

---
 ice/src/main/java/com/altinity/ice/Main.java |  8 ++-
 .../altinity/ice/internal/cmd/Describe.java  | 69 ++++---------------
 2 files changed, 20 insertions(+), 57 deletions(-)

diff --git a/ice/src/main/java/com/altinity/ice/Main.java b/ice/src/main/java/com/altinity/ice/Main.java
index a45eb5a8..a26ed39f 100644
--- a/ice/src/main/java/com/altinity/ice/Main.java
+++ b/ice/src/main/java/com/altinity/ice/Main.java
@@ -69,10 +69,14 @@ void describe(
       @CommandLine.Option(
           names = {"--json"},
           description = "Output JSON instead of YAML")
-          boolean json)
+          boolean json,
+      @CommandLine.Option(
+          names = {"--include-metrics"},
+          description = "Include table metrics in the output")
+          boolean includeMetrics)
       throws IOException {
     try (RESTCatalog catalog = loadCatalog(this.configFile)) {
-      Describe.run(catalog, target, json);
+      Describe.run(catalog, target, json, includeMetrics);
     }
   }
 
diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java
index f75ab444..06fd03e9 100644
--- a/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java
+++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Describe.java
@@ -3,10 +3,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
@@ -19,7 +16,6 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.rest.RESTCatalog;
 import org.apache.iceberg.types.Conversions;
-import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
 public final class Describe {
@@ -27,7 +23,8 @@ public final class Describe {
   private Describe() {}
 
   // TODO: refactor: the use of StringBuilder below is absolutely criminal
-  public static void run(RESTCatalog catalog, String target, boolean json) throws IOException {
+  public static void run(RESTCatalog catalog, String target, boolean json, boolean includeMetrics)
+      throws IOException {
     String targetNamespace = null;
     String targetTable = null;
     if (target != null && !target.isEmpty()) {
@@ -111,7 +108,9 @@ public static void run(RESTCatalog catalog, String target, boolean json, boolea
           sb.append("\t\t\t\tlocation: " + snapshot.manifestListLocation() + "\n");
         }
 
-        printTableMetrics(table, sb);
+        if (includeMetrics) {
+          printTableMetrics(table, sb);
+        }
       }
       if (!tableMatched) {
         sb.append(" []\n");
@@ -138,20 +137,21 @@ private static void printTableMetrics(Table table, StringBuilder buffer) throws
 
     for (FileScanTask task : tasks) {
       DataFile dataFile = task.file();
-      buffer.append("\tMetrics:\n");
-      buffer.append("\t File: " + dataFile.path() + "\n");
-      buffer.append("\t Record Count: " + dataFile.recordCount() + "\n");
+      buffer.append("\t\t\tmetrics:\n");
+      buffer.append("\t\t\t file: " + dataFile.path() + "\n");
+      buffer.append("\t\t\t record_count: " + dataFile.recordCount() + "\n");
 
       Map<Integer, Long> valueCounts = dataFile.valueCounts();
       Map<Integer, Long> nullCounts = dataFile.nullValueCounts();
      Map<Integer, ByteBuffer> lowerBounds = dataFile.lowerBounds();
       Map<Integer, ByteBuffer> upperBounds = dataFile.upperBounds();
 
+      buffer.append("\t\t\t columns:\n");
       for (Types.NestedField field : table.schema().columns()) {
         int id = field.fieldId();
-        buffer.append("\n\t Column: " + field.name() + "\n");
-        buffer.append("\t valueCount = " + valueCounts.get(id) + "\n");
-        buffer.append("\t nullCount = " + nullCounts.get(id) + "\n");
+        buffer.append("\t\t\t " + field.name() + ":\n");
+        buffer.append("\t\t\t value_count: " + valueCounts.get(id) + "\n");
+        buffer.append("\t\t\t null_count: " + nullCounts.get(id) + "\n");
 
         ByteBuffer lower = lowerBounds.get(id);
         ByteBuffer upper = upperBounds.get(id);
@@ -161,55 +161,14 @@ private static void printTableMetrics(Table table, StringBuilder buffer) throws
         String upperStr =
             upper != null ? Conversions.fromByteBuffer(field.type(), upper).toString() : "null";
 
-        buffer.append("\t lowerBound = " + lowerStr + "\n");
-        buffer.append("\t upperBound = " + upperStr + "\n");
+        buffer.append("\t\t\t lower_bound: " + lowerStr + "\n");
+        buffer.append("\t\t\t upper_bound: " + upperStr + "\n");
       }
     }
 
     tasks.close();
   }
 
-  public static Object convertByteBufferToNumber(ByteBuffer buffer, Type type) {
-    ByteBuffer copy = buffer.duplicate(); // Don't mutate original
-
-    if (type.isPrimitiveType()) {
-      switch (type.asPrimitiveType().typeId()) {
-        case INTEGER:
-          return copy.getInt();
-        case LONG:
-          return copy.getLong();
-        case FLOAT:
-          return copy.getFloat();
-        case DOUBLE:
-          return copy.getDouble();
-        case BOOLEAN:
-          return copy.get() != 0;
-        case DATE:
-          return copy.getInt(); // Often encoded as int days since epoch
-        case TIME:
-          return copy.getLong(); // microseconds since midnight
-        case TIMESTAMP:
-          return copy.getLong(); // microseconds since epoch
-        case STRING:
-          byte[] strBytes = new byte[copy.remaining()];
-          copy.get(strBytes);
-          return new String(strBytes, StandardCharsets.UTF_8);
-        case FIXED:
-        case BINARY:
-          byte[] binBytes = new byte[copy.remaining()];
-          copy.get(binBytes);
-          return binBytes;
-        case DECIMAL:
-          // Example assumes precision/scale = 10,2; adjust for your schema
-          BigInteger unscaled = new BigInteger(copy.array());
-          return new BigDecimal(unscaled, 2);
-        default:
-          return "Unsupported type: " + type;
-      }
-    }
-    return "Non-primitive type: " + type;
-  }
-
   private static String convertYamlToJson(String yaml) throws IOException {
     ObjectMapper yamlReader = new ObjectMapper(new YAMLFactory());
     Object obj = yamlReader.readValue(yaml, Object.class);
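
Taken together, patch 1 computes column-level statistics when a Parquet file is appended to the table, and patches 4-6 read those statistics back for `ice describe --include-metrics`. The sketch below is a minimal illustration of how the two halves fit together, using the same Iceberg APIs the series relies on. It is not code from the patches: the class and method names are invented, `table` is assumed to be an already-loaded org.apache.iceberg.Table, and `inputFile` an org.apache.iceberg.io.InputFile for a Parquet data file that is already at its final location.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;

final class MetricsRoundTrip {

  // Insert side (patch 1): derive column stats from the Parquet footer and
  // attach them to the DataFile that gets appended to the table.
  static void appendWithMetrics(Table table, InputFile inputFile) {
    // MetricsConfig.forTable honors the table's write.metadata.metrics.* properties.
    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
    Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig);
    DataFile dataFile =
        new DataFiles.Builder(table.spec())
            .withPath(inputFile.location())
            .withFormat("PARQUET")
            .withRecordCount(metrics.recordCount())
            .withFileSizeInBytes(inputFile.getLength())
            .withMetrics(metrics)
            .build();
    table.newAppend().appendFile(dataFile).commit();
  }

  // Describe side (patches 4-6): plan the scan with column stats included and
  // decode per-column lower bounds via Conversions.fromByteBuffer, which
  // understands the single-value serialization defined by the Iceberg spec.
  static void printLowerBounds(Table table) throws IOException {
    try (CloseableIterable<FileScanTask> tasks =
        table.newScan().includeColumnStats().planFiles()) {
      for (FileScanTask task : tasks) {
        Map<Integer, ByteBuffer> lowerBounds = task.file().lowerBounds();
        for (Types.NestedField field : table.schema().columns()) {
          ByteBuffer lower = lowerBounds.get(field.fieldId());
          Object value = lower == null ? null : Conversions.fromByteBuffer(field.type(), lower);
          System.out.printf("%s: lowerBound = %s%n", field.name(), value);
        }
      }
    }
  }
}

With patch 6 applied, the same statistics are printed per data file by `ice describe --include-metrics`; without the flag, describe output is unchanged.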