Skip to content

Commit d8fda10

Browse files
committed
ADD: Add connection and auth timeouts for Live
1 parent ada2f31 commit d8fda10

12 files changed

Lines changed: 232 additions & 49 deletions

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
### Enhancements
66
- Added `TryNextRecord` and `FillBuffer` to `LiveBlocking` for more fine-grained
77
control around I/O
8+
- Added `TimeoutConf` struct and `SetTimeoutConf()` builder method for configuring connect
9+
and auth timeouts on the Live client (defaults to 10s and 30s)
10+
- Added `SessionId()` and `Timeouts()` getters to `LiveBlocking` and `LiveThreaded`
811

912
## 0.52.0 - 2026-03-31
1013

include/databento/detail/live_connection.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ class LiveConnection : IWritable {
2525
public:
2626
LiveConnection(ILogReceiver* log_receiver, const std::string& gateway,
2727
std::uint16_t port);
28+
LiveConnection(ILogReceiver* log_receiver, const std::string& gateway,
29+
std::uint16_t port, TcpClient::RetryConf retry_conf);
2830

2931
void WriteAll(std::string_view str);
3032
void WriteAll(const std::byte* buffer, std::size_t size);

include/databento/detail/tcp_client.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class TcpClient {
2020
struct RetryConf {
2121
std::uint32_t max_attempts{1};
2222
std::chrono::seconds max_wait{std::chrono::minutes{1}};
23+
std::chrono::seconds connect_timeout{std::chrono::seconds{10}};
2324
};
2425

2526
TcpClient(ILogReceiver* log_receiver, const std::string& gateway, std::uint16_t port);

include/databento/live.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ class LiveBuilder {
5757
LiveBuilder& SetCompression(Compression compression);
5858
// Sets the behavior of the gateway when the client falls behind real time.
5959
LiveBuilder& SetSlowReaderBehavior(SlowReaderBehavior slow_reader_behavior);
60+
// Sets the timeouts for connecting and authenticating with the gateway.
61+
// Defaults to 10 seconds for connect and 30 seconds for auth.
62+
LiveBuilder& SetTimeoutConf(TimeoutConf timeout_conf);
6063

6164
/*
6265
* Build a live client instance
@@ -85,5 +88,6 @@ class LiveBuilder {
8588
std::string user_agent_ext_;
8689
Compression compression_{Compression::None};
8790
std::optional<SlowReaderBehavior> slow_reader_behavior_{};
91+
TimeoutConf timeout_conf_{};
8892
};
8993
} // namespace databento

include/databento/live_blocking.hpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ namespace databento {
2121
// Forward declaration
2222
class ILogReceiver;
2323
class LiveBuilder;
24+
25+
// Timeouts for the Live client's connection and authentication phases.
26+
struct TimeoutConf {
27+
std::chrono::seconds connect{10};
28+
std::chrono::seconds auth{30};
29+
};
30+
2431
class LiveThreaded;
2532

2633
// A client for interfacing with Databento's real-time and intraday replay
@@ -48,6 +55,8 @@ class LiveBlocking {
4855
std::optional<databento::SlowReaderBehavior> SlowReaderBehavior() const {
4956
return slow_reader_behavior_;
5057
}
58+
const databento::TimeoutConf& TimeoutConf() const { return timeout_conf_; }
59+
std::uint64_t SessionId() const { return session_id_; }
5160
const std::vector<LiveSubscription>& Subscriptions() const { return subscriptions_; }
5261
std::vector<LiveSubscription>& Subscriptions() { return subscriptions_; }
5362

@@ -119,21 +128,23 @@ class LiveBlocking {
119128
std::optional<std::chrono::seconds> heartbeat_interval,
120129
std::size_t buffer_size, std::string user_agent_ext,
121130
databento::Compression compression,
122-
std::optional<databento::SlowReaderBehavior> slow_reader_behavior);
131+
std::optional<databento::SlowReaderBehavior> slow_reader_behavior,
132+
databento::TimeoutConf timeout_conf);
123133
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
124134
std::string gateway, std::uint16_t port, bool send_ts_out,
125135
VersionUpgradePolicy upgrade_policy,
126136
std::optional<std::chrono::seconds> heartbeat_interval,
127137
std::size_t buffer_size, std::string user_agent_ext,
128138
databento::Compression compression,
129-
std::optional<databento::SlowReaderBehavior> slow_reader_behavior);
139+
std::optional<databento::SlowReaderBehavior> slow_reader_behavior,
140+
databento::TimeoutConf timeout_conf);
130141

131142
std::string DetermineGateway() const;
132143
std::uint64_t Authenticate();
133-
std::string DecodeChallenge();
144+
std::string DecodeChallenge(std::chrono::milliseconds timeout);
134145
std::string GenerateCramReply(std::string_view challenge_key);
135146
std::string EncodeAuthReq(std::string_view auth);
136-
std::uint64_t DecodeAuthResp();
147+
std::uint64_t DecodeAuthResp(std::chrono::milliseconds timeout);
137148
void IncrementSubCounter();
138149
void Subscribe(std::string_view sub_msg, const std::vector<std::string>& symbols,
139150
bool use_snapshot);
@@ -156,6 +167,7 @@ class LiveBlocking {
156167
const std::optional<std::chrono::seconds> heartbeat_interval_;
157168
const databento::Compression compression_;
158169
const std::optional<databento::SlowReaderBehavior> slow_reader_behavior_;
170+
const databento::TimeoutConf timeout_conf_;
159171
detail::LiveConnection connection_;
160172
std::uint32_t sub_counter_{};
161173
std::vector<LiveSubscription> subscriptions_;

include/databento/live_threaded.hpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "databento/datetime.hpp" // UnixNanos
1313
#include "databento/detail/scoped_thread.hpp" // ScopedThread
1414
#include "databento/enums.hpp" // Schema, SType
15+
#include "databento/live_blocking.hpp" // TimeoutConf
1516
#include "databento/live_subscription.hpp"
1617
#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback
1718

@@ -56,6 +57,8 @@ class LiveThreaded {
5657
std::optional<std::chrono::seconds> HeartbeatInterval() const;
5758
databento::Compression Compression() const;
5859
std::optional<databento::SlowReaderBehavior> SlowReaderBehavior() const;
60+
const databento::TimeoutConf& TimeoutConf() const;
61+
std::uint64_t SessionId() const;
5962
const std::vector<LiveSubscription>& Subscriptions() const;
6063
std::vector<LiveSubscription>& Subscriptions();
6164

@@ -111,14 +114,16 @@ class LiveThreaded {
111114
std::optional<std::chrono::seconds> heartbeat_interval,
112115
std::size_t buffer_size, std::string user_agent_ext,
113116
databento::Compression compression,
114-
std::optional<databento::SlowReaderBehavior> slow_reader_behavior);
117+
std::optional<databento::SlowReaderBehavior> slow_reader_behavior,
118+
databento::TimeoutConf timeout_conf);
115119
LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
116120
std::string gateway, std::uint16_t port, bool send_ts_out,
117121
VersionUpgradePolicy upgrade_policy,
118122
std::optional<std::chrono::seconds> heartbeat_interval,
119123
std::size_t buffer_size, std::string user_agent_ext,
120124
databento::Compression compression,
121-
std::optional<databento::SlowReaderBehavior> slow_reader_behavior);
125+
std::optional<databento::SlowReaderBehavior> slow_reader_behavior,
126+
databento::TimeoutConf timeout_conf);
122127

123128
// unique_ptr to be movable
124129
std::unique_ptr<Impl> impl_;

src/detail/live_connection.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ LiveConnection::LiveConnection(ILogReceiver* log_receiver, const std::string& ga
1010
std::uint16_t port)
1111
: client_{log_receiver, gateway, port} {}
1212

13+
LiveConnection::LiveConnection(ILogReceiver* log_receiver, const std::string& gateway,
14+
std::uint16_t port, TcpClient::RetryConf retry_conf)
15+
: client_{log_receiver, gateway, port, retry_conf} {}
16+
1317
void LiveConnection::WriteAll(std::string_view str) { client_.WriteAll(str); }
1418

1519
void LiveConnection::WriteAll(const std::byte* buffer, std::size_t size) {

src/detail/tcp_client.cpp

Lines changed: 76 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
#ifdef _WIN32
44
#include <winsock2.h> // closesocket, recv, send, socket
55
#else
6+
#include <fcntl.h> // fcntl, F_GETFL, F_SETFL, O_NONBLOCK
67
#include <netdb.h> // addrinfo, gai_strerror, getaddrinfo, freeaddrinfo
78
#include <netinet/in.h> // htons, IPPROTO_TCP
89
#include <sys/poll.h> // pollfd
9-
#include <sys/socket.h> // AF_INET, connect, recv, send, sockaddr, sockaddr_in, socket, SOCK_STREAM
10-
#include <unistd.h> // close, ssize_t
10+
#include <sys/socket.h> // AF_INET, connect, recv, send, sockaddr, sockaddr_in, socket, SOCK_STREAM, getsockopt, SO_ERROR, SOL_SOCKET
11+
#include <unistd.h> // close, ssize_t
1112

1213
#include <cerrno> // errno
1314
#endif
@@ -31,6 +32,55 @@ int GetErrNo() {
3132
return errno;
3233
#endif
3334
}
35+
36+
int Poll(::pollfd* fds, std::uint32_t nfds, int timeout_ms) {
37+
#ifdef _WIN32
38+
return ::WSAPoll(fds, nfds, timeout_ms);
39+
#else
40+
return ::poll(fds, static_cast<::nfds_t>(nfds), timeout_ms);
41+
#endif
42+
}
43+
44+
#ifdef _WIN32
45+
constexpr int kConnectInProgress = WSAEWOULDBLOCK;
46+
#else
47+
constexpr int kConnectInProgress = EINPROGRESS;
48+
#endif
49+
50+
// Saves the current blocking state, sets non-blocking, and returns a RAII guard
51+
// that restores the original state on destruction.
52+
struct BlockingGuard {
53+
databento::detail::Socket fd;
54+
#ifdef _WIN32
55+
// No state to save on Windows
56+
#else
57+
int original_flags;
58+
#endif
59+
60+
explicit BlockingGuard(databento::detail::Socket fd) : fd{fd} {
61+
#ifdef _WIN32
62+
unsigned long mode = 1;
63+
::ioctlsocket(fd, FIONBIO, &mode);
64+
#else
65+
original_flags = ::fcntl(fd, F_GETFL, 0);
66+
::fcntl(fd, F_SETFL, original_flags | O_NONBLOCK);
67+
#endif
68+
}
69+
70+
~BlockingGuard() {
71+
#ifdef _WIN32
72+
unsigned long mode = 0;
73+
::ioctlsocket(fd, FIONBIO, &mode);
74+
#else
75+
::fcntl(fd, F_SETFL, original_flags);
76+
#endif
77+
}
78+
79+
BlockingGuard(const BlockingGuard&) = delete;
80+
BlockingGuard& operator=(const BlockingGuard&) = delete;
81+
BlockingGuard(BlockingGuard&&) = delete;
82+
BlockingGuard& operator=(BlockingGuard&&) = delete;
83+
};
3484
} // namespace
3585

3686
TcpClient::TcpClient(ILogReceiver* log_receiver, const std::string& gateway,
@@ -83,12 +133,7 @@ databento::IReadable::Result TcpClient::ReadSome(std::byte* buffer,
83133
// having no timeout
84134
const auto timeout_ms = timeout.count() ? static_cast<int>(timeout.count()) : -1;
85135
while (true) {
86-
const int poll_status =
87-
#ifdef _WIN32
88-
::WSAPoll(&fds, 1, timeout_ms);
89-
#else
90-
::poll(&fds, 1, timeout_ms);
91-
#endif
136+
const int poll_status = Poll(&fds, 1, timeout_ms);
92137
if (poll_status > 0) {
93138
return ReadSome(buffer, max_size);
94139
}
@@ -130,13 +175,34 @@ databento::detail::ScopedFd TcpClient::InitSocket(ILogReceiver* log_receiver,
130175
}
131176
std::unique_ptr<addrinfo, decltype(&::freeaddrinfo)> res{out, &::freeaddrinfo};
132177
const auto max_attempts = std::max<std::uint32_t>(retry_conf.max_attempts, 1);
178+
const auto timeout_ms = static_cast<int>(
179+
std::chrono::duration_cast<std::chrono::milliseconds>(retry_conf.connect_timeout)
180+
.count());
133181
std::chrono::seconds backoff{1};
134182
for (std::uint32_t attempt = 0; attempt < max_attempts; ++attempt) {
135-
if (::connect(scoped_fd.Get(), res->ai_addr, res->ai_addrlen) == 0) {
183+
BlockingGuard guard{scoped_fd.Get()};
184+
185+
const int connect_ret = ::connect(scoped_fd.Get(), res->ai_addr, res->ai_addrlen);
186+
bool connected = (connect_ret == 0);
187+
if (!connected && ::GetErrNo() == kConnectInProgress) {
188+
pollfd pfd{scoped_fd.Get(), POLLOUT, {}};
189+
const int poll_ret = Poll(&pfd, 1, timeout_ms);
190+
if (poll_ret > 0) {
191+
int so_error = 0;
192+
socklen_t len = sizeof(so_error);
193+
::getsockopt(scoped_fd.Get(), SOL_SOCKET, SO_ERROR, &so_error, &len);
194+
connected = (so_error == 0);
195+
if (!connected) {
196+
errno = so_error;
197+
}
198+
}
199+
}
200+
201+
if (connected) {
136202
break;
137203
} else if (attempt + 1 == max_attempts) {
138204
std::ostringstream err_msg;
139-
err_msg << "Socket failed to connect after " << max_attempts << " attempts";
205+
err_msg << "Socket failed to connect after " << max_attempts << " attempt(s)";
140206
throw TcpError{::GetErrNo(), err_msg.str()};
141207
}
142208
std::ostringstream log_msg;

src/live.cpp

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,21 +90,28 @@ LiveBuilder& LiveBuilder::SetSlowReaderBehavior(
9090
return *this;
9191
}
9292

93+
LiveBuilder& LiveBuilder::SetTimeoutConf(TimeoutConf timeout_conf) {
94+
timeout_conf_ = timeout_conf;
95+
return *this;
96+
}
97+
9398
databento::LiveBlocking LiveBuilder::BuildBlocking() {
9499
Validate();
95100
if (gateway_.empty()) {
96101
return databento::LiveBlocking{log_receiver_, key_,
97102
dataset_, send_ts_out_,
98103
upgrade_policy_, heartbeat_interval_,
99104
buffer_size_, user_agent_ext_,
100-
compression_, slow_reader_behavior_};
105+
compression_, slow_reader_behavior_,
106+
timeout_conf_};
101107
}
102108
return databento::LiveBlocking{log_receiver_, key_,
103109
dataset_, gateway_,
104110
port_, send_ts_out_,
105111
upgrade_policy_, heartbeat_interval_,
106112
buffer_size_, user_agent_ext_,
107-
compression_, slow_reader_behavior_};
113+
compression_, slow_reader_behavior_,
114+
timeout_conf_};
108115
}
109116

110117
databento::LiveThreaded LiveBuilder::BuildThreaded() {
@@ -114,14 +121,16 @@ databento::LiveThreaded LiveBuilder::BuildThreaded() {
114121
dataset_, send_ts_out_,
115122
upgrade_policy_, heartbeat_interval_,
116123
buffer_size_, user_agent_ext_,
117-
compression_, slow_reader_behavior_};
124+
compression_, slow_reader_behavior_,
125+
timeout_conf_};
118126
}
119127
return databento::LiveThreaded{log_receiver_, key_,
120128
dataset_, gateway_,
121129
port_, send_ts_out_,
122130
upgrade_policy_, heartbeat_interval_,
123131
buffer_size_, user_agent_ext_,
124-
compression_, slow_reader_behavior_};
132+
compression_, slow_reader_behavior_,
133+
timeout_conf_};
125134
}
126135

127136
void LiveBuilder::Validate() {

0 commit comments

Comments
 (0)