Skip to content

Commit 24bd74b

Browse files
prestwich and claude authored
feat(host-rpc): add backpressure controls to notifier (#123)
* feat(host-rpc): add backpressure controls to notifier (ENG-2104) Bound concurrent RPC block fetches with configurable max_rpc_concurrency (default 8) and coalesce stale subscription headers via Latest<S> stream adapter to prevent slow-consumer backlog buildup. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(host-rpc): add clamping to all builder setters and update doc example Add consistent minimum-value clamping (to 1) for buffer_capacity and backfill_batch_size to match max_rpc_concurrency. Add the missing with_max_rpc_concurrency call to the builder doc example. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6ab1327 commit 24bd74b

6 files changed

Lines changed: 167 additions & 29 deletions

File tree

crates/host-rpc/src/builder.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use tracing::warn;
1010
/// let notifier = RpcHostNotifierBuilder::new(provider)
1111
/// .with_buffer_capacity(128)
1212
/// .with_backfill_batch_size(64)
13+
/// .with_max_rpc_concurrency(16)
1314
/// .with_genesis_timestamp(1_606_824_023)
1415
/// .build()
1516
/// .await?;
@@ -19,6 +20,7 @@ pub struct RpcHostNotifierBuilder<P> {
1920
provider: P,
2021
buffer_capacity: usize,
2122
backfill_batch_size: u64,
23+
max_rpc_concurrency: usize,
2224
slot_seconds: u64,
2325
genesis_timestamp: u64,
2426
}
@@ -33,20 +35,33 @@ where
3335
provider,
3436
buffer_capacity: crate::DEFAULT_BUFFER_CAPACITY,
3537
backfill_batch_size: crate::DEFAULT_BACKFILL_BATCH_SIZE,
38+
max_rpc_concurrency: crate::DEFAULT_MAX_RPC_CONCURRENCY,
3639
slot_seconds: crate::notifier::DEFAULT_SLOT_SECONDS,
3740
genesis_timestamp: 0,
3841
}
3942
}
4043

4144
/// Set the block buffer capacity (default: 64).
45+
///
46+
/// Values below 1 are clamped to 1.
4247
pub const fn with_buffer_capacity(mut self, capacity: usize) -> Self {
43-
self.buffer_capacity = capacity;
48+
self.buffer_capacity = if capacity > 0 { capacity } else { 1 };
4449
self
4550
}
4651

4752
/// Set the backfill batch size (default: 32).
53+
///
54+
/// Values below 1 are clamped to 1.
4855
pub const fn with_backfill_batch_size(mut self, batch_size: u64) -> Self {
49-
self.backfill_batch_size = batch_size;
56+
self.backfill_batch_size = if batch_size > 0 { batch_size } else { 1 };
57+
self
58+
}
59+
60+
/// Set the maximum number of concurrent RPC block fetches (default: 8).
61+
///
62+
/// Values below 1 are clamped to 1.
63+
pub const fn with_max_rpc_concurrency(mut self, max_rpc_concurrency: usize) -> Self {
64+
self.max_rpc_concurrency = if max_rpc_concurrency > 0 { max_rpc_concurrency } else { 1 };
5065
self
5166
}
5267

@@ -75,6 +90,7 @@ where
7590
header_sub,
7691
self.buffer_capacity,
7792
self.backfill_batch_size,
93+
self.max_rpc_concurrency,
7894
self.slot_seconds,
7995
self.genesis_timestamp,
8096
))

crates/host-rpc/src/config.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use crate::{DEFAULT_BACKFILL_BATCH_SIZE, DEFAULT_BUFFER_CAPACITY, RpcHostNotifierBuilder};
1+
use crate::{
2+
DEFAULT_BACKFILL_BATCH_SIZE, DEFAULT_BUFFER_CAPACITY, DEFAULT_MAX_RPC_CONCURRENCY,
3+
RpcHostNotifierBuilder,
4+
};
25
use alloy::providers::RootProvider;
36
use init4_bin_base::utils::{calc::SlotCalculator, from_env::FromEnv, provider::PubSubConfig};
47

@@ -9,6 +12,7 @@ use init4_bin_base::utils::{calc::SlotCalculator, from_env::FromEnv, provider::P
912
/// - `SIGNET_HOST_URL` – WebSocket or IPC URL for the host EL client (required)
1013
/// - `SIGNET_HOST_BUFFER_CAPACITY` – Local chain view size (default: 64)
1114
/// - `SIGNET_HOST_BACKFILL_BATCH_SIZE` – Blocks per backfill batch (default: 32)
15+
/// - `SIGNET_HOST_MAX_RPC_CONCURRENCY` – Max concurrent RPC block fetches (default: 8)
1216
///
1317
/// # Example
1418
///
@@ -43,6 +47,13 @@ pub struct HostRpcConfig {
4347
optional
4448
)]
4549
backfill_batch_size: Option<u64>,
50+
/// Maximum number of concurrent RPC block fetches.
51+
#[from_env(
52+
var = "SIGNET_HOST_MAX_RPC_CONCURRENCY",
53+
desc = "Max concurrent RPC requests [default: 8]",
54+
optional
55+
)]
56+
max_rpc_concurrency: Option<usize>,
4657
}
4758

4859
impl HostRpcConfig {
@@ -60,6 +71,9 @@ impl HostRpcConfig {
6071
.with_backfill_batch_size(
6172
self.backfill_batch_size.unwrap_or(DEFAULT_BACKFILL_BATCH_SIZE),
6273
)
74+
.with_max_rpc_concurrency(
75+
self.max_rpc_concurrency.unwrap_or(DEFAULT_MAX_RPC_CONCURRENCY),
76+
)
6377
.with_genesis_timestamp(slot_calculator.start_timestamp()))
6478
}
6579
}

crates/host-rpc/src/latest.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
use futures_util::Stream;
2+
use std::{
3+
pin::Pin,
4+
task::{Context, Poll},
5+
};
6+
7+
/// Stream adapter that collapses a backlog of buffered items into the
/// single most recent one per poll.
///
/// A slow consumer lets items pile up inside the wrapped stream. On each
/// poll, `Latest` drains every item that is already ready and yields only
/// the newest, dropping the stale intermediates (each drop is counted in
/// a metric by the `Stream` implementation).
#[derive(Debug)]
pub(crate) struct Latest<S> {
    inner: S,
}

impl<S> Latest<S> {
    /// Construct the combinator around the given stream.
    pub(crate) const fn new(inner: S) -> Self {
        Self { inner }
    }
}
30+
31+
impl<S> Stream for Latest<S>
where
    S: Stream + Unpin,
{
    type Item = S::Item;

    /// Poll the inner stream, draining every already-ready item and
    /// yielding only the most recent one.
    ///
    /// NOTE(review): once the inner stream has returned `Ready(None)`, a
    /// subsequent `poll_next` here polls it again. The `Stream` contract
    /// does not guarantee that is safe for arbitrary non-fused streams —
    /// presumably the subscription stream used here tolerates it; confirm
    /// or fuse the inner stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // First poll: if nothing is ready, the inner stream has already
        // registered `cx`'s waker, so Pending propagates safely.
        let inner = Pin::new(&mut self.inner);
        let first = match inner.poll_next(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(item) => item,
        };

        // Stream is exhausted or had no item.
        let Some(mut latest) = first else {
            return Poll::Ready(None);
        };

        // Drain all remaining ready items, keeping only the most recent.
        // If this loop stops on Pending, the inner stream has re-armed the
        // waker for this context, so we will be woken for the next item.
        let mut skipped: u64 = 0;
        while let Poll::Ready(Some(newer)) = Pin::new(&mut self.inner).poll_next(cx) {
            latest = newer;
            skipped += 1;
        }

        // Record how many stale intermediates were discarded.
        if skipped > 0 {
            crate::metrics::inc_headers_coalesced(skipped);
        }

        Poll::Ready(Some(latest))
    }
}
63+
64+
#[cfg(test)]
mod tests {
    use super::Latest;
    use futures_util::{StreamExt, stream};

    #[tokio::test]
    async fn single_item_yields_immediately() {
        // One ready item: nothing to coalesce, it comes straight through.
        let mut latest = Latest::new(stream::iter([42u32]));
        assert_eq!(latest.next().await, Some(42));
        assert_eq!(latest.next().await, None);
    }

    #[tokio::test]
    async fn multiple_ready_items_yields_last() {
        // Every item is immediately ready, so the drain keeps only the
        // final one and discards the stale intermediates.
        let mut latest = Latest::new(stream::iter([1u32, 2, 3, 4, 5]));
        assert_eq!(latest.next().await, Some(5));
        assert_eq!(latest.next().await, None);
    }

    #[tokio::test]
    async fn empty_stream_yields_none() {
        // An inner stream with no items terminates at once.
        let mut latest = Latest::new(stream::iter(Vec::<u32>::new()));
        assert_eq!(latest.next().await, None);
    }

    #[tokio::test]
    async fn fused_after_inner_terminates() {
        let mut latest = Latest::new(stream::iter([7u32]));
        assert_eq!(latest.next().await, Some(7));
        // Once terminated, repeated polls keep returning None.
        assert_eq!(latest.next().await, None);
        assert_eq!(latest.next().await, None);
    }
}

crates/host-rpc/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
pub(crate) const DEFAULT_BUFFER_CAPACITY: usize = 64;
1616
/// Default backfill batch size.
1717
pub(crate) const DEFAULT_BACKFILL_BATCH_SIZE: u64 = 32;
18+
/// Default maximum number of concurrent RPC block fetches.
19+
pub(crate) const DEFAULT_MAX_RPC_CONCURRENCY: usize = 8;
1820

1921
mod builder;
2022
pub use builder::RpcHostNotifierBuilder;
@@ -28,9 +30,10 @@ pub use error::RpcHostError;
2830
mod notifier;
2931
pub use notifier::RpcHostNotifier;
3032

33+
mod latest;
34+
mod metrics;
3135
mod segment;
3236
pub use segment::{RpcBlock, RpcChainSegment};
33-
mod metrics;
3437

3538
mod alias;
3639
pub use alias::RpcAliasOracle;

crates/host-rpc/src/metrics.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const BACKFILL_BATCHES: &str = "host_rpc.backfill_batches";
1010
const TAG_REFRESHES: &str = "host_rpc.tag_refreshes";
1111
const STALE_HINTS: &str = "host_rpc.stale_hints";
1212
const RPC_ERRORS: &str = "host_rpc.rpc_errors";
13+
const HEADERS_COALESCED: &str = "host_rpc.headers_coalesced";
1314

1415
const WALK_CHAIN_DURATION: &str = "host_rpc.walk_chain.duration_ms";
1516
const FETCH_BLOCK_DURATION: &str = "host_rpc.fetch_block.duration_ms";
@@ -33,6 +34,7 @@ fn ensure_described() {
3334
describe_counter!(TAG_REFRESHES, "Epoch boundary tag refreshes");
3435
describe_counter!(STALE_HINTS, "Stale subscription hints that fell back to latest");
3536
describe_counter!(RPC_ERRORS, "RPC transport/provider errors");
37+
describe_counter!(HEADERS_COALESCED, "Stale subscription headers coalesced");
3638
describe_histogram!(WALK_CHAIN_DURATION, "Time to walk the chain (ms)");
3739
describe_histogram!(FETCH_BLOCK_DURATION, "Single block+receipts fetch (ms)");
3840
describe_histogram!(BACKFILL_BATCH_DURATION, "Full backfill batch (ms)");
@@ -118,3 +120,9 @@ pub(crate) fn set_tip(number: u64) {
118120
ensure_described();
119121
gauge!(TIP_NUMBER).set(number as f64);
120122
}
123+
124+
/// Increment the headers-coalesced counter.
///
/// `count` is the number of stale headers discarded in one `Latest`
/// drain pass; it is added to the cumulative counter in a single call.
pub(crate) fn inc_headers_coalesced(count: u64) {
    // Register metric descriptions once (idempotent) before recording.
    ensure_described();
    counter!(HEADERS_COALESCED).increment(count);
}

crates/host-rpc/src/notifier.rs

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{RpcBlock, RpcChainSegment, RpcHostError};
1+
use crate::{RpcBlock, RpcChainSegment, RpcHostError, latest::Latest};
22
use alloy::{
33
consensus::{BlockHeader, transaction::Recovered},
44
eips::{BlockId, BlockNumberOrTag},
@@ -8,7 +8,7 @@ use alloy::{
88
pubsub::SubscriptionStream,
99
rpc::types::Header as RpcHeader,
1010
};
11-
use futures_util::{StreamExt, stream::FuturesOrdered};
11+
use futures_util::{StreamExt, TryStreamExt, stream};
1212
use signet_node_types::{HostNotification, HostNotificationKind, HostNotifier, RevertRange};
1313
use signet_types::primitives::{RecoveredBlock, SealedBlock, TransactionSigned};
1414
use std::{collections::VecDeque, sync::Arc, time::Instant};
@@ -56,7 +56,8 @@ pub struct RpcHostNotifier<P> {
5656
provider: P,
5757

5858
/// Subscription stream of new block headers (used as wake-up signal).
59-
header_sub: SubscriptionStream<RpcHeader>,
59+
/// Wrapped in [`Latest`] to coalesce stale buffered headers.
60+
header_sub: Latest<SubscriptionStream<RpcHeader>>,
6061

6162
/// Local chain view — lightweight ring buffer of (number, hash).
6263
chain_view: VecDeque<(u64, B256)>,
@@ -80,6 +81,9 @@ pub struct RpcHostNotifier<P> {
8081
/// Max blocks per backfill batch.
8182
backfill_batch_size: u64,
8283

84+
/// Maximum number of concurrent RPC block fetches.
85+
max_rpc_concurrency: usize,
86+
8387
/// Seconds per slot, used for epoch calculation.
8488
slot_seconds: u64,
8589

@@ -92,6 +96,7 @@ impl<P> core::fmt::Debug for RpcHostNotifier<P> {
9296
f.debug_struct("RpcHostNotifier")
9397
.field("chain_view_len", &self.chain_view.len())
9498
.field("buffer_capacity", &self.buffer_capacity)
99+
.field("max_rpc_concurrency", &self.max_rpc_concurrency)
95100
.field("backfill_from", &self.backfill_from)
96101
.finish_non_exhaustive()
97102
}
@@ -107,19 +112,21 @@ where
107112
header_sub: SubscriptionStream<RpcHeader>,
108113
buffer_capacity: usize,
109114
backfill_batch_size: u64,
115+
max_rpc_concurrency: usize,
110116
slot_seconds: u64,
111117
genesis_timestamp: u64,
112118
) -> Self {
113119
Self {
114120
provider,
115-
header_sub,
121+
header_sub: Latest::new(header_sub),
116122
chain_view: VecDeque::with_capacity(buffer_capacity),
117123
buffer_capacity,
118124
cached_safe: None,
119125
cached_finalized: None,
120126
last_tag_epoch: None,
121127
backfill_from: None,
122128
backfill_batch_size,
129+
max_rpc_concurrency,
123130
slot_seconds,
124131
genesis_timestamp,
125132
}
@@ -213,22 +220,17 @@ where
213220
/// Fetch full blocks+receipts for a list of hashes, concurrently.
214221
///
215222
/// Hashes must be in ascending block-number order. Results preserve
216-
/// that order.
223+
/// that order. Concurrency is bounded by [`Self::max_rpc_concurrency`].
217224
#[tracing::instrument(level = "debug", skip_all, fields(count = hashes.len()))]
218225
async fn fetch_blocks_by_hash(
219226
&self,
220227
hashes: &[(u64, B256)],
221228
) -> Result<Vec<Arc<RpcBlock>>, RpcHostError> {
222-
let mut futures = hashes
223-
.iter()
224-
.map(|&(_, hash)| self.fetch_block_by_hash(hash))
225-
.collect::<FuturesOrdered<_>>();
226-
227-
let mut blocks = Vec::with_capacity(hashes.len());
228-
while let Some(result) = futures.next().await {
229-
blocks.push(Arc::new(result?));
230-
}
231-
Ok(blocks)
229+
stream::iter(hashes.iter().copied().map(|(_, hash)| self.fetch_block_by_hash(hash)))
230+
.buffered(self.max_rpc_concurrency)
231+
.map_ok(Arc::new)
232+
.try_collect()
233+
.await
232234
}
233235

234236
/// Fetch a single block with receipts by number (used for backfill only).
@@ -250,22 +252,19 @@ where
250252

251253
/// Fetch a range of blocks by number concurrently (used for backfill only).
252254
///
253-
/// Returns an empty `Vec` if `from > to`.
255+
/// Returns an empty `Vec` if `from > to`. Concurrency is bounded by
256+
/// [`Self::max_rpc_concurrency`].
254257
#[tracing::instrument(level = "debug", skip_all, fields(from, to))]
255258
async fn fetch_range(&self, from: u64, to: u64) -> Result<Vec<Arc<RpcBlock>>, RpcHostError> {
256259
if from > to {
257260
return Ok(Vec::new());
258261
}
259262

260-
let mut futures = (from..=to)
261-
.map(|number| self.fetch_block_by_number(number))
262-
.collect::<FuturesOrdered<_>>();
263-
264-
let mut blocks = Vec::with_capacity((to - from + 1) as usize);
265-
while let Some(result) = futures.next().await {
266-
blocks.push(Arc::new(result?));
267-
}
268-
Ok(blocks)
263+
stream::iter((from..=to).map(|number| self.fetch_block_by_number(number)))
264+
.buffered(self.max_rpc_concurrency)
265+
.map_ok(Arc::new)
266+
.try_collect()
267+
.await
269268
}
270269

271270
// ── Epoch / tag helpers ────────────────────────────────────────

0 commit comments

Comments
 (0)