From 9fe71add4e01c4820dc0de04aec117e7efe0bee1 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 7 May 2026 12:45:27 -0300 Subject: [PATCH 1/4] feat: add monitor/demonitor for unidirectional actor death observation --- concurrency/src/lib.rs | 2 + concurrency/src/monitor.rs | 70 ++++++++ concurrency/src/tasks/actor.rs | 291 ++++++++++++++++++++++++++++++- concurrency/src/threads/actor.rs | 232 +++++++++++++++++++++++- examples/exit_reason/src/main.rs | 67 ++++++- 5 files changed, 659 insertions(+), 3 deletions(-) create mode 100644 concurrency/src/monitor.rs diff --git a/concurrency/src/lib.rs b/concurrency/src/lib.rs index e34000a..5ed6a7d 100644 --- a/concurrency/src/lib.rs +++ b/concurrency/src/lib.rs @@ -74,6 +74,7 @@ pub mod child_handle; pub mod error; pub mod message; +pub mod monitor; pub mod registry; pub mod response; pub mod tasks; @@ -81,5 +82,6 @@ pub mod threads; pub use child_handle::{ActorId, ChildHandle}; pub use error::{ActorError, ExitReason}; +pub use monitor::{Down, MonitorRef}; pub use response::Response; pub use spawned_macros::{actor, protocol}; diff --git a/concurrency/src/monitor.rs b/concurrency/src/monitor.rs new file mode 100644 index 0000000..5edf9c4 --- /dev/null +++ b/concurrency/src/monitor.rs @@ -0,0 +1,70 @@ +use crate::error::ExitReason; +use crate::message::Message; +use std::sync::atomic::{AtomicU64, Ordering}; + +// --------------------------------------------------------------------------- +// MonitorRef +// --------------------------------------------------------------------------- + +static NEXT_MONITOR_ID: AtomicU64 = AtomicU64::new(1); + +/// Opaque identifier for an active monitor relationship. Returned by +/// [`Context::monitor`] and used to cancel via [`Context::demonitor`]. +/// +/// Multiple independent monitors are allowed on the same target — each call +/// to `monitor` returns a distinct `MonitorRef`. +/// +/// [`Context::monitor`]: crate::tasks::actor::Context::monitor +/// [`Context::demonitor`]: crate::tasks::actor::Context::demonitor +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct MonitorRef(u64); + +impl MonitorRef { + pub(crate) fn next() -> Self { + Self(NEXT_MONITOR_ID.fetch_add(1, Ordering::Relaxed)) + } +} + +impl std::fmt::Display for MonitorRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "MonitorRef({})", self.0) + } +} + +// --------------------------------------------------------------------------- +// Down +// --------------------------------------------------------------------------- + +/// Message delivered to a monitoring actor when its target stops. +/// +/// To monitor another actor, call `ctx.monitor(&child_handle)` and implement +/// `Handler` on the monitoring actor. +#[derive(Debug, Clone)] +pub struct Down { + /// The monitor that triggered this notification. + pub monitor_ref: MonitorRef, + /// Why the monitored actor stopped. 
+ pub reason: ExitReason, +} + +impl Message for Down { + type Result = (); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn monitor_ref_is_unique() { + let a = MonitorRef::next(); + let b = MonitorRef::next(); + assert_ne!(a, b); + } + + #[test] + fn monitor_ref_display() { + let r = MonitorRef::next(); + assert!(format!("{r}").starts_with("MonitorRef(")); + } +} diff --git a/concurrency/src/tasks/actor.rs b/concurrency/src/tasks/actor.rs index 1d241a8..6b34847 100644 --- a/concurrency/src/tasks/actor.rs +++ b/concurrency/src/tasks/actor.rs @@ -1,6 +1,7 @@ use crate::child_handle::{ActorId, ChildHandle}; use crate::error::{panic_message, ActorError, ExitReason}; use crate::message::Message; +use crate::monitor::{Down, MonitorRef}; use core::pin::pin; use futures::future::{self, FutureExt as _}; use spawned_rt::{ @@ -8,9 +9,23 @@ use spawned_rt::{ threads, }; use std::{ - fmt::Debug, future::Future, panic::AssertUnwindSafe, pin::Pin, sync::Arc, time::Duration, + collections::HashMap, + fmt::Debug, + future::Future, + panic::AssertUnwindSafe, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, + time::Duration, }; +/// Per-actor table of active monitors. Each entry maps a `MonitorRef` to a +/// flag the watcher checks before delivering `Down`. Shared across `Context` +/// clones via `Arc`. +type MonitorTable = Arc>>>; + pub use crate::response::DEFAULT_REQUEST_TIMEOUT; // --------------------------------------------------------------------------- @@ -111,6 +126,7 @@ pub struct Context { sender: mpsc::Sender + Send>>, cancellation_token: CancellationToken, completion_rx: watch::Receiver>, + monitors: MonitorTable, } impl Clone for Context { @@ -120,6 +136,7 @@ impl Clone for Context { sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion_rx: self.completion_rx.clone(), + monitors: self.monitors.clone(), } } } @@ -139,6 +156,7 @@ impl Context { sender: actor_ref.sender.clone(), cancellation_token: actor_ref.cancellation_token.clone(), completion_rx: actor_ref.completion_rx.clone(), + monitors: actor_ref.monitors.clone(), } } @@ -224,6 +242,64 @@ impl Context { sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion_rx: self.completion_rx.clone(), + monitors: self.monitors.clone(), + } + } + + /// Set up a unidirectional monitor on another actor. + /// + /// Returns a [`MonitorRef`] that can be used to cancel the monitor via + /// [`Context::demonitor`]. When the monitored actor stops, a [`Down`] + /// message is delivered to this actor's mailbox via `Handler`. + /// + /// If the target is already dead, a `Down` message is delivered immediately. + /// + /// Multiple independent monitors are allowed on the same target — each + /// call returns a distinct `MonitorRef`. + /// + /// Monitors are unidirectional: the monitored actor is unaware of the + /// monitor and unaffected by it. + pub fn monitor(&self, target: &ChildHandle) -> MonitorRef + where + A: Handler, + { + let monitor_ref = MonitorRef::next(); + let active = Arc::new(AtomicBool::new(true)); + + self.monitors + .lock() + .unwrap_or_else(|p| p.into_inner()) + .insert(monitor_ref, active.clone()); + + let target = target.clone(); + let actor_ref = self.actor_ref(); + + rt::spawn(async move { + let reason = target.wait_exit_async().await; + if active.load(Ordering::Acquire) { + let _ = actor_ref.send(Down { + monitor_ref, + reason, + }); + } + }); + + monitor_ref + } + + /// Cancel a previously-set monitor. 
+ /// + /// If the target hasn't yet died, no `Down` message will be delivered. + /// If a `Down` message has already been delivered (or queued), this is + /// a best-effort cancellation — the message may still arrive. + pub fn demonitor(&self, monitor_ref: MonitorRef) { + if let Some(active) = self + .monitors + .lock() + .unwrap_or_else(|p| p.into_inner()) + .remove(&monitor_ref) + { + active.store(false, Ordering::Release); } } @@ -293,6 +369,7 @@ pub struct ActorRef { sender: mpsc::Sender + Send>>, cancellation_token: CancellationToken, completion_rx: watch::Receiver>, + monitors: MonitorTable, } impl Debug for ActorRef { @@ -308,6 +385,7 @@ impl Clone for ActorRef { sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion_rx: self.completion_rx.clone(), + monitors: self.monitors.clone(), } } } @@ -449,12 +527,14 @@ impl ActorRef { let (tx, rx) = mpsc::channel:: + Send>>(); let cancellation_token = CancellationToken::new(); let (completion_tx, completion_rx) = watch::channel(None); + let monitors: MonitorTable = Arc::new(Mutex::new(HashMap::new())); let actor_ref = ActorRef { id: ActorId::next(), sender: tx.clone(), cancellation_token: cancellation_token.clone(), completion_rx, + monitors: monitors.clone(), }; let ctx = Context { @@ -462,6 +542,7 @@ impl ActorRef { sender: tx, cancellation_token: cancellation_token.clone(), completion_rx: actor_ref.completion_rx.clone(), + monitors, }; let inner_future = async move { @@ -1238,4 +1319,212 @@ mod tests { assert!(actor.exit_reason().is_some()); }); } + + // --- Monitor tests --- + + struct GetDowns; + impl Message for GetDowns { + type Result = Vec; + } + + /// Actor that exposes `monitor`/`demonitor` via messages, so tests can + /// drive it from outside. Records all received Down messages. 
+ struct Watcher { + downs: Arc>>, + last_ref: Arc>>, + } + + struct StartMonitor(crate::ChildHandle); + impl Message for StartMonitor { + type Result = crate::monitor::MonitorRef; + } + struct CallDemonitor(crate::monitor::MonitorRef); + impl Message for CallDemonitor { + type Result = (); + } + + impl Actor for Watcher {} + + impl Handler for Watcher { + async fn handle( + &mut self, + msg: StartMonitor, + ctx: &Context, + ) -> crate::monitor::MonitorRef { + let r = ctx.monitor(&msg.0); + *self.last_ref.lock().unwrap() = Some(r); + r + } + } + + impl Handler for Watcher { + async fn handle(&mut self, msg: CallDemonitor, ctx: &Context) { + ctx.demonitor(msg.0); + } + } + + impl Handler for Watcher { + async fn handle(&mut self, msg: crate::monitor::Down, _ctx: &Context) { + self.downs.lock().unwrap().push(msg); + } + } + + impl Handler for Watcher { + async fn handle( + &mut self, + _msg: GetDowns, + _ctx: &Context, + ) -> Vec { + self.downs.lock().unwrap().clone() + } + } + + #[test] + pub fn monitor_running_actor_delivers_down_on_exit() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let target = Counter { count: 0 }.start(); + let target_handle = target.child_handle(); + + let watcher = Watcher { + downs: Arc::new(Mutex::new(Vec::new())), + last_ref: Arc::new(Mutex::new(None)), + } + .start(); + + let monitor_ref = watcher.request(StartMonitor(target_handle)).await.unwrap(); + + // Stop the target — Down should be delivered + target.request(StopCounter).await.unwrap(); + target.join().await; + + // Give the watcher task time to deliver the message + rt::sleep(Duration::from_millis(50)).await; + + let downs = watcher.request(GetDowns).await.unwrap(); + assert_eq!(downs.len(), 1); + assert_eq!(downs[0].monitor_ref, monitor_ref); + assert!(matches!(downs[0].reason, ExitReason::Normal)); + }); + } + + #[test] + pub fn monitor_already_dead_actor_delivers_down_immediately() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let target = Counter { count: 0 }.start(); + target.request(StopCounter).await.unwrap(); + target.join().await; + let target_handle = target.child_handle(); + + let watcher = Watcher { + downs: Arc::new(Mutex::new(Vec::new())), + last_ref: Arc::new(Mutex::new(None)), + } + .start(); + + let _ = watcher.request(StartMonitor(target_handle)).await.unwrap(); + + // Wait for the watcher task to deliver Down + rt::sleep(Duration::from_millis(50)).await; + + let downs = watcher.request(GetDowns).await.unwrap(); + assert_eq!(downs.len(), 1); + }); + } + + #[test] + pub fn demonitor_before_target_dies_suppresses_down() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let target = Counter { count: 0 }.start(); + let target_handle = target.child_handle(); + + let watcher = Watcher { + downs: Arc::new(Mutex::new(Vec::new())), + last_ref: Arc::new(Mutex::new(None)), + } + .start(); + + let monitor_ref = watcher.request(StartMonitor(target_handle)).await.unwrap(); + watcher.request(CallDemonitor(monitor_ref)).await.unwrap(); + + // Now stop the target + target.request(StopCounter).await.unwrap(); + target.join().await; + rt::sleep(Duration::from_millis(50)).await; + + let downs = watcher.request(GetDowns).await.unwrap(); + assert!(downs.is_empty(), "expected no Down, got {:?}", downs.len()); + }); + } + + #[test] + pub fn multiple_monitors_each_get_own_ref_and_down() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let target = Counter { count: 0 }.start(); + 
let target_handle = target.child_handle(); + + let watcher = Watcher { + downs: Arc::new(Mutex::new(Vec::new())), + last_ref: Arc::new(Mutex::new(None)), + } + .start(); + + let r1 = watcher + .request(StartMonitor(target_handle.clone())) + .await + .unwrap(); + let r2 = watcher.request(StartMonitor(target_handle)).await.unwrap(); + assert_ne!(r1, r2); + + target.request(StopCounter).await.unwrap(); + target.join().await; + rt::sleep(Duration::from_millis(50)).await; + + let downs = watcher.request(GetDowns).await.unwrap(); + assert_eq!(downs.len(), 2); + let refs: Vec<_> = downs.iter().map(|d| d.monitor_ref).collect(); + assert!(refs.contains(&r1)); + assert!(refs.contains(&r2)); + }); + } + + #[test] + pub fn monitor_observes_panic_reason() { + struct PanicMsg; + impl Message for PanicMsg { + type Result = (); + } + struct PanicMe; + impl Actor for PanicMe {} + impl Handler for PanicMe { + async fn handle(&mut self, _msg: PanicMsg, _ctx: &Context) { + panic!("intentional panic"); + } + } + + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let target = PanicMe.start(); + let target_handle = target.child_handle(); + + let watcher = Watcher { + downs: Arc::new(Mutex::new(Vec::new())), + last_ref: Arc::new(Mutex::new(None)), + } + .start(); + + let _ = watcher.request(StartMonitor(target_handle)).await.unwrap(); + let _ = target.send(PanicMsg); + + // Wait for target to panic and watcher to deliver + rt::sleep(Duration::from_millis(100)).await; + + let downs = watcher.request(GetDowns).await.unwrap(); + assert_eq!(downs.len(), 1); + assert!(matches!(downs[0].reason, ExitReason::Panic(_))); + }); + } } diff --git a/concurrency/src/threads/actor.rs b/concurrency/src/threads/actor.rs index 61fb57d..6e4ba3e 100644 --- a/concurrency/src/threads/actor.rs +++ b/concurrency/src/threads/actor.rs @@ -2,15 +2,25 @@ use spawned_rt::threads::{ self as rt, mpsc, oneshot, oneshot::RecvTimeoutError, CancellationToken, }; use std::{ + collections::HashMap, fmt::Debug, panic::{catch_unwind, AssertUnwindSafe}, - sync::{Arc, Condvar, Mutex}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Condvar, Mutex, + }, time::Duration, }; use crate::child_handle::{ActorId, ChildHandle}; use crate::error::{panic_message, ActorError, ExitReason}; use crate::message::Message; +use crate::monitor::{Down, MonitorRef}; + +/// Per-actor table of active monitors. Each entry maps a `MonitorRef` to a +/// flag the watcher checks before delivering `Down`. Shared across `Context` +/// clones via `Arc`. +type MonitorTable = Arc>>>; pub use crate::response::DEFAULT_REQUEST_TIMEOUT; @@ -79,6 +89,7 @@ pub struct Context { sender: mpsc::Sender + Send>>, cancellation_token: CancellationToken, completion: Arc<(Mutex>, Condvar)>, + monitors: MonitorTable, } impl Clone for Context { @@ -88,6 +99,7 @@ impl Clone for Context { sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion: self.completion.clone(), + monitors: self.monitors.clone(), } } } @@ -107,6 +119,7 @@ impl Context { sender: actor_ref.sender.clone(), cancellation_token: actor_ref.cancellation_token.clone(), completion: actor_ref.completion.clone(), + monitors: actor_ref.monitors.clone(), } } @@ -191,6 +204,64 @@ impl Context { sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion: self.completion.clone(), + monitors: self.monitors.clone(), + } + } + + /// Set up a unidirectional monitor on another actor. 
+ /// + /// Returns a [`MonitorRef`] that can be used to cancel the monitor via + /// [`Context::demonitor`]. When the monitored actor stops, a [`Down`] + /// message is delivered to this actor's mailbox via `Handler`. + /// + /// If the target is already dead, a `Down` message is delivered immediately. + /// + /// Multiple independent monitors are allowed on the same target — each + /// call returns a distinct `MonitorRef`. + /// + /// Monitors are unidirectional: the monitored actor is unaware of the + /// monitor and unaffected by it. + pub fn monitor(&self, target: &ChildHandle) -> MonitorRef + where + A: Handler, + { + let monitor_ref = MonitorRef::next(); + let active = Arc::new(AtomicBool::new(true)); + + self.monitors + .lock() + .unwrap_or_else(|p| p.into_inner()) + .insert(monitor_ref, active.clone()); + + let target = target.clone(); + let actor_ref = self.actor_ref(); + + rt::spawn(move || { + let reason = target.wait_exit_blocking(); + if active.load(Ordering::Acquire) { + let _ = actor_ref.send(Down { + monitor_ref, + reason, + }); + } + }); + + monitor_ref + } + + /// Cancel a previously-set monitor. + /// + /// If the target hasn't yet died, no `Down` message will be delivered. + /// If a `Down` message has already been delivered (or queued), this is + /// a best-effort cancellation — the message may still arrive. + pub fn demonitor(&self, monitor_ref: MonitorRef) { + if let Some(active) = self + .monitors + .lock() + .unwrap_or_else(|p| p.into_inner()) + .remove(&monitor_ref) + { + active.store(false, Ordering::Release); } } @@ -275,6 +346,7 @@ pub struct ActorRef { sender: mpsc::Sender + Send>>, cancellation_token: CancellationToken, completion: Arc<(Mutex>, Condvar)>, + monitors: MonitorTable, } impl Debug for ActorRef { @@ -290,6 +362,7 @@ impl Clone for ActorRef { sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion: self.completion.clone(), + monitors: self.monitors.clone(), } } } @@ -432,12 +505,14 @@ impl ActorRef { let cancellation_token = CancellationToken::new(); let completion = Arc::new((Mutex::new(None), Condvar::new())); let id = ActorId::next(); + let monitors: MonitorTable = Arc::new(Mutex::new(HashMap::new())); let actor_ref = ActorRef { id, sender: tx.clone(), cancellation_token: cancellation_token.clone(), completion: completion.clone(), + monitors: monitors.clone(), }; let ctx = Context { @@ -445,6 +520,7 @@ impl ActorRef { sender: tx, cancellation_token: cancellation_token.clone(), completion: actor_ref.completion.clone(), + monitors, }; let _thread_handle = rt::spawn(move || { @@ -834,4 +910,158 @@ mod tests { actor.join(); assert!(actor.exit_reason().is_some()); } + + // --- Monitor tests --- + + struct GetDowns; + impl Message for GetDowns { + type Result = Vec; + } + + struct Watcher { + downs: Arc>>, + } + + struct StartMonitor(crate::ChildHandle); + impl Message for StartMonitor { + type Result = crate::monitor::MonitorRef; + } + struct CallDemonitor(crate::monitor::MonitorRef); + impl Message for CallDemonitor { + type Result = (); + } + + impl Actor for Watcher {} + + impl Handler for Watcher { + fn handle(&mut self, msg: StartMonitor, ctx: &Context) -> crate::monitor::MonitorRef { + ctx.monitor(&msg.0) + } + } + + impl Handler for Watcher { + fn handle(&mut self, msg: CallDemonitor, ctx: &Context) { + ctx.demonitor(msg.0); + } + } + + impl Handler for Watcher { + fn handle(&mut self, msg: crate::monitor::Down, _ctx: &Context) { + self.downs.lock().unwrap().push(msg); + } + } + + impl Handler for Watcher 
{ + fn handle(&mut self, _msg: GetDowns, _ctx: &Context) -> Vec { + self.downs.lock().unwrap().clone() + } + } + + fn make_watcher() -> ActorRef { + Watcher { + downs: Arc::new(Mutex::new(Vec::new())), + } + .start() + } + + #[test] + fn monitor_running_actor_delivers_down_on_exit() { + let target = Counter { count: 0 }.start(); + let target_handle = target.child_handle(); + let watcher = make_watcher(); + + let monitor_ref = watcher.request(StartMonitor(target_handle)).unwrap(); + + target.request(StopCounter).unwrap(); + target.join(); + rt::sleep(Duration::from_millis(150)); + + let downs = watcher.request(GetDowns).unwrap(); + assert_eq!(downs.len(), 1); + assert_eq!(downs[0].monitor_ref, monitor_ref); + assert!(matches!(downs[0].reason, ExitReason::Normal)); + } + + #[test] + fn monitor_already_dead_actor_delivers_down_immediately() { + let target = Counter { count: 0 }.start(); + target.request(StopCounter).unwrap(); + target.join(); + let target_handle = target.child_handle(); + + let watcher = make_watcher(); + let _ = watcher.request(StartMonitor(target_handle)).unwrap(); + rt::sleep(Duration::from_millis(150)); + + let downs = watcher.request(GetDowns).unwrap(); + assert_eq!(downs.len(), 1); + } + + #[test] + fn demonitor_before_target_dies_suppresses_down() { + let target = Counter { count: 0 }.start(); + let target_handle = target.child_handle(); + let watcher = make_watcher(); + + let monitor_ref = watcher.request(StartMonitor(target_handle)).unwrap(); + watcher.request(CallDemonitor(monitor_ref)).unwrap(); + + target.request(StopCounter).unwrap(); + target.join(); + rt::sleep(Duration::from_millis(150)); + + let downs = watcher.request(GetDowns).unwrap(); + assert!(downs.is_empty()); + } + + #[test] + fn multiple_monitors_each_get_own_ref_and_down() { + let target = Counter { count: 0 }.start(); + let target_handle = target.child_handle(); + let watcher = make_watcher(); + + let r1 = watcher + .request(StartMonitor(target_handle.clone())) + .unwrap(); + let r2 = watcher.request(StartMonitor(target_handle)).unwrap(); + assert_ne!(r1, r2); + + target.request(StopCounter).unwrap(); + target.join(); + rt::sleep(Duration::from_millis(150)); + + let downs = watcher.request(GetDowns).unwrap(); + assert_eq!(downs.len(), 2); + let refs: Vec<_> = downs.iter().map(|d| d.monitor_ref).collect(); + assert!(refs.contains(&r1)); + assert!(refs.contains(&r2)); + } + + #[test] + fn monitor_observes_panic_reason() { + struct PanicMsg; + impl Message for PanicMsg { + type Result = (); + } + struct PanicMe; + impl Actor for PanicMe {} + impl Handler for PanicMe { + fn handle(&mut self, _msg: PanicMsg, _ctx: &Context) { + panic!("intentional panic"); + } + } + + let target = PanicMe.start(); + let target_handle = target.child_handle(); + let watcher = make_watcher(); + + let _ = watcher.request(StartMonitor(target_handle)).unwrap(); + let _ = target.send(PanicMsg); + + rt::sleep(Duration::from_millis(200)); + + let downs = watcher.request(GetDowns).unwrap(); + assert_eq!(downs.len(), 1); + assert!(matches!(downs[0].reason, ExitReason::Panic(_))); + } } diff --git a/examples/exit_reason/src/main.rs b/examples/exit_reason/src/main.rs index 30d05ef..06eb4dc 100644 --- a/examples/exit_reason/src/main.rs +++ b/examples/exit_reason/src/main.rs @@ -1,7 +1,8 @@ use spawned_concurrency::protocol; use spawned_concurrency::tasks::{Actor, ActorStart, Context, Handler}; -use spawned_concurrency::{ChildHandle, Response}; +use spawned_concurrency::{ChildHandle, Down, MonitorRef, Response}; use spawned_rt::tasks 
as rt; +use std::sync::{Arc, Mutex}; use std::time::Duration; // -- A simple worker that can be told to stop, panic, or just keep running -- @@ -152,6 +153,70 @@ fn main() { println!(" {} — exit: {reason}", h.id()); } + // 8. Monitor — get notified when another actor dies + println!("\n--- Scenario 8: Monitor ---"); + + // Observer actor that records Down notifications + struct Observer { + log: Arc>>, + } + struct StartMonitor(ChildHandle); + impl spawned_concurrency::message::Message for StartMonitor { + type Result = MonitorRef; + } + impl Actor for Observer {} + impl Handler for Observer { + async fn handle(&mut self, msg: StartMonitor, ctx: &Context) -> MonitorRef { + ctx.monitor(&msg.0) + } + } + impl Handler for Observer { + async fn handle(&mut self, msg: Down, _ctx: &Context) { + tracing::info!( + "[observer] received Down for {} ({})", + msg.monitor_ref, + msg.reason + ); + self.log.lock().unwrap().push(msg); + } + } + + let log = Arc::new(Mutex::new(Vec::new())); + let observer = Observer { log: log.clone() }.start(); + + let worker_a = Worker::new("worker-a").start(); + let worker_b = Worker::new("worker-b").start(); + + let ref_a = observer + .request(StartMonitor(worker_a.child_handle())) + .await + .unwrap(); + let ref_b = observer + .request(StartMonitor(worker_b.child_handle())) + .await + .unwrap(); + println!(" Observer monitoring {} and {}", ref_a, ref_b); + + // Trigger one clean stop and one panic + worker_a.stop().await.unwrap(); + let _ = worker_b.panic_now().await; + + // Give the watchers a moment to deliver Down messages + rt::sleep(Duration::from_millis(100)).await; + + let snapshot = log.lock().unwrap().clone(); + println!(" Observer received {} Down messages:", snapshot.len()); + for down in &snapshot { + println!( + " {} — {} (abnormal: {})", + down.monitor_ref, + down.reason, + down.reason.is_abnormal() + ); + } + observer.child_handle().stop(); + observer.join().await; + // Give tracing a moment to flush rt::sleep(Duration::from_millis(50)).await; println!("\n=== Done ==="); From aa7d5156e03cf1f0c5c612a5bfc4470cf346d823 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 7 May 2026 13:29:16 -0300 Subject: [PATCH 2/4] feat: add demonitor demo to exit_reason example --- examples/exit_reason/src/main.rs | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/examples/exit_reason/src/main.rs b/examples/exit_reason/src/main.rs index 06eb4dc..6d06215 100644 --- a/examples/exit_reason/src/main.rs +++ b/examples/exit_reason/src/main.rs @@ -164,12 +164,21 @@ fn main() { impl spawned_concurrency::message::Message for StartMonitor { type Result = MonitorRef; } + struct StopMonitor(MonitorRef); + impl spawned_concurrency::message::Message for StopMonitor { + type Result = (); + } impl Actor for Observer {} impl Handler for Observer { async fn handle(&mut self, msg: StartMonitor, ctx: &Context) -> MonitorRef { ctx.monitor(&msg.0) } } + impl Handler for Observer { + async fn handle(&mut self, msg: StopMonitor, ctx: &Context) { + ctx.demonitor(msg.0); + } + } impl Handler for Observer { async fn handle(&mut self, msg: Down, _ctx: &Context) { tracing::info!( @@ -186,6 +195,7 @@ fn main() { let worker_a = Worker::new("worker-a").start(); let worker_b = Worker::new("worker-b").start(); + let worker_c = Worker::new("worker-c").start(); let ref_a = observer .request(StartMonitor(worker_a.child_handle())) @@ -195,11 +205,20 @@ fn main() { .request(StartMonitor(worker_b.child_handle())) .await .unwrap(); - println!(" Observer 
monitoring {} and {}", ref_a, ref_b); + let ref_c = observer + .request(StartMonitor(worker_c.child_handle())) + .await + .unwrap(); + println!(" Observer monitoring {}, {}, and {}", ref_a, ref_b, ref_c); + + // Demonitor worker_c BEFORE it dies — Observer won't be notified + observer.request(StopMonitor(ref_c)).await.unwrap(); + println!(" Demonitored {} — its death won't be observed", ref_c); - // Trigger one clean stop and one panic + // Trigger: clean stop, panic, and the demonitored one worker_a.stop().await.unwrap(); let _ = worker_b.panic_now().await; + worker_c.stop().await.unwrap(); // Give the watchers a moment to deliver Down messages rt::sleep(Duration::from_millis(100)).await; From ee7ebc5cc4ef1b480a99cc961eb2a635db5cd3f4 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 7 May 2026 16:12:32 -0300 Subject: [PATCH 3/4] fix: clean up monitor table entries after target dies; derive PartialEq on Down --- concurrency/src/monitor.rs | 2 +- concurrency/src/tasks/actor.rs | 50 ++++++++++++++++++++++++++++++++ concurrency/src/threads/actor.rs | 8 +++++ 3 files changed, 59 insertions(+), 1 deletion(-) diff --git a/concurrency/src/monitor.rs b/concurrency/src/monitor.rs index 5edf9c4..2987769 100644 --- a/concurrency/src/monitor.rs +++ b/concurrency/src/monitor.rs @@ -39,7 +39,7 @@ impl std::fmt::Display for MonitorRef { /// /// To monitor another actor, call `ctx.monitor(&child_handle)` and implement /// `Handler` on the monitoring actor. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Down { /// The monitor that triggered this notification. pub monitor_ref: MonitorRef, diff --git a/concurrency/src/tasks/actor.rs b/concurrency/src/tasks/actor.rs index 6b34847..92b06b8 100644 --- a/concurrency/src/tasks/actor.rs +++ b/concurrency/src/tasks/actor.rs @@ -273,9 +273,17 @@ impl Context { let target = target.clone(); let actor_ref = self.actor_ref(); + let monitors = self.monitors.clone(); rt::spawn(async move { let reason = target.wait_exit_async().await; + // Remove the entry from the monitor table so it doesn't accumulate + // stale entries over the actor's lifetime. Done before delivery + // since `demonitor` is now a no-op for this monitor anyway. + monitors + .lock() + .unwrap_or_else(|p| p.into_inner()) + .remove(&monitor_ref); if active.load(Ordering::Acquire) { let _ = actor_ref.send(Down { monitor_ref, @@ -1491,6 +1499,48 @@ mod tests { }); } + #[test] + pub fn monitor_table_is_cleaned_up_after_target_dies() { + // Watcher should remove its entry from the monitor table after the + // target dies, so the table doesn't accumulate stale entries. 
+ struct Inspect; + impl Message for Inspect { + type Result = usize; + } + impl Handler for Watcher { + async fn handle(&mut self, _msg: Inspect, ctx: &Context) -> usize { + ctx.monitors.lock().unwrap().len() + } + } + + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let target = Counter { count: 0 }.start(); + let watcher = Watcher { + downs: Arc::new(Mutex::new(Vec::new())), + last_ref: Arc::new(Mutex::new(None)), + } + .start(); + + let _ = watcher + .request(StartMonitor(target.child_handle())) + .await + .unwrap(); + assert_eq!(watcher.request(Inspect).await.unwrap(), 1); + + target.request(StopCounter).await.unwrap(); + target.join().await; + // Give the watcher time to process and clean up + rt::sleep(Duration::from_millis(50)).await; + + assert_eq!( + watcher.request(Inspect).await.unwrap(), + 0, + "monitor table should be empty after target died" + ); + }); + } + #[test] pub fn monitor_observes_panic_reason() { struct PanicMsg; diff --git a/concurrency/src/threads/actor.rs b/concurrency/src/threads/actor.rs index 6e4ba3e..727cc4a 100644 --- a/concurrency/src/threads/actor.rs +++ b/concurrency/src/threads/actor.rs @@ -235,9 +235,17 @@ impl Context { let target = target.clone(); let actor_ref = self.actor_ref(); + let monitors = self.monitors.clone(); rt::spawn(move || { let reason = target.wait_exit_blocking(); + // Remove the entry from the monitor table so it doesn't accumulate + // stale entries over the actor's lifetime. Done before delivery + // since `demonitor` is now a no-op for this monitor anyway. + monitors + .lock() + .unwrap_or_else(|p| p.into_inner()) + .remove(&monitor_ref); if active.load(Ordering::Acquire) { let _ = actor_ref.send(Down { monitor_ref, From 4067f09dd1796932d0ecc123e33598405521a840 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 7 May 2026 16:20:50 -0300 Subject: [PATCH 4/4] refactor: address bot review feedback on monitor PR --- concurrency/src/tasks/actor.rs | 43 ++++++++++++++++++++++++-------- concurrency/src/threads/actor.rs | 5 ++++ 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/concurrency/src/tasks/actor.rs b/concurrency/src/tasks/actor.rs index 92b06b8..acdab02 100644 --- a/concurrency/src/tasks/actor.rs +++ b/concurrency/src/tasks/actor.rs @@ -1339,7 +1339,6 @@ mod tests { /// drive it from outside. Records all received Down messages. 
struct Watcher { downs: Arc>>, - last_ref: Arc>>, } struct StartMonitor(crate::ChildHandle); @@ -1359,9 +1358,7 @@ mod tests { msg: StartMonitor, ctx: &Context, ) -> crate::monitor::MonitorRef { - let r = ctx.monitor(&msg.0); - *self.last_ref.lock().unwrap() = Some(r); - r + ctx.monitor(&msg.0) } } @@ -1396,7 +1393,6 @@ mod tests { let watcher = Watcher { downs: Arc::new(Mutex::new(Vec::new())), - last_ref: Arc::new(Mutex::new(None)), } .start(); @@ -1427,7 +1423,6 @@ mod tests { let watcher = Watcher { downs: Arc::new(Mutex::new(Vec::new())), - last_ref: Arc::new(Mutex::new(None)), } .start(); @@ -1450,7 +1445,6 @@ mod tests { let watcher = Watcher { downs: Arc::new(Mutex::new(Vec::new())), - last_ref: Arc::new(Mutex::new(None)), } .start(); @@ -1476,7 +1470,6 @@ mod tests { let watcher = Watcher { downs: Arc::new(Mutex::new(Vec::new())), - last_ref: Arc::new(Mutex::new(None)), } .start(); @@ -1518,7 +1511,6 @@ mod tests { let target = Counter { count: 0 }.start(); let watcher = Watcher { downs: Arc::new(Mutex::new(Vec::new())), - last_ref: Arc::new(Mutex::new(None)), } .start(); @@ -1562,7 +1554,6 @@ mod tests { let watcher = Watcher { downs: Arc::new(Mutex::new(Vec::new())), - last_ref: Arc::new(Mutex::new(None)), } .start(); @@ -1577,4 +1568,36 @@ mod tests { assert!(matches!(downs[0].reason, ExitReason::Panic(_))); }); } + + #[test] + pub fn monitoring_actor_stops_before_target_does_not_panic() { + // Regression: if the monitoring actor stops while the target is still + // alive, the watcher must not crash when the target eventually dies. + // The watcher's send() will fail silently (mailbox closed), and the + // watcher exits cleanly. + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let target = Counter { count: 0 }.start(); + let target_handle = target.child_handle(); + + let watcher = Watcher { + downs: Arc::new(Mutex::new(Vec::new())), + } + .start(); + + let _ = watcher.request(StartMonitor(target_handle)).await.unwrap(); + + // Stop the monitoring actor first + let watcher_handle = watcher.child_handle(); + watcher_handle.stop(); + watcher_handle.wait_exit_async().await; + + // Now stop the target — the watcher should clean up without panicking + target.request(StopCounter).await.unwrap(); + target.join().await; + // Give the orphaned watcher task time to attempt delivery + rt::sleep(Duration::from_millis(50)).await; + // If we got here without panicking, the test passes. + }); + } } diff --git a/concurrency/src/threads/actor.rs b/concurrency/src/threads/actor.rs index 727cc4a..329d3e7 100644 --- a/concurrency/src/threads/actor.rs +++ b/concurrency/src/threads/actor.rs @@ -221,6 +221,11 @@ impl Context { /// /// Monitors are unidirectional: the monitored actor is unaware of the /// monitor and unaffected by it. + /// + /// **Resource cost (threads mode):** each active monitor occupies one OS + /// thread for the duration of the target's lifetime, blocked on the + /// target's completion signal. For supervisors with many long-lived + /// children, consider using tasks mode instead. pub fn monitor(&self, target: &ChildHandle) -> MonitorRef where A: Handler,
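
For reference, a minimal end-to-end sketch of the API this series adds, in tasks mode. The `Supervisor`, `Watch`, and `Unwatch` names below are illustrative only and not part of the patch; the `ctx.monitor`/`ctx.demonitor` calls, the `Down` message, and the `Handler<Down>` requirement are the ones introduced above.

use spawned_concurrency::message::Message;
use spawned_concurrency::tasks::{Actor, ActorStart, Context, Handler};
use spawned_concurrency::{ChildHandle, Down, MonitorRef};

// Illustrative actor that watches other actors and records the last exit it saw.
struct Supervisor {
    last_exit: Option<Down>,
}

struct Watch(ChildHandle);
impl Message for Watch {
    type Result = MonitorRef;
}

struct Unwatch(MonitorRef);
impl Message for Unwatch {
    type Result = ();
}

impl Actor for Supervisor {}

impl Handler<Watch> for Supervisor {
    async fn handle(&mut self, msg: Watch, ctx: &Context<Self>) -> MonitorRef {
        // Unidirectional: the monitored actor is unaware of and unaffected by this.
        ctx.monitor(&msg.0)
    }
}

impl Handler<Unwatch> for Supervisor {
    async fn handle(&mut self, msg: Unwatch, ctx: &Context<Self>) {
        // Best-effort cancellation; a Down that is already queued may still arrive.
        ctx.demonitor(msg.0);
    }
}

impl Handler<Down> for Supervisor {
    async fn handle(&mut self, msg: Down, _ctx: &Context<Self>) {
        // One Down per active MonitorRef when the target stops, carrying its ExitReason.
        self.last_exit = Some(msg);
    }
}

// Driving it from inside a tasks runtime, given the ActorRef of some already-started actor:
//
//     let sup = Supervisor { last_exit: None }.start();
//     let mref = sup.request(Watch(worker.child_handle())).await.unwrap();
//     // ... later, to stop observing that actor's fate:
//     sup.request(Unwatch(mref)).await.unwrap();

The threads-mode API mirrors this with synchronous `fn handle` signatures; as noted in the doc comment added in the last patch, each active monitor there ties up one OS thread until its target exits.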