Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 189 additions & 4 deletions src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeIPv4andIPv6.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeObject.h>
#include <Common/DateLUTImpl.h>
#include <Processors/Chunk.h>
Expand All @@ -39,10 +40,12 @@
#include <Common/quoteString.h>
#include <Formats/insertNullAsDefaultIfNeeded.h>
#include <algorithm>
#include <bit>
#include <arrow/builder.h>
#include <arrow/array.h>
#include <arrow/util/key_value_metadata.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <base/unaligned.h>


/// UINT16 and UINT32 are processed separately, see comments in readColumnFromArrowColumn.
Expand Down Expand Up @@ -86,6 +89,25 @@ static bool emptyTimezoneAsUTC(const std::string & format_name, const FormatSett
return format_name == "Parquet" && format_settings.parquet.local_time_as_utc;
}

static bool isUUIDField(const arrow::Field & field)
{
// Check for our ClickHouse/Arrow extension name
if (field.HasMetadata())
{
auto metadata = field.metadata();
auto ext_name = metadata->Get("ARROW:extension:name");
if (ext_name.ok() && *ext_name == "arrow.uuid")
return true;

// Also check the Parquet logical type hint we added to the writer
auto pq_type = metadata->Get("PARQUET:logical_type");
if (pq_type.ok() && *pq_type == "UUID")
return true;
}
return field.type()->id() == arrow::Type::EXTENSION &&
std::static_pointer_cast<arrow::ExtensionType>(field.type())->extension_name() == "arrow.uuid";
}

/// Inserts numeric data right into internal column data to reduce an overhead
template <typename NumericType, typename VectorType = ColumnVector<NumericType>>
static ColumnWithTypeAndName readColumnWithNumericData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
Expand Down Expand Up @@ -502,6 +524,61 @@ static ColumnWithTypeAndName readColumnWithDecimalData(const std::shared_ptr<arr
return readColumnWithDecimalDataImpl<Decimal256, DecimalArray>(arrow_column, column_name, internal_type);
}

static ColumnWithTypeAndName readColumnWithUUIDFromFixedBinaryData(
const std::shared_ptr<arrow::ChunkedArray> & arrow_column,
const std::string & column_name,
DataTypePtr type_hint)
{
auto column = type_hint->createColumn();
auto & column_data = assert_cast<ColumnVector<UUID> &>(*column).getData();
column_data.reserve(arrow_column->length());

for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
{
const auto & arrow_chunk = *(arrow_column->chunk(chunk_i));
const auto & fixed_binary_array = assert_cast<const arrow::FixedSizeBinaryArray &>(arrow_chunk);

// Security check: Ensure we actually got 16 bytes per row
if (fixed_binary_array.byte_width() != sizeof(UUID))
throw Exception(ErrorCodes::INCORRECT_DATA,
"Cannot read UUID from Arrow FixedSizeBinary array with byte_width != {}", sizeof(UUID));

for (int64_t i = 0; i < fixed_binary_array.length(); ++i)
{
if (fixed_binary_array.IsNull(i))
{
// The Nullable wrapper handles the actual null map later; just insert a dummy value
column_data.emplace_back(UUID{});
}
else
{
UUID res;
std::memcpy(&res, fixed_binary_array.GetValue(i), 16);

auto * bytes = reinterpret_cast<uint8_t *>(&res);

// Only swap the 64-bit halves back if the host CPU is Little-Endian
if constexpr (std::endian::native == std::endian::little)
{
std::reverse(bytes, bytes + 8);
std::reverse(bytes + 8, bytes + 16);
}
else
{
// Big-Endian: The bytes are already in network order, but the
// 64-bit halves are in the wrong order for Arrow (low||high).
// Swap the first 8 bytes with the second 8 bytes.
std::swap_ranges(bytes, bytes + 8, bytes + 8);
}

column_data.emplace_back(res);
}
}
}

return {std::move(column), type_hint, column_name};
}

/// Creates a null bytemap from arrow's null bitmap
static ColumnPtr readByteMapFromArrowColumn(const std::shared_ptr<arrow::ChunkedArray> & arrow_column)
{
Expand Down Expand Up @@ -945,11 +1022,49 @@ static ColumnWithTypeAndName readNonNullableColumnFromArrowColumn(
}
return readColumnWithStringData<arrow::BinaryArray>(arrow_column, column_name);
}
case arrow::Type::EXTENSION:
{
// Unwrap the Extension array into raw physical chunks
auto ext_type = std::static_pointer_cast<arrow::ExtensionType>(arrow_column->type());
std::vector<std::shared_ptr<arrow::Array>> storage_chunks;

for (int i = 0; i < arrow_column->num_chunks(); ++i)
{
auto ext_array = std::static_pointer_cast<arrow::ExtensionArray>(arrow_column->chunk(i));
storage_chunks.push_back(ext_array->storage());
}

auto storage_column = std::make_shared<arrow::ChunkedArray>(storage_chunks, ext_type->storage_type());

std::shared_ptr<arrow::Field> storage_field = nullptr;

if (arrow_field)
storage_field = std::make_shared<arrow::Field>(arrow_field->name(),
ext_type->storage_type(),
arrow_field->nullable(),
arrow_field->metadata());

return readNonNullableColumnFromArrowColumn(
storage_column,
column_name,
full_column_name,
dictionary_infos,
type_hint,
is_map_nested_column,
make_nullable_if_low_cardinality,
geo_metadata,
settings,
storage_field,
parquet_columns_to_clickhouse,
clickhouse_columns_to_parquet);
}
case arrow::Type::FIXED_SIZE_BINARY:
{
if (type_hint)
DataTypePtr hint_to_check = type_hint ? removeNullable(type_hint) : nullptr;

if (hint_to_check)
{
switch (type_hint->getTypeId())
switch (hint_to_check->getTypeId())
{
case TypeIndex::Int128:
return readColumnWithBigIntegerFromFixedBinaryData<Int128>(arrow_column, column_name, type_hint);
Expand All @@ -959,11 +1074,20 @@ static ColumnWithTypeAndName readNonNullableColumnFromArrowColumn(
return readColumnWithBigIntegerFromFixedBinaryData<Int256>(arrow_column, column_name, type_hint);
case TypeIndex::UInt256:
return readColumnWithBigIntegerFromFixedBinaryData<UInt256>(arrow_column, column_name, type_hint);
case TypeIndex::UUID:
return readColumnWithUUIDFromFixedBinaryData(arrow_column, column_name, type_hint);
default:
break;
}
}

/// Correctly triggers the UUID reader for metadata-flagged columns.
if (arrow_field && isUUIDField(*arrow_field))
{
return readColumnWithUUIDFromFixedBinaryData(arrow_column, column_name, std::make_shared<DataTypeUUID>());
}

// Default fallback
return readColumnWithFixedStringData(arrow_column, column_name);
}
case arrow::Type::LARGE_STRING:
Expand Down Expand Up @@ -1461,12 +1585,73 @@ static void checkStatus(const arrow::Status & status, const String & column_name
throw Exception{ErrorCodes::UNKNOWN_EXCEPTION, "Error with a {} column '{}': {}.", format_name, column_name, status.ToString()};
}

static std::shared_ptr<arrow::DataType> unwrapArrowExtensionTypesRecursively(const std::shared_ptr<arrow::DataType> & type)
{
if (!type) return type;

if (type->id() == arrow::Type::EXTENSION)
return std::static_pointer_cast<arrow::ExtensionType>(type)->storage_type();

if (type->id() == arrow::Type::LIST)
{
auto list_type = std::static_pointer_cast<arrow::ListType>(type);
auto value_field = list_type->value_field();
return arrow::list(value_field->WithType(unwrapArrowExtensionTypesRecursively(value_field->type())));
}

if (type->id() == arrow::Type::LARGE_LIST)
{
auto large_list_type = std::static_pointer_cast<arrow::LargeListType>(type);
auto value_field = large_list_type->value_field();
return arrow::large_list(value_field->WithType(unwrapArrowExtensionTypesRecursively(value_field->type())));
}

if (type->id() == arrow::Type::FIXED_SIZE_LIST)
{
auto fixed_list = std::static_pointer_cast<arrow::FixedSizeListType>(type);
auto value_field = fixed_list->value_field();
return arrow::fixed_size_list(value_field->WithType(unwrapArrowExtensionTypesRecursively(value_field->type())), fixed_list->list_size());
}

if (type->id() == arrow::Type::MAP)
{
auto map_type = std::static_pointer_cast<arrow::MapType>(type);
auto item_field = map_type->item_field();

// arrow::map expects a DataType for the key (since keys cannot be nullable),
// but accepts a Field for the item to preserve custom nullability.
return arrow::map(unwrapArrowExtensionTypesRecursively(map_type->key_type()),
item_field->WithType(unwrapArrowExtensionTypesRecursively(item_field->type())),
map_type->keys_sorted()
);
}

if (type->id() == arrow::Type::STRUCT)
{
auto struct_type = std::static_pointer_cast<arrow::StructType>(type);
std::vector<std::shared_ptr<arrow::Field>> new_fields;
for (const auto & struct_field : struct_type->fields())
{
// WithType preserves the field name and nullable status, only changing the underlying type
new_fields.push_back(struct_field->WithType(unwrapArrowExtensionTypesRecursively(struct_field->type())));
}
return arrow::struct_(new_fields);
}

return type;
}

/// Create empty arrow column using specified field
static std::shared_ptr<arrow::ChunkedArray> createArrowColumn(const std::shared_ptr<arrow::Field> & field, const String & format_name)
{
// We unwrap the type ONLY for the `build_type`
// passed to MakeBuilder. We DO NOT mutate the `field` itself.
// This provides Arrow with the primitive storage type it needs for RAM allocation
// without destroying the logical metadata required by ClickHouse for type inference
std::shared_ptr<arrow::DataType> build_type = unwrapArrowExtensionTypesRecursively(field->type());

std::unique_ptr<arrow::ArrayBuilder> array_builder;
/// default_memory_pool() uses posix_memalign which is intercepted and counted in MemoryTracker.
arrow::Status status = MakeBuilder(arrow::default_memory_pool(), field->type(), &array_builder);
arrow::Status status = MakeBuilder(arrow::default_memory_pool(), build_type, &array_builder);
checkStatus(status, field->name(), format_name);

std::shared_ptr<arrow::Array> arrow_array;
Expand Down
Loading
Loading