Skip to content

Commit 043dbd5

Browse files
manuzhangcodex
andcommitted
feat(spec): add unknown datatype support
Add Iceberg unknown type parsing and conversions across schema, Arrow, Avro, HMS, and Glue paths. Reject non-null defaults for unknown fields and make set predicate formatting deterministic for stable tests. Co-authored-by: Codex <codex@openai.com>
1 parent 5eb342f commit 043dbd5

14 files changed

Lines changed: 280 additions & 83 deletions

File tree

crates/catalog/glue/src/schema.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,12 @@ impl SchemaVisitor for GlueSchemaBuilder {
157157

158158
fn primitive(&mut self, p: &iceberg::spec::PrimitiveType) -> iceberg::Result<Self::T> {
159159
let glue_type = match p {
160+
PrimitiveType::Unknown => {
161+
return Err(Error::new(
162+
ErrorKind::FeatureUnsupported,
163+
format!("Conversion from {p:?} is not supported"),
164+
));
165+
}
160166
PrimitiveType::Boolean => "boolean".to_string(),
161167
PrimitiveType::Int => "int".to_string(),
162168
PrimitiveType::Long => "bigint".to_string(),

crates/catalog/hms/src/schema.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@ impl SchemaVisitor for HiveSchemaBuilder {
114114

115115
fn primitive(&mut self, p: &iceberg::spec::PrimitiveType) -> iceberg::Result<String> {
116116
let hive_type = match p {
117+
PrimitiveType::Unknown => {
118+
return Err(Error::new(
119+
ErrorKind::FeatureUnsupported,
120+
format!("Conversion from {p:?} is not supported"),
121+
));
122+
}
117123
PrimitiveType::Boolean => "boolean".to_string(),
118124
PrimitiveType::Int => "int".to_string(),
119125
PrimitiveType::Long => "bigint".to_string(),

crates/iceberg/src/arrow/reader/projection.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ impl ArrowReader {
6161
/// Nested types (struct/list/map) are flattened in Parquet's columnar format.
6262
fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
6363
match field.field_type.as_ref() {
64+
Type::Primitive(PrimitiveType::Unknown) => {}
6465
Type::Primitive(_) => {
6566
field_ids.push(field.id);
6667
}
@@ -94,6 +95,7 @@ impl ArrowReader {
9495
(Some(lhs), Some(rhs)) if lhs == rhs => true,
9596
(Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
9697
(Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
98+
(Some(PrimitiveType::Unknown), Some(_)) => true,
9799
(
98100
Some(PrimitiveType::Decimal {
99101
precision: file_precision,

crates/iceberg/src/arrow/schema.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> Resu
120120
| DataType::Utf8
121121
| DataType::LargeUtf8
122122
| DataType::Utf8View
123+
| DataType::Null
123124
| DataType::Binary
124125
| DataType::LargeBinary
125126
| DataType::BinaryView
@@ -428,6 +429,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter {
428429

429430
fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
430431
match p {
432+
DataType::Null => Ok(Type::Primitive(PrimitiveType::Unknown)),
431433
DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)),
432434
DataType::Int8 | DataType::Int16 | DataType::Int32 => {
433435
Ok(Type::Primitive(PrimitiveType::Int))
@@ -613,6 +615,9 @@ impl SchemaVisitor for ToArrowSchemaConverter {
613615
p: &crate::spec::PrimitiveType,
614616
) -> crate::Result<ArrowSchemaOrFieldOrType> {
615617
match p {
618+
crate::spec::PrimitiveType::Unknown => {
619+
Ok(ArrowSchemaOrFieldOrType::Type(DataType::Null))
620+
}
616621
crate::spec::PrimitiveType::Boolean => {
617622
Ok(ArrowSchemaOrFieldOrType::Type(DataType::Boolean))
618623
}
@@ -1116,6 +1121,7 @@ pub fn datum_to_arrow_type_with_ree(datum: &Datum) -> DataType {
11161121

11171122
// Match on the PrimitiveType from the Datum to determine the Arrow type
11181123
match datum.data_type() {
1124+
PrimitiveType::Unknown => make_ree(DataType::Null),
11191125
PrimitiveType::Boolean => make_ree(DataType::Boolean),
11201126
PrimitiveType::Int => make_ree(DataType::Int32),
11211127
PrimitiveType::Long => make_ree(DataType::Int64),
@@ -1915,6 +1921,13 @@ mod tests {
19151921
assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
19161922
}
19171923

1924+
{
1925+
let arrow_type = DataType::Null;
1926+
let iceberg_type = Type::Primitive(PrimitiveType::Unknown);
1927+
assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1928+
assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1929+
}
1930+
19181931
// test struct type
19191932
{
19201933
// no metadata will cause error

crates/iceberg/src/arrow/value.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ impl SchemaWithPartnerVisitor<ArrayRef> for ArrowArrayToIcebergStructConverter {
206206

207207
fn primitive(&mut self, p: &PrimitiveType, partner: &ArrayRef) -> Result<Vec<Option<Literal>>> {
208208
match p {
209+
PrimitiveType::Unknown => Ok(vec![None; partner.len()]),
209210
PrimitiveType::Boolean => {
210211
let array = partner
211212
.as_any()
@@ -629,6 +630,7 @@ pub(crate) fn create_primitive_array_single_element(
629630
prim_lit: &Option<PrimitiveLiteral>,
630631
) -> Result<ArrayRef> {
631632
match (data_type, prim_lit) {
633+
(DataType::Null, _) => Ok(Arc::new(arrow_array::NullArray::new(1))),
632634
(DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => {
633635
Ok(Arc::new(BooleanArray::from(vec![*v])))
634636
}

crates/iceberg/src/avro/schema.rs

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ impl SchemaVisitor for SchemaToAvroSchema {
7474
record.name = Name::from(format!("r{}", field.id).as_str());
7575
}
7676

77-
if !field.required {
77+
if !field.required && !matches!(field_schema, AvroSchema::Null) {
7878
field_schema = avro_optional(field_schema)?;
7979
}
8080

@@ -126,7 +126,7 @@ impl SchemaVisitor for SchemaToAvroSchema {
126126
record.name = Name::from(format!("r{}", list.element_field.id).as_str());
127127
}
128128

129-
if !list.element_field.required {
129+
if !list.element_field.required && !matches!(field_schema, AvroSchema::Null) {
130130
field_schema = avro_optional(field_schema)?;
131131
}
132132

@@ -147,7 +147,7 @@ impl SchemaVisitor for SchemaToAvroSchema {
147147
) -> Result<AvroSchemaOrField> {
148148
let key_field_schema = key_value.unwrap_left();
149149
let mut value_field_schema = value.unwrap_left();
150-
if !map.value_field.required {
150+
if !map.value_field.required && !matches!(value_field_schema, AvroSchema::Null) {
151151
value_field_schema = avro_optional(value_field_schema)?;
152152
}
153153

@@ -222,6 +222,7 @@ impl SchemaVisitor for SchemaToAvroSchema {
222222

223223
fn primitive(&mut self, p: &PrimitiveType) -> Result<AvroSchemaOrField> {
224224
let avro_schema = match p {
225+
PrimitiveType::Unknown => AvroSchema::Null,
225226
PrimitiveType::Boolean => AvroSchema::Boolean,
226227
PrimitiveType::Int => AvroSchema::Int,
227228
PrimitiveType::Long => AvroSchema::Long,
@@ -304,6 +305,10 @@ pub(crate) fn avro_decimal_schema(precision: usize, scale: usize) -> Result<Avro
304305
}
305306

306307
fn avro_optional(avro_schema: AvroSchema) -> Result<AvroSchema> {
308+
if matches!(avro_schema, AvroSchema::Null) {
309+
return Ok(AvroSchema::Null);
310+
}
311+
307312
Ok(AvroSchema::Union(UnionSchema::new(vec![
308313
AvroSchema::Null,
309314
avro_schema,
@@ -440,10 +445,11 @@ impl AvroSchemaVisitor for AvroSchemaToSchema {
440445
let field_id =
441446
Self::get_element_id_from_attributes(&avro_field.custom_attributes, FIELD_ID_PROP)?;
442447

443-
let optional = is_avro_optional(&avro_field.schema);
448+
let optional = is_avro_optional(&avro_field.schema)
449+
|| matches!(&avro_field.schema, AvroSchema::Null);
444450

445-
let mut field =
446-
NestedField::new(field_id, &avro_field.name, field_type.unwrap(), !optional);
451+
let field_type = field_type.unwrap_or(Type::Primitive(PrimitiveType::Unknown));
452+
let mut field = NestedField::new(field_id, &avro_field.name, field_type, !optional);
447453

448454
if let Some(doc) = &avro_field.doc {
449455
field = field.with_doc(doc);
@@ -475,18 +481,21 @@ impl AvroSchemaVisitor for AvroSchemaToSchema {
475481
}
476482

477483
if options.len() == 1 {
478-
Ok(Some(options.remove(0).unwrap()))
484+
Ok(options
485+
.remove(0)
486+
.or(Some(Type::Primitive(PrimitiveType::Unknown))))
479487
} else {
480488
Ok(Some(options.remove(1).unwrap()))
481489
}
482490
}
483491

484492
fn array(&mut self, array: &ArraySchema, item: Option<Type>) -> Result<Self::T> {
485493
let element_field_id = Self::get_element_id_from_attributes(&array.attributes, ELEMENT_ID)?;
494+
let item = item.unwrap_or(Type::Primitive(PrimitiveType::Unknown));
486495
let element_field = NestedField::list_element(
487496
element_field_id,
488-
item.unwrap(),
489-
!is_avro_optional(&array.items),
497+
item,
498+
!is_avro_optional(&array.items) && !matches!(array.items.as_ref(), AvroSchema::Null),
490499
)
491500
.into();
492501
Ok(Some(Type::List(ListType { element_field })))
@@ -497,10 +506,11 @@ impl AvroSchemaVisitor for AvroSchemaToSchema {
497506
let key_field =
498507
NestedField::map_key_element(key_field_id, Type::Primitive(PrimitiveType::String));
499508
let value_field_id = Self::get_element_id_from_attributes(&map.attributes, VALUE_ID)?;
509+
let value = value.unwrap_or(Type::Primitive(PrimitiveType::Unknown));
500510
let value_field = NestedField::map_value_element(
501511
value_field_id,
502-
value.unwrap(),
503-
!is_avro_optional(&map.types),
512+
value,
513+
!is_avro_optional(&map.types) && !matches!(map.types.as_ref(), AvroSchema::Null),
504514
);
505515
Ok(Some(Type::Map(MapType {
506516
key_field: key_field.into(),
@@ -550,12 +560,7 @@ impl AvroSchemaVisitor for AvroSchemaToSchema {
550560
"Can't convert avro map schema, missing key schema.",
551561
)
552562
})?;
553-
let value = value.ok_or_else(|| {
554-
Error::new(
555-
ErrorKind::DataInvalid,
556-
"Can't convert avro map schema, missing value schema.",
557-
)
558-
})?;
563+
let value = value.unwrap_or(Type::Primitive(PrimitiveType::Unknown));
559564
let key_id = Self::get_element_id_from_attributes(
560565
&array.fields[0].custom_attributes,
561566
FIELD_ID_PROP,
@@ -568,7 +573,8 @@ impl AvroSchemaVisitor for AvroSchemaToSchema {
568573
let value_field = NestedField::map_value_element(
569574
value_id,
570575
value,
571-
!is_avro_optional(&array.fields[1].schema),
576+
!is_avro_optional(&array.fields[1].schema)
577+
&& !matches!(&array.fields[1].schema, AvroSchema::Null),
572578
);
573579
Ok(Some(Type::Map(MapType {
574580
key_field: key_field.into(),
@@ -650,6 +656,25 @@ mod tests {
650656
assert_eq!(iceberg_schema, converted_avro_converted_iceberg_schema);
651657
}
652658

659+
#[test]
660+
fn test_unknown_type_schema_conversion() {
661+
let schema = Schema::builder()
662+
.with_fields(vec![
663+
NestedField::optional(1, "empty", PrimitiveType::Unknown.into()).into(),
664+
])
665+
.build()
666+
.unwrap();
667+
668+
let avro_schema = schema_to_avro_schema("table", &schema).unwrap();
669+
let AvroSchema::Record(record) = &avro_schema else {
670+
panic!("expected avro record schema");
671+
};
672+
assert!(matches!(record.fields[0].schema, AvroSchema::Null));
673+
assert_eq!(record.fields[0].default, Some(Value::Null));
674+
675+
assert_eq!(schema, avro_schema_to_schema(&avro_schema).unwrap());
676+
}
677+
653678
#[test]
654679
fn test_manifest_file_v1_schema() {
655680
let fields = vec![

crates/iceberg/src/expr/predicate.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,12 @@ impl<T: Bind> Bind for SetExpression<T> {
310310

311311
impl<T: Display + Debug> Display for SetExpression<T> {
312312
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
313-
let mut literal_strs = self.literals.iter().map(|l| format!("{l}"));
313+
let mut literals = self.literals.iter().collect_vec();
314+
literals.sort_by(|left, right| {
315+
left.partial_cmp(right)
316+
.unwrap_or_else(|| left.to_string().cmp(&right.to_string()))
317+
});
318+
let mut literal_strs = literals.into_iter().map(ToString::to_string);
314319

315320
write!(f, "{} {} ({})", self.term, self.op, literal_strs.join(", "))
316321
}
@@ -1363,7 +1368,7 @@ mod tests {
13631368
let schema = table_schema_simple();
13641369
let expr = Reference::new("bar").is_in([Datum::int(10), Datum::int(20)]);
13651370
let bound_expr = expr.bind(schema, true).unwrap();
1366-
assert_eq!(&format!("{bound_expr}"), "bar IN (20, 10)");
1371+
assert_eq!(&format!("{bound_expr}"), "bar IN (10, 20)");
13671372
test_bound_predicate_serialize_diserialize(bound_expr);
13681373
}
13691374

@@ -1398,7 +1403,7 @@ mod tests {
13981403
let schema = table_schema_simple();
13991404
let expr = Reference::new("bar").is_not_in([Datum::int(10), Datum::int(20)]);
14001405
let bound_expr = expr.bind(schema, true).unwrap();
1401-
assert_eq!(&format!("{bound_expr}"), "bar NOT IN (20, 10)");
1406+
assert_eq!(&format!("{bound_expr}"), "bar NOT IN (10, 20)");
14021407
test_bound_predicate_serialize_diserialize(bound_expr);
14031408
}
14041409

@@ -1571,13 +1576,7 @@ mod tests {
15711576
let expected_bound = expected_predicate.bind(schema, true).unwrap();
15721577

15731578
assert_eq!(result, expected_bound);
1574-
// Note: HashSet order may vary, so we check that it contains the expected format
1575-
let result_str = format!("{result}");
1576-
assert!(
1577-
result_str.contains("bar NOT IN")
1578-
&& result_str.contains("10")
1579-
&& result_str.contains("20")
1580-
);
1579+
assert_eq!(&format!("{result}"), "bar NOT IN (10, 20)");
15811580
}
15821581

15831582
#[test]

0 commit comments

Comments
 (0)