Flink 2.1: Support UUID type in Avro and Parquet readers and writers #16097
joyhaldar wants to merge 7 commits
Conversation
Could we please check that the UUID written out by Flink is the same as the UUID written out by the Generic/Spark writer? IIRC there were some issues where the Parquet UUID reader and the Avro UUID reader behaved differently in some cases, and I'm not sure that this was fixed.

CC: @mxm, @Guosmilesmile
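For context on what such a byte-level comparison checks: Iceberg stores a UUID value as 16 bytes, and the tests later in this thread assume the most-significant long comes first. A minimal, self-contained sketch of that round trip (illustrative only, not code from this PR):

import java.nio.ByteBuffer;
import java.util.UUID;

public class UuidRoundTrip {
  public static void main(String[] args) {
    UUID original = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");

    // UUID -> 16 bytes, most-significant long first.
    ByteBuffer buffer = ByteBuffer.allocate(16);
    buffer.putLong(original.getMostSignificantBits());
    buffer.putLong(original.getLeastSignificantBits());
    byte[] bytes = buffer.array();

    // 16 bytes -> UUID, the inverse conversion.
    ByteBuffer read = ByteBuffer.wrap(bytes);
    UUID restored = new UUID(read.getLong(), read.getLong());

    // If two writers disagree on byte order, this equality is what breaks.
    System.out.println(original.equals(restored)); // prints: true
  }
}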
Thanks for the PR! Could we add an e2e UT to prove that this type is usable? Since there is no UUID type in Flink, I looked at the type conversion and it treats UUID as BinaryType(16). From what it looks like, Flink should be able to read it, but writing would probably fail. Maybe we should do more than this.
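A small sketch of the conversion described above, using iceberg-flink's FlinkSchemaUtil (the exact signature and return type of convert vary across Iceberg/Flink versions, so treat this as an assumption, for illustration only):

import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Types;

public class UuidTypeMappingSketch {
  public static void main(String[] args) {
    Schema icebergSchema =
        new Schema(Types.NestedField.optional(1, "uuid_col", Types.UUIDType.get()));
    // Flink has no UUID logical type, so the uuid column is expected to
    // come out as BINARY(16) in the converted schema.
    System.out.println(FlinkSchemaUtil.convert(icebergSchema));
  }
}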
  required(116, "dec_38_10", Types.DecimalType.of(38, 10)), // maximum precision
- required(117, "time", Types.TimeType.get()));
+ required(117, "time", Types.TimeType.get()),
+ required(118, "uuid", Types.UUIDType.get()));
An e2e UT may look something like this. It is just from my own testing, and I hope it can provide some inspiration.
class TestFlinkUuidType extends CatalogTestBase {
private static final String TABLE_NAME = "test_table";
private Table icebergTable;
@TempDir private Path warehouseDir;
@Parameters(name = "catalogName={0}, baseNamespace={1}")
protected static List<Object[]> parameters() {
return Arrays.asList(
// For now hive metadata does not support this type, so we only test the hadoop catalog
new Object[] {"testhadoop", Namespace.empty()},
new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")});
}
@Override
@BeforeEach
public void before() {
super.before();
sql("CREATE DATABASE %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
Schema schema =
new Schema(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "data", Types.UUIDType.get()));
icebergTable =
validationCatalog.createTable(
TableIdentifier.of(icebergNamespace, TABLE_NAME),
schema,
PartitionSpec.unpartitioned(),
ImmutableMap.of(
"format-version",
"3",
"write.format.default",
FileFormat.AVRO.name()));
}
@TestTemplate
public void testInsertUuidFromFlink() throws Exception {
UUID expectedUuid = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
sql(
"INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))",
TABLE_NAME,
"123e4567e89b12d3a456426614174000");
List<Record> records = SimpleDataUtil.tableRecords(icebergTable);
assertThat(records).hasSize(1);
assertThat(records.get(0).getField("data")).isInstanceOf(UUID.class).isEqualTo(expectedUuid);
}
@TestTemplate
public void testReadUuidFromFlink() throws Exception {
UUID expectedUuid = UUID.fromString("0f8fad5b-d9cb-469f-a165-70867728950e");
ImmutableList.Builder<Record> builder = ImmutableList.builder();
builder.add(GenericRecord.create(icebergTable.schema()).copy("id", 1, "data", expectedUuid));
new GenericAppenderHelper(icebergTable, FileFormat.PARQUET, warehouseDir)
.appendToTable(builder.build());
icebergTable.refresh();
List<GenericRowData> genericRowData = Lists.newArrayList();
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
icebergTable.newScan().planTasks()) {
for (CombinedScanTask combinedScanTask : combinedScanTasks) {
try (DataIterator<RowData> dataIterator =
ReaderUtil.createDataIterator(
combinedScanTask, icebergTable.schema(), icebergTable.schema())) {
while (dataIterator.hasNext()) {
GenericRowData rowData = (GenericRowData) dataIterator.next();
genericRowData.add(rowData);
}
}
}
}
assertThat(genericRowData).hasSize(1);
assertThat(genericRowData.get(0).getField(1)).isInstanceOf(byte[].class);
byte[] uuidBytes = (byte[]) genericRowData.get(0).getField(1);
assertThat(uuidBytes).hasSize(16);
ByteBuffer bb = ByteBuffer.wrap(uuidBytes);
UUID actualUuid = new UUID(bb.getLong(), bb.getLong());
assertThat(actualUuid).isEqualTo(expectedUuid);
}
}

Running the SQL INSERT against this table fails with:
java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
table {
1: id: required int
2: data: optional uuid
}
Provided schema:
table {
1: id: required int
2: data: optional fixed[16]
}
Problems:
* data: fixed[16] cannot be promoted to uuid
at org.apache.iceberg.types.TypeUtil.checkSchemaCompatibility(TypeUtil.java:531)
at org.apache.iceberg.types.TypeUtil.validateWriteSchema(TypeUtil.java:480)
at org.apache.iceberg.flink.sink.FlinkSink.toFlinkRowType(FlinkSink.java:755)
at org.apache.iceberg.flink.sink.FlinkSink$Builder.chainIcebergOperators(FlinkSink.java:457)
at org.apache.iceberg.flink.sink.FlinkSink$Builder.append(FlinkSink.java:488)
at org.apache.iceberg.flink.IcebergTableSink.createLegacySink(IcebergTableSink.java:217)
at org.apache.iceberg.flink.IcebergTableSink.lambda$getSinkRuntimeProvider$0(IcebergTableSink.java:164)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.applySinkProvider(CommonExecSink.java:409)
Or perhaps we should first clarify the proper way to handle the UUID type on the Flink write side?
Thank you for your review @Guosmilesmile.

Based on your feedback, I have added TestFlinkUuidType covering:

- UUID round-trip read across Avro, Parquet, ORC (Generic writer -> Flink read)
- UUID write through the Flink writer across all three formats (Flink write -> Generic read)
- SQL INSERT failing with "fixed[16] cannot be promoted to uuid" across all three formats

The SQL failure documents what you ran into in your comment; IIUC it happens at Iceberg's schema check, before reaching the writers this PR fixes.
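A minimal sketch reproducing that schema check in isolation (it assumes TypeUtil.validateWriteSchema takes the table schema, the provided schema, and two Boolean flags, as the stack trace above suggests):

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class SchemaCheckSketch {
  public static void main(String[] args) {
    Schema tableSchema =
        new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.UUIDType.get()));
    // What the Flink SQL path provides: BINARY(16) maps to fixed[16], not uuid.
    Schema providedSchema =
        new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.FixedType.ofLength(16)));
    // Throws IllegalArgumentException: "data: fixed[16] cannot be promoted to uuid"
    TypeUtil.validateWriteSchema(tableSchema, providedSchema, true, true);
  }
}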
mxm left a comment
I think this looks good. +1 on an e2e test to validate the wiring works as intended.
Thank you for your review @mxm. I have added the e2e test.

The SQL failure documents what @Guosmilesmile ran into in his comment; IIUC it happens at Iceberg's schema check, before reaching the writers this PR fixes. cc: @pvary
@TestTemplate
public void testUuidWrittenByGenericWriterParquet() throws Exception {
  runReadRoundTripTest(FileFormat.PARQUET);
}

@TestTemplate
public void testUuidWrittenByGenericWriterAvro() throws Exception {
  runReadRoundTripTest(FileFormat.AVRO);
}

@TestTemplate
public void testUuidWrittenByGenericWriterOrc() throws Exception {
  runReadRoundTripTest(FileFormat.ORC);
}
Could we make FileFormat part of the parameters, so that this method only needs to appear once?
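For illustration, the suggestion could look roughly like this fragment of the test class (the @Parameter index syntax and the reduced catalog list are assumptions about the @Parameters/@TestTemplate infrastructure used above):

@Parameter(index = 2)
private FileFormat fileFormat;

@Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}")
protected static List<Object[]> parameters() {
  List<Object[]> params = Lists.newArrayList();
  for (FileFormat format :
      new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) {
    params.add(new Object[] {"testhadoop", Namespace.empty(), format});
  }
  return params;
}

@TestTemplate
public void testUuidWrittenByGenericWriter() throws Exception {
  runReadRoundTripTest(fileFormat); // one method now covers all three formats
}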
for (DataFile dataFile : writer.dataFiles()) {
  append.appendFile(dataFile);
}
append.commit();
@TestTemplate
public void testWriteUuidViaFlinkWriterParquet() throws Exception {
  runWriteTest(FileFormat.PARQUET);
}

@TestTemplate
public void testWriteUuidViaFlinkWriterAvro() throws Exception {
  runWriteTest(FileFormat.AVRO);
}

@TestTemplate
public void testWriteUuidViaFlinkWriterOrc() throws Exception {
  runWriteTest(FileFormat.ORC);
}
assertThatThrownBy(
        () -> sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME, uuidHex))
    .hasMessageContaining("fixed[16] cannot be promoted to uuid");
Currently, mapping the UUID type in Flink throws an error. However, I'd prefer to make this type actually usable rather than simply catching the error. Otherwise, this type wouldn't be usable for writes in Flink — if it can only be used for reads, its value would be significantly diminished.
@pvary @mxm Flink doesn't have a UUID type. If the underlying table uses UUID and Flink SQL uses BINARY(16) as the corresponding type, would that be appropriate?
Thank you @Guosmilesmile. IIUC, Flink writes to UUID columns do work via the Java API and the testWriteUuidViaFlinkWriter test in this PR shows this. It's the Flink SQL path that fails since Flink has no UUID type. Please correct me if I am wrong.
Fully agree the SQL path would be much more useful. Let me know how you all want to proceed and I can work on it.
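To make the distinction concrete, here is a minimal standalone sketch (not code from this PR) of why the Java-API path works: the caller hands RowData the UUID's 16 bytes directly, so no SQL type mapping is involved:

import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.flink.table.data.GenericRowData;

public class UuidRowDataSketch {
  public static void main(String[] args) {
    UUID uuid = UUID.fromString("0f8fad5b-d9cb-469f-a165-70867728950e");

    // RowData carries a UUID column as raw bytes.
    ByteBuffer buffer = ByteBuffer.allocate(16);
    buffer.putLong(uuid.getMostSignificantBits());
    buffer.putLong(uuid.getLeastSignificantBits());

    GenericRowData row = GenericRowData.of(1, buffer.array()); // (id, uuid bytes)
    System.out.println(((byte[]) row.getField(1)).length); // prints: 16
  }
}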
I think it would be good to make that work. It shouldn't be too hard.
mxm left a comment
@joyhaldar Thanks for the update.
The usual workflow is to merge the changes for the newest Flink version first, then backport to the older versions. It's starting to get hard to review the changes. Could you remove all changes apart from the Flink 2.1 changes?
@TestTemplate
public void testWriteUuidViaFlinkWriter() throws Exception {
  runWriteTest();
}
Can we inline all the separate methods into the test methods? We don't reuse them across the tests.
Done, thank you for the guidance. Scoped to Flink 2.1 only, and reverted the out-of-scope change. Sorry about the noise on the earlier scope; I should have started narrower.
TableIdentifier.of(icebergNamespace, TABLE_NAME),
SCHEMA,
PartitionSpec.unpartitioned(),
ImmutableMap.of("format-version", "3", "write.format.default", fileFormat.name()));
UUID is not a V3 feature, so we can drop this.
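Applied to the createTable call above, the suggestion would reduce the properties to just the write format (a sketch, reusing the identifiers from the diff):

validationCatalog.createTable(
    TableIdentifier.of(icebergNamespace, TABLE_NAME),
    SCHEMA,
    PartitionSpec.unpartitioned(),
    // UUID predates format v3, so the "format-version" property can be dropped.
    ImmutableMap.of("write.format.default", fileFormat.name()));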
assertThatThrownBy(
        () -> sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME, uuidHex))
    .hasMessageContaining("fixed[16] cannot be promoted to uuid");
I think it would be good to make that work. It shouldn't be too hard.
assertThatThrownBy(
        () -> sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME, uuidHex))
    .hasMessageContaining("fixed[16] cannot be promoted to uuid");
It would be good if we could fix this.
new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1"), FileFormat.PARQUET},
new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1"), FileFormat.AVRO},
new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1"), FileFormat.ORC});
Do we need these namespace tests?
Summary
Flink's RowData stores UUID as bytes, but a few Flink readers/writers weren't aligned with this, causing the crashes in #14330.

Fixes
- FlinkAvroWriter: was sending UUID to ValueWriters.uuids(), causing ClassCastException
- FlinkParquetWriters: no UUID logic
- FlinkParquetReaders: no UUID logic

Adds
- TestFlinkUuidType covering UUID read/write across Avro, Parquet, ORC, and the SQL insert behavior.

Addresses #14330
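To illustrate the first fix, a minimal standalone demo (not the PR's code) of the type mismatch behind the ClassCastException:

import java.util.UUID;
import org.apache.flink.table.data.GenericRowData;

public class UuidClassCastDemo {
  public static void main(String[] args) {
    // RowData holds a UUID column as byte[16] ...
    GenericRowData row = GenericRowData.of((Object) new byte[16]);
    Object value = row.getField(0);
    // ... while the pre-fix Avro path effectively treated it as java.util.UUID:
    UUID uuid = (UUID) value; // throws ClassCastException, as in #14330
  }
}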