diff --git a/src/lean_spec/subspecs/sync/peer_manager.py b/src/lean_spec/subspecs/sync/peer_manager.py
index da679e4b..97203141 100644
--- a/src/lean_spec/subspecs/sync/peer_manager.py
+++ b/src/lean_spec/subspecs/sync/peer_manager.py
@@ -167,6 +167,23 @@ def get_network_finalized_slot(self) -> Slot | None:
             return None
         return counter.most_common(1)[0][0]
 
+    def get_network_head_slot(self) -> Slot | None:
+        """
+        Highest head slot reported by any connected peer with status.
+
+        Used to distinguish a node that is itself behind from a network where
+        every peer is also behind (e.g. a streak of skipped proposals).
+        Returns None when no connected peer has reported a status yet.
+        """
+        slots = [
+            peer.status.head.slot
+            for peer in self._peers.values()
+            if peer.status is not None and peer.is_connected()
+        ]
+        if not slots:
+            return None
+        return max(slots)
+
     def on_request_success(self, peer_id: PeerId) -> None:
         """Record a successful request to a peer."""
         peer = self._peers.get(peer_id)
diff --git a/src/lean_spec/subspecs/validator/constants.py b/src/lean_spec/subspecs/validator/constants.py
new file mode 100644
index 00000000..8ef0b867
--- /dev/null
+++ b/src/lean_spec/subspecs/validator/constants.py
@@ -0,0 +1,22 @@
+"""
+Validator service constants.
+
+Operational thresholds governing validator duty execution.
+"""
+
+from __future__ import annotations
+
+from typing import Final
+
+SYNC_LAG_THRESHOLD: Final[int] = 4
+"""
+Maximum tolerated lag, in slots, between wall-clock and the local head before
+validator duties are skipped.
+
+A node whose local head trails wall clock by more than this attests against a
+stale subtree, depositing fork-choice weight on the wrong branch.
+
+The gate also checks peer-reported head slots: if no peer claims a recent head,
+the network as a whole is lagging (e.g. a streak of skipped proposals) and the
+gate stays open so the chain can keep progressing.
+"""
diff --git a/src/lean_spec/subspecs/validator/service.py b/src/lean_spec/subspecs/validator/service.py
index ab60e12d..5c07bab8 100644
--- a/src/lean_spec/subspecs/validator/service.py
+++ b/src/lean_spec/subspecs/validator/service.py
@@ -54,6 +54,7 @@
 from lean_spec.subspecs.xmss.containers import Signature
 from lean_spec.types import Bytes32, Slot, Uint64, ValidatorIndex
 
+from .constants import SYNC_LAG_THRESHOLD
 from .registry import ValidatorEntry, ValidatorRegistry
 
 logger = logging.getLogger(__name__)
@@ -111,6 +112,12 @@ class ValidatorService:
     _attested_slots: set[Slot] = field(default_factory=set, repr=False)
     """Slots for which we've already produced attestations (prevents duplicates)."""
 
+    _duties_skipped_lag: int = field(default=0, repr=False)
+    """Counter for duties skipped because the local head trails wall clock."""
+
+    _last_lag_skip: tuple[Slot, str] | None = field(default=None, repr=False)
+    """Most recent (slot, duty) pair counted as skipped (prevents double counting)."""
+
     async def run(self) -> None:
         """
         Main loop - check duties every interval.
@@ -168,7 +175,10 @@ async def run(self) -> None:
                 #
                 # Check if any of our validators is the proposer.
                 logger.debug("ValidatorService: checking block production for slot %d", slot)
-                await self._maybe_produce_block(slot)
+                if self._is_synced_for_duties(slot):
+                    await self._maybe_produce_block(slot)
+                else:
+                    self._record_lag_skip(slot, "block")
                 logger.debug("ValidatorService: done block production check for slot %d", slot)
 
                 # Re-fetch interval after block production.
@@ -197,16 +207,21 @@ async def run(self) -> None:
                     slot,
                     interval,
                 )
-                await self._produce_attestations(slot)
-                logger.debug("ValidatorService: done producing attestations for slot %d", slot)
-                self._attested_slots.add(slot)
-
-                # Prune old entries to prevent unbounded growth.
-                #
-                # Keep only recent slots (current slot - 4) to bound memory usage.
-                # We never need to attest for slots that far in the past.
-                prune_threshold = Slot(max(0, int(slot) - 4))
-                self._attested_slots = {s for s in self._attested_slots if s >= prune_threshold}
+                if self._is_synced_for_duties(slot):
+                    await self._produce_attestations(slot)
+                    logger.debug("ValidatorService: done producing attestations for slot %d", slot)
+                    self._attested_slots.add(slot)
+
+                    # Prune old entries to prevent unbounded growth.
+                    #
+                    # Keep only recent slots (current slot - 4) to bound memory usage.
+                    # We never need to attest for slots that far in the past.
+                    prune_threshold = Slot(max(0, int(slot) - 4))
+                    self._attested_slots = {s for s in self._attested_slots if s >= prune_threshold}
+                else:
+                    # Do NOT mark the slot attested: if the node catches up before
+                    # the slot ends, the next iteration should retry the duty.
+                    self._record_lag_skip(slot, "attestation")
 
             # Intervals 2-4 have no additional validator duties.
 
@@ -498,6 +513,83 @@ def _sign_with_key(
         self.registry.add(updated_entry)
         return updated_entry, signature
 
+    def _is_synced_for_duties(self, slot: Slot) -> bool:
+        """
+        Decide whether validator duties should run for the given slot.
+
+        Combines two signals to avoid two failure modes:
+
+        1. Local lag: if our head is close to wall clock, attest and propose.
+        2. Peer max head: if even the most up-to-date peer is also far behind,
+           the network is stalling (a streak of skipped proposals) and gating
+           our duties would only deepen the stall.
+
+        Args:
+            slot: Wall-clock slot for which a duty would be performed.
+
+        Returns:
+            True when duties should run; False when the local node is
+            materially behind a network that has otherwise made progress.
+        """
+        store = self.sync_service.store
+        head_block = store.blocks.get(store.head)
+        # No head yet: nothing to compare against; duty methods will no-op.
+        if head_block is None:
+            return True
+        head_slot = head_block.slot
+        # Clock skew or chain ahead of wall clock: trust the chain.
+        if int(slot) <= int(head_slot):
+            return True
+        lag = int(slot) - int(head_slot)
+        if lag <= SYNC_LAG_THRESHOLD:
+            return True
+        # Local node is behind. Check whether the network is also behind.
+        peer_max = self.sync_service.peer_manager.get_network_head_slot()
+        # No peer evidence at all: isolated node, fall through and try.
+        if peer_max is None:
+            return True
+        # Network-wide skip: the highest peer head is also lagged. Keep duties
+        # alive so the chain can keep progressing through the skipped slots.
+        if int(slot) - int(peer_max) > SYNC_LAG_THRESHOLD:
+            return True
+        return False
+
+    def _record_lag_skip(self, slot: Slot, duty: str) -> None:
+        """
+        Emit a structured log and increment the lag-skip counter.
+
+        A gated attestation is retried on later iterations of the same slot,
+        so this can be called repeatedly for one duty. Only the first call
+        for a given (slot, duty) pair logs and counts; repeats are ignored.
+
+        Args:
+            slot: Slot whose duty was skipped.
+            duty: One of "block" or "attestation". Identifies the duty type
+                in the log so operators can attribute missed signatures.
+        """
+        skip_key = (slot, duty)
+        # Same duty already recorded for this slot: avoid inflating the counter.
+        if skip_key == self._last_lag_skip:
+            return
+        self._last_lag_skip = skip_key
+
+        store = self.sync_service.store
+        head_block = store.blocks.get(store.head)
+        head_slot = head_block.slot if head_block is not None else Slot(0)
+        lag = max(0, int(slot) - int(head_slot))
+        peer_max = self.sync_service.peer_manager.get_network_head_slot()
+        peer_max_repr = int(peer_max) if peer_max is not None else "none"
+        logger.info(
+            "Validator duty skipped due to sync lag: "
+            "duty=%s slot=%d head_slot=%d lag=%d peer_max_head_slot=%s",
+            duty,
+            int(slot),
+            int(head_slot),
+            lag,
+            peer_max_repr,
+        )
+        self._duties_skipped_lag += 1
+
     def stop(self) -> None:
         """
         Stop the service.
@@ -521,3 +613,8 @@ def blocks_produced(self) -> int:
     def attestations_produced(self) -> int:
         """Total attestations produced since creation."""
         return self._attestations_produced
+
+    @property
+    def duties_skipped_lag(self) -> int:
+        """Total duties (block + attestation) skipped because of sync lag."""
+        return self._duties_skipped_lag
diff --git a/tests/lean_spec/subspecs/sync/test_peer_manager.py b/tests/lean_spec/subspecs/sync/test_peer_manager.py
index d8f14576..aee53f75 100644
--- a/tests/lean_spec/subspecs/sync/test_peer_manager.py
+++ b/tests/lean_spec/subspecs/sync/test_peer_manager.py
@@ -279,6 +279,57 @@ def test_get_network_finalized_slot_ignores_disconnected(
         finalized = manager.get_network_finalized_slot()
         assert finalized == Slot(100)
 
+    def test_get_network_head_slot_returns_max(
+        self, peer_id: PeerId, peer_id_2: PeerId, peer_id_3: PeerId
+    ) -> None:
+        """get_network_head_slot returns the maximum head slot across peers."""
+        manager = PeerManager()
+
+        for pid, head_slot in [(peer_id, 100), (peer_id_2, 250), (peer_id_3, 175)]:
+            info = PeerInfo(peer_id=pid, state=ConnectionState.CONNECTED)
+            sync_peer = manager.add_peer(info)
+            sync_peer.status = Status(
+                finalized=Checkpoint(root=Bytes32.zero(), slot=Slot(50)),
+                head=Checkpoint(root=Bytes32.zero(), slot=Slot(head_slot)),
+            )
+
+        assert manager.get_network_head_slot() == Slot(250)
+
+    def test_get_network_head_slot_none_without_status(self, connected_peer_info: PeerInfo) -> None:
+        """get_network_head_slot returns None when no peer has reported status."""
+        manager = PeerManager()
+        manager.add_peer(connected_peer_info)
+        assert manager.get_network_head_slot() is None
+
+    def test_get_network_head_slot_none_with_no_peers(self) -> None:
+        """get_network_head_slot returns None when there are no peers at all."""
+        manager = PeerManager()
+        assert manager.get_network_head_slot() is None
+
+    def test_get_network_head_slot_ignores_disconnected(
+        self, peer_id: PeerId, peer_id_2: PeerId
+    ) -> None:
+        """Disconnected peers are excluded even if they have a recent reported head."""
+        manager = PeerManager()
+
+        info1 = PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED)
+        info2 = PeerInfo(peer_id=peer_id_2, state=ConnectionState.DISCONNECTED)
+
+        sync_peer1 = manager.add_peer(info1)
+        sync_peer2 = manager.add_peer(info2)
+
+        sync_peer1.status = Status(
+            finalized=Checkpoint(root=Bytes32.zero(), slot=Slot(50)),
+            head=Checkpoint(root=Bytes32.zero(), slot=Slot(100)),
+        )
+        # Disconnected peer reports a more recent head; must be ignored.
+        sync_peer2.status = Status(
+            finalized=Checkpoint(root=Bytes32.zero(), slot=Slot(200)),
+            head=Checkpoint(root=Bytes32.zero(), slot=Slot(500)),
+        )
+
+        assert manager.get_network_head_slot() == Slot(100)
+
 
 class TestPeerManagerRequestCallbacks:
     """Tests for PeerManager request callbacks."""
diff --git a/tests/lean_spec/subspecs/validator/test_service.py b/tests/lean_spec/subspecs/validator/test_service.py
index 0d0a4ebf..1607edc0 100644
--- a/tests/lean_spec/subspecs/validator/test_service.py
+++ b/tests/lean_spec/subspecs/validator/test_service.py
@@ -22,6 +22,7 @@
 from lean_spec.subspecs.sync.peer_manager import PeerManager
 from lean_spec.subspecs.sync.service import SyncService
 from lean_spec.subspecs.validator import ValidatorRegistry, ValidatorService
+from lean_spec.subspecs.validator.constants import SYNC_LAG_THRESHOLD
 from lean_spec.subspecs.validator.registry import ValidatorEntry
 from lean_spec.subspecs.xmss import TARGET_SIGNATURE_SCHEME
 from lean_spec.subspecs.xmss.aggregation import AggregatedSignatureProof
@@ -1315,3 +1316,262 @@ async def capture_attestation(attestation: SignedAttestation) -> None:
             sig=signed_att.signature,
         )
         assert not is_invalid, "Signature should not verify with the wrong slot"
+
+
+def _set_head_slot(sync_service: SyncService, slot: Slot) -> None:
+    """Replace the head block in-place so its slot reads as `slot`.
+
+    The genesis store has a single block at the head root. Mutating its slot
+    lets the duty gate observe whatever lag the test wants to exercise without
+    constructing a full chain.
+    """
+    head_root = sync_service.store.head
+    new_head = sync_service.store.blocks[head_root].model_copy(update={"slot": slot})
+    sync_service.store = sync_service.store.model_copy(update={"blocks": {head_root: new_head}})
+
+
+class TestSyncLagGate:
+    """
+    Tests for the sync-lag duty gate.
+
+    The gate combines local lag with peer-reported head slots so that:
+
+    - A node that is itself behind a network making progress is silenced
+    - A node behind because the network is also behind keeps signing duties
+    """
+
+    def test_within_threshold_allows_duties(self, sync_service: SyncService) -> None:
+        """Lag of 0..SYNC_LAG_THRESHOLD slots is allowed."""
+        _set_head_slot(sync_service, Slot(10))
+        service = ValidatorService(
+            sync_service=sync_service,
+            clock=SlotClock(genesis_time=Uint64(0)),
+            registry=ValidatorRegistry(),
+        )
+
+        for lag in range(SYNC_LAG_THRESHOLD + 1):
+            assert service._is_synced_for_duties(Slot(10 + lag))
+
+    def test_just_over_threshold_with_recent_peer_gates(self, sync_service: SyncService) -> None:
+        """Lag > THRESHOLD is gated when at least one peer has a fresh head."""
+        _set_head_slot(sync_service, Slot(10))
+        service = ValidatorService(
+            sync_service=sync_service,
+            clock=SlotClock(genesis_time=Uint64(0)),
+            registry=ValidatorRegistry(),
+        )
+
+        with patch.object(
+            PeerManager,
+            "get_network_head_slot",
+            return_value=Slot(15),
+        ):
+            assert not service._is_synced_for_duties(Slot(15))
+
+    def test_clock_skew_does_not_gate(self, sync_service: SyncService) -> None:
+        """If wall clock is behind head slot, duties are allowed (trust the chain)."""
+        _set_head_slot(sync_service, Slot(20))
+        service = ValidatorService(
+            sync_service=sync_service,
+            clock=SlotClock(genesis_time=Uint64(0)),
+            registry=ValidatorRegistry(),
+        )
+        assert service._is_synced_for_duties(Slot(15))
+
+    def test_no_peer_status_does_not_gate(self, sync_service: SyncService) -> None:
+        """An isolated node with no peer status keeps duties live."""
+        _set_head_slot(sync_service, Slot(0))
+        service = ValidatorService(
+            sync_service=sync_service,
+            clock=SlotClock(genesis_time=Uint64(0)),
+            registry=ValidatorRegistry(),
+        )
+        # peer_max defaults to None on a fresh PeerManager; lag is 100.
+        assert service._is_synced_for_duties(Slot(100))
+
+    def test_network_wide_stall_does_not_gate(self, sync_service: SyncService) -> None:
+        """When even the most up-to-date peer is far behind, duties stay live.
+
+        Regression for the consecutive skipped-slots edge case: the simple
+        local-lag check would silence every validator at the same moment and
+        make recovery impossible.
+        """
+        _set_head_slot(sync_service, Slot(0))
+        service = ValidatorService(
+            sync_service=sync_service,
+            clock=SlotClock(genesis_time=Uint64(0)),
+            registry=ValidatorRegistry(),
+        )
+        with patch.object(
+            PeerManager,
+            "get_network_head_slot",
+            return_value=Slot(0),
+        ):
+            assert service._is_synced_for_duties(Slot(50))
+
+    def test_boundary_lag_equal_threshold_allowed(self, sync_service: SyncService) -> None:
+        """Boundary: lag == SYNC_LAG_THRESHOLD is still allowed."""
+        _set_head_slot(sync_service, Slot(10))
+        service = ValidatorService(
+            sync_service=sync_service,
+            clock=SlotClock(genesis_time=Uint64(0)),
+            registry=ValidatorRegistry(),
+        )
+        wall_clock = Slot(10 + SYNC_LAG_THRESHOLD)
+        with patch.object(
+            PeerManager,
+            "get_network_head_slot",
+            return_value=wall_clock,
+        ):
+            assert service._is_synced_for_duties(wall_clock)
+
+    def test_boundary_lag_one_over_threshold_gated(self, sync_service: SyncService) -> None:
+        """Boundary: lag == SYNC_LAG_THRESHOLD + 1 with recent peer is gated."""
+        _set_head_slot(sync_service, Slot(10))
+        service = ValidatorService(
+            sync_service=sync_service,
+            clock=SlotClock(genesis_time=Uint64(0)),
+            registry=ValidatorRegistry(),
+        )
+        wall_clock = Slot(10 + SYNC_LAG_THRESHOLD + 1)
+        with patch.object(
+            PeerManager,
+            "get_network_head_slot",
+            return_value=wall_clock,
+        ):
+            assert not service._is_synced_for_duties(wall_clock)
+
+    async def test_run_loop_skips_block_production_when_gated(
+        self, sync_service: SyncService, key_manager: XmssKeyManager
+    ) -> None:
+        """Interval 0 in a gated slot does not invoke _maybe_produce_block."""
+        _set_head_slot(sync_service, Slot(0))
+        clock = SlotClock(genesis_time=Uint64(0), time_fn=lambda: _interval_time(10, 0))
+        service = ValidatorService(
+            sync_service=sync_service,
+            clock=clock,
+            registry=_make_registry(key_manager, 0),
+        )
+
+        block_calls: list[Slot] = []
+
+        async def mock_block(_self, slot: Slot) -> None:
+            block_calls.append(slot)
+
+        async def stop_on_sleep(_d: float) -> None:
+            service.stop()
+
+        with (
+            patch.object(
+                PeerManager,
+                "get_network_head_slot",
+                return_value=Slot(10),
+            ),
+            patch.object(ValidatorService, "_maybe_produce_block", mock_block),
+            patch("asyncio.sleep", new=stop_on_sleep),
+        ):
+            await service.run()
+
+        assert block_calls == []
+        assert service.duties_skipped_lag >= 1
+
+    async def test_run_loop_skips_attestation_when_gated(
+        self, sync_service: SyncService, key_manager: XmssKeyManager
+    ) -> None:
+        """Gated attestation: duty is skipped AND the slot stays unmarked.
+
+        The slot must stay out of `_attested_slots` so a node that catches up
+        before the slot ends can still attest in this same slot.
+        """
+        _set_head_slot(sync_service, Slot(0))
+        clock = SlotClock(genesis_time=Uint64(0), time_fn=lambda: _interval_time(10, 1))
+        service = ValidatorService(
+            sync_service=sync_service,
+            clock=clock,
+            registry=_make_registry(key_manager, 0),
+        )
+
+        attest_calls: list[Slot] = []
+
+        async def mock_attest(_self, slot: Slot) -> None:
+            attest_calls.append(slot)
+
+        async def stop_on_sleep(_d: float) -> None:
+            service.stop()
+
+        with (
+            patch.object(
+                PeerManager,
+                "get_network_head_slot",
+                return_value=Slot(10),
+            ),
+            patch.object(ValidatorService, "_produce_attestations", mock_attest),
+            patch("asyncio.sleep", new=stop_on_sleep),
+        ):
+            await service.run()
+
+        assert attest_calls == []
+        assert Slot(10) not in service._attested_slots
+        assert service.duties_skipped_lag >= 1
+
+    def test_record_lag_skip_logs_structured_fields_and_increments_counter(
+        self, sync_service: SyncService, caplog: pytest.LogCaptureFixture
+    ) -> None:
+        """The skip log carries duty type, slot, head_slot, lag, and peer_max."""
+        _set_head_slot(sync_service, Slot(3))
+        service = ValidatorService(
+            sync_service=sync_service,
+            clock=SlotClock(genesis_time=Uint64(0)),
+            registry=ValidatorRegistry(),
+        )
+
+        with (
+            caplog.at_level("INFO"),
+            patch.object(
+                PeerManager,
+                "get_network_head_slot",
+                return_value=Slot(20),
+            ),
+        ):
+            service._record_lag_skip(Slot(20), "block")
+
+        assert "duty=block" in caplog.text
+        assert "slot=20" in caplog.text
+        assert "head_slot=3" in caplog.text
+        assert "lag=17" in caplog.text
+        assert "peer_max_head_slot=20" in caplog.text
+        assert service.duties_skipped_lag == 1
+
+    def test_record_lag_skip_with_no_peer_status_renders_none(
+        self, sync_service: SyncService, caplog: pytest.LogCaptureFixture
+    ) -> None:
+        """When no peer has reported status, the log renders peer_max as 'none'."""
+        _set_head_slot(sync_service, Slot(3))
+        service = ValidatorService(
+            sync_service=sync_service,
+            clock=SlotClock(genesis_time=Uint64(0)),
+            registry=ValidatorRegistry(),
+        )
+
+        with caplog.at_level("INFO"):
+            service._record_lag_skip(Slot(20), "attestation")
+
+        assert "duty=attestation" in caplog.text
+        assert "peer_max_head_slot=none" in caplog.text
+        assert service.duties_skipped_lag == 1
+
+    def test_record_lag_skip_counts_repeated_duty_once(self, sync_service: SyncService) -> None:
+        """Repeated skips of one (slot, duty) pair count once; a new pair counts again."""
+        _set_head_slot(sync_service, Slot(3))
+        service = ValidatorService(
+            sync_service=sync_service,
+            clock=SlotClock(genesis_time=Uint64(0)),
+            registry=ValidatorRegistry(),
+        )
+
+        service._record_lag_skip(Slot(20), "attestation")
+        service._record_lag_skip(Slot(20), "attestation")
+        assert service.duties_skipped_lag == 1
+
+        service._record_lag_skip(Slot(21), "attestation")
+        assert service.duties_skipped_lag == 2