Flink 2.1: Support UUID type in Avro and Parquet readers and writers #16097

Draft
joyhaldar wants to merge 7 commits into apache:main from joyhaldar:flink-uuid-support

Conversation

joyhaldar (Contributor) commented Apr 24, 2026

Summary

Flink's RowData represents UUID values as byte arrays, but a few Flink readers and writers were not aligned with this, causing the crashes reported in #14330.

Fixes

  • FlinkAvroWriter: sent UUID values to ValueWriters.uuids(), causing a ClassCastException since Flink supplies byte[] rather than java.util.UUID
  • FlinkParquetWriters: had no UUID handling
  • FlinkParquetReaders: had no UUID handling

Adds TestFlinkUuidType covering UUID reads and writes across Avro, Parquet, and ORC, plus the SQL insert behavior.

Addresses #14330
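
For context, the conversion at the heart of these fixes is small: Iceberg stores a UUID as 16 big-endian bytes, and Flink's RowData carries it as byte[]. A minimal sketch of the round trip using only the JDK (illustrative; not the PR's exact code):

import java.nio.ByteBuffer;
import java.util.UUID;

final class UuidBytes {
  private UuidBytes() {}

  // Encode a java.util.UUID into the 16-byte big-endian form Iceberg stores.
  static byte[] toBytes(UUID uuid) {
    ByteBuffer buffer = ByteBuffer.allocate(16);
    buffer.putLong(uuid.getMostSignificantBits());
    buffer.putLong(uuid.getLeastSignificantBits());
    return buffer.array();
  }

  // Decode the 16 bytes back into a UUID (the e2e test below asserts exactly this).
  static UUID fromBytes(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    return new UUID(buffer.getLong(), buffer.getLong());
  }
}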

joyhaldar changed the title from "Core, Flink 2.1: Support UUID type in Avro and Parquet readers and writers" to "Core, Flink: Support UUID type in Avro and Parquet readers and writers" on Apr 24, 2026
pvary (Contributor) commented Apr 24, 2026

Could we please check that the UUID written out by Flink is the same as the UUID written out by the Generic/Spark writer?

IIRC there were some issues where the Parquet UUID reader and the Avro UUID reader behaved differently in some cases, and I'm not sure that was ever fixed.

pvary (Contributor) commented Apr 24, 2026

CC: @mxm, @Guosmilesmile

Guosmilesmile (Contributor) left a comment

Thanks for the PR! Could we add an e2e UT to prove that this type is usable? Since there is no UUID type in Flink, I looked at the type conversion: it treats UUID as BinaryType(16). From what it looks like, Flink should be able to read it, but writing would probably fail. Maybe we should do more than this.

case UUID:
  // UUID length is 16
  return new BinaryType(16);

  required(116, "dec_38_10", Types.DecimalType.of(38, 10)), // maximum precision
- required(117, "time", Types.TimeType.get()));
+ required(117, "time", Types.TimeType.get()),
+ required(118, "uuid", Types.UUIDType.get()));
Contributor

An e2e UT might look something like this. It's just for my own testing purposes, but I hope it can provide some inspiration.

class TestFlinkUuidType extends CatalogTestBase {

  private static final String TABLE_NAME = "test_table";
  private Table icebergTable;
  @TempDir private Path warehouseDir;

  @Parameters(name = "catalogName={0}, baseNamespace={1}")
  protected static List<Object[]> parameters() {
    return Arrays.asList(
        // For now, we only test the hadoop catalog
        new Object[] {"testhadoop", Namespace.empty()},
        new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")});
  }

  @Override
  @BeforeEach
  public void before() {
    super.before();
    sql("CREATE DATABASE %s", flinkDatabase);
    sql("USE CATALOG %s", catalogName);
    sql("USE %s", DATABASE);

    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.UUIDType.get()));
    icebergTable =
        validationCatalog.createTable(
            TableIdentifier.of(icebergNamespace, TABLE_NAME),
            schema,
            PartitionSpec.unpartitioned(),
            ImmutableMap.of(
                "format-version", "3",
                "write.format.default", FileFormat.AVRO.name()));
  }

  @TestTemplate
  public void testInsertUuidFromFlink() throws Exception {
    UUID expectedUuid = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
    sql(
        "INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))",
        TABLE_NAME, "123e4567e89b12d3a456426614174000");

    List<Record> records = SimpleDataUtil.tableRecords(icebergTable);
    assertThat(records).hasSize(1);
    assertThat(records.get(0).getField("data")).isInstanceOf(UUID.class).isEqualTo(expectedUuid);
  }

  @TestTemplate
  public void testReadUuidFromFlink() throws Exception {
    UUID expectedUuid = UUID.fromString("0f8fad5b-d9cb-469f-a165-70867728950e");
    ImmutableList.Builder<Record> builder = ImmutableList.builder();
    builder.add(GenericRecord.create(icebergTable.schema()).copy("id", 1, "data", expectedUuid));
    new GenericAppenderHelper(icebergTable, FileFormat.PARQUET, warehouseDir)
        .appendToTable(builder.build());
    icebergTable.refresh();

    List<GenericRowData> genericRowData = Lists.newArrayList();
    try (CloseableIterable<CombinedScanTask> combinedScanTasks =
        icebergTable.newScan().planTasks()) {
      for (CombinedScanTask combinedScanTask : combinedScanTasks) {
        try (DataIterator<RowData> dataIterator =
            ReaderUtil.createDataIterator(
                combinedScanTask, icebergTable.schema(), icebergTable.schema())) {
          while (dataIterator.hasNext()) {
            GenericRowData rowData = (GenericRowData) dataIterator.next();
            genericRowData.add(rowData);
          }
        }
      }
    }

    assertThat(genericRowData).hasSize(1);
    assertThat(genericRowData.get(0).getField(1)).isInstanceOf(byte[].class);
    byte[] uuidBytes = (byte[]) genericRowData.get(0).getField(1);
    assertThat(uuidBytes).hasSize(16);

    ByteBuffer bb = ByteBuffer.wrap(uuidBytes);
    UUID actualUuid = new UUID(bb.getLong(), bb.getLong());
    assertThat(actualUuid).isEqualTo(expectedUuid);
  }
}
Running the write test currently fails with:
java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
table {
  1: id: required int
  2: data: optional uuid
}
Provided schema:
table {
  1: id: required int
  2: data: optional fixed[16]
}
Problems:
* data: fixed[16] cannot be promoted to uuid
	at org.apache.iceberg.types.TypeUtil.checkSchemaCompatibility(TypeUtil.java:531)
	at org.apache.iceberg.types.TypeUtil.validateWriteSchema(TypeUtil.java:480)
	at org.apache.iceberg.flink.sink.FlinkSink.toFlinkRowType(FlinkSink.java:755)
	at org.apache.iceberg.flink.sink.FlinkSink$Builder.chainIcebergOperators(FlinkSink.java:457)
	at org.apache.iceberg.flink.sink.FlinkSink$Builder.append(FlinkSink.java:488)
	at org.apache.iceberg.flink.IcebergTableSink.createLegacySink(IcebergTableSink.java:217)
	at org.apache.iceberg.flink.IcebergTableSink.lambda$getSinkRuntimeProvider$0(IcebergTableSink.java:164)
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.applySinkProvider(CommonExecSink.java:409)

Contributor

Or perhaps we should first clarify what the proper way to handle the UUID type on the Flink write side should be?

joyhaldar (Contributor Author)

Thank you for your review @Guosmilesmile.

Based on your feedback, I have added TestFlinkUuidType covering:

  • UUID round-trip read across Avro, Parquet, ORC (Generic writer -> Flink read)
  • UUID write through Flink writer across all three formats (Flink write -> Generic read)
  • SQL INSERT failing with fixed[16] cannot be promoted to uuid across all three formats

The SQL failure documents what you ran into in your comment; IIUC it happens at Iceberg's schema compatibility check, before the data ever reaches the writers this PR fixes.

mxm (Contributor) left a comment

I think this looks good. +1 on an e2e test to validate the wiring works as intended.

joyhaldar (Contributor Author)

> I think this looks good. +1 on an e2e test to validate the wiring works as intended.

Thank you for your review @mxm.

I have added TestFlinkUuidType covering:

  • UUID round-trip read across Avro, Parquet, ORC (Generic writer -> Flink read)
  • UUID write through Flink writer across all three formats (Flink write -> Generic read)
  • SQL INSERT failing with fixed[16] cannot be promoted to uuid across all three formats

The SQL failure documents what @Guosmilesmile ran into in his comment; IIUC it happens at Iceberg's schema compatibility check, before the data ever reaches the writers this PR fixes.

cc: @pvary
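
For reference, that check can be reproduced directly against TypeUtil. A minimal sketch — the schemas are constructed here purely for illustration, and the validateWriteSchema(tableSchema, writeSchema, checkNullability, checkOrdering) signature is an assumption based on the stack trace above:

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

class SchemaCheckRepro {
  static void repro() {
    // The table declares a uuid column.
    Schema tableSchema =
        new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.UUIDType.get()));
    // What the Flink SQL planner presents after CAST(... AS BINARY(16)).
    Schema writeSchema =
        new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.FixedType.ofLength(16)));
    // Throws IllegalArgumentException: "data: fixed[16] cannot be promoted to uuid"
    TypeUtil.validateWriteSchema(tableSchema, writeSchema, true, true);
  }
}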

Comment on lines +127 to +140
@TestTemplate
public void testUuidWrittenByGenericWriterParquet() throws Exception {
  runReadRoundTripTest(FileFormat.PARQUET);
}

@TestTemplate
public void testUuidWrittenByGenericWriterAvro() throws Exception {
  runReadRoundTripTest(FileFormat.AVRO);
}

@TestTemplate
public void testUuidWrittenByGenericWriterOrc() throws Exception {
  runReadRoundTripTest(FileFormat.ORC);
}
Contributor

Could we make FileFormat part of the parameters, so that this method only needs to appear once?

joyhaldar (Contributor Author)

Done

for (DataFile dataFile : writer.dataFiles()) {
  append.appendFile(dataFile);
}
append.commit();
Contributor

nit: newline

joyhaldar (Contributor Author)

Done

Comment on lines +182 to +195
@TestTemplate
public void testWriteUuidViaFlinkWriterParquet() throws Exception {
  runWriteTest(FileFormat.PARQUET);
}

@TestTemplate
public void testWriteUuidViaFlinkWriterAvro() throws Exception {
  runWriteTest(FileFormat.AVRO);
}

@TestTemplate
public void testWriteUuidViaFlinkWriterOrc() throws Exception {
  runWriteTest(FileFormat.ORC);
}
Contributor

Same as above.

joyhaldar (Contributor Author)

Done

Comment on lines +211 to +213
assertThatThrownBy(
        () -> sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME, uuidHex))
    .hasMessageContaining("fixed[16] cannot be promoted to uuid");
Contributor

Currently, mapping the UUID type in Flink throws an error. However, I'd prefer to make this type actually usable rather than simply catching the error. Otherwise, this type wouldn't be usable for writes in Flink — if it can only be used for reads, its value would be significantly diminished.

@pvary @mxm Flink doesn't have a UUID type. If the underlying table uses UUID and Flink SQL uses BINARY(16) as the corresponding type, would that be appropriate?

joyhaldar (Contributor Author) commented Apr 26, 2026

Thank you @Guosmilesmile. IIUC, Flink writes to UUID columns do work via the Java API, and the testWriteUuidViaFlinkWriter test in this PR shows this. It's the Flink SQL path that fails, since Flink has no UUID type. Please correct me if I am wrong.

Fully agree the SQL path would be much more useful. Let me know how you all want to proceed and I can work on it.
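
To illustrate the distinction, here is a sketch of what the Java API path hands the writers, using standard Flink types; the helper class and values are hypothetical, and a two-column (id INT, data UUID) schema is assumed:

import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

class UuidRowDataSketch {
  static RowData uuidRow() {
    UUID uuid = UUID.fromString("0f8fad5b-d9cb-469f-a165-70867728950e");
    byte[] uuidBytes =
        ByteBuffer.allocate(16)
            .putLong(uuid.getMostSignificantBits())
            .putLong(uuid.getLeastSignificantBits())
            .array();
    // The UUID column travels as byte[16] in RowData; the fixed writers accept
    // this row, while the SQL planner path is rejected earlier at the schema check.
    return GenericRowData.of(1, uuidBytes);
  }
}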

Contributor

I think it would be good to make that work. It shouldn't be too hard.

mxm (Contributor) left a comment

@joyhaldar Thanks for the update.

The usual workflow is to merge the changes for the newest Flink version first, then backport to the older versions. It's starting to get hard to review the changes. Could you remove all changes apart from the Flink 2.1 changes?


@TestTemplate
public void testWriteUuidViaFlinkWriter() throws Exception {
  runWriteTest();
Contributor

Can we inline all the separate methods into the test methods? We don't reuse them across the tests.

joyhaldar (Contributor Author) commented Apr 27, 2026

Done.

joyhaldar changed the title from "Core, Flink: Support UUID type in Avro and Parquet readers and writers" to "Flink 2.1: Support UUID type in Avro and Parquet readers and writers" on Apr 27, 2026
joyhaldar (Contributor Author)

> @joyhaldar Thanks for the update.
>
> The usual workflow is to merge the changes for the newest Flink version first, then backport to the older versions. It's starting to get hard to review the changes. Could you remove all changes apart from the Flink 2.1 changes?

Done, thank you for the guidance. Scoped to Flink 2.1 only.

Also reverted the change to DataTestBase.SUPPORTED_PRIMITIVES for now, since adding UUID there would regress v2.0 and v1.20 until they receive the same fixes. If this PR is accepted, I will follow up with backport PRs for v2.0 and v1.20, and then a separate PR to add UUID to SUPPORTED_PRIMITIVES.

Sorry about the noise on the earlier scope; I should have started narrower.

TableIdentifier.of(icebergNamespace, TABLE_NAME),
SCHEMA,
PartitionSpec.unpartitioned(),
ImmutableMap.of("format-version", "3", "write.format.default", fileFormat.name()));
Contributor

UUID is not a V3 feature, so we can drop this.

Comment on lines +211 to +213
assertThatThrownBy(
        () -> sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME, uuidHex))
    .hasMessageContaining("fixed[16] cannot be promoted to uuid");
Contributor

I think it would be good to make that work. It shouldn't be too hard.

Comment on lines +193 to +195
assertThatThrownBy(
        () -> sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME, uuidHex))
    .hasMessageContaining("fixed[16] cannot be promoted to uuid");
Contributor

It would be good if we could fix this.

Comment on lines +81 to +83
new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1"), FileFormat.PARQUET},
new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1"), FileFormat.AVRO},
new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1"), FileFormat.ORC});
Contributor

Do we need these namespace tests?
