Skip to content

Commit 7063180

Browse files
authored
impl(rest): add completion queue and thread types with no grpc deps (#16078)
1 parent efaacf4 commit 7063180

12 files changed

Lines changed: 523 additions & 33 deletions

google/cloud/google_cloud_cpp_common.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ google_cloud_cpp_common_hdrs = [
7474
"internal/retry_info.h",
7575
"internal/retry_loop_helpers.h",
7676
"internal/retry_policy_impl.h",
77+
"internal/run_async_base.h",
7778
"internal/service_endpoint.h",
7879
"internal/sha256_hash.h",
7980
"internal/sha256_hmac.h",

google/cloud/google_cloud_cpp_common.cmake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ add_library(
117117
internal/retry_loop_helpers.h
118118
internal/retry_policy_impl.cc
119119
internal/retry_policy_impl.h
120+
internal/run_async_base.h
120121
internal/service_endpoint.cc
121122
internal/service_endpoint.h
122123
internal/sha256_hash.cc

google/cloud/google_cloud_cpp_rest_internal.bzl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ google_cloud_cpp_rest_internal_hdrs = [
6565
"internal/rest_opentelemetry.h",
6666
"internal/rest_options.h",
6767
"internal/rest_parse_json_error.h",
68+
"internal/rest_pure_background_threads_impl.h",
69+
"internal/rest_pure_completion_queue_impl.h",
6870
"internal/rest_request.h",
6971
"internal/rest_response.h",
7072
"internal/rest_retry_loop.h",
@@ -121,6 +123,8 @@ google_cloud_cpp_rest_internal_srcs = [
121123
"internal/rest_lro_helpers.cc",
122124
"internal/rest_opentelemetry.cc",
123125
"internal/rest_parse_json_error.cc",
126+
"internal/rest_pure_background_threads_impl.cc",
127+
"internal/rest_pure_completion_queue_impl.cc",
124128
"internal/rest_request.cc",
125129
"internal/rest_response.cc",
126130
"internal/rest_set_metadata.cc",

google/cloud/google_cloud_cpp_rest_internal.cmake

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ add_library(
113113
internal/rest_options.h
114114
internal/rest_parse_json_error.cc
115115
internal/rest_parse_json_error.h
116+
internal/rest_pure_background_threads_impl.cc
117+
internal/rest_pure_background_threads_impl.h
118+
internal/rest_pure_completion_queue_impl.cc
119+
internal/rest_pure_completion_queue_impl.h
116120
internal/rest_request.cc
117121
internal/rest_request.h
118122
internal/rest_response.cc

google/cloud/internal/completion_queue_impl.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "google/cloud/async_operation.h"
1919
#include "google/cloud/future.h"
2020
#include "google/cloud/internal/invoke_result.h"
21+
#include "google/cloud/internal/run_async_base.h"
2122
#include "google/cloud/internal/throw_delegate.h"
2223
#include "google/cloud/status_or.h"
2324
#include "google/cloud/version.h"
@@ -30,12 +31,6 @@ namespace cloud {
3031
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
3132
namespace internal {
3233

33-
// Type erase the callables in RunAsync()
34-
struct RunAsyncBase {
35-
virtual ~RunAsyncBase() = default;
36-
virtual void exec() = 0;
37-
};
38-
3934
/**
4035
* The implementation details for `CompletionQueue`.
4136
*

google/cloud/internal/rest_completion_queue_impl.cc

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,50 +13,35 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/internal/rest_completion_queue_impl.h"
16-
#include "google/cloud/internal/timer_queue.h"
1716

1817
namespace google {
1918
namespace cloud {
2019
namespace rest_internal {
2120
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2221

2322
RestCompletionQueueImpl::RestCompletionQueueImpl()
24-
: tq_(internal::TimerQueue::Create()) {}
23+
: impl_(std::make_shared<RestPureCompletionQueueImpl>()) {}
2524

26-
void RestCompletionQueueImpl::Run() { tq_->Service(); }
25+
void RestCompletionQueueImpl::Run() { impl_->Run(); }
2726

28-
void RestCompletionQueueImpl::Shutdown() { tq_->Shutdown(); }
27+
void RestCompletionQueueImpl::Shutdown() { impl_->Shutdown(); }
2928

3029
void RestCompletionQueueImpl::CancelAll() {}
3130

3231
future<StatusOr<std::chrono::system_clock::time_point>>
3332
RestCompletionQueueImpl::MakeDeadlineTimer(
3433
std::chrono::system_clock::time_point deadline) {
35-
return tq_->Schedule(deadline);
34+
return impl_->MakeDeadlineTimer(deadline);
3635
}
3736

3837
future<StatusOr<std::chrono::system_clock::time_point>>
3938
RestCompletionQueueImpl::MakeRelativeTimer(std::chrono::nanoseconds duration) {
40-
using std::chrono::system_clock;
41-
auto d = std::chrono::duration_cast<system_clock::duration>(duration);
42-
if (d < duration) d += system_clock::duration{1};
43-
return MakeDeadlineTimer(system_clock::now() + d);
39+
return impl_->MakeRelativeTimer(duration);
4440
}
4541

46-
// Use an "immediately" expiring timer in order to get the thread(s) servicing
47-
// the TimerQueue to execute the function. However, if the timer
48-
// expires before .then() is invoked, the lambda will be immediately called and
49-
// the passing of execution to the queue servicing thread will not occur.
5042
void RestCompletionQueueImpl::RunAsync(
5143
std::unique_ptr<internal::RunAsyncBase> function) {
52-
++run_async_counter_;
53-
tq_->Schedule([f = std::move(function)](auto) { f->exec(); });
54-
}
55-
56-
void RestCompletionQueueImpl::StartOperation(
57-
std::shared_ptr<internal::AsyncGrpcOperation>,
58-
absl::FunctionRef<void(void*)>) {
59-
GCP_LOG(FATAL) << " function not supported.\n";
44+
impl_->RunAsync(std::move(function));
6045
}
6146

6247
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/internal/rest_completion_queue_impl.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "google/cloud/future.h"
1919
#include "google/cloud/internal/completion_queue_impl.h"
20+
#include "google/cloud/internal/rest_pure_completion_queue_impl.h"
2021
#include "google/cloud/internal/timer_queue.h"
2122
#include "google/cloud/log.h"
2223
#include "google/cloud/status_or.h"
@@ -69,19 +70,18 @@ class RestCompletionQueueImpl final
6970
/// This function is not supported by RestCompletionQueueImpl, but as the
7071
/// function is pure virtual, it must be overridden.
7172
void StartOperation(std::shared_ptr<internal::AsyncGrpcOperation>,
72-
absl::FunctionRef<void(void*)>) override;
73+
absl::FunctionRef<void(void*)>) override {
74+
GCP_LOG(FATAL) << " function not supported.\n";
75+
}
7376

7477
/// The underlying gRPC completion queue, which does not exist.
7578
grpc::CompletionQueue* cq() override { return nullptr; }
7679

7780
/// Some counters for testing and debugging.
78-
std::int64_t run_async_counter() const { return run_async_counter_.load(); }
81+
std::int64_t run_async_counter() const { return impl_->run_async_counter(); }
7982

8083
private:
81-
std::shared_ptr<internal::TimerQueue> tq_;
82-
83-
// These are metrics used in testing.
84-
std::atomic<std::int64_t> run_async_counter_{0};
84+
std::shared_ptr<RestPureCompletionQueueImpl> impl_;
8585
};
8686

8787
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/internal/rest_pure_background_threads_impl.h"
16+
#include "google/cloud/future.h"
17+
#include "google/cloud/internal/call_context.h"
18+
#include "google/cloud/internal/rest_pure_completion_queue_impl.h"
19+
#include "google/cloud/log.h"
20+
#include <algorithm>
21+
#include <system_error>
22+
23+
namespace google {
24+
namespace cloud {
25+
namespace rest_internal {
26+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
27+
28+
AutomaticallyCreatedRestPureBackgroundThreads::
29+
AutomaticallyCreatedRestPureBackgroundThreads(std::size_t thread_count)
30+
: cq_(std::make_shared<RestPureCompletionQueueImpl>()),
31+
pool_(thread_count == 0 ? 1 : thread_count) {
32+
std::generate_n(pool_.begin(), pool_.size(), [this] {
33+
promise<void> started;
34+
auto thread = std::thread(
35+
[](RestPureCompletionQueue cq, promise<void>& started,
36+
internal::CallContext c) {
37+
internal::ScopedCallContext scope(std::move(c));
38+
started.set_value();
39+
cq.Run();
40+
},
41+
cq_, std::ref(started), internal::CallContext{});
42+
started.get_future().wait();
43+
return thread;
44+
});
45+
}
46+
47+
AutomaticallyCreatedRestPureBackgroundThreads::
48+
~AutomaticallyCreatedRestPureBackgroundThreads() {
49+
Shutdown();
50+
}
51+
52+
void AutomaticallyCreatedRestPureBackgroundThreads::Shutdown() {
53+
cq_.Shutdown();
54+
for (auto& t : pool_) {
55+
#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
56+
try {
57+
#endif
58+
t.join();
59+
#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
60+
} catch (std::system_error const& e) {
61+
GCP_LOG(FATAL)
62+
<< "AutomaticallyCreatedRestPureBackgroundThreads::Shutdown: "
63+
<< e.what();
64+
}
65+
#endif
66+
}
67+
pool_.clear();
68+
}
69+
70+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
71+
} // namespace rest_internal
72+
} // namespace cloud
73+
} // namespace google
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_REST_PURE_BACKGROUND_THREADS_IMPL_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_REST_PURE_BACKGROUND_THREADS_IMPL_H
17+
18+
#include "google/cloud/internal/rest_pure_completion_queue_impl.h"
19+
#include "google/cloud/version.h"
20+
#include <thread>
21+
#include <vector>
22+
23+
namespace google {
24+
namespace cloud {
25+
namespace rest_internal {
26+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
27+
28+
/// This interface mimics `google::cloud::BackgroundThreads` except it returns
29+
/// a `RestPureCompletionQueue` instead of a `CompletionQueue`. This is
30+
/// important as `CompletionQueue` has gRPC dependencies we need to avoid in a
31+
/// pure REST library.
32+
class RestPureBackgroundThreads {
33+
public:
34+
virtual ~RestPureBackgroundThreads() = default;
35+
36+
/// The completion queue used for the background operations.
37+
virtual RestPureCompletionQueue cq() const = 0;
38+
};
39+
40+
/// Background threads that run on a RestPureCompletionQueue.
41+
class AutomaticallyCreatedRestPureBackgroundThreads
42+
: public RestPureBackgroundThreads {
43+
public:
44+
explicit AutomaticallyCreatedRestPureBackgroundThreads(
45+
std::size_t thread_count = 1U);
46+
~AutomaticallyCreatedRestPureBackgroundThreads() override;
47+
48+
RestPureCompletionQueue cq() const override { return cq_; }
49+
void Shutdown();
50+
std::size_t pool_size() const { return pool_.size(); }
51+
52+
private:
53+
RestPureCompletionQueue cq_;
54+
std::vector<std::thread> pool_;
55+
};
56+
57+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
58+
} // namespace rest_internal
59+
} // namespace cloud
60+
} // namespace google
61+
62+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_REST_PURE_BACKGROUND_THREADS_IMPL_H
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/internal/rest_pure_completion_queue_impl.h"
16+
#include "google/cloud/internal/timer_queue.h"
17+
18+
namespace google {
19+
namespace cloud {
20+
namespace rest_internal {
21+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
22+
23+
RestPureCompletionQueue::RestPureCompletionQueue()
24+
: impl_(std::make_shared<RestPureCompletionQueueImpl>()) {}
25+
26+
RestPureCompletionQueueImpl::RestPureCompletionQueueImpl()
27+
: tq_(internal::TimerQueue::Create()) {}
28+
29+
void RestPureCompletionQueueImpl::Run() { tq_->Service(); }
30+
31+
void RestPureCompletionQueueImpl::Shutdown() { tq_->Shutdown(); }
32+
33+
void RestPureCompletionQueueImpl::CancelAll() {}
34+
35+
future<StatusOr<std::chrono::system_clock::time_point>>
36+
RestPureCompletionQueueImpl::MakeDeadlineTimer(
37+
std::chrono::system_clock::time_point deadline) {
38+
return tq_->Schedule(deadline);
39+
}
40+
41+
future<StatusOr<std::chrono::system_clock::time_point>>
42+
RestPureCompletionQueueImpl::MakeRelativeTimer(
43+
std::chrono::nanoseconds duration) {
44+
using std::chrono::system_clock;
45+
auto d = std::chrono::duration_cast<system_clock::duration>(duration);
46+
if (d < duration) d += system_clock::duration{1};
47+
return MakeDeadlineTimer(system_clock::now() + d);
48+
}
49+
50+
// Use an "immediately" expiring timer in order to get the thread(s) servicing
51+
// the TimerQueue to execute the function. However, if the timer
52+
// expires before .then() is invoked, the lambda will be immediately called and
53+
// the passing of execution to the queue servicing thread will not occur.
54+
void RestPureCompletionQueueImpl::RunAsync(
55+
std::unique_ptr<internal::RunAsyncBase> function) {
56+
++run_async_counter_;
57+
tq_->Schedule([f = std::move(function)](auto) { f->exec(); });
58+
}
59+
60+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
61+
} // namespace rest_internal
62+
} // namespace cloud
63+
} // namespace google

0 commit comments

Comments
 (0)