From 0269d52e440fc0b13dfd9c88ddfb2c4e551f3399 Mon Sep 17 00:00:00 2001
From: Joost Jager
Date: Mon, 11 May 2026 14:02:44 +0200
Subject: [PATCH 1/4] fuzz: model chanmon persistence in harness

Replace the chanmon consistency harness' Watch wrapper with a Persist
implementation, HarnessPersister. Monitor writes now flow through the
real ChainMonitor persistence hooks.

Track restart candidates separately from monitor completion callbacks. A
monitor can stop being a valid reload candidate once a newer baseline is
durable, while its callback may still be needed to unblock the live
ChainMonitor.

On reload, choose the durable baseline, the first pending snapshot, or
the last pending snapshot. Startup monitor registration completes
immediately; the configured persistence style is restored only after
that registration.
---
 fuzz/src/chanmon_consistency.rs | 567 ++++++++++++++++++++------------
 1 file changed, 348 insertions(+), 219 deletions(-)

diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs
index 8a90dc93e97..59d29ac5133 100644
--- a/fuzz/src/chanmon_consistency.rs
+++ b/fuzz/src/chanmon_consistency.rs
@@ -41,8 +41,7 @@ use lightning::chain;
 use lightning::chain::chaininterface::{
 	BroadcasterInterface, ConfirmationTarget, FeeEstimator, TransactionType,
 };
-use lightning::chain::channelmonitor::{ChannelMonitor, MonitorEvent};
-use lightning::chain::transaction::OutPoint;
+use lightning::chain::channelmonitor::ChannelMonitor;
 use lightning::chain::{
 	chainmonitor, channelmonitor, BlockLocator, ChannelMonitorUpdateStatus, Confirm, Watch,
 };
@@ -87,7 +86,6 @@ use lightning::util::wallet_utils::{WalletSourceSync, WalletSync};
 use lightning_invoice::RawBolt11Invoice;
 
 use crate::utils::test_logger::{self, Output};
-use crate::utils::test_persister::TestPersister;
 
 use bitcoin::secp256k1::ecdh::SharedSecret;
 use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature};
@@ -293,144 +291,302 @@ impl Writer for VecWriter {
 	}
 }
 
-/// The LDK API requires that any time we tell it we're done persisting a `ChannelMonitor[Update]`
-/// we never pass it in as the "latest" `ChannelMonitor` on startup. However, we can pass
-/// out-of-date monitors as long as we never told LDK we finished persisting them, which we do by
-/// storing both old `ChannelMonitor`s and ones that are "being persisted" here.
+fn serialize_monitor(monitor: &ChannelMonitor<TestChannelSigner>) -> Vec<u8> {
+	let mut ser = VecWriter(Vec::new());
+	monitor.write(&mut ser).unwrap();
+	ser.0
+}
+
+/// LDK requires the `ChannelMonitor` loaded on startup to be at least as current as the
+/// `ChannelManager` state, except for monitor updates that `ChannelManager` still records as
+/// in-flight and can replay. This harness tracks the monitor blobs that remain valid restart
+/// candidates under that rule.
 ///
-/// Note that such "being persisted" `ChannelMonitor`s are stored in `ChannelManager` and will
-/// simply be replayed on startup.
+/// Separately, we track every `InProgress` persistence operation that still needs a
+/// `channel_monitor_updated` call. A newer persisted monitor can make an older monitor invalid for
+/// restart while the older update still needs to be completed to unblock the live `ChainMonitor`.
+///
+/// Off-chain monitor updates that are still "being persisted" are stored in `ChannelManager` and
+/// will be replayed on startup. Full-monitor snapshots from chain sync or archive paths that return
+/// `InProgress` are only restart candidates; losing one on restart does not require a
+/// `channel_monitor_updated` callback.
 struct LatestMonitorState {
 	/// The latest monitor id which we told LDK we've persisted.
 	///
-	/// Note that there may still be earlier pending monitor updates in [`Self::pending_monitors`]
-	/// which we haven't yet completed. We're allowed to reload with those as well, at least until
-	/// they're completed.
+	/// Note that earlier updates may still need a `channel_monitor_updated` callback via
+	/// [`Self::pending_monitor_completions`].
 	persisted_monitor_id: u64,
 	/// The latest serialized `ChannelMonitor` that we told LDK we persisted.
 	persisted_monitor: Vec<u8>,
-	/// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting",
-	/// from LDK's perspective.
+	/// An ordered list of (monitor id, serialized `ChannelMonitor`)s which remain safe to use as
+	/// stale monitors on reload.
 	pending_monitors: Vec<(u64, Vec<u8>)>,
+	/// An ordered list of (monitor id, serialized `ChannelMonitor`)s which still need a
+	/// `channel_monitor_updated` callback.
+	pending_monitor_completions: Vec<(u64, Vec<u8>)>,
 }
+impl LatestMonitorState {
+	fn insert_pending_entry(
+		pending: &mut Vec<(u64, Vec<u8>)>, monitor_id: u64, serialized_monitor: Vec<u8>,
+	) {
+		// Monitor update ids must arrive in order. Assert at insertion time so duplicates or
+		// out-of-order updates fail close to the write that caused them instead of being sorted
+		// into place.
+		assert!(
+			pending.last().map_or(true, |(last_id, _)| *last_id < monitor_id),
+			"pending monitor updates should arrive in order"
+		);
+		pending.push((monitor_id, serialized_monitor));
+	}
 
-struct TestChainMonitor {
-	pub logger: Arc<dyn Logger>,
-	pub keys: Arc<KeyProvider>,
-	pub persister: Arc<TestPersister>,
-	pub chain_monitor: Arc<
-		chainmonitor::ChainMonitor<
-			TestChannelSigner,
-			Arc<dyn chain::Filter>,
-			Arc<TestBroadcaster>,
-			Arc<FuzzEstimator>,
-			Arc<dyn Logger>,
-			Arc<TestPersister>,
-			Arc<KeyProvider>,
-		>,
-	>,
-	pub latest_monitors: Mutex<HashMap<ChannelId, LatestMonitorState>>,
-}
-impl TestChainMonitor {
-	pub fn new(
-		broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>,
-		persister: Arc<TestPersister>, keys: Arc<KeyProvider>,
-	) -> Self {
-		Self {
-			chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(
-				None,
-				broadcaster,
-				logger.clone(),
-				feeest,
-				Arc::clone(&persister),
-				Arc::clone(&keys),
-				keys.get_peer_storage_key(),
-				false,
-			)),
-			logger,
-			keys,
-			persister,
-			latest_monitors: Mutex::new(new_hash_map()),
+	fn insert_pending_monitor_candidate(&mut self, monitor_id: u64, serialized_monitor: Vec<u8>) {
+		// Full-monitor persists from chain sync or archive paths use the monitor's current
+		// latest_update_id rather than a fresh ChannelMonitorUpdate id. Keep duplicate ids so
+		// reload can choose between multiple same-id full snapshots that were in flight together.
+		if let Some((last_id, _)) = self.pending_monitors.last() {
+			assert!(*last_id <= monitor_id, "pending monitor updates should arrive in order");
 		}
+		self.pending_monitors.push((monitor_id, serialized_monitor));
 	}
-}
-impl chain::Watch<TestChannelSigner> for TestChainMonitor {
-	fn watch_channel(
-		&self, channel_id: ChannelId, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>,
-	) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
-		let mut ser = VecWriter(Vec::new());
-		monitor.write(&mut ser).unwrap();
-		let monitor_id = monitor.get_latest_update_id();
-		let res = self.chain_monitor.watch_channel(channel_id, monitor);
-		let state = match res {
-			Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
-				persisted_monitor_id: monitor_id,
-				persisted_monitor: ser.0,
-				pending_monitors: Vec::new(),
-			},
-			Ok(chain::ChannelMonitorUpdateStatus::InProgress) => LatestMonitorState {
-				persisted_monitor_id: monitor_id,
-				persisted_monitor: Vec::new(),
-				pending_monitors: vec![(monitor_id, ser.0)],
-			},
-			Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
-			Err(()) => panic!(),
-		};
-		if self.latest_monitors.lock().unwrap().insert(channel_id, state).is_some() {
-			panic!("Already had monitor pre-watch_channel");
+
+	fn mark_persisted(&mut self, monitor_id: u64, serialized_monitor: Vec<u8>) {
+		// Once a monitor is durable, use it as the restart baseline and stop tracking candidates
+		// at or behind that update id. Completion obligations are tracked separately and are
+		// deliberately not pruned here.
+		self.pending_monitors.retain(|(id, _)| *id > monitor_id);
+		if monitor_id >= self.persisted_monitor_id {
+			self.persisted_monitor_id = monitor_id;
+			self.persisted_monitor = serialized_monitor;
 		}
-		res
 	}
-	fn update_channel(
-		&self, channel_id: ChannelId, update: &channelmonitor::ChannelMonitorUpdate,
-	) -> chain::ChannelMonitorUpdateStatus {
-		let mut map_lock = self.latest_monitors.lock().unwrap();
-		let map_entry = map_lock.get_mut(&channel_id).expect("Didn't have monitor on update call");
-		let latest_monitor_data = map_entry
-			.pending_monitors
-			.last()
-			.as_ref()
-			.map(|(_, data)| data)
-			.unwrap_or(&map_entry.persisted_monitor);
-		let deserialized_monitor =
-			<(BlockLocator, channelmonitor::ChannelMonitor<TestChannelSigner>)>::read(
-				&mut &latest_monitor_data[..],
-				(&*self.keys, &*self.keys),
-			)
-			.unwrap()
-			.1;
-		deserialized_monitor
-			.update_monitor(
-				update,
-				&&TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) },
-				&&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) },
-				&self.logger,
-			)
-			.unwrap();
-		let mut ser = VecWriter(Vec::new());
-		deserialized_monitor.write(&mut ser).unwrap();
-		let res = self.chain_monitor.update_channel(channel_id, update);
-		match res {
-			chain::ChannelMonitorUpdateStatus::Completed => {
-				map_entry.persisted_monitor_id = update.update_id;
-				map_entry.persisted_monitor = ser.0;
+	fn insert_pending(
+		&mut self, monitor_id: u64, serialized_monitor: Vec<u8>, needs_completion: bool,
+	) {
+		if needs_completion {
+			// persist_new_channel and update_persisted_channel(Some(_)) require a later
+			// channel_monitor_updated callback if persistence returns InProgress.
+			Self::insert_pending_entry(
+				&mut self.pending_monitors,
+				monitor_id,
+				serialized_monitor.clone(),
+			);
+			Self::insert_pending_entry(
+				&mut self.pending_monitor_completions,
+				monitor_id,
+				serialized_monitor,
+			);
+		} else {
+			// This harness treats update_persisted_channel(None, ...) as the chain-sync/archive
+			// case: the full monitor may be used on restart, but ChainMonitor does not wait for a
+			// channel_monitor_updated callback.
+			self.insert_pending_monitor_candidate(monitor_id, serialized_monitor);
+		}
+	}
+
+	fn mark_completed_update_persisted(&mut self, monitor_id: u64, serialized_monitor: Vec<u8>) {
+		// The selector/drain path should already have removed this entry before
+		// finish_monitor_update calls channel_monitor_updated. This check catches accidental
+		// double-completion or pruning of the wrong list.
+		assert!(
+			self.pending_monitor_completions.iter().all(|(id, _)| *id != monitor_id),
+			"completed monitor update should already be removed from the completion queue"
+		);
+		self.mark_persisted(monitor_id, serialized_monitor);
+	}
+
+	fn drain_pending_completions(&mut self) -> Vec<(u64, Vec<u8>)> {
+		std::mem::take(&mut self.pending_monitor_completions)
+	}
+
+	fn take_pending_completion(
+		&mut self, selector: MonitorUpdateSelector,
+	) -> Option<(u64, Vec<u8>)> {
+		// The fuzzer chooses which outstanding callback to deliver. These choices apply to
+		// completion obligations, not to the set of monitors that may be used on restart.
+		match selector {
+			MonitorUpdateSelector::First => {
+				if self.pending_monitor_completions.is_empty() {
+					None
+				} else {
+					Some(self.pending_monitor_completions.remove(0))
+				}
 			},
-			chain::ChannelMonitorUpdateStatus::InProgress => {
-				map_entry.pending_monitors.push((update.update_id, ser.0));
+			MonitorUpdateSelector::Second => {
+				if self.pending_monitor_completions.len() > 1 {
+					Some(self.pending_monitor_completions.remove(1))
+				} else {
+					None
+				}
 			},
-			chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(),
+			MonitorUpdateSelector::Last => self.pending_monitor_completions.pop(),
 		}
-		res
 	}
 
-	fn release_pending_monitor_events(
-		&self,
-	) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
-		return self.chain_monitor.release_pending_monitor_events();
+	fn select_monitor_for_reload(&mut self, selector: MonitorReloadSelector) {
+		// A restart can load the last monitor we told LDK was persisted, or a monitor snapshot
+		// whose write was started before the simulated crash.
+		let old_mon = (self.persisted_monitor_id, std::mem::take(&mut self.persisted_monitor));
+		let (monitor_id, serialized_monitor) = match selector {
+			MonitorReloadSelector::Persisted => old_mon,
+			MonitorReloadSelector::FirstPending => {
+				if self.pending_monitors.is_empty() {
+					old_mon
+				} else {
+					self.pending_monitors.remove(0)
+				}
+			},
+			MonitorReloadSelector::LastPending => self.pending_monitors.pop().unwrap_or(old_mon),
+		};
+		self.persisted_monitor_id = monitor_id;
+		self.persisted_monitor = serialized_monitor;
+		// After restart, stop tracking pre-restart in-flight writes. ChannelManager will replay
+		// off-chain monitor updates that still matter; full-monitor snapshots may simply be absent.
+		self.pending_monitors.clear();
+		self.pending_monitor_completions.clear();
+	}
+}
+
+struct HarnessPersister {
+	pub update_ret: Mutex<ChannelMonitorUpdateStatus>,
+	pub latest_monitors: Mutex<HashMap<ChannelId, LatestMonitorState>>,
+}
+impl HarnessPersister {
+	fn track_monitor_update(
+		&self, channel_id: ChannelId, monitor_id: u64, serialized_monitor: Vec<u8>,
+		status: chain::ChannelMonitorUpdateStatus, needs_completion: bool,
+	) {
+		let mut latest_monitors = self.latest_monitors.lock().unwrap();
+		if let Some(state) = latest_monitors.get_mut(&channel_id) {
+			match status {
+				chain::ChannelMonitorUpdateStatus::Completed => {
+					// A completed write advances the restart baseline. Once LDK can rely on that
+					// monitor state being durable, the harness stops offering candidates at or
+					// behind that update id.
+					state.mark_persisted(monitor_id, serialized_monitor);
+				},
+				chain::ChannelMonitorUpdateStatus::InProgress => {
+					// InProgress always creates a restart candidate, but only some calls also need
+					// an explicit channel_monitor_updated completion.
+					state.insert_pending(monitor_id, serialized_monitor, needs_completion);
+				},
+				chain::ChannelMonitorUpdateStatus::UnrecoverableError => {},
+			}
+		} else {
+			let state = match status {
+				chain::ChannelMonitorUpdateStatus::Completed => LatestMonitorState {
+					persisted_monitor_id: monitor_id,
+					persisted_monitor: serialized_monitor,
+					pending_monitors: Vec::new(),
+					pending_monitor_completions: Vec::new(),
+				},
+				chain::ChannelMonitorUpdateStatus::InProgress => {
+					// The first persist for a channel is persist_new_channel, which always needs a
+					// completion callback when it returns InProgress. A full-monitor update without
+					// existing state would mean the harness missed the channel's initial monitor.
+					assert!(needs_completion, "missing monitor state for full monitor update");
+					LatestMonitorState {
+						persisted_monitor_id: monitor_id,
+						persisted_monitor: Vec::new(),
+						pending_monitors: vec![(monitor_id, serialized_monitor.clone())],
+						pending_monitor_completions: vec![(monitor_id, serialized_monitor)],
+					}
+				},
+				chain::ChannelMonitorUpdateStatus::UnrecoverableError => return,
+			};
+			assert!(
+				latest_monitors.insert(channel_id, state).is_none(),
+				"Already had monitor state pre-persist"
+			);
+		}
+	}
+
+	fn mark_update_completed(
+		&self, channel_id: ChannelId, monitor_id: u64, serialized_monitor: Vec<u8>,
+	) {
+		let mut latest_monitors = self.latest_monitors.lock().unwrap();
+		let state = latest_monitors
+			.get_mut(&channel_id)
+			.expect("missing monitor state for completed update");
+		// Once we tell LDK update N is completed, use the completed monitor as the restart
+		// baseline and drop restart candidates at or behind N.
+		state.mark_completed_update_persisted(monitor_id, serialized_monitor);
+	}
+
+	fn drain_pending_updates(&self, channel_id: &ChannelId) -> Vec<(u64, Vec<u8>)> {
+		self.latest_monitors
+			.lock()
+			.unwrap()
+			.get_mut(channel_id)
+			.map_or_else(Vec::new, |state| state.drain_pending_completions())
+	}
+
+	fn drain_all_pending_updates(&self) -> Vec<(ChannelId, u64, Vec<u8>)> {
+		let mut completed_updates = Vec::new();
+		for (channel_id, state) in self.latest_monitors.lock().unwrap().iter_mut() {
+			for (monitor_id, data) in state.drain_pending_completions() {
+				completed_updates.push((*channel_id, monitor_id, data));
+			}
+		}
+		completed_updates
+	}
+
+	fn take_pending_update(
+		&self, channel_id: &ChannelId, selector: MonitorUpdateSelector,
+	) -> Option<(u64, Vec<u8>)> {
+		self.latest_monitors
+			.lock()
+			.unwrap()
+			.get_mut(channel_id)
+			.and_then(|state| state.take_pending_completion(selector))
+	}
+}
+impl chainmonitor::Persist<TestChannelSigner> for HarnessPersister {
+	fn persist_new_channel(
+		&self, _monitor_name: lightning::util::persist::MonitorName,
+		data: &channelmonitor::ChannelMonitor<TestChannelSigner>,
+	) -> chain::ChannelMonitorUpdateStatus {
+		let status = self.update_ret.lock().unwrap().clone();
+		let monitor_id = data.get_latest_update_id();
+		let serialized_monitor = serialize_monitor(data);
+		self.track_monitor_update(data.channel_id(), monitor_id, serialized_monitor, status, true);
+		status
+	}
+
+	fn update_persisted_channel(
+		&self, _monitor_name: lightning::util::persist::MonitorName,
+		update: Option<&channelmonitor::ChannelMonitorUpdate>,
+		data: &channelmonitor::ChannelMonitor<TestChannelSigner>,
+	) -> chain::ChannelMonitorUpdateStatus {
+		let status = self.update_ret.lock().unwrap().clone();
+		let monitor_id = update.map_or_else(|| data.get_latest_update_id(), |upd| upd.update_id);
+		let serialized_monitor = serialize_monitor(data);
+		self.track_monitor_update(
+			data.channel_id(),
+			monitor_id,
+			serialized_monitor,
+			status,
+			// `None` normally comes from chain-sync or archive writes, which need no completion
+			// callback. `update_channel_internal` can also use `None` after `update_monitor`
+			// fails, but this harness does not model that error-recovery path.
+			update.is_some(),
+		);
+		status
 	}
+
+	fn archive_persisted_channel(&self, _monitor_name: lightning::util::persist::MonitorName) {}
 }
 
+type TestChainMonitor = chainmonitor::ChainMonitor<
+	TestChannelSigner,
+	Arc<dyn chain::Filter>,
+	Arc<TestBroadcaster>,
+	Arc<FuzzEstimator>,
+	Arc<dyn Logger>,
+	Arc<HarnessPersister>,
+	Arc<KeyProvider>,
+>;
+
 struct KeyProvider {
 	node_secret: SecretKey,
 	rand_bytes_id: atomic::AtomicU32,
@@ -654,6 +810,7 @@ struct HarnessNode<'a> {
 	node_id: u8,
 	node: ChanMan<'a>,
 	monitor: Arc<TestChainMonitor>,
+	persister: Arc<HarnessPersister>,
 	keys_manager: Arc<KeyProvider>,
 	logger: Arc<TestLogger>,
 	broadcaster: Arc<TestBroadcaster>,
@@ -674,26 +831,33 @@ impl<'a> std::ops::Deref for HarnessNode<'a> {
 	}
 }
 
 impl<'a> HarnessNode<'a> {
-	fn build_loggers(
+	fn build_logger(
 		node_id: u8, out: &Out,
-	) -> (Arc<dyn Logger>, Arc<TestLogger>) {
-		let raw_logger = Arc::new(test_logger::TestLogger::new(node_id.to_string(), out.clone()));
-		let logger_for_monitor: Arc<dyn Logger> = raw_logger.clone();
-		let logger: Arc<TestLogger> = raw_logger;
-		(logger_for_monitor, logger)
+	) -> Arc<TestLogger> {
+		Arc::new(test_logger::TestLogger::new(node_id.to_string(), out.clone()))
+	}
+
+	fn build_persister(persistence_style: ChannelMonitorUpdateStatus) -> Arc<HarnessPersister> {
+		Arc::new(HarnessPersister {
+			update_ret: Mutex::new(persistence_style),
+			latest_monitors: Mutex::new(new_hash_map()),
+		})
 	}
 
 	fn build_chain_monitor(
 		broadcaster: &Arc<TestBroadcaster>, fee_estimator: &Arc<FuzzEstimator>,
-		keys_manager: &Arc<KeyProvider>, logger_for_monitor: Arc<dyn Logger>,
-		persistence_style: ChannelMonitorUpdateStatus,
+		keys_manager: &Arc<KeyProvider>, logger: Arc<dyn Logger>,
+		persister: &Arc<HarnessPersister>,
 	) -> Arc<TestChainMonitor> {
-		Arc::new(TestChainMonitor::new(
+		Arc::new(chainmonitor::ChainMonitor::new(
+			None,
 			Arc::clone(broadcaster),
-			logger_for_monitor,
+			logger,
 			Arc::clone(fee_estimator),
-			Arc::new(TestPersister { update_ret: Mutex::new(persistence_style) }),
+			Arc::clone(persister),
 			Arc::clone(keys_manager),
+			keys_manager.get_peer_storage_key(),
+			false,
 		))
 	}
 
@@ -702,7 +866,7 @@ impl<'a> HarnessNode<'a> {
 		broadcaster: Arc<TestBroadcaster>, persistence_style: ChannelMonitorUpdateStatus,
 		out: &Out, router: &'a FuzzRouter, chan_type: ChanType,
 	) -> Self {
-		let (logger_for_monitor, logger) = Self::build_loggers(node_id, out);
+		let logger = Self::build_logger(node_id, out);
 		let node_secret = SecretKey::from_slice(&[
 			0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
 			0, 0, 1, node_id,
@@ -713,12 +877,13 @@ impl<'a> HarnessNode<'a> {
 			rand_bytes_id: atomic::AtomicU32::new(0),
 			enforcement_states: Mutex::new(new_hash_map()),
 		});
+		let persister = Self::build_persister(persistence_style);
 		let monitor = Self::build_chain_monitor(
 			&broadcaster,
 			&fee_estimator,
 			&keys_manager,
-			logger_for_monitor,
-			persistence_style,
+			Arc::clone(&logger),
+			&persister,
 		);
 		let network = Network::Bitcoin;
 		let best_block_timestamp = genesis_block(network).header.time;
@@ -741,6 +906,7 @@ impl<'a> HarnessNode<'a> {
 			node_id,
 			node,
 			monitor,
+			persister,
 			keys_manager,
 			logger,
 			broadcaster,
@@ -754,67 +920,31 @@ impl<'a> HarnessNode<'a> {
 	}
 
 	fn set_persistence_style(&mut self, style: ChannelMonitorUpdateStatus) {
+		// Store the style for the next reload. The active persister is intentionally not changed
+		// in place.
 		self.persistence_style = style;
 	}
 
+	fn finish_monitor_update(&self, chan_id: ChannelId, monitor_id: u64, data: Vec<u8>) {
+		self.monitor.channel_monitor_updated(chan_id, monitor_id).unwrap();
+		self.persister.mark_update_completed(chan_id, monitor_id, data);
+	}
+
 	fn complete_all_monitor_updates(&self, chan_id: &ChannelId) {
-		if let Some(state) = self.monitor.latest_monitors.lock().unwrap().get_mut(chan_id) {
-			assert!(
-				state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
-				"updates should be sorted by id"
-			);
-			for (id, data) in state.pending_monitors.drain(..) {
-				self.monitor.chain_monitor.channel_monitor_updated(*chan_id, id).unwrap();
-				if id > state.persisted_monitor_id {
-					state.persisted_monitor_id = id;
-					state.persisted_monitor = data;
-				}
-			}
+		for (monitor_id, data) in self.persister.drain_pending_updates(chan_id) {
+			self.finish_monitor_update(*chan_id, monitor_id, data);
 		}
 	}
 
 	fn complete_all_pending_monitor_updates(&self) {
-		for (channel_id, state) in self.monitor.latest_monitors.lock().unwrap().iter_mut() {
-			for (id, data) in state.pending_monitors.drain(..) {
-				self.monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap();
-				if id >= state.persisted_monitor_id {
-					state.persisted_monitor_id = id;
-					state.persisted_monitor = data;
-				}
-			}
+		for (channel_id, monitor_id, data) in self.persister.drain_all_pending_updates() {
+			self.finish_monitor_update(channel_id, monitor_id, data);
 		}
 	}
 
 	fn complete_monitor_update(&self, chan_id: &ChannelId, selector: MonitorUpdateSelector) {
-		if let Some(state) = self.monitor.latest_monitors.lock().unwrap().get_mut(chan_id) {
-			assert!(
-				state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
-				"updates should be sorted by id"
-			);
-			let update = match selector {
-				MonitorUpdateSelector::First => {
-					if state.pending_monitors.is_empty() {
-						None
-					} else {
-						Some(state.pending_monitors.remove(0))
-					}
-				},
-				MonitorUpdateSelector::Second => {
-					if state.pending_monitors.len() > 1 {
-						Some(state.pending_monitors.remove(1))
-					} else {
-						None
-					}
-				},
-				MonitorUpdateSelector::Last => state.pending_monitors.pop(),
-			};
-			if let Some((id, data)) = update {
-				self.monitor.chain_monitor.channel_monitor_updated(*chan_id, id).unwrap();
-				if id > state.persisted_monitor_id {
-					state.persisted_monitor_id = id;
-					state.persisted_monitor = data;
-				}
-			}
+		if let Some((monitor_id, data)) = self.persister.take_pending_update(chan_id, selector) {
+			self.finish_monitor_update(*chan_id, monitor_id, data);
 		}
 	}
 
@@ -942,50 +1072,39 @@ impl<'a> HarnessNode<'a> {
 	fn reload(
 		&mut self, use_old_mons: u8, out: &Out, router: &'a FuzzRouter, chan_type: ChanType,
 	) {
-		let (logger_for_monitor, logger) = Self::build_loggers(self.node_id, out);
+		let logger = Self::build_logger(self.node_id, out);
+		// Re-registering monitors during reload reflects data that was already selected from
+		// simulated storage, so these startup watch_channel calls should complete immediately.
+		let persister = Self::build_persister(ChannelMonitorUpdateStatus::Completed);
 		let chain_monitor = Self::build_chain_monitor(
 			&self.broadcaster,
 			&self.fee_estimator,
 			&self.keys_manager,
-			logger_for_monitor,
-			ChannelMonitorUpdateStatus::Completed,
+			Arc::clone(&logger),
+			&persister,
 		);
 		let mut monitors = new_hash_map();
 		let mut use_old_mons = use_old_mons;
 		{
-			let mut old_monitors = self.monitor.latest_monitors.lock().unwrap();
+			let mut old_monitors = self.persister.latest_monitors.lock().unwrap();
 			for (channel_id, mut prev_state) in old_monitors.drain() {
-				let (mon_id, serialized_mon) = if use_old_mons % 3 == 0 {
-					// Reload with the oldest `ChannelMonitor` (the one that we already told
-					// `ChannelManager` we finished persisting).
-					(prev_state.persisted_monitor_id, prev_state.persisted_monitor)
-				} else if use_old_mons % 3 == 1 {
-					// Reload with the second-oldest `ChannelMonitor`.
-					let old_mon = (prev_state.persisted_monitor_id, prev_state.persisted_monitor);
-					prev_state.pending_monitors.drain(..).next().unwrap_or(old_mon)
-				} else {
-					// Reload with the newest `ChannelMonitor`.
-					let old_mon = (prev_state.persisted_monitor_id, prev_state.persisted_monitor);
-					prev_state.pending_monitors.pop().unwrap_or(old_mon)
+				let selector = match use_old_mons % 3 {
+					0 => MonitorReloadSelector::Persisted,
+					1 => MonitorReloadSelector::FirstPending,
+					_ => MonitorReloadSelector::LastPending,
 				};
-				// Use a different value of `use_old_mons` if we have another monitor
-				// (only for node B) by shifting `use_old_mons` one in base-3.
+				prev_state.select_monitor_for_reload(selector);
+				// Use a different trit for each monitor so one restart byte can vary the stale
+				// monitor depth across multiple monitors for the node.
 				use_old_mons /= 3;
 				let mon = <(BlockLocator, ChannelMonitor<TestChannelSigner>)>::read(
-					&mut &serialized_mon[..],
+					&mut &prev_state.persisted_monitor[..],
 					(&*self.keys_manager, &*self.keys_manager),
 				)
 				.expect("Failed to read monitor");
 				monitors.insert(channel_id, mon.1);
-				// Update the latest `ChannelMonitor` state to match what we just told LDK.
-				prev_state.persisted_monitor = serialized_mon;
-				prev_state.persisted_monitor_id = mon_id;
-				// Wipe any `ChannelMonitor`s which we never told LDK we finished persisting,
-				// considering them discarded. LDK should replay these for us as they're stored in
-				// the `ChannelManager`.
-				prev_state.pending_monitors.clear();
-				chain_monitor.latest_monitors.lock().unwrap().insert(channel_id, prev_state);
+				persister.latest_monitors.lock().unwrap().insert(channel_id, prev_state);
 			}
 		}
 		let mut monitor_refs = new_hash_map();
@@ -1011,17 +1130,27 @@ impl<'a> HarnessNode<'a> {
 			.expect("Failed to read manager");
 		for (channel_id, mon) in monitors.drain() {
 			assert_eq!(
-				chain_monitor.chain_monitor.watch_channel(channel_id, mon),
+				chain_monitor.watch_channel(channel_id, mon),
 				Ok(ChannelMonitorUpdateStatus::Completed)
 			);
 		}
-		*chain_monitor.persister.update_ret.lock().unwrap() = self.persistence_style;
+		// Future monitor writes should follow the node's configured persistence style; only the
+		// startup watch_channel registration above is forced to Completed.
+		*persister.update_ret.lock().unwrap() = self.persistence_style;
 		self.node = manager.1;
 		self.monitor = chain_monitor;
+		self.persister = persister;
 		self.logger = logger;
 	}
 }
 
+#[derive(Copy, Clone)]
+enum MonitorReloadSelector {
+	Persisted,
+	FirstPending,
+	LastPending,
+}
+
 #[derive(Copy, Clone)]
 enum MonitorUpdateSelector {
 	First,
@@ -1921,7 +2050,7 @@ fn make_channel(
 		}
 	};
 	dest.handle_funding_created(source.get_our_node_id(), &funding_created);
-	// Complete any pending monitor updates for dest after watch_channel.
+	// Complete any pending monitor persistence callbacks for dest after watch_channel.
 	dest.complete_all_pending_monitor_updates();
 
 	let (funding_signed, channel_id) = {
@@ -1942,7 +2071,7 @@ fn make_channel(
 	}
 
 	source.handle_funding_signed(dest.get_our_node_id(), &funding_signed);
-	// Complete any pending monitor updates for source after watch_channel.
+	// Complete any pending monitor persistence callbacks for source after watch_channel.
 	source.complete_all_pending_monitor_updates();
 
 	let events = source.get_and_clear_pending_events();
@@ -2620,7 +2749,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 				"It may take may iterations to settle the state, but it should not take forever"
 			);
 			}
-			// Next, make sure no monitor updates are pending.
+			// Next, make sure no monitor completion callbacks are pending.
 			self.ab_link.complete_all_monitor_updates(&self.nodes);
 			self.bc_link.complete_all_monitor_updates(&self.nodes);
 			// Then, make sure any current forwards make their way to their destination.
@@ -3002,18 +3131,18 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
 			},
 			0xb0 | 0xb1 | 0xb2 => {
-				// Restart node A, picking among the in-flight `ChannelMonitor`s to use based on
-				// the value of `v` we're matching.
+				// Restart node A, picking among persisted and in-flight `ChannelMonitor`
+				// candidates based on the value of `v` we're matching.
 				harness.restart_node(0, v, &router);
 			},
 			0xb3..=0xbb => {
-				// Restart node B, picking among the in-flight `ChannelMonitor`s to use based on
-				// the value of `v` we're matching.
+				// Restart node B, picking among persisted and in-flight `ChannelMonitor`
+				// candidates based on the value of `v` we're matching.
 				harness.restart_node(1, v, &router);
 			},
 			0xbc | 0xbd | 0xbe => {
-				// Restart node C, picking among the in-flight `ChannelMonitor`s to use based on
-				// the value of `v` we're matching.
+				// Restart node C, picking among persisted and in-flight `ChannelMonitor`
+				// candidates based on the value of `v` we're matching.
 				harness.restart_node(2, v, &router);
 			},

From 61456748e6d59bf8eb82dc549d4faf5d955ade28 Mon Sep 17 00:00:00 2001
From: Joost Jager
Date: Wed, 13 May 2026 13:52:16 +0200
Subject: [PATCH 2/4] fuzz: keep settling after progress-only passes

Treat HTLC-forward processing and monitor completion as real progress in
the chanmon harness. This keeps the settle loop running after passes that
only unblock follow-up work instead of stopping before the next event or
message batch.
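
For reference, the termination rule the loop now follows, reduced to a
self-contained sketch (illustrative only, not the harness code; `pass`
stands in for one full settle pass and returns true when that pass
completed monitor callbacks, delivered messages, processed events or
forwards, or reserialized a manager):

    // Run settle passes until two consecutive passes make no progress.
    fn settle(mut pass: impl FnMut() -> bool) -> u32 {
        let mut iterations = 0;
        let mut last_pass_no_updates = false;
        loop {
            iterations += 1;
            if pass() {
                // Any progress rearms the loop, even when the pass only
                // unblocked follow-up work (e.g. a completed monitor
                // callback that lets the ChainMonitor release events).
                last_pass_no_updates = false;
                continue;
            }
            if last_pass_no_updates {
                // Two quiet passes in a row: the state has settled.
                return iterations;
            }
            last_pass_no_updates = true;
        }
    }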
---
 fuzz/src/chanmon_consistency.rs | 39 +++++++++++++++++++++++----------
 1 file changed, 27 insertions(+), 12 deletions(-)

diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs
index 59d29ac5133..784a745e5db 100644
--- a/fuzz/src/chanmon_consistency.rs
+++ b/fuzz/src/chanmon_consistency.rs
@@ -930,10 +930,13 @@ impl<'a> HarnessNode<'a> {
 		self.persister.mark_update_completed(chan_id, monitor_id, data);
 	}
 
-	fn complete_all_monitor_updates(&self, chan_id: &ChannelId) {
-		for (monitor_id, data) in self.persister.drain_pending_updates(chan_id) {
+	fn complete_all_monitor_updates(&self, chan_id: &ChannelId) -> bool {
+		let completed_updates = self.persister.drain_pending_updates(chan_id);
+		let completed_any = !completed_updates.is_empty();
+		for (monitor_id, data) in completed_updates {
 			self.finish_monitor_update(*chan_id, monitor_id, data);
 		}
+		completed_any
 	}
 
 	fn complete_all_pending_monitor_updates(&self) {
@@ -966,9 +969,12 @@ impl<'a> HarnessNode<'a> {
 		}
 	}
 
-	fn refresh_serialized_manager(&mut self) {
+	fn refresh_serialized_manager(&mut self) -> bool {
 		if self.node.get_and_clear_needs_persistence() {
 			self.serialized_manager = self.node.encode();
+			true
+		} else {
+			false
 		}
 	}
 
@@ -1362,11 +1368,13 @@ impl PeerLink {
 		|| (self.node_a == node_b && self.node_b == node_a)
 	}
 
-	fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) {
+	fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) -> bool {
+		let mut completed_updates = false;
 		for id in &self.channel_ids {
-			nodes[self.node_a].complete_all_monitor_updates(id);
-			nodes[self.node_b].complete_all_monitor_updates(id);
+			completed_updates |= nodes[self.node_a].complete_all_monitor_updates(id);
+			completed_updates |= nodes[self.node_b].complete_all_monitor_updates(id);
 		}
+		completed_updates
 	}
 
 	fn complete_monitor_updates_for_node(
@@ -2143,7 +2151,6 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 				ChannelMonitorUpdateStatus::Completed
 			},
 		];
-
 		let wallet_a = TestWalletSource::new(SecretKey::from_slice(&[1; 32]).unwrap());
 		let wallet_b = TestWalletSource::new(SecretKey::from_slice(&[2; 32]).unwrap());
 		let wallet_c = TestWalletSource::new(SecretKey::from_slice(&[3; 32]).unwrap());
@@ -2671,7 +2678,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 		// claim/fail handling per event batch.
 		let mut claim_set = new_hash_map();
 		let mut events = nodes[node_idx].get_and_clear_pending_events();
-		let had_events = !events.is_empty();
+		let mut had_events = !events.is_empty();
 		for event in events.drain(..) {
 			match event {
 				events::Event::PaymentClaimable { payment_hash, .. } => {
@@ -2727,6 +2734,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 		}
 		while nodes[node_idx].needs_pending_htlc_processing() {
 			nodes[node_idx].process_pending_htlc_forwards();
+			had_events = true;
 		}
 		had_events
 	}
@@ -2749,9 +2757,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 				"It may take may iterations to settle the state, but it should not take forever"
 			);
 			}
+			let mut made_progress = self.refresh_serialized_managers();
 			// Next, make sure no monitor completion callbacks are pending.
-			self.ab_link.complete_all_monitor_updates(&self.nodes);
-			self.bc_link.complete_all_monitor_updates(&self.nodes);
+			made_progress |= self.ab_link.complete_all_monitor_updates(&self.nodes);
+			made_progress |= self.bc_link.complete_all_monitor_updates(&self.nodes);
 			// Then, make sure any current forwards make their way to their destination.
 			if self.process_msg_events(0, false, ProcessMessages::AllMessages) {
 				last_pass_no_updates = false;
 				continue;
 			}
@@ -2778,6 +2787,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 				last_pass_no_updates = false;
 				continue;
 			}
+			if made_progress {
+				last_pass_no_updates = false;
+				continue;
+			}
 			if last_pass_no_updates {
 				// In some cases, we may generate a message to send in
 				// `process_msg_events`, but block sending until
@@ -2876,10 +2889,12 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 		self.nodes[2].record_last_htlc_clear_fee();
 	}
 
-	fn refresh_serialized_managers(&mut self) {
+	fn refresh_serialized_managers(&mut self) -> bool {
+		let mut made_progress = false;
 		for node in &mut self.nodes {
-			node.refresh_serialized_manager();
+			made_progress |= node.refresh_serialized_manager();
 		}
+		made_progress
 	}
 }

From 5a47124a5c8f716e2477ba2a3e6c0e449c032a00 Mon Sep 17 00:00:00 2001
From: Joost Jager
Date: Wed, 13 May 2026 13:53:01 +0200
Subject: [PATCH 3/4] fuzz: reload monitors with the configured status

Build the replacement persister with the configured monitor update status
during reload. This keeps non-deferred restart behavior aligned with the
active persistence-style matrix.
---
 fuzz/src/chanmon_consistency.rs | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)

diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs
index 784a745e5db..473beecec33 100644
--- a/fuzz/src/chanmon_consistency.rs
+++ b/fuzz/src/chanmon_consistency.rs
@@ -1079,9 +1079,7 @@ impl<'a> HarnessNode<'a> {
 		&mut self, use_old_mons: u8, out: &Out, router: &'a FuzzRouter, chan_type: ChanType,
 	) {
 		let logger = Self::build_logger(self.node_id, out);
-		// Re-registering monitors during reload reflects data that was already selected from
-		// simulated storage, so these startup watch_channel calls should complete immediately.
-		let persister = Self::build_persister(ChannelMonitorUpdateStatus::Completed);
+		let persister = Self::build_persister(self.persistence_style);
 		let chain_monitor = Self::build_chain_monitor(
 			&self.broadcaster,
 			&self.fee_estimator,
 			&self.keys_manager,
@@ -1135,14 +1133,8 @@ impl<'a> HarnessNode<'a> {
 		let manager = <(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args)
 			.expect("Failed to read manager");
 		for (channel_id, mon) in monitors.drain() {
-			assert_eq!(
-				chain_monitor.watch_channel(channel_id, mon),
-				Ok(ChannelMonitorUpdateStatus::Completed)
-			);
+			assert_eq!(chain_monitor.watch_channel(channel_id, mon), Ok(self.persistence_style));
 		}
-		// Future monitor writes should follow the node's configured persistence style; only the
-		// startup watch_channel registration above is forced to Completed.
-		*persister.update_ret.lock().unwrap() = self.persistence_style;
 		self.node = manager.1;
 		self.monitor = chain_monitor;
 		self.persister = persister;
 		self.logger = logger;

From e0c5818859e62e4b46faf31a75bddd838b8a8681 Mon Sep 17 00:00:00 2001
From: Joost Jager
Date: Wed, 13 May 2026 13:56:10 +0200
Subject: [PATCH 4/4] fuzz: add deferred chanmon checkpoints

Track deferred monitor writes in the harness and checkpoint the
ChannelManager state before flushing them to the persister. This extends
setup, reload, and settle paths to model deferred ChainMonitor
persistence ordering.
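
The ordering a checkpoint enforces, roughly (illustrative sketch
condensed from this patch's checkpoint_manager_persistence;
pending_operation_count and flush are the deferred ChainMonitor
interface the harness assumes):

    // Count the monitor writes queued while the manager state evolved.
    let pending_monitor_writes = self.monitor.pending_operation_count();
    // Snapshot the manager first, so a reload never sees a monitor write
    // newer than the manager state it was checkpointed against.
    self.serialized_manager = self.node.encode();
    if self.deferred {
        // Release exactly those queued writes to the persister.
        self.monitor.flush(pending_monitor_writes, &self.logger);
    } else {
        // Non-deferred nodes must never queue monitor operations.
        assert_eq!(pending_monitor_writes, 0);
    }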
---
 fuzz/src/chanmon_consistency.rs | 88 +++++++++++++++++++++++++--------
 1 file changed, 67 insertions(+), 21 deletions(-)

diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs
index 473beecec33..a93978c409b 100644
--- a/fuzz/src/chanmon_consistency.rs
+++ b/fuzz/src/chanmon_consistency.rs
@@ -817,6 +817,7 @@ struct HarnessNode<'a> {
 	fee_estimator: Arc<FuzzEstimator>,
 	wallet: TestWalletSource,
 	persistence_style: ChannelMonitorUpdateStatus,
+	deferred: bool,
 	serialized_manager: Vec<u8>,
 	height: u32,
 	last_htlc_clear_fee: u32,
@@ -847,7 +848,7 @@ impl<'a> HarnessNode<'a> {
 	fn build_chain_monitor(
 		broadcaster: &Arc<TestBroadcaster>, fee_estimator: &Arc<FuzzEstimator>,
 		keys_manager: &Arc<KeyProvider>, logger: Arc<dyn Logger>,
-		persister: &Arc<HarnessPersister>,
+		persister: &Arc<HarnessPersister>, deferred: bool,
 	) -> Arc<TestChainMonitor> {
 		Arc::new(chainmonitor::ChainMonitor::new(
 			None,
@@ -857,14 +858,14 @@ impl<'a> HarnessNode<'a> {
 			Arc::clone(persister),
 			Arc::clone(keys_manager),
 			keys_manager.get_peer_storage_key(),
-			false,
+			deferred,
 		))
 	}
 
 	fn new(
 		node_id: u8, wallet: TestWalletSource, fee_estimator: Arc<FuzzEstimator>,
 		broadcaster: Arc<TestBroadcaster>, persistence_style: ChannelMonitorUpdateStatus,
-		out: &Out, router: &'a FuzzRouter, chan_type: ChanType,
+		deferred: bool, out: &Out, router: &'a FuzzRouter, chan_type: ChanType,
 	) -> Self {
 		let logger = Self::build_logger(node_id, out);
 		let node_secret = SecretKey::from_slice(&[
@@ -884,6 +885,7 @@ impl<'a> HarnessNode<'a> {
 			&keys_manager,
 			Arc::clone(&logger),
 			&persister,
+			deferred,
 		);
 		let network = Network::Bitcoin;
 		let best_block_timestamp = genesis_block(network).header.time;
@@ -913,6 +915,7 @@ impl<'a> HarnessNode<'a> {
 			fee_estimator,
 			wallet,
 			persistence_style,
+			deferred,
 			serialized_manager: Vec::new(),
 			height: 0,
 			last_htlc_clear_fee: 253,
@@ -969,15 +972,33 @@ impl<'a> HarnessNode<'a> {
 		}
 	}
 
-	fn refresh_serialized_manager(&mut self) -> bool {
+	fn checkpoint_manager_persistence(&mut self) -> bool {
 		if self.node.get_and_clear_needs_persistence() {
+			let pending_monitor_writes = self.monitor.pending_operation_count();
 			self.serialized_manager = self.node.encode();
+			if self.deferred {
+				self.monitor.flush(pending_monitor_writes, &self.logger);
+			} else {
+				assert_eq!(pending_monitor_writes, 0);
+			}
 			true
 		} else {
+			assert_eq!(self.monitor.pending_operation_count(), 0);
 			false
 		}
 	}
 
+	fn force_checkpoint_manager_persistence(&mut self) {
+		let pending_monitor_writes = self.monitor.pending_operation_count();
+		self.serialized_manager = self.node.encode();
+		self.node.get_and_clear_needs_persistence();
+		if self.deferred {
+			self.monitor.flush(pending_monitor_writes, &self.logger);
+		} else {
+			assert_eq!(pending_monitor_writes, 0);
+		}
+	}
+
 	fn bump_fee_estimate(&mut self, chan_type: ChanType) {
 		let mut max_feerate = self.last_htlc_clear_fee;
 		if matches!(chan_type, ChanType::Legacy) {
@@ -1086,6 +1107,7 @@ impl<'a> HarnessNode<'a> {
 			&self.keys_manager,
 			Arc::clone(&logger),
 			&persister,
+			self.deferred,
 		);
 
 		let mut monitors = new_hash_map();
@@ -1132,13 +1154,22 @@ impl<'a> HarnessNode<'a> {
 		let manager = <(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args)
 			.expect("Failed to read manager");
+		let expected_status = if self.deferred {
+			ChannelMonitorUpdateStatus::InProgress
+		} else {
+			self.persistence_style
+		};
 		for (channel_id, mon) in monitors.drain() {
-			assert_eq!(chain_monitor.watch_channel(channel_id, mon), Ok(self.persistence_style));
+			assert_eq!(chain_monitor.watch_channel(channel_id, mon), Ok(expected_status));
 		}
 		self.node = manager.1;
 		self.monitor = chain_monitor;
 		self.persister = persister;
 		self.logger = logger;
+		// In deferred mode, the startup watch_channel registrations above queue monitor operations
+		// even if the reloaded ChannelManager does not need persistence. Always checkpoint here so
+		// those registrations can be flushed against the manager snapshot they belong to.
+		self.force_checkpoint_manager_persistence();
 	}
 }
 
@@ -1937,9 +1968,12 @@ fn connect_peers(source: &ChanMan<'_>, dest: &ChanMan<'_>) {
 }
 
 fn make_channel(
-	source: &HarnessNode<'_>, dest: &HarnessNode<'_>, chan_id: i32, trusted_open: bool,
-	trusted_accept: bool, chain_state: &mut ChainState,
+	nodes: &mut [HarnessNode<'_>; 3], source_idx: usize, dest_idx: usize, chan_id: i32,
+	trusted_open: bool, trusted_accept: bool, chain_state: &mut ChainState,
 ) {
+	assert!(source_idx < dest_idx);
+	let (left, right) = nodes.split_at_mut(dest_idx);
+	let (source, dest) = (&mut left[source_idx], &mut right[0]);
 	if trusted_open {
 		source
 			.create_channel_to_trusted_peer_0reserve(
@@ -2050,7 +2084,8 @@ fn make_channel(
 		}
 	};
 	dest.handle_funding_created(source.get_our_node_id(), &funding_created);
-	// Complete any pending monitor persistence callbacks for dest after watch_channel.
+	dest.checkpoint_manager_persistence();
+	// Complete any monitor persistence callbacks made available for dest after watch_channel.
 	dest.complete_all_pending_monitor_updates();
 
 	let (funding_signed, channel_id) = {
@@ -2071,7 +2106,8 @@ fn make_channel(
 	}
 
 	source.handle_funding_signed(dest.get_our_node_id(), &funding_signed);
-	// Complete any pending monitor persistence callbacks for source after watch_channel.
+	source.checkpoint_manager_persistence();
+	// Complete any monitor persistence callbacks made available for source after watch_channel.
 	source.complete_all_pending_monitor_updates();
 
 	let events = source.get_and_clear_pending_events();
@@ -2143,6 +2179,12 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 				ChannelMonitorUpdateStatus::Completed
 			},
 		];
+		let deferred = [
+			config_byte & 0b0010_0000 != 0,
+			config_byte & 0b0100_0000 != 0,
+			config_byte & 0b1000_0000 != 0,
+		];
+
 		let wallet_a = TestWalletSource::new(SecretKey::from_slice(&[1; 32]).unwrap());
 		let wallet_b = TestWalletSource::new(SecretKey::from_slice(&[2; 32]).unwrap());
 		let wallet_c = TestWalletSource::new(SecretKey::from_slice(&[3; 32]).unwrap());
@@ -2179,6 +2221,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 			Arc::clone(&fee_est_a),
 			Arc::clone(&broadcast_a),
 			persistence_styles[0],
+			deferred[0],
 			&out,
 			router,
 			chan_type,
@@ -2189,6 +2232,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 			Arc::clone(&fee_est_b),
 			Arc::clone(&broadcast_b),
 			persistence_styles[1],
+			deferred[1],
 			&out,
 			router,
 			chan_type,
@@ -2199,6 +2243,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 			Arc::clone(&fee_est_c),
 			Arc::clone(&broadcast_c),
 			persistence_styles[2],
+			deferred[2],
 			&out,
 			router,
 			chan_type,
@@ -2216,14 +2261,14 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 		// channel gets its own txid and funding outpoint.
 		// A-B: channel 2 A and B have 0-reserve (trusted open + trusted accept),
 		//      channel 3 A has 0-reserve (trusted accept).
-		make_channel(&nodes[0], &nodes[1], 1, false, false, &mut chain_state);
-		make_channel(&nodes[0], &nodes[1], 2, true, true, &mut chain_state);
-		make_channel(&nodes[0], &nodes[1], 3, false, true, &mut chain_state);
+		make_channel(&mut nodes, 0, 1, 1, false, false, &mut chain_state);
+		make_channel(&mut nodes, 0, 1, 2, true, true, &mut chain_state);
+		make_channel(&mut nodes, 0, 1, 3, false, true, &mut chain_state);
 		// B-C: channel 4 B has 0-reserve (via trusted accept),
 		//      channel 5 C has 0-reserve (via trusted open).
-		make_channel(&nodes[1], &nodes[2], 4, false, true, &mut chain_state);
-		make_channel(&nodes[1], &nodes[2], 5, true, false, &mut chain_state);
-		make_channel(&nodes[1], &nodes[2], 6, false, false, &mut chain_state);
+		make_channel(&mut nodes, 1, 2, 4, false, true, &mut chain_state);
+		make_channel(&mut nodes, 1, 2, 5, true, false, &mut chain_state);
+		make_channel(&mut nodes, 1, 2, 6, false, false, &mut chain_state);
 
 		// Wipe the transactions-broadcasted set to make sure we don't broadcast
 		// any transactions during normal operation after setup.
@@ -2250,7 +2295,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 		};
 
 		for node in &mut nodes {
-			node.serialized_manager = node.encode();
+			node.force_checkpoint_manager_persistence();
 		}
 
 		Self {
@@ -2749,7 +2794,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 				"It may take may iterations to settle the state, but it should not take forever"
 			);
 			}
-			let mut made_progress = self.refresh_serialized_managers();
+			let mut made_progress = self.checkpoint_manager_persistences();
 			// Next, make sure no monitor completion callbacks are pending.
 			made_progress |= self.ab_link.complete_all_monitor_updates(&self.nodes);
 			made_progress |= self.bc_link.complete_all_monitor_updates(&self.nodes);
@@ -2881,10 +2926,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 		self.nodes[2].record_last_htlc_clear_fee();
 	}
 
-	fn refresh_serialized_managers(&mut self) -> bool {
+	fn checkpoint_manager_persistences(&mut self) -> bool {
 		let mut made_progress = false;
 		for node in &mut self.nodes {
-			made_progress |= node.refresh_serialized_manager();
+			made_progress |= node.checkpoint_manager_persistence();
 		}
 		made_progress
 	}
 }
 
@@ -2893,9 +2938,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
 
 #[inline]
 pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
 	let router = FuzzRouter {};
-	// Read initial monitor styles and channel type from fuzz input byte 0:
+	// Read initial monitor styles, channel type, and deferred write mode from fuzz input byte 0:
 	// bits 0-2: monitor styles (1 bit per node)
 	// bits 3-4: channel type (0=Legacy, 1=KeyedAnchors, 2=ZeroFeeCommitments)
+	// bits 5-7: deferred monitor write mode (1 bit per node)
 	let config_byte = if !data.is_empty() { data[0] } else { 0 };
 	let mut harness = Harness::new(config_byte, out, &router);
 	let mut read_pos = 1; // First byte was consumed for initial config.
@@ -3307,7 +3353,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
 			_ => break 'fuzz_loop,
 		}
 
-		harness.refresh_serialized_managers();
+		harness.checkpoint_manager_persistences();
 	}
 	harness.finish();
 }