17 changes: 17 additions & 0 deletions src/lean_spec/subspecs/sync/peer_manager.py
@@ -167,6 +167,23 @@ def get_network_finalized_slot(self) -> Slot | None:
return None
return counter.most_common(1)[0][0]

def get_network_head_slot(self) -> Slot | None:
"""
Highest head slot reported by any connected peer with a known status.

Used to distinguish a node that is itself behind from a network where
every peer is also behind (e.g. a streak of skipped proposals).
Returns None when no connected peer has reported a status yet.
"""
slots = [
peer.status.head.slot
for peer in self._peers.values()
if peer.status is not None and peer.is_connected()
]
if not slots:
return None
return max(slots)

def on_request_success(self, peer_id: PeerId) -> None:
"""Record a successful request to a peer."""
peer = self._peers.get(peer_id)
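The new helper deliberately aggregates differently from get_network_finalized_slot just above: assuming, per the most_common call visible in that method, that the finalized aggregation returns the most frequently reported finalized slot among connected peers, the head aggregation instead takes the maximum, since a single up-to-date peer is enough evidence that the network has advanced. A test-style sketch of the contrast, reusing the fixtures from the peer-manager tests later in this diff; the test name and slot numbers are illustrative only, not part of this change:

def test_head_uses_max_while_finalized_uses_mode(
    self, peer_id: PeerId, peer_id_2: PeerId, peer_id_3: PeerId
) -> None:
    """Sketch: the two aggregations answer different questions on the same peer set."""
    manager = PeerManager()
    # Hypothetical statuses: finalized slots [50, 50, 60], head slots [100, 250, 175].
    for pid, fin, head in [(peer_id, 50, 100), (peer_id_2, 50, 250), (peer_id_3, 60, 175)]:
        sync_peer = manager.add_peer(PeerInfo(peer_id=pid, state=ConnectionState.CONNECTED))
        sync_peer.status = Status(
            finalized=Checkpoint(root=Bytes32.zero(), slot=Slot(fin)),
            head=Checkpoint(root=Bytes32.zero(), slot=Slot(head)),
        )

    assert manager.get_network_finalized_slot() == Slot(50)  # most common finalized slot
    assert manager.get_network_head_slot() == Slot(250)  # maximum head slot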
22 changes: 22 additions & 0 deletions src/lean_spec/subspecs/validator/constants.py
@@ -0,0 +1,22 @@
"""
Validator service constants.

Operational thresholds governing validator duty execution.
"""

from __future__ import annotations

from typing import Final

SYNC_LAG_THRESHOLD: Final[int] = 4
"""
Maximum tolerated lag, in slots, between the wall-clock slot and the local head
before validator duties are skipped.

A node whose local head trails the wall clock by more than this attests against
a stale subtree, depositing fork-choice weight on the wrong branch.

The gate also checks peer-reported head slots: if no peer claims a recent head,
the network as a whole is lagging (e.g. a streak of skipped proposals) and the
gate stays open so the chain can keep progressing.
"""
106 changes: 95 additions & 11 deletions src/lean_spec/subspecs/validator/service.py
@@ -54,6 +54,7 @@
from lean_spec.subspecs.xmss.containers import Signature
from lean_spec.types import Bytes32, Slot, Uint64, ValidatorIndex

from .constants import SYNC_LAG_THRESHOLD
from .registry import ValidatorEntry, ValidatorRegistry

logger = logging.getLogger(__name__)
@@ -111,6 +112,9 @@ class ValidatorService:
_attested_slots: set[Slot] = field(default_factory=set, repr=False)
"""Slots for which we've already produced attestations (prevents duplicates)."""

_duties_skipped_lag: int = field(default=0, repr=False)
"""Counter for duties skipped because the local head trails wall clock."""

async def run(self) -> None:
"""
Main loop - check duties every interval.
@@ -168,7 +172,10 @@ async def run(self) -> None:
#
# Check if any of our validators is the proposer.
logger.debug("ValidatorService: checking block production for slot %d", slot)
await self._maybe_produce_block(slot)
if self._is_synced_for_duties(slot):
await self._maybe_produce_block(slot)
else:
self._record_lag_skip(slot, "block")
logger.debug("ValidatorService: done block production check for slot %d", slot)

# Re-fetch interval after block production.
@@ -197,16 +204,21 @@ async def run(self) -> None:
slot,
interval,
)
await self._produce_attestations(slot)
logger.debug("ValidatorService: done producing attestations for slot %d", slot)
self._attested_slots.add(slot)

# Prune old entries to prevent unbounded growth.
#
# Keep only recent slots (current slot - 4) to bound memory usage.
# We never need to attest for slots that far in the past.
prune_threshold = Slot(max(0, int(slot) - 4))
self._attested_slots = {s for s in self._attested_slots if s >= prune_threshold}
if self._is_synced_for_duties(slot):
await self._produce_attestations(slot)
logger.debug("ValidatorService: done producing attestations for slot %d", slot)
self._attested_slots.add(slot)

# Prune old entries to prevent unbounded growth.
#
# Keep only recent slots (current slot - 4) to bound memory usage.
# We never need to attest for slots that far in the past.
prune_threshold = Slot(max(0, int(slot) - 4))
self._attested_slots = {s for s in self._attested_slots if s >= prune_threshold}
else:
# Do NOT mark the slot attested: if the node catches up before
# the slot ends, the next iteration should retry the duty.
self._record_lag_skip(slot, "attestation")

# Intervals 2-4 have no additional validator duties.

@@ -498,6 +510,73 @@ def _sign_with_key(
self.registry.add(updated_entry)
return updated_entry, signature

def _is_synced_for_duties(self, slot: Slot) -> bool:
"""
Decide whether validator duties should run for the given slot.

Combines two signals to avoid two failure modes:

1. Local lag: if our head is close to the wall clock, attest and propose.
2. Peer max head: if even the most up-to-date peer is also far behind,
the network is stalling (a streak of skipped proposals) and gating
our duties would only deepen the stall.

Args:
slot: Wall-clock slot for which a duty would be performed.

Returns:
True when duties should run; False when the local node is
materially behind a network that has otherwise made progress.
"""
store = self.sync_service.store
head_block = store.blocks.get(store.head)
# No head yet: nothing to compare against; duty methods will no-op.
if head_block is None:
return True
head_slot = head_block.slot
# Clock skew or chain ahead of wall clock: trust the chain.
if int(slot) <= int(head_slot):
return True
lag = int(slot) - int(head_slot)
if lag <= SYNC_LAG_THRESHOLD:
return True
# Local node is behind. Check whether the network is also behind.
peer_max = self.sync_service.peer_manager.get_network_head_slot()
# No peer evidence at all: isolated node, fall through and try.
if peer_max is None:
return True
# Network-wide skip: the highest peer head is also lagged. Keep duties
# alive so the chain can keep progressing through the skipped slots.
if int(slot) - int(peer_max) > SYNC_LAG_THRESHOLD:
return True
return False

def _record_lag_skip(self, slot: Slot, duty: str) -> None:
"""
Emit a structured log and increment the lag-skip counter.

Args:
slot: Slot whose duty was skipped.
duty: One of "block" or "attestation". Identifies the duty type
in the log so operators can attribute missed signatures.
"""
store = self.sync_service.store
head_block = store.blocks.get(store.head)
head_slot = head_block.slot if head_block is not None else Slot(0)
lag = max(0, int(slot) - int(head_slot))
peer_max = self.sync_service.peer_manager.get_network_head_slot()
peer_max_repr = int(peer_max) if peer_max is not None else "none"
logger.info(
"Validator duty skipped due to sync lag: "
"duty=%s slot=%d head_slot=%d lag=%d peer_max_head_slot=%s",
duty,
int(slot),
int(head_slot),
lag,
peer_max_repr,
)
self._duties_skipped_lag += 1

def stop(self) -> None:
"""
Stop the service.
@@ -521,3 +600,8 @@ def blocks_produced(self) -> int:
def attestations_produced(self) -> int:
"""Total attestations produced since creation."""
return self._attestations_produced

@property
def duties_skipped_lag(self) -> int:
"""Total duties (block + attestation) skipped because of sync lag."""
return self._duties_skipped_lag
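For reasoning about the branches of _is_synced_for_duties, it can help to restate the gate as a pure function over plain ints. This is a sketch only, not part of this change; the function name and the example slot numbers are hypothetical, and it assumes SYNC_LAG_THRESHOLD = 4 as defined above.

SYNC_LAG_THRESHOLD = 4  # mirrors the constant introduced in validator/constants.py


def should_run_duties(wall_slot: int, head_slot: int | None, peer_max_head: int | None) -> bool:
    """Hypothetical restatement of _is_synced_for_duties over plain ints."""
    if head_slot is None:  # no head yet: duty methods no-op anyway
        return True
    if wall_slot <= head_slot:  # clock skew or chain ahead of wall clock: trust the chain
        return True
    if wall_slot - head_slot <= SYNC_LAG_THRESHOLD:  # small local lag: run duties
        return True
    if peer_max_head is None:  # isolated node, no peer evidence: try anyway
        return True
    if wall_slot - peer_max_head > SYNC_LAG_THRESHOLD:  # network-wide lag: keep duties alive
        return True
    return False  # locally behind a network that has made progress


# Hypothetical slot numbers covering the branches:
assert should_run_duties(wall_slot=20, head_slot=18, peer_max_head=20)  # lag 2: run
assert not should_run_duties(wall_slot=20, head_slot=10, peer_max_head=19)  # lag 10, peers current: skip
assert should_run_duties(wall_slot=20, head_slot=10, peer_max_head=12)  # peers also lagging: run
assert should_run_duties(wall_slot=20, head_slot=10, peer_max_head=None)  # no peer evidence: run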
51 changes: 51 additions & 0 deletions tests/lean_spec/subspecs/sync/test_peer_manager.py
@@ -279,6 +279,57 @@ def test_get_network_finalized_slot_ignores_disconnected(
finalized = manager.get_network_finalized_slot()
assert finalized == Slot(100)

def test_get_network_head_slot_returns_max(
self, peer_id: PeerId, peer_id_2: PeerId, peer_id_3: PeerId
) -> None:
"""get_network_head_slot returns the maximum head slot across peers."""
manager = PeerManager()

for pid, head_slot in [(peer_id, 100), (peer_id_2, 250), (peer_id_3, 175)]:
info = PeerInfo(peer_id=pid, state=ConnectionState.CONNECTED)
sync_peer = manager.add_peer(info)
sync_peer.status = Status(
finalized=Checkpoint(root=Bytes32.zero(), slot=Slot(50)),
head=Checkpoint(root=Bytes32.zero(), slot=Slot(head_slot)),
)

assert manager.get_network_head_slot() == Slot(250)

def test_get_network_head_slot_none_without_status(self, connected_peer_info: PeerInfo) -> None:
"""get_network_head_slot returns None when no peer has reported status."""
manager = PeerManager()
manager.add_peer(connected_peer_info)
assert manager.get_network_head_slot() is None

def test_get_network_head_slot_none_with_no_peers(self) -> None:
"""get_network_head_slot returns None when there are no peers at all."""
manager = PeerManager()
assert manager.get_network_head_slot() is None

def test_get_network_head_slot_ignores_disconnected(
self, peer_id: PeerId, peer_id_2: PeerId
) -> None:
"""Disconnected peers are excluded even if they have a recent reported head."""
manager = PeerManager()

info1 = PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED)
info2 = PeerInfo(peer_id=peer_id_2, state=ConnectionState.DISCONNECTED)

sync_peer1 = manager.add_peer(info1)
sync_peer2 = manager.add_peer(info2)

sync_peer1.status = Status(
finalized=Checkpoint(root=Bytes32.zero(), slot=Slot(50)),
head=Checkpoint(root=Bytes32.zero(), slot=Slot(100)),
)
# Disconnected peer reports a more recent head; must be ignored.
sync_peer2.status = Status(
finalized=Checkpoint(root=Bytes32.zero(), slot=Slot(200)),
head=Checkpoint(root=Bytes32.zero(), slot=Slot(500)),
)

assert manager.get_network_head_slot() == Slot(100)


class TestPeerManagerRequestCallbacks:
"""Tests for PeerManager request callbacks."""