Skip to content

Commit 3b7cb9b

Browse files
committed
address some issues
1 parent 5d68d1d commit 3b7cb9b

26 files changed

Lines changed: 501 additions & 643 deletions

ci/scripts/build_iceberg.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ CMAKE_ARGS=(
3636
"-DCMAKE_INSTALL_PREFIX=${CMAKE_INSTALL_PREFIX:-${ICEBERG_HOME}}"
3737
"-DICEBERG_BUILD_STATIC=ON"
3838
"-DICEBERG_BUILD_SHARED=ON"
39+
"-DICEBERG_S3=ON"
3940
"-DICEBERG_BUILD_REST_INTEGRATION_TESTS=${build_rest_integration_test}"
4041
)
4142

ci/scripts/start_minio.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ wait_for_minio() {
3535
fi
3636
sleep 1
3737
done
38+
echo "MinIO did not become ready after 30 seconds." >&2
39+
echo "Endpoint: ${MINIO_ENDPOINT}" >&2
40+
if command -v docker >/dev/null 2>&1; then
41+
docker logs "${MINIO_CONTAINER_NAME}" 2>&1 || true
42+
fi
3843
return 1
3944
}
4045

src/iceberg/CMakeLists.txt

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ set(ICEBERG_SOURCES
107107
update/update_statistics.cc
108108
util/bucket_util.cc
109109
util/content_file_util.cc
110-
util/file_io_util.cc
111110
util/conversions.cc
112111
util/decimal.cc
113112
util/gzip_internal.cc
@@ -251,14 +250,17 @@ if(ICEBERG_BUILD_BUNDLE)
251250
OUTPUTS
252251
ICEBERG_BUNDLE_LIBRARIES)
253252

254-
if(ICEBERG_S3)
255-
foreach(target iceberg_bundle_static iceberg_bundle_shared)
256-
if(TARGET ${target})
253+
foreach(target iceberg_bundle_static iceberg_bundle_shared)
254+
if(TARGET ${target})
255+
if(ICEBERG_S3)
257256
target_compile_definitions(${target}
258257
PUBLIC "$<BUILD_INTERFACE:ICEBERG_S3_ENABLED=1>")
258+
else()
259+
target_compile_definitions(${target}
260+
PUBLIC "$<BUILD_INTERFACE:ICEBERG_S3_ENABLED=0>")
259261
endif()
260-
endforeach()
261-
endif()
262+
endif()
263+
endforeach()
262264

263265
add_subdirectory(arrow)
264266
add_subdirectory(avro)

src/iceberg/arrow/arrow_file_io.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ ICEBERG_BUNDLE_EXPORT std::unique_ptr<FileIO> MakeLocalFileIO();
3636
/// \brief Create an S3 FileIO backed by Arrow's S3FileSystem.
3737
///
3838
/// This function initializes the S3 subsystem if not already initialized (thread-safe).
39-
/// The S3 initialization is done once per process using std::call_once.
39+
/// The S3 initialization is cached once per process.
4040
///
4141
/// \param properties Configuration properties for S3 access. See S3Properties
4242
/// for available keys (credentials, region, endpoint, timeouts, etc.).

src/iceberg/arrow/file_io_register.cc

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,44 @@
1818
#include "iceberg/arrow/file_io_register.h"
1919

2020
#include <mutex>
21+
#include <string>
2122

2223
#include "iceberg/arrow/arrow_file_io.h"
23-
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
2424
#include "iceberg/file_io_registry.h"
25-
#include "iceberg/util/macros.h"
2625

2726
namespace iceberg::arrow {
2827

28+
namespace {
29+
2930
void RegisterLocalFileIO() {
30-
static std::once_flag flag;
31-
std::call_once(flag, []() {
32-
FileIORegistry::Register(
33-
std::string(FileIORegistry::kArrowLocalFileIO),
34-
[](const std::unordered_map<std::string, std::string>& /*properties*/)
35-
-> Result<std::unique_ptr<FileIO>> { return MakeLocalFileIO(); });
36-
});
31+
FileIORegistry::Register(
32+
std::string(FileIORegistry::kArrowLocalFileIO),
33+
[](const std::unordered_map<std::string, std::string>& /*properties*/)
34+
-> Result<std::unique_ptr<FileIO>> { return MakeLocalFileIO(); });
3735
}
3836

3937
void RegisterS3FileIO() {
38+
#if ICEBERG_S3_ENABLED
39+
FileIORegistry::Register(
40+
std::string(FileIORegistry::kArrowS3FileIO),
41+
[](const std::unordered_map<std::string, std::string>& properties)
42+
-> Result<std::unique_ptr<FileIO>> { return MakeS3FileIO(properties); });
43+
#endif
44+
}
45+
46+
} // namespace
47+
48+
void EnsureArrowFileIOsRegistered() {
4049
static std::once_flag flag;
4150
std::call_once(flag, []() {
42-
FileIORegistry::Register(
43-
std::string(FileIORegistry::kArrowS3FileIO),
44-
[](const std::unordered_map<std::string, std::string>& properties)
45-
-> Result<std::unique_ptr<FileIO>> { return MakeS3FileIO(properties); });
51+
RegisterLocalFileIO();
52+
RegisterS3FileIO();
4653
});
4754
}
4855

56+
[[maybe_unused]] const bool kArrowFileIOsRegistered = []() {
57+
EnsureArrowFileIOsRegistered();
58+
return true;
59+
}();
60+
4961
} // namespace iceberg::arrow

src/iceberg/arrow/file_io_register.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,9 @@
2626

2727
namespace iceberg::arrow {
2828

29-
/// \brief Register the Arrow local filesystem FileIO into the FileIORegistry.
30-
ICEBERG_BUNDLE_EXPORT void RegisterLocalFileIO();
31-
32-
/// \brief Register the Arrow S3 FileIO into the FileIORegistry.
33-
ICEBERG_BUNDLE_EXPORT void RegisterS3FileIO();
29+
/// \brief Register built-in Arrow FileIO implementations into the FileIORegistry.
30+
///
31+
/// This operation is idempotent and safe to call multiple times.
32+
ICEBERG_BUNDLE_EXPORT void EnsureArrowFileIOsRegistered();
3433

3534
} // namespace iceberg::arrow

src/iceberg/arrow/s3/arrow_s3_file_io.cc

Lines changed: 61 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,13 @@
1818
*/
1919

2020
#include <cstdlib>
21-
#include <mutex>
21+
#include <optional>
2222
#include <string>
23+
#include <string_view>
2324

2425
#include <arrow/filesystem/filesystem.h>
25-
#ifdef ICEBERG_S3_ENABLED
26+
#if ICEBERG_S3_ENABLED
2627
# include <arrow/filesystem/s3fs.h>
27-
# define ICEBERG_ARROW_HAS_S3 1
28-
#else
29-
# define ICEBERG_ARROW_HAS_S3 0
3028
#endif
3129

3230
#include "iceberg/arrow/arrow_file_io.h"
@@ -40,55 +38,77 @@ namespace iceberg::arrow {
4038

4139
namespace {
4240

43-
#if ICEBERG_ARROW_HAS_S3
41+
#if ICEBERG_S3_ENABLED
42+
const std::string* FindProperty(
43+
const std::unordered_map<std::string, std::string>& properties,
44+
std::string_view key) {
45+
auto it = properties.find(std::string(key));
46+
return it == properties.end() ? nullptr : &it->second;
47+
}
48+
49+
Result<std::optional<bool>> ParseOptionalBool(
50+
const std::unordered_map<std::string, std::string>& properties,
51+
std::string_view key) {
52+
const auto* value = FindProperty(properties, key);
53+
if (value == nullptr) {
54+
return std::nullopt;
55+
}
56+
if (*value == "true") {
57+
return true;
58+
}
59+
if (*value == "false") {
60+
return false;
61+
}
62+
return InvalidArgument(R"("{}" must be "true" or "false")", key);
63+
}
64+
4465
Status EnsureS3Initialized() {
45-
static std::once_flag init_flag;
46-
static ::arrow::Status init_status = ::arrow::Status::OK();
47-
std::call_once(init_flag, []() {
66+
static const ::arrow::Status init_status = []() {
4867
auto options = ::arrow::fs::S3GlobalOptions::Defaults();
49-
init_status = ::arrow::fs::InitializeS3(options);
50-
});
68+
return ::arrow::fs::InitializeS3(options);
69+
}();
5170
if (!init_status.ok()) {
5271
return std::unexpected(Error{.kind = ::iceberg::arrow::ToErrorKind(init_status),
5372
.message = init_status.ToString()});
5473
}
5574
return {};
5675
}
76+
5777
/// \brief Configure S3Options from a properties map.
5878
///
5979
/// \param properties The configuration properties map.
6080
/// \return Configured S3Options.
6181
Result<::arrow::fs::S3Options> ConfigureS3Options(
6282
const std::unordered_map<std::string, std::string>& properties) {
63-
::arrow::fs::S3Options options;
83+
auto options = ::arrow::fs::S3Options::Defaults();
6484

6585
// Configure credentials
66-
auto access_key_it = properties.find(std::string(S3Properties::kAccessKeyId));
67-
auto secret_key_it = properties.find(std::string(S3Properties::kSecretAccessKey));
68-
auto session_token_it = properties.find(std::string(S3Properties::kSessionToken));
69-
70-
if (access_key_it != properties.end() && secret_key_it != properties.end()) {
71-
if (session_token_it != properties.end()) {
72-
options.ConfigureAccessKey(access_key_it->second, secret_key_it->second,
73-
session_token_it->second);
86+
const auto* access_key = FindProperty(properties, S3Properties::kAccessKeyId);
87+
const auto* secret_key = FindProperty(properties, S3Properties::kSecretAccessKey);
88+
const auto* session_token = FindProperty(properties, S3Properties::kSessionToken);
89+
90+
if ((access_key == nullptr) != (secret_key == nullptr)) {
91+
return InvalidArgument(
92+
"S3 client access key ID and secret access key must be set at the same time");
93+
}
94+
if (access_key != nullptr) {
95+
if (session_token != nullptr) {
96+
options.ConfigureAccessKey(*access_key, *secret_key, *session_token);
7497
} else {
75-
options.ConfigureAccessKey(access_key_it->second, secret_key_it->second);
98+
options.ConfigureAccessKey(*access_key, *secret_key);
7699
}
77-
} else {
78-
// Use default credential chain (environment, instance profile, etc.)
79-
options.ConfigureDefaultCredentials();
80100
}
81101

82102
// Configure region
83-
auto region_it = properties.find(std::string(S3Properties::kRegion));
84-
if (region_it != properties.end()) {
85-
options.region = region_it->second;
103+
if (const auto* region = FindProperty(properties, S3Properties::kRegion);
104+
region != nullptr) {
105+
options.region = *region;
86106
}
87107

88108
// Configure endpoint (for MinIO, LocalStack, etc.)
89-
auto endpoint_it = properties.find(std::string(S3Properties::kEndpoint));
90-
if (endpoint_it != properties.end()) {
91-
options.endpoint_override = endpoint_it->second;
109+
if (const auto* endpoint = FindProperty(properties, S3Properties::kEndpoint);
110+
endpoint != nullptr) {
111+
options.endpoint_override = *endpoint;
92112
} else {
93113
// Fall back to AWS standard environment variables for endpoint override
94114
const char* s3_endpoint_env = std::getenv("AWS_ENDPOINT_URL_S3");
@@ -102,14 +122,16 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(
102122
}
103123
}
104124

105-
auto path_style_it = properties.find(std::string(S3Properties::kPathStyleAccess));
106-
if (path_style_it != properties.end() && path_style_it->second == "true") {
107-
options.force_virtual_addressing = false;
125+
ICEBERG_ASSIGN_OR_RAISE(const auto path_style_access,
126+
ParseOptionalBool(properties, S3Properties::kPathStyleAccess));
127+
if (path_style_access.has_value()) {
128+
options.force_virtual_addressing = !*path_style_access;
108129
}
109130

110131
// Configure SSL
111-
auto ssl_it = properties.find(std::string(S3Properties::kSslEnabled));
112-
if (ssl_it != properties.end() && ssl_it->second == "false") {
132+
ICEBERG_ASSIGN_OR_RAISE(const auto ssl_enabled,
133+
ParseOptionalBool(properties, S3Properties::kSslEnabled));
134+
if (ssl_enabled.has_value() && !*ssl_enabled) {
113135
options.scheme = "http";
114136
}
115137

@@ -136,21 +158,21 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(
136158

137159
Result<std::unique_ptr<FileIO>> MakeS3FileIO(
138160
const std::unordered_map<std::string, std::string>& properties) {
139-
#if !ICEBERG_ARROW_HAS_S3
140-
return NotSupported("Arrow S3 support is not enabled");
141-
#else
161+
#if ICEBERG_S3_ENABLED
142162
ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized());
143163

144164
// Configure S3 options from properties (uses default credentials if empty)
145165
ICEBERG_ASSIGN_OR_RAISE(auto options, ConfigureS3Options(properties));
146166
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fs, ::arrow::fs::S3FileSystem::Make(options));
147167

148168
return std::make_unique<ArrowFileSystemFileIO>(std::move(fs));
169+
#else
170+
return NotSupported("Arrow S3 support is not enabled");
149171
#endif
150172
}
151173

152174
Status FinalizeS3() {
153-
#if ICEBERG_ARROW_HAS_S3
175+
#if ICEBERG_S3_ENABLED
154176
auto status = ::arrow::fs::FinalizeS3();
155177
ICEBERG_ARROW_RETURN_NOT_OK(status);
156178
return {};

src/iceberg/catalog/rest/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ set(ICEBERG_REST_SOURCES
3030
json_serde.cc
3131
resource_paths.cc
3232
rest_catalog.cc
33+
rest_file_io.cc
3334
rest_util.cc
3435
types.cc)
3536

src/iceberg/catalog/rest/catalog_properties.cc

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,4 @@ Result<SnapshotMode> RestCatalogProperties::SnapshotLoadingMode() const {
6161
}
6262
}
6363

64-
void RestCatalogProperties::SetFileIO(std::shared_ptr<FileIO> file_io) {
65-
file_io_ = std::move(file_io);
66-
}
67-
68-
const std::shared_ptr<FileIO>& RestCatalogProperties::GetFileIO() const {
69-
return file_io_;
70-
}
71-
7264
} // namespace iceberg::rest

src/iceberg/catalog/rest/catalog_properties.h

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@
1919

2020
#pragma once
2121

22-
#include <memory>
2322
#include <string>
2423
#include <unordered_map>
2524

2625
#include "iceberg/catalog/rest/iceberg_rest_export.h"
27-
#include "iceberg/file_io.h"
2826
#include "iceberg/result.h"
2927
#include "iceberg/util/config.h"
3028

@@ -49,6 +47,8 @@ class ICEBERG_REST_EXPORT RestCatalogProperties
4947
inline static Entry<std::string> kName{"name", ""};
5048
/// \brief The warehouse path.
5149
inline static Entry<std::string> kWarehouse{"warehouse", ""};
50+
/// \brief The FileIO implementation name.
51+
inline static Entry<std::string> kIOImpl{"io-impl", ""};
5252
/// \brief The optional prefix for REST API paths.
5353
inline static Entry<std::string> kPrefix{"prefix", ""};
5454
/// \brief The encoded separator used to join namespace levels in REST paths.
@@ -77,16 +77,6 @@ class ICEBERG_REST_EXPORT RestCatalogProperties
7777
/// "REFS", or an error if the value is invalid. Parsing is
7878
/// case-insensitive to match Java behavior.
7979
Result<SnapshotMode> SnapshotLoadingMode() const;
80-
81-
/// \brief Set a custom FileIO instance. If not set, FileIO will be
82-
/// auto-detected from warehouse URI scheme or "io-impl" property.
83-
void SetFileIO(std::shared_ptr<FileIO> file_io);
84-
85-
/// \brief Get the configured FileIO instance (may be nullptr).
86-
const std::shared_ptr<FileIO>& GetFileIO() const;
87-
88-
private:
89-
std::shared_ptr<FileIO> file_io_;
9080
};
9181

9282
} // namespace iceberg::rest

0 commit comments

Comments
 (0)