diff --git a/Cargo.lock b/Cargo.lock
index 1378eca1..ba59e2a6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2083,6 +2083,7 @@ dependencies = [
  "spawned-concurrency 0.5.0",
  "thiserror 2.0.18",
  "tokio",
+ "tokio-util",
  "tracing",
 ]
 
diff --git a/crates/blockchain/Cargo.toml b/crates/blockchain/Cargo.toml
index d3370e49..65c6ecf2 100644
--- a/crates/blockchain/Cargo.toml
+++ b/crates/blockchain/Cargo.toml
@@ -22,6 +22,7 @@ ethlambda-types.workspace = true
 spawned-concurrency.workspace = true
 tokio.workspace = true
+tokio-util = { version = "0.7", default-features = false }
 rayon.workspace = true
 thiserror.workspace = true
 
diff --git a/crates/blockchain/src/aggregation.rs b/crates/blockchain/src/aggregation.rs
new file mode 100644
index 00000000..07829164
--- /dev/null
+++ b/crates/blockchain/src/aggregation.rs
@@ -0,0 +1,460 @@
+//! Committee-signature aggregation: off-thread worker orchestration and the
+//! pure functions it runs.
+//!
+//! The blockchain actor fires one aggregation session per interval 2 via
+//! [`run_aggregation_worker`]. The actor stays on its message loop; the worker
+//! runs the expensive XMSS proofs on a `spawn_blocking` thread and streams
+//! results back as [`AggregateProduced`] / [`AggregationDone`] messages.
+
+use std::collections::HashSet;
+use std::time::{Duration, Instant};
+
+use ethlambda_crypto::aggregate_mixed;
+use ethlambda_storage::Store;
+use ethlambda_types::{
+    attestation::{AggregationBits, HashedAttestationData},
+    block::{AggregatedSignatureProof, ByteListMiB},
+    primitives::H256,
+    signature::{ValidatorPublicKey, ValidatorSignature},
+    state::Validator,
+};
+use spawned_concurrency::message::Message;
+use spawned_concurrency::tasks::ActorRef;
+use tokio_util::sync::CancellationToken;
+use tracing::{info, warn};
+
+use crate::metrics;
+
+/// Soft deadline for committee-signature aggregation, measured from the
+/// interval-2 tick. After this much wall time elapses, the actor signals the
+/// worker to stop via its cancellation token. The 50 ms budget before the next
+/// interval (interval 3 at +800 ms) is reserved for publishing any late-arriving
+/// aggregates and for gossip propagation margin.
+pub(crate) const AGGREGATION_DEADLINE: Duration = Duration::from_millis(750);
+/// Upper bound we wait for a prior worker to exit if it is still running when
+/// the next session is about to start. Reached only in pathological cases
+/// (mismatched timers, stuck proofs); we warn before blocking.
+pub(crate) const PRIOR_WORKER_JOIN_TIMEOUT: Duration = Duration::from_secs(2);
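+
+// Illustrative timeline within one slot, assuming the 800 ms tick cadence the
+// scheduler in `lib.rs` uses (offsets from slot start; a sketch, not a spec):
+//
+//   +1600 ms  interval 2 tick: snapshot taken, worker spawned, deadline armed
+//   +2350 ms  tick + AGGREGATION_DEADLINE: token cancelled, no new groups start
+//   +2400 ms  interval 3 tick: ~50 ms margin for late publishes and gossip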
+
+/// A single pre-prepared aggregation group.
+///
+/// Built on the actor thread from a store snapshot; consumed by an off-thread
+/// worker that only needs to run the expensive `aggregate_mixed` call. Holding
+/// this struct requires no store access.
+pub struct AggregationJob {
+    pub(crate) hashed: HashedAttestationData,
+    pub(crate) slot: u64,
+    /// Pre-resolved `(participant_pubkeys, proof_data)` pairs for children
+    /// selected via greedy coverage.
+    pub(crate) children: Vec<(Vec<ValidatorPublicKey>, ByteListMiB)>,
+    pub(crate) accepted_child_ids: Vec<u64>,
+    pub(crate) raw_pubkeys: Vec<ValidatorPublicKey>,
+    pub(crate) raw_sigs: Vec<ValidatorSignature>,
+    pub(crate) raw_ids: Vec<u64>,
+    /// Gossip-signature keys to delete on successful aggregation.
+    pub(crate) keys_to_delete: Vec<(u64, H256)>,
+}
+
+/// All input needed to run a session of committee-signature aggregation off-thread.
+pub struct AggregationSnapshot {
+    pub(crate) jobs: Vec<AggregationJob>,
+    pub(crate) groups_considered: usize,
+}
+
+/// Result of one successful aggregation group. Carried back to the actor thread
+/// as a message payload so the store can be updated and the gossip publish fired.
+pub struct AggregatedGroupOutput {
+    pub(crate) hashed: HashedAttestationData,
+    pub(crate) proof: AggregatedSignatureProof,
+    pub(crate) participants: Vec<u64>,
+    pub(crate) keys_to_delete: Vec<(u64, H256)>,
+}
+
+/// Tracks an in-flight off-thread aggregation worker so the actor can cancel,
+/// join, and correlate incoming result messages with the right session.
+pub(crate) struct AggregationSession {
+    /// Slot at which this session was started; used as a fencing id so we can
+    /// drop late-arriving messages from a prior session.
+    pub(crate) session_id: u64,
+    /// Per-session cancellation token; cancelled either at the deadline or
+    /// when the actor itself is stopping (see the `#[stopped]` hook in `lib.rs`).
+    pub(crate) cancel: CancellationToken,
+    /// Handle to the `spawn_blocking` worker. Held so `stopped()` / new-session
+    /// start can await completion.
+    pub(crate) worker: tokio::task::JoinHandle<()>,
+}
+
+/// One successful aggregate streamed back from the worker.
+pub(crate) struct AggregateProduced {
+    pub(crate) session_id: u64,
+    pub(crate) output: AggregatedGroupOutput,
+}
+impl Message for AggregateProduced {
+    type Result = ();
+}
+
+/// Emitted by the worker after its loop exits (completion or cancellation).
+pub(crate) struct AggregationDone {
+    pub(crate) session_id: u64,
+    pub(crate) groups_considered: usize,
+    pub(crate) groups_aggregated: usize,
+    pub(crate) total_raw_sigs: usize,
+    pub(crate) total_children: usize,
+    pub(crate) total_elapsed: Duration,
+    pub(crate) cancelled: bool,
+}
+impl Message for AggregationDone {
+    type Result = ();
+}
+
+/// Self-message scheduled via `send_after` at interval-2 start. Cancels the
+/// session's token so the worker stops starting new aggregations.
+pub(crate) struct AggregationDeadline {
+    pub(crate) session_id: u64,
+}
+impl Message for AggregationDeadline {
+    type Result = ();
+}
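+
+// Message flow for one session (an illustrative sketch of the types above;
+// the actual wiring lives in `BlockChainServer::start_aggregation_session`):
+//
+//   actor, at the interval-2 tick:
+//     snapshot_aggregation_inputs(&store)        -> Option<AggregationSnapshot>
+//     tokio::task::spawn_blocking(run_aggregation_worker(..))
+//     send_after(AGGREGATION_DEADLINE, AggregationDeadline { session_id })
+//
+//   worker, until completion or cancellation:
+//     AggregateProduced { session_id, output }   (zero or more)
+//     AggregationDone { session_id, .. }         (exactly once)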
+
+/// Build a snapshot of everything needed to aggregate. Runs on the actor
+/// thread, touches the store, does no heavy cryptography. Returns `None` when
+/// there is nothing to aggregate so callers can avoid spawning an empty worker.
+pub fn snapshot_aggregation_inputs(store: &Store) -> Option<AggregationSnapshot> {
+    let gossip_groups = store.iter_gossip_signatures();
+    let new_payload_keys = store.new_payload_keys();
+
+    if gossip_groups.is_empty() && new_payload_keys.is_empty() {
+        return None;
+    }
+
+    let head_state = store.head_state();
+    let validators = &head_state.validators;
+
+    let gossip_roots: HashSet<H256> = gossip_groups
+        .iter()
+        .map(|(hashed, _)| hashed.root())
+        .collect();
+
+    let groups_considered = gossip_groups.len()
+        + new_payload_keys
+            .iter()
+            .filter(|(root, _)| !gossip_roots.contains(root))
+            .count();
+
+    let mut jobs = Vec::with_capacity(groups_considered);
+
+    // Pass 1: attestation data with gossip signatures (may also reuse
+    // existing proofs as children).
+    for (hashed, validator_sigs) in &gossip_groups {
+        if let Some(job) = build_job(store, validators, hashed.clone(), Some(validator_sigs)) {
+            jobs.push(job);
+        }
+    }
+
+    // Pass 2: attestation data with pending proofs but no gossip signatures —
+    // pure recursive merge.
+    for (data_root, att_data) in &new_payload_keys {
+        if gossip_roots.contains(data_root) {
+            continue;
+        }
+        // Cheap pre-check to skip the expensive `existing_proofs_for_data`
+        // clone when fewer than 2 proofs are present (a merge needs at least 2).
+        if store.proof_count_for_data(data_root) < 2 {
+            continue;
+        }
+        let hashed = HashedAttestationData::new(att_data.clone());
+        if let Some(job) = build_job(store, validators, hashed, None) {
+            jobs.push(job);
+        }
+    }
+
+    Some(AggregationSnapshot {
+        jobs,
+        groups_considered,
+    })
+}
+
+/// Build one `AggregationJob` for a given attestation data. Returns `None` when
+/// there is not enough material for a viable aggregation (no raw sigs and fewer
+/// than two children). `validator_sigs` is `None` for Pass 2 (payload-only).
+fn build_job(
+    store: &Store,
+    validators: &[Validator],
+    hashed: HashedAttestationData,
+    validator_sigs: Option<&[(u64, ValidatorSignature)]>,
+) -> Option<AggregationJob> {
+    let data_root = hashed.root();
+    let (new_proofs, known_proofs) = store.existing_proofs_for_data(&data_root);
+    let (child_proofs, covered) = select_proofs_greedily(&new_proofs, &known_proofs);
+
+    let mut raw_sigs = Vec::new();
+    let mut raw_pubkeys = Vec::new();
+    let mut raw_ids = Vec::new();
+    for (vid, sig) in validator_sigs.into_iter().flatten() {
+        if covered.contains(vid) {
+            continue;
+        }
+        let Some(validator) = validators.get(*vid as usize) else {
+            continue;
+        };
+        let Ok(pubkey) = validator.get_attestation_pubkey() else {
+            continue;
+        };
+        raw_sigs.push(sig.clone());
+        raw_pubkeys.push(pubkey);
+        raw_ids.push(*vid);
+    }
+
+    let (children, accepted_child_ids) = resolve_child_pubkeys(&child_proofs, validators);
+
+    if raw_ids.is_empty() && children.len() < 2 {
+        return None;
+    }
+
+    let keys_to_delete: Vec<(u64, H256)> = validator_sigs
+        .into_iter()
+        .flatten()
+        .map(|(vid, _)| (*vid, data_root))
+        .collect();
+
+    let slot = hashed.data().slot;
+    Some(AggregationJob {
+        hashed,
+        slot,
+        children,
+        accepted_child_ids,
+        raw_pubkeys,
+        raw_sigs,
+        raw_ids,
+        keys_to_delete,
+    })
+}
+
+/// Resolve each child's participant pubkeys. Drops any child whose pubkeys
+/// can't be fully resolved (passing fewer pubkeys than the proof expects would
+/// produce an invalid aggregate).
+fn resolve_child_pubkeys(
+    child_proofs: &[AggregatedSignatureProof],
+    validators: &[Validator],
+) -> (Vec<(Vec<ValidatorPublicKey>, ByteListMiB)>, Vec<u64>) {
+    let mut children = Vec::with_capacity(child_proofs.len());
+    let mut accepted_child_ids: Vec<u64> = Vec::new();
+
+    for proof in child_proofs {
+        let participant_ids: Vec<u64> = proof.participant_indices().collect();
+        let child_pubkeys: Vec<ValidatorPublicKey> = participant_ids
+            .iter()
+            .filter_map(|&vid| validators.get(vid as usize)?.get_attestation_pubkey().ok())
+            .collect();
+        if child_pubkeys.len() != participant_ids.len() {
+            warn!(
+                expected = participant_ids.len(),
+                resolved = child_pubkeys.len(),
+                "Skipping child proof: could not resolve all participant pubkeys"
+            );
+            continue;
+        }
+        accepted_child_ids.extend(&participant_ids);
+        children.push((child_pubkeys, proof.proof_data.clone()));
+    }
+
+    (children, accepted_child_ids)
+}
+
+/// Run the expensive `aggregate_mixed` call for a single prepared job.
+///
+/// Pure function — no store access, safe to call from a `tokio::task::spawn_blocking`
+/// worker. Returns `None` on cryptographic failure.
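+///
+/// Intended call pattern (a sketch; it mirrors [`run_aggregation_worker`]
+/// below, with `snapshot` coming from [`snapshot_aggregation_inputs`]):
+/// ```ignore
+/// for job in snapshot.jobs {
+///     if let Some(output) = aggregate_job(job) {
+///         // hand `output` back to the actor thread for store + gossip updates
+///     }
+/// }
+/// ```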
+pub fn aggregate_job(job: AggregationJob) -> Option<AggregatedGroupOutput> {
+    if job.raw_ids.is_empty() && job.children.len() < 2 {
+        return None;
+    }
+
+    let slot_u32: u32 = job.slot.try_into().expect("slot exceeds u32");
+    let data_root = job.hashed.root();
+
+    let proof_data = {
+        let _timing = metrics::time_pq_sig_aggregated_signatures_building();
+        aggregate_mixed(
+            job.children,
+            job.raw_pubkeys,
+            job.raw_sigs,
+            &data_root,
+            slot_u32,
+        )
+    }
+    .inspect_err(|err| warn!(%err, "Failed to aggregate committee signatures"))
+    .ok()?;
+
+    let mut participants: Vec<u64> = job.raw_ids;
+    participants.extend(&job.accepted_child_ids);
+    participants.sort_unstable();
+    participants.dedup();
+
+    let aggregation_bits = aggregation_bits_from_validator_indices(&participants);
+
+    Some(AggregatedGroupOutput {
+        hashed: job.hashed,
+        proof: AggregatedSignatureProof::new(aggregation_bits, proof_data),
+        participants,
+        keys_to_delete: job.keys_to_delete,
+    })
+}
+
+/// Apply a worker-produced aggregate to the store. Called per message on the
+/// actor thread; gauge metrics that depend on total counts are batched into
+/// `finalize_aggregation_session` so we pay one lock per session instead of
+/// one per aggregate. Idempotent with respect to the gossip delete.
+pub fn apply_aggregated_group(store: &mut Store, output: &AggregatedGroupOutput) {
+    store.insert_new_aggregated_payload(output.hashed.clone(), output.proof.clone());
+    store.delete_gossip_signatures(&output.keys_to_delete);
+
+    metrics::inc_pq_sig_aggregated_signatures();
+    metrics::inc_pq_sig_attestations_in_aggregated_signatures(output.participants.len() as u64);
+}
+
+/// End-of-session gauge refresh. Called once after the worker finishes so the
+/// `lean_latest_new_aggregated_payloads` and `lean_gossip_signatures` gauges
+/// settle on the final counts instead of being churned per aggregate.
+pub fn finalize_aggregation_session(store: &Store) {
+    metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count());
+    metrics::update_gossip_signatures(store.gossip_signatures_count());
+}
+
+/// Greedy set-cover selection of proofs to maximize validator coverage.
+///
+/// Processes proof sets in priority order (new before known). Within each set,
+/// repeatedly picks the proof covering the most uncovered validators until
+/// no proof adds new coverage. This keeps the number of children minimal
+/// while maximizing the validators we can skip re-aggregating from scratch.
+fn select_proofs_greedily(
+    new_proofs: &[AggregatedSignatureProof],
+    known_proofs: &[AggregatedSignatureProof],
+) -> (Vec<AggregatedSignatureProof>, HashSet<u64>) {
+    let mut selected: Vec<AggregatedSignatureProof> = Vec::new();
+    let mut covered: HashSet<u64> = HashSet::new();
+
+    for proof_set in [new_proofs, known_proofs] {
+        let mut remaining: Vec<&AggregatedSignatureProof> = proof_set.iter().collect();
+
+        while !remaining.is_empty() {
+            let best_idx = remaining
+                .iter()
+                .enumerate()
+                .max_by_key(|(_, p)| {
+                    p.participant_indices()
+                        .filter(|vid| !covered.contains(vid))
+                        .count()
+                })
+                .map(|(i, _)| i)
+                .expect("remaining is non-empty");
+
+            let new_coverage: HashSet<u64> = remaining[best_idx]
+                .participant_indices()
+                .filter(|vid| !covered.contains(vid))
+                .collect();
+
+            if new_coverage.is_empty() {
+                break;
+            }
+
+            selected.push(remaining.swap_remove(best_idx).clone());
+            covered.extend(new_coverage);
+        }
+    }
+
+    (selected, covered)
+}
+
+/// Build an `AggregationBits` bitfield from a list of validator indices.
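+///
+/// Worked example (an ignored-doctest sketch; only `with_length`, `set`, and
+/// `len` from `AggregationBits` are relied on, as used in this module):
+/// ```ignore
+/// let bits = aggregation_bits_from_validator_indices(&[0, 3, 5]);
+/// assert_eq!(bits.len(), 6); // sized to the highest index + 1; bits 0, 3, 5 set
+/// ```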
+pub(crate) fn aggregation_bits_from_validator_indices(bits: &[u64]) -> AggregationBits {
+    if bits.is_empty() {
+        return AggregationBits::with_length(0).unwrap();
+    }
+    let max_id = bits
+        .iter()
+        .copied()
+        .max()
+        .expect("already checked it's non-empty") as usize;
+    let mut aggregation_bits =
+        AggregationBits::with_length(max_id + 1).expect("validator count exceeds limit");
+
+    for &vid in bits {
+        aggregation_bits
+            .set(vid as usize, true)
+            .expect("capacity supports the highest validator id");
+    }
+    aggregation_bits
+}
+
+/// Worker loop — runs on a `spawn_blocking` thread, no store access.
+///
+/// Pulls jobs from the snapshot, runs [`aggregate_job`] for each, and streams
+/// successful aggregates back to the actor as [`AggregateProduced`] messages.
+/// Emits [`AggregationDone`] when the loop exits (completion or cancellation).
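+///
+/// Spawn pattern (a sketch; this mirrors `start_aggregation_session` in
+/// `lib.rs`, which owns the session bookkeeping and the deadline timer):
+/// ```ignore
+/// let worker = tokio::task::spawn_blocking(move || {
+///     run_aggregation_worker(snapshot, actor_ref, cancel, session_id);
+/// });
+/// ```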
+pub(crate) fn run_aggregation_worker(
+    snapshot: AggregationSnapshot,
+    actor: ActorRef,
+    cancel: CancellationToken,
+    session_id: u64,
+) {
+    let start = Instant::now();
+    let groups_considered = snapshot.groups_considered;
+    let mut groups_aggregated = 0usize;
+    let mut total_raw_sigs = 0usize;
+    let mut total_children = 0usize;
+
+    for job in snapshot.jobs {
+        if cancel.is_cancelled() {
+            break;
+        }
+
+        let slot = job.slot;
+        let raw_sigs = job.raw_ids.len();
+        let children = job.children.len();
+
+        let group_start = Instant::now();
+        let Some(output) = aggregate_job(job) else {
+            let elapsed = group_start.elapsed();
+            warn!(
+                session_id,
+                slot,
+                raw_sigs,
+                children,
+                ?elapsed,
+                "Committee signature aggregation failed"
+            );
+            continue;
+        };
+        let elapsed = group_start.elapsed();
+        info!(
+            session_id,
+            slot,
+            raw_sigs,
+            children,
+            participants = output.participants.len(),
+            ?elapsed,
+            "Committee signature aggregated"
+        );
+
+        groups_aggregated += 1;
+        total_raw_sigs += raw_sigs;
+        total_children += children;
+
+        if actor
+            .send(AggregateProduced { session_id, output })
+            .is_err()
+        {
+            // Actor is gone; no point producing more.
+            break;
+        }
+    }
+
+    let _ = actor.send(AggregationDone {
+        session_id,
+        groups_considered,
+        groups_aggregated,
+        total_raw_sigs,
+        total_children,
+        total_elapsed: start.elapsed(),
+        cancelled: cancel.is_cancelled(),
+    });
+}
diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs
index 9ee4ee8e..59a09be4 100644
--- a/crates/blockchain/src/lib.rs
+++ b/crates/blockchain/src/lib.rs
@@ -11,15 +11,21 @@ use ethlambda_types::{
     primitives::{H256, HashTreeRoot as _},
 };
 
+use crate::aggregation::{
+    AGGREGATION_DEADLINE, AggregateProduced, AggregationDeadline, AggregationDone,
+    AggregationSession, PRIOR_WORKER_JOIN_TIMEOUT, run_aggregation_worker,
+};
 use crate::key_manager::ValidatorKeyPair;
 use spawned_concurrency::actor;
 use spawned_concurrency::error::ActorError;
 use spawned_concurrency::protocol;
 use spawned_concurrency::tasks::{Actor, ActorRef, ActorStart, Context, Handler, send_after};
+use tokio_util::sync::CancellationToken;
 use tracing::{error, info, trace, warn};
 
 use crate::store::StoreError;
 
+pub mod aggregation;
 pub(crate) mod fork_choice_tree;
 pub mod key_manager;
 pub mod metrics;
@@ -57,6 +63,7 @@ impl BlockChain {
             pending_blocks: HashMap::new(),
             is_aggregator,
             pending_block_parents: HashMap::new(),
+            current_aggregation: None,
         }
         .start();
         let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time))
@@ -98,10 +105,15 @@ pub struct BlockChainServer {
     /// Whether this node acts as a committee aggregator.
     is_aggregator: bool,
+
+    /// In-flight committee-signature aggregation, if any. Present only while a
+    /// worker started at the most recent interval 2 is still running or until
+    /// the next interval 2 takes over.
+    current_aggregation: Option<AggregationSession>,
 }
 
 impl BlockChainServer {
-    fn on_tick(&mut self, timestamp_ms: u64) {
+    async fn on_tick(&mut self, timestamp_ms: u64, ctx: &Context) {
         let genesis_time_ms = self.store.config().genesis_time * 1000;
 
         // Calculate current slot and interval from milliseconds
@@ -127,19 +139,14 @@ impl BlockChainServer {
             .flatten();
 
         // Tick the store first - this accepts attestations at interval 0 if we have a proposal
-        let new_aggregates = store::on_tick(
+        store::on_tick(
            &mut self.store,
            timestamp_ms,
            proposer_validator_id.is_some(),
-            self.is_aggregator,
         );
 
-        if let Some(ref p2p) = self.p2p {
-            for aggregate in new_aggregates {
-                let _ = p2p
-                    .publish_aggregated_attestation(aggregate)
-                    .inspect_err(|err| error!(%err, "Failed to publish aggregated attestation"));
-            }
+        if interval == 2 && self.is_aggregator {
+            self.start_aggregation_session(slot, ctx).await;
         }
 
         // Now build and publish the block (after attestations have been accepted)
@@ -158,6 +165,62 @@ impl BlockChainServer {
         metrics::update_head_slot(self.store.head_slot());
     }
 
+    /// Kick off a committee-signature aggregation session:
+    /// 1. If a prior session is still running (pathological), warn and join it.
+    /// 2. Snapshot the aggregation inputs from the store.
+    /// 3. Spawn a `spawn_blocking` worker that streams results back as messages.
+    /// 4. Schedule the `AggregationDeadline` self-message at +750 ms.
+    async fn start_aggregation_session(&mut self, slot: u64, ctx: &Context) {
+        if let Some(prior) = self.current_aggregation.take() {
+            prior.cancel.cancel();
+            if !prior.worker.is_finished() {
+                warn!(
+                    prior_session_id = prior.session_id,
+                    new_session_id = slot,
+                    "Prior aggregation worker still running at next session start; joining before proceeding"
+                );
+            }
+            match tokio::time::timeout(PRIOR_WORKER_JOIN_TIMEOUT, prior.worker).await {
+                Ok(Ok(())) => {}
+                Ok(Err(err)) => warn!(?err, "Prior aggregation worker task ended abnormally"),
+                Err(_) => warn!(
+                    timeout_secs = PRIOR_WORKER_JOIN_TIMEOUT.as_secs(),
+                    "Timed out joining prior aggregation worker"
+                ),
+            }
+        }
+
+        let Some(snapshot) = aggregation::snapshot_aggregation_inputs(&self.store) else {
+            // No gossip sigs and no pending payloads — nothing to aggregate this slot.
+            return;
+        };
+
+        let session_id = slot;
+        // Independent token per session. Shutdown propagates via our
+        // #[stopped] hook, which cancels any current session; the deadline
+        // timer cancels this specific session at +AGGREGATION_DEADLINE.
+        let cancel = CancellationToken::new();
+        let actor_ref = ctx.actor_ref();
+
+        let worker_cancel = cancel.clone();
+        let worker_actor = actor_ref.clone();
+        let worker = tokio::task::spawn_blocking(move || {
+            run_aggregation_worker(snapshot, worker_actor, worker_cancel, session_id);
+        });
+
+        let _deadline_timer = send_after(
+            AGGREGATION_DEADLINE,
+            ctx.clone(),
+            AggregationDeadline { session_id },
+        );
+
+        self.current_aggregation = Some(AggregationSession {
+            session_id,
+            cancel,
+            worker,
+        });
+    }
+
     /// Returns the validator ID if any of our validators is the proposer for this slot.
     fn get_our_proposer(&self, slot: u64) -> Option<u64> {
         let head_state = self.store.head_state();
@@ -497,7 +560,7 @@ impl BlockChainServer {
         let timestamp = SystemTime::UNIX_EPOCH
             .elapsed()
             .expect("already past the unix epoch");
-        self.on_tick(timestamp.as_millis() as u64);
+        self.on_tick(timestamp.as_millis() as u64, ctx).await;
         // Schedule the next tick at the next 800ms interval boundary
         let ms_since_epoch = timestamp.as_millis() as u64;
         let ms_to_next_interval =
@@ -508,6 +571,31 @@ impl BlockChainServer {
             block_chain_protocol::Tick,
         );
     }
+
+    /// Actor lifecycle hook: wait for any in-flight aggregation worker to exit
+    /// before the actor is fully stopped. We cancel the session's token and
+    /// wait up to `PRIOR_WORKER_JOIN_TIMEOUT` for the worker's current
+    /// `aggregate_job` call to finish (the proof itself cannot be interrupted).
+    #[stopped]
+    async fn on_stopped(&mut self, _ctx: &Context) {
+        let Some(session) = self.current_aggregation.take() else {
+            return;
+        };
+        session.cancel.cancel();
+        match tokio::time::timeout(PRIOR_WORKER_JOIN_TIMEOUT, session.worker).await {
+            Ok(Ok(())) => {
+                info!(
+                    session_id = session.session_id,
+                    "Aggregation worker joined on shutdown"
+                );
+            }
+            Ok(Err(err)) => warn!(?err, "Aggregation worker task ended abnormally on shutdown"),
+            Err(_) => warn!(
+                timeout_secs = PRIOR_WORKER_JOIN_TIMEOUT.as_secs(),
+                "Timed out joining aggregation worker on shutdown"
+            ),
+        }
+    }
 }
 
 // --- Manual Handler impls for network-api messages ---
@@ -540,3 +628,66 @@ impl Handler for BlockChainServer {
         self.on_gossip_aggregated_attestation(msg.attestation);
     }
 }
+
+// -------------------------------------------------------------------------
+// Aggregation message handlers (worker → actor, actor → self for deadline)
+// -------------------------------------------------------------------------
+
+impl Handler<AggregateProduced> for BlockChainServer {
+    async fn handle(&mut self, msg: AggregateProduced, _ctx: &Context) {
+        // Drop results from a prior session (or from an unexpected late worker).
+        // The current session may be `None` if the actor already cleaned it up;
+        // accept the message only when the ids match.
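+        // (Sessions are fenced by slot: `session_id == slot`, assigned in
+        // `start_aggregation_session`, so a stale worker from a previous slot
+        // can never write into the current session.)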
+        let current = self.current_aggregation.as_ref().map(|s| s.session_id);
+        if current != Some(msg.session_id) {
+            trace!(
+                incoming_session_id = msg.session_id,
+                current_session_id = ?current,
+                "Dropping stale aggregate produced for non-current session"
+            );
+            return;
+        }
+
+        aggregation::apply_aggregated_group(&mut self.store, &msg.output);
+
+        if let Some(ref p2p) = self.p2p {
+            let aggregate = SignedAggregatedAttestation {
+                data: msg.output.hashed.data().clone(),
+                proof: msg.output.proof,
+            };
+            let _ = p2p
+                .publish_aggregated_attestation(aggregate)
+                .inspect_err(|err| error!(%err, "Failed to publish aggregated attestation"));
+        }
+    }
+}
+
+impl Handler<AggregationDone> for BlockChainServer {
+    async fn handle(&mut self, msg: AggregationDone, _ctx: &Context) {
+        aggregation::finalize_aggregation_session(&self.store);
+        metrics::observe_committee_signatures_aggregation(msg.total_elapsed);
+
+        let aggregation_elapsed = msg.total_elapsed;
+        info!(
+            ?aggregation_elapsed,
+            session_id = msg.session_id,
+            groups_considered = msg.groups_considered,
+            groups_aggregated = msg.groups_aggregated,
+            total_raw_sigs = msg.total_raw_sigs,
+            total_children = msg.total_children,
+            cancelled = msg.cancelled,
+            aggregation_deadline_ms = AGGREGATION_DEADLINE.as_millis() as u64,
+            "Committee signatures aggregated"
+        );
+    }
+}
+
+impl Handler<AggregationDeadline> for BlockChainServer {
+    async fn handle(&mut self, msg: AggregationDeadline, _ctx: &Context) {
+        if let Some(session) = &self.current_aggregation
+            && session.session_id == msg.session_id
+        {
+            session.cancel.cancel();
+        }
+    }
+}
diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs
index e59c5b9a..aa6e195d 100644
--- a/crates/blockchain/src/metrics.rs
+++ b/crates/blockchain/src/metrics.rs
@@ -514,9 +514,11 @@ pub fn time_pq_sig_aggregated_signatures_verification() -> TimingGuard {
     TimingGuard::new(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS)
 }
 
-/// Start timing committee signatures aggregation. Records duration when the guard is dropped.
-pub fn time_committee_signatures_aggregation() -> TimingGuard {
-    TimingGuard::new(&LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS)
+/// Observe committee-signature aggregation duration. Measured in the
+/// off-thread worker and reported back via an `AggregationDone` message, so a
+/// drop-guard that crosses the thread boundary is not appropriate here.
+pub fn observe_committee_signatures_aggregation(elapsed: std::time::Duration) {
+    LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS.observe(elapsed.as_secs_f64());
 }
 
 /// Update a table byte size gauge.
diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs
index 58f478f2..4bb01bcf 100644
--- a/crates/blockchain/src/store.rs
+++ b/crates/blockchain/src/store.rs
@@ -1,6 +1,6 @@
 use std::collections::{HashMap, HashSet};
 
-use ethlambda_crypto::{aggregate_mixed, aggregate_proofs};
+use ethlambda_crypto::aggregate_proofs;
 use ethlambda_state_transition::{
     is_proposer, process_block, process_slots, slot_is_justifiable_after,
 };
@@ -14,7 +14,7 @@ use ethlambda_types::{
     block::{AggregatedAttestations, AggregatedSignatureProof, Block, BlockBody, SignedBlock},
     checkpoint::Checkpoint,
     primitives::{H256, HashTreeRoot as _},
-    signature::{ValidatorPublicKey, ValidatorSignature},
+    signature::ValidatorSignature,
     state::State,
 };
 use tracing::{info, trace, warn};
@@ -121,275 +121,6 @@ fn update_safe_target(store: &mut Store) {
     store.set_safe_target(safe_target);
 }
 
-/// Aggregate committee signatures at interval 2 using mixed aggregation.
-///
-/// Iterates over the union of attestation data with gossip signatures OR pending
-/// new payloads (`new.keys() | gossip_sigs.keys()` in the spec). For each entry:
-///
-/// 1. **Selects** existing proofs from new/known payload buffers (greedy set-cover)
-/// 2. **Fills** uncovered validators with raw gossip signatures
-/// 3. **Aggregates** both children proofs and raw signatures in a single `xmss_aggregate` call
-///
-/// This matches the spec's incremental proof-building strategy: previous proofs
-/// are fed as children so only genuinely new signatures are aggregated from scratch,
-/// keeping proof trees shallow and avoiding redundant cryptographic work.
-///
-/// Results are inserted into the new (pending) payload buffer. They become
-/// fork-choice-active after `accept_new_attestations` promotes them to known
-/// at interval 0 (with proposal) or interval 4.
-fn aggregate_committee_signatures(store: &mut Store) -> Vec<SignedAggregatedAttestation> {
-    let gossip_groups = store.iter_gossip_signatures();
-    let new_payload_keys = store.new_payload_keys();
-
-    if gossip_groups.is_empty() && new_payload_keys.is_empty() {
-        return Vec::new();
-    }
-    let _timing = metrics::time_committee_signatures_aggregation();
-
-    let mut new_aggregates: Vec<SignedAggregatedAttestation> = Vec::new();
-
-    let head_state = store.head_state();
-    let validators = &head_state.validators;
-
-    let mut keys_to_delete: Vec<(u64, H256)> = Vec::new();
-    let mut payload_entries: Vec<(HashedAttestationData, AggregatedSignatureProof)> = Vec::new();
-
-    let gossip_roots: HashSet<H256> = gossip_groups
-        .iter()
-        .map(|(hashed, _)| hashed.root())
-        .collect();
-
-    // --- Pass 1: attestation data with gossip signatures ---
-    //
-    // Each entry may also have existing proofs (new/known) that become children.
-    for (hashed, validator_sigs) in &gossip_groups {
-        let data_root = hashed.root();
-        let slot = hashed.data().slot;
-
-        let (new_proofs, known_proofs) = store.existing_proofs_for_data(&data_root);
-        let (child_proofs, covered) = select_proofs_greedily(&new_proofs, &known_proofs);
-
-        // Collect raw gossip signatures for uncovered validators.
-        let mut sigs = vec![];
-        let mut pubkeys = vec![];
-        let mut raw_ids = vec![];
-
-        for (vid, sig) in validator_sigs {
-            if covered.contains(vid) {
-                continue;
-            }
-            let Some(validator) = validators.get(*vid as usize) else {
-                continue;
-            };
-            let Ok(pubkey) = validator.get_attestation_pubkey() else {
-                continue;
-            };
-            sigs.push(sig.clone());
-            pubkeys.push(pubkey);
-            raw_ids.push(*vid);
-        }
-
-        if raw_ids.is_empty() && child_proofs.len() < 2 {
-            continue;
-        }
-
-        let Some((proof, all_ids)) = try_aggregate(
-            &child_proofs,
-            pubkeys,
-            sigs,
-            &raw_ids,
-            &data_root,
-            slot,
-            &head_state,
-        ) else {
-            continue;
-        };
-
-        new_aggregates.push(SignedAggregatedAttestation {
-            data: hashed.data().clone(),
-            proof: proof.clone(),
-        });
-        payload_entries.push((hashed.clone(), proof));
-
-        // Delete all gossip sigs for this data: raw ones were consumed,
-        // covered ones are redundant (already captured in child proofs).
-        keys_to_delete.extend(validator_sigs.iter().map(|(vid, _)| (*vid, data_root)));
-
-        metrics::inc_pq_sig_aggregated_signatures();
-        metrics::inc_pq_sig_attestations_in_aggregated_signatures(all_ids.len() as u64);
-    }
-
-    // --- Pass 2: attestation data with new payloads but no gossip signatures ---
-    //
-    // Matches the `new.keys()` part of the spec's `new.keys() | gossip_sigs.keys()`.
-    // These entries have 0 raw signatures; they're only aggregated if 2+ existing
-    // proofs can be merged into one (pure recursive aggregation).
-    for (data_root, att_data) in &new_payload_keys {
-        if gossip_roots.contains(data_root) {
-            continue;
-        }
-
-        // Short-circuit: avoid cloning proofs when there aren't enough to merge.
-        if store.proof_count_for_data(data_root) < 2 {
-            continue;
-        }
-
-        let (new_proofs, known_proofs) = store.existing_proofs_for_data(data_root);
-        let (child_proofs, _covered) = select_proofs_greedily(&new_proofs, &known_proofs);
-
-        if child_proofs.len() < 2 {
-            continue;
-        }
-
-        let Some((proof, all_ids)) = try_aggregate(
-            &child_proofs,
-            vec![],
-            vec![],
-            &[],
-            data_root,
-            att_data.slot,
-            &head_state,
-        ) else {
-            continue;
-        };
-
-        let hashed = HashedAttestationData::new(att_data.clone());
-        new_aggregates.push(SignedAggregatedAttestation {
-            data: att_data.clone(),
-            proof: proof.clone(),
-        });
-        payload_entries.push((hashed, proof));
-
-        metrics::inc_pq_sig_aggregated_signatures();
-        metrics::inc_pq_sig_attestations_in_aggregated_signatures(all_ids.len() as u64);
-    }
-
-    // Insert into new (pending) payloads. They become fork-choice-active after
-    // accept_new_attestations promotes them to known at interval 0/4.
-    store.insert_new_aggregated_payloads_batch(payload_entries);
-    metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count());
-
-    // Delete consumed/redundant gossip signatures
-    store.delete_gossip_signatures(&keys_to_delete);
-    metrics::update_gossip_signatures(store.gossip_signatures_count());
-
-    new_aggregates
-}
-
-/// Resolve child pubkeys, call `aggregate_mixed`, and build the combined proof.
-///
-/// Returns `None` if aggregation fails (pubkey resolution or cryptographic error).
-/// On success returns the proof and the full set of covered validator IDs.
-fn try_aggregate(
-    child_proofs: &[AggregatedSignatureProof],
-    raw_pubkeys: Vec<ValidatorPublicKey>,
-    raw_sigs: Vec<ValidatorSignature>,
-    raw_ids: &[u64],
-    data_root: &H256,
-    slot: u64,
-    head_state: &State,
-) -> Option<(AggregatedSignatureProof, Vec<u64>)> {
-    let validators = &head_state.validators;
-
-    // Resolve each child's participant pubkeys. Skip children whose pubkeys
-    // can't be fully resolved: passing fewer pubkeys than the proof expects
-    // would produce an invalid aggregate.
-    let mut children_for_aggregation = Vec::with_capacity(child_proofs.len());
-    let mut accepted_child_ids: Vec<u64> = Vec::new();
-    for proof in child_proofs {
-        let participant_ids: Vec<u64> = proof.participant_indices().collect();
-        let child_pubkeys: Vec<ValidatorPublicKey> = participant_ids
-            .iter()
-            .filter_map(|&vid| validators.get(vid as usize)?.get_attestation_pubkey().ok())
-            .collect();
-        if child_pubkeys.len() != participant_ids.len() {
-            warn!(
-                expected = participant_ids.len(),
-                resolved = child_pubkeys.len(),
-                "Skipping child proof: could not resolve all participant pubkeys"
-            );
-            continue;
-        }
-        accepted_child_ids.extend(&participant_ids);
-        children_for_aggregation.push((child_pubkeys, proof.proof_data.clone()));
-    }
-
-    // Re-check after potentially dropping children with unresolvable pubkeys.
-    if raw_ids.is_empty() && children_for_aggregation.len() < 2 {
-        return None;
-    }
-
-    let slot_u32: u32 = slot.try_into().expect("slot exceeds u32");
-    let proof_data = {
-        let _timing = metrics::time_pq_sig_aggregated_signatures_building();
-        aggregate_mixed(
-            children_for_aggregation,
-            raw_pubkeys,
-            raw_sigs,
-            data_root,
-            slot_u32,
-        )
-    }
-    .inspect_err(|err| warn!(%err, "Failed to aggregate committee signatures"))
-    .ok()?;
-
-    let mut all_ids: Vec<u64> = raw_ids.to_vec();
-    all_ids.extend(&accepted_child_ids);
-    all_ids.sort_unstable();
-    all_ids.dedup();
-
-    let participants = aggregation_bits_from_validator_indices(&all_ids);
-    Some((
-        AggregatedSignatureProof::new(participants, proof_data),
-        all_ids,
-    ))
-}
-
-/// Greedy set-cover selection of proofs to maximize validator coverage.
-///
-/// Processes proof sets in priority order (new before known). Within each set,
-/// repeatedly picks the proof covering the most uncovered validators until
-/// no proof adds new coverage. This keeps the number of children minimal
-/// while maximizing the validators we can skip re-aggregating from scratch.
-fn select_proofs_greedily(
-    new_proofs: &[AggregatedSignatureProof],
-    known_proofs: &[AggregatedSignatureProof],
-) -> (Vec<AggregatedSignatureProof>, HashSet<u64>) {
-    let mut selected: Vec<AggregatedSignatureProof> = Vec::new();
-    let mut covered: HashSet<u64> = HashSet::new();
-
-    for proof_set in [new_proofs, known_proofs] {
-        let mut remaining: Vec<&AggregatedSignatureProof> = proof_set.iter().collect();
-
-        while !remaining.is_empty() {
-            let best_idx = remaining
-                .iter()
-                .enumerate()
-                .max_by_key(|(_, p)| {
-                    p.participant_indices()
-                        .filter(|vid| !covered.contains(vid))
-                        .count()
-                })
-                .map(|(i, _)| i)
-                .expect("remaining is non-empty");
-
-            let new_coverage: HashSet<u64> = remaining[best_idx]
-                .participant_indices()
-                .filter(|vid| !covered.contains(vid))
-                .collect();
-
-            if new_coverage.is_empty() {
-                break;
-            }
-
-            selected.push(remaining.swap_remove(best_idx).clone());
-            covered.extend(new_coverage);
-        }
-    }
-
-    (selected, covered)
-}
-
 /// Validate incoming attestation before processing.
 ///
 /// Ensures the vote respects the basic laws of time and topology:
@@ -463,14 +194,7 @@
 /// 800ms interval. Slot and interval-within-slot are derived as:
 ///   slot = store.time() / INTERVALS_PER_SLOT
 ///   interval = store.time() % INTERVALS_PER_SLOT
-pub fn on_tick(
-    store: &mut Store,
-    timestamp_ms: u64,
-    has_proposal: bool,
-    is_aggregator: bool,
-) -> Vec<SignedAggregatedAttestation> {
-    let mut new_aggregates: Vec<SignedAggregatedAttestation> = Vec::new();
-
+pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) {
     // Convert UNIX timestamp (ms) to interval count since genesis
     let genesis_time_ms = store.config().genesis_time * 1000;
     let time_delta_ms = timestamp_ms.saturating_sub(genesis_time_ms);
@@ -494,7 +218,11 @@
         let is_final_tick = store.time() == time;
         let should_signal_proposal = has_proposal && is_final_tick;
 
-        // NOTE: here we assume on_tick never skips intervals
+        // NOTE: here we assume on_tick never skips intervals.
+        // Interval 2 (committee-signature aggregation) is no longer handled here:
+        // the blockchain actor orchestrates the aggregation worker directly so
+        // the actor's message loop stays unblocked during the expensive XMSS
+        // proofs. See `BlockChainServer::start_aggregation_session` in `lib.rs`.
         match interval {
             0 => {
                 // Start of slot - process attestations if proposal exists
@@ -506,10 +234,7 @@
                 // Vote propagation — no action
             }
             2 => {
-                // Aggregation interval
-                if is_aggregator {
-                    new_aggregates.extend(aggregate_committee_signatures(store));
-                }
+                // Aggregation is driven by the actor (off-thread); nothing to do here.
             }
             3 => {
                 // Update safe target for validators
@@ -522,8 +247,6 @@
             _ => unreachable!("slots only have 5 intervals"),
         }
     }
-
-    new_aggregates
 }
 
 /// Process a gossiped attestation with signature verification.
@@ -953,7 +676,7 @@ fn get_proposal_head(store: &mut Store, slot: u64) -> H256 {
     let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT;
 
     // Advance time to current slot (ticking intervals)
-    on_tick(store, slot_time_ms, true, false);
+    on_tick(store, slot_time_ms, true);
 
     // Process any pending attestations before proposal
     accept_new_attestations(store, false);
@@ -1111,27 +834,6 @@ pub enum StoreError {
     TooManyAttestationData { count: usize, max: usize },
 }
 
-/// Build an AggregationBits bitfield from a list of validator indices.
-fn aggregation_bits_from_validator_indices(bits: &[u64]) -> AggregationBits {
-    if bits.is_empty() {
-        return AggregationBits::with_length(0).unwrap();
-    }
-    let max_id = bits
-        .iter()
-        .copied()
-        .max()
-        .expect("already checked it's non-empty") as usize;
-    let mut aggregation_bits =
-        AggregationBits::with_length(max_id + 1).expect("validator count exceeds limit");
-
-    for &vid in bits {
-        aggregation_bits
-            .set(vid as usize, true)
-            .expect("capacity support highest validator id");
-    }
-    aggregation_bits
-}
-
 /// Compute the bitwise union (OR) of two AggregationBits bitfields.
 fn union_aggregation_bits(a: &AggregationBits, b: &AggregationBits) -> AggregationBits {
     let max_len = a.len().max(b.len());
diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs
index 14c5db04..13d8cf7e 100644
--- a/crates/blockchain/tests/forkchoice_spectests.rs
+++ b/crates/blockchain/tests/forkchoice_spectests.rs
@@ -72,7 +72,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> {
                 genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT;
 
             // NOTE: the has_proposal argument is set to true, following the spec
-            store::on_tick(&mut store, block_time_ms, true, false);
+            store::on_tick(&mut store, block_time_ms, true);
             let result = store::on_block_without_verification(&mut store, signed_block);
             assert_step_outcome(step_idx, step.valid, result)?;
         }
@@ -88,7 +88,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> {
                     (None, None) => panic!("tick step missing both time and interval"),
                 };
                 let has_proposal = step.has_proposal.unwrap_or(false);
-                store::on_tick(&mut store, timestamp_ms, has_proposal, false);
+                store::on_tick(&mut store, timestamp_ms, has_proposal);
             }
             "attestation" => {
                 let att_data = step
diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs
index c136590c..e7c1a888 100644
--- a/crates/blockchain/tests/signature_spectests.rs
+++ b/crates/blockchain/tests/signature_spectests.rs
@@ -51,7 +51,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> {
         // Advance time to the block's slot
         let block_time_ms =
             genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT;
-        store::on_tick(&mut st, block_time_ms, true, false);
+        store::on_tick(&mut st, block_time_ms, true);
 
         // Process the block (this includes signature verification)
         let result = store::on_block(&mut st, signed_block);