From 0a473290440dddbf36e8d1ac7dc183e38c994d81 Mon Sep 17 00:00:00 2001 From: Ovi Trif Date: Sat, 16 May 2026 00:19:07 +0200 Subject: [PATCH] feat(scoring): expose merged channel liquidities --- lightning/src/routing/scoring.rs | 210 ++++++++++++++++++++++++++++++- 1 file changed, 209 insertions(+), 1 deletion(-) diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index cb377632c29..91e095af6c7 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -492,6 +492,17 @@ pub struct ProbabilisticScorer>, L: Logger> { #[derive(Clone)] pub struct ChannelLiquidities(HashMap); +/// The action to take when two [`ChannelLiquidities`] contain the same short channel id. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ChannelLiquidityMergeAction { + /// Keep the existing entry from the left-hand [`ChannelLiquidities`]. + KeepExisting, + /// Replace the existing entry with the entry from the right-hand [`ChannelLiquidities`]. + ReplaceWithOther, + /// Combine both entries using LDK's per-channel merge logic. + Combine, +} + impl ChannelLiquidities { fn new() -> Self { Self(new_hash_map()) @@ -550,6 +561,75 @@ impl ChannelLiquidities { self.0.get_mut(short_channel_id) } + /// Merge another set of channel liquidities into this one. + /// + /// Both sets are first decayed to `duration_since_epoch` using the given `decay_params`, + /// matching the normalization performed by [`CombinedScorer::merge`]. Entries present in + /// both sets are combined using LDK's per-channel merge logic; entries present in only one + /// set are preserved. Use [`Self::merge_with`] if duplicate entries should be resolved with a + /// custom policy instead. + /// + /// This is primarily useful for offline tooling that reads multiple serialized scorer files, + /// merges them, and writes a new serialized [`ChannelLiquidities`] file. + pub fn merge( + &mut self, other: Self, duration_since_epoch: Duration, + decay_params: ProbabilisticScoringDecayParameters, + ) { + self.merge_with(other, duration_since_epoch, decay_params, |_existing, _other| { + ChannelLiquidityMergeAction::Combine + }); + } + + /// Merge another set of channel liquidities into this one, resolving duplicate entries with + /// `merge_action`. + /// + /// Both sets are first decayed to `duration_since_epoch` using the given `decay_params`, + /// matching the normalization performed by [`CombinedScorer::merge`]. Entries present in only + /// one set are preserved. For duplicate short channel ids, `merge_action` is called with + /// diagnostic views of the existing and incoming entries and decides whether to keep the + /// existing value, replace it with the incoming value, or combine both entries using LDK's + /// per-channel merge logic. + /// + /// This lets offline tooling apply deterministic source-aware policies such as preferring the + /// entry with richer historical data, preferring the newer datapoint, or preserving a trusted + /// source for known-good channels while still writing a regular serialized [`ChannelLiquidities`] + /// file. + pub fn merge_with( + &mut self, mut other: Self, duration_since_epoch: Duration, + decay_params: ProbabilisticScoringDecayParameters, mut merge_action: F, + ) where + F: FnMut( + &ChannelLiquidityDiagnostic, + &ChannelLiquidityDiagnostic, + ) -> ChannelLiquidityMergeAction, + { + self.time_passed(duration_since_epoch, decay_params); + other.time_passed(duration_since_epoch, decay_params); + + for (scid, other_liquidity) in other.0 { + match self.0.entry(scid) { + Entry::Occupied(mut entry) => { + let existing_diagnostic = + ChannelLiquidityDiagnostic::from_internal(scid, entry.get()); + let other_diagnostic = + ChannelLiquidityDiagnostic::from_internal(scid, &other_liquidity); + match merge_action(&existing_diagnostic, &other_diagnostic) { + ChannelLiquidityMergeAction::KeepExisting => {}, + ChannelLiquidityMergeAction::ReplaceWithOther => { + _ = entry.insert(other_liquidity); + }, + ChannelLiquidityMergeAction::Combine => { + entry.get_mut().merge(&other_liquidity); + }, + } + }, + Entry::Vacant(entry) => { + entry.insert(other_liquidity); + }, + } + } + } + /// Produces a read-only [`ChannelLiquidityDiagnostic`] view of every entry, sorted by /// `short_channel_id` for deterministic output. /// @@ -1961,6 +2041,16 @@ impl> + Clone, L: Logger + Clone> CombinedScor pub fn set_scores(&mut self, external_scores: ChannelLiquidities) { self.scorer.set_scores(external_scores); } + + /// Returns the current combined scoring state. + /// + /// Unlike this type's [`Writeable`] implementation, which intentionally persists only the + /// local scorer, this exposes the in-memory scorer that includes any external scores merged + /// through [`Self::merge`] or installed through [`Self::set_scores`]. This lets offline tools + /// serialize the combined view explicitly via [`ChannelLiquidities::write`]. + pub fn scores(&self) -> &ChannelLiquidities { + self.scorer.scores() + } } impl>, L: Logger> ScoreLookUp for CombinedScorer { @@ -2711,7 +2801,8 @@ mod tests { BlindedTail, CandidateRouteHop, Path, PublicHopCandidate, RouteHop, }; use crate::routing::scoring::{ - ChannelLiquidities, ChannelUsage, CombinedScorer, ScoreLookUp, ScoreUpdate, + ChannelLiquidities, ChannelLiquidityMergeAction, ChannelUsage, CombinedScorer, ScoreLookUp, + ScoreUpdate, }; use crate::util::ser::{Readable, ReadableArgs, Writeable}; use crate::util::test_utils::{self, TestLogger}; @@ -4277,6 +4368,108 @@ mod tests { assert_eq!(scores.iter().count(), 2); } + #[test] + #[rustfmt::skip] + fn channel_liquidities_merge_preserves_unique_entries_and_averages_overlaps() { + let last_updated = Duration::ZERO; + let mut first = ChannelLiquidities::new(); + first.insert(42, ChannelLiquidity { + min_liquidity_offset_msat: 100, + max_liquidity_offset_msat: 300, + liquidity_history: HistoricalLiquidityTracker::new(), + last_updated, + offset_history_last_updated: last_updated, + last_datapoint_time: last_updated, + }); + first.insert(43, ChannelLiquidity { + min_liquidity_offset_msat: 10, + max_liquidity_offset_msat: 30, + liquidity_history: HistoricalLiquidityTracker::new(), + last_updated, + offset_history_last_updated: last_updated, + last_datapoint_time: last_updated, + }); + + let mut second = ChannelLiquidities::new(); + second.insert(42, ChannelLiquidity { + min_liquidity_offset_msat: 300, + max_liquidity_offset_msat: 700, + liquidity_history: HistoricalLiquidityTracker::new(), + last_updated, + offset_history_last_updated: last_updated, + last_datapoint_time: last_updated, + }); + second.insert(44, ChannelLiquidity { + min_liquidity_offset_msat: 20, + max_liquidity_offset_msat: 40, + liquidity_history: HistoricalLiquidityTracker::new(), + last_updated, + offset_history_last_updated: last_updated, + last_datapoint_time: last_updated, + }); + + first.merge(second, Duration::ZERO, ProbabilisticScoringDecayParameters::zero_penalty()); + + let diagnostics = first.diagnostics(); + assert_eq!(diagnostics.len(), 3); + let merged = diagnostics.iter().find(|diag| diag.scid == 42).unwrap(); + assert_eq!(merged.min_liquidity_offset_msat, 200); + assert_eq!(merged.max_liquidity_offset_msat, 500); + assert!(diagnostics.iter().any(|diag| diag.scid == 43)); + assert!(diagnostics.iter().any(|diag| diag.scid == 44)); + } + + #[test] + #[rustfmt::skip] + fn channel_liquidities_merge_with_can_choose_overlap_winner() { + let last_updated = Duration::ZERO; + let mut first = ChannelLiquidities::new(); + first.insert(42, ChannelLiquidity { + min_liquidity_offset_msat: 100, + max_liquidity_offset_msat: 300, + liquidity_history: HistoricalLiquidityTracker::new(), + last_updated, + offset_history_last_updated: last_updated, + last_datapoint_time: last_updated, + }); + + let mut second = ChannelLiquidities::new(); + second.insert(42, ChannelLiquidity { + min_liquidity_offset_msat: 300, + max_liquidity_offset_msat: 700, + liquidity_history: HistoricalLiquidityTracker::new(), + last_updated, + offset_history_last_updated: last_updated, + last_datapoint_time: last_updated, + }); + second.insert(44, ChannelLiquidity { + min_liquidity_offset_msat: 20, + max_liquidity_offset_msat: 40, + liquidity_history: HistoricalLiquidityTracker::new(), + last_updated, + offset_history_last_updated: last_updated, + last_datapoint_time: last_updated, + }); + + first.merge_with( + second, + Duration::ZERO, + ProbabilisticScoringDecayParameters::zero_penalty(), + |existing, other| { + assert_eq!(existing.scid, 42); + assert_eq!(other.scid, 42); + ChannelLiquidityMergeAction::ReplaceWithOther + }, + ); + + let diagnostics = first.diagnostics(); + assert_eq!(diagnostics.len(), 2); + let merged = diagnostics.iter().find(|diag| diag.scid == 42).unwrap(); + assert_eq!(merged.min_liquidity_offset_msat, 300); + assert_eq!(merged.max_liquidity_offset_msat, 700); + assert!(diagnostics.iter().any(|diag| diag.scid == 44)); + } + #[test] fn combined_scorer() { let logger = TestLogger::new(); @@ -4342,6 +4535,21 @@ mod tests { combined_scorer.scorer.estimated_channel_liquidity_range(42, &target_node_id()); assert_eq!(liquidity_range.unwrap(), (0, 300)); + let mut serialized_scores = Vec::new(); + combined_scorer.scores().write(&mut serialized_scores).unwrap(); + let exported_scores: ChannelLiquidities = + Readable::read(&mut io::Cursor::new(&serialized_scores)).unwrap(); + let mut scorer_from_export = ProbabilisticScorer::new( + ProbabilisticScoringDecayParameters::default(), + &network_graph, + &logger, + ); + scorer_from_export.set_scores(exported_scores); + assert_eq!( + scorer_from_export.estimated_channel_liquidity_range(42, &target_node_id()).unwrap(), + (0, 300) + ); + // Now set (overwrite) the scorer state with the external data which should lead to an even greater liquidity // range. Just the success from the external source is now considered. combined_scorer.set_scores(external_scores);