Skip to content

Commit f77386b

Browse files
committed
Fix table schema check when inserting
1 parent 91dccca commit f77386b

2 files changed

Lines changed: 60 additions & 31 deletions

File tree

ice/src/main/java/com/altinity/ice/internal/cmd/CreateTable.java

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,11 +3,16 @@
33
import com.altinity.ice.internal.io.Input;
44
import com.altinity.ice.internal.parquet.Metadata;
55
import java.io.IOException;
6+
import java.util.Map;
67
import org.apache.iceberg.PartitionSpec;
78
import org.apache.iceberg.Schema;
9+
import org.apache.iceberg.TableProperties;
810
import org.apache.iceberg.catalog.TableIdentifier;
911
import org.apache.iceberg.exceptions.AlreadyExistsException;
1012
import org.apache.iceberg.io.InputFile;
13+
import org.apache.iceberg.mapping.MappingUtil;
14+
import org.apache.iceberg.mapping.NameMapping;
15+
import org.apache.iceberg.mapping.NameMappingParser;
1116
import org.apache.iceberg.parquet.ParquetSchemaUtil;
1217
import org.apache.iceberg.rest.RESTCatalog;
1318
import org.apache.parquet.schema.MessageType;
@@ -28,8 +33,15 @@ public static void run(
2833
MessageType type = Metadata.read(inputFile).getFileMetaData().getSchema();
2934
Schema fileSchema = ParquetSchemaUtil.convert(type);
3035
try {
36+
Map<String, String> props = null;
37+
if (!ParquetSchemaUtil.hasIds(type)) {
38+
// force name-based resolution instead of position-based resolution
39+
NameMapping mapping = MappingUtil.create(fileSchema);
40+
String mappingJson = NameMappingParser.toJson(mapping);
41+
props = Map.of(TableProperties.DEFAULT_NAME_MAPPING, mappingJson);
42+
}
3143
// if we don't set location, it's automatically set to $warehouse/$namespace/$table
32-
catalog.createTable(nsTable, fileSchema, PartitionSpec.unpartitioned(), location, null);
44+
catalog.createTable(nsTable, fileSchema, PartitionSpec.unpartitioned(), location, props);
3345
} catch (AlreadyExistsException e) {
3446
if (ignoreAlreadyExists) {
3547
return;

ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,40 @@
44
import com.altinity.ice.internal.io.Input;
55
import com.altinity.ice.internal.parquet.Metadata;
66
import java.io.IOException;
7+
import java.util.Map;
78
import java.util.Set;
89
import java.util.stream.Collectors;
910
import java.util.stream.StreamSupport;
10-
import org.apache.iceberg.*;
11+
import org.apache.iceberg.AppendFiles;
12+
import org.apache.iceberg.ContentFile;
13+
import org.apache.iceberg.DataFile;
14+
import org.apache.iceberg.DataFiles;
15+
import org.apache.iceberg.Schema;
16+
import org.apache.iceberg.Snapshot;
17+
import org.apache.iceberg.Table;
18+
import org.apache.iceberg.TableProperties;
19+
import org.apache.iceberg.Transaction;
1120
import org.apache.iceberg.catalog.TableIdentifier;
1221
import org.apache.iceberg.data.Record;
1322
import org.apache.iceberg.data.parquet.GenericParquetReaders;
1423
import org.apache.iceberg.data.parquet.GenericParquetWriter;
1524
import org.apache.iceberg.exceptions.BadRequestException;
16-
import org.apache.iceberg.io.*;
17-
import org.apache.iceberg.mapping.MappingUtil;
25+
import org.apache.iceberg.io.CloseableIterable;
26+
import org.apache.iceberg.io.FileAppender;
27+
import org.apache.iceberg.io.FileIO;
28+
import org.apache.iceberg.io.InputFile;
29+
import org.apache.iceberg.io.OutputFile;
30+
import org.apache.iceberg.mapping.MappedField;
1831
import org.apache.iceberg.mapping.NameMapping;
1932
import org.apache.iceberg.mapping.NameMappingParser;
2033
import org.apache.iceberg.parquet.Parquet;
2134
import org.apache.iceberg.parquet.ParquetSchemaUtil;
2235
import org.apache.iceberg.rest.RESTCatalog;
36+
import org.apache.iceberg.types.TypeUtil;
37+
import org.apache.iceberg.types.Types;
2338
import org.apache.parquet.hadoop.metadata.BlockMetaData;
2439
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
40+
import org.apache.parquet.schema.MessageType;
2541
import org.slf4j.Logger;
2642
import org.slf4j.LoggerFactory;
2743

@@ -51,7 +67,6 @@ public static void run(
5167
}
5268
Transaction tx = table.newTransaction();
5369
AppendFiles appendOp = tx.newAppend();
54-
boolean tableNameMappingUpdated = false;
5570
Set<String> dataFilesSet = null;
5671
try (FileIO inputIO = Input.newIO(dataFiles[0], table);
5772
FileIO tableIO = table.io()) {
@@ -61,9 +76,12 @@ public static void run(
6176
InputFile inputFile = Input.newFile(file, catalog, inputIO == null ? tableIO : inputIO);
6277
ParquetMetadata metadata = Metadata.read(inputFile);
6378

64-
if (!table
65-
.schema()
66-
.sameSchema(ParquetSchemaUtil.convert(metadata.getFileMetaData().getSchema()))) {
79+
Schema tableSchema = table.schema();
80+
81+
MessageType type = metadata.getFileMetaData().getSchema();
82+
Schema fileSchema = ParquetSchemaUtil.convert(type); // nameMapping applied (when present)
83+
84+
if (!sameSchema(table, fileSchema)) {
6785
throw new BadRequestException(
6886
String.format("%s's schema doesn't match table's schema", file));
6987
}
@@ -90,15 +108,14 @@ public static void run(
90108
// s.transferTo(d); }}
91109
Parquet.ReadBuilder readBuilder =
92110
Parquet.read(inputFile)
93-
.createReaderFunc(
94-
fileSchema -> GenericParquetReaders.buildReader(table.schema(), fileSchema))
95-
.project(table.schema());
111+
.createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
112+
.project(tableSchema); // TODO: ?
96113
// TODO: reuseContainers?
97114

98115
Parquet.WriteBuilder writeBuilder =
99116
Parquet.write(outputFile)
100117
.createWriterFunc(GenericParquetWriter::buildWriter)
101-
.schema(table.schema());
118+
.schema(tableSchema);
102119

103120
// file size may have changed due to different compression, etc.
104121
dataFileSizeInBytes = copy(readBuilder, writeBuilder);
@@ -118,16 +135,6 @@ public static void run(
118135
throw new BadRequestException(
119136
String.format("%s is already part of the table", dataFile));
120137
}
121-
if (table.properties().get(TableProperties.DEFAULT_NAME_MAPPING) == null
122-
&& !tableNameMappingUpdated
123-
&& !ParquetSchemaUtil.hasIds(metadata.getFileMetaData().getSchema())) {
124-
// attempts to updateProperties as part of tx result in
125-
// java.lang.IllegalStateException: Cannot create new UpdateProperties: last operation
126-
// has not committed
127-
// TODO: move it out of here
128-
setNameMapping(table, dryRun);
129-
tableNameMappingUpdated = true;
130-
}
131138
dataFileSizeInBytes = inputFile.getLength();
132139
}
133140
long recordCount =
@@ -144,24 +151,34 @@ public static void run(
144151
}
145152
appendOp.commit();
146153
if (!dryRun) {
154+
// TODO: log
147155
tx.commitTransaction();
148156
} else {
149157
logger.warn("Table.Transaction commit skipped (--dry-run)");
150158
}
151159
}
152160
}
153161

154-
private static void setNameMapping(Table table, boolean dryRyn) {
155-
// forces name-based resolution instead of position-based resolution
156-
NameMapping mapping = MappingUtil.create(table.schema());
157-
String mappingJson = NameMappingParser.toJson(mapping);
158-
UpdateProperties updatePropertiesOp = table.updateProperties();
159-
updatePropertiesOp.set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson);
160-
if (!dryRyn) {
161-
updatePropertiesOp.commit();
162+
private static boolean sameSchema(Table table, Schema fileSchema) {
163+
boolean sameSchema;
164+
Schema tableSchema = table.schema();
165+
String nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
166+
if (nameMapping != null && !nameMapping.isEmpty()) {
167+
NameMapping mapping = NameMappingParser.fromJson(nameMapping);
168+
Map<Integer, String> tableSchemaIdToName = tableSchema.idToName();
169+
var tableSchemaWithNameMappingApplied =
170+
TypeUtil.assignIds(
171+
Types.StructType.of(tableSchema.columns()),
172+
oldId -> {
173+
var fieldName = tableSchemaIdToName.get(oldId);
174+
MappedField mappedField = mapping.find(fieldName);
175+
return mappedField.id();
176+
});
177+
sameSchema = tableSchemaWithNameMappingApplied.asStructType().equals(fileSchema.asStruct());
162178
} else {
163-
logger.warn("Table.UpdateProperties commit skipped (--dry-run)");
179+
sameSchema = tableSchema.sameSchema(fileSchema);
164180
}
181+
return sameSchema;
165182
}
166183

167184
private static long copy(Parquet.ReadBuilder rb, Parquet.WriteBuilder wb) throws IOException {

0 commit comments

Comments
 (0)