Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ use crate::{
publish_attestation, publish_block,
},
req_resp::{
BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, MAX_COMPRESSED_PAYLOAD_SIZE, Request,
STATUS_PROTOCOL_V1, build_status, fetch_block_from_peer,
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec,
MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status,
fetch_block_from_peer,
},
swarm_adapter::SwarmHandle,
};
Expand Down Expand Up @@ -154,6 +155,10 @@ pub fn build_swarm(
StreamProtocol::new(BLOCKS_BY_ROOT_PROTOCOL_V1),
request_response::ProtocolSupport::Full,
),
(
StreamProtocol::new(BLOCKS_BY_RANGE_PROTOCOL_V1),
request_response::ProtocolSupport::Full,
),
],
Default::default(),
);
Expand Down
36 changes: 31 additions & 5 deletions crates/net/p2p/src/req_resp/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use tracing::{debug, trace, warn};
use super::{
encoding::{MAX_PAYLOAD_SIZE, decode_payload, write_payload},
messages::{
BLOCKS_BY_ROOT_PROTOCOL_V1, ErrorMessage, Request, Response, ResponseCode, ResponsePayload,
STATUS_PROTOCOL_V1, Status,
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, ErrorMessage, Request, Response,
ResponseCode, ResponsePayload, STATUS_PROTOCOL_V1, Status,
},
};

Expand Down Expand Up @@ -45,6 +45,12 @@ impl libp2p::request_response::Codec for Codec {
})?;
Ok(Request::BlocksByRoot(request))
}
BLOCKS_BY_RANGE_PROTOCOL_V1 => {
let request = SszDecode::from_ssz_bytes(&payload).map_err(|err| {
io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}"))
})?;
Ok(Request::BlocksByRange(request))
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("unknown protocol: {}", protocol.as_ref()),
Expand All @@ -63,6 +69,7 @@ impl libp2p::request_response::Codec for Codec {
match protocol.as_ref() {
STATUS_PROTOCOL_V1 => decode_status_response(io).await,
BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io).await,
BLOCKS_BY_RANGE_PROTOCOL_V1 => decode_blocks_by_range_response(io).await,
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("unknown protocol: {}", protocol.as_ref()),
Expand All @@ -84,6 +91,7 @@ impl libp2p::request_response::Codec for Codec {
let encoded = match req {
Request::Status(status) => status.to_ssz(),
Request::BlocksByRoot(request) => request.to_ssz(),
Request::BlocksByRange(request) => request.to_ssz(),
};

write_payload(io, &encoded).await
Expand All @@ -107,7 +115,8 @@ impl libp2p::request_response::Codec for Codec {
let encoded = status.to_ssz();
write_payload(io, &encoded).await
}
ResponsePayload::BlocksByRoot(blocks) => {
ResponsePayload::BlocksByRoot(blocks)
| ResponsePayload::BlocksByRange(blocks) => {
// Write each block as a separate chunk.
// Encode first, then check size before writing the SUCCESS
// code byte. This avoids corrupting the stream if a block
Expand All @@ -118,7 +127,7 @@ impl libp2p::request_response::Codec for Codec {
if encoded.len() > MAX_PAYLOAD_SIZE - 1024 {
warn!(
size = encoded.len(),
"Skipping oversized block in BlocksByRoot response"
"Skipping oversized block in block response"
);
continue;
}
Expand Down Expand Up @@ -216,6 +225,23 @@ where
/// Note: Error chunks from the peer (non-SUCCESS response codes) do not cause this
/// function to return `Err` - they are logged and skipped.
async fn decode_blocks_by_root_response<T>(io: &mut T) -> io::Result<Response>
where
T: AsyncRead + Unpin + Send,
{
decode_blocks_response(io, ResponsePayload::BlocksByRoot).await
}

async fn decode_blocks_by_range_response<T>(io: &mut T) -> io::Result<Response>
where
T: AsyncRead + Unpin + Send,
{
decode_blocks_response(io, ResponsePayload::BlocksByRange).await
}

async fn decode_blocks_response<T>(
io: &mut T,
payload: fn(Vec<SignedBlock>) -> ResponsePayload,
) -> io::Result<Response>
where
T: AsyncRead + Unpin + Send,
{
Expand Down Expand Up @@ -247,5 +273,5 @@ where
blocks.push(block);
}

Ok(Response::success(ResponsePayload::BlocksByRoot(blocks)))
Ok(Response::success(payload(blocks)))
}
178 changes: 175 additions & 3 deletions crates/net/p2p/src/req_resp/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use ethlambda_storage::Store;
use libp2p::{PeerId, request_response};
Expand All @@ -12,11 +12,13 @@ use ethlambda_types::primitives::HashTreeRoot as _;
use ethlambda_types::{block::SignedBlock, primitives::H256};

use super::{
BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponsePayload, Status,
BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS,
Request, Response, ResponsePayload, Status, messages::error_message,
};
use crate::{
BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest,
p2p_protocol, req_resp::RequestedBlockRoots,
p2p_protocol,
req_resp::{RequestedBlockRoots, messages::ResponseCode},
};

pub async fn handle_req_resp_message(
Expand All @@ -42,6 +44,13 @@ pub async fn handle_req_resp_message(
);
handle_blocks_by_root_request(server, request, channel, peer).await;
}
Request::BlocksByRange(request) => {
info!(
kind = "blocks_by_range_request",
peer_count, "P2P message received"
);
handle_blocks_by_range_request(server, request, channel, peer).await;
}
}
}
request_response::Message::Response {
Expand All @@ -63,6 +72,14 @@ pub async fn handle_req_resp_message(
handle_blocks_by_root_response(server, blocks, peer, request_id, ctx)
.await;
}
ResponsePayload::BlocksByRange(blocks) => {
info!(
kind = "blocks_by_range_response",
peer_count,
count = blocks.len(),
"P2P message received"
);
}
Comment on lines +75 to +82
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 BlocksByRange response handler is a no-op stub

When this node receives a BlocksByRange response from a peer, the blocks are logged and then silently dropped — no processing, storage, or forwarding takes place. This is fine for now since the node doesn't yet issue outbound BlocksByRange requests, but if a future caller sends such a request, all returned blocks will be discarded without any diagnostic. Consider adding a warn! that makes the "not yet implemented" intent explicit.

Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/net/p2p/src/req_resp/handlers.rs
Line: 75-82

Comment:
**BlocksByRange response handler is a no-op stub**

When this node receives a `BlocksByRange` response from a peer, the blocks are logged and then silently dropped — no processing, storage, or forwarding takes place. This is fine for now since the node doesn't yet issue outbound `BlocksByRange` requests, but if a future caller sends such a request, all returned blocks will be discarded without any diagnostic. Consider adding a `warn!` that makes the "not yet implemented" intent explicit.

How can I resolve this? If you propose a fix, please make it concise.

},
Response::Error { code, message } => {
let error_str = String::from_utf8_lossy(&message);
Expand Down Expand Up @@ -140,6 +157,97 @@ async fn handle_blocks_by_root_request(
server.swarm_handle.send_response(channel, response);
}

async fn handle_blocks_by_range_request(
server: &mut P2PServer,
request: BlocksByRangeRequest,
channel: request_response::ResponseChannel<Response>,
peer: PeerId,
) {
info!(
%peer,
start_slot = request.start_slot,
count = request.count,
step = request.step,
"Received BlocksByRange request"
);

if request.step == 0 || request.count == 0 || request.count > MAX_REQUEST_BLOCKS {
let response = Response::error(
ResponseCode::INVALID_REQUEST,
error_message("invalid BlocksByRange request"),
);
server.swarm_handle.send_response(channel, response);
return;
}

let blocks = canonical_blocks_by_range(
&server.store,
request.start_slot,
request.count,
request.step,
);

info!(
%peer,
start_slot = request.start_slot,
count = request.count,
step = request.step,
found = blocks.len(),
"Responding to BlocksByRange request"
);

let response = Response::success(ResponsePayload::BlocksByRange(blocks));
server.swarm_handle.send_response(channel, response);
}

fn canonical_blocks_by_range(
store: &Store,
start_slot: u64,
count: u64,
step: u64,
) -> Vec<SignedBlock> {
if count == 0 {
return Vec::new();
}

let Some(last_offset) = count
.checked_sub(1)
.and_then(|value| value.checked_mul(step))
else {
return Vec::new();
};
let Some(end_slot) = start_slot.checked_add(last_offset) else {
return Vec::new();
};

let mut roots_by_slot = HashMap::new();
let mut current_root = store.head();

while !current_root.is_zero() {
let Some(header) = store.get_block_header(&current_root) else {
break;
};

if header.slot < start_slot {
break;
}

if header.slot <= end_slot && (header.slot - start_slot) % step == 0 {
roots_by_slot.insert(header.slot, current_root);
}

current_root = header.parent_root;
}

(0..count)
.filter_map(|index| {
let slot = start_slot.checked_add(index.checked_mul(step)?)?;
let root = roots_by_slot.get(&slot)?;
store.get_signed_block(root)
})
.collect()
}

async fn handle_blocks_by_root_response(
server: &mut P2PServer,
blocks: Vec<SignedBlock>,
Expand Down Expand Up @@ -313,3 +421,67 @@ async fn handle_fetch_failure(

send_after(backoff, ctx.clone(), p2p_protocol::RetryBlockFetch { root });
}

#[cfg(test)]
mod tests {
use super::*;
use ethlambda_storage::{ForkCheckpoints, backend::InMemoryBackend};
use ethlambda_types::{
attestation::XmssSignature,
block::{Block, BlockBody, BlockSignatures},
signature::SIGNATURE_SIZE,
state::State,
};
use libssz_types::SszList;
use std::sync::Arc;

fn signed_block(slot: u64, parent_root: H256) -> SignedBlock {
SignedBlock {
message: Block {
slot,
proposer_index: 0,
parent_root,
state_root: H256::ZERO,
body: BlockBody::default(),
},
signature: BlockSignatures {
attestation_signatures: SszList::new(),
proposer_signature: XmssSignature::try_from(vec![0u8; SIGNATURE_SIZE]).unwrap(),
},
}
}

#[test]
fn blocks_by_range_returns_canonical_blocks_in_requested_order() {
let backend = Arc::new(InMemoryBackend::new());
let mut store = Store::from_anchor_state(backend, State::from_genesis(0, vec![]));

let block_1 = signed_block(1, store.head());
let root_1 = block_1.message.hash_tree_root();
store.insert_signed_block(root_1, block_1);

let block_2 = signed_block(2, root_1);
let root_2 = block_2.message.hash_tree_root();
store.insert_signed_block(root_2, block_2);

let side_block_3 = signed_block(3, root_1);
let side_root_3 = side_block_3.message.hash_tree_root();
store.insert_signed_block(side_root_3, side_block_3);

let block_4 = signed_block(4, root_2);
let root_4 = block_4.message.hash_tree_root();
store.insert_signed_block(root_4, block_4);
store.update_checkpoints(ForkCheckpoints::head_only(root_4));

let blocks = canonical_blocks_by_range(&store, 1, 4, 1);
let slots: Vec<_> = blocks.iter().map(|block| block.message.slot).collect();
let roots: Vec<_> = blocks
.iter()
.map(|block| block.message.hash_tree_root())
.collect();

assert_eq!(slots, vec![1, 2, 4]);
assert_eq!(roots, vec![root_1, root_2, root_4]);
assert!(!roots.contains(&side_root_3));
}
}
13 changes: 11 additions & 2 deletions crates/net/p2p/src/req_resp/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use libssz_types::SszList;

pub const STATUS_PROTOCOL_V1: &str = "/leanconsensus/req/status/1/ssz_snappy";
pub const BLOCKS_BY_ROOT_PROTOCOL_V1: &str = "/leanconsensus/req/blocks_by_root/1/ssz_snappy";
pub const BLOCKS_BY_RANGE_PROTOCOL_V1: &str = "/leanconsensus/req/blocks_by_range/1/ssz_snappy";
pub const MAX_REQUEST_BLOCKS: u64 = 1024; // Maximum number of blocks in a single request (1024).

#[derive(Debug, Clone)]
pub enum Request {
Status(Status),
BlocksByRoot(BlocksByRootRequest),
BlocksByRange(BlocksByRangeRequest),
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -89,6 +92,7 @@ impl std::fmt::Debug for ResponseCode {
pub enum ResponsePayload {
Status(Status),
BlocksByRoot(Vec<SignedBlock>),
BlocksByRange(Vec<SignedBlock>),
}

#[derive(Debug, Clone, SszEncode, SszDecode)]
Expand All @@ -106,8 +110,6 @@ pub type ErrorMessage = SszList<u8, 256>;
/// Helper to create an ErrorMessage from a string.
/// Debug builds panic if message exceeds 256 bytes (programming error).
/// Release builds truncate to 256 bytes.
#[expect(dead_code)]
// TODO: map errors to req/resp error messages
pub fn error_message(msg: impl AsRef<str>) -> ErrorMessage {
let bytes = msg.as_ref().as_bytes();
debug_assert!(
Expand All @@ -130,3 +132,10 @@ pub fn error_message(msg: impl AsRef<str>) -> ErrorMessage {
pub struct BlocksByRootRequest {
pub roots: RequestedBlockRoots,
}

#[derive(Debug, Clone, SszEncode, SszDecode)]
pub struct BlocksByRangeRequest {
pub start_slot: u64,
pub count: u64,
pub step: u64,
}
3 changes: 2 additions & 1 deletion crates/net/p2p/src/req_resp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub use codec::Codec;
pub use encoding::{MAX_COMPRESSED_PAYLOAD_SIZE, MAX_PAYLOAD_SIZE};
pub use handlers::{build_status, fetch_block_from_peer, handle_req_resp_message};
pub use messages::{
BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, RequestedBlockRoots, Response,
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest,
BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, RequestedBlockRoots, Response,
ResponsePayload, STATUS_PROTOCOL_V1, Status,
};