From 0b67fefe020387638f0c490cbe10f9c0225e8d06 Mon Sep 17 00:00:00 2001 From: officeyutong Date: Wed, 6 May 2026 21:25:09 +0800 Subject: [PATCH 1/3] replace `StreamHandle` with `SubstreamInner` --- tentacle/src/quic/mod.rs | 4 +++ tentacle/src/quic/stream.rs | 43 ++++++++++++++++++++++++ tentacle/src/session.rs | 12 +++---- tentacle/src/substream.rs | 66 ++++++++++++++++++++++++++++++++----- 4 files changed, 111 insertions(+), 14 deletions(-) create mode 100644 tentacle/src/quic/stream.rs diff --git a/tentacle/src/quic/mod.rs b/tentacle/src/quic/mod.rs index a4212070..04e4c213 100644 --- a/tentacle/src/quic/mod.rs +++ b/tentacle/src/quic/mod.rs @@ -1,3 +1,5 @@ +#![allow(missing_docs)] + #[allow(missing_docs)] pub mod identity_mol; @@ -13,3 +15,5 @@ pub mod error; #[allow(missing_docs)] /// Verifier for rustls pub mod verifier; + +pub mod stream; diff --git a/tentacle/src/quic/stream.rs b/tentacle/src/quic/stream.rs new file mode 100644 index 00000000..366e3a04 --- /dev/null +++ b/tentacle/src/quic/stream.rs @@ -0,0 +1,43 @@ +use std::pin::Pin; + +use tokio::io::{AsyncRead, AsyncWrite}; + +#[derive(Debug)] +pub struct QuicBiStream { + pub(crate) send: quinn::SendStream, + pub(crate) recv: quinn::RecvStream, +} + +impl AsyncRead for QuicBiStream { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.recv).poll_read(cx, buf) + } +} + +impl AsyncWrite for QuicBiStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + AsyncWrite::poll_write(Pin::new(&mut self.send), cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.send).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.send).poll_shutdown(cx) + } +} diff --git a/tentacle/src/session.rs b/tentacle/src/session.rs index 5d275f17..63697f72 100644 --- a/tentacle/src/session.rs +++ b/tentacle/src/session.rs @@ -16,7 +16,7 @@ use yamux::{Control, Session as YamuxSession, StreamHandle}; use crate::{ ProtocolId, SessionId, StreamId, SubstreamReadPart, buffer::{Buffer, PriorityBuffer, SendResult}, - channel::{QuickSinkExt, mpsc as priority_mpsc, mpsc::Priority}, + channel::{QuickSinkExt, mpsc::{self as priority_mpsc, Priority}}, context::SessionContext, error::{HandshakeErrorKind, ProtocolHandleErrorKind, TransportErrorKind}, multiaddr::Multiaddr, @@ -28,7 +28,7 @@ use crate::{ config::{Meta, SessionConfig}, future_task::BoxedFutureTask, }, - substream::{ProtocolEvent, SubstreamBuilder, SubstreamWritePartBuilder}, + substream::{ProtocolEvent, SubstreamBuilder, SubstreamInner, SubstreamWritePartBuilder}, transports::MultiIncoming, }; @@ -260,7 +260,7 @@ impl Session { procedure: impl Future< Output = Result< ( - Framed, + Framed, String, Option, ), @@ -325,7 +325,7 @@ impl Session { let task = async move { let handle = match control.open_stream().await { - Ok(handle) => handle, + Ok(handle) => SubstreamInner::Yamux(handle), Err(e) => { debug!("session {} open stream error: {}", id, e); return Err(io::ErrorKind::BrokenPipe.into()); @@ -398,7 +398,7 @@ impl Session { }) .collect(); - let task = server_select(substream, proto_metas); + let task = server_select(SubstreamInner::Yamux(substream), proto_metas); self.select_procedure(task); } @@ -407,7 +407,7 @@ impl Session { cx: &mut Context, name: String, version: String, - substream: Box>, + substream: Box>, ) { let proto = match self.protocol_configs_by_name.get(&name) { Some(proto) => proto, diff --git a/tentacle/src/substream.rs b/tentacle/src/substream.rs index 391e366e..c85c6e8e 100644 --- a/tentacle/src/substream.rs +++ b/tentacle/src/substream.rs @@ -10,7 +10,7 @@ use std::{ sync::{Arc, atomic::Ordering}, task::{Context, Poll}, }; -use tokio::io::AsyncWrite; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio_util::codec::{Framed, length_delimited::LengthDelimitedCodec}; use crate::{ @@ -22,9 +22,59 @@ use crate::{ protocol_handle_stream::{ServiceProtocolEvent, SessionProtocolEvent}, service::config::SessionConfig, traits::Codec, - yamux::StreamHandle, }; +#[derive(Debug)] +pub(crate) enum SubstreamInner { + Yamux(yamux::StreamHandle), + #[cfg(feature = "quic")] + Quic(crate::quic::stream::QuicBiStream), +} + +impl AsyncRead for SubstreamInner { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + match self.get_mut() { + Self::Yamux(s) => Pin::new(s).poll_read(cx, buf), + #[cfg(feature = "quic")] + Self::Quic(s) => Pin::new(s).poll_read(cx, buf), + } + } +} + +impl AsyncWrite for SubstreamInner { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match self.get_mut() { + Self::Yamux(s) => Pin::new(s).poll_write(cx, buf), + #[cfg(feature = "quic")] + Self::Quic(s) => Pin::new(s).poll_write(cx, buf), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.get_mut() { + Self::Yamux(s) => Pin::new(s).poll_flush(cx), + #[cfg(feature = "quic")] + Self::Quic(s) => Pin::new(s).poll_flush(cx), + } + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.get_mut() { + Self::Yamux(s) => Pin::new(s).poll_shutdown(cx), + #[cfg(feature = "quic")] + Self::Quic(s) => Pin::new(s).poll_shutdown(cx), + } + } +} + /// Event generated/received by the protocol stream #[derive(Debug)] pub(crate) enum ProtocolEvent { @@ -33,7 +83,7 @@ pub(crate) enum ProtocolEvent { /// Protocol name proto_name: String, /// Yamux sub stream handle handshake framed - substream: Box>, + substream: Box>, /// Protocol version version: String, }, @@ -65,7 +115,7 @@ pub(crate) enum ProtocolEvent { /// Each custom protocol in a session corresponds to a sub stream /// Can be seen as the route of each protocol pub(crate) struct Substream { - substream: Framed, + substream: Framed, id: StreamId, proto_id: ProtocolId, @@ -562,7 +612,7 @@ impl SubstreamBuilder { self } - pub fn build(self, substream: Framed) -> Substream + pub fn build(self, substream: Framed) -> Substream where U: Codec, { @@ -592,7 +642,7 @@ impl SubstreamBuilder { /* Code organization under read-write separation */ pub(crate) struct SubstreamWritePart { - substream: SplitSink, bytes::Bytes>, + substream: SplitSink, bytes::Bytes>, id: StreamId, proto_id: ProtocolId, @@ -863,7 +913,7 @@ where /// Protocol Stream read part pub struct SubstreamReadPart { - pub(crate) substream: SplitStream>>, + pub(crate) substream: SplitStream>>, pub(crate) before_receive: Option, pub(crate) proto_id: ProtocolId, pub(crate) stream_id: StreamId, @@ -966,7 +1016,7 @@ impl SubstreamWritePartBuilder { pub fn build( self, - substream: SplitSink, bytes::Bytes>, + substream: SplitSink, bytes::Bytes>, ) -> SubstreamWritePart where U: Codec, From 83eb6c48de54641e664860b6b17b922946a6be29 Mon Sep 17 00:00:00 2001 From: officeyutong Date: Fri, 8 May 2026 13:18:51 +0800 Subject: [PATCH 2/3] Implement QuicEndpoint and QuicListener --- tentacle/src/quic/endpoint.rs | 600 ++++++++++++++++++++++++++++++++++ tentacle/src/quic/error.rs | 26 +- tentacle/src/quic/mod.rs | 5 + tentacle/src/quic/session.rs | 60 ++++ 4 files changed, 690 insertions(+), 1 deletion(-) create mode 100644 tentacle/src/quic/endpoint.rs create mode 100644 tentacle/src/quic/session.rs diff --git a/tentacle/src/quic/endpoint.rs b/tentacle/src/quic/endpoint.rs new file mode 100644 index 00000000..f4b7cb40 --- /dev/null +++ b/tentacle/src/quic/endpoint.rs @@ -0,0 +1,600 @@ +//! QUIC endpoint, listener, and dial entry points. +//! +//! - [`QuicEndpoint`] is a factory holding the local TLS certificate +//! (carrying the tentacle identity extension), the local secio key, and a +//! pre-built `quinn::ServerConfig`. It is the user-facing entry point that +//! the higher-level service builder hands to the transport layer. +//! - [`QuicEndpoint::listen`] binds a UDP socket and returns a +//! [`QuicListener`] that yields accepted [`QuicSession`]s (each one with +//! its TLS handshake already completed and verified). +//! - [`QuicEndpoint::dial`] opens a one-shot client endpoint and dials the +//! given multiaddr, returning a fully-handshaken [`QuicSession`]. +//! - [`parse_quic_multiaddr`] enforces the legal address shape. +//! +//! Endpoint reuse / pooling is left to a future manager layer; this module +//! deliberately keeps `dial` to a fresh client endpoint per call so the +//! basic flow can be unit-tested in isolation. + +use std::{net::SocketAddr, sync::Arc, time::Duration}; + +use quinn::crypto::rustls::{QuicClientConfig, QuicServerConfig}; +use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; + +use crate::{ + multiaddr::{Multiaddr, Protocol}, + quic::{ + config::QuicConfig, + error::QuicErrorKind, + identity::{TentacleQuicCert, build_self_signed, extract_identity}, + session::QuicSession, + verifier::{TentacleQuicClientCertVerifier, TentacleQuicServerCertVerifier}, + }, + secio::{KeyProvider, PeerId, PublicKey}, +}; + +/// `ServerName` passed to quinn when dialing — hostname checks are skipped by +/// our verifier (see [`crate::quic::verifier`]) so any RFC 2606-reserved name +/// works. `.invalid` will not collide with real DNS. +const TENTACLE_QUIC_SNI: &str = "tentacle.invalid"; + +// ─────────────────────────────────── QuicEndpoint ────────────────────────────────── + +/// Factory for QUIC listeners and outgoing dials. +/// +/// One [`QuicEndpoint`] corresponds to a single local secio identity and a +/// single self-signed TLS certificate. It is intended to be kept around for +/// the lifetime of the tentacle service. +pub struct QuicEndpoint { + local_key: K, + cert_der: Vec, + key_der: Vec, + server_config: quinn::ServerConfig, + config: Arc, +} + +impl QuicEndpoint { + /// Build a new QUIC endpoint factory from a secio `KeyProvider` and a + /// transport-level [`QuicConfig`]. + /// + /// This generates a fresh self-signed Ed25519 TLS certificate carrying the + /// tentacle identity extension (see [`crate::quic::identity`]) and + /// pre-builds a `quinn::ServerConfig` that authenticates clients via + /// [`TentacleQuicClientCertVerifier`]. Outgoing client configs are built + /// per-dial so that a per-dial `expected_peer_id` can be wired into the + /// server-cert verifier. + pub fn new(local_key: K, config: QuicConfig) -> Result { + let cert = build_self_signed(&local_key)?; + let server_config = build_quinn_server_config(local_key.clone(), &cert, &config)?; + Ok(Self { + local_key, + cert_der: cert.cert_der, + key_der: cert.key_der, + server_config, + config: Arc::new(config), + }) + } + + /// Bind a server-capable QUIC endpoint to the UDP address described by + /// `addr` and return a [`QuicListener`] yielding accepted sessions. + /// + /// `addr` must match the shape accepted by [`parse_quic_multiaddr`]. + pub fn listen(&self, addr: Multiaddr) -> Result { + let (socket_addr, _peer_id) = parse_quic_multiaddr(&addr)?; + let endpoint = quinn::Endpoint::server(self.server_config.clone(), socket_addr)?; + let local_addr = endpoint.local_addr()?; + Ok(QuicListener { + endpoint, + listen_addr: socketaddr_to_quic_multiaddr(local_addr), + }) + } + + /// Dial a remote QUIC peer and complete the TLS handshake. + /// + /// On success the returned [`QuicSession`] holds a `quinn::Connection` + /// whose peer certificate has already passed the tentacle verifier + /// checks. The remote secio public key is recovered from the peer cert's + /// identity extension and made available via + /// [`QuicSession::remote_pubkey`]. + pub async fn dial(&self, addr: Multiaddr) -> Result { + let (socket_addr, expected_peer_id) = parse_quic_multiaddr(&addr)?; + + let client_config = build_quinn_client_config( + self.local_key.clone(), + &self.cert_der, + &self.key_der, + expected_peer_id, + )?; + + // One-shot client-only endpoint per dial. Pooling/reuse is deferred + // to a future manager layer. + let bind_addr: SocketAddr = match socket_addr { + SocketAddr::V4(_) => "0.0.0.0:0".parse().unwrap(), + SocketAddr::V6(_) => "[::]:0".parse().unwrap(), + }; + let endpoint = quinn::Endpoint::client(bind_addr)?; + + let connecting = endpoint.connect_with(client_config, socket_addr, TENTACLE_QUIC_SNI)?; + let conn = connecting.await?; + + let remote_pubkey = peer_pubkey_from_connection(&conn)?; + + // The connection holds a clone of the endpoint's UDP driver, but the + // endpoint itself must stay alive to keep routing inbound datagrams. + // Attach a guard task that holds the endpoint until the connection + // closes. The full session machinery will own this directly once it + // is implemented. + spawn_endpoint_keepalive(endpoint, conn.clone()); + + Ok(QuicSession::new(conn, remote_pubkey)) + } + + /// Read-only access to the configured transport parameters. + pub fn config(&self) -> &QuicConfig { + &self.config + } +} + +// ─────────────────────────────────── QuicListener ────────────────────────────────── + +/// Server-side QUIC listener, wrapping a bound `quinn::Endpoint`. +/// +/// Each call to [`QuicListener::accept`] yields a fully-handshaken +/// [`QuicSession`] together with the multiaddr of the remote peer. +pub struct QuicListener { + endpoint: quinn::Endpoint, + listen_addr: Multiaddr, +} + +impl QuicListener { + /// The actual local listen address (resolved after bind, so e.g. + /// `/udp/0/quic-v1` becomes `/udp//quic-v1`). + pub fn listen_addr(&self) -> &Multiaddr { + &self.listen_addr + } + + /// Accept the next incoming connection, drive its TLS handshake to + /// completion, and return the resulting [`QuicSession`] paired with the + /// remote peer's multiaddr. + /// + /// Returns `Ok(None)` when the endpoint has been closed. + pub async fn accept(&self) -> Result, QuicErrorKind> { + let incoming = match self.endpoint.accept().await { + Some(i) => i, + None => return Ok(None), + }; + let remote_addr = incoming.remote_address(); + let conn = incoming.await?; + let remote_pubkey = peer_pubkey_from_connection(&conn)?; + Ok(Some(( + socketaddr_to_quic_multiaddr(remote_addr), + QuicSession::new(conn, remote_pubkey), + ))) + } + + /// Stop accepting new connections and close the underlying UDP socket. + pub fn close(&self, error_code: u32, reason: &[u8]) { + self.endpoint.close(error_code.into(), reason); + } + + /// Borrow the underlying `quinn::Endpoint` (for tests / integration). + pub fn endpoint(&self) -> &quinn::Endpoint { + &self.endpoint + } +} + +// ──────────────────────────────── address parsing ──────────────────────────────── + +/// Parse a tentacle QUIC multiaddr. +/// +/// Accepts: +/// - `/ip4//udp//quic-v1` +/// - `/ip6//udp//quic-v1` +/// - either form followed by an optional `/p2p/` suffix +/// +/// Rejects (with `QuicErrorKind::InvalidAddress`): +/// - DNS-form addresses (`/dns4/...`, `/dns6/...`) +/// - missing `/quic-v1` suffix +/// - non-UDP intermediate (e.g. `/tcp/...`) +/// - any other unexpected protocol stack +pub fn parse_quic_multiaddr( + addr: &Multiaddr, +) -> Result<(SocketAddr, Option), QuicErrorKind> { + let mut iter = addr.iter(); + + let ip = match iter.next() { + Some(Protocol::Ip4(ip)) => std::net::IpAddr::V4(ip), + Some(Protocol::Ip6(ip)) => std::net::IpAddr::V6(ip), + _ => { + return Err(QuicErrorKind::InvalidAddress(format!( + "expected /ip4/.../udp//quic-v1 or /ip6/.../udp//quic-v1, got {}", + addr + ))); + } + }; + + let port = match iter.next() { + Some(Protocol::Udp(p)) => p, + _ => { + return Err(QuicErrorKind::InvalidAddress(format!( + "QUIC multiaddr must use /udp/ after the IP, got {}", + addr + ))); + } + }; + + match iter.next() { + Some(Protocol::QuicV1) => {} + _ => { + return Err(QuicErrorKind::InvalidAddress(format!( + "QUIC multiaddr must end with /quic-v1 after /udp/, got {}", + addr + ))); + } + } + + // Optional /p2p/ tail. Anything else is a malformed address. + let mut peer_id = None; + for proto in iter { + match proto { + Protocol::P2P(raw) => { + if peer_id.is_some() { + return Err(QuicErrorKind::InvalidAddress(format!( + "QUIC multiaddr contains multiple /p2p/ components: {}", + addr + ))); + } + peer_id = Some(PeerId::from_bytes(raw.to_vec()).map_err(|e| { + QuicErrorKind::InvalidAddress(format!( + "invalid /p2p/ component in {}: {:?}", + addr, e + )) + })?); + } + other => { + return Err(QuicErrorKind::InvalidAddress(format!( + "unexpected protocol {:?} after /quic-v1 in {}", + other, addr + ))); + } + } + } + + Ok((SocketAddr::new(ip, port), peer_id)) +} + +/// Inverse of [`parse_quic_multiaddr`] (peer_id-less): build +/// `/ip{4,6}//udp//quic-v1` from a `SocketAddr`. +fn socketaddr_to_quic_multiaddr(addr: SocketAddr) -> Multiaddr { + let ip_proto = match addr.ip() { + std::net::IpAddr::V4(ip) => Protocol::Ip4(ip), + std::net::IpAddr::V6(ip) => Protocol::Ip6(ip), + }; + [ip_proto, Protocol::Udp(addr.port()), Protocol::QuicV1] + .into_iter() + .collect() +} + +// ────────────────────────────────── helpers ────────────────────────────────── + +/// Build the `quinn::ServerConfig` used by [`QuicEndpoint`]. +fn build_quinn_server_config( + local_key: K, + cert: &TentacleQuicCert, + config: &QuicConfig, +) -> Result { + let cert_der = CertificateDer::from(cert.cert_der.clone()); + let key_der: PrivatePkcs8KeyDer<'static> = PrivatePkcs8KeyDer::from(cert.key_der.clone()); + + let rustls_cfg = rustls::ServerConfig::builder() + .with_client_cert_verifier(Arc::new(TentacleQuicClientCertVerifier::new(local_key))) + .with_single_cert(vec![cert_der], key_der.into()) + .map_err(|e| QuicErrorKind::TlsConfig(e.to_string()))?; + + let quic_crypto = QuicServerConfig::try_from(rustls_cfg) + .map_err(|e| QuicErrorKind::TlsConfig(e.to_string()))?; + let mut server_cfg = quinn::ServerConfig::with_crypto(Arc::new(quic_crypto)); + server_cfg.transport_config(Arc::new(build_transport_config(config)?)); + Ok(server_cfg) +} + +/// Build a fresh `quinn::ClientConfig` for a single dial. +/// +/// `expected_peer_id` is the `/p2p/` from the dial target multiaddr, +/// if any. The resulting client config is single-use because the verifier +/// captures `expected_peer_id`. +fn build_quinn_client_config( + local_key: K, + cert_der: &[u8], + key_der: &[u8], + expected_peer_id: Option, +) -> Result { + let cert = CertificateDer::from(cert_der.to_vec()); + let key: PrivatePkcs8KeyDer<'static> = PrivatePkcs8KeyDer::from(key_der.to_vec()); + + let rustls_cfg = rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(TentacleQuicServerCertVerifier::new( + local_key, + expected_peer_id, + ))) + .with_client_auth_cert(vec![cert], key.into()) + .map_err(|e| QuicErrorKind::TlsConfig(e.to_string()))?; + + let quic_crypto = QuicClientConfig::try_from(rustls_cfg) + .map_err(|e| QuicErrorKind::TlsConfig(e.to_string()))?; + Ok(quinn::ClientConfig::new(Arc::new(quic_crypto))) +} + +/// Convert a [`QuicConfig`] into a `quinn::TransportConfig`. +fn build_transport_config(config: &QuicConfig) -> Result { + let mut tc = quinn::TransportConfig::default(); + + let idle: quinn::IdleTimeout = + quinn::VarInt::from_u64(config.max_idle_timeout.as_millis() as u64) + .map_err(|e| QuicErrorKind::TlsConfig(format!("max_idle_timeout out of range: {}", e)))? + .into(); + tc.max_idle_timeout(Some(idle)); + tc.keep_alive_interval(config.keep_alive_interval); + tc.max_concurrent_bidi_streams( + quinn::VarInt::from_u64(config.max_concurrent_bidi_streams).map_err(|e| { + QuicErrorKind::TlsConfig(format!("max_concurrent_bidi_streams out of range: {}", e)) + })?, + ); + + // Disable QUIC features not supported by tentacle v1: uni-streams and + // datagrams are deliberately turned off — only bidi streams are used. + tc.max_concurrent_uni_streams(0u32.into()); + tc.datagram_receive_buffer_size(None); + tc.datagram_send_buffer_size(0); + + Ok(tc) +} + +/// Recover the remote secio `PublicKey` from the leaf certificate that the +/// peer presented during the QUIC handshake. +/// +/// The verifier (`crate::quic::verifier`) has already validated the cert and +/// the binding signature by the time this is called, so the only remaining +/// failure modes here are "no cert at all" (impossible under mutual auth, but +/// defensive) and re-decoding the molecule payload. +fn peer_pubkey_from_connection(conn: &quinn::Connection) -> Result { + let any = conn.peer_identity().ok_or(QuicErrorKind::NoPeerCert)?; + let chain: Box>> = any + .downcast::>>() + .map_err(|_| QuicErrorKind::NoPeerCert)?; + let leaf = chain.first().ok_or(QuicErrorKind::NoPeerCert)?; + let identity = extract_identity(leaf.as_ref())?; + let pubkey_bytes = identity.secio_pubkey().raw_data().to_vec(); + Ok(PublicKey::from_raw_key(pubkey_bytes)) +} + +/// Drop guard that keeps a `quinn::Endpoint` alive for the lifetime of a +/// `quinn::Connection`. quinn's `Connection` does not retain a clone of the +/// `Endpoint`, so without this the inbound UDP driver would be torn down as +/// soon as `dial()` returns. +fn spawn_endpoint_keepalive(endpoint: quinn::Endpoint, conn: quinn::Connection) { + crate::runtime::spawn(async move { + let _ = conn.closed().await; + endpoint.close(0u32.into(), b"closed"); + let _ = tokio::time::timeout(Duration::from_millis(100), endpoint.wait_idle()).await; + }); +} + +// ──────────────────────────────────── tests ──────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use crate::secio::SecioKeyPair; + use std::str::FromStr; + + // ────────────── address parsing ────────────── + + #[test] + fn parse_ip4_quic_ok() { + let addr = Multiaddr::from_str("/ip4/127.0.0.1/udp/4433/quic-v1").unwrap(); + let (sock, peer) = parse_quic_multiaddr(&addr).unwrap(); + assert_eq!(sock, "127.0.0.1:4433".parse::().unwrap()); + assert!(peer.is_none()); + } + + #[test] + fn parse_ip6_quic_ok() { + let addr = Multiaddr::from_str("/ip6/::1/udp/4433/quic-v1").unwrap(); + let (sock, peer) = parse_quic_multiaddr(&addr).unwrap(); + assert_eq!(sock, "[::1]:4433".parse::().unwrap()); + assert!(peer.is_none()); + } + + #[test] + fn parse_ip4_quic_with_peer() { + let key = SecioKeyPair::secp256k1_generated(); + let pid = key.peer_id().to_base58(); + let addr = Multiaddr::from_str(&format!("/ip4/127.0.0.1/udp/4433/quic-v1/p2p/{}", pid)) + .unwrap(); + let (sock, peer) = parse_quic_multiaddr(&addr).unwrap(); + assert_eq!(sock, "127.0.0.1:4433".parse::().unwrap()); + assert_eq!(peer.unwrap(), key.peer_id()); + } + + #[test] + fn reject_dns4_quic() { + let addr = Multiaddr::from_str("/dns4/example.com/udp/4433/quic-v1").unwrap(); + assert!(matches!( + parse_quic_multiaddr(&addr), + Err(QuicErrorKind::InvalidAddress(_)) + )); + } + + #[test] + fn reject_dns6_quic() { + let addr = Multiaddr::from_str("/dns6/example.com/udp/4433/quic-v1").unwrap(); + assert!(matches!( + parse_quic_multiaddr(&addr), + Err(QuicErrorKind::InvalidAddress(_)) + )); + } + + #[test] + fn reject_tcp_quic() { + let addr = Multiaddr::from_str("/ip4/127.0.0.1/tcp/4433/quic-v1").unwrap(); + assert!(matches!( + parse_quic_multiaddr(&addr), + Err(QuicErrorKind::InvalidAddress(_)) + )); + } + + #[test] + fn reject_missing_quic_suffix() { + let addr = Multiaddr::from_str("/ip4/127.0.0.1/udp/4433").unwrap(); + assert!(matches!( + parse_quic_multiaddr(&addr), + Err(QuicErrorKind::InvalidAddress(_)) + )); + } + + #[test] + fn reject_plain_tcp() { + let addr = Multiaddr::from_str("/ip4/127.0.0.1/tcp/4433").unwrap(); + assert!(matches!( + parse_quic_multiaddr(&addr), + Err(QuicErrorKind::InvalidAddress(_)) + )); + } + + #[test] + fn reject_trailing_garbage() { + let key = SecioKeyPair::secp256k1_generated(); + let pid = key.peer_id().to_base58(); + let addr = Multiaddr::from_str(&format!( + "/ip4/127.0.0.1/udp/4433/quic-v1/p2p/{}/p2p/{}", + pid, pid + )) + .unwrap(); + assert!(matches!( + parse_quic_multiaddr(&addr), + Err(QuicErrorKind::InvalidAddress(_)) + )); + } + + // ────────────── round-trip ────────────── + + #[test] + fn socketaddr_round_trip() { + let original: SocketAddr = "127.0.0.1:4433".parse().unwrap(); + let ma = socketaddr_to_quic_multiaddr(original); + let (back, _) = parse_quic_multiaddr(&ma).unwrap(); + assert_eq!(back, original); + } + + // ────────────── endpoint construction ────────────── + + #[test] + fn endpoint_new_succeeds() { + let key = SecioKeyPair::secp256k1_generated(); + QuicEndpoint::new(key, QuicConfig::default()).expect("endpoint construction"); + } + + // ────────────── end-to-end smoke ────────────── + + /// One server + one client, both built from `QuicEndpoint`. The client + /// dials with the server's PeerId pinned, opens a bidi stream, and + /// expects the server to echo a short message back. Validates that the + /// listener / dial / handshake / pubkey extraction wiring works in concert. + #[tokio::test] + async fn end_to_end_dial_and_echo() { + let server_key = SecioKeyPair::secp256k1_generated(); + let server_pid = server_key.peer_id(); + let server_endpoint = + QuicEndpoint::new(server_key.clone(), QuicConfig::default()).unwrap(); + + let listener = server_endpoint + .listen(Multiaddr::from_str("/ip4/127.0.0.1/udp/0/quic-v1").unwrap()) + .expect("listen"); + let server_addr = listener.listen_addr().clone(); + + // Server task: accept, read up to 64 bytes from a bidi stream, echo back. + let server_task = tokio::spawn(async move { + let (_remote_addr, session) = listener + .accept() + .await + .expect("accept ok") + .expect("not closed"); + let conn = session.connection().clone(); + let (mut send, mut recv) = conn.accept_bi().await.expect("accept_bi"); + let mut buf = vec![0u8; 64]; + let n = recv.read(&mut buf).await.expect("read").expect("data"); + buf.truncate(n); + send.write_all(&buf).await.expect("write echo"); + send.finish().expect("finish"); + // Hold the conn until client closes. + conn.closed().await; + buf + }); + + // Client dials with the server's PeerId pinned, exchanges one message. + let client_key = SecioKeyPair::secp256k1_generated(); + let client_endpoint = QuicEndpoint::new(client_key, QuicConfig::default()).unwrap(); + + let dial_addr_with_peer: Multiaddr = + format!("{}/p2p/{}", server_addr, server_pid.to_base58()) + .parse() + .unwrap(); + let session = client_endpoint + .dial(dial_addr_with_peer) + .await + .expect("client dial ok"); + + assert_eq!(*session.remote_pubkey(), server_key.public_key()); + + let conn = session.connection().clone(); + let (mut send, mut recv) = conn.open_bi().await.expect("open_bi"); + send.write_all(b"hello").await.expect("write"); + send.finish().expect("finish"); + let mut echo = vec![0u8; 64]; + let n = recv.read(&mut echo).await.expect("read").expect("data"); + echo.truncate(n); + assert_eq!(echo, b"hello"); + conn.close(0u32.into(), b"done"); + + let server_observed = server_task.await.expect("server join"); + assert_eq!(server_observed, b"hello"); + } + + /// Dial fails when the dial target multiaddr pins a `/p2p/` that + /// does not match the server's actual identity. The server-cert verifier + /// rejects the handshake at TLS level, surfacing as a connection error. + #[tokio::test] + async fn dial_rejects_wrong_peer_id() { + let server_key = SecioKeyPair::secp256k1_generated(); + let server_endpoint = + QuicEndpoint::new(server_key.clone(), QuicConfig::default()).unwrap(); + + let listener = server_endpoint + .listen(Multiaddr::from_str("/ip4/127.0.0.1/udp/0/quic-v1").unwrap()) + .expect("listen"); + let server_addr = listener.listen_addr().clone(); + + // Drive the listener so the handshake can progress (the server-side + // failure is fine; we only need the listener task to keep polling). + let _server_task = tokio::spawn(async move { + let _ = listener.accept().await; + }); + + let client_key = SecioKeyPair::secp256k1_generated(); + let client_endpoint = QuicEndpoint::new(client_key, QuicConfig::default()).unwrap(); + + // Pin a peer_id that the server does NOT have. + let wrong_pid = SecioKeyPair::secp256k1_generated().peer_id(); + let dial_addr: Multiaddr = format!("{}/p2p/{}", server_addr, wrong_pid.to_base58()) + .parse() + .unwrap(); + + let result = client_endpoint.dial(dial_addr).await; + assert!( + result.is_err(), + "dial with wrong /p2p/ must fail, got Ok({:?})", + result.as_ref().ok().map(|s| s.remote_pubkey().clone()), + ); + } +} diff --git a/tentacle/src/quic/error.rs b/tentacle/src/quic/error.rs index eaa7ac57..b41ea28f 100644 --- a/tentacle/src/quic/error.rs +++ b/tentacle/src/quic/error.rs @@ -1,4 +1,4 @@ -/// Errors that can occur while building or validating QUIC identities. +/// Errors that can occur in the QUIC transport. #[derive(Debug, thiserror::Error)] pub enum QuicErrorKind { /// Failed to build or parse the X.509 certificate. @@ -20,4 +20,28 @@ pub enum QuicErrorKind { /// Multiple identity found #[error("Multiple identity found")] MultipleIdentityFound, + + /// Multiaddr is not a valid QUIC address (must be `/ip{4,6}/.../udp/.../quic-v1`). + #[error("Invalid QUIC multiaddr: {0}")] + InvalidAddress(String), + + /// Configured TLS settings are not acceptable for QUIC (e.g. wrong cipher suite). + #[error("TLS configuration error: {0}")] + TlsConfig(String), + + /// Underlying UDP socket bind / endpoint construction failed. + #[error("Endpoint bind failed: {0}")] + EndpointBind(#[from] std::io::Error), + + /// Failed to start dialing — invalid configuration / server name etc. + #[error("Connect failed: {0}")] + Connect(#[from] quinn::ConnectError), + + /// QUIC connection ended in failure (handshake timeout, transport error, …). + #[error("Connection error: {0}")] + Connection(#[from] quinn::ConnectionError), + + /// Peer did not present a TLS certificate during the QUIC handshake. + #[error("Peer did not present a certificate")] + NoPeerCert, } diff --git a/tentacle/src/quic/mod.rs b/tentacle/src/quic/mod.rs index 04e4c213..fdca1148 100644 --- a/tentacle/src/quic/mod.rs +++ b/tentacle/src/quic/mod.rs @@ -17,3 +17,8 @@ pub mod error; pub mod verifier; pub mod stream; + +pub mod endpoint; + +/// QUIC session wrapper (placeholder until PR 4 §步骤 E lands the full main loop). +pub mod session; diff --git a/tentacle/src/quic/session.rs b/tentacle/src/quic/session.rs new file mode 100644 index 00000000..fc17fc17 --- /dev/null +++ b/tentacle/src/quic/session.rs @@ -0,0 +1,60 @@ +//! Placeholder for the QUIC session main loop. +//! +//! At the current stage of the QUIC integration, only the +//! handshake-completed shell is implemented. The full session main loop +//! (event routing, protocol lifecycle, substream multiplexing) lands +//! together with the integration into `InnerService` in a follow-up change. +//! +//! For now this module exposes a thin wrapper around a `quinn::Connection` +//! plus the remote secio public key recovered from the peer certificate. It +//! is what `QuicEndpoint::dial()` returns and what `QuicListener::accept()` +//! yields. + +use crate::secio::PublicKey; + +/// A successfully-handshaken QUIC session. +/// +/// Holds a `quinn::Connection` whose TLS handshake has already completed and +/// passed the [`crate::quic::verifier`] checks, together with the remote secio +/// public key recovered from the peer certificate's tentacle identity +/// extension. +/// +/// The full session machinery (substream multiplexing, protocol open/close, +/// `SessionEvent` plumbing) is not implemented here yet. Callers can currently +/// inspect `remote_pubkey()` and access the underlying `quinn::Connection` via +/// [`QuicSession::into_inner`]. +#[derive(Debug)] +pub struct QuicSession { + conn: quinn::Connection, + remote_pubkey: PublicKey, +} + +impl QuicSession { + /// Wrap a freshly-handshaken `quinn::Connection` together with the remote + /// secio public key extracted from the peer certificate. + pub(crate) fn new(conn: quinn::Connection, remote_pubkey: PublicKey) -> Self { + Self { + conn, + remote_pubkey, + } + } + + /// Remote peer's secio public key, recovered from the tentacle identity + /// extension on the TLS leaf cert. + pub fn remote_pubkey(&self) -> &PublicKey { + &self.remote_pubkey + } + + /// Borrow the underlying `quinn::Connection`. Useful for inspection in + /// tests; the eventual session main loop will own it directly. + pub fn connection(&self) -> &quinn::Connection { + &self.conn + } + + /// Decompose the wrapper, returning the underlying `quinn::Connection` + /// and remote `PublicKey`. The session main loop will use this when the + /// full session machinery is implemented. + pub fn into_inner(self) -> (quinn::Connection, PublicKey) { + (self.conn, self.remote_pubkey) + } +} From 2cabb01be130342724001caa492cb3c2e30ad9ad Mon Sep 17 00:00:00 2001 From: officeyutong Date: Fri, 8 May 2026 13:35:25 +0800 Subject: [PATCH 3/3] Implement QuicEndpointManager --- tentacle/src/quic/endpoint.rs | 10 ++++----- tentacle/src/quic/manager.rs | 40 +++++++++++++++++++++++++++++++++++ tentacle/src/quic/mod.rs | 4 +++- tentacle/src/session.rs | 5 ++++- 4 files changed, 51 insertions(+), 8 deletions(-) create mode 100644 tentacle/src/quic/manager.rs diff --git a/tentacle/src/quic/endpoint.rs b/tentacle/src/quic/endpoint.rs index f4b7cb40..59e7ea01 100644 --- a/tentacle/src/quic/endpoint.rs +++ b/tentacle/src/quic/endpoint.rs @@ -410,8 +410,8 @@ mod tests { fn parse_ip4_quic_with_peer() { let key = SecioKeyPair::secp256k1_generated(); let pid = key.peer_id().to_base58(); - let addr = Multiaddr::from_str(&format!("/ip4/127.0.0.1/udp/4433/quic-v1/p2p/{}", pid)) - .unwrap(); + let addr = + Multiaddr::from_str(&format!("/ip4/127.0.0.1/udp/4433/quic-v1/p2p/{}", pid)).unwrap(); let (sock, peer) = parse_quic_multiaddr(&addr).unwrap(); assert_eq!(sock, "127.0.0.1:4433".parse::().unwrap()); assert_eq!(peer.unwrap(), key.peer_id()); @@ -505,8 +505,7 @@ mod tests { async fn end_to_end_dial_and_echo() { let server_key = SecioKeyPair::secp256k1_generated(); let server_pid = server_key.peer_id(); - let server_endpoint = - QuicEndpoint::new(server_key.clone(), QuicConfig::default()).unwrap(); + let server_endpoint = QuicEndpoint::new(server_key.clone(), QuicConfig::default()).unwrap(); let listener = server_endpoint .listen(Multiaddr::from_str("/ip4/127.0.0.1/udp/0/quic-v1").unwrap()) @@ -567,8 +566,7 @@ mod tests { #[tokio::test] async fn dial_rejects_wrong_peer_id() { let server_key = SecioKeyPair::secp256k1_generated(); - let server_endpoint = - QuicEndpoint::new(server_key.clone(), QuicConfig::default()).unwrap(); + let server_endpoint = QuicEndpoint::new(server_key.clone(), QuicConfig::default()).unwrap(); let listener = server_endpoint .listen(Multiaddr::from_str("/ip4/127.0.0.1/udp/0/quic-v1").unwrap()) diff --git a/tentacle/src/quic/manager.rs b/tentacle/src/quic/manager.rs new file mode 100644 index 00000000..bb1cb746 --- /dev/null +++ b/tentacle/src/quic/manager.rs @@ -0,0 +1,40 @@ +use std::{collections::HashMap, net::SocketAddr}; + +use futures::io; + +pub struct QuicEndpointManager { + data: HashMap, + server_config: quinn::ServerConfig, +} + +impl QuicEndpointManager { + pub fn new(server_config: quinn::ServerConfig) -> Self { + Self { + data: Default::default(), + server_config, + } + } + pub fn get_or_create_endpoint( + &mut self, + listen_addr: SocketAddr, + ) -> io::Result { + match self.data.entry(listen_addr) { + std::collections::hash_map::Entry::Occupied(occupied_entry) => { + Ok(occupied_entry.get().clone()) + } + std::collections::hash_map::Entry::Vacant(vacant_entry) => { + let endpoint = quinn::Endpoint::server(self.server_config.clone(), listen_addr)?; + vacant_entry.insert(endpoint.clone()); + Ok(endpoint) + } + } + } + pub async fn close_all(&mut self) { + for item in self.data.values() { + item.close(0u32.into(), b"shutdown"); + } + for item in self.data.values() { + item.wait_idle().await; + } + } +} diff --git a/tentacle/src/quic/mod.rs b/tentacle/src/quic/mod.rs index fdca1148..ec089aaf 100644 --- a/tentacle/src/quic/mod.rs +++ b/tentacle/src/quic/mod.rs @@ -20,5 +20,7 @@ pub mod stream; pub mod endpoint; -/// QUIC session wrapper (placeholder until PR 4 §步骤 E lands the full main loop). +/// QUIC session wrapper pub mod session; + +pub mod manager; diff --git a/tentacle/src/session.rs b/tentacle/src/session.rs index 63697f72..3fd0900a 100644 --- a/tentacle/src/session.rs +++ b/tentacle/src/session.rs @@ -16,7 +16,10 @@ use yamux::{Control, Session as YamuxSession, StreamHandle}; use crate::{ ProtocolId, SessionId, StreamId, SubstreamReadPart, buffer::{Buffer, PriorityBuffer, SendResult}, - channel::{QuickSinkExt, mpsc::{self as priority_mpsc, Priority}}, + channel::{ + QuickSinkExt, + mpsc::{self as priority_mpsc, Priority}, + }, context::SessionContext, error::{HandshakeErrorKind, ProtocolHandleErrorKind, TransportErrorKind}, multiaddr::Multiaddr,