From 6ce0834924411326ff46c593d7023e089d5f02e9 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 15 Apr 2026 19:30:24 -0300 Subject: [PATCH 1/3] Bound gossip_signatures with FIFO eviction to prevent unbounded memory growth Replace the bare HashMap with a GossipSignatureBuffer that caps total individual signatures at 2048 (~6MB). When over capacity, the oldest data_root and all its signatures are evicted FIFO, matching the PayloadBuffer pattern. Finalization-based pruning is preserved as a complementary mechanism. Fixes #263 --- Cargo.lock | 2 + crates/storage/Cargo.toml | 2 + crates/storage/src/store.rs | 380 ++++++++++++++++++++++++++++++++---- 3 files changed, 345 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9e72455c..1378eca1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2188,8 +2188,10 @@ name = "ethlambda-storage" version = "0.1.0" dependencies = [ "ethlambda-types", + "leansig", "libssz", "libssz-derive", + "rand 0.10.0", "rocksdb", "tempfile", "tracing", diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 2a877bb4..d389a3a9 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -20,3 +20,5 @@ libssz-derive.workspace = true [dev-dependencies] tempfile = "3" +leansig.workspace = true +rand.workspace = true diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index ed185a26..49c9e117 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -88,6 +88,11 @@ const AGGREGATED_PAYLOAD_CAP: usize = 512; /// Smaller than known since new payloads are drained every interval (~4s). const NEW_PAYLOAD_CAP: usize = 64; +/// Hard cap for the gossip signature buffer (individual signatures, not distinct data_roots). +/// With 4 validators and 4-second slots, 2048 signatures covers ~512 slots (~34 min). +/// Each XMSS signature is ~3KB, so worst-case memory is ~6 MB. +const GOSSIP_SIGNATURE_CAP: usize = 2048; + /// An entry in the payload buffer: attestation data + set of proofs. #[derive(Clone)] struct PayloadEntry { @@ -235,12 +240,150 @@ struct GossipDataEntry { signatures: BTreeMap, } -/// Gossip signatures grouped by attestation data (via data_root). -type GossipSignatureMap = HashMap; - /// Gossip signatures snapshot: (hashed_attestation_data, Vec<(validator_id, signature)>). pub type GossipSignatureSnapshot = Vec<(HashedAttestationData, Vec<(u64, ValidatorSignature)>)>; +/// Bounded buffer for gossip signatures with FIFO eviction. +/// +/// Groups signatures by attestation data (via data_root). Each distinct +/// attestation message stores the full `AttestationData` plus individual +/// validator signatures in ascending order (required for XMSS aggregation). +/// +/// Entries are evicted FIFO (by insertion order of the data_root) when +/// total_signatures exceeds capacity, matching the `PayloadBuffer` pattern. +struct GossipSignatureBuffer { + data: HashMap, + order: VecDeque, + capacity: usize, + total_signatures: usize, +} + +impl GossipSignatureBuffer { + fn new(capacity: usize) -> Self { + Self { + data: HashMap::new(), + order: VecDeque::new(), + capacity, + total_signatures: 0, + } + } + + /// Insert a gossip signature, FIFO-evicting oldest data_roots when over capacity. + /// + /// First-write-wins: if (validator_id, data_root) already exists, the signature is ignored. + fn insert( + &mut self, + hashed: HashedAttestationData, + validator_id: u64, + signature: ValidatorSignature, + ) { + let (data_root, att_data) = hashed.into_parts(); + + if let Some(entry) = self.data.get_mut(&data_root) { + // First-write-wins: skip if this validator already has a sig + if entry.signatures.contains_key(&validator_id) { + return; + } + entry.signatures.insert(validator_id, signature); + self.total_signatures += 1; + } else { + let mut signatures = BTreeMap::new(); + signatures.insert(validator_id, signature); + self.data.insert( + data_root, + GossipDataEntry { + data: att_data, + signatures, + }, + ); + self.order.push_back(data_root); + self.total_signatures += 1; + } + + // Evict oldest data_roots until under capacity + while self.total_signatures > self.capacity { + if let Some(evicted) = self.order.pop_front() { + if let Some(removed) = self.data.remove(&evicted) { + self.total_signatures -= removed.signatures.len(); + } + } else { + break; + } + } + } + + /// Delete gossip entries for the given (validator_id, data_root) pairs. + /// + /// When all signatures for a data_root are removed, the entry is cleaned up. + /// Collects emptied roots and batch-cleans the VecDeque in one pass. + fn delete(&mut self, keys: &[(u64, H256)]) { + if keys.is_empty() { + return; + } + let mut emptied_roots: HashSet = HashSet::new(); + for &(vid, data_root) in keys { + if let Some(entry) = self.data.get_mut(&data_root) { + if entry.signatures.remove(&vid).is_some() { + self.total_signatures -= 1; + } + if entry.signatures.is_empty() { + self.data.remove(&data_root); + emptied_roots.insert(data_root); + } + } + } + if !emptied_roots.is_empty() { + self.order.retain(|r| !emptied_roots.contains(r)); + } + } + + /// Prune gossip signatures for slots <= finalized_slot. + /// + /// Returns the number of data_root entries pruned. + fn prune(&mut self, finalized_slot: u64) -> usize { + let mut pruned_roots: HashSet = HashSet::new(); + self.data.retain(|root, entry| { + if entry.data.slot > finalized_slot { + true + } else { + self.total_signatures -= entry.signatures.len(); + pruned_roots.insert(*root); + false + } + }); + if !pruned_roots.is_empty() { + self.order.retain(|r| !pruned_roots.contains(r)); + } + pruned_roots.len() + } + + /// Returns a snapshot of all gossip signatures grouped by attestation data. + fn snapshot(&self) -> GossipSignatureSnapshot { + self.data + .values() + .map(|entry| { + let sigs: Vec<_> = entry + .signatures + .iter() + .map(|(&vid, sig)| (vid, sig.clone())) + .collect(); + (HashedAttestationData::new(entry.data.clone()), sigs) + }) + .collect() + } + + /// Returns the total number of individual signatures stored. + fn total_signatures(&self) -> usize { + self.total_signatures + } + + /// Returns the number of distinct data_roots. + #[cfg(test)] + fn len(&self) -> usize { + self.data.len() + } +} + /// Encode a LiveChain key (slot, root) to bytes. /// Layout: slot (8 bytes big-endian) || root (32 bytes) /// Big-endian ensures lexicographic ordering matches numeric ordering. @@ -278,7 +421,7 @@ pub struct Store { new_payloads: Arc>, known_payloads: Arc>, /// In-memory gossip signatures, consumed at interval 2 aggregation. - gossip_signatures: Arc>, + gossip_signatures: Arc>, } impl Store { @@ -409,7 +552,9 @@ impl Store { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), - gossip_signatures: Arc::new(Mutex::new(HashMap::new())), + gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( + GOSSIP_SIGNATURE_CAP, + ))), } } @@ -622,9 +767,7 @@ impl Store { /// Returns the number of entries pruned. pub fn prune_gossip_signatures(&mut self, finalized_slot: u64) -> usize { let mut gossip = self.gossip_signatures.lock().unwrap(); - let before = gossip.len(); - gossip.retain(|_, entry| entry.data.slot > finalized_slot); - before - gossip.len() + gossip.prune(finalized_slot) } /// Prune old states beyond the retention window. @@ -997,7 +1140,7 @@ impl Store { /// Returns the number of gossip signature entries stored. pub fn gossip_signatures_count(&self) -> usize { let gossip = self.gossip_signatures.lock().unwrap(); - gossip.values().map(|entry| entry.signatures.len()).sum() + gossip.total_signatures() } /// Estimated live data size in bytes for a table, as reported by the backend. @@ -1014,34 +1157,14 @@ impl Store { /// Delete gossip entries for the given (validator_id, data_root) pairs. pub fn delete_gossip_signatures(&mut self, keys: &[(u64, H256)]) { - if keys.is_empty() { - return; - } let mut gossip = self.gossip_signatures.lock().unwrap(); - for &(vid, data_root) in keys { - if let Some(entry) = gossip.get_mut(&data_root) { - entry.signatures.remove(&vid); - if entry.signatures.is_empty() { - gossip.remove(&data_root); - } - } - } + gossip.delete(keys); } /// Returns a snapshot of gossip signatures grouped by attestation data. pub fn iter_gossip_signatures(&self) -> GossipSignatureSnapshot { let gossip = self.gossip_signatures.lock().unwrap(); - gossip - .values() - .map(|entry| { - let sigs: Vec<_> = entry - .signatures - .iter() - .map(|(&vid, sig)| (vid, sig.clone())) - .collect(); - (HashedAttestationData::new(entry.data.clone()), sigs) - }) - .collect() + gossip.snapshot() } /// Stores a gossip signature for later aggregation. @@ -1051,13 +1174,8 @@ impl Store { validator_id: u64, signature: ValidatorSignature, ) { - let (data_root, att_data) = hashed.into_parts(); let mut gossip = self.gossip_signatures.lock().unwrap(); - let entry = gossip.entry(data_root).or_insert_with(|| GossipDataEntry { - data: att_data, - signatures: BTreeMap::new(), - }); - entry.signatures.entry(validator_id).or_insert(signature); + gossip.insert(hashed, validator_id, signature); } // ============ Derived Accessors ============ @@ -1189,7 +1307,9 @@ mod tests { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), - gossip_signatures: Arc::new(Mutex::new(HashMap::new())), + gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( + GOSSIP_SIGNATURE_CAP, + ))), } } @@ -1200,7 +1320,9 @@ mod tests { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), - gossip_signatures: Arc::new(Mutex::new(HashMap::new())), + gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( + GOSSIP_SIGNATURE_CAP, + ))), } } } @@ -1630,4 +1752,184 @@ mod tests { assert_eq!(cloned.new_payloads.lock().unwrap().len(), 0); assert_eq!(cloned.known_payloads.lock().unwrap().len(), 1); } + + // ============ GossipSignatureBuffer Tests ============ + + fn make_dummy_sig() -> ValidatorSignature { + use ethlambda_types::signature::{LeanSigSignature, LeanSignatureScheme}; + use leansig::{serialization::Serializable, signature::SignatureScheme}; + use rand::{SeedableRng, rngs::StdRng}; + + static CACHED_SIG: std::sync::LazyLock> = std::sync::LazyLock::new(|| { + let mut rng = StdRng::seed_from_u64(42); + let lifetime = 1 << 5; // small for speed + let (_pk, sk) = LeanSignatureScheme::key_gen(&mut rng, 0, lifetime); + let sig = LeanSignatureScheme::sign(&sk, 0, &[0u8; 32]).unwrap(); + sig.to_bytes() + }); + + ValidatorSignature::from_bytes(&CACHED_SIG).expect("cached test signature") + } + + #[test] + fn gossip_buffer_fifo_eviction() { + // Capacity of 3 signatures total + let mut buf = GossipSignatureBuffer::new(3); + + // Insert 3 sigs across 3 data_roots (1 sig each) + for slot in 1..=3u64 { + let data = make_att_data(slot); + buf.insert(HashedAttestationData::new(data), 0, make_dummy_sig()); + } + assert_eq!(buf.total_signatures(), 3); + assert_eq!(buf.len(), 3); + + // Insert a 4th — should evict the oldest (slot 1) + let data4 = make_att_data(4); + buf.insert(HashedAttestationData::new(data4), 0, make_dummy_sig()); + assert_eq!(buf.total_signatures(), 3); + assert_eq!(buf.len(), 3); + + // Slot 1 should be gone + let slot1_root = HashedAttestationData::new(make_att_data(1)).root(); + assert!(!buf.data.contains_key(&slot1_root)); + + // Slots 2, 3, 4 should remain + let slot2_root = HashedAttestationData::new(make_att_data(2)).root(); + let slot4_root = HashedAttestationData::new(make_att_data(4)).root(); + assert!(buf.data.contains_key(&slot2_root)); + assert!(buf.data.contains_key(&slot4_root)); + } + + #[test] + fn gossip_buffer_dedup_first_write_wins() { + let mut buf = GossipSignatureBuffer::new(100); + let data = make_att_data(1); + let hashed = HashedAttestationData::new(data); + + buf.insert(hashed.clone(), 0, make_dummy_sig()); + buf.insert(hashed.clone(), 0, make_dummy_sig()); + + assert_eq!(buf.total_signatures(), 1); + assert_eq!(buf.len(), 1); + } + + #[test] + fn gossip_buffer_multiple_validators_per_root() { + let mut buf = GossipSignatureBuffer::new(100); + let data = make_att_data(1); + + buf.insert( + HashedAttestationData::new(data.clone()), + 0, + make_dummy_sig(), + ); + buf.insert( + HashedAttestationData::new(data.clone()), + 1, + make_dummy_sig(), + ); + buf.insert( + HashedAttestationData::new(data.clone()), + 2, + make_dummy_sig(), + ); + + assert_eq!(buf.total_signatures(), 3); + assert_eq!(buf.len(), 1); // One data_root + } + + #[test] + fn gossip_buffer_delete_cleans_up() { + let mut buf = GossipSignatureBuffer::new(100); + let data = make_att_data(1); + let root = HashedAttestationData::new(data.clone()).root(); + + buf.insert( + HashedAttestationData::new(data.clone()), + 0, + make_dummy_sig(), + ); + buf.insert( + HashedAttestationData::new(data.clone()), + 1, + make_dummy_sig(), + ); + assert_eq!(buf.total_signatures(), 2); + + // Delete one sig — root should remain + buf.delete(&[(0, root)]); + assert_eq!(buf.total_signatures(), 1); + assert_eq!(buf.len(), 1); + + // Delete last sig — root should be fully removed + buf.delete(&[(1, root)]); + assert_eq!(buf.total_signatures(), 0); + assert_eq!(buf.len(), 0); + assert!(buf.order.is_empty()); + } + + #[test] + fn gossip_buffer_prune_by_slot() { + let mut buf = GossipSignatureBuffer::new(100); + + // Insert sigs at slots 1, 2, 3, 4, 5 + for slot in 1..=5u64 { + buf.insert( + HashedAttestationData::new(make_att_data(slot)), + 0, + make_dummy_sig(), + ); + } + assert_eq!(buf.total_signatures(), 5); + + // Prune slots <= 3 + let pruned = buf.prune(3); + assert_eq!(pruned, 3); + assert_eq!(buf.total_signatures(), 2); + assert_eq!(buf.len(), 2); + assert_eq!(buf.order.len(), 2); + } + + #[test] + fn gossip_buffer_eviction_removes_whole_root() { + // Capacity of 4 signatures + let mut buf = GossipSignatureBuffer::new(4); + + // Slot 1: 3 validators + let data1 = make_att_data(1); + buf.insert( + HashedAttestationData::new(data1.clone()), + 0, + make_dummy_sig(), + ); + buf.insert( + HashedAttestationData::new(data1.clone()), + 1, + make_dummy_sig(), + ); + buf.insert( + HashedAttestationData::new(data1.clone()), + 2, + make_dummy_sig(), + ); + + // Slot 2: 1 validator + let data2 = make_att_data(2); + buf.insert( + HashedAttestationData::new(data2.clone()), + 0, + make_dummy_sig(), + ); + assert_eq!(buf.total_signatures(), 4); + + // Insert slot 3 — should evict slot 1 (3 sigs), now total = 2 + let data3 = make_att_data(3); + buf.insert(HashedAttestationData::new(data3), 0, make_dummy_sig()); + + let slot1_root = HashedAttestationData::new(data1).root(); + assert!(!buf.data.contains_key(&slot1_root)); + assert_eq!(buf.total_signatures(), 2); // slot 2 (1) + slot 3 (1) + assert_eq!(buf.len(), 2); + } } From 8bdbff00bef9b555d92a5cfc8439f2dc4158509c Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Thu, 16 Apr 2026 12:31:32 -0300 Subject: [PATCH 2/3] Switch gossip signature dedup from first-write-wins to last-write-wins Overwrites the existing signature when the same (validator_id, data_root) is inserted again, keeping the latest signature. The total_signatures counter is only incremented for genuinely new entries. --- crates/storage/src/store.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 49c9e117..38b5164a 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -270,7 +270,7 @@ impl GossipSignatureBuffer { /// Insert a gossip signature, FIFO-evicting oldest data_roots when over capacity. /// - /// First-write-wins: if (validator_id, data_root) already exists, the signature is ignored. + /// Last-write-wins: if (validator_id, data_root) already exists, the signature is overwritten. fn insert( &mut self, hashed: HashedAttestationData, @@ -280,12 +280,11 @@ impl GossipSignatureBuffer { let (data_root, att_data) = hashed.into_parts(); if let Some(entry) = self.data.get_mut(&data_root) { - // First-write-wins: skip if this validator already has a sig - if entry.signatures.contains_key(&validator_id) { - return; - } + let is_new = !entry.signatures.contains_key(&validator_id); entry.signatures.insert(validator_id, signature); - self.total_signatures += 1; + if is_new { + self.total_signatures += 1; + } } else { let mut signatures = BTreeMap::new(); signatures.insert(validator_id, signature); @@ -1802,7 +1801,7 @@ mod tests { } #[test] - fn gossip_buffer_dedup_first_write_wins() { + fn gossip_buffer_dedup_last_write_wins() { let mut buf = GossipSignatureBuffer::new(100); let data = make_att_data(1); let hashed = HashedAttestationData::new(data); @@ -1810,6 +1809,7 @@ mod tests { buf.insert(hashed.clone(), 0, make_dummy_sig()); buf.insert(hashed.clone(), 0, make_dummy_sig()); + // Last-write-wins: overwrites the signature but count stays at 1 assert_eq!(buf.total_signatures(), 1); assert_eq!(buf.len(), 1); } From 3c626320161729aae35ab915de326be23cce2dc8 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Thu, 16 Apr 2026 12:45:29 -0300 Subject: [PATCH 3/3] Combine contains_key + insert into a single BTreeMap::insert call --- crates/storage/src/store.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 38b5164a..c15efe04 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -280,8 +280,7 @@ impl GossipSignatureBuffer { let (data_root, att_data) = hashed.into_parts(); if let Some(entry) = self.data.get_mut(&data_root) { - let is_new = !entry.signatures.contains_key(&validator_id); - entry.signatures.insert(validator_id, signature); + let is_new = entry.signatures.insert(validator_id, signature).is_none(); if is_new { self.total_signatures += 1; } @@ -1755,7 +1754,7 @@ mod tests { // ============ GossipSignatureBuffer Tests ============ fn make_dummy_sig() -> ValidatorSignature { - use ethlambda_types::signature::{LeanSigSignature, LeanSignatureScheme}; + use ethlambda_types::signature::LeanSignatureScheme; use leansig::{serialization::Serializable, signature::SignatureScheme}; use rand::{SeedableRng, rngs::StdRng};