From 9d26e0331ef1b0c001f2cdde0215d7b1865cf597 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Wed, 21 Jan 2026 15:44:42 -0300 Subject: [PATCH 1/4] feat: send Status request when peer connects --- crates/net/p2p/src/lib.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 25b9fc24..a0076144 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -4,6 +4,7 @@ use std::{ }; use ethlambda_blockchain::{BlockChain, OutboundGossip}; +use ethlambda_types::{primitives::H256, state::Checkpoint}; use ethrex_common::H264; use ethrex_p2p::types::NodeRecord; use ethrex_rlp::decode::RLPDecode; @@ -238,6 +239,14 @@ async fn event_loop( metrics::notify_peer_connected(&peer_id, "inbound", "error"); warn!(%error, "Incoming connection error"); } + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + info!(%peer_id, "Connection established, sending status request"); + let placeholder_status = Status { + finalized: Checkpoint { root: H256::ZERO, slot: 0 }, + head: Checkpoint { root: H256::ZERO, slot: 0 }, + }; + swarm.behaviour_mut().req_resp.send_request(&peer_id, placeholder_status); + } _ => { trace!(?event, "Ignored swarm event"); } @@ -318,7 +327,6 @@ async fn handle_req_resp_message( .req_resp .send_response(channel, request.clone()) .unwrap(); - swarm.behaviour_mut().req_resp.send_request(&peer, request); } request_response::Message::Response { request_id: _, From c2810152ce523bdf3a77031f177bfca0b0f12210 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 26 Jan 2026 16:16:35 -0300 Subject: [PATCH 2/4] fix: send real status instead of dummy --- Cargo.lock | 1 + bin/ethlambda/src/main.rs | 5 +++- crates/blockchain/src/lib.rs | 7 +++-- crates/net/p2p/Cargo.toml | 1 + crates/net/p2p/src/lib.rs | 51 ++++++++++++++++++++++++++---------- 5 files changed, 46 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e8f9e754..6c73c8fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1960,6 +1960,7 @@ dependencies = [ "ethereum_ssz", "ethereum_ssz_derive", "ethlambda-blockchain", + "ethlambda-storage", "ethlambda-types", "ethrex-common", "ethrex-p2p", diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 0b682394..2143c310 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -20,6 +20,7 @@ use tracing::{error, info}; use tracing_subscriber::{EnvFilter, Layer, Registry, layer::SubscriberExt}; use ethlambda_blockchain::BlockChain; +use ethlambda_storage::Store; const ASCII_ART: &str = r#" _ _ _ _ _ @@ -91,9 +92,10 @@ async fn main() { read_validator_keys(&validators_path, &validator_keys_dir, &options.node_id); let genesis_state = State::from_genesis(&genesis, validators); + let store = Store::from_genesis(genesis_state); let (p2p_tx, p2p_rx) = tokio::sync::mpsc::unbounded_channel(); - let blockchain = BlockChain::spawn(genesis_state, p2p_tx, validator_keys); + let blockchain = BlockChain::spawn(store.clone(), p2p_tx, validator_keys); let p2p_handle = tokio::spawn(start_p2p( node_p2p_key, @@ -101,6 +103,7 @@ async fn main() { p2p_socket, blockchain, p2p_rx, + store, )); start_prometheus_metrics_api(metrics_socket).await.unwrap(); diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index efd58ef9..9f35821c 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -8,7 +8,7 @@ use ethlambda_types::{ block::{BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation}, primitives::TreeHash, signature::ValidatorSecretKey, - state::{Checkpoint, State}, + state::Checkpoint, }; use spawned_concurrency::tasks::{ CallResponse, CastResponse, GenServer, GenServerHandle, send_after, @@ -40,12 +40,11 @@ pub const SECONDS_PER_SLOT: u64 = 4; impl BlockChain { pub fn spawn( - genesis_state: State, + store: Store, p2p_tx: mpsc::UnboundedSender, validator_keys: HashMap, ) -> BlockChain { - let genesis_time = genesis_state.config.genesis_time; - let store = Store::from_genesis(genesis_state); + let genesis_time = store.config().genesis_time; let key_manager = key_manager::KeyManager::new(validator_keys); let handle = BlockChainServer { store, diff --git a/crates/net/p2p/Cargo.toml b/crates/net/p2p/Cargo.toml index 488275f1..29af2f7a 100644 --- a/crates/net/p2p/Cargo.toml +++ b/crates/net/p2p/Cargo.toml @@ -11,6 +11,7 @@ version.workspace = true [dependencies] ethlambda-blockchain.workspace = true +ethlambda-storage.workspace = true ethlambda-types.workspace = true async-trait = "0.1" diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index a0076144..0d5d521d 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -4,7 +4,8 @@ use std::{ }; use ethlambda_blockchain::{BlockChain, OutboundGossip}; -use ethlambda_types::{primitives::H256, state::Checkpoint}; +use ethlambda_storage::Store; +use ethlambda_types::state::Checkpoint; use ethrex_common::H264; use ethrex_p2p::types::NodeRecord; use ethrex_rlp::decode::RLPDecode; @@ -42,6 +43,7 @@ pub async fn start_p2p( listening_socket: SocketAddr, blockchain: BlockChain, p2p_rx: mpsc::UnboundedReceiver, + store: Store, ) { let config = libp2p::gossipsub::ConfigBuilder::default() // d @@ -143,7 +145,15 @@ pub async fn start_p2p( info!("P2P node started on {listening_socket}"); - event_loop(swarm, blockchain, p2p_rx, attestation_topic, block_topic).await; + event_loop( + swarm, + blockchain, + p2p_rx, + attestation_topic, + block_topic, + store, + ) + .await; } /// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining Gossipsub and Request-Response Behaviours @@ -161,6 +171,7 @@ async fn event_loop( mut p2p_rx: mpsc::UnboundedReceiver, attestation_topic: libp2p::gossipsub::IdentTopic, block_topic: libp2p::gossipsub::IdentTopic, + store: Store, ) { loop { tokio::select! { @@ -180,7 +191,7 @@ async fn event_loop( SwarmEvent::Behaviour(BehaviourEvent::ReqResp( message @ request_response::Event::Message { .. }, )) => { - handle_req_resp_message(&mut swarm, message).await; + handle_req_resp_message(&mut swarm, message, &store).await; } SwarmEvent::Behaviour(BehaviourEvent::Gossipsub( message @ libp2p::gossipsub::Event::Message { .. }, @@ -196,8 +207,13 @@ async fn event_loop( let direction = connection_direction(&endpoint); if num_established.get() == 1 { metrics::notify_peer_connected(&Some(peer_id), direction, "success"); + // Send status request on first connection to this peer + let our_status = build_status(&store); + info!(%peer_id, %direction, finalized_slot=%our_status.finalized.slot, head_slot=%our_status.head.slot, "Peer connected, sending status request"); + swarm.behaviour_mut().req_resp.send_request(&peer_id, our_status); + } else { + info!(%peer_id, %direction, "Peer connected"); } - info!(%peer_id, %direction, "Peer connected"); } SwarmEvent::ConnectionClosed { peer_id, @@ -239,14 +255,6 @@ async fn event_loop( metrics::notify_peer_connected(&peer_id, "inbound", "error"); warn!(%error, "Incoming connection error"); } - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - info!(%peer_id, "Connection established, sending status request"); - let placeholder_status = Status { - finalized: Checkpoint { root: H256::ZERO, slot: 0 }, - head: Checkpoint { root: H256::ZERO, slot: 0 }, - }; - swarm.behaviour_mut().req_resp.send_request(&peer_id, placeholder_status); - } _ => { trace!(?event, "Ignored swarm event"); } @@ -305,6 +313,7 @@ async fn handle_outgoing_gossip( async fn handle_req_resp_message( swarm: &mut libp2p::Swarm, event: request_response::Event, + store: &Store, ) { let request_response::Event::Message { peer, @@ -321,11 +330,11 @@ async fn handle_req_resp_message( channel, } => { info!(finalized_slot=%request.finalized.slot, head_slot=%request.head.slot, "Received status request from peer {peer}"); - // TODO: send real status + let our_status = build_status(store); swarm .behaviour_mut() .req_resp - .send_response(channel, request.clone()) + .send_response(channel, our_status) .unwrap(); } request_response::Message::Response { @@ -385,6 +394,20 @@ fn connection_direction(endpoint: &libp2p::core::ConnectedPoint) -> &'static str } } +/// Build a Status message from the current Store state. +fn build_status(store: &Store) -> Status { + let finalized = store.latest_finalized(); + let head_root = store.head(); + let head_slot = store.get_block(&head_root).expect("head block exists").slot; + Status { + finalized, + head: Checkpoint { + root: head_root, + slot: head_slot, + }, + } +} + fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub::MessageId { const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0x00, 0x00, 0x00, 0x00]; const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [0x01, 0x00, 0x00, 0x00]; From 91fe5e0f0ebe925d90a8f88305896a759b198216 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 26 Jan 2026 16:23:23 -0300 Subject: [PATCH 3/4] fix: clone store before passing --- bin/ethlambda/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index aca793fa..c9da0768 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -102,7 +102,7 @@ async fn main() { p2p_socket, blockchain, p2p_rx, - store, + store.clone(), )); ethlambda_rpc::start_rpc_server(metrics_socket, store) From ef80abbd66c928ceb3d02851ce1ddd3cfdc566d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 26 Jan 2026 16:29:16 -0300 Subject: [PATCH 4/4] chore: change log messages --- crates/net/p2p/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 0d5d521d..ecaab663 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -209,10 +209,10 @@ async fn event_loop( metrics::notify_peer_connected(&Some(peer_id), direction, "success"); // Send status request on first connection to this peer let our_status = build_status(&store); - info!(%peer_id, %direction, finalized_slot=%our_status.finalized.slot, head_slot=%our_status.head.slot, "Peer connected, sending status request"); + info!(%peer_id, %direction, finalized_slot=%our_status.finalized.slot, head_slot=%our_status.head.slot, "Added connection to new peer, sending status request"); swarm.behaviour_mut().req_resp.send_request(&peer_id, our_status); } else { - info!(%peer_id, %direction, "Peer connected"); + info!(%peer_id, %direction, "Added peer connection"); } } SwarmEvent::ConnectionClosed {