Skip to content

Commit 8f9f4f8

Browse files
prestwichclaude
andauthored
feat: add signet-host-rpc crate with RpcHostNotifier (#107)
* feat: add signet-host-rpc crate with RpcHostNotifier Add new `signet-host-rpc` crate providing an RPC WebSocket-based HostNotifier implementation. Remove signet-sdk patch overrides now that the new version is published. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor(host-rpc): rewrite notifier with hash-based chain walking Replace fetch-by-number with hash-anchored backward walk, eliminating TOCTOU race conditions in reorg detection. The subscription is now a wake-up signal; on each event the notifier walks from the hint hash backward until it finds overlap with a lightweight (u64, B256) ring buffer. Full blocks are fetched by hash only for the new segment. Key changes: - Hash-based walk algorithm (WalkResult enum: Advance/Reorg/Exhausted) - Parallel block+receipt fetches via tokio::try_join! and FuturesOrdered - Self-healing buffer exhaustion: clears state and re-enters backfill - Backfill stops at (latest - buffer_capacity), frontfill bridges gap - set_backfill_thresholds(None) resets to default per trait contract - fetch_range guards against from > to underflow - Segment: non-empty invariant enforced, pub(crate) visibility - Error: removed dead SubscriptionClosed/ReorgTooDeep variants - Builder: genesis_timestamp validation, uses pub(crate) constructor - Shared constants in lib.rs, private notifier fields - Tracing instrumentation on walk and notification methods Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * chore(host-rpc): add metrics dependency Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(host-rpc): add centralized metrics module with OnceLock descriptions Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(host-rpc): instrument fetch methods and maybe_refresh_tags Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(host-rpc): instrument walk_chain with timing and exhaustion metrics Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(host-rpc): instrument handle_new_head with events and metrics Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(host-rpc): instrument drain_backfill with dynamic fields and metrics Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(host-rpc): record metrics on drain_backfill error paths Add inc_rpc_errors and record_backfill_batch on fetch_range and maybe_refresh_tags failure paths for consistent error observability. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(host-rpc): address review feedback - O(1) view_hash using front number + index math with debug_assert - Extract convert_rpc_block helper to deduplicate block conversion - Concurrent safe/finalized tag fetches via tokio::try_join! - Add inspect_err for metrics on stale-hint fallback walk - Make slot_seconds configurable via builder (default 12) - Error on missing cached_finalized during walk exhaustion recovery Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent fc5d73f commit 8f9f4f8

9 files changed

Lines changed: 993 additions & 0 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ signet-blobber = { version = "0.16.0-rc.7", path = "crates/blobber" }
3838
signet-block-processor = { version = "0.16.0-rc.7", path = "crates/block-processor" }
3939
signet-genesis = { version = "0.16.0-rc.7", path = "crates/genesis" }
4040
signet-host-reth = { version = "0.16.0-rc.7", path = "crates/host-reth" }
41+
signet-host-rpc = { version = "0.16.0-rc.7", path = "crates/host-rpc" }
4142
signet-node = { version = "0.16.0-rc.7", path = "crates/node" }
4243
signet-node-config = { version = "0.16.0-rc.7", path = "crates/node-config" }
4344
signet-node-tests = { version = "0.16.0-rc.7", path = "crates/node-tests" }

crates/host-rpc/Cargo.toml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
[package]
2+
name = "signet-host-rpc"
3+
description = "RPC-based implementation of the HostNotifier trait for signet-node."
4+
version.workspace = true
5+
edition.workspace = true
6+
rust-version.workspace = true
7+
authors.workspace = true
8+
license.workspace = true
9+
homepage.workspace = true
10+
repository.workspace = true
11+
12+
[dependencies]
13+
signet-node-types.workspace = true
14+
signet-extract.workspace = true
15+
signet-types.workspace = true
16+
17+
alloy.workspace = true
18+
futures-util.workspace = true
19+
metrics.workspace = true
20+
thiserror.workspace = true
21+
tokio.workspace = true
22+
tracing.workspace = true

crates/host-rpc/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# signet-host-rpc
2+
3+
RPC-based implementation of the `HostNotifier` trait for signet-node.
4+
5+
Connects to any Ethereum execution layer client via WebSocket, following
6+
the host chain without embedding a full reth node.

crates/host-rpc/src/builder.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use crate::{RpcHostError, RpcHostNotifier};
2+
use alloy::providers::Provider;
3+
use tracing::warn;
4+
5+
/// Builder for [`RpcHostNotifier`].
6+
///
7+
/// # Example
8+
///
9+
/// ```ignore
10+
/// let notifier = RpcHostNotifierBuilder::new(provider)
11+
/// .with_buffer_capacity(128)
12+
/// .with_backfill_batch_size(64)
13+
/// .with_genesis_timestamp(1_606_824_023)
14+
/// .build()
15+
/// .await?;
16+
/// ```
17+
#[derive(Debug)]
18+
pub struct RpcHostNotifierBuilder<P> {
19+
provider: P,
20+
buffer_capacity: usize,
21+
backfill_batch_size: u64,
22+
slot_seconds: u64,
23+
genesis_timestamp: u64,
24+
}
25+
26+
impl<P> RpcHostNotifierBuilder<P>
27+
where
28+
P: Provider + Clone,
29+
{
30+
/// Create a new builder with the given provider.
31+
pub const fn new(provider: P) -> Self {
32+
Self {
33+
provider,
34+
buffer_capacity: crate::DEFAULT_BUFFER_CAPACITY,
35+
backfill_batch_size: crate::DEFAULT_BACKFILL_BATCH_SIZE,
36+
slot_seconds: crate::notifier::DEFAULT_SLOT_SECONDS,
37+
genesis_timestamp: 0,
38+
}
39+
}
40+
41+
/// Set the block buffer capacity (default: 64).
42+
pub const fn with_buffer_capacity(mut self, capacity: usize) -> Self {
43+
self.buffer_capacity = capacity;
44+
self
45+
}
46+
47+
/// Set the backfill batch size (default: 32).
48+
pub const fn with_backfill_batch_size(mut self, batch_size: u64) -> Self {
49+
self.backfill_batch_size = batch_size;
50+
self
51+
}
52+
53+
/// Set the slot duration in seconds (default: 12).
54+
pub const fn with_slot_seconds(mut self, slot_seconds: u64) -> Self {
55+
self.slot_seconds = slot_seconds;
56+
self
57+
}
58+
59+
/// Set the genesis timestamp for epoch calculation.
60+
pub const fn with_genesis_timestamp(mut self, timestamp: u64) -> Self {
61+
self.genesis_timestamp = timestamp;
62+
self
63+
}
64+
65+
/// Build the notifier, establishing the `newHeads` WebSocket subscription.
66+
pub async fn build(self) -> Result<RpcHostNotifier<P>, RpcHostError> {
67+
if self.genesis_timestamp == 0 {
68+
warn!("genesis_timestamp not set; epoch calculations will use Unix epoch");
69+
}
70+
let sub = self.provider.subscribe_blocks().await?;
71+
let header_sub = sub.into_stream();
72+
73+
Ok(RpcHostNotifier::new(
74+
self.provider,
75+
header_sub,
76+
self.buffer_capacity,
77+
self.backfill_batch_size,
78+
self.slot_seconds,
79+
self.genesis_timestamp,
80+
))
81+
}
82+
}

crates/host-rpc/src/error.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use alloy::{
2+
eips::BlockNumberOrTag,
3+
primitives::B256,
4+
transports::{RpcError, TransportErrorKind},
5+
};
6+
7+
/// Errors from the RPC host notifier.
8+
#[derive(Debug, thiserror::Error)]
9+
pub enum RpcHostError {
10+
/// An RPC call failed.
11+
#[error("rpc error: {0}")]
12+
Rpc(#[from] RpcError<TransportErrorKind>),
13+
14+
/// The RPC node returned no block for the requested hash.
15+
#[error("missing block with hash {0}")]
16+
MissingBlockByHash(B256),
17+
18+
/// The RPC node returned no block for the requested number or tag.
19+
#[error("missing block {0}")]
20+
MissingBlock(BlockNumberOrTag),
21+
22+
/// Walk exhaustion recovery requires a cached finalized block number,
23+
/// but none has been fetched yet.
24+
#[error("no cached finalized block number for exhaustion recovery")]
25+
NoFinalizedBlock,
26+
}

crates/host-rpc/src/lib.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#![doc = include_str!("../README.md")]
2+
#![warn(
3+
missing_copy_implementations,
4+
missing_debug_implementations,
5+
missing_docs,
6+
unreachable_pub,
7+
clippy::missing_const_for_fn,
8+
rustdoc::all
9+
)]
10+
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
11+
#![deny(unused_must_use, rust_2018_idioms)]
12+
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
13+
14+
/// Default block buffer capacity.
15+
pub(crate) const DEFAULT_BUFFER_CAPACITY: usize = 64;
16+
/// Default backfill batch size.
17+
pub(crate) const DEFAULT_BACKFILL_BATCH_SIZE: u64 = 32;
18+
19+
mod builder;
20+
pub use builder::RpcHostNotifierBuilder;
21+
22+
mod error;
23+
pub use error::RpcHostError;
24+
25+
mod notifier;
26+
pub use notifier::RpcHostNotifier;
27+
28+
mod segment;
29+
pub use segment::{RpcBlock, RpcChainSegment};
30+
mod metrics;

crates/host-rpc/src/metrics.rs

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram};
2+
use std::{sync::OnceLock, time::Duration};
3+
4+
// ── Metric name constants ──────────────────────────────────────────
5+
6+
const BLOCKS_FETCHED: &str = "host_rpc.blocks_fetched";
7+
const REORGS: &str = "host_rpc.reorgs";
8+
const WALK_EXHAUSTED: &str = "host_rpc.walk_exhausted";
9+
const BACKFILL_BATCHES: &str = "host_rpc.backfill_batches";
10+
const TAG_REFRESHES: &str = "host_rpc.tag_refreshes";
11+
const STALE_HINTS: &str = "host_rpc.stale_hints";
12+
const RPC_ERRORS: &str = "host_rpc.rpc_errors";
13+
14+
const WALK_CHAIN_DURATION: &str = "host_rpc.walk_chain.duration_ms";
15+
const FETCH_BLOCK_DURATION: &str = "host_rpc.fetch_block.duration_ms";
16+
const BACKFILL_BATCH_DURATION: &str = "host_rpc.backfill_batch.duration_ms";
17+
const HANDLE_NEW_HEAD_DURATION: &str = "host_rpc.handle_new_head.duration_ms";
18+
const REORG_DEPTH: &str = "host_rpc.reorg.depth";
19+
20+
const CHAIN_VIEW_LEN: &str = "host_rpc.chain_view.len";
21+
const TIP_NUMBER: &str = "host_rpc.tip.number";
22+
23+
// ── Self-registering descriptions ──────────────────────────────────
24+
25+
static DESCRIBED: OnceLock<()> = OnceLock::new();
26+
27+
fn ensure_described() {
28+
DESCRIBED.get_or_init(|| {
29+
describe_counter!(BLOCKS_FETCHED, "Total blocks fetched via RPC");
30+
describe_counter!(REORGS, "Chain reorg events detected");
31+
describe_counter!(WALK_EXHAUSTED, "Walk exhausted the chain view buffer");
32+
describe_counter!(BACKFILL_BATCHES, "Backfill batches completed");
33+
describe_counter!(TAG_REFRESHES, "Epoch boundary tag refreshes");
34+
describe_counter!(STALE_HINTS, "Stale subscription hints that fell back to latest");
35+
describe_counter!(RPC_ERRORS, "RPC transport/provider errors");
36+
describe_histogram!(WALK_CHAIN_DURATION, "Time to walk the chain (ms)");
37+
describe_histogram!(FETCH_BLOCK_DURATION, "Single block+receipts fetch (ms)");
38+
describe_histogram!(BACKFILL_BATCH_DURATION, "Full backfill batch (ms)");
39+
describe_histogram!(HANDLE_NEW_HEAD_DURATION, "Full new-head processing (ms)");
40+
describe_histogram!(REORG_DEPTH, "Number of blocks reverted in a reorg");
41+
describe_gauge!(CHAIN_VIEW_LEN, "Current chain view buffer size");
42+
describe_gauge!(TIP_NUMBER, "Current tip block number");
43+
});
44+
}
45+
46+
// ── Helper functions ───────────────────────────────────────────────
47+
48+
/// Record the duration of a `walk_chain` call.
49+
pub(crate) fn record_walk_duration(duration: Duration) {
50+
ensure_described();
51+
histogram!(WALK_CHAIN_DURATION).record(duration.as_secs_f64() * 1000.0);
52+
}
53+
54+
/// Increment the walk-exhausted counter.
55+
pub(crate) fn inc_walk_exhausted() {
56+
ensure_described();
57+
counter!(WALK_EXHAUSTED).increment(1);
58+
}
59+
60+
/// Record a single block fetch duration.
61+
pub(crate) fn record_fetch_block_duration(duration: Duration) {
62+
ensure_described();
63+
histogram!(FETCH_BLOCK_DURATION).record(duration.as_secs_f64() * 1000.0);
64+
}
65+
66+
/// Record blocks fetched with a mode label.
67+
pub(crate) fn inc_blocks_fetched(count: u64, mode: &'static str) {
68+
ensure_described();
69+
counter!(BLOCKS_FETCHED, "mode" => mode).increment(count);
70+
}
71+
72+
/// Record a reorg event with its depth.
73+
pub(crate) fn inc_reorgs(depth: u64) {
74+
ensure_described();
75+
counter!(REORGS).increment(1);
76+
histogram!(REORG_DEPTH).record(depth as f64);
77+
}
78+
79+
/// Increment the stale-hints counter.
80+
pub(crate) fn inc_stale_hints() {
81+
ensure_described();
82+
counter!(STALE_HINTS).increment(1);
83+
}
84+
85+
/// Increment the RPC errors counter.
86+
pub(crate) fn inc_rpc_errors() {
87+
ensure_described();
88+
counter!(RPC_ERRORS).increment(1);
89+
}
90+
91+
/// Record a backfill batch completion.
92+
pub(crate) fn record_backfill_batch(duration: Duration) {
93+
ensure_described();
94+
counter!(BACKFILL_BATCHES).increment(1);
95+
histogram!(BACKFILL_BATCH_DURATION).record(duration.as_secs_f64() * 1000.0);
96+
}
97+
98+
/// Record `handle_new_head` duration.
99+
pub(crate) fn record_handle_new_head_duration(duration: Duration) {
100+
ensure_described();
101+
histogram!(HANDLE_NEW_HEAD_DURATION).record(duration.as_secs_f64() * 1000.0);
102+
}
103+
104+
/// Increment the tag-refreshes counter.
105+
pub(crate) fn inc_tag_refreshes() {
106+
ensure_described();
107+
counter!(TAG_REFRESHES).increment(1);
108+
}
109+
110+
/// Update the chain view length gauge.
111+
pub(crate) fn set_chain_view_len(len: usize) {
112+
ensure_described();
113+
gauge!(CHAIN_VIEW_LEN).set(len as f64);
114+
}
115+
116+
/// Update the tip block number gauge.
117+
pub(crate) fn set_tip(number: u64) {
118+
ensure_described();
119+
gauge!(TIP_NUMBER).set(number as f64);
120+
}

0 commit comments

Comments
 (0)