Skip to content

Commit 3935687

Browse files
prestwichclaude
andauthored
refactor(blobber): abstract blob sources and remove reth deps (#112)
* feat(blobber): add BlobSource and AsyncBlobSource traits Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * refactor(blobber): abstract blob sources and remove all reth dependencies - Extract BlobExplorerSource, BeaconBlobSource, PylonBlobSource as AsyncBlobSource impls - Make BlobFetcher non-generic with trait object sources - Replace LruMap with schnellru, GetBlobsResponse with alloy import - Replace BlobStoreError with boxed error - Remove dead FetchError variants (ConsensusClientUrlNotSet, PylonClientUrlNotSet, BlobCountMismatch) - Delete shim.rs (RecoveredBlockShim moves to host-reth in later task) - Remove reth, reth-chainspec, reth-transaction-pool dependencies Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(blobber): add MemoryBlobSource for testing Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(host-reth): add RethBlobSource and absorb RecoveredBlockShim Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor(node-tests): replace reth pool with MemoryBlobSource Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(node-tests): remove remaining reth deps from utils.rs and rpc_debug.rs Replace reth type re-exports with signet-types and alloy equivalents. Remove reth from Cargo.toml entirely. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(blobber): add tracing for source errors and guard against empty blobs - Log source errors at DEBUG level in fetch_blobs (errors only matter if ALL sources fail) - Return Ok(None) instead of Ok(Some(empty)) when sources return zero blobs, so the fetcher continues to the next source - Fix stale comment referencing tokio::select (uses select_all) - Merge split alloy imports in rpc_debug.rs Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(blobber): address PR review feedback - Consolidate BlobFuture/BlobSourceError type aliases into source.rs as pub(crate) and import from concrete source files - Add .error_for_status()? to beacon and pylon HTTP requests - Remove unused BuilderError::Client variant - Rename Blobs::FromPool to Blobs::FromSidecar Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 23ed864 commit 3935687

25 files changed

Lines changed: 484 additions & 566 deletions

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ itertools = "0.14.0"
106106
metrics = "0.24.2"
107107
openssl = { version = "0.10", features = ["vendored"] }
108108
reqwest = "0.12.9"
109+
schnellru = "0.2"
109110
serde = { version = "1.0.217", features = ["derive"] }
110111
serde_json = "1.0.137"
111112
tracing = "0.1.41"

crates/blobber/Cargo.toml

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,10 @@ alloy.workspace = true
1313

1414
init4-bin-base.workspace = true
1515
signet-extract.workspace = true
16-
signet-types.workspace = true
1716
signet-zenith.workspace = true
1817

19-
reth.workspace = true
20-
reth-chainspec.workspace = true
21-
reth-transaction-pool = { workspace = true, optional = true }
22-
18+
futures-util.workspace = true
19+
schnellru.workspace = true
2320
serde.workspace = true
2421
tokio.workspace = true
2522
tracing.workspace = true
@@ -31,11 +28,9 @@ thiserror.workspace = true
3128
[dev-dependencies]
3229
signet-constants = { workspace = true, features = ["test-utils"] }
3330

34-
reth-transaction-pool = { workspace = true, features = ["test-utils"] }
35-
3631
eyre.workspace = true
3732
serde_json.workspace = true
3833
tempfile.workspace = true
3934

4035
[features]
41-
test-utils = ["signet-constants/test-utils", "dep:reth-transaction-pool", "reth-transaction-pool?/test-utils"]
36+
test-utils = ["signet-constants/test-utils"]
Lines changed: 65 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -1,140 +1,92 @@
1-
use crate::{BlobCacher, BlobFetcher, BlobFetcherConfig};
2-
use reth::transaction_pool::TransactionPool;
3-
use url::Url;
1+
use crate::{
2+
AsyncBlobSource, BlobCacher, BlobFetcher, BlobFetcherConfig, BlobSource,
3+
sources::{BeaconBlobSource, BlobExplorerSource, PylonBlobSource},
4+
};
45

56
/// Errors that can occur while building the [`BlobFetcher`] with a
67
/// [`BlobFetcherBuilder`].
7-
#[derive(Debug, thiserror::Error)]
8+
#[derive(Debug, Clone, Copy, thiserror::Error)]
89
pub enum BuilderError {
9-
/// The transaction pool was not provided.
10-
#[error("transaction pool is required")]
11-
MissingPool,
12-
/// The explorer URL was not provided or could not be parsed.
13-
#[error("explorer URL is required and must be valid")]
14-
MissingExplorerUrl,
1510
/// The URL provided was invalid.
1611
#[error("invalid URL provided")]
1712
Url(#[from] url::ParseError),
18-
/// The client was not provided.
19-
#[error("client is required")]
20-
MissingClient,
21-
/// The client failed to build.
22-
#[error("failed to build client: {0}")]
23-
Client(#[from] reqwest::Error),
24-
/// The slot calculator was not provided.
25-
#[error("slot calculator is required")]
26-
MissingSlotCalculator,
2713
}
2814

2915
/// Builder for the [`BlobFetcher`].
30-
#[derive(Debug, Default, Clone)]
31-
pub struct BlobFetcherBuilder<Pool> {
32-
pool: Option<Pool>,
33-
explorer_url: Option<String>,
34-
client: Option<reqwest::Client>,
35-
cl_url: Option<String>,
36-
pylon_url: Option<String>,
16+
///
17+
/// Add synchronous and asynchronous blob sources, then call [`build`] to
18+
/// produce a [`BlobFetcher`] or [`build_cache`] for a [`BlobCacher`].
19+
///
20+
/// [`build`]: BlobFetcherBuilder::build
21+
/// [`build_cache`]: BlobFetcherBuilder::build_cache
22+
#[derive(Default)]
23+
pub struct BlobFetcherBuilder {
24+
sync_sources: Vec<Box<dyn BlobSource>>,
25+
async_sources: Vec<Box<dyn AsyncBlobSource>>,
3726
}
3827

39-
impl<Pool> BlobFetcherBuilder<Pool> {
40-
/// Set the transaction pool to use for the extractor.
41-
pub fn with_pool<P2>(self, pool: P2) -> BlobFetcherBuilder<P2> {
42-
BlobFetcherBuilder {
43-
pool: Some(pool),
44-
explorer_url: self.explorer_url,
45-
client: self.client,
46-
cl_url: self.cl_url,
47-
pylon_url: self.pylon_url,
48-
}
49-
}
50-
51-
/// Set the transaction pool to use a mock test pool.
52-
#[cfg(feature = "test-utils")]
53-
pub fn with_test_pool(self) -> BlobFetcherBuilder<reth_transaction_pool::test_utils::TestPool> {
54-
self.with_pool(reth_transaction_pool::test_utils::testing_pool())
55-
}
56-
57-
/// Set the configuration for the CL url, pylon url, from the provided
58-
/// [`BlobFetcherConfig`].
59-
pub fn with_config(self, config: &BlobFetcherConfig) -> Result<Self, BuilderError> {
60-
let this = self.with_explorer_url(config.blob_explorer_url());
61-
let this =
62-
if let Some(cl_url) = config.cl_url() { this.with_cl_url(cl_url)? } else { this };
63-
64-
if let Some(pylon_url) = config.pylon_url() {
65-
this.with_pylon_url(pylon_url)
66-
} else {
67-
Ok(this)
68-
}
28+
impl core::fmt::Debug for BlobFetcherBuilder {
29+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
30+
f.debug_struct("BlobFetcherBuilder")
31+
.field("sync_sources", &self.sync_sources.len())
32+
.field("async_sources", &self.async_sources.len())
33+
.finish()
6934
}
35+
}
7036

71-
/// Set the blob explorer URL to use for the extractor. This will be used
72-
/// to construct a [`foundry_blob_explorers::Client`].
73-
pub fn with_explorer_url(mut self, explorer_url: &str) -> Self {
74-
self.explorer_url = Some(explorer_url.to_string());
37+
impl BlobFetcherBuilder {
38+
/// Adds a synchronous blob source.
39+
pub fn with_source(mut self, source: impl BlobSource + 'static) -> Self {
40+
self.sync_sources.push(Box::new(source));
7541
self
7642
}
7743

78-
/// Set the [`reqwest::Client`] to use for the extractor. This client will
79-
/// be used to make requests to the blob explorer, and the CL and Pylon URLs
80-
/// if provided.
81-
pub fn with_client(mut self, client: reqwest::Client) -> Self {
82-
self.client = Some(client);
44+
/// Adds an asynchronous blob source.
45+
pub fn with_async_source(mut self, source: impl AsyncBlobSource + 'static) -> Self {
46+
self.async_sources.push(Box::new(source));
8347
self
8448
}
8549

86-
/// Set the [`reqwest::Client`] via a [reqwest::ClientBuilder]. This
87-
/// function will immediately build the client and return an error if it
88-
/// fails.
50+
/// Configures standard remote sources from a [`BlobFetcherConfig`].
8951
///
90-
/// This client will be used to make requests to the blob explorer, and the
91-
/// CL and Pylon URLs if provided.
92-
pub fn with_client_builder(self, client: reqwest::ClientBuilder) -> Result<Self, BuilderError> {
93-
client.build().map(|client| self.with_client(client)).map_err(Into::into)
94-
}
95-
96-
/// Set the CL URL to use for the extractor.
97-
pub fn with_cl_url(mut self, cl_url: &str) -> Result<Self, BuilderError> {
98-
self.cl_url = Some(cl_url.to_string());
99-
Ok(self)
100-
}
101-
102-
/// Set the Pylon URL to use for the extractor.
103-
pub fn with_pylon_url(mut self, pylon_url: &str) -> Result<Self, BuilderError> {
104-
self.pylon_url = Some(pylon_url.to_string());
105-
Ok(self)
52+
/// This constructs a [`BlobExplorerSource`], and optionally a
53+
/// [`BeaconBlobSource`] and [`PylonBlobSource`] depending on whether
54+
/// the config provides CL and Pylon URLs.
55+
pub fn with_config(
56+
self,
57+
config: &BlobFetcherConfig,
58+
client: reqwest::Client,
59+
) -> Result<Self, BuilderError> {
60+
let explorer = foundry_blob_explorers::Client::new_with_client(
61+
config.blob_explorer_url(),
62+
client.clone(),
63+
);
64+
let this = self.with_async_source(BlobExplorerSource::new(explorer));
65+
66+
let this = match config.cl_url() {
67+
Some(cl) => {
68+
let url = url::Url::parse(cl)?;
69+
this.with_async_source(BeaconBlobSource::new(client.clone(), url))
70+
}
71+
None => this,
72+
};
73+
74+
match config.pylon_url() {
75+
Some(pylon) => {
76+
let url = url::Url::parse(pylon)?;
77+
Ok(this.with_async_source(PylonBlobSource::new(client, url)))
78+
}
79+
None => Ok(this),
80+
}
10681
}
107-
}
108-
109-
impl<Pool: TransactionPool> BlobFetcherBuilder<Pool> {
110-
/// Build the [`BlobFetcher`] with the provided parameters.
111-
pub fn build(self) -> Result<BlobFetcher<Pool>, BuilderError> {
112-
let pool = self.pool.ok_or(BuilderError::MissingPool)?;
113-
114-
let explorer_url = self.explorer_url.ok_or(BuilderError::MissingExplorerUrl)?;
115-
116-
let cl_url = self.cl_url.map(parse_url).transpose()?;
11782

118-
let pylon_url = self.pylon_url.map(parse_url).transpose()?;
119-
120-
let client = self.client.ok_or(BuilderError::MissingClient)?;
121-
122-
let explorer =
123-
foundry_blob_explorers::Client::new_with_client(explorer_url, client.clone());
124-
125-
Ok(BlobFetcher::new(pool, explorer, client, cl_url, pylon_url))
83+
/// Build the [`BlobFetcher`].
84+
pub fn build(self) -> BlobFetcher {
85+
BlobFetcher::new(self.sync_sources, self.async_sources)
12686
}
12787

128-
/// Build a [`BlobCacher`] with the provided parameters.
129-
pub fn build_cache(self) -> Result<BlobCacher<Pool>, BuilderError>
130-
where
131-
Pool: 'static,
132-
{
133-
let fetcher = self.build()?;
134-
Ok(BlobCacher::new(fetcher))
88+
/// Build a [`BlobCacher`] wrapping the constructed [`BlobFetcher`].
89+
pub fn build_cache(self) -> BlobCacher {
90+
BlobCacher::new(self.build())
13591
}
13692
}
137-
138-
fn parse_url(url: String) -> Result<Url, BuilderError> {
139-
Url::parse(url.as_ref()).map_err(BuilderError::Url)
140-
}

crates/blobber/src/blobs/cache.rs

Lines changed: 12 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use crate::{BlobFetcher, BlobberError, BlobberResult, Blobs, FetchResult};
1+
use crate::{BlobFetcher, BlobSpec, BlobberError, BlobberResult, Blobs, FetchResult};
22
use alloy::consensus::{SidecarCoder, SimpleCoder, Transaction as _};
33
use alloy::eips::eip7691::MAX_BLOBS_PER_BLOCK_ELECTRA;
44
use alloy::eips::merge::EPOCH_SLOTS;
55
use alloy::primitives::{B256, Bytes, keccak256};
66
use core::fmt;
7-
use reth::{network::cache::LruMap, transaction_pool::TransactionPool};
7+
use schnellru::{ByLength, LruMap};
88
use signet_extract::ExtractedEvent;
99
use signet_zenith::Zenith::BlockSubmitted;
1010
use signet_zenith::ZenithBlock;
@@ -144,22 +144,22 @@ impl<Coder> CacheHandle<Coder> {
144144
}
145145

146146
/// Retrieves blobs and stores them in a cache for later use.
147-
pub struct BlobCacher<Pool> {
148-
fetcher: BlobFetcher<Pool>,
147+
pub struct BlobCacher {
148+
fetcher: BlobFetcher,
149149

150150
cache: Mutex<LruMap<(usize, B256), Blobs>>,
151151
}
152152

153-
impl<Pool: fmt::Debug> fmt::Debug for BlobCacher<Pool> {
153+
impl fmt::Debug for BlobCacher {
154154
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155155
f.debug_struct("BlobCacher").field("fetcher", &self.fetcher).finish_non_exhaustive()
156156
}
157157
}
158158

159-
impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
160-
/// Creates a new `BlobCacher` with the provided extractor and cache size.
161-
pub fn new(fetcher: BlobFetcher<Pool>) -> Self {
162-
Self { fetcher, cache: LruMap::new(BLOB_CACHE_SIZE).into() }
159+
impl BlobCacher {
160+
/// Creates a new `BlobCacher` with the provided fetcher and cache size.
161+
pub fn new(fetcher: BlobFetcher) -> Self {
162+
Self { fetcher, cache: LruMap::new(ByLength::new(BLOB_CACHE_SIZE)).into() }
163163
}
164164

165165
/// Fetches blobs for a given slot and transaction hash.
@@ -176,12 +176,14 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
176176
return Ok(blobs.clone());
177177
}
178178

179+
let spec = BlobSpec { tx_hash, slot, versioned_hashes };
180+
179181
// Cache miss, use the fetcher to retrieve blobs
180182
// Retry fetching blobs up to `FETCH_RETRIES` times
181183
for attempt in 1..=FETCH_RETRIES {
182184
let Ok(blobs) = self
183185
.fetcher
184-
.fetch_blobs(slot, tx_hash, &versioned_hashes)
186+
.fetch_blobs(&spec)
185187
.instrument(debug_span!("fetch_blobs_loop", attempt))
186188
.await
187189
else {
@@ -229,89 +231,3 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
229231
CacheHandle { sender, _coder: PhantomData }
230232
}
231233
}
232-
233-
#[cfg(test)]
234-
mod tests {
235-
use crate::BlobFetcher;
236-
237-
use super::*;
238-
use alloy::{
239-
consensus::{SidecarBuilder, SignableTransaction as _, TxEip2930},
240-
eips::Encodable2718,
241-
primitives::{TxKind, U256, bytes},
242-
rlp::encode,
243-
signers::{SignerSync, local::PrivateKeySigner},
244-
};
245-
use reth::primitives::Transaction;
246-
use reth_transaction_pool::{
247-
PoolTransaction, TransactionOrigin,
248-
test_utils::{MockTransaction, testing_pool},
249-
};
250-
use signet_types::{constants::SignetSystemConstants, primitives::TransactionSigned};
251-
252-
#[tokio::test]
253-
async fn test_fetch_from_pool() -> eyre::Result<()> {
254-
let wallet = PrivateKeySigner::random();
255-
let pool = testing_pool();
256-
257-
let test = signet_constants::KnownChains::Test;
258-
259-
let constants: SignetSystemConstants = test.try_into().unwrap();
260-
261-
let explorer_url = "https://api.holesky.blobscan.com/";
262-
let client = reqwest::Client::builder().use_rustls_tls();
263-
264-
let tx = Transaction::Eip2930(TxEip2930 {
265-
chain_id: 17001,
266-
nonce: 2,
267-
gas_limit: 50000,
268-
gas_price: 1_500_000_000,
269-
to: TxKind::Call(constants.host_zenith()),
270-
value: U256::from(1_f64),
271-
input: bytes!(""),
272-
..Default::default()
273-
});
274-
275-
let encoded_transactions =
276-
encode(vec![sign_tx_with_key_pair(wallet.clone(), tx).encoded_2718()]);
277-
278-
let result = SidecarBuilder::<SimpleCoder>::from_slice(&encoded_transactions).build_4844();
279-
assert!(result.is_ok());
280-
281-
let mut mock_transaction = MockTransaction::eip4844_with_sidecar(result.unwrap().into());
282-
let transaction =
283-
sign_tx_with_key_pair(wallet, Transaction::from(mock_transaction.clone()));
284-
285-
mock_transaction.set_hash(*transaction.hash());
286-
287-
pool.add_transaction(TransactionOrigin::Local, mock_transaction.clone()).await?;
288-
289-
// Spawn the cache
290-
let cache = BlobFetcher::builder()
291-
.with_pool(pool.clone())
292-
.with_explorer_url(explorer_url)
293-
.with_client_builder(client)
294-
.unwrap()
295-
.build_cache()?;
296-
let handle = cache.spawn::<SimpleCoder>();
297-
298-
let got = handle
299-
.fetch_blobs(
300-
0, // this is ignored by the pool
301-
*mock_transaction.hash(),
302-
mock_transaction.blob_versioned_hashes().unwrap().to_owned(),
303-
)
304-
.await;
305-
assert!(got.is_ok());
306-
307-
let got_blobs = got.unwrap();
308-
assert!(got_blobs.len() == 1);
309-
310-
Ok(())
311-
}
312-
313-
fn sign_tx_with_key_pair(wallet: PrivateKeySigner, tx: Transaction) -> TransactionSigned {
314-
let signature = wallet.sign_hash_sync(&tx.signature_hash()).unwrap();
315-
TransactionSigned::new_unhashed(tx, signature)
316-
}
317-
}

0 commit comments

Comments
 (0)