
fix: make signature aggregation asynchronous #299

Merged
MegaRedHand merged 10 commits into main from asynchronous-signature-aggregation
Apr 21, 2026

Conversation

@MegaRedHand
Collaborator

Summary

  • Moves committee-signature aggregation off the BlockChainServer actor thread onto a tokio::task::spawn_blocking worker. The actor no longer blocks for the duration of the XMSS proofs (previously 400-1200 ms per slot).
  • Bounds aggregation with a CancellationToken fired by a 750 ms deadline (self-message via send_after). Aggregates produced inside the window are streamed back via AggregateProduced actor messages and published immediately; post-deadline aggregates still get applied locally and published. (A sketch of this wiring follows the list.)
  • #[stopped] lifecycle hook cancels the in-flight worker and joins it (bounded at 2 s) so node shutdown is clean; new-session start joins any straggler from the previous slot and warns if it hadn't finished yet.
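
For orientation, a minimal sketch of the session wiring described above, assuming tokio and tokio-util; the actor framework's send_after is stood in for by a plain tokio::spawn plus sleep, and the struct shape is illustrative rather than the PR's exact definition:

use std::time::Duration;
use tokio_util::sync::CancellationToken;

// Illustrative session state; the PR's real struct also tracks a timer handle.
struct Session {
    session_id: u64, // the slot number, used to fence late messages
    cancel: CancellationToken,
    worker: tokio::task::JoinHandle<()>,
}

fn start_session(slot: u64) -> Session {
    let cancel = CancellationToken::new();

    // The expensive XMSS proofs run off the actor thread.
    let worker_token = cancel.clone();
    let worker = tokio::task::spawn_blocking(move || {
        // run_aggregation_worker: checks worker_token.is_cancelled()
        // before starting each group, then streams results back.
        let _ = worker_token.is_cancelled();
    });

    // Stand-in for send_after(750ms, AggregationDeadline { session_id }):
    // firing the token means the worker stops starting new groups.
    let deadline_token = cancel.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(750)).await;
        deadline_token.cancel();
    });

    Session { session_id: slot, cancel, worker }
}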

Architecture

  • crates/blockchain/src/store.rs now exposes pure helpers: snapshot_aggregation_inputs, aggregate_job, apply_aggregated_group, finalize_aggregation_session. The old aggregate_committee_signatures / try_aggregate (single inline function) are gone.
  • store::on_tick no longer calls into aggregation — the actor drives it.
  • New messages (pub(crate)): AggregateProduced, AggregationDone, AggregationDeadline. Late messages are fenced by a session id (the slot number); illustrative shapes follow below the list.
  • Per-aggregate work on the actor: insert into new_payloads, delete consumed gossip sigs, publish via gossipsub. End-of-session work (gauge refresh) is batched in AggregationDone to avoid 2N lock acquisitions per slot.
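
Illustrative shapes of the three messages, guessed from the description above; field sets are not the PR's exact definitions:

// Stub standing in for the real output type produced by aggregate_job.
pub(crate) struct AggregatedGroupOutput;

pub(crate) struct AggregateProduced {
    pub(crate) session_id: u64, // slot number; the handler drops mismatches
    pub(crate) output: AggregatedGroupOutput,
}

pub(crate) struct AggregationDone {
    pub(crate) session_id: u64,
    pub(crate) total_elapsed: std::time::Duration,
    pub(crate) cancelled: bool,
}

pub(crate) struct AggregationDeadline {
    pub(crate) session_id: u64, // stale deadlines are ignored via the fence
}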

Observations from devnet runs

Under a 4-node devnet with three induced partitions (via docker pause) across 200 slots:

  • Finality lag: 3 slots (vs 5 under a previous synchronous cap)
  • Sessions cancelled at deadline: 24 / 203 (11.8%) — only when groups_considered > 1
  • Missed slots: 13, all inside partition windows
  • No "Prior aggregation worker still running" warnings — worker wind-down is clean
  • Chain recovers finalization after each partition

Test plan

  • cargo check --workspace
  • make fmt, make lint (clippy clean)
  • cargo test -p ethlambda-blockchain --test forkchoice_spectests — 77/77 pass
  • 50-slot local devnet with 2 ethlambda + 1 zeam + 1 grandine — chain finalizes normally
  • 200-slot local devnet with induced partitions — consensus safety preserved, liveness recovers between partitions
  • Review on CI

@github-actions

🤖 Claude Code Review

PR #299 fix: make signature aggregation asynchronous

Overview

This PR moves committee-signature aggregation (400–1200 ms XMSS proofs) off the BlockChainServer actor thread onto a tokio::task::spawn_blocking worker. The actor now stays unblocked during the expensive cryptographic work. A 750 ms soft deadline cancels the token so the worker stops starting new groups; already-running proofs complete naturally. Results are streamed back via actor messages (AggregateProduced, AggregationDone) and fenced by a slot-number session ID.

The refactor is architecturally sound. The extraction of pure functions in store.rs (snapshot_aggregation_inputs, aggregate_job, apply_aggregated_group, finalize_aggregation_session) is clean and the store's threading contract is preserved.


Bug: AggregationDone lacks a session-ID fence

lib.rs, Handler<AggregationDone> — unlike AggregateProduced, this handler has no guard against stale messages from a prior session:

impl Handler<AggregationDone> for BlockChainServer {
    async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
        store::finalize_aggregation_session(&self.store);          // no session check
        metrics::observe_committee_signatures_aggregation(msg.total_elapsed); // ditto
        info!(..., session_id = msg.session_id, ...);
    }
}

Scenario that triggers it:

  • Session N's worker hits the deadline, gets cancelled but is slow finishing its last XMSS proof.
  • start_aggregation_session times out on join, drops the handle, and session N+1 starts.
  • Session N+1 produces and applies its aggregates normally.
  • Late AggregationDone from session N arrives.
    • finalize_aggregation_session fires a second time, mid-session-N+1 — metrics gauges are refreshed prematurely but at least reflect current counts, so no data corruption.
    • observe_committee_signatures_aggregation records session N's elapsed time into the histogram while session N+1 is still running → one spurious extra histogram entry, distorting the aggregation-time metric.
    • The info! log prints session N's stats labelled with N's session_id, with no indication it's stale.

Suggested fix — add the same guard that AggregateProduced uses:

async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
    let current = self.current_aggregation.as_ref().map(|s| s.session_id);
    if current != Some(msg.session_id) {
        return;
    }
    // ... rest unchanged
}

Performance: per-aggregate RocksDB writes replace a single batch

store.rs, apply_aggregated_group writes one payload per AggregateProduced message:

store.insert_new_aggregated_payload(output.hashed.clone(), output.proof.clone());

The original aggregate_committee_signatures called insert_new_aggregated_payloads_batch once for all groups in a slot. With N committees there are now N separate write transactions instead of one. In practice N is small (single digits for a small devnet), but it's a regression to note, especially under higher validator counts.
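
If the batch is worth restoring later, one possible shape, as a hedged sketch (hypothetical buffer type; the per-aggregate gossipsub publish can stay immediate while only the store write is deferred to AggregationDone):

// Hypothetical: buffer (hashed, proof) pairs per session and flush once in
// the AggregationDone handler via insert_new_aggregated_payloads_batch.
// Byte-vector element types are illustrative.
#[derive(Default)]
struct PendingPayloads {
    pending: Vec<(Vec<u8>, Vec<u8>)>,
}

impl PendingPayloads {
    fn push(&mut self, hashed: Vec<u8>, proof: Vec<u8>) {
        self.pending.push((hashed, proof)); // called per AggregateProduced
    }

    fn drain_for_batch_write(&mut self) -> Vec<(Vec<u8>, Vec<u8>)> {
        std::mem::take(&mut self.pending) // one write transaction per slot
    }
}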


Minor: groups_considered overcounts eliminated jobs

In snapshot_aggregation_inputs:

let groups_considered = gossip_groups.len()
    + new_payload_keys.iter().filter(...).count();

This counts groups before build_job prunes them. jobs.len() can be less than groups_considered. The AggregationDone log reports groups_considered from the snapshot — which is the pre-filter count — so the difference between groups_considered and groups_aggregated in the log mixes two different filter stages (pre-build vs. post-build). Consider naming it groups_examined or emitting jobs.len() separately as jobs_prepared.


Minor: Completed session held in current_aggregation until the next slot's interval 2

When AggregationDone arrives, self.current_aggregation is not cleared:

impl Handler<AggregationDone> for BlockChainServer {
    async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
        // current_aggregation stays Some(finished_session)
    }
}

The finished JoinHandle, CancellationToken, and TimerHandle are retained for the remainder of the slot (up to ~3.2 s). The cost is tiny (small Arc overhead), but setting self.current_aggregation = None here would make the state machine more explicit: session is done when AggregationDone is handled.


Minor: pub structs with pub(crate) fields in store.rs

AggregationJob, AggregationSnapshot, and AggregatedGroupOutput are declared pub while all their fields are pub(crate). Since these types don't cross crate boundaries, pub(crate) on the struct declaration is more accurate.


Minor: Ignoring send error after early break in worker

for job in snapshot.jobs {
    if actor.send(AggregateProduced { ... }).is_err() {
        break;  // actor gone
    }
}
let _ = actor.send(AggregationDone { ... });  // also fails silently if actor gone

This is harmless — the let _ = correctly swallows the error — but a boolean tracking whether the actor was lost could avoid the redundant send. Low priority.
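
A sketch of that refactor, mirroring the snippet above (stub bodies; message fields elided with ... exactly as in the original snippet):

let mut actor_alive = true;
for job in snapshot.jobs {
    if actor.send(AggregateProduced { ... }).is_err() {
        actor_alive = false; // remember the actor is gone
        break;
    }
}
if actor_alive {
    let _ = actor.send(AggregationDone { ... }); // skipped when actor is gone
}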


What looks good

  • Session fencing in AggregateProduced is correct and clearly written.
  • #[stopped] lifecycle hook properly cancels and joins the worker with a 2 s bound — clean shutdown path.
  • CancellationToken scoping is correct: the token is per-session and independent, so the #[stopped] hook doesn't accidentally cancel the next session's token.
  • snapshot_aggregation_inputs returns None when there's nothing to do, avoiding a no-op worker spawn.
  • Stale AggregationDeadline is safely handled via session ID even if TimerHandle drops don't cancel queued messages.
  • Metrics crossing the thread boundary via explicit observe_committee_signatures_aggregation(elapsed) rather than a drop-guard is the right call.
  • Spec tests updated cleanly; on_tick signature simplification is correct.

Automated review by Claude (Anthropic) · sonnet · custom prompt

@github-actions

🤖 Codex Code Review

Findings

  1. High: missed-interval catch-up can now skip committee-signature aggregation for an entire slot. In crates/blockchain/src/lib.rs, aggregation is started only when the final wall-clock interval is 2, after crates/blockchain/src/store.rs has already advanced store.time() through every missed interval in a loop. If the actor wakes up late and a single tick jumps from interval 1 to 3/4, the old code still aggregated during catch-up; this version does not, so that slot’s local aggregates are silently lost. I’d drive aggregation off the same interval-advance loop, or have store::on_tick() report that interval 2 was traversed.

  2. Medium: session fencing uses only the slot, which is not unique enough to reject stale same-slot worker output. crates/blockchain/src/lib.rs sets session_id = slot, and the AggregateProduced handler accepts any message whose id matches the current session. If interval 2 is entered twice for the same slot, late messages from the cancelled worker are indistinguishable from the replacement worker's and will still be applied. Using a monotonic per-session counter, and clearing the session in lib.rs when it completes, would make the fence robust (see the sketch below).
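
A minimal sketch of such a fence (hypothetical field on the actor; the slot can still be carried separately for logging):

// Hypothetical: a plain counter on the actor suffices, since sessions are
// only ever created from the actor thread; ids stay unique even when
// interval 2 is entered twice for the same slot.
struct SessionIds {
    next: u64,
}

impl SessionIds {
    fn next_session_id(&mut self) -> u64 {
        let id = self.next;
        self.next += 1;
        id
    }
}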

No other obvious consensus or memory-safety regressions stood out in the diff.

I couldn’t run cargo test here because rustup tries to write under a read-only /home/runner/.rustup/tmp.


Automated review by OpenAI Codex · gpt-5.4 · custom prompt

@greptile-apps
Contributor

greptile-apps Bot commented Apr 16, 2026

Greptile Summary

This PR moves committee-signature aggregation off the BlockChainServer actor thread onto a tokio::task::spawn_blocking worker, streaming results back as AggregateProduced / AggregationDone actor messages and bounding the session with a 750 ms cancellation deadline. The design is well-structured: session fencing on AggregateProduced, clean #[stopped] lifecycle shutdown, and a pure-function worker with no store access. The one open concern from a prior review thread (missing session-id fence on AggregationDone) remains unresolved; the only new finding here is that tokio-util/sync should be declared explicitly in Cargo.toml.

Confidence Score: 5/5

Safe to merge; the only new inline finding is a minor Cargo feature hygiene issue.

All new findings are P2. The prior-thread P1 concern (AggregationDone session fence) is not re-flagged here. The async architecture is sound, devnet validation is thorough, and the test plan is complete.

crates/blockchain/Cargo.toml — missing explicit features = ["sync"] for tokio-util

Important Files Changed

crates/blockchain/src/aggregation.rs
    New module: defines all aggregation types, pure functions, and the run_aggregation_worker loop. Session-id fencing is handled for AggregateProduced but not AggregationDone.
crates/blockchain/src/lib.rs
    Actor orchestration with Handler impls for AggregateProduced (fenced), AggregationDone (no session fence — prior thread), and AggregationDeadline; #[stopped] lifecycle hook joins worker cleanly.
crates/blockchain/src/store.rs
    on_tick interval-2 branch is now a no-op; aggregation fully removed from the store tick.
crates/blockchain/Cargo.toml
    Adds tokio-util with default-features = false but without features = ["sync"], yet CancellationToken from tokio_util::sync is used — works today via Cargo feature unification but not self-contained.
crates/blockchain/src/metrics.rs
    No functional changes; metrics are queried by finalize_aggregation_session instead of being updated inline per-aggregate.
crates/blockchain/tests/signature_spectests.rs
    New test harness for verify_signatures spec fixtures; straightforward pass/fail matching, no issues.

Sequence Diagram

sequenceDiagram
    participant T as Tick (interval 2)
    participant A as BlockChainServer (actor)
    participant W as spawn_blocking worker
    participant M as Actor mailbox

    T->>A: handle_tick → on_tick(slot, interval=2)
    A->>A: start_aggregation_session(slot)
    A->>A: join prior worker (if any, ≤2s)
    A->>A: snapshot_aggregation_inputs(&store)
    A->>W: tokio::task::spawn_blocking(run_aggregation_worker)
    A->>M: send_after(750ms, AggregationDeadline{session_id})
    A->>A: store current_aggregation = Some(session)

    loop For each AggregationJob
        W->>W: check cancel.is_cancelled() → break if true
        W->>W: aggregate_job(job) [XMSS proofs]
        W->>M: send(AggregateProduced{session_id, output})
    end

    M->>A: AggregateProduced{session_id}
    A->>A: fence: current session_id == msg.session_id?
    A->>A: apply_aggregated_group (insert payload, delete gossip sigs)
    A->>A: publish via gossipsub p2p

    alt Deadline fires (750ms)
        M->>A: AggregationDeadline{session_id}
        A->>A: session.cancel.cancel()
    end

    W->>M: send(AggregationDone{session_id, stats, cancelled})
    M->>A: AggregationDone
    A->>A: finalize_aggregation_session (gauge refresh)
    A->>A: log summary metrics

    alt Actor stopping
        A->>A: on_stopped()
        A->>W: cancel.cancel()
        A->>W: join (≤2s timeout)
    end
Inline review comment (crates/blockchain/Cargo.toml, line 25):
**`tokio-util/sync` feature not explicitly declared**

`default-features = false` is set but `features = ["sync"]` is absent. `CancellationToken` lives in `tokio_util::sync`, which is only compiled when that feature is enabled. The build currently passes via Cargo's feature unification (some transitive dependency in the workspace enables `sync`), but this makes the crate's compilation implicitly dependent on the rest of the dependency graph. A future dep-graph change could silently break this crate in isolation.

```suggestion
tokio-util = { version = "0.7", default-features = false, features = ["sync"] }
```


Reviews (2) · Last reviewed commit: "refactor: appease clippy"

Comment thread on crates/blockchain/src/lib.rs:
Isolate committee-signature aggregation code in its own module so the
async worker orchestration, job types, and pure aggregation helpers live
together. The store now focuses on attestation/block state, and the
actor (lib.rs) only handles session lifecycle hooks on BlockChainServer.

No behavior change.
@MegaRedHand MegaRedHand marked this pull request as draft April 17, 2026 23:01
@MegaRedHand
Collaborator Author

Blocked by #304

@MegaRedHand MegaRedHand marked this pull request as ready for review April 20, 2026 22:30
@github-actions

🤖 Codex Code Review

  1. High: missed interval-2 aggregation rounds when the actor catches up after any delay. store::on_tick still advances through every skipped interval in its internal loop, but interval 2 is now a no-op, and the actor only starts aggregation if the current wall-clock interval is 2. If the actor wakes up at interval 3/4 after lag, that slot's aggregation is silently skipped. This is a consensus-path behavior change from the old synchronous code. See lib.rs:148 and store.rs:209; a sketch of the reporting fix suggested in the earlier review follows this list.

  2. High: the 750 ms deadline is advisory only; post-deadline aggregates can still be applied and published. The worker checks cancel.is_cancelled() only before starting each job, so one long aggregate_job can run past the deadline and still send AggregateProduced. The handler then accepts anything whose session_id matches the current session, without checking current slot/time. That means slot-N aggregates can land after interval 4 or even after later ticks have already advanced the store, changing promotion/fork-choice timing relative to the old path. See aggregation.rs:408 and lib.rs:637.

  3. Medium: AggregationDone is not fenced to the active session. If an older worker finishes after a newer session has started, this handler still refreshes gauges and logs a completion event against the current store state, which can misreport aggregation outcomes and obscure timing bugs during incident analysis. See lib.rs:666.
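
For concreteness, a minimal sketch of the "report that interval 2 was traversed" fix from the earlier Codex review, under assumed signatures (the Store type, method names, and loop shape are illustrative, not the real store API):

// Hypothetical method sketch: the interval-advance loop reports whether
// interval 2 was traversed during catch-up, so the actor can start an
// aggregation session even when a tick jumps from interval 1 to 3 or 4.
impl Store {
    fn on_tick(&mut self, target_time: u64) -> bool {
        let mut traversed_interval_2 = false;
        while self.time() < target_time {
            let interval = self.advance_one_interval(); // advances store.time()
            if interval == 2 {
                traversed_interval_2 = true;
            }
        }
        traversed_interval_2
    }
}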

I couldn’t run cargo test -p ethlambda-blockchain --tests here because rustup failed creating temp files under /home/runner/.rustup/tmp on a read-only filesystem.


Automated review by OpenAI Codex · gpt-5.4 · custom prompt

@github-actions

🤖 Claude Code Review


PR 299 — fix: make signature aggregation asynchronous

Overview

This PR moves the expensive XMSS committee-signature aggregation (400–1200 ms) off the BlockChainServer actor thread onto a tokio::task::spawn_blocking worker, keeping the actor's message loop responsive. A 750 ms soft deadline cancels new job starts; results stream back as AggregateProduced messages fenced by a slot-number session ID. The architectural separation between actor-thread store access (snapshot / apply / finalize) and off-thread cryptography (aggregate_job) is clean and correct.


Bugs

AggregationDone handler is missing the session-ID fence

Handler<AggregateProduced> (lib.rs:683–708) correctly drops messages from non-current sessions. Handler<AggregationDone> (lib.rs:711–728) has no such guard:

impl Handler<AggregationDone> for BlockChainServer {
    async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
        aggregation::finalize_aggregation_session(&self.store); // no session check
        metrics::observe_committee_signatures_aggregation(msg.total_elapsed);
        info!(..., session_id = msg.session_id, ...);
    }
}

This is the same stale-session scenario the PR itself documents: when a prior worker outlives the PRIOR_WORKER_JOIN_TIMEOUT (2 s) and the new session proceeds without it, its late AggregationDone will arrive while session N+1 is actively running. The consequences:

  • finalize_aggregation_session fires mid-session, snapshotting partially-applied gossip-signature gauges.
  • The observe_committee_signatures_aggregation call records session N's duration into the histogram, introducing a spurious extra data point for the wrong session.
  • The info! log prints the stale session stats with no stale-ness indicator, polluting the devnet trace.

Apply the same guard AggregateProduced already uses:

async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
    let current = self.current_aggregation.as_ref().map(|s| s.session_id);
    if current != Some(msg.session_id) {
        trace!(
            incoming_session_id = msg.session_id,
            current_session_id = ?current,
            "Dropping stale aggregation done"
        );
        return;
    }
    // ...
}

aggregate_job panics on absurd (but not impossible) slot values (aggregation.rs:304)

let slot_u32: u32 = job.slot.try_into().expect("slot exceeds u32");

u32::MAX slots is roughly 540 years of 4-second slots — not a practical concern — but a panic is a hard crash for a consensus node. Prefer propagating failure:

let slot_u32: u32 = job.slot.try_into().ok()?;

aggregate_job already returns Option<AggregatedGroupOutput>, so ? fits naturally and keeps the call site consistent.


Correctness observation: worker panic leaves session in a dirty state

If run_aggregation_worker panics (e.g., from an internal expect in aggregate_job), AggregationDone is never sent. The actor doesn't know the worker is dead until the next slot's interval 2 join. During that window:

  • finalize_aggregation_session never fires → gossip-signature and aggregated-payload gauges remain at pre-aggregation values.
  • observe_committee_signatures_aggregation never fires → the histogram records no entry for that slot.
  • current_aggregation holds a finished (JoinHandle resolved Err) session.

This won't cause consensus failure (aggregation is best-effort), but the stuck metrics will make debugging harder. A simple mitigation is to wrap the worker body in std::panic::catch_unwind and send a "cancelled / failed" AggregationDone regardless.
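
A minimal sketch of that mitigation (hypothetical wrapper; send_done stands in for the actor send, and AssertUnwindSafe is needed because the closure captures by reference):

use std::panic::{catch_unwind, AssertUnwindSafe};

// Hypothetical guard around the worker body: AggregationDone is sent exactly
// once even if an inner expect panics, so the gauges and the histogram never
// silently go stale for a slot.
fn run_worker_guarded(session_id: u64, send_done: impl FnOnce(u64, bool)) {
    let result = catch_unwind(AssertUnwindSafe(|| {
        // run_aggregation_worker body: aggregate jobs, stream AggregateProduced
    }));
    send_done(session_id, result.is_err()); // true marks failed/cancelled
}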


Minor issues

Misleading struct-field comment (aggregation.rs:110–111)

/// Child of the actor cancellation token; fires either at the deadline or
/// when the actor itself is stopping.
pub(crate) cancel: CancellationToken,

The cancel token is created with CancellationToken::new() — it is an independent token, not a child of any parent. Child tokens in tokio-util are created with parent.child_token(). The comment overstates the relationship and will confuse readers who wonder where the parent token is. A more accurate version:

/// Independent cancellation token for this session. Fired either at the
/// soft deadline (AggregationDeadline handler) or when the actor stops
/// (on_stopped hook).

Per-aggregate store writes replace a batch (aggregation.rs:340)

The old aggregate_committee_signatures called insert_new_aggregated_payloads_batch once for all groups per slot. apply_aggregated_group now calls insert_new_aggregated_payload (singular) once per AggregateProduced message, resulting in N separate write transactions for N committees. For small devnets this is inconsequential; at higher validator counts where N > ~5 this is a meaningful increase in write amplification. Worth tracking as a follow-up.


groups_considered mixes pre-build and post-build counts (aggregation.rs:171–175, lib.rs:717)

groups_considered is computed before build_job prunes groups with insufficient material. The AggregationDone log reports groups_considered alongside groups_aggregated, implying the gap is "aggregations that failed"; it actually also includes groups that never had a viable job built. Naming the pre-prune field groups_examined (or emitting the post-prune jobs.len() as groups_viable) would make the log accurate.


aggregation_bits_from_validator_indices: misleading expect message (aggregation.rs:411)

AggregationBits::with_length(max_id + 1).expect("validator count exceeds limit");

The message says the validator count exceeds the limit, but the error condition is the opposite: with_length fails when the requested length exceeds the type's capacity limit — i.e., when the validator index is too large, not when there are too many validators. A clearer message: "validator index exceeds AggregationBits capacity".


Testing

The new pure functions in aggregation.rs (snapshot_aggregation_inputs, build_job, aggregate_job, select_proofs_greedily) have no unit tests. These are the right functions to cover with isolated unit tests since they're deterministic, accept minimal inputs, and have distinct edge cases (empty gossip groups, proofs with unresolvable pubkeys, greedy proof selection with partial overlap). The fork-choice spec tests exercise the system end-to-end but won't catch regressions in the snapshot or greedy-selection logic in isolation.
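
For instance, a sketch of one such test, assuming aggregation_bits_from_validator_indices takes a slice of indices and returns bits queryable by index (the accessor is hypothetical; the real AggregationBits API may differ):

#[cfg(test)]
mod aggregation_unit_tests {
    use super::*;

    #[test]
    fn bits_set_exactly_at_the_given_indices() {
        let indices = [0usize, 3, 7];
        let bits = aggregation_bits_from_validator_indices(&indices);
        for i in 0..8 {
            // Hypothetical accessor for illustration.
            assert_eq!(bits.get(i), indices.contains(&i));
        }
    }
}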


Summary

The design is sound and the separation of concerns is well executed. One definite bug to fix before merge: add the session-ID fence to Handler<AggregationDone> to match Handler<AggregateProduced>. The expect-to-? conversion in aggregate_job is low-risk but worthwhile. The comment inaccuracy on cancel is a maintainability concern. The rest are follow-up items.


Automated review by Claude (Anthropic) · sonnet · custom prompt

@MegaRedHand MegaRedHand merged commit 98ba479 into main Apr 21, 2026
3 checks passed
@MegaRedHand MegaRedHand deleted the asynchronous-signature-aggregation branch April 21, 2026 17:56
@MegaRedHand MegaRedHand linked an issue Apr 28, 2026 that may be closed by this pull request


Development

Successfully merging this pull request may close these issues.

Cap aggregation on aggregator and proposer

2 participants