From 6006c2a55aceea3ea1be4472644418e8cdfa4811 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Tue, 5 May 2026 13:46:48 +0800 Subject: [PATCH 1/2] mctp-estack: Add WakeOnDrop Helper to implement async locks Signed-off-by: Matt Johnston --- mctp-estack/src/router.rs | 1 + mctp-estack/src/util.rs | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/mctp-estack/src/router.rs b/mctp-estack/src/router.rs index 6744632..4158a6d 100644 --- a/mctp-estack/src/router.rs +++ b/mctp-estack/src/router.rs @@ -7,6 +7,7 @@ #[allow(unused)] use crate::fmt::{debug, error, info, trace, warn}; +use crate::util::WakeOnDrop; use core::cell::RefCell; use core::debug_assert; diff --git a/mctp-estack/src/util.rs b/mctp-estack/src/util.rs index 415f5d8..fe0325d 100644 --- a/mctp-estack/src/util.rs +++ b/mctp-estack/src/util.rs @@ -1,3 +1,6 @@ +use core::ops::{Deref, DerefMut}; +use core::task::Waker; + /// Takes a `usize` from a build-time environment variable. /// /// If unset, the default is used. Can be used in a const context. @@ -114,6 +117,42 @@ impl VectorReader { #[derive(Debug)] pub struct VectorReaderError; +// TODO: Use DropGuard instead once it's stable. +// That can wake _after_ T::drop() +pub struct WakeOnDrop { + waker: Waker, + value: T, +} + +impl WakeOnDrop { + pub fn new(value: T, waker: &Waker) -> Self { + Self { + value, + waker: waker.clone(), + } + } +} + +impl Drop for WakeOnDrop { + fn drop(&mut self) { + self.waker.wake_by_ref(); + } +} + +impl Deref for WakeOnDrop { + type Target = T; + + fn deref(&self) -> &T { + &self.value + } +} + +impl DerefMut for WakeOnDrop { + fn deref_mut(&mut self) -> &mut T { + &mut self.value + } +} + #[cfg(test)] mod tests { #[test] From 0b1081f24dcafcfb8ddb23d8ca9f453aaf024dd9 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Tue, 5 May 2026 13:47:57 +0800 Subject: [PATCH 2/2] mctp-estack: Don't hold a lock to send messages send_message() was holding a blocking lock across the whole send sequence (until the queue was full). This is currently a CriticalSectionMutex so would block interrupts. Instead we can poll until a sender is available. forward_packet() is now made async. Any waits will be short, until a concurrent send_message() completes or runs out of queue to copy into. Signed-off-by: Matt Johnston --- mctp-estack/src/router.rs | 116 +++++++++++++++++++------------------- mctp-estack/src/util.rs | 2 + 2 files changed, 61 insertions(+), 57 deletions(-) diff --git a/mctp-estack/src/router.rs b/mctp-estack/src/router.rs index 4158a6d..3ded9d5 100644 --- a/mctp-estack/src/router.rs +++ b/mctp-estack/src/router.rs @@ -117,9 +117,8 @@ impl core::ops::Deref for PktBuf { pub struct PortTop { /// Forwarded packet queue. channel: FixedChannel, - // Callers should hold send_mutex when using channel.sender(). - // send_message() will wait on send_mutex being available using sender_waker. - send_mutex: BlockingMutex<()>, + /// Only a single Sender can be created from a FixedChannel at a time. + /// sender_waker wakes futures waiting for a Sender. sender_waker: AtomicWaker, } @@ -127,7 +126,6 @@ impl PortTop { pub fn new() -> Self { Self { channel: FixedChannel::new(), - send_mutex: BlockingMutex::new(()), sender_waker: AtomicWaker::new(), } } @@ -145,32 +143,34 @@ impl PortTop { /// Do not call with locks held. /// May block waiting for a port queue to flush. /// Packet must be a valid MCTP packet, may panic otherwise. - fn forward_packet(&self, pkt: &[u8]) -> Result<()> { + async fn forward_packet(&self, pkt: &[u8]) -> Result<()> { debug_assert!(MctpHeader::decode(pkt).is_ok()); + let mut sender = poll_fn(|cx| match self.channel.sender() { + Some(s) => Poll::Ready(WakeOnDrop::new(s, cx.waker())), + None => { + self.sender_waker.register(cx.waker()); + Poll::Pending + } + }) + .await; + + // Get a slot to send // With forwarded packets we don't want to block if // the queue is full (we drop packets instead). - let r = self.send_mutex.lock(|_| { - // OK unwrap, we have the send_mutex - let mut sender = self.channel.sender().unwrap(); - - // Get a slot to send - let slot = sender.try_send().ok_or_else(|| { - debug!("Dropped forward packet"); - Error::TxFailure - })?; - - // Fill the buffer - if slot.set(pkt).is_ok() { - sender.send_done(); - Ok(()) - } else { - debug!("Oversized forward packet"); - Err(Error::TxFailure) - } - }); - self.sender_waker.wake(); - r + let slot = sender.try_send().ok_or_else(|| { + debug!("Dropped forward packet"); + Error::TxFailure + })?; + + // Fill the buffer + if slot.set(pkt).is_ok() { + sender.send_done(); + Ok(()) + } else { + debug!("Oversized forward packet"); + Err(Error::TxFailure) + } } /// Fragments and enqueues a message. @@ -187,39 +187,41 @@ impl PortTop { // It shouldn't hold the send_mutex() across an await, since that would block // forward_packet(). poll_fn(|cx| { - self.send_mutex.lock(|_| { - // OK to unwrap, protected by send_mutex.lock() - let mut sender = self.channel.sender().unwrap(); - - // Send as much as we can in a loop without blocking. - // If it blocks the next poll_fn iteration will continue - // where it left off. - loop { - let Poll::Ready(qpkt) = sender.poll_send(cx) else { - self.sender_waker.register(cx.waker()); - break Poll::Pending; - }; - - qpkt.len = 0; - match fragmenter.fragment_vectored(pkt, &mut qpkt.data) { - SendOutput::Packet(p) => { - qpkt.len = p.len(); - sender.send_done(); - if fragmenter.is_done() { - // Break here rather than using SendOutput::Complete, - // since we don't want to call channel.sender() an extra time. - break Poll::Ready(Ok(fragmenter.tag())); - } - } - SendOutput::Error { err, .. } => { - debug!("Error packetising"); - debug_assert!(false, "fragment () shouldn't fail"); - break Poll::Ready(Err(err)); + let mut sender = match self.channel.sender() { + Some(s) => WakeOnDrop::new(s, cx.waker()), + None => { + self.sender_waker.register(cx.waker()); + return Poll::Pending; + } + }; + + // Send as much as we can in a loop without blocking. + // If it blocks the next poll_fn iteration will continue + // where it left off. + loop { + let Poll::Ready(qpkt) = sender.poll_send(cx) else { + break Poll::Pending; + }; + + qpkt.len = 0; + match fragmenter.fragment_vectored(pkt, &mut qpkt.data) { + SendOutput::Packet(p) => { + qpkt.len = p.len(); + sender.send_done(); + if fragmenter.is_done() { + // Break here rather than using SendOutput::Complete, + // since we don't want to call channel.sender() an extra time. + break Poll::Ready(Ok(fragmenter.tag())); } - SendOutput::Complete { .. } => unreachable!(), } + SendOutput::Error { err, .. } => { + debug!("Error packetising"); + debug_assert!(false, "fragment () shouldn't fail"); + break Poll::Ready(Err(err)); + } + SendOutput::Complete { .. } => unreachable!(), } - }) + } }) .await } @@ -578,7 +580,7 @@ impl<'r> Router<'r> { return ret_src; }; - let _ = top.forward_packet(pkt); + let _ = top.forward_packet(pkt).await; ret_src } diff --git a/mctp-estack/src/util.rs b/mctp-estack/src/util.rs index fe0325d..e3c7292 100644 --- a/mctp-estack/src/util.rs +++ b/mctp-estack/src/util.rs @@ -125,6 +125,8 @@ pub struct WakeOnDrop { } impl WakeOnDrop { + // Currently only used by async feature + #[cfg_attr(not(feature = "async"), expect(dead_code))] pub fn new(value: T, waker: &Waker) -> Self { Self { value,