Skip to content

Commit 71606d7

Browse files
MisterRaindropSreesh Maheshwar
authored andcommitted
feat: S3 file IO support with Arrow filesystem
Add S3FileIO backed by Arrow's S3FileSystem, adapted from apache#548. - S3Properties: standard Iceberg S3 property keys (credentials, region, endpoint, etc.) - MakeS3FileIO: creates Arrow S3FileSystem from properties map with thread-safe init - ResolvePath: strips URI scheme for Arrow filesystem compatibility - ICEBERG_S3 CMake option gating Arrow S3 support with #ifdef fallback Original-Author: liuxiaoyu <xliu19163@gmail.com> Based-On: apache#548
1 parent 42b00d4 commit 71606d7

8 files changed

Lines changed: 223 additions & 3 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ option(ICEBERG_BUILD_TESTS "Build tests" ON)
4545
option(ICEBERG_BUILD_BUNDLE "Build the battery included library" ON)
4646
option(ICEBERG_BUILD_REST "Build rest catalog client" ON)
4747
option(ICEBERG_BUILD_REST_INTEGRATION_TESTS "Build rest catalog integration tests" OFF)
48+
option(ICEBERG_S3 "Build with S3 support" ON)
4849
option(ICEBERG_ENABLE_ASAN "Enable Address Sanitizer" OFF)
4950
option(ICEBERG_ENABLE_UBSAN "Enable Undefined Behavior Sanitizer" OFF)
5051

cmake_modules/IcebergThirdpartyToolchain.cmake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ function(resolve_arrow_dependency)
102102
# Work around undefined symbol: arrow::ipc::ReadSchema(arrow::io::InputStream*, arrow::ipc::DictionaryMemo*)
103103
set(ARROW_IPC ON)
104104
set(ARROW_FILESYSTEM ON)
105+
set(ARROW_S3 ${ICEBERG_S3})
105106
set(ARROW_JSON ON)
106107
set(ARROW_PARQUET ON)
107108
set(ARROW_SIMD_LEVEL "NONE")

src/iceberg/CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,8 @@ if(ICEBERG_BUILD_BUNDLE)
197197
parquet/parquet_schema_util.cc
198198
parquet/parquet_writer.cc)
199199

200+
list(APPEND ICEBERG_BUNDLE_SOURCES arrow/arrow_s3_file_io.cc)
201+
200202
# Libraries to link with exported libiceberg_bundle.{so,a}.
201203
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)
202204
set(ICEBERG_BUNDLE_SHARED_BUILD_INTERFACE_LIBS)
@@ -247,6 +249,12 @@ if(ICEBERG_BUILD_BUNDLE)
247249
OUTPUTS
248250
ICEBERG_BUNDLE_LIBRARIES)
249251

252+
if(ICEBERG_S3)
253+
foreach(LIB ${ICEBERG_BUNDLE_LIBRARIES})
254+
target_compile_definitions(${LIB} PRIVATE ICEBERG_S3_ENABLED)
255+
endforeach()
256+
endif()
257+
250258
add_subdirectory(arrow)
251259
add_subdirectory(avro)
252260
add_subdirectory(parquet)

src/iceberg/arrow/arrow_file_io.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,28 @@
2020
#pragma once
2121

2222
#include <memory>
23+
#include <string>
24+
#include <unordered_map>
2325

2426
#include "iceberg/file_io.h"
2527
#include "iceberg/iceberg_bundle_export.h"
28+
#include "iceberg/result.h"
2629

2730
namespace iceberg::arrow {
2831

2932
ICEBERG_BUNDLE_EXPORT std::unique_ptr<FileIO> MakeMockFileIO();
3033

3134
ICEBERG_BUNDLE_EXPORT std::unique_ptr<FileIO> MakeLocalFileIO();
3235

36+
/// \brief Create an S3 FileIO backed by Arrow's S3FileSystem.
37+
///
38+
/// Thread-safe: initializes the Arrow S3 subsystem on first call via std::once_flag.
39+
///
40+
/// \param properties S3 configuration properties. See S3Properties for available keys
41+
/// (credentials, region, endpoint, path-style, SSL, etc.). If credentials are not
42+
/// provided, falls back to the default AWS credential chain.
43+
/// \return A FileIO instance for S3 operations, or an error if initialization fails.
44+
ICEBERG_BUNDLE_EXPORT Result<std::unique_ptr<FileIO>> MakeS3FileIO(
45+
const std::unordered_map<std::string, std::string>& properties);
46+
3347
} // namespace iceberg::arrow

src/iceberg/arrow/arrow_fs_file_io.cc

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,25 @@
2525
#include "iceberg/arrow/arrow_file_io.h"
2626
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
2727
#include "iceberg/arrow/arrow_status_internal.h"
28+
#include "iceberg/util/macros.h"
2829

2930
namespace iceberg::arrow {
3031

32+
Result<std::string> ArrowFileSystemFileIO::ResolvePath(const std::string& file_location) {
33+
// Arrow's S3FileSystem expects paths as "bucket/key", not "s3://bucket/key".
34+
// Strip the scheme if present so the filesystem receives a bare path.
35+
static constexpr std::string_view kSchemeDelimiter = "://";
36+
if (auto pos = file_location.find(kSchemeDelimiter); pos != std::string::npos) {
37+
return file_location.substr(pos + kSchemeDelimiter.size());
38+
}
39+
return file_location;
40+
}
41+
3142
/// \brief Read the content of the file at the given location.
3243
Result<std::string> ArrowFileSystemFileIO::ReadFile(const std::string& file_location,
3344
std::optional<size_t> length) {
34-
::arrow::fs::FileInfo file_info(file_location);
45+
ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
46+
::arrow::fs::FileInfo file_info(path);
3547
if (length.has_value()) {
3648
file_info.set_size(length.value());
3749
}
@@ -47,6 +59,10 @@ Result<std::string> ArrowFileSystemFileIO::ReadFile(const std::string& file_loca
4759
ICEBERG_ARROW_ASSIGN_OR_RETURN(
4860
auto read_bytes,
4961
file->Read(read_length, reinterpret_cast<uint8_t*>(&content[offset])));
62+
if (read_bytes == 0) {
63+
return IOError("Unexpected EOF reading {}: got {} of {} bytes", file_location,
64+
offset, file_size);
65+
}
5066
remain -= read_bytes;
5167
offset += read_bytes;
5268
}
@@ -57,7 +73,8 @@ Result<std::string> ArrowFileSystemFileIO::ReadFile(const std::string& file_loca
5773
/// \brief Write the given content to the file at the given location.
5874
Status ArrowFileSystemFileIO::WriteFile(const std::string& file_location,
5975
std::string_view content) {
60-
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(file_location));
76+
ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
77+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(path));
6178
ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size()));
6279
ICEBERG_ARROW_RETURN_NOT_OK(file->Flush());
6380
ICEBERG_ARROW_RETURN_NOT_OK(file->Close());
@@ -66,7 +83,8 @@ Status ArrowFileSystemFileIO::WriteFile(const std::string& file_location,
6683

6784
/// \brief Delete a file at the given location.
6885
Status ArrowFileSystemFileIO::DeleteFile(const std::string& file_location) {
69-
ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(file_location));
86+
ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
87+
ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(path));
7088
return {};
7189
}
7290

src/iceberg/arrow/arrow_fs_file_io_internal.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,18 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
5656
const std::shared_ptr<::arrow::fs::FileSystem>& fs() const { return arrow_fs_; }
5757

5858
private:
59+
/// \brief Resolve a file location to a filesystem path.
60+
///
61+
/// If the location contains a URI scheme (e.g., "s3://bucket/key"), strips
62+
/// the scheme to produce a bare path ("bucket/key") that Arrow's S3FileSystem
63+
/// expects. Returns the location unchanged if no scheme is present.
64+
///
65+
/// TODO(apache/iceberg-cpp#548): This only applies when called through
66+
/// FileIO::ReadFile/WriteFile/DeleteFile. The Avro reader/writer bypass FileIO
67+
/// and call fs()->OpenInputFile() directly, requiring separate URI stripping.
68+
/// See avro_reader.cc and avro_writer.cc for the workaround.
69+
Result<std::string> ResolvePath(const std::string& file_location);
70+
5971
std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_;
6072
};
6173

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/arrow/arrow_file_io.h"
21+
#include "iceberg/result.h"
22+
23+
#ifdef ICEBERG_S3_ENABLED
24+
25+
# include <mutex>
26+
27+
# include <arrow/filesystem/s3fs.h>
28+
29+
# include "iceberg/arrow/arrow_fs_file_io_internal.h"
30+
# include "iceberg/arrow/arrow_status_internal.h"
31+
# include "iceberg/arrow/s3_properties.h"
32+
# include "iceberg/util/macros.h"
33+
34+
namespace iceberg::arrow {
35+
36+
namespace {
37+
38+
Status EnsureS3Initialized() {
39+
static std::once_flag init_flag;
40+
static ::arrow::Status init_status;
41+
std::call_once(init_flag, [] {
42+
// TODO(smaheshwar): support options from ::arrow::fs::S3GlobalOptions when needed
43+
init_status = ::arrow::fs::EnsureS3Initialized();
44+
});
45+
if (!init_status.ok()) {
46+
return IOError("Arrow S3 initialization failed: {}", init_status.ToString());
47+
}
48+
return {};
49+
}
50+
51+
std::string_view GetProperty(
52+
const std::unordered_map<std::string, std::string>& properties,
53+
std::string_view key) {
54+
if (auto it = properties.find(std::string(key)); it != properties.end()) {
55+
return it->second;
56+
}
57+
return {};
58+
}
59+
60+
Result<::arrow::fs::S3Options> ConfigureS3Options(
61+
const std::unordered_map<std::string, std::string>& properties) {
62+
::arrow::fs::S3Options options;
63+
64+
auto access_key = GetProperty(properties, S3Properties::kAccessKeyId);
65+
auto secret_key = GetProperty(properties, S3Properties::kSecretAccessKey);
66+
auto session_token = GetProperty(properties, S3Properties::kSessionToken);
67+
68+
if (!access_key.empty() && !secret_key.empty()) {
69+
if (!session_token.empty()) {
70+
options.ConfigureAccessKey(std::string(access_key), std::string(secret_key),
71+
std::string(session_token));
72+
} else {
73+
options.ConfigureAccessKey(std::string(access_key), std::string(secret_key));
74+
}
75+
} else {
76+
options.ConfigureDefaultCredentials();
77+
}
78+
79+
if (auto region = GetProperty(properties, S3Properties::kRegion); !region.empty()) {
80+
options.region = std::string(region);
81+
}
82+
83+
if (auto endpoint = GetProperty(properties, S3Properties::kEndpoint);
84+
!endpoint.empty()) {
85+
options.endpoint_override = std::string(endpoint);
86+
}
87+
88+
if (GetProperty(properties, S3Properties::kPathStyleAccess) == "true") {
89+
options.force_virtual_addressing = false;
90+
}
91+
92+
if (GetProperty(properties, S3Properties::kSslEnabled) == "false") {
93+
options.scheme = "http";
94+
}
95+
96+
return options;
97+
}
98+
99+
} // namespace
100+
101+
// TODO(smaheshwar): Expose a FinalizeS3() wrapper so users don't need arrow headers
102+
// directly. Arrow's S3 subsystem should be finalized at shutdown.
103+
Result<std::unique_ptr<FileIO>> MakeS3FileIO(
104+
const std::unordered_map<std::string, std::string>& properties) {
105+
ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized());
106+
107+
ICEBERG_ASSIGN_OR_RAISE(auto options, ConfigureS3Options(properties));
108+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fs, ::arrow::fs::S3FileSystem::Make(options));
109+
110+
return std::make_unique<ArrowFileSystemFileIO>(std::move(fs));
111+
}
112+
113+
} // namespace iceberg::arrow
114+
115+
#else // !ICEBERG_S3_ENABLED
116+
117+
namespace iceberg::arrow {
118+
119+
Result<std::unique_ptr<FileIO>> MakeS3FileIO(
120+
const std::unordered_map<std::string, std::string>& /*properties*/) {
121+
return NotSupported("Arrow S3 support is not enabled");
122+
}
123+
124+
} // namespace iceberg::arrow
125+
126+
#endif // ICEBERG_S3_ENABLED

src/iceberg/arrow/s3_properties.h

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <string_view>
23+
24+
namespace iceberg::arrow {
25+
26+
/// \brief Iceberg S3 file IO property keys.
27+
struct S3Properties {
28+
static constexpr std::string_view kAccessKeyId = "s3.access-key-id";
29+
static constexpr std::string_view kSecretAccessKey = "s3.secret-access-key";
30+
static constexpr std::string_view kSessionToken = "s3.session-token";
31+
static constexpr std::string_view kRegion = "s3.region";
32+
static constexpr std::string_view kEndpoint = "s3.endpoint";
33+
static constexpr std::string_view kPathStyleAccess = "s3.path-style-access";
34+
static constexpr std::string_view kSslEnabled = "s3.ssl.enabled";
35+
36+
static constexpr std::string_view kS3Scheme = "s3://";
37+
static constexpr std::string_view kS3aScheme = "s3a://";
38+
};
39+
40+
} // namespace iceberg::arrow

0 commit comments

Comments
 (0)