diff --git a/Cargo.lock b/Cargo.lock index 9e72455c..1378eca1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2188,8 +2188,10 @@ name = "ethlambda-storage" version = "0.1.0" dependencies = [ "ethlambda-types", + "leansig", "libssz", "libssz-derive", + "rand 0.10.0", "rocksdb", "tempfile", "tracing", diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 2a877bb4..d389a3a9 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -20,3 +20,5 @@ libssz-derive.workspace = true [dev-dependencies] tempfile = "3" +leansig.workspace = true +rand.workspace = true diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index ed185a26..c15efe04 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -88,6 +88,11 @@ const AGGREGATED_PAYLOAD_CAP: usize = 512; /// Smaller than known since new payloads are drained every interval (~4s). const NEW_PAYLOAD_CAP: usize = 64; +/// Hard cap for the gossip signature buffer (individual signatures, not distinct data_roots). +/// With 4 validators and 4-second slots, 2048 signatures covers ~512 slots (~34 min). +/// Each XMSS signature is ~3KB, so worst-case memory is ~6 MB. +const GOSSIP_SIGNATURE_CAP: usize = 2048; + /// An entry in the payload buffer: attestation data + set of proofs. #[derive(Clone)] struct PayloadEntry { @@ -235,12 +240,148 @@ struct GossipDataEntry { signatures: BTreeMap, } -/// Gossip signatures grouped by attestation data (via data_root). -type GossipSignatureMap = HashMap; - /// Gossip signatures snapshot: (hashed_attestation_data, Vec<(validator_id, signature)>). pub type GossipSignatureSnapshot = Vec<(HashedAttestationData, Vec<(u64, ValidatorSignature)>)>; +/// Bounded buffer for gossip signatures with FIFO eviction. +/// +/// Groups signatures by attestation data (via data_root). Each distinct +/// attestation message stores the full `AttestationData` plus individual +/// validator signatures in ascending order (required for XMSS aggregation). +/// +/// Entries are evicted FIFO (by insertion order of the data_root) when +/// total_signatures exceeds capacity, matching the `PayloadBuffer` pattern. +struct GossipSignatureBuffer { + data: HashMap, + order: VecDeque, + capacity: usize, + total_signatures: usize, +} + +impl GossipSignatureBuffer { + fn new(capacity: usize) -> Self { + Self { + data: HashMap::new(), + order: VecDeque::new(), + capacity, + total_signatures: 0, + } + } + + /// Insert a gossip signature, FIFO-evicting oldest data_roots when over capacity. + /// + /// Last-write-wins: if (validator_id, data_root) already exists, the signature is overwritten. + fn insert( + &mut self, + hashed: HashedAttestationData, + validator_id: u64, + signature: ValidatorSignature, + ) { + let (data_root, att_data) = hashed.into_parts(); + + if let Some(entry) = self.data.get_mut(&data_root) { + let is_new = entry.signatures.insert(validator_id, signature).is_none(); + if is_new { + self.total_signatures += 1; + } + } else { + let mut signatures = BTreeMap::new(); + signatures.insert(validator_id, signature); + self.data.insert( + data_root, + GossipDataEntry { + data: att_data, + signatures, + }, + ); + self.order.push_back(data_root); + self.total_signatures += 1; + } + + // Evict oldest data_roots until under capacity + while self.total_signatures > self.capacity { + if let Some(evicted) = self.order.pop_front() { + if let Some(removed) = self.data.remove(&evicted) { + self.total_signatures -= removed.signatures.len(); + } + } else { + break; + } + } + } + + /// Delete gossip entries for the given (validator_id, data_root) pairs. + /// + /// When all signatures for a data_root are removed, the entry is cleaned up. + /// Collects emptied roots and batch-cleans the VecDeque in one pass. + fn delete(&mut self, keys: &[(u64, H256)]) { + if keys.is_empty() { + return; + } + let mut emptied_roots: HashSet = HashSet::new(); + for &(vid, data_root) in keys { + if let Some(entry) = self.data.get_mut(&data_root) { + if entry.signatures.remove(&vid).is_some() { + self.total_signatures -= 1; + } + if entry.signatures.is_empty() { + self.data.remove(&data_root); + emptied_roots.insert(data_root); + } + } + } + if !emptied_roots.is_empty() { + self.order.retain(|r| !emptied_roots.contains(r)); + } + } + + /// Prune gossip signatures for slots <= finalized_slot. + /// + /// Returns the number of data_root entries pruned. + fn prune(&mut self, finalized_slot: u64) -> usize { + let mut pruned_roots: HashSet = HashSet::new(); + self.data.retain(|root, entry| { + if entry.data.slot > finalized_slot { + true + } else { + self.total_signatures -= entry.signatures.len(); + pruned_roots.insert(*root); + false + } + }); + if !pruned_roots.is_empty() { + self.order.retain(|r| !pruned_roots.contains(r)); + } + pruned_roots.len() + } + + /// Returns a snapshot of all gossip signatures grouped by attestation data. + fn snapshot(&self) -> GossipSignatureSnapshot { + self.data + .values() + .map(|entry| { + let sigs: Vec<_> = entry + .signatures + .iter() + .map(|(&vid, sig)| (vid, sig.clone())) + .collect(); + (HashedAttestationData::new(entry.data.clone()), sigs) + }) + .collect() + } + + /// Returns the total number of individual signatures stored. + fn total_signatures(&self) -> usize { + self.total_signatures + } + + /// Returns the number of distinct data_roots. + #[cfg(test)] + fn len(&self) -> usize { + self.data.len() + } +} + /// Encode a LiveChain key (slot, root) to bytes. /// Layout: slot (8 bytes big-endian) || root (32 bytes) /// Big-endian ensures lexicographic ordering matches numeric ordering. @@ -278,7 +419,7 @@ pub struct Store { new_payloads: Arc>, known_payloads: Arc>, /// In-memory gossip signatures, consumed at interval 2 aggregation. - gossip_signatures: Arc>, + gossip_signatures: Arc>, } impl Store { @@ -409,7 +550,9 @@ impl Store { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), - gossip_signatures: Arc::new(Mutex::new(HashMap::new())), + gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( + GOSSIP_SIGNATURE_CAP, + ))), } } @@ -622,9 +765,7 @@ impl Store { /// Returns the number of entries pruned. pub fn prune_gossip_signatures(&mut self, finalized_slot: u64) -> usize { let mut gossip = self.gossip_signatures.lock().unwrap(); - let before = gossip.len(); - gossip.retain(|_, entry| entry.data.slot > finalized_slot); - before - gossip.len() + gossip.prune(finalized_slot) } /// Prune old states beyond the retention window. @@ -997,7 +1138,7 @@ impl Store { /// Returns the number of gossip signature entries stored. pub fn gossip_signatures_count(&self) -> usize { let gossip = self.gossip_signatures.lock().unwrap(); - gossip.values().map(|entry| entry.signatures.len()).sum() + gossip.total_signatures() } /// Estimated live data size in bytes for a table, as reported by the backend. @@ -1014,34 +1155,14 @@ impl Store { /// Delete gossip entries for the given (validator_id, data_root) pairs. pub fn delete_gossip_signatures(&mut self, keys: &[(u64, H256)]) { - if keys.is_empty() { - return; - } let mut gossip = self.gossip_signatures.lock().unwrap(); - for &(vid, data_root) in keys { - if let Some(entry) = gossip.get_mut(&data_root) { - entry.signatures.remove(&vid); - if entry.signatures.is_empty() { - gossip.remove(&data_root); - } - } - } + gossip.delete(keys); } /// Returns a snapshot of gossip signatures grouped by attestation data. pub fn iter_gossip_signatures(&self) -> GossipSignatureSnapshot { let gossip = self.gossip_signatures.lock().unwrap(); - gossip - .values() - .map(|entry| { - let sigs: Vec<_> = entry - .signatures - .iter() - .map(|(&vid, sig)| (vid, sig.clone())) - .collect(); - (HashedAttestationData::new(entry.data.clone()), sigs) - }) - .collect() + gossip.snapshot() } /// Stores a gossip signature for later aggregation. @@ -1051,13 +1172,8 @@ impl Store { validator_id: u64, signature: ValidatorSignature, ) { - let (data_root, att_data) = hashed.into_parts(); let mut gossip = self.gossip_signatures.lock().unwrap(); - let entry = gossip.entry(data_root).or_insert_with(|| GossipDataEntry { - data: att_data, - signatures: BTreeMap::new(), - }); - entry.signatures.entry(validator_id).or_insert(signature); + gossip.insert(hashed, validator_id, signature); } // ============ Derived Accessors ============ @@ -1189,7 +1305,9 @@ mod tests { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), - gossip_signatures: Arc::new(Mutex::new(HashMap::new())), + gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( + GOSSIP_SIGNATURE_CAP, + ))), } } @@ -1200,7 +1318,9 @@ mod tests { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), - gossip_signatures: Arc::new(Mutex::new(HashMap::new())), + gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( + GOSSIP_SIGNATURE_CAP, + ))), } } } @@ -1630,4 +1750,185 @@ mod tests { assert_eq!(cloned.new_payloads.lock().unwrap().len(), 0); assert_eq!(cloned.known_payloads.lock().unwrap().len(), 1); } + + // ============ GossipSignatureBuffer Tests ============ + + fn make_dummy_sig() -> ValidatorSignature { + use ethlambda_types::signature::LeanSignatureScheme; + use leansig::{serialization::Serializable, signature::SignatureScheme}; + use rand::{SeedableRng, rngs::StdRng}; + + static CACHED_SIG: std::sync::LazyLock> = std::sync::LazyLock::new(|| { + let mut rng = StdRng::seed_from_u64(42); + let lifetime = 1 << 5; // small for speed + let (_pk, sk) = LeanSignatureScheme::key_gen(&mut rng, 0, lifetime); + let sig = LeanSignatureScheme::sign(&sk, 0, &[0u8; 32]).unwrap(); + sig.to_bytes() + }); + + ValidatorSignature::from_bytes(&CACHED_SIG).expect("cached test signature") + } + + #[test] + fn gossip_buffer_fifo_eviction() { + // Capacity of 3 signatures total + let mut buf = GossipSignatureBuffer::new(3); + + // Insert 3 sigs across 3 data_roots (1 sig each) + for slot in 1..=3u64 { + let data = make_att_data(slot); + buf.insert(HashedAttestationData::new(data), 0, make_dummy_sig()); + } + assert_eq!(buf.total_signatures(), 3); + assert_eq!(buf.len(), 3); + + // Insert a 4th — should evict the oldest (slot 1) + let data4 = make_att_data(4); + buf.insert(HashedAttestationData::new(data4), 0, make_dummy_sig()); + assert_eq!(buf.total_signatures(), 3); + assert_eq!(buf.len(), 3); + + // Slot 1 should be gone + let slot1_root = HashedAttestationData::new(make_att_data(1)).root(); + assert!(!buf.data.contains_key(&slot1_root)); + + // Slots 2, 3, 4 should remain + let slot2_root = HashedAttestationData::new(make_att_data(2)).root(); + let slot4_root = HashedAttestationData::new(make_att_data(4)).root(); + assert!(buf.data.contains_key(&slot2_root)); + assert!(buf.data.contains_key(&slot4_root)); + } + + #[test] + fn gossip_buffer_dedup_last_write_wins() { + let mut buf = GossipSignatureBuffer::new(100); + let data = make_att_data(1); + let hashed = HashedAttestationData::new(data); + + buf.insert(hashed.clone(), 0, make_dummy_sig()); + buf.insert(hashed.clone(), 0, make_dummy_sig()); + + // Last-write-wins: overwrites the signature but count stays at 1 + assert_eq!(buf.total_signatures(), 1); + assert_eq!(buf.len(), 1); + } + + #[test] + fn gossip_buffer_multiple_validators_per_root() { + let mut buf = GossipSignatureBuffer::new(100); + let data = make_att_data(1); + + buf.insert( + HashedAttestationData::new(data.clone()), + 0, + make_dummy_sig(), + ); + buf.insert( + HashedAttestationData::new(data.clone()), + 1, + make_dummy_sig(), + ); + buf.insert( + HashedAttestationData::new(data.clone()), + 2, + make_dummy_sig(), + ); + + assert_eq!(buf.total_signatures(), 3); + assert_eq!(buf.len(), 1); // One data_root + } + + #[test] + fn gossip_buffer_delete_cleans_up() { + let mut buf = GossipSignatureBuffer::new(100); + let data = make_att_data(1); + let root = HashedAttestationData::new(data.clone()).root(); + + buf.insert( + HashedAttestationData::new(data.clone()), + 0, + make_dummy_sig(), + ); + buf.insert( + HashedAttestationData::new(data.clone()), + 1, + make_dummy_sig(), + ); + assert_eq!(buf.total_signatures(), 2); + + // Delete one sig — root should remain + buf.delete(&[(0, root)]); + assert_eq!(buf.total_signatures(), 1); + assert_eq!(buf.len(), 1); + + // Delete last sig — root should be fully removed + buf.delete(&[(1, root)]); + assert_eq!(buf.total_signatures(), 0); + assert_eq!(buf.len(), 0); + assert!(buf.order.is_empty()); + } + + #[test] + fn gossip_buffer_prune_by_slot() { + let mut buf = GossipSignatureBuffer::new(100); + + // Insert sigs at slots 1, 2, 3, 4, 5 + for slot in 1..=5u64 { + buf.insert( + HashedAttestationData::new(make_att_data(slot)), + 0, + make_dummy_sig(), + ); + } + assert_eq!(buf.total_signatures(), 5); + + // Prune slots <= 3 + let pruned = buf.prune(3); + assert_eq!(pruned, 3); + assert_eq!(buf.total_signatures(), 2); + assert_eq!(buf.len(), 2); + assert_eq!(buf.order.len(), 2); + } + + #[test] + fn gossip_buffer_eviction_removes_whole_root() { + // Capacity of 4 signatures + let mut buf = GossipSignatureBuffer::new(4); + + // Slot 1: 3 validators + let data1 = make_att_data(1); + buf.insert( + HashedAttestationData::new(data1.clone()), + 0, + make_dummy_sig(), + ); + buf.insert( + HashedAttestationData::new(data1.clone()), + 1, + make_dummy_sig(), + ); + buf.insert( + HashedAttestationData::new(data1.clone()), + 2, + make_dummy_sig(), + ); + + // Slot 2: 1 validator + let data2 = make_att_data(2); + buf.insert( + HashedAttestationData::new(data2.clone()), + 0, + make_dummy_sig(), + ); + assert_eq!(buf.total_signatures(), 4); + + // Insert slot 3 — should evict slot 1 (3 sigs), now total = 2 + let data3 = make_att_data(3); + buf.insert(HashedAttestationData::new(data3), 0, make_dummy_sig()); + + let slot1_root = HashedAttestationData::new(data1).root(); + assert!(!buf.data.contains_key(&slot1_root)); + assert_eq!(buf.total_signatures(), 2); // slot 2 (1) + slot 3 (1) + assert_eq!(buf.len(), 2); + } }