Skip to content

Commit 9f7b237

Browse files
fix
1 parent 3cdf39a commit 9f7b237

23 files changed

Lines changed: 469 additions & 271 deletions

CMakeLists.txt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ option(ICEBERG_BUILD_TESTS "Build tests" ON)
4545
option(ICEBERG_BUILD_BUNDLE "Build the battery included library" ON)
4646
option(ICEBERG_BUILD_REST "Build rest catalog client" ON)
4747
option(ICEBERG_BUILD_REST_INTEGRATION_TESTS "Build rest catalog integration tests" OFF)
48-
option(ICEBERG_S3 "Build with S3 support" ON)
48+
option(ICEBERG_S3 "Build with S3 support" OFF)
4949
option(ICEBERG_ENABLE_ASAN "Enable Address Sanitizer" OFF)
5050
option(ICEBERG_ENABLE_UBSAN "Enable Undefined Behavior Sanitizer" OFF)
5151

@@ -69,6 +69,12 @@ if(ICEBERG_BUILD_REST_INTEGRATION_TESTS AND WIN32)
6969
message(WARNING "Cannot build rest integration test on Windows, turning it off.")
7070
endif()
7171

72+
# ICEBERG_S3 requires ICEBERG_BUILD_BUNDLE: the Arrow S3 FileIO source is only
# listed in the bundle sources, so the option is forced off when the bundle is
# not built. This overrides a value the user asked for, so report it as a
# WARNING, like the other option overrides in this file.
73+
if(NOT ICEBERG_BUILD_BUNDLE AND ICEBERG_S3)
74+
set(ICEBERG_S3 OFF)
75+
message(WARNING "ICEBERG_S3 is disabled because ICEBERG_BUILD_BUNDLE is OFF")
76+
endif()
77+
7278
include(CMakeParseArguments)
7379
include(IcebergBuildUtils)
7480
include(IcebergSanitizer)

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ set(ICEBERG_SOURCES
104104
update/update_statistics.cc
105105
util/bucket_util.cc
106106
util/content_file_util.cc
107+
util/file_io_util.cc
107108
util/conversions.cc
108109
util/decimal.cc
109110
util/gzip_internal.cc
@@ -177,7 +178,7 @@ add_subdirectory(util)
177178
if(ICEBERG_BUILD_BUNDLE)
178179
set(ICEBERG_BUNDLE_SOURCES
179180
arrow/arrow_fs_file_io.cc
180-
arrow/arrow_s3_file_io.cc
181+
arrow/s3/arrow_s3_file_io.cc
181182
arrow/file_io_register.cc
182183
arrow/metadata_column_util.cc
183184
avro/avro_data_util.cc

src/iceberg/arrow/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,5 @@
1616
# under the License.
1717

1818
iceberg_install_all_headers(iceberg/arrow)
19+
20+
add_subdirectory(s3)

src/iceberg/arrow/arrow_file_io.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,16 @@ ICEBERG_BUNDLE_EXPORT std::unique_ptr<FileIO> MakeLocalFileIO();
3838
/// This function initializes the S3 subsystem if not already initialized (thread-safe).
3939
/// The S3 initialization is done once per process using std::call_once.
4040
///
41-
/// \param uri An S3 URI (must start with "s3://") used to validate the scheme.
42-
/// \param properties Optional configuration properties for S3 access. See S3Properties
41+
/// \param properties Configuration properties for S3 access. See S3Properties
4342
/// for available keys (credentials, region, endpoint, timeouts, etc.).
44-
/// \return A FileIO instance for S3 operations, or an error if S3 is not supported
45-
/// or the URI is invalid.
43+
/// \return A FileIO instance for S3 operations, or an error if S3 is not supported.
4644
ICEBERG_BUNDLE_EXPORT Result<std::unique_ptr<FileIO>> MakeS3FileIO(
47-
const std::string& uri,
4845
const std::unordered_map<std::string, std::string>& properties = {});
4946

47+
/// \brief Finalize (clean up) the Arrow S3 subsystem.
48+
///
49+
/// Must be called before process exit if S3 was initialized, otherwise Arrow's
50+
/// static destructors may cause the process to exit with a non-zero status.
51+
ICEBERG_BUNDLE_EXPORT Status FinalizeS3();
52+
5053
} // namespace iceberg::arrow

src/iceberg/arrow/file_io_register.cc

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,27 +26,23 @@
2626

2727
namespace iceberg::arrow {
2828

29-
void RegisterFileIO() {
29+
void RegisterLocalFileIO() {
3030
static std::once_flag flag;
3131
std::call_once(flag, []() {
32-
// Register Arrow local filesystem FileIO
3332
FileIORegistry::Register(
34-
FileIORegistry::kArrowLocalFileIO,
35-
[](const std::string& /*warehouse*/,
36-
const std::unordered_map<std::string, std::string>& /*properties*/)
37-
-> Result<std::shared_ptr<FileIO>> {
38-
return std::shared_ptr<FileIO>(MakeLocalFileIO());
39-
});
33+
std::string(FileIORegistry::kArrowLocalFileIO),
34+
[](const std::unordered_map<std::string, std::string>& /*properties*/)
35+
-> Result<std::unique_ptr<FileIO>> { return MakeLocalFileIO(); });
36+
});
37+
}
4038

41-
// Register Arrow S3 FileIO
39+
void RegisterS3FileIO() {
40+
static std::once_flag flag;
41+
std::call_once(flag, []() {
4242
FileIORegistry::Register(
43-
FileIORegistry::kArrowS3FileIO,
44-
[](const std::string& warehouse,
45-
const std::unordered_map<std::string, std::string>& properties)
46-
-> Result<std::shared_ptr<FileIO>> {
47-
ICEBERG_ASSIGN_OR_RAISE(auto file_io, MakeS3FileIO(warehouse, properties));
48-
return std::shared_ptr<FileIO>(std::move(file_io));
49-
});
43+
std::string(FileIORegistry::kArrowS3FileIO),
44+
[](const std::unordered_map<std::string, std::string>& properties)
45+
-> Result<std::unique_ptr<FileIO>> { return MakeS3FileIO(properties); });
5046
});
5147
}
5248

src/iceberg/arrow/file_io_register.h

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,10 @@
2626

2727
namespace iceberg::arrow {
2828

29-
/// \brief Register Arrow FileIO implementations (local and S3) into the
30-
/// FileIORegistry.
31-
///
32-
/// This function is idempotent and thread-safe. It registers:
33-
/// - ArrowFileIO (local filesystem)
34-
/// - ArrowS3FileIO (S3 filesystem)
35-
///
36-
/// Must be called before using FileIORegistry::Load() with the built-in
37-
/// implementation names (e.g., from RestCatalog::Make(config)).
38-
ICEBERG_BUNDLE_EXPORT void RegisterFileIO();
29+
/// \brief Register the Arrow local filesystem FileIO into the FileIORegistry (idempotent and thread-safe; guarded by std::call_once).
30+
ICEBERG_BUNDLE_EXPORT void RegisterLocalFileIO();
31+
32+
/// \brief Register the Arrow S3 FileIO into the FileIORegistry (idempotent and thread-safe; guarded by std::call_once).
33+
ICEBERG_BUNDLE_EXPORT void RegisterS3FileIO();
3934

4035
} // namespace iceberg::arrow
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
iceberg_install_all_headers(iceberg/arrow/s3)
Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
#include <cstdlib>
2121
#include <mutex>
22-
#include <stdexcept>
22+
#include <string>
2323

2424
#include <arrow/filesystem/filesystem.h>
2525
#ifdef ICEBERG_S3_ENABLED
@@ -32,32 +32,28 @@
3232
#include "iceberg/arrow/arrow_file_io.h"
3333
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
3434
#include "iceberg/arrow/arrow_status_internal.h"
35-
#include "iceberg/arrow/s3_properties.h"
35+
#include "iceberg/arrow/s3/s3_properties.h"
3636
#include "iceberg/util/macros.h"
37+
#include "iceberg/util/string_util.h"
3738

3839
namespace iceberg::arrow {
3940

4041
namespace {
4142

42-
Status EnsureS3Initialized() {
4343
#if ICEBERG_ARROW_HAS_S3
44+
Status EnsureS3Initialized() {
4445
static std::once_flag init_flag;
4546
static ::arrow::Status init_status = ::arrow::Status::OK();
4647
std::call_once(init_flag, []() {
47-
::arrow::fs::S3GlobalOptions options;
48+
auto options = ::arrow::fs::S3GlobalOptions::Defaults();
4849
init_status = ::arrow::fs::InitializeS3(options);
4950
});
5051
if (!init_status.ok()) {
5152
return std::unexpected(Error{.kind = ::iceberg::arrow::ToErrorKind(init_status),
5253
.message = init_status.ToString()});
5354
}
5455
return {};
55-
#else
56-
return NotImplemented("Arrow S3 support is not enabled");
57-
#endif
5856
}
59-
60-
#if ICEBERG_ARROW_HAS_S3
6157
/// \brief Configure S3Options from a properties map.
6258
///
6359
/// \param properties The configuration properties map.
@@ -67,9 +63,9 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(
6763
::arrow::fs::S3Options options;
6864

6965
// Configure credentials
70-
auto access_key_it = properties.find(S3Properties::kAccessKeyId);
71-
auto secret_key_it = properties.find(S3Properties::kSecretAccessKey);
72-
auto session_token_it = properties.find(S3Properties::kSessionToken);
66+
auto access_key_it = properties.find(std::string(S3Properties::kAccessKeyId));
67+
auto secret_key_it = properties.find(std::string(S3Properties::kSecretAccessKey));
68+
auto session_token_it = properties.find(std::string(S3Properties::kSessionToken));
7369

7470
if (access_key_it != properties.end() && secret_key_it != properties.end()) {
7571
if (session_token_it != properties.end()) {
@@ -84,13 +80,13 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(
8480
}
8581

8682
// Configure region
87-
auto region_it = properties.find(S3Properties::kRegion);
83+
auto region_it = properties.find(std::string(S3Properties::kRegion));
8884
if (region_it != properties.end()) {
8985
options.region = region_it->second;
9086
}
9187

9288
// Configure endpoint (for MinIO, LocalStack, etc.)
93-
auto endpoint_it = properties.find(S3Properties::kEndpoint);
89+
auto endpoint_it = properties.find(std::string(S3Properties::kEndpoint));
9490
if (endpoint_it != properties.end()) {
9591
options.endpoint_override = endpoint_it->second;
9692
} else {
@@ -106,36 +102,30 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(
106102
}
107103
}
108104

109-
auto path_style_it = properties.find(S3Properties::kPathStyleAccess);
105+
auto path_style_it = properties.find(std::string(S3Properties::kPathStyleAccess));
110106
if (path_style_it != properties.end() && path_style_it->second == "true") {
111107
options.force_virtual_addressing = false;
112108
}
113109

114110
// Configure SSL
115-
auto ssl_it = properties.find(S3Properties::kSslEnabled);
111+
auto ssl_it = properties.find(std::string(S3Properties::kSslEnabled));
116112
if (ssl_it != properties.end() && ssl_it->second == "false") {
117113
options.scheme = "http";
118114
}
119115

120116
// Configure timeouts
121-
auto connect_timeout_it = properties.find(S3Properties::kConnectTimeoutMs);
117+
auto connect_timeout_it = properties.find(std::string(S3Properties::kConnectTimeoutMs));
122118
if (connect_timeout_it != properties.end()) {
123-
try {
124-
options.connect_timeout = std::stod(connect_timeout_it->second) / 1000.0;
125-
} catch (const std::exception& e) {
126-
return InvalidArgument("Invalid {}: '{}' ({})", S3Properties::kConnectTimeoutMs,
127-
connect_timeout_it->second, e.what());
128-
}
119+
ICEBERG_ASSIGN_OR_RAISE(auto timeout_ms,
120+
StringUtils::ParseNumber<double>(connect_timeout_it->second));
121+
options.connect_timeout = timeout_ms / 1000.0;
129122
}
130123

131-
auto socket_timeout_it = properties.find(S3Properties::kSocketTimeoutMs);
124+
auto socket_timeout_it = properties.find(std::string(S3Properties::kSocketTimeoutMs));
132125
if (socket_timeout_it != properties.end()) {
133-
try {
134-
options.request_timeout = std::stod(socket_timeout_it->second) / 1000.0;
135-
} catch (const std::exception& e) {
136-
return InvalidArgument("Invalid {}: '{}' ({})", S3Properties::kSocketTimeoutMs,
137-
socket_timeout_it->second, e.what());
138-
}
126+
ICEBERG_ASSIGN_OR_RAISE(auto timeout_ms,
127+
StringUtils::ParseNumber<double>(socket_timeout_it->second));
128+
options.request_timeout = timeout_ms / 1000.0;
139129
}
140130

141131
return options;
@@ -145,13 +135,9 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(
145135
} // namespace
146136

147137
Result<std::unique_ptr<FileIO>> MakeS3FileIO(
148-
const std::string& uri,
149138
const std::unordered_map<std::string, std::string>& properties) {
150-
if (!uri.starts_with("s3://")) {
151-
return InvalidArgument("S3 URI must start with s3://");
152-
}
153139
#if !ICEBERG_ARROW_HAS_S3
154-
return NotImplemented("Arrow S3 support is not enabled");
140+
return NotSupported("Arrow S3 support is not enabled");
155141
#else
156142
ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized());
157143

@@ -163,4 +149,14 @@ Result<std::unique_ptr<FileIO>> MakeS3FileIO(
163149
#endif
164150
}
165151

152+
/// \brief Shut down the Arrow S3 subsystem.
///
/// When ICEBERG_ARROW_HAS_S3 is set, this forwards to ::arrow::fs::FinalizeS3()
/// and hands the resulting Arrow status to ICEBERG_ARROW_RETURN_NOT_OK.
/// NOTE(review): that macro is expected to return early with a converted error
/// when the status is not OK -- confirm against arrow_status_internal.h.
/// Without S3 support the function only reports that S3 is not enabled.
///
/// \return A default-constructed Status if Arrow reported no error; otherwise
/// the error produced by the macro, or NotSupported when S3 is compiled out.
Status FinalizeS3() {
153+
#if ICEBERG_ARROW_HAS_S3
154+
// Keep the Arrow status in a named local before passing it to the macro.
auto status = ::arrow::fs::FinalizeS3();
155+
ICEBERG_ARROW_RETURN_NOT_OK(status);
156+
return {};
157+
#else
158+
// Built without Arrow S3: there is nothing to finalize.
return NotSupported("Arrow S3 support is not enabled");
159+
#endif
160+
}
161+
166162
} // namespace iceberg::arrow
Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
#pragma once
2121

22-
#include <string>
22+
#include <string_view>
2323

2424
namespace iceberg::arrow {
2525

@@ -29,24 +29,26 @@ namespace iceberg::arrow {
2929
/// via the Arrow filesystem integration, following the Iceberg spec for
3030
/// S3 configuration properties.
3131
struct S3Properties {
32+
/// S3 URI scheme
33+
static constexpr std::string_view kS3Schema = "s3";
3234
/// AWS access key ID
33-
static constexpr const char* kAccessKeyId = "s3.access-key-id";
35+
static constexpr std::string_view kAccessKeyId = "s3.access-key-id";
3436
/// AWS secret access key
35-
static constexpr const char* kSecretAccessKey = "s3.secret-access-key";
37+
static constexpr std::string_view kSecretAccessKey = "s3.secret-access-key";
3638
/// AWS session token (for temporary credentials)
37-
static constexpr const char* kSessionToken = "s3.session-token";
39+
static constexpr std::string_view kSessionToken = "s3.session-token";
3840
/// AWS region
39-
static constexpr const char* kRegion = "s3.region";
41+
static constexpr std::string_view kRegion = "s3.region";
4042
/// Custom endpoint override (for MinIO, LocalStack, etc.)
41-
static constexpr const char* kEndpoint = "s3.endpoint";
43+
static constexpr std::string_view kEndpoint = "s3.endpoint";
4244
/// Whether to use path-style access (needed for MinIO)
43-
static constexpr const char* kPathStyleAccess = "s3.path-style-access";
45+
static constexpr std::string_view kPathStyleAccess = "s3.path-style-access";
4446
/// Whether SSL is enabled
45-
static constexpr const char* kSslEnabled = "s3.ssl.enabled";
47+
static constexpr std::string_view kSslEnabled = "s3.ssl.enabled";
4648
/// Connection timeout in milliseconds
47-
static constexpr const char* kConnectTimeoutMs = "s3.connect-timeout-ms";
49+
static constexpr std::string_view kConnectTimeoutMs = "s3.connect-timeout-ms";
4850
/// Socket timeout in milliseconds
49-
static constexpr const char* kSocketTimeoutMs = "s3.socket-timeout-ms";
51+
static constexpr std::string_view kSocketTimeoutMs = "s3.socket-timeout-ms";
5052
};
5153

5254
} // namespace iceberg::arrow

src/iceberg/catalog/rest/catalog_properties.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,12 @@ Result<SnapshotMode> RestCatalogProperties::SnapshotLoadingMode() const {
6161
}
6262
}
6363

64+
/// \brief Store a FileIO instance on these catalog properties.
///
/// \param file_io Shared FileIO handle. It is taken by value and moved into
/// the `file_io_` member, replacing whatever was stored there before.
void RestCatalogProperties::SetFileIO(std::shared_ptr<FileIO> file_io) {
65+
file_io_ = std::move(file_io);
66+
}
67+
68+
/// \brief Access the FileIO instance held in the `file_io_` member.
///
/// \return A const reference to the `file_io_` member. The reference refers
/// to this object's member, so it must not outlive the object.
/// NOTE(review): presumably a null pointer when SetFileIO() was never called --
/// confirm against the member's declaration.
const std::shared_ptr<FileIO>& RestCatalogProperties::GetFileIO() const {
69+
return file_io_;
70+
}
71+
6472
} // namespace iceberg::rest

0 commit comments

Comments
 (0)