From 9ec8ad2454af9a961ccbd2e54438bf0ef0a20888 Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Mon, 11 May 2026 19:08:30 +0200 Subject: [PATCH 1/2] feat: HuggingFace Hub storage backend and CDC table properties MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds two opt-in capabilities for storing Iceberg tables on HuggingFace Hub with content-defined chunking for efficient deduplication. New `opendal-hf` feature on `iceberg-storage-opendal` (off by default, included in `opendal-all`) that wires HuggingFace's OpenDAL service into `FileIO`. Paths use the form: hf://<repo_type>/<owner>/<repo>[@<revision>]/<path-in-repo> where `repo_type` must be one of `models`, `datasets`, `spaces`, or `buckets` (XET-backed object storage). The `repo_type` prefix is mandatory — there is no implicit default. Configuration is passed via `FileIOBuilder` properties: - `hf.token` — API token (required for private repos / writes) - `hf.endpoint` — Hub endpoint, defaults to https://huggingface.co - `hf.revision` — fallback revision when a path has no `@<revision>` `OpenDalResolvingStorage` recognises the `hf` scheme and lazily constructs a per-scheme storage instance. `delete_stream` groups paths by `<repo_type>/<repo_id>` so that bucket and dataset paths to the same repo do not share an operator. New table properties under the `write.parquet.content-defined-chunking.*` namespace (matching PyIceberg convention): - `write.parquet.content-defined-chunking.enabled` (bool, default false) - `write.parquet.content-defined-chunking.min-chunk-size` (bytes, default 256 KiB) - `write.parquet.content-defined-chunking.max-chunk-size` (bytes, default 1 MiB) - `write.parquet.content-defined-chunking.norm-level` (i32, default 0) CDC is opt-in: it activates only when `enabled = "true"` is set explicitly. Size/level properties without the enabled flag are parsed and stored but have no effect. Defaults match parquet's own `CdcOptions` defaults so the Iceberg layer stays in sync. CDC options are applied directly in the DataFusion physical write plan. --- bindings/python/pyproject.toml | 2 +- .../python/tests/test_huggingface_and_cdc.py | 174 ++++++++ crates/iceberg/src/io/storage/config/hf.rs | 104 +++++ crates/iceberg/src/io/storage/config/mod.rs | 2 + crates/iceberg/src/spec/table_properties.rs | 189 ++++++++- .../datafusion/src/physical_plan/write.rs | 13 +- crates/storage/opendal/Cargo.toml | 3 +- crates/storage/opendal/src/hf.rs | 348 ++++++++++++++++ crates/storage/opendal/src/lib.rs | 57 ++- crates/storage/opendal/src/resolving.rs | 15 +- .../storage/opendal/tests/file_io_hf_test.rs | 376 ++++++++++++++++++ 11 files changed, 1270 insertions(+), 13 deletions(-) create mode 100644 bindings/python/tests/test_huggingface_and_cdc.py create mode 100644 crates/iceberg/src/io/storage/config/hf.rs create mode 100644 crates/storage/opendal/src/hf.rs create mode 100644 crates/storage/opendal/tests/file_io_hf_test.rs diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml index 0933bdc5e9..c35526e155 100644 --- a/bindings/python/pyproject.toml +++ b/bindings/python/pyproject.toml @@ -60,7 +60,7 @@ filterwarnings = [ dev = [ "maturin>=1.0,<2.0", "pytest>=8.3.2", - "datafusion==52.*", + "datafusion==53.*", # Set a PyIceberg lower bound; otherwise the resolver may select the # unrelated 0.0.2 stub.
# Keep pyarrow direct because current pyiceberg[pyarrow] releases depend on diff --git a/bindings/python/tests/test_huggingface_and_cdc.py b/bindings/python/tests/test_huggingface_and_cdc.py new file mode 100644 index 0000000000..d51ccd22ee --- /dev/null +++ b/bindings/python/tests/test_huggingface_and_cdc.py @@ -0,0 +1,174 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Tests for HuggingFace Hub URI support and CDC (content-defined chunking) options. + +CDC options are standard Iceberg table properties and work in both Rust and PyIceberg +automatically — no special API is required beyond setting string properties. + +HF credentials are passed as file_io_properties to IcebergDataFusionTable. +Tests requiring a real HF token are skipped when HF_OPENDAL_TOKEN is not set. +""" + +import os +import pytest +import pyarrow as pa +import datafusion +from datafusion import SessionContext +from packaging.version import Version +from pyiceberg.catalog import load_catalog +from pyiceberg_core.datafusion import IcebergDataFusionTable + +requires_datafusion_53 = pytest.mark.skipif( + Version(datafusion.__version__) < Version("53.0.0"), + reason="IcebergDataFusionTable requires datafusion>=53 for FFI compatibility", +) + +# Property name constants — same strings as the Rust TableProperties constants. +HF_TOKEN = "hf.token" +HF_ENDPOINT = "hf.endpoint" +HF_REVISION = "hf.revision" + +PARQUET_CDC_ENABLED = "write.parquet.content-defined-chunking.enabled" +PARQUET_CDC_MIN_CHUNK_SIZE = "write.parquet.content-defined-chunking.min-chunk-size" +PARQUET_CDC_MAX_CHUNK_SIZE = "write.parquet.content-defined-chunking.max-chunk-size" +PARQUET_CDC_NORM_LEVEL = "write.parquet.content-defined-chunking.norm-level" + + +# --------------------------------------------------------------------------- +# CDC tests — run without any external credentials +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="module") +def local_catalog(tmp_path_factory: pytest.TempPathFactory): + warehouse = tmp_path_factory.mktemp("cdc_warehouse") + return load_catalog( + "default", + **{ + "uri": f"sqlite:///{warehouse}/pyiceberg_catalog.db", + "warehouse": f"file://{warehouse}", + }, + ) + + +@pytest.fixture(scope="module") +def sample_table() -> pa.Table: + return pa.table( + { + "id": pa.array(list(range(1000)), type=pa.int32()), + "payload": pa.array( + [f"row-{i:06d}" for i in range(1000)], type=pa.large_utf8() + ), + } + ) + + +def test_cdc_table_properties_are_persisted(local_catalog, sample_table): + """Table properties with CDC options are stored and returned as-is.""" + local_catalog.create_namespace_if_not_exists("cdc_ns") + + # Use values that differ from parquet defaults (256 KiB min, 1 MiB max, 0 norm). 
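+    # Note: `.enabled` is deliberately left unset here, so chunking itself stays off;
+    # the values below are persisted as plain string properties (see the Rust-side
+    # test_cdc_size_props_alone_do_not_enable).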
+ tbl = local_catalog.create_table_if_not_exists( + "cdc_ns.cdc_persist", + schema=sample_table.schema, + properties={ + PARQUET_CDC_MIN_CHUNK_SIZE: "65536", + PARQUET_CDC_MAX_CHUNK_SIZE: "524288", + PARQUET_CDC_NORM_LEVEL: "2", + }, + ) + + props = tbl.properties + assert props.get(PARQUET_CDC_MIN_CHUNK_SIZE) == "65536" + assert props.get(PARQUET_CDC_MAX_CHUNK_SIZE) == "524288" + assert props.get(PARQUET_CDC_NORM_LEVEL) == "2" + + +def test_cdc_write_via_pyiceberg(local_catalog, sample_table): + """PyIceberg tbl.append() writes parquet with CDC options when properties are set.""" + local_catalog.create_namespace_if_not_exists("cdc_ns") + + tbl = local_catalog.create_table_if_not_exists( + "cdc_ns.cdc_pyiceberg_write", + schema=sample_table.schema, + properties={PARQUET_CDC_ENABLED: "true"}, + ) + tbl.append(sample_table) + + result = tbl.scan().to_arrow() + assert len(result) == len(sample_table) + + +@requires_datafusion_53 +def test_cdc_write_and_read_via_datafusion(local_catalog, sample_table): + """A table with CDC properties can be written and read back via DataFusion.""" + local_catalog.create_namespace_if_not_exists("cdc_ns") + + tbl = local_catalog.create_table_if_not_exists( + "cdc_ns.cdc_write_read", + schema=sample_table.schema, + properties={PARQUET_CDC_ENABLED: "true"}, + ) + tbl.append(sample_table) + + provider = IcebergDataFusionTable( + identifier=tbl.name(), + metadata_location=tbl.metadata_location, + file_io_properties=tbl.io.properties, + ) + + ctx = SessionContext() + ctx.register_table("cdc_table", provider) + assert ctx.table("cdc_table").count() == len(sample_table) + + + +# --------------------------------------------------------------------------- +# HF tests — skipped when HF_OPENDAL_TOKEN is not set +# --------------------------------------------------------------------------- + +HF_TOKEN_ENV = "HF_OPENDAL_TOKEN" +HF_TABLE_METADATA_ENV = "HF_OPENDAL_TABLE_METADATA" + +requires_hf = pytest.mark.skipif( + not os.environ.get(HF_TOKEN_ENV) or not os.environ.get(HF_TABLE_METADATA_ENV), + reason=f"{HF_TOKEN_ENV} or {HF_TABLE_METADATA_ENV} not set", +) + + +@requires_hf +@requires_datafusion_53 +def test_hf_file_io_properties_accepted(): + """IcebergDataFusionTable accepts hf.token in file_io_properties without auth/URI errors. + + HF_OPENDAL_TABLE_METADATA must point to a valid metadata JSON file; the test + verifies that HF credentials are wired correctly and the table can be read. + """ + token = os.environ[HF_TOKEN_ENV] + metadata_location = os.environ[HF_TABLE_METADATA_ENV] + + provider = IcebergDataFusionTable( + identifier=["default", "hf_test"], + metadata_location=metadata_location, + file_io_properties={HF_TOKEN: token}, + ) + + ctx = SessionContext() + ctx.register_table("hf_table", provider) + # A successful count proves the HF backend can read the table's data files. + assert ctx.table("hf_table").count() >= 0 diff --git a/crates/iceberg/src/io/storage/config/hf.rs b/crates/iceberg/src/io/storage/config/hf.rs new file mode 100644 index 0000000000..fdb79a5af2 --- /dev/null +++ b/crates/iceberg/src/io/storage/config/hf.rs @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! HuggingFace Hub storage configuration. + +use serde::{Deserialize, Serialize}; +use typed_builder::TypedBuilder; + +use super::StorageConfig; +use crate::Result; + +/// HuggingFace Hub authentication token. +pub const HF_TOKEN: &str = "hf.token"; +/// HuggingFace Hub endpoint URL. Defaults to `https://huggingface.co`. +pub const HF_ENDPOINT: &str = "hf.endpoint"; +/// Default git revision/branch for all paths that don't specify one. Defaults to `main`. +pub const HF_REVISION: &str = "hf.revision"; + +/// HuggingFace Hub storage configuration. +/// +/// Repo type, repo ID, and revision are normally encoded in the file path URI +/// (`hf://<repo_type>/<owner>/<repo>[@<revision>]/<path-in-repo>`, where `<repo_type>` +/// is one of `models`, `datasets`, `spaces`, or `buckets`). +/// The fields here provide credentials and a default revision fallback. +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] +pub struct HfConfig { + /// HuggingFace Hub API token (required for private repos and write access). + #[builder(default, setter(strip_option, into))] + pub token: Option<String>, + /// HuggingFace Hub endpoint. Defaults to `https://huggingface.co`. + #[builder(default, setter(strip_option, into))] + pub endpoint: Option<String>, + /// Default revision to use when a path URI does not specify one. + #[builder(default, setter(strip_option, into))] + pub revision: Option<String>, +} + +impl TryFrom<&StorageConfig> for HfConfig { + type Error = crate::Error; + + fn try_from(config: &StorageConfig) -> Result<Self> { + let props = config.props(); + let mut cfg = HfConfig::default(); + if let Some(token) = props.get(HF_TOKEN) { + cfg.token = Some(token.clone()); + } + if let Some(endpoint) = props.get(HF_ENDPOINT) { + cfg.endpoint = Some(endpoint.clone()); + } + if let Some(revision) = props.get(HF_REVISION) { + cfg.revision = Some(revision.clone()); + } + Ok(cfg) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_hf_config_builder() { + let cfg = HfConfig::builder() + .token("hf_mytoken") + .endpoint("https://huggingface.co") + .revision("dev") + .build(); + assert_eq!(cfg.token.as_deref(), Some("hf_mytoken")); + assert_eq!(cfg.endpoint.as_deref(), Some("https://huggingface.co")); + assert_eq!(cfg.revision.as_deref(), Some("dev")); + } + + #[test] + fn test_hf_config_from_storage_config() { + let storage_config = StorageConfig::new() + .with_prop(HF_TOKEN, "hf_abc123") + .with_prop(HF_ENDPOINT, "https://huggingface.co"); + + let cfg = HfConfig::try_from(&storage_config).unwrap(); + assert_eq!(cfg.token.as_deref(), Some("hf_abc123")); + assert_eq!(cfg.endpoint.as_deref(), Some("https://huggingface.co")); + } + + #[test] + fn test_hf_config_empty() { + let cfg = HfConfig::try_from(&StorageConfig::new()).unwrap(); + assert_eq!(cfg.token, None); + assert_eq!(cfg.endpoint, None); + } +} diff --git a/crates/iceberg/src/io/storage/config/mod.rs b/crates/iceberg/src/io/storage/config/mod.rs index cbdb537303..2350aab6dd 100644 --- a/crates/iceberg/src/io/storage/config/mod.rs +++ b/crates/iceberg/src/io/storage/config/mod.rs @@ -32,6 +32,7 @@ mod azdls; mod gcs; +mod hf; mod oss; mod s3; @@ -39,6 +40,7 @@ use
std::collections::HashMap; pub use azdls::*; pub use gcs::*; +pub use hf::*; pub use oss::*; pub use s3::*; use serde::{Deserialize, Serialize}; diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index a3d4e7fdaa..dc21da565c 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -22,8 +22,6 @@ use std::str::FromStr; use crate::compression::CompressionCodec; use crate::error::{Error, ErrorKind, Result}; -// Helper function to parse a property from a HashMap -// If the property is not found, use the default value fn parse_property( properties: &HashMap<String, String>, key: &str, @@ -121,6 +119,15 @@ pub struct TableProperties { /// Whether garbage collection is enabled on drop. /// When `false`, data files will not be deleted when a table is dropped. pub gc_enabled: bool, + /// Whether content-defined chunking is enabled. + /// `true` only when `write.parquet.content-defined-chunking.enabled = "true"`. + pub cdc_enabled: bool, + /// Content-defined chunking minimum chunk size in bytes. + pub cdc_min_chunk_size: usize, + /// Content-defined chunking maximum chunk size in bytes. + pub cdc_max_chunk_size: usize, + /// Content-defined chunking normalization level (gearhash bit adjustment). + pub cdc_norm_level: i32, } impl TableProperties { @@ -226,6 +233,26 @@ impl TableProperties { pub const PROPERTY_GC_ENABLED: &str = "gc.enabled"; /// Default value for gc.enabled pub const PROPERTY_GC_ENABLED_DEFAULT: bool = true; + + /// Enable content-defined chunking with parquet defaults (or per-property overrides). + pub const PROPERTY_PARQUET_CDC_ENABLED: &str = "write.parquet.content-defined-chunking.enabled"; + /// Default value for content-defined chunking enabled. + pub const PROPERTY_PARQUET_CDC_ENABLED_DEFAULT: bool = false; + /// Minimum chunk size in bytes for content-defined chunking. + pub const PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE: &str = + "write.parquet.content-defined-chunking.min-chunk-size"; + /// Default matches `parquet::file::properties::DEFAULT_CDC_MIN_CHUNK_SIZE`. + pub const PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE_DEFAULT: usize = 256 * 1024; + /// Maximum chunk size in bytes for content-defined chunking. + pub const PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE: &str = + "write.parquet.content-defined-chunking.max-chunk-size"; + /// Default matches `parquet::file::properties::DEFAULT_CDC_MAX_CHUNK_SIZE`. + pub const PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE_DEFAULT: usize = 1024 * 1024; + /// Normalization level (gearhash bit adjustment) for content-defined chunking. + pub const PROPERTY_PARQUET_CDC_NORM_LEVEL: &str = + "write.parquet.content-defined-chunking.norm-level"; + /// Default matches `parquet::file::properties::DEFAULT_CDC_NORM_LEVEL`.
+ pub const PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT: i32 = 0; } impl TryFrom<&HashMap<String, String>> for TableProperties { @@ -275,6 +302,26 @@ impl TryFrom<&HashMap<String, String>> for TableProperties { TableProperties::PROPERTY_GC_ENABLED, TableProperties::PROPERTY_GC_ENABLED_DEFAULT, )?, + cdc_enabled: parse_property( + props, + TableProperties::PROPERTY_PARQUET_CDC_ENABLED, + TableProperties::PROPERTY_PARQUET_CDC_ENABLED_DEFAULT, + )?, + cdc_min_chunk_size: parse_property( + props, + TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE, + TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE_DEFAULT, + )?, + cdc_max_chunk_size: parse_property( + props, + TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE, + TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE_DEFAULT, + )?, + cdc_norm_level: parse_property( + props, + TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL, + TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT, + )?, }) } } @@ -583,4 +630,142 @@ mod tests { ); } } + + #[test] + fn test_cdc_disabled_by_default() { + let props = HashMap::new(); + let tp = TableProperties::try_from(&props).unwrap(); + assert!(!tp.cdc_enabled); + } + + #[test] + fn test_cdc_enabled_via_flag() { + let props = HashMap::from([( + TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(), + "true".to_string(), + )]); + let tp = TableProperties::try_from(&props).unwrap(); + assert!(tp.cdc_enabled); + assert_eq!(tp.cdc_min_chunk_size, 256 * 1024); + assert_eq!(tp.cdc_max_chunk_size, 1024 * 1024); + assert_eq!(tp.cdc_norm_level, 0); + } + + #[test] + fn test_cdc_size_props_alone_do_not_enable() { + let props = HashMap::from([( + TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE.to_string(), + "262144".to_string(), + )]); + let tp = TableProperties::try_from(&props).unwrap(); + assert!(!tp.cdc_enabled); + } + + #[test] + fn test_cdc_custom_values() { + let props = HashMap::from([ + ( + TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(), + "true".to_string(), + ), + ( + TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE.to_string(), + "200000".to_string(), + ), + ( + TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE.to_string(), + "900000".to_string(), + ), + ( + TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL.to_string(), + "1".to_string(), + ), + ]); + let tp = TableProperties::try_from(&props).unwrap(); + assert!(tp.cdc_enabled); + assert_eq!(tp.cdc_min_chunk_size, 200000); + assert_eq!(tp.cdc_max_chunk_size, 900000); + assert_eq!(tp.cdc_norm_level, 1); + } + + #[test] + fn test_cdc_partial_override() { + let props = HashMap::from([ + ( + TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(), + "true".to_string(), + ), + ( + TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL.to_string(), + "2".to_string(), + ), + ]); + let tp = TableProperties::try_from(&props).unwrap(); + assert!(tp.cdc_enabled); + assert_eq!(tp.cdc_min_chunk_size, 256 * 1024); + assert_eq!(tp.cdc_max_chunk_size, 1024 * 1024); + assert_eq!(tp.cdc_norm_level, 2); + } + + #[test] + fn test_cdc_negative_norm_level() { + let props = HashMap::from([ + ( + TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(), + "true".to_string(), + ), + ( + TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL.to_string(), + "-2".to_string(), + ), + ]); + let tp = TableProperties::try_from(&props).unwrap(); + assert_eq!(tp.cdc_norm_level, -2); + } + + #[test] + fn test_cdc_invalid_min_chunk_size() { + let props = HashMap::from([ + ( + TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(), + "true".to_string(), + ), + (
TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE.to_string(), + "not_a_number".to_string(), + ), + ]); + let err = TableProperties::try_from(&props).unwrap_err(); + assert!( + err.to_string().contains( + "Invalid value for write.parquet.content-defined-chunking.min-chunk-size" + ) + ); + } + + #[test] + fn test_cdc_invalid_norm_level() { + let props = HashMap::from([ + ( + TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(), + "true".to_string(), + ), + ( + TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL.to_string(), + "not_a_number".to_string(), + ), + ]); + let err = TableProperties::try_from(&props).unwrap_err(); + assert!( + err.to_string() + .contains("Invalid value for write.parquet.content-defined-chunking.norm-level") + ); + } + + #[test] + fn test_cdc_no_properties() { + let props = HashMap::from([("some.other.property".to_string(), "value".to_string())]); + let tp = TableProperties::try_from(&props).unwrap(); + assert!(!tp.cdc_enabled); + } } diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 3b227e20fa..282d1005ba 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -45,7 +45,7 @@ use iceberg::writer::file_writer::location_generator::{ }; use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder; use iceberg::{Error, ErrorKind}; -use parquet::file::properties::WriterProperties; +use parquet::file::properties::{CdcOptions, WriterPropertiesBuilder}; use uuid::Uuid; use crate::physical_plan::DATA_FILES_COL_NAME; @@ -226,8 +226,17 @@ impl ExecutionPlan for IcebergWriteExec { } // Create data file writer builder + let cdc_options = table_props.cdc_enabled.then_some(CdcOptions { + min_chunk_size: table_props.cdc_min_chunk_size, + max_chunk_size: table_props.cdc_max_chunk_size, + norm_level: table_props.cdc_norm_level, + }); + let writer_properties = WriterPropertiesBuilder::default() + .set_content_defined_chunking(cdc_options) + .build(); + let parquet_file_writer_builder = ParquetWriterBuilder::new_with_match_mode( - WriterProperties::default(), + writer_properties, self.table.metadata().current_schema().clone(), FieldMatchMode::Name, ); diff --git a/crates/storage/opendal/Cargo.toml b/crates/storage/opendal/Cargo.toml index 80eeaa3d04..4d26d25162 100644 --- a/crates/storage/opendal/Cargo.toml +++ b/crates/storage/opendal/Cargo.toml @@ -28,11 +28,12 @@ keywords = ["iceberg", "opendal", "storage"] [features] default = ["opendal-memory", "opendal-fs", "opendal-s3"] -opendal-all = ["opendal-memory", "opendal-fs", "opendal-s3", "opendal-gcs", "opendal-oss", "opendal-azdls"] +opendal-all = ["opendal-memory", "opendal-fs", "opendal-s3", "opendal-gcs", "opendal-oss", "opendal-azdls", "opendal-hf"] opendal-azdls = ["opendal/services-azdls"] opendal-fs = ["opendal/services-fs"] opendal-gcs = ["opendal/services-gcs"] +opendal-hf = ["opendal/services-hf"] opendal-memory = ["opendal/services-memory"] opendal-oss = ["opendal/services-oss"] opendal-s3 = ["opendal/services-s3", "reqsign-aws-v4", "reqsign-core"] diff --git a/crates/storage/opendal/src/hf.rs b/crates/storage/opendal/src/hf.rs new file mode 100644 index 0000000000..a7ca2d884d --- /dev/null +++ b/crates/storage/opendal/src/hf.rs @@ -0,0 +1,348 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! HuggingFace Hub storage backend. + +use std::collections::HashMap; + +use iceberg::io::{HF_ENDPOINT, HF_REVISION, HF_TOKEN}; +use iceberg::{Error, ErrorKind, Result}; +use opendal::{Configurator, Operator, OperatorUri}; + +use crate::utils::from_opendal_error; + +// --------------------------------------------------------------------------- +// Minimal URI parser — extracts only what the caller needs. +// TODO: remove once opendal-service-hf exports its URI parser publicly. +// --------------------------------------------------------------------------- + +/// Repository type of a HuggingFace Hub repository. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum HfRepoType { + /// Model repository (`models/` prefix). + Model, + /// Dataset repository (`datasets/` prefix). + Dataset, + /// Spaces application repository (`spaces/` prefix). + Space, + /// XET-backed object-storage bucket (`buckets/` prefix). + Bucket, +} + +impl HfRepoType { + /// Parse a repo-type keyword (singular or plural) into the corresponding variant. + fn parse(s: &str) -> Option<Self> { + match s.to_lowercase().replace(' ', "").as_str() { + "model" | "models" => Some(Self::Model), + "dataset" | "datasets" => Some(Self::Dataset), + "space" | "spaces" => Some(Self::Space), + "bucket" | "buckets" => Some(Self::Bucket), + _ => None, + } + } + + fn canonical(self) -> &'static str { + match self { + Self::Model => "models", + Self::Dataset => "datasets", + Self::Space => "spaces", + Self::Bucket => "buckets", + } + } +} + +/// Parsed HuggingFace URI: `hf://<repo_type>/<repo_id>[@<revision>][/<path_in_repo>]`. +/// +/// `repo_type` must be explicitly specified — there is no implicit default. +/// Only the fields required by this crate are stored; revision is consumed +/// during parsing but not retained. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct HfUri { + pub repo_type: HfRepoType, + /// e.g. `"user/my-repo"`. + pub repo_id: String, + /// Path within the repository, e.g. `"train/data.parquet"`. Empty at repo root. + pub path: String, +} + +impl HfUri { + /// Parse a full `hf://…` URI or the bare path portion (without scheme). + /// Returns `None` if the URI does not begin with a recognized repo-type prefix + /// (`models/`, `datasets/`, `spaces/`, or `buckets/`). + pub(crate) fn parse(full_uri: &str) -> Option<Self> { + let s = full_uri.strip_prefix("hf://").unwrap_or(full_uri); + if s.is_empty() { + return None; + } + + // Require an explicit repo_type prefix — no implicit default. + let (first, rest) = s.split_once('/')?; + let repo_type = HfRepoType::parse(first)?; + let s = rest; + + // Remaining: `<repo_id>[@<revision>][/<path_in_repo>]` + let (repo_id, path) = if s.contains('/') { + // Check if `@` appears in the first two slash-segments (the repo_id portion). + // This distinguishes "user/repo@rev/file" from "user/repo/path/@file".
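+            // e.g. for `user/repo@refs/convert/parquet/data.parquet` the first two
+            // segments are `user/repo@refs`, so the `@` is detected here and the
+            // multi-segment ref is stripped below by `path_after_revision`.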
+ let first_two = s.splitn(3, '/').take(2).collect::<Vec<_>>().join("/"); + if first_two.contains('@') { + let (repo_id, rev_and_path) = s.split_once('@').unwrap(); + let rev_and_path = rev_and_path.replace("%2F", "/"); + (repo_id.to_string(), path_after_revision(&rev_and_path)) + } else { + let segs: Vec<_> = s.splitn(3, '/').collect(); + let repo_id = format!("{}/{}", segs[0], segs[1]); + let path = segs.get(2).copied().unwrap_or("").to_string(); + (repo_id, path) + } + } else if let Some((repo_id, _)) = s.split_once('@') { + (repo_id.to_string(), String::new()) + } else { + (s.to_string(), String::new()) + }; + + Some(Self { + repo_type, + repo_id, + path, + }) + } +} + +/// Given the string after `@`, extract the path-in-repo, correctly skipping +/// multi-segment special refs (`refs/convert/parquet`, `refs/pr/N`). +/// These are the only two multi-segment special ref prefixes in HF's git model. +fn path_after_revision(rev_and_path: &str) -> String { + if !rev_and_path.contains('/') { + return String::new(); + } + if let Some(rest) = rev_and_path.strip_prefix("refs/convert/") { + return rest + .find('/') + .map_or(String::new(), |i| rest[i + 1..].to_string()); + } + if let Some(rest) = rev_and_path.strip_prefix("refs/pr/") { + return rest + .find('/') + .map_or(String::new(), |i| rest[i + 1..].to_string()); + } + rev_and_path + .split_once('/') + .map(|(_, path)| path.to_string()) + .unwrap_or_default() +} + +// --------------------------------------------------------------------------- +// Public helpers used by lib.rs +// --------------------------------------------------------------------------- + +/// Parse iceberg `StorageConfig` properties into an opendal [`opendal::services::HfConfig`]. +pub(crate) fn hf_config_parse(m: HashMap<String, String>) -> Result<opendal::services::HfConfig> { + let mut cfg = opendal::services::HfConfig::default(); + if let Some(token) = m.get(HF_TOKEN) { + cfg.token = Some(token.clone()); + } + if let Some(endpoint) = m.get(HF_ENDPOINT) { + cfg.endpoint = Some(endpoint.clone()); + } + if let Some(revision) = m.get(HF_REVISION) { + cfg.revision = Some(revision.clone()); + } + Ok(cfg) +} + +/// Build an [`Operator`] for the given `hf://…` path and return it together with +/// the relative path-in-repo. +/// +/// URI parsing is delegated to opendal's [`HfConfig::from_uri`]. The base config +/// provides fallback values for `revision` and `endpoint`; the `token` is always +/// taken from the base config and never from the URI. +pub(crate) fn hf_config_build<'a>( + cfg: &opendal::services::HfConfig, + path: &'a str, +) -> Result<(Operator, &'a str)> { + let uri = OperatorUri::new(path, Vec::<(String, String)>::new()).map_err(|e| { + Error::new(ErrorKind::DataInvalid, format!("Invalid hf url: {path}")).with_source(e) + })?; + + let mut hf_cfg = opendal::services::HfConfig::from_uri(&uri).map_err(|e| { + Error::new(ErrorKind::DataInvalid, format!("Invalid hf url: {path}")).with_source(e) + })?; + + // Token must come from config only, never from the URI. + hf_cfg.token = cfg.token.clone(); + + if hf_cfg.endpoint.is_none() { + hf_cfg.endpoint = cfg.endpoint.clone(); + } + if hf_cfg.revision.is_none() { + hf_cfg.revision = cfg.revision.clone(); + } + + let parsed = HfUri::parse(path) + .ok_or_else(|| Error::new(ErrorKind::DataInvalid, format!("Invalid hf url: {path}")))?; + let relative_path = &path[path.len() - parsed.path.len()..]; + + let op = Operator::from_config(hf_cfg) + .map_err(from_opendal_error)?
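+        // from_config() builds the typed service builder; finish() turns it into
+        // the final Operator.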
+ .finish(); + Ok((op, relative_path)) +} + +/// Returns a stable cache key for `delete_stream` batching: `"<repo_type>/<repo_id>"` +/// (e.g. `"buckets/user/my-repo"`), without revision. +/// Repo type is included so bucket and dataset paths to the same repo use separate operators. +/// Falls back to the full path so that unparsable paths never share an operator accidentally. +pub(crate) fn hf_batch_key(path: &str) -> String { + HfUri::parse(path) + .map(|u| format!("{}/{}", u.repo_type.canonical(), u.repo_id)) + .unwrap_or_else(|| path.to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse(uri: &str) -> HfUri { + HfUri::parse(uri).unwrap_or_else(|| panic!("parse failed for {uri:?}")) + } + + #[test] + fn test_model_prefix() { + let u = parse("hf://models/user/my-model/path/to/file.parquet"); + assert_eq!(u.repo_type, HfRepoType::Model); + assert_eq!(u.repo_id, "user/my-model"); + assert_eq!(u.path, "path/to/file.parquet"); + } + + #[test] + fn test_dataset_prefix() { + let u = parse("hf://datasets/user/my-dataset/train/data.parquet"); + assert_eq!(u.repo_type, HfRepoType::Dataset); + assert_eq!(u.repo_id, "user/my-dataset"); + assert_eq!(u.path, "train/data.parquet"); + } + + #[test] + fn test_bucket_prefix() { + let u = parse("hf://buckets/myorg/my-bucket/iceberg/metadata/v1.json"); + assert_eq!(u.repo_type, HfRepoType::Bucket); + assert_eq!(u.repo_id, "myorg/my-bucket"); + assert_eq!(u.path, "iceberg/metadata/v1.json"); + } + + #[test] + fn test_revision() { + let u = parse("hf://datasets/user/my-dataset@main/train/data.parquet"); + assert_eq!(u.repo_type, HfRepoType::Dataset); + assert_eq!(u.repo_id, "user/my-dataset"); + assert_eq!(u.path, "train/data.parquet"); + } + + #[test] + fn test_refs_convert_revision() { + let u = parse("hf://datasets/squad@refs/convert/parquet/data.parquet"); + assert_eq!(u.path, "data.parquet"); + } + + #[test] + fn test_refs_pr_revision() { + let u = parse("hf://models/user/repo@refs/pr/10/file.txt"); + assert_eq!(u.path, "file.txt"); + } + + #[test] + fn test_encoded_revision() { + let u = parse("hf://models/user/repo@refs%2Fpr%2F10/file.txt"); + assert_eq!(u.path, "file.txt"); + } + + #[test] + fn test_no_path() { + let u = parse("hf://models/user/my-model"); + assert_eq!(u.repo_id, "user/my-model"); + assert_eq!(u.path, ""); + } + + #[test] + fn test_at_in_path_not_revision() { + let u = parse("hf://models/user/repo/path/@not-a-revision.txt"); + assert_eq!(u.path, "path/@not-a-revision.txt"); + } + + #[test] + fn test_single_segment_repo_id() { + // Without revision and path: unambiguous. + let u = parse("hf://models/gpt2"); + assert_eq!(u.repo_type, HfRepoType::Model); + assert_eq!(u.repo_id, "gpt2"); + assert_eq!(u.path, ""); + + // With explicit revision: single-segment repos with paths are parsed correctly. + let u = parse("hf://models/gpt2@main/config.json"); + assert_eq!(u.repo_type, HfRepoType::Model); + assert_eq!(u.repo_id, "gpt2"); + assert_eq!(u.path, "config.json"); + } + + #[test] + fn test_batch_key() { + assert_eq!( + hf_batch_key("hf://datasets/user/repo@main/path/file.parquet"), + "datasets/user/repo" + ); + assert_eq!( + hf_batch_key("hf://buckets/org/bucket/data/file.parquet"), + "buckets/org/bucket" + ); + // Same repo_id, different repo_type → different keys.
+ assert_ne!( + hf_batch_key("hf://buckets/user/repo/file"), + hf_batch_key("hf://datasets/user/repo/file"), + ); + } + + #[test] + fn test_invalid_uri() { + assert!(HfUri::parse("hf://").is_none()); + // bare repo-type, no repo_id + assert!(HfUri::parse("hf://datasets").is_none()); + // missing repo-type prefix + assert!(HfUri::parse("hf://user/my-model").is_none()); + assert!(HfUri::parse("hf://gpt2").is_none()); + // unrecognized repo-type prefix + assert!(HfUri::parse("hf://repos/user/repo/file").is_none()); + } + + #[test] + fn test_hf_config_build_relative_path() { + let cfg = opendal::services::HfConfig::default(); + + let (_, rel) = hf_config_build( + &cfg, + "hf://datasets/user/my-dataset@main/train/data.parquet", + ) + .unwrap(); + assert_eq!(rel, "train/data.parquet"); + + let (_, rel) = hf_config_build(&cfg, "hf://models/user/my-model/config.json").unwrap(); + assert_eq!(rel, "config.json"); + + let (_, rel) = hf_config_build(&cfg, "hf://models/user/my-model").unwrap(); + assert_eq!(rel, ""); + } +} diff --git a/crates/storage/opendal/src/lib.rs b/crates/storage/opendal/src/lib.rs index 65deaa5f44..67113833f6 100644 --- a/crates/storage/opendal/src/lib.rs +++ b/crates/storage/opendal/src/lib.rs @@ -51,6 +51,14 @@ cfg_if! { } } +cfg_if! { + if #[cfg(feature = "opendal-hf")] { + mod hf; + use hf::*; + use opendal::services::HfConfig; + } +} + cfg_if! { if #[cfg(feature = "opendal-fs")] { mod fs; @@ -120,6 +128,9 @@ pub enum OpenDalStorageFactory { /// Azure Data Lake Storage factory. #[cfg(feature = "opendal-azdls")] Azdls, + /// HuggingFace Hub storage factory. + #[cfg(feature = "opendal-hf")] + Hf, } #[typetag::serde(name = "OpenDalStorageFactory")] @@ -152,6 +163,10 @@ impl StorageFactory for OpenDalStorageFactory { OpenDalStorageFactory::Azdls => Ok(Arc::new(OpenDalStorage::Azdls { config: azdls_config_parse(config.props().clone())?.into(), })), + #[cfg(feature = "opendal-hf")] + OpenDalStorageFactory::Hf => Ok(Arc::new(OpenDalStorage::Hf { + config: hf_config_parse(config.props().clone())?.into(), + })), #[cfg(all( not(feature = "opendal-memory"), not(feature = "opendal-fs"), not(feature = "opendal-s3"), not(feature = "opendal-gcs"), not(feature = "opendal-oss"), not(feature = "opendal-azdls"), + not(feature = "opendal-hf"), ))] _ => Err(Error::new( ErrorKind::FeatureUnsupported, @@ -218,6 +234,16 @@ pub enum OpenDalStorage { /// Azure DLS configuration. config: Arc<AzdlsConfig>, }, + /// HuggingFace Hub storage variant. + /// + /// Accepts paths of the form + /// `hf://<repo_type>/<owner>/<repo>[@<revision>]/<path-in-repo>`, + /// where `<repo_type>` must be one of `models`, `datasets`, `spaces`, or `buckets`. + #[cfg(feature = "opendal-hf")] + Hf { + /// HuggingFace Hub configuration (token + endpoint). + config: Arc<HfConfig>, + }, } impl OpenDalStorage { @@ -311,12 +337,15 @@ impl OpenDalStorage { } #[cfg(feature = "opendal-azdls")] OpenDalStorage::Azdls { config } => azdls_create_operator(path, config)?, + #[cfg(feature = "opendal-hf")] + OpenDalStorage::Hf { config } => hf_config_build(config, path)?, #[cfg(all( not(feature = "opendal-s3"), not(feature = "opendal-fs"), not(feature = "opendal-gcs"), not(feature = "opendal-oss"), not(feature = "opendal-azdls"), + not(feature = "opendal-hf"), ))] _ => { return Err(Error::new( @@ -332,6 +361,21 @@ impl OpenDalStorage { Ok((operator, relative_path)) } + /// Returns a cache key used by `delete_stream` to group paths by storage operator. + /// + /// For most backends the URL host (bucket name) is sufficient.
For HF the host + /// encodes the repo type, not the repo identity, so a more specific key is used. + fn batch_key_for_path(&self, path: &str) -> String { + match self { + #[cfg(feature = "opendal-hf")] + OpenDalStorage::Hf { .. } => hf_batch_key(path), + _ => url::Url::parse(path) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(), + } + } + /// Extracts the relative path from an absolute path without building an operator. /// /// This is a lightweight alternative to [`create_operator`](Self::create_operator) for cases @@ -408,12 +452,20 @@ impl OpenDalStorage { let relative_path_len = azure_path.path.len(); Ok(&path[path.len() - relative_path_len..]) } + #[cfg(feature = "opendal-hf")] + OpenDalStorage::Hf { .. } => { + let parsed = hf::HfUri::parse(path).ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, format!("Invalid hf url: {path}")) + })?; + Ok(&path[path.len() - parsed.path.len()..]) + } #[cfg(all( not(feature = "opendal-s3"), not(feature = "opendal-fs"), not(feature = "opendal-gcs"), not(feature = "opendal-oss"), not(feature = "opendal-azdls"), + not(feature = "opendal-hf"), ))] _ => Err(Error::new( ErrorKind::FeatureUnsupported, @@ -493,10 +545,7 @@ impl Storage for OpenDalStorage { let mut deleters: HashMap<String, Deleter> = HashMap::new(); while let Some(path) = paths.next().await { - let bucket = url::Url::parse(&path) - .ok() - .and_then(|u| u.host_str().map(|s| s.to_string())) - .unwrap_or_default(); + let bucket = self.batch_key_for_path(&path); let (relative_path, deleter) = match deleters.entry(bucket) { Entry::Occupied(entry) => { diff --git a/crates/storage/opendal/src/resolving.rs b/crates/storage/opendal/src/resolving.rs index 621495519a..86993220a8 100644 --- a/crates/storage/opendal/src/resolving.rs +++ b/crates/storage/opendal/src/resolving.rs @@ -50,6 +50,7 @@ pub const SCHEME_ABFSS: &str = "abfss"; pub const SCHEME_ABFS: &str = "abfs"; pub const SCHEME_WASBS: &str = "wasbs"; pub const SCHEME_WASB: &str = "wasb"; +pub const SCHEME_HF: &str = "hf"; /// Parse a URL scheme string. fn parse_scheme(scheme: &str) -> Result<&'static str> { @@ -60,6 +61,7 @@ fn parse_scheme(scheme: &str) -> Result<&'static str> { SCHEME_GS | SCHEME_GCS => Ok("gcs"), SCHEME_OSS => Ok("oss"), SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok("azdls"), + SCHEME_HF => Ok("hf"), s => Err(Error::new( ErrorKind::FeatureUnsupported, format!("Unsupported storage scheme: {s}"), @@ -118,6 +120,13 @@ fn build_storage_for_scheme( "file" => Ok(OpenDalStorage::LocalFs), #[cfg(feature = "opendal-memory")] "memory" => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)), + #[cfg(feature = "opendal-hf")] + "hf" => { + let config = crate::hf::hf_config_parse(props.clone())?; + Ok(OpenDalStorage::Hf { + config: Arc::new(config), + }) + } unsupported => Err(Error::new( ErrorKind::FeatureUnsupported, format!("Unsupported storage scheme: {unsupported}"), @@ -196,7 +205,7 @@ pub struct OpenDalResolvingStorage { /// Configuration properties shared across all backends. props: HashMap<String, String>, - /// Cache of scheme to storage mappings. + /// Cache of canonical scheme to storage mappings. #[serde(skip, default)] storages: RwLock<HashMap<&'static str, Arc<OpenDalStorage>>>, /// Custom AWS credential loader for S3 storage.
@@ -206,7 +215,7 @@ pub struct OpenDalResolvingStorage { } impl OpenDalResolvingStorage { - /// Resolve the storage for the given path by extracting the scheme and + /// Resolve the storage for the given path by extracting the canonical scheme and /// returning the cached or newly-created [`OpenDalStorage`]. fn resolve(&self, path: &str) -> Result<Arc<OpenDalStorage>> { let scheme = extract_scheme(path)?; @@ -281,7 +290,7 @@ impl Storage for OpenDalResolvingStorage { } async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> { - // Group paths by scheme so each resolved storage receives a batch, + // Group paths by canonical scheme so each resolved storage receives a batch, // avoiding repeated operator creation per path. let mut grouped: HashMap<&'static str, Vec<String>> = HashMap::new(); while let Some(path) = paths.next().await { diff --git a/crates/storage/opendal/tests/file_io_hf_test.rs b/crates/storage/opendal/tests/file_io_hf_test.rs new file mode 100644 index 0000000000..4035b2a6d8 --- /dev/null +++ b/crates/storage/opendal/tests/file_io_hf_test.rs @@ -0,0 +1,376 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for FileIO HuggingFace Hub. +//! +//! These tests require a real HuggingFace token and are skipped when +//! `HF_OPENDAL_TOKEN` is not set in the environment. +//! +//! The following environment variables are used: +//! - `HF_OPENDAL_TOKEN` — HuggingFace API token (required) +//! - `HF_OPENDAL_BUCKET` — `owner/repo` for a bucket-type repo (required when running bucket tests) +//! - `HF_OPENDAL_DATASET` — `owner/repo` for a dataset-type repo (required when running dataset tests) + +#[cfg(feature = "opendal-hf")] +mod tests { + use std::sync::Arc; + + use bytes::Bytes; + use futures::StreamExt; + use iceberg::io::{FileIO, FileIOBuilder, HF_REVISION, HF_TOKEN}; + use iceberg_storage_opendal::{OpenDalResolvingStorageFactory, OpenDalStorageFactory}; + use iceberg_test_utils::{normalize_test_name_with_parts, set_up}; + + const ENV_HF_TOKEN: &str = "HF_OPENDAL_TOKEN"; + const ENV_HF_BUCKET: &str = "HF_OPENDAL_BUCKET"; + const ENV_HF_DATASET: &str = "HF_OPENDAL_DATASET"; + + macro_rules!
require_env { + ($var:expr) => { + match std::env::var($var) { + Ok(v) => v, + Err(_) => { + eprintln!("Skipping HF test: {} not set", $var); + return; + } + } + }; + } + + fn get_file_io(token: &str) -> FileIO { + set_up(); + FileIOBuilder::new(Arc::new(OpenDalStorageFactory::Hf)) + .with_props(vec![(HF_TOKEN, token.to_string())]) + .build() + } + + fn get_resolving_file_io(token: &str) -> FileIO { + set_up(); + FileIOBuilder::new(Arc::new(OpenDalResolvingStorageFactory::new())) + .with_props(vec![(HF_TOKEN, token.to_string())]) + .build() + } + + // --- bucket tests --- + + #[tokio::test] + async fn test_hf_bucket_write_read_delete() { + let token = require_env!(ENV_HF_TOKEN); + let bucket = require_env!(ENV_HF_BUCKET); + let file_io = get_file_io(&token); + let path = format!( + "hf://buckets/{}/{}", + bucket, + normalize_test_name_with_parts!("test_hf_bucket_write_read_delete") + ); + + let _ = file_io.delete(&path).await; + assert!(!file_io.exists(&path).await.unwrap()); + + file_io + .new_output(&path) + .unwrap() + .write(Bytes::from_static(b"iceberg-hf-bucket")) + .await + .unwrap(); + assert!(file_io.exists(&path).await.unwrap()); + + let data = file_io.new_input(&path).unwrap().read().await.unwrap(); + assert_eq!(data, Bytes::from_static(b"iceberg-hf-bucket")); + + file_io.delete(&path).await.unwrap(); + assert!(!file_io.exists(&path).await.unwrap()); + } + + #[tokio::test] + async fn test_hf_bucket_overwrite() { + let token = require_env!(ENV_HF_TOKEN); + let bucket = require_env!(ENV_HF_BUCKET); + let file_io = get_file_io(&token); + let path = format!( + "hf://buckets/{}/{}", + bucket, + normalize_test_name_with_parts!("test_hf_bucket_overwrite") + ); + + let _ = file_io.delete(&path).await; + + file_io + .new_output(&path) + .unwrap() + .write(Bytes::from_static(b"first")) + .await + .unwrap(); + file_io + .new_output(&path) + .unwrap() + .write(Bytes::from_static(b"second")) + .await + .unwrap(); + + let data = file_io.new_input(&path).unwrap().read().await.unwrap(); + assert_eq!(data, Bytes::from_static(b"second")); + + file_io.delete(&path).await.unwrap(); + } + + #[tokio::test] + async fn test_hf_bucket_range_read() { + let token = require_env!(ENV_HF_TOKEN); + let bucket = require_env!(ENV_HF_BUCKET); + let file_io = get_file_io(&token); + let path = format!( + "hf://buckets/{}/{}", + bucket, + normalize_test_name_with_parts!("test_hf_bucket_range_read") + ); + + let _ = file_io.delete(&path).await; + file_io + .new_output(&path) + .unwrap() + .write(Bytes::from_static(b"hello world")) + .await + .unwrap(); + + let reader = file_io.new_input(&path).unwrap().reader().await.unwrap(); + let chunk = reader.read(6..11).await.unwrap(); + assert_eq!(chunk, Bytes::from_static(b"world")); + + file_io.delete(&path).await.unwrap(); + } + + #[tokio::test] + async fn test_hf_bucket_metadata() { + let token = require_env!(ENV_HF_TOKEN); + let bucket = require_env!(ENV_HF_BUCKET); + let file_io = get_file_io(&token); + let path = format!( + "hf://buckets/{}/{}", + bucket, + normalize_test_name_with_parts!("test_hf_bucket_metadata") + ); + + let _ = file_io.delete(&path).await; + file_io + .new_output(&path) + .unwrap() + .write(Bytes::from_static(b"metadata-test")) + .await + .unwrap(); + + let meta = file_io.new_input(&path).unwrap().metadata().await.unwrap(); + assert_eq!(meta.size, b"metadata-test".len() as u64); + + file_io.delete(&path).await.unwrap(); + } + + #[tokio::test] + async fn test_hf_bucket_delete_stream() { + let token = require_env!(ENV_HF_TOKEN); + let bucket = 
require_env!(ENV_HF_BUCKET); + let file_io = get_file_io(&token); + + let paths: Vec<String> = (0..3) + .map(|i| { + format!( + "hf://buckets/{}/{}/file-{i}", + bucket, + normalize_test_name_with_parts!("test_hf_bucket_delete_stream") + ) + }) + .collect(); + + for path in &paths { + let _ = file_io.delete(path).await; + file_io + .new_output(path) + .unwrap() + .write(Bytes::from_static(b"x")) + .await + .unwrap(); + assert!(file_io.exists(path).await.unwrap()); + } + + let stream = futures::stream::iter(paths.clone()).boxed(); + file_io.delete_stream(stream).await.unwrap(); + + for path in &paths { + assert!(!file_io.exists(path).await.unwrap()); + } + } + + #[tokio::test] + async fn test_hf_bucket_delete_stream_empty() { + let token = require_env!(ENV_HF_TOKEN); + let file_io = get_file_io(&token); + file_io + .delete_stream(futures::stream::empty().boxed()) + .await + .unwrap(); + } + + // --- dataset tests --- + + #[tokio::test] + async fn test_hf_dataset_write_read_delete() { + let token = require_env!(ENV_HF_TOKEN); + let dataset = require_env!(ENV_HF_DATASET); + let file_io = get_file_io(&token); + let path = format!( + "hf://datasets/{}/{}", + dataset, + normalize_test_name_with_parts!("test_hf_dataset_write_read_delete") + ); + + let _ = file_io.delete(&path).await; + assert!(!file_io.exists(&path).await.unwrap()); + + file_io + .new_output(&path) + .unwrap() + .write(Bytes::from_static(b"iceberg-hf-dataset")) + .await + .unwrap(); + assert!(file_io.exists(&path).await.unwrap()); + + let data = file_io.new_input(&path).unwrap().read().await.unwrap(); + assert_eq!(data, Bytes::from_static(b"iceberg-hf-dataset")); + + file_io.delete(&path).await.unwrap(); + assert!(!file_io.exists(&path).await.unwrap()); + } + + // --- revision tests --- + + #[tokio::test] + async fn test_hf_explicit_revision_in_uri() { + let token = require_env!(ENV_HF_TOKEN); + let file_io = get_file_io(&token); + let name = normalize_test_name_with_parts!("test_hf_explicit_revision_in_uri"); + + let bucket = require_env!(ENV_HF_BUCKET); + // Write without revision, read back with explicit @main. + let write_path = format!("hf://buckets/{}/{}", bucket, name); + let read_path = format!("hf://buckets/{}@main/{}", bucket, name); + + let _ = file_io.delete(&write_path).await; + file_io + .new_output(&write_path) + .unwrap() + .write(Bytes::from_static(b"revision-test")) + .await + .unwrap(); + + let data = file_io.new_input(&read_path).unwrap().read().await.unwrap(); + assert_eq!(data, Bytes::from_static(b"revision-test")); + + file_io.delete(&write_path).await.unwrap(); + } + + #[tokio::test] + async fn test_hf_revision_from_config() { + let token = require_env!(ENV_HF_TOKEN); + set_up(); + + // Build FileIO with HF_REVISION set in config — paths without @revision use it.
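+        // A revision embedded in the URI (`@rev`) still takes precedence; the
+        // config value is only a fallback (see `hf_config_build`).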
+ let file_io = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::Hf)) + .with_props(vec![ + (HF_TOKEN, token.to_string()), + (HF_REVISION, "main".to_string()), + ]) + .build(); + + let bucket = require_env!(ENV_HF_BUCKET); + let path = format!( + "hf://buckets/{}/{}", + bucket, + normalize_test_name_with_parts!("test_hf_revision_from_config") + ); + + let _ = file_io.delete(&path).await; + file_io + .new_output(&path) + .unwrap() + .write(Bytes::from_static(b"config-revision")) + .await + .unwrap(); + + let data = file_io.new_input(&path).unwrap().read().await.unwrap(); + assert_eq!(data, Bytes::from_static(b"config-revision")); + + file_io.delete(&path).await.unwrap(); + } + + // --- resolving storage tests --- + + #[tokio::test] + async fn test_hf_resolving_storage() { + let token = require_env!(ENV_HF_TOKEN); + let file_io = get_resolving_file_io(&token); + + let bucket = require_env!(ENV_HF_BUCKET); + let path = format!( + "hf://buckets/{}/{}", + bucket, + normalize_test_name_with_parts!("test_hf_resolving_storage") + ); + + let _ = file_io.delete(&path).await; + + file_io + .new_output(&path) + .unwrap() + .write(Bytes::from_static(b"resolving")) + .await + .unwrap(); + + let data = file_io.new_input(&path).unwrap().read().await.unwrap(); + assert_eq!(data, Bytes::from_static(b"resolving")); + + file_io.delete(&path).await.unwrap(); + } + + #[tokio::test] + async fn test_hf_resolving_delete_stream_across_repo_types() { + let token = require_env!(ENV_HF_TOKEN); + let file_io = get_resolving_file_io(&token); + + let bucket = require_env!(ENV_HF_BUCKET); + let dataset = require_env!(ENV_HF_DATASET); + let name = normalize_test_name_with_parts!("test_hf_resolving_delete_stream_across"); + let bucket_path = format!("hf://buckets/{}/{}", bucket, name); + let dataset_path = format!("hf://datasets/{}/{}", dataset, name); + + for path in [&bucket_path, &dataset_path] { + let _ = file_io.delete(path).await; + file_io + .new_output(path) + .unwrap() + .write(Bytes::from_static(b"x")) + .await + .unwrap(); + assert!(file_io.exists(path).await.unwrap()); + } + + let stream = futures::stream::iter(vec![bucket_path.clone(), dataset_path.clone()]).boxed(); + file_io.delete_stream(stream).await.unwrap(); + + assert!(!file_io.exists(&bucket_path).await.unwrap()); + assert!(!file_io.exists(&dataset_path).await.unwrap()); + } +} From e4f8219f61d545014fa65264ef1b08c588f2df7f Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Mon, 11 May 2026 20:05:03 +0200 Subject: [PATCH 2/2] ci: add HuggingFace and CDC integration test workflow Two jobs gated on HF_TOKEN: Rust opendal-hf integration tests and Python CDC + HF tests. The Python HF test writes a table via PyIceberg and reads it back via IcebergDataFusionTable using the opendal-hf backend. Env vars: HF_TOKEN, HF_BUCKET, HF_DATASET. --- .github/workflows/ci_hf_cdc.yml | 110 ++++++++++++++++++ bindings/python/pyproject.toml | 3 + .../python/tests/test_huggingface_and_cdc.py | 100 +++++++++------- .../storage/opendal/tests/file_io_hf_test.rs | 14 +-- pyproject.toml | 66 +++++++++++ 5 files changed, 246 insertions(+), 47 deletions(-) create mode 100644 .github/workflows/ci_hf_cdc.yml create mode 100644 pyproject.toml diff --git a/.github/workflows/ci_hf_cdc.yml b/.github/workflows/ci_hf_cdc.yml new file mode 100644 index 0000000000..78cd50d0ae --- /dev/null +++ b/.github/workflows/ci_hf_cdc.yml @@ -0,0 +1,110 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: HuggingFace and CDC Integration Tests + +on: + push: + branches: + - main + pull_request: + paths: + - 'crates/storage/opendal/**' + - 'crates/iceberg/src/io/**' + - 'crates/iceberg/src/spec/table_properties.rs' + - 'crates/integrations/datafusion/**' + - 'bindings/python/tests/test_huggingface_and_cdc.py' + - '.github/workflows/ci_hf_cdc.yml' + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} + cancel-in-progress: true + +permissions: + contents: read + +jobs: + hf-integration: + name: HuggingFace Hub integration tests + runs-on: ubuntu-latest + # Skip the job entirely when HF secrets are not available (e.g. PRs from forks). + if: ${{ secrets.HF_TOKEN != '' }} + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + persist-credentials: false + + - name: Setup Rust toolchain + uses: ./.github/actions/setup-builder + + - name: Cache Rust artifacts + uses: swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 # v2 + + - name: Install protoc + uses: arduino/setup-protoc@c65c819552d16ad3c9b72d9dfd5ba5237b9c906b # v3.0.0 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + + - name: Run Rust HF integration tests + env: + HF_TOKEN: ${{ secrets.HF_TOKEN }} + HF_BUCKET: ${{ secrets.HF_BUCKET }} + HF_DATASET: ${{ secrets.HF_DATASET }} + run: | + cargo test -p iceberg-storage-opendal \ + --features opendal-hf \ + --test file_io_hf_test \ + -- --test-threads=1 + + cdc-python: + name: CDC and HuggingFace Python tests + runs-on: ubuntu-latest + # Skip when HF secrets are not available (e.g. PRs from forks). 
+ if: ${{ secrets.HF_TOKEN != '' }} + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + persist-credentials: false + + - uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0 + with: + python-version: "3.12" + + - uses: PyO3/maturin-action@e83996d129638aa358a18fbd1dfb82f0b0fb5d3b # v1.51.0 + with: + working-directory: "bindings/python" + command: build + args: --out dist -i python3.12 + + - uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0 + with: + version: "0.9.3" + enable-cache: true + + - name: Install dependencies + working-directory: "bindings/python" + run: | + make install + uv pip install --no-build --reinstall --find-links dist/ pyiceberg-core + + - name: Run CDC and HuggingFace Python tests + working-directory: "bindings/python" + env: + HF_TOKEN: ${{ secrets.HF_TOKEN }} + HF_DATASET: ${{ secrets.HF_DATASET }} + run: | + uv run --no-sync pytest tests/test_huggingface_and_cdc.py -v diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml index c35526e155..b57ea5bcab 100644 --- a/bindings/python/pyproject.toml +++ b/bindings/python/pyproject.toml @@ -54,6 +54,8 @@ ignore = ["F403", "F405"] [tool.pytest.ini_options] filterwarnings = [ "error", + # huggingface_hub uses hf_xet.upload_files() internally which is deprecated in hf_xet + "ignore::DeprecationWarning:huggingface_hub", ] [dependency-groups] @@ -68,4 +70,5 @@ dev = [ "pyiceberg[sql-sqlite]>=0.11", "pyarrow>=17", "fastavro>=1.11.1", + "huggingface_hub>=0.20", ] diff --git a/bindings/python/tests/test_huggingface_and_cdc.py b/bindings/python/tests/test_huggingface_and_cdc.py index d51ccd22ee..7e69ffe673 100644 --- a/bindings/python/tests/test_huggingface_and_cdc.py +++ b/bindings/python/tests/test_huggingface_and_cdc.py @@ -21,7 +21,7 @@ automatically — no special API is required beyond setting string properties. HF credentials are passed as file_io_properties to IcebergDataFusionTable. -Tests requiring a real HF token are skipped when HF_OPENDAL_TOKEN is not set. +Tests requiring live HF credentials are skipped when HF_TOKEN or HF_DATASET is not set. """ import os @@ -38,16 +38,6 @@ reason="IcebergDataFusionTable requires datafusion>=53 for FFI compatibility", ) -# Property name constants — same strings as the Rust TableProperties constants. 
-HF_TOKEN = "hf.token" -HF_ENDPOINT = "hf.endpoint" -HF_REVISION = "hf.revision" - -PARQUET_CDC_ENABLED = "write.parquet.content-defined-chunking.enabled" -PARQUET_CDC_MIN_CHUNK_SIZE = "write.parquet.content-defined-chunking.min-chunk-size" -PARQUET_CDC_MAX_CHUNK_SIZE = "write.parquet.content-defined-chunking.max-chunk-size" -PARQUET_CDC_NORM_LEVEL = "write.parquet.content-defined-chunking.norm-level" - # --------------------------------------------------------------------------- # CDC tests — run without any external credentials @@ -87,16 +77,18 @@ def test_cdc_table_properties_are_persisted(local_catalog, sample_table): "cdc_ns.cdc_persist", schema=sample_table.schema, properties={ - PARQUET_CDC_MIN_CHUNK_SIZE: "65536", - PARQUET_CDC_MAX_CHUNK_SIZE: "524288", - PARQUET_CDC_NORM_LEVEL: "2", + "write.parquet.content-defined-chunking.min-chunk-size": "65536", + "write.parquet.content-defined-chunking.max-chunk-size": "524288", + "write.parquet.content-defined-chunking.norm-level": "2", }, ) props = tbl.properties - assert props.get(PARQUET_CDC_MIN_CHUNK_SIZE) == "65536" - assert props.get(PARQUET_CDC_MAX_CHUNK_SIZE) == "524288" - assert props.get(PARQUET_CDC_NORM_LEVEL) == "2" + assert props.get("write.parquet.content-defined-chunking.min-chunk-size") == "65536" + assert ( + props.get("write.parquet.content-defined-chunking.max-chunk-size") == "524288" + ) + assert props.get("write.parquet.content-defined-chunking.norm-level") == "2" def test_cdc_write_via_pyiceberg(local_catalog, sample_table): @@ -106,7 +98,7 @@ def test_cdc_write_via_pyiceberg(local_catalog, sample_table): tbl = local_catalog.create_table_if_not_exists( "cdc_ns.cdc_pyiceberg_write", schema=sample_table.schema, - properties={PARQUET_CDC_ENABLED: "true"}, + properties={"write.parquet.content-defined-chunking.enabled": "true"}, ) tbl.append(sample_table) @@ -122,7 +114,7 @@ def test_cdc_write_and_read_via_datafusion(local_catalog, sample_table): tbl = local_catalog.create_table_if_not_exists( "cdc_ns.cdc_write_read", schema=sample_table.schema, - properties={PARQUET_CDC_ENABLED: "true"}, + properties={"write.parquet.content-defined-chunking.enabled": "true"}, ) tbl.append(sample_table) @@ -137,38 +129,66 @@ def test_cdc_write_and_read_via_datafusion(local_catalog, sample_table): assert ctx.table("cdc_table").count() == len(sample_table) - # --------------------------------------------------------------------------- -# HF tests — skipped when HF_OPENDAL_TOKEN is not set +# HF + CDC tests — skipped when HF_TOKEN or HF_DATASET is not set # --------------------------------------------------------------------------- -HF_TOKEN_ENV = "HF_OPENDAL_TOKEN" -HF_TABLE_METADATA_ENV = "HF_OPENDAL_TABLE_METADATA" - requires_hf = pytest.mark.skipif( - not os.environ.get(HF_TOKEN_ENV) or not os.environ.get(HF_TABLE_METADATA_ENV), - reason=f"{HF_TOKEN_ENV} or {HF_TABLE_METADATA_ENV} not set", + not os.environ.get("HF_TOKEN") or not os.environ.get("HF_DATASET"), + reason="HF_TOKEN or HF_DATASET not set", ) -@requires_hf -@requires_datafusion_53 -def test_hf_file_io_properties_accepted(): - """IcebergDataFusionTable accepts hf.token in file_io_properties without auth/URI errors. +@pytest.fixture(scope="module") +def hf_cdc_table(sample_table): + """Write a CDC-enabled Iceberg table to HF Hub once; share across HF tests. - HF_OPENDAL_TABLE_METADATA must point to a valid metadata JSON file; the test - verifies that HF credentials are wired correctly and the table can be read. 
+    Uses FsspecFileIO backed by huggingface_hub's HfFileSystem (hf:// in fsspec).
+    HF_TOKEN is read from the environment automatically by HfFileSystem.
     """
-    token = os.environ[HF_TOKEN_ENV]
-    metadata_location = os.environ[HF_TABLE_METADATA_ENV]
+    token = os.environ["HF_TOKEN"]
+    dataset = os.environ["HF_DATASET"]
 
-    provider = IcebergDataFusionTable(
-        identifier=["default", "hf_test"],
-        metadata_location=metadata_location,
-        file_io_properties={HF_TOKEN: token},
+    warehouse = f"hf://datasets/{dataset}/iceberg-ci-{os.getpid()}"
+    catalog = load_catalog(
+        "hf_test",
+        **{
+            "uri": "sqlite:///:memory:",
+            "warehouse": warehouse,
+            "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
+        },
     )
+    catalog.create_namespace("ns")
+    tbl = catalog.create_table(
+        "ns.cdc_tbl",
+        schema=sample_table.schema,
+        properties={"write.parquet.content-defined-chunking.enabled": "true"},
+    )
+    tbl.append(sample_table)
+    # HfFileSystem.dircache may reflect the pre-write state; invalidate it so
+    # subsequent reads (info/open) see the files just uploaded via xet.
+    tbl.io.get_fs("hf").invalidate_cache()
+    return tbl, token
+
+
+@requires_hf
+def test_hf_cdc_write_and_read_via_pyarrow(hf_cdc_table, sample_table):
+    """PyIceberg writes CDC parquet to HF Hub; PyArrow scan reads it back."""
+    tbl, _ = hf_cdc_table
+    result = tbl.scan().to_arrow()
+    assert len(result) == len(sample_table)
+
+
+@requires_hf
+@requires_datafusion_53
+def test_hf_cdc_write_and_read_via_datafusion(hf_cdc_table, sample_table):
+    """PyIceberg writes CDC parquet to HF Hub; IcebergDataFusionTable reads it back via opendal-hf."""
+    tbl, token = hf_cdc_table
+    provider = IcebergDataFusionTable(
+        identifier=tbl.name(),
+        metadata_location=tbl.metadata_location,
+        file_io_properties={"hf.token": token},
+    )
     ctx = SessionContext()
     ctx.register_table("hf_table", provider)
-    # A successful count proves the HF backend can read the table's data files.
-    assert ctx.table("hf_table").count() >= 0
+    assert ctx.table("hf_table").count() == len(sample_table)
diff --git a/crates/storage/opendal/tests/file_io_hf_test.rs b/crates/storage/opendal/tests/file_io_hf_test.rs
index 4035b2a6d8..3c773887ff 100644
--- a/crates/storage/opendal/tests/file_io_hf_test.rs
+++ b/crates/storage/opendal/tests/file_io_hf_test.rs
@@ -18,12 +18,12 @@
 //! Integration tests for FileIO HuggingFace Hub.
 //!
 //! These tests require a real HuggingFace token and are skipped when
-//! `HF_OPENDAL_TOKEN` is not set in the environment.
+//! `HF_TOKEN` is not set in the environment.
 //!
 //! The following environment variables are used:
-//! - `HF_OPENDAL_TOKEN` — HuggingFace API token (required)
-//! - `HF_OPENDAL_BUCKET` — `owner/repo` for a bucket-type repo (required when running bucket tests)
-//! - `HF_OPENDAL_DATASET` — `owner/repo` for a dataset-type repo (required when running dataset tests)
+//! - `HF_TOKEN` — HuggingFace API token (required)
+//! - `HF_BUCKET` — `owner/repo` for a bucket-type repo (required when running bucket tests)
+//! - `HF_DATASET` — `owner/repo` for a dataset-type repo (required when running dataset tests)
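+//!
+//! For example, a local dataset-only run (all values below are placeholders;
+//! tests whose variables are missing skip themselves):
+//!
+//! ```sh
+//! HF_TOKEN=hf_xxx HF_DATASET=my-org/my-dataset \
+//!     cargo test -p iceberg-storage-opendal --features opendal-hf \
+//!     --test file_io_hf_test -- --test-threads=1
+//! ```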
 
 #[cfg(feature = "opendal-hf")]
 mod tests {
@@ -35,9 +35,9 @@ mod tests {
     use iceberg_storage_opendal::{OpenDalResolvingStorageFactory, OpenDalStorageFactory};
     use iceberg_test_utils::{normalize_test_name_with_parts, set_up};
 
-    const ENV_HF_TOKEN: &str = "HF_OPENDAL_TOKEN";
-    const ENV_HF_BUCKET: &str = "HF_OPENDAL_BUCKET";
-    const ENV_HF_DATASET: &str = "HF_OPENDAL_DATASET";
+    const ENV_HF_TOKEN: &str = "HF_TOKEN";
+    const ENV_HF_BUCKET: &str = "HF_BUCKET";
+    const ENV_HF_DATASET: &str = "HF_DATASET";
 
     macro_rules! require_env {
         ($var:expr) => {
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000000..4c72f1203e
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[build-system]
+build-backend = "maturin"
+requires = ["maturin>=1.0,<2.0"]
+
+[project]
+classifiers = [
+  "Development Status :: 4 - Beta",
+  "Intended Audience :: Developers",
+  "License :: OSI Approved :: Apache Software License",
+  "Operating System :: OS Independent",
+  "Programming Language :: Python :: 3",
+  "Programming Language :: Python :: 3 :: Only",
+  "Programming Language :: Python :: 3.10",
+  "Programming Language :: Python :: 3.11",
+  "Programming Language :: Python :: 3.12",
+  "Programming Language :: Python :: 3.13",
+  "Programming Language :: Rust",
+]
+name = "pyiceberg-core"
+readme = "project-description.md"
+requires-python = ">=3.10,<4"
+dynamic = ["version"]
+license = { file = "LICENSE" }
+
+[tool.maturin]
+features = ["pyo3/extension-module"]
+module-name = "pyiceberg_core.pyiceberg_core_rust"
+python-source = "python"
+include = [
+  { path = "LICENSE", format = ["sdist", "wheel"] },
+  { path = "NOTICE", format = ["sdist", "wheel"] }
+]
+
+[tool.ruff.lint]
+ignore = ["F403", "F405"]
+
+[tool.pytest.ini_options]
+filterwarnings = [
+    "error",
+]
+
+[dependency-groups]
+dev = [
+    "maturin>=1.0,<2.0",
+    "pytest>=8.3.2",
+    "datafusion==53.*",
+    "pyiceberg[sql-sqlite,pyarrow]>=0.11",
+    "fastavro>=1.11.1",
+]
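
Note for reviewers: the credential-free CDC path exercised by the pytest suite above can also be poked at interactively with a few lines of PyIceberg. The sketch below mirrors the test fixtures; the catalog name, namespace, and paths are illustrative only:

    import tempfile
    import pyarrow as pa
    from pyiceberg.catalog import load_catalog

    warehouse = tempfile.mkdtemp()
    catalog = load_catalog(
        "demo",
        **{
            "uri": f"sqlite:///{warehouse}/catalog.db",
            "warehouse": f"file://{warehouse}",
        },
    )
    catalog.create_namespace("demo_ns")
    data = pa.table({"id": pa.array(range(10), type=pa.int32())})
    # CDC is opt-in: a plain string table property enables it, and the same
    # "write.parquet.content-defined-chunking.*" keys are read on both the
    # PyIceberg and Rust write paths.
    tbl = catalog.create_table(
        "demo_ns.cdc_demo",
        schema=data.schema,
        properties={"write.parquet.content-defined-chunking.enabled": "true"},
    )
    tbl.append(data)
    assert len(tbl.scan().to_arrow()) == 10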