Skip to content

Commit e763727

Browse files
committed
make Store's public methods return Result<T,Error>, propagate and update every callers accordingly
1 parent 634e1c8 commit e763727

12 files changed

Lines changed: 417 additions & 362 deletions

File tree

bin/ethlambda/src/checkpoint_sync.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ pub enum CheckpointSyncError {
1919
Http(#[from] reqwest::Error),
2020
#[error("SSZ deserialization failed: {0:?}")]
2121
SszDecode(DecodeError),
22+
#[error("Storage error: {0}")]
23+
Storage(#[from] ethlambda_storage::Error),
2224
#[error("checkpoint state slot cannot be 0")]
2325
SlotIsZero,
2426
#[error("checkpoint state has no validators")]

bin/ethlambda/src/main.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ async fn main() -> eyre::Result<()> {
169169
// and the API server (which exposes GET/POST admin endpoints).
170170
let aggregator = AggregatorController::new(options.is_aggregator);
171171

172-
let blockchain = BlockChain::spawn(store.clone(), validator_keys, aggregator.clone());
172+
let blockchain = BlockChain::spawn(store.clone(), validator_keys, aggregator.clone())
173+
.expect("failed to spawn blockchain actor");
173174

174175
// Note: SwarmConfig.is_aggregator is intentionally a plain bool, not the
175176
// AggregatorController — subnet subscriptions are decided once here and
@@ -458,7 +459,8 @@ async fn fetch_initial_state(
458459
let Some(checkpoint_url) = checkpoint_url else {
459460
info!("No checkpoint sync URL provided, initializing from genesis state");
460461
let genesis_state = State::from_genesis(genesis.genesis_time, validators);
461-
return Ok(Store::from_anchor_state(backend, genesis_state));
462+
return Store::from_anchor_state(backend, genesis_state)
463+
.map_err(checkpoint_sync::CheckpointSyncError::from);
462464
};
463465

464466
// Checkpoint sync path
@@ -476,5 +478,6 @@ async fn fetch_initial_state(
476478
);
477479

478480
// Store the anchor state and header, without body
479-
Ok(Store::from_anchor_state(backend, state))
481+
Store::from_anchor_state(backend, state)
482+
.map_err(checkpoint_sync::CheckpointSyncError::from)
480483
}

crates/blockchain/src/aggregation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ pub fn snapshot_aggregation_inputs(store: &Store) -> Option<AggregationSnapshot>
127127
return None;
128128
}
129129

130-
let head_state = store.head_state();
130+
let head_state = store.head_state().ok()?;
131131
let validators = &head_state.validators;
132132

133133
let gossip_roots: HashSet<H256> = gossip_groups

crates/blockchain/src/lib.rs

Lines changed: 67 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::collections::{HashMap, HashSet, VecDeque};
22
use std::time::{Duration, SystemTime};
33

4+
type Error = ethlambda_storage::Error;
5+
46
use ethlambda_network_api::{BlockChainToP2PRef, InitP2P};
57
use ethlambda_state_transition::is_proposer;
68
use ethlambda_storage::{ALL_TABLES, Store};
@@ -60,10 +62,10 @@ impl BlockChain {
6062
store: Store,
6163
validator_keys: HashMap<u64, ValidatorKeyPair>,
6264
aggregator: AggregatorController,
63-
) -> BlockChain {
65+
) -> Result<BlockChain, Error> {
6466
metrics::set_is_aggregator(aggregator.is_enabled());
6567
metrics::set_node_sync_status(metrics::SyncStatus::Idle);
66-
let genesis_time = store.config().genesis_time;
68+
let genesis_time = store.config()?.genesis_time;
6769
let key_manager = key_manager::KeyManager::new(validator_keys);
6870
let handle = BlockChainServer {
6971
store,
@@ -83,7 +85,7 @@ impl BlockChain {
8385
handle.context(),
8486
block_chain_protocol::Tick,
8587
);
86-
BlockChain { handle }
88+
Ok(BlockChain { handle })
8789
}
8890

8991
pub fn actor_ref(&self) -> &ActorRef<BlockChainServer> {
@@ -126,8 +128,8 @@ pub struct BlockChainServer {
126128
}
127129

128130
impl BlockChainServer {
129-
async fn on_tick(&mut self, timestamp_ms: u64, ctx: &Context<Self>) {
130-
let genesis_time_ms = self.store.config().genesis_time * 1000;
131+
async fn on_tick(&mut self, timestamp_ms: u64, ctx: &Context<Self>) -> Result<(), Error>{
132+
let genesis_time_ms = self.store.config()?.genesis_time * 1000;
131133

132134
// Calculate current slot and interval from milliseconds
133135
let time_since_genesis_ms = timestamp_ms.saturating_sub(genesis_time_ms);
@@ -136,9 +138,9 @@ impl BlockChainServer {
136138

137139
// Fail fast: a state with zero validators is invalid and would cause
138140
// panics in proposer selection and attestation processing.
139-
if self.store.head_state().validators.is_empty() {
141+
if self.store.head_state()?.validators.is_empty() {
140142
error!("Head state has no validators, skipping tick");
141-
return;
143+
return Ok(());
142144
}
143145

144146
// Update current slot metric
@@ -154,9 +156,11 @@ impl BlockChainServer {
154156
// At interval 0, check if we will propose (but don't build the block yet).
155157
// Tick forkchoice first to accept attestations, then build the block
156158
// using the freshly-accepted attestations.
157-
let proposer_validator_id = (interval == 0 && slot > 0)
158-
.then(|| self.get_our_proposer(slot))
159-
.flatten();
159+
let proposer_validator_id = if interval == 0 && slot > 0 {
160+
self.get_our_proposer(slot)?
161+
} else {
162+
None
163+
};
160164

161165
// Tick the store first - this accepts attestations at interval 0 if we have a proposal
162166
store::on_tick(
@@ -182,9 +186,10 @@ impl BlockChainServer {
182186
}
183187

184188
// Update safe target slot metric (updated by store.on_tick at interval 3)
185-
metrics::update_safe_target_slot(self.store.safe_target_slot());
189+
metrics::update_safe_target_slot(self.store.safe_target_slot()?);
186190
// Update head slot metric (head may change when attestations are promoted at intervals 0/4)
187-
metrics::update_head_slot(self.store.head_slot());
191+
metrics::update_head_slot(self.store.head_slot()?);
192+
Ok(())
188193
}
189194

190195
/// Kick off a committee-signature aggregation session:
@@ -244,21 +249,25 @@ impl BlockChainServer {
244249
}
245250

246251
/// Returns the validator ID if any of our validators is the proposer for this slot.
247-
fn get_our_proposer(&self, slot: u64) -> Option<u64> {
248-
let head_state = self.store.head_state();
252+
fn get_our_proposer(&self, slot: u64) -> Result<Option<u64>, Error> {
253+
let head_state = self.store.head_state()?;
249254
let num_validators = head_state.validators.len() as u64;
250255

251-
self.key_manager
256+
Ok(self.key_manager
252257
.validator_ids()
253258
.into_iter()
254-
.find(|&vid| is_proposer(vid, slot, num_validators))
259+
.find(|&vid| is_proposer(vid, slot, num_validators)))
255260
}
256261

257262
fn produce_attestations(&mut self, slot: u64, is_aggregator: bool) {
258263
let _timing = metrics::time_attestations_production();
259264

260265
// Produce attestation data once for all validators
261-
let attestation_data = store::produce_attestation_data(&self.store, slot);
266+
let Ok(attestation_data) = store::produce_attestation_data(&self.store, slot)
267+
.inspect_err(|err| error!(%slot, %err, "Failed to produce attestation data"))
268+
else {
269+
return;
270+
};
262271

263272
// For each registered validator, produce and publish attestation
264273
for validator_id in self.key_manager.validator_ids() {
@@ -359,14 +368,14 @@ impl BlockChainServer {
359368

360369
fn process_block(&mut self, signed_block: SignedBlock) -> Result<(), StoreError> {
361370
store::on_block(&mut self.store, signed_block)?;
362-
let head_slot = self.store.head_slot();
371+
let head_slot = self.store.head_slot()?;
363372
metrics::update_head_slot(head_slot);
364-
metrics::update_latest_justified_slot(self.store.latest_justified().slot);
365-
metrics::update_latest_finalized_slot(self.store.latest_finalized().slot);
373+
metrics::update_latest_justified_slot(self.store.latest_justified()?.slot);
374+
metrics::update_latest_finalized_slot(self.store.latest_finalized()?.slot);
366375
metrics::update_validators_count(self.key_manager.validator_ids().len() as u64);
367376

368377
// Update sync status based on head slot vs wall clock slot
369-
let current_slot = self.store.time() / INTERVALS_PER_SLOT;
378+
let current_slot = self.store.time()? / INTERVALS_PER_SLOT;
370379
let status = if head_slot >= current_slot {
371380
metrics::SyncStatus::Synced
372381
} else {
@@ -389,13 +398,15 @@ impl BlockChainServer {
389398
// Here we process blocks iteratively, to avoid recursive calls that could
390399
// cause a stack overflow.
391400
while let Some(block) = queue.pop_front() {
392-
self.process_or_pend_block(block, &mut queue);
401+
let _ = self.process_or_pend_block(block, &mut queue)
402+
.inspect_err(|e| error!(%e, "Failed to process or pend block"));
393403
}
394404

395405
// Prune old states and blocks AFTER the entire cascade completes.
396406
// Running this mid-cascade would delete states that pending children
397407
// still need, causing re-processing loops when fallback pruning is active.
398-
self.store.prune_old_data();
408+
let _ = self.store.prune_old_data()
409+
.inspect_err(|e| error!(%e, "Failed to prune old data"));
399410
}
400411

401412
/// Try to process a single block. If its parent state is missing, store it
@@ -405,7 +416,7 @@ impl BlockChainServer {
405416
&mut self,
406417
signed_block: SignedBlock,
407418
queue: &mut VecDeque<SignedBlock>,
408-
) {
419+
) -> Result<(), Error> {
409420
let slot = signed_block.message.slot;
410421
let block_root = signed_block.message.hash_tree_root();
411422
let parent_root = signed_block.message.parent_root;
@@ -415,13 +426,13 @@ impl BlockChainServer {
415426
// already part of the canonical chain and cannot affect fork choice.
416427
// Discard any pending children: since we won't process this block,
417428
// children referencing it as parent would remain stuck indefinitely.
418-
if slot <= self.store.latest_finalized().slot {
429+
if slot <= self.store.latest_finalized()?.slot {
419430
self.discard_pending_subtree(block_root);
420-
return;
431+
return Ok(());
421432
}
422433

423434
// Check if parent state exists before attempting to process
424-
if !self.store.has_state(&parent_root) {
435+
if !self.store.has_state(&parent_root)? {
425436
info!(%slot, %parent_root, %block_root, "Block parent missing, storing as pending");
426437

427438
// Resolve the actual missing ancestor by walking the chain. A stale entry
@@ -435,7 +446,8 @@ impl BlockChainServer {
435446
self.pending_block_parents.insert(block_root, missing_root);
436447

437448
// Persist block data to DB (no LiveChain entry — invisible to fork choice)
438-
self.store.insert_pending_block(block_root, signed_block);
449+
let _ = self.store.insert_pending_block(block_root, signed_block)
450+
.inspect_err(|e| warn!(%block_root, %e, "Failed to persist pending block"));
439451

440452
// Store only the H256 reference in memory
441453
self.pending_blocks
@@ -447,16 +459,30 @@ impl BlockChainServer {
447459
// session, the actual missing block is further up the chain.
448460
// Note: this loop always terminates — blocks reference parents by hash,
449461
// so a cycle would require a hash collision.
450-
while let Some(header) = self.store.get_block_header(&missing_root) {
451-
if self.store.has_state(&header.parent_root) {
462+
loop {
463+
let header = match self.store.get_block_header(&missing_root) {
464+
Ok(Some(h)) => h,
465+
Ok(None) => break,
466+
Err(e) => {
467+
error!(%missing_root, %e, "DB error during pending-block walk-up");
468+
break;
469+
}
470+
};
471+
if self.store.has_state(&header.parent_root)? {
452472
// Parent state available — enqueue for processing, cascade
453473
// handles the rest via the outer loop.
454-
let block = self
455-
.store
456-
.get_signed_block(&missing_root)
457-
.expect("header and parent state exist, so the full signed block must too");
458-
queue.push_back(block);
459-
return;
474+
match self.store.get_signed_block(&missing_root) {
475+
Ok(Some(block)) => {
476+
queue.push_back(block);
477+
}
478+
Ok(None) => {
479+
warn!(%missing_root, "Pending block missing from DB during walk-up");
480+
}
481+
Err(e) => {
482+
error!(%missing_root, %e, "DB error fetching pending block during walk-up");
483+
}
484+
}
485+
return Ok(());
460486
}
461487
// Block exists but parent doesn't have state — register as pending
462488
// so the cascade works when the true ancestor arrives
@@ -471,7 +497,7 @@ impl BlockChainServer {
471497

472498
// Request the actual missing block from network
473499
self.request_missing_block(missing_root);
474-
return;
500+
return Ok(());
475501
}
476502

477503
// Parent exists, proceed with processing
@@ -499,6 +525,7 @@ impl BlockChainServer {
499525
);
500526
}
501527
}
528+
Ok(())
502529
}
503530

504531
fn request_missing_block(&mut self, block_root: H256) {
@@ -528,7 +555,7 @@ impl BlockChainServer {
528555
self.pending_block_parents.remove(&block_root);
529556

530557
// Load block data from DB
531-
let Some(child_block) = self.store.get_signed_block(&block_root) else {
558+
let Ok(Some(child_block)) = self.store.get_signed_block(&block_root) else {
532559
warn!(
533560
block_root = %ShortRoot(&block_root.0),
534561
"Pending block missing from DB, skipping"
@@ -588,7 +615,8 @@ impl BlockChainServer {
588615
let timestamp = SystemTime::UNIX_EPOCH
589616
.elapsed()
590617
.expect("already past the unix epoch");
591-
self.on_tick(timestamp.as_millis() as u64, ctx).await;
618+
let _ = self.on_tick(timestamp.as_millis() as u64, ctx).await
619+
.inspect_err(|e| error!(%e, "Tick failed"));
592620
// Schedule the next tick at the next 800ms interval boundary
593621
let ms_since_epoch = timestamp.as_millis() as u64;
594622
let ms_to_next_interval =

0 commit comments

Comments
 (0)