diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3bf1fe73ff..f4b774858f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -183,6 +183,20 @@ jobs: cargo test --no-fail-fast ${{ matrix.test-suite.args }} fi + # HDFS integration tests are `#[ignore]`d so a plain `cargo test` + # skips them locally on macOS/Windows where the docker fixture's + # host networking is unreliable; CI opts them in by filtering on + # the `file_io_hdfs` substring in the test name. + - name: Run HDFS integration tests + if: matrix.test-suite.name == 'default' + shell: bash + env: + RUSTFLAGS: "-C debuginfo=0" + run: | + cargo nextest run ${{ matrix.test-suite.args }} \ + --run-ignored=only \ + -E 'test(file_io_hdfs)' + - name: Stop Docker containers if: always() && matrix.test-suite.name == 'default' run: make docker-down diff --git a/Cargo.lock b/Cargo.lock index 02d2a7e946..406407f9bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2509,6 +2509,15 @@ dependencies = [ "syn", ] +[[package]] +name = "des" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdd80ce8ce993de27e9f063a444a4d53ce8e8db4c1f00cc03af5ad5a9867a1e" +dependencies = [ + "cipher", +] + [[package]] name = "diff" version = "0.1.13" @@ -2574,6 +2583,18 @@ dependencies = [ "const-random", ] +[[package]] +name = "dns-lookup" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e39034cee21a2f5bbb66ba0e3689819c4bb5d00382a282006e802a7ffa6c41d" +dependencies = [ + "cfg-if", + "libc", + "socket2 0.6.3", + "windows-sys 0.60.2", +] + [[package]] name = "dotenvy" version = "0.15.7" @@ -3008,6 +3029,34 @@ dependencies = [ "slab", ] +[[package]] +name = "g2gen" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5a7e0eb46f83a20260b850117d204366674e85d3a908d90865c78df9a6b1dfc" +dependencies = [ + "g2poly", + "proc-macro2", + "quote", + "syn", +] + +[[package]] 
+name = "g2p" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "539e2644c030d3bf4cd208cb842d2ce2f80e82e6e8472390bcef83ceba0d80ad" +dependencies = [ + "g2gen", + "g2poly", +] + +[[package]] +name = "g2poly" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "312d2295c7302019c395cfb90dacd00a82a2eabd700429bba9c7a3f38dbbe11b" + [[package]] name = "generic-array" version = "0.14.7" @@ -3168,6 +3217,47 @@ dependencies = [ "hashbrown 0.15.5", ] +[[package]] +name = "hdfs-native" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51610510377a0847d53b78b53f9c6c9b7df3ffb300d1181b2e04f68bba363734" +dependencies = [ + "aes", + "base64", + "bitflags", + "bumpalo", + "bytes", + "cbc", + "chrono", + "cipher", + "crc", + "ctr", + "des", + "dns-lookup", + "futures", + "g2p", + "hex", + "hmac", + "libc", + "libloading", + "log", + "md-5", + "num-traits", + "once_cell", + "prost", + "prost-types", + "rand 0.9.4", + "regex", + "roxmltree", + "socket2 0.6.3", + "thiserror 2.0.18", + "tokio", + "url", + "uuid", + "whoami", +] + [[package]] name = "heck" version = "0.5.0" @@ -4122,6 +4212,16 @@ version = "0.2.183" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" +[[package]] +name = "libloading" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "754ca22de805bb5744484a5b151a9e1a8e837d5dc232c2d7d8c2e3492edc8b60" +dependencies = [ + "cfg-if", + "windows-link", +] + [[package]] name = "liblzma" version = "0.4.6" @@ -4710,6 +4810,7 @@ dependencies = [ "opendal-service-azdls", "opendal-service-fs", "opendal-service-gcs", + "opendal-service-hdfs-native", "opendal-service-oss", "opendal-service-s3", ] @@ -4849,6 +4950,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-service-hdfs-native" +version = 
"0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2aba838c127b1131f9ec81d46248ada30b08f25a762b8e062dcd19be9e7a85f" +dependencies = [ + "bytes", + "futures", + "hdfs-native", + "log", + "opendal-core", + "serde", +] + [[package]] name = "opendal-service-oss" version = "0.56.0" @@ -5987,6 +6102,15 @@ dependencies = [ "byteorder", ] +[[package]] +name = "roxmltree" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1964b10c76125c36f8afe190065a4bf9a87bf324842c05701330bba9f1cacbb" +dependencies = [ + "memchr", +] + [[package]] name = "rsa" version = "0.9.10" @@ -7991,6 +8115,7 @@ checksum = "5d4a4db5077702ca3015d3d02d74974948aba2ad9e12ab7df718ee64ccd7e97d" dependencies = [ "libredox", "wasite", + "web-sys", ] [[package]] diff --git a/crates/storage/opendal/Cargo.toml b/crates/storage/opendal/Cargo.toml index 80eeaa3d04..e02c4a48f5 100644 --- a/crates/storage/opendal/Cargo.toml +++ b/crates/storage/opendal/Cargo.toml @@ -28,11 +28,22 @@ keywords = ["iceberg", "opendal", "storage"] [features] default = ["opendal-memory", "opendal-fs", "opendal-s3"] -opendal-all = ["opendal-memory", "opendal-fs", "opendal-s3", "opendal-gcs", "opendal-oss", "opendal-azdls"] +opendal-all = [ + "opendal-memory", + "opendal-fs", + "opendal-s3", + "opendal-gcs", + "opendal-oss", + "opendal-azdls", + "opendal-hdfs-native", +] opendal-azdls = ["opendal/services-azdls"] opendal-fs = ["opendal/services-fs"] opendal-gcs = ["opendal/services-gcs"] +# Requires `libgssapi_krb5` at the OS level (e.g. `brew install krb5` on macOS, +# `apt install libgssapi-krb5-2` on Debian). 
+opendal-hdfs-native = ["opendal/services-hdfs-native"] opendal-memory = ["opendal/services-memory"] opendal-oss = ["opendal/services-oss"] opendal-s3 = ["opendal/services-s3", "reqsign-aws-v4", "reqsign-core"] diff --git a/crates/storage/opendal/src/hdfs.rs b/crates/storage/opendal/src/hdfs.rs new file mode 100644 index 0000000000..1592738107 --- /dev/null +++ b/crates/storage/opendal/src/hdfs.rs @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! HDFS storage support via OpenDAL's `services-hdfs-native` backend. +//! +//! Cluster topology (HA name services, namenode RPC addresses) and Kerberos +//! authentication are entirely delegated to `hdfs-native` and its +//! environment. `hdfs-native` reads `core-site.xml` / `hdfs-site.xml` from +//! `$HADOOP_CONF_DIR` (or `$HADOOP_HOME/etc/hadoop`); Kerberos goes through +//! `libgssapi_krb5` via the standard `KRB5CCNAME` / `KRB5_CONFIG` env. The +//! caller's `libgssapi_krb5` must be installed on the host (e.g. +//! `brew install krb5` on macOS, `apt install libgssapi-krb5-2` on Debian) +//! for HDFS calls to link at runtime. +//! +//! No iceberg-level HDFS configuration is exposed - mirroring the Java +//! 
HadoopFileIO, which has no iceberg-side HDFS knobs and defers everything +//! to Hadoop's `Configuration`. Paths with an authority (`hdfs://ns/foo`) +//! route to that name node; authority-less paths (`hdfs:///foo`) are passed +//! to `hdfs-native` without an explicit name node, so it picks up +//! `fs.defaultFS` from the loaded Hadoop config. + +use iceberg::{Error, ErrorKind, Result}; +use opendal::Operator; +use opendal::services::HdfsNative; +use url::Url; + +use crate::utils::from_opendal_error; + +/// Parse an HDFS path into its name node (when an authority is present) and +/// the relative path beginning with `/`. +/// +/// The returned `Option<String>` is `Some("hdfs://<authority>")` when the +/// input has an authority and `None` when it does not. `None` causes the +/// operator to be built without an explicit name node, so that `hdfs-native` +/// resolves `fs.defaultFS` from the loaded Hadoop config. +pub(crate) fn parse_hdfs_path(path: &str) -> Result<(Option<String>, &str)> { + let url = Url::parse(path).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid hdfs path: {path}: {e}"), + ) + })?; + if url.scheme() != "hdfs" { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid hdfs path: {path}, expected scheme `hdfs://`"), + )); + } + + let name_node = url.host_str().filter(|h| !h.is_empty()).map(|host| { + url.port() + .map(|port| format!("hdfs://{host}:{port}")) + .unwrap_or_else(|| format!("hdfs://{host}")) + }); + + // `url.path()` borrows from `url` and can't be returned with the input's + // lifetime. Slice the path component out of the original input instead; + // it always starts at the first `/` after the `hdfs://` prefix, or is + // implicitly `/` when only an authority is given. 
+ let after_scheme = &path["hdfs://".len()..]; + let rel = match after_scheme.find('/') { + Some(i) => &after_scheme[i..], + None => "/", + }; + + Ok((name_node, rel)) +} + +/// Build a new OpenDAL [`Operator`] for the given name node, or - when +/// `name_node` is `None` - one that defers to `fs.defaultFS` from the +/// `hdfs-native`-loaded Hadoop config. +pub(crate) fn hdfs_operator_build(name_node: Option<&str>) -> Result<Operator> { + let mut builder = HdfsNative::default().root("/"); + if let Some(nn) = name_node { + builder = builder.name_node(nn); + } + Ok(Operator::new(builder).map_err(from_opendal_error)?.finish()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_hdfs_path_with_authority_and_rel() { + let (nn, rel) = parse_hdfs_path("hdfs://nameservice1/a/b").unwrap(); + + assert_eq!(nn.as_deref(), Some("hdfs://nameservice1")); + assert_eq!(rel, "/a/b"); + } + + #[test] + fn test_parse_hdfs_path_with_authority_and_port() { + let (nn, rel) = parse_hdfs_path("hdfs://nn:8020/foo").unwrap(); + + assert_eq!(nn.as_deref(), Some("hdfs://nn:8020")); + assert_eq!(rel, "/foo"); + } + + #[test] + fn test_parse_hdfs_path_with_authority_no_path() { + let (nn, rel) = parse_hdfs_path("hdfs://nameservice1").unwrap(); + + assert_eq!(nn.as_deref(), Some("hdfs://nameservice1")); + assert_eq!(rel, "/"); + } + + #[test] + fn test_parse_hdfs_path_with_authority_trailing_slash() { + let (nn, rel) = parse_hdfs_path("hdfs://nameservice1/").unwrap(); + + assert_eq!(nn.as_deref(), Some("hdfs://nameservice1")); + assert_eq!(rel, "/"); + } + + #[test] + fn test_parse_hdfs_path_authority_less_returns_none() { + let (nn, rel) = parse_hdfs_path("hdfs:///a/b").unwrap(); + + assert_eq!(nn, None); + assert_eq!(rel, "/a/b"); + } + + #[test] + fn test_parse_hdfs_path_wrong_scheme_errors() { + let err = parse_hdfs_path("file:///tmp/x").unwrap_err(); + + assert!(err.to_string().contains("expected scheme `hdfs://`")); + } + + #[test] + fn 
test_parse_hdfs_path_invalid_url_errors() { + let err = parse_hdfs_path("not-a-url").unwrap_err(); + + assert!(err.to_string().contains("Invalid hdfs path")); + } +} diff --git a/crates/storage/opendal/src/lib.rs b/crates/storage/opendal/src/lib.rs index 65deaa5f44..9bfc1b8573 100644 --- a/crates/storage/opendal/src/lib.rs +++ b/crates/storage/opendal/src/lib.rs @@ -66,6 +66,14 @@ cfg_if! { } } +cfg_if! { + if #[cfg(feature = "opendal-hdfs-native")] { + mod hdfs; + use hdfs::*; + use std::sync::RwLock; + } +} + cfg_if! { if #[cfg(feature = "opendal-memory")] { mod memory; @@ -120,6 +128,9 @@ pub enum OpenDalStorageFactory { /// Azure Data Lake Storage factory. #[cfg(feature = "opendal-azdls")] Azdls, + /// HDFS storage factory (via OpenDAL `services-hdfs-native`). + #[cfg(feature = "opendal-hdfs-native")] + Hdfs, } #[typetag::serde(name = "OpenDalStorageFactory")] @@ -152,6 +163,10 @@ impl StorageFactory for OpenDalStorageFactory { OpenDalStorageFactory::Azdls => Ok(Arc::new(OpenDalStorage::Azdls { config: azdls_config_parse(config.props().clone())?.into(), })), + #[cfg(feature = "opendal-hdfs-native")] + OpenDalStorageFactory::Hdfs => Ok(Arc::new(OpenDalStorage::Hdfs { + operators: Arc::new(RwLock::new(HashMap::new())), + })), #[cfg(all( not(feature = "opendal-memory"), not(feature = "opendal-fs"), @@ -159,6 +174,7 @@ impl StorageFactory for OpenDalStorageFactory { not(feature = "opendal-gcs"), not(feature = "opendal-oss"), not(feature = "opendal-azdls"), + not(feature = "opendal-hdfs-native"), ))] _ => Err(Error::new( ErrorKind::FeatureUnsupported, @@ -218,6 +234,20 @@ pub enum OpenDalStorage { /// Azure DLS configuration. config: Arc, }, + /// HDFS storage variant (via OpenDAL `services-hdfs-native`). + /// + /// Accepts paths of the form `hdfs://<authority>/<path>` (or + /// `hdfs:///<path>` for authority-less paths, which defer to + /// `fs.defaultFS` from the `hdfs-native`-loaded Hadoop config). 
The + /// authority - or `None` when absent - keys a per-name-node `Operator` + /// cache, so distinct name nodes coexist in a single storage instance. + #[cfg(feature = "opendal-hdfs-native")] + Hdfs { + /// Operator cache. `Some("hdfs://<authority>")` for paths with an + /// authority; `None` for authority-less paths (`fs.defaultFS`). + #[serde(skip, default)] + operators: Arc<RwLock<HashMap<Option<String>, Operator>>>, + }, } impl OpenDalStorage { @@ -311,12 +341,40 @@ impl OpenDalStorage { } #[cfg(feature = "opendal-azdls")] OpenDalStorage::Azdls { config } => azdls_create_operator(path, config)?, + #[cfg(feature = "opendal-hdfs-native")] + OpenDalStorage::Hdfs { operators } => { + let (name_node, rel) = parse_hdfs_path(path)?; + + if let Some(op) = operators + .read() + .map_err(|_| { + Error::new(ErrorKind::Unexpected, "HDFS operator cache lock poisoned") + })? + .get(&name_node) + { + (op.clone(), rel) + } else { + let mut cache = operators.write().map_err(|_| { + Error::new(ErrorKind::Unexpected, "HDFS operator cache lock poisoned") + })?; + let op = match cache.get(&name_node) { + Some(op) => op.clone(), + None => { + let op = hdfs_operator_build(name_node.as_deref())?; + cache.insert(name_node, op.clone()); + op + } + }; + (op, rel) + } + } #[cfg(all( not(feature = "opendal-s3"), not(feature = "opendal-fs"), not(feature = "opendal-gcs"), not(feature = "opendal-oss"), not(feature = "opendal-azdls"), + not(feature = "opendal-hdfs-native"), ))] _ => { return Err(Error::new( @@ -408,12 +466,18 @@ impl OpenDalStorage { let relative_path_len = azure_path.path.len(); Ok(&path[path.len() - relative_path_len..]) } + #[cfg(feature = "opendal-hdfs-native")] + OpenDalStorage::Hdfs { .. 
} => { + let (_, rel) = parse_hdfs_path(path)?; + Ok(rel) + } #[cfg(all( not(feature = "opendal-s3"), not(feature = "opendal-fs"), not(feature = "opendal-gcs"), not(feature = "opendal-oss"), not(feature = "opendal-azdls"), + not(feature = "opendal-hdfs-native"), ))] _ => Err(Error::new( ErrorKind::FeatureUnsupported, @@ -693,6 +757,47 @@ mod tests { ); } + #[cfg(feature = "opendal-hdfs-native")] + #[test] + fn test_relativize_path_hdfs() { + let storage = OpenDalStorage::Hdfs { + operators: Arc::new(RwLock::new(HashMap::new())), + }; + + assert_eq!( + storage + .relativize_path("hdfs://nameservice1/a/b.parquet") + .unwrap(), + "/a/b.parquet" + ); + assert_eq!( + storage + .relativize_path("hdfs://nn:8020/warehouse/db/t") + .unwrap(), + "/warehouse/db/t" + ); + } + + #[cfg(feature = "opendal-hdfs-native")] + #[test] + fn test_relativize_path_hdfs_authority_less() { + let storage = OpenDalStorage::Hdfs { + operators: Arc::new(RwLock::new(HashMap::new())), + }; + + assert_eq!(storage.relativize_path("hdfs:///a/b").unwrap(), "/a/b"); + } + + #[cfg(feature = "opendal-hdfs-native")] + #[test] + fn test_relativize_path_hdfs_wrong_scheme_errors() { + let storage = OpenDalStorage::Hdfs { + operators: Arc::new(RwLock::new(HashMap::new())), + }; + + assert!(storage.relativize_path("s3://bucket/x").is_err()); + } + #[cfg(feature = "opendal-azdls")] #[test] fn test_relativize_path_azdls() { diff --git a/crates/storage/opendal/src/resolving.rs b/crates/storage/opendal/src/resolving.rs index 621495519a..bd8564544f 100644 --- a/crates/storage/opendal/src/resolving.rs +++ b/crates/storage/opendal/src/resolving.rs @@ -50,6 +50,7 @@ pub const SCHEME_ABFSS: &str = "abfss"; pub const SCHEME_ABFS: &str = "abfs"; pub const SCHEME_WASBS: &str = "wasbs"; pub const SCHEME_WASB: &str = "wasb"; +pub const SCHEME_HDFS: &str = "hdfs"; /// Parse a URL scheme string. 
fn parse_scheme(scheme: &str) -> Result<&'static str> { @@ -60,6 +61,7 @@ fn parse_scheme(scheme: &str) -> Result<&'static str> { SCHEME_GS | SCHEME_GCS => Ok("gcs"), SCHEME_OSS => Ok("oss"), SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok("azdls"), + SCHEME_HDFS => Ok("hdfs"), s => Err(Error::new( ErrorKind::FeatureUnsupported, format!("Unsupported storage scheme: {s}"), @@ -114,6 +116,10 @@ fn build_storage_for_scheme( config: Arc::new(config), }) } + #[cfg(feature = "opendal-hdfs-native")] + "hdfs" => Ok(OpenDalStorage::Hdfs { + operators: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())), + }), #[cfg(feature = "opendal-fs")] "file" => Ok(OpenDalStorage::LocalFs), #[cfg(feature = "opendal-memory")] @@ -344,6 +350,33 @@ mod tests { assert!(Arc::ptr_eq(&a, &c), "s3 and s3n should share one instance"); } + #[cfg(feature = "opendal-hdfs-native")] + #[test] + fn test_resolve_hdfs_returns_hdfs_variant() { + let storage = empty_resolving_storage(); + + let resolved = storage.resolve("hdfs://nameservice1/a/b").unwrap(); + + assert!( + matches!(&*resolved, crate::OpenDalStorage::Hdfs { .. 
}), + "expected Hdfs variant, got {resolved:?}" + ); + } + + #[cfg(feature = "opendal-hdfs-native")] + #[test] + fn test_resolve_hdfs_distinct_authorities_share_instance() { + let storage = empty_resolving_storage(); + + let a = storage.resolve("hdfs://ns1/a").unwrap(); + let b = storage.resolve("hdfs://ns2/b").unwrap(); + + assert!( + Arc::ptr_eq(&a, &b), + "different authorities should share the OpenDalStorage::Hdfs instance (operator cache is internal)" + ); + } + #[cfg(feature = "opendal-azdls")] #[test] fn test_resolve_azdls_aliases_share_instance() { diff --git a/crates/storage/opendal/tests/file_io_hdfs_test.rs b/crates/storage/opendal/tests/file_io_hdfs_test.rs new file mode 100644 index 0000000000..a29ff2a2b9 --- /dev/null +++ b/crates/storage/opendal/tests/file_io_hdfs_test.rs @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for HDFS FileIO via OpenDAL `services-hdfs-native`. +//! +//! These tests need a single-node HDFS cluster (the `hdfs-namenode` + +//! `hdfs-datanode` services from `dev/docker-compose.yaml`, mirroring +//! apache/opendal's own `fixtures/hdfs/docker-compose-hdfs-cluster.yml`). +//! 
The docker fixture uses `network_mode: host`, which works on Linux CI +//! but has known issues on macOS / Windows Docker Desktop. Tests are +//! therefore marked `#[ignore]` so a plain `cargo test` skips them; CI +//! opts in with `cargo nextest run --run-ignored=only ...`. +#[cfg(feature = "opendal-hdfs-native")] +mod tests { + use std::sync::Arc; + + use bytes::Bytes; + use iceberg::io::{FileIO, FileIOBuilder}; + use iceberg_storage_opendal::OpenDalStorageFactory; + use iceberg_test_utils::{get_hdfs_endpoint, normalize_test_name_with_parts, set_up}; + + fn get_file_io() -> FileIO { + set_up(); + FileIOBuilder::new(Arc::new(OpenDalStorageFactory::Hdfs)).build() + } + + fn test_path(suffix: &str) -> String { + format!( + "{}/{}", + get_hdfs_endpoint(), + normalize_test_name_with_parts!(suffix) + ) + } + + #[tokio::test] + #[ignore = "Linux-only: HDFS docker fixture uses host networking"] + async fn test_file_io_hdfs_exists() { + let file_io = get_file_io(); + + let absent = test_path("test_file_io_hdfs_exists_absent"); + assert!(!file_io.exists(&absent).await.unwrap()); + } + + #[tokio::test] + #[ignore = "Linux-only: HDFS docker fixture uses host networking"] + async fn test_file_io_hdfs_write_and_read() { + let file_io = get_file_io(); + let path = test_path("test_file_io_hdfs_write_and_read"); + let _ = file_io.delete(&path).await; + + let output = file_io.new_output(&path).unwrap(); + output + .write(Bytes::from_static(b"hello hdfs")) + .await + .unwrap(); + + assert!(file_io.exists(&path).await.unwrap()); + let input = file_io.new_input(&path).unwrap(); + assert_eq!( + input.read().await.unwrap(), + Bytes::from_static(b"hello hdfs") + ); + } + + #[tokio::test] + #[ignore = "Linux-only: HDFS docker fixture uses host networking"] + async fn test_file_io_hdfs_metadata() { + let file_io = get_file_io(); + let path = test_path("test_file_io_hdfs_metadata"); + let _ = file_io.delete(&path).await; + let content = Bytes::from_static(b"0123456789"); + + file_io + 
.new_output(&path) + .unwrap() + .write(content.clone()) + .await + .unwrap(); + + let metadata = file_io.new_input(&path).unwrap().metadata().await.unwrap(); + assert_eq!(metadata.size, content.len() as u64); + } + + #[tokio::test] + #[ignore = "Linux-only: HDFS docker fixture uses host networking"] + async fn test_file_io_hdfs_delete() { + let file_io = get_file_io(); + let path = test_path("test_file_io_hdfs_delete"); + + file_io + .new_output(&path) + .unwrap() + .write(Bytes::from_static(b"x")) + .await + .unwrap(); + assert!(file_io.exists(&path).await.unwrap()); + + file_io.delete(&path).await.unwrap(); + assert!(!file_io.exists(&path).await.unwrap()); + } + + #[tokio::test] + #[ignore = "Linux-only: HDFS docker fixture uses host networking"] + async fn test_file_io_hdfs_delete_prefix() { + let file_io = get_file_io(); + let dir = test_path("test_file_io_hdfs_delete_prefix"); + let _ = file_io.delete_prefix(&dir).await; + + for i in 0..3 { + let path = format!("{dir}/file_{i}"); + file_io + .new_output(&path) + .unwrap() + .write(Bytes::from(format!("payload {i}"))) + .await + .unwrap(); + } + assert!(file_io.exists(&format!("{dir}/file_0")).await.unwrap()); + + file_io.delete_prefix(&dir).await.unwrap(); + + for i in 0..3 { + assert!(!file_io.exists(&format!("{dir}/file_{i}")).await.unwrap()); + } + } + + #[tokio::test] + #[ignore = "Linux-only: HDFS docker fixture uses host networking"] + async fn test_file_io_hdfs_reader_range() { + let file_io = get_file_io(); + let path = test_path("test_file_io_hdfs_reader_range"); + let _ = file_io.delete(&path).await; + let content = Bytes::from_static(b"abcdefghij"); + + file_io + .new_output(&path) + .unwrap() + .write(content.clone()) + .await + .unwrap(); + + let reader = file_io.new_input(&path).unwrap().reader().await.unwrap(); + assert_eq!( + reader.read(0..5).await.unwrap(), + Bytes::from_static(b"abcde") + ); + assert_eq!( + reader.read(5..10).await.unwrap(), + Bytes::from_static(b"fghij") + ); + } + + 
#[tokio::test] + #[ignore = "Linux-only: HDFS docker fixture uses host networking"] + async fn test_file_io_hdfs_streaming_writer() { + let file_io = get_file_io(); + let path = test_path("test_file_io_hdfs_streaming_writer"); + let _ = file_io.delete(&path).await; + + let output = file_io.new_output(&path).unwrap(); + let mut writer = output.writer().await.unwrap(); + writer.write(Bytes::from_static(b"part1 ")).await.unwrap(); + writer.write(Bytes::from_static(b"part2")).await.unwrap(); + writer.close().await.unwrap(); + + let read = file_io.new_input(&path).unwrap().read().await.unwrap(); + assert_eq!(read, Bytes::from_static(b"part1 part2")); + } +} diff --git a/crates/test_utils/src/lib.rs b/crates/test_utils/src/lib.rs index e44d96c385..f027df12e9 100644 --- a/crates/test_utils/src/lib.rs +++ b/crates/test_utils/src/lib.rs @@ -42,6 +42,7 @@ mod common { pub const ENV_HMS_ENDPOINT: &str = "ICEBERG_TEST_HMS_ENDPOINT"; pub const ENV_GLUE_ENDPOINT: &str = "ICEBERG_TEST_GLUE_ENDPOINT"; pub const ENV_GCS_ENDPOINT: &str = "ICEBERG_TEST_GCS_ENDPOINT"; + pub const ENV_HDFS_ENDPOINT: &str = "ICEBERG_TEST_HDFS_ENDPOINT"; // Default ports matching dev/docker-compose.yaml pub const DEFAULT_MINIO_PORT: u16 = 9000; @@ -49,6 +50,7 @@ mod common { pub const DEFAULT_HMS_PORT: u16 = 9083; pub const DEFAULT_GLUE_PORT: u16 = 5001; pub const DEFAULT_GCS_PORT: u16 = 4443; + pub const DEFAULT_HDFS_NN_PORT: u16 = 8020; /// Returns the MinIO S3-compatible endpoint. /// Checks ICEBERG_TEST_MINIO_ENDPOINT env var, otherwise returns localhost default. @@ -84,6 +86,13 @@ mod common { .unwrap_or_else(|_| format!("http://localhost:{DEFAULT_GCS_PORT}")) } + /// Returns the HDFS NameNode endpoint (e.g. `hdfs://localhost:8020`). + /// Checks ICEBERG_TEST_HDFS_ENDPOINT env var, otherwise returns localhost default. 
+ pub fn get_hdfs_endpoint() -> String { + std::env::var(ENV_HDFS_ENDPOINT) + .unwrap_or_else(|_| format!("hdfs://localhost:{DEFAULT_HDFS_NN_PORT}")) + } + /// Helper to clean up a namespace and its tables before a test runs. /// This handles the case where previous test runs left data in the persistent database. pub async fn cleanup_namespace(catalog: &C, ns: &NamespaceIdent) { diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index 21920c9ce6..8cd23885b3 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -147,6 +147,50 @@ services: timeout: 5s retries: 5 + # ============================================================================= + # HDFS - single-node NameNode + DataNode for HDFS tests + # ============================================================================= + # Mirrors apache/opendal's fixtures/hdfs/docker-compose-hdfs-cluster.yml: + # same bde2020 images, host networking on both services. Host networking + # is required because hdfs-native 0.13.5 connects to the DataNode by IP + # from `DatanodeIdProto.ip_addr` (not by hostname). On a docker bridge + # the DN would register with an unroutable bridge IP; host networking + # lets it bind directly on the host network namespace so the registered + # address is host-reachable. + # + # This works on Linux CI runners. On macOS / Windows Docker Desktop + # host networking has known issues (e.g. unresolvable VM hostname), so + # the HDFS integration tests are `#[ignore]`d; CI explicitly opts them + # in via `cargo nextest --run-ignored=only` (see .github/workflows/ci.yml). 
+ hdfs-namenode: + image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8 + network_mode: "host" + environment: + CLUSTER_NAME: iceberg-rust-test + CORE_CONF_fs_defaultFS: hdfs://localhost:8020 + CORE_CONF_hadoop_http_staticuser_user: root + HDFS_CONF_dfs_permissions_enabled: false + HDFS_CONF_dfs_replication: 1 + healthcheck: + test: ["CMD-SHELL", "hdfs dfsadmin -safemode get | grep -q OFF"] + interval: 5s + timeout: 5s + retries: 30 + start_period: 30s + + hdfs-datanode: + image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8 + network_mode: "host" + depends_on: + hdfs-namenode: + condition: service_healthy + environment: + SERVICE_PRECONDITION: localhost:8020 + CORE_CONF_fs_defaultFS: hdfs://localhost:8020 + CORE_CONF_hadoop_http_staticuser_user: root + HDFS_CONF_dfs_permissions_enabled: false + HDFS_CONF_dfs_replication: 1 + # ============================================================================= # Fake GCS Server - GCS emulator for GCS tests # =============================================================================