Skip to content

Commit 7e2aa23

Browse files
authored
feat: rebroadcast unconfirmed self-sent transactions (#627)
* feat: rebroadcast unconfirmed self-sent transactions Each self-sent transaction in `recent_sends` tracks when it was last broadcast. Transactions whose last broadcast was more than 10 minutes ago are rebroadcast to all peers on each tick. Add `BroadcastMessage` variant to `NetworkRequest` so the mempool manager can request a broadcast via the request queue. Clean up `recent_sends` when a transaction is confirmed or IS-locked, since there is no need to rebroadcast finalized transactions. * test(dash-spv): fix Windows `Instant` underflow in rebroadcast test Project `now` forward via a private `rebroadcast_if_due_at` helper instead of subtracting `REBROADCAST_INTERVAL` from `Instant::now()`. On Windows, `Instant` is backed by `QueryPerformanceCounter` measured from boot, so on a freshly provisioned runner with uptime below the interval the subtraction underflows and panics. * refactor: broadcast inline in `rebroadcast_if_due_at` instead of collecting into a `Vec` Addresses ZocoLini review comment on PR #627 #627 (comment)
1 parent 747d2fa commit 7e2aa23

4 files changed

Lines changed: 124 additions & 0 deletions

File tree

dash-spv/src/network/manager.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,21 @@ impl PeerNetworkManager {
822822
}
823823
});
824824
}
825+
Some(NetworkRequest::BroadcastMessage(msg)) => {
826+
tracing::debug!("Request processor: broadcasting {}", msg.cmd());
827+
let this = this.clone();
828+
tokio::spawn(async move {
829+
let results = this.broadcast(msg).await;
830+
let failures = results.iter().filter(|r| r.is_err()).count();
831+
if failures > 0 {
832+
tracing::warn!(
833+
"Request processor: broadcast had {} failures out of {} peers",
834+
failures,
835+
results.len()
836+
);
837+
}
838+
});
839+
}
825840
None => {
826841
tracing::info!("Request processor: channel closed");
827842
break;

dash-spv/src/network/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ pub enum NetworkRequest {
4747
SendMessage(NetworkMessage),
4848
/// Send a message to a specific peer.
4949
SendMessageToPeer(NetworkMessage, SocketAddr),
50+
/// Broadcast a message to all connected peers.
51+
BroadcastMessage(NetworkMessage),
5052
}
5153

5254
/// Handle for managers to queue outgoing network requests.
@@ -81,6 +83,13 @@ impl RequestSender {
8183
.map_err(|e| NetworkError::ProtocolError(e.to_string()))
8284
}
8385

86+
/// Queue a message to be broadcast to all connected peers.
87+
pub(crate) fn broadcast(&self, msg: NetworkMessage) -> NetworkResult<()> {
88+
self.tx
89+
.send(NetworkRequest::BroadcastMessage(msg))
90+
.map_err(|e| NetworkError::ProtocolError(e.to_string()))
91+
}
92+
8493
/// Request inventory from a specific peer.
8594
pub fn request_inventory(
8695
&self,

dash-spv/src/sync/mempool/manager.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::sync::Arc;
1111
use std::time::{Duration, Instant};
1212

1313
use dashcore::ephemerealdata::instant_lock::InstantLock;
14+
use dashcore::network::message::NetworkMessage;
1415
use dashcore::network::message_blockdata::Inventory;
1516
use dashcore::{Amount, Transaction, Txid};
1617
use rand::seq::IteratorRandom;
@@ -42,6 +43,9 @@ const MAX_PENDING_IS_LOCKS: usize = 1000;
4243
/// Covers the window where multiple peers respond to the initial `mempool` request.
4344
const SEEN_TXID_EXPIRY: Duration = Duration::from_secs(180);
4445

46+
/// Per-transaction interval between rebroadcast attempts (10 minutes).
47+
const REBROADCAST_INTERVAL: Duration = Duration::from_secs(600);
48+
4549
/// Mempool manager that monitors unconfirmed transactions from the P2P network.
4650
///
4751
/// Tracks connected peers via a unified map where:
@@ -436,6 +440,36 @@ impl<W: WalletInterface> MempoolManager<W> {
436440
}
437441
}
438442

443+
/// Rebroadcast unconfirmed self-sent transactions to all peers.
444+
///
445+
/// Each transaction in `recent_sends` tracks when it was last broadcast.
446+
/// Transactions whose last broadcast was more than `REBROADCAST_INTERVAL`
447+
/// ago are rebroadcast and their timestamp is reset.
448+
pub(super) async fn rebroadcast_if_due(&mut self, requests: &RequestSender) {
449+
self.rebroadcast_if_due_at(requests, Instant::now()).await
450+
}
451+
452+
/// `now`-injected variant of [`Self::rebroadcast_if_due`]. Tests project `now`
453+
/// forward instead of subtracting from `Instant::now()`, which underflows on
454+
/// Windows when the QPC-based monotonic clock has a small value at boot.
455+
async fn rebroadcast_if_due_at(&mut self, requests: &RequestSender, now: Instant) {
456+
let mut count: usize = 0;
457+
for (txid, last_broadcast) in &mut self.recent_sends {
458+
if now.saturating_duration_since(*last_broadcast) < REBROADCAST_INTERVAL {
459+
continue;
460+
}
461+
if let Some(unconfirmed) = self.transactions.get(txid) {
462+
let _ = requests.broadcast(NetworkMessage::Tx(unconfirmed.transaction.clone()));
463+
*last_broadcast = now;
464+
count += 1;
465+
}
466+
}
467+
468+
if count > 0 {
469+
tracing::info!("Rebroadcast {} unconfirmed transaction(s) to all peers", count);
470+
}
471+
}
472+
439473
fn is_queued(&self, txid: &Txid) -> bool {
440474
self.peers.values().filter_map(|v| v.as_ref()).any(|q| q.contains(txid))
441475
}
@@ -1603,6 +1637,69 @@ mod tests {
16031637
);
16041638
}
16051639

1640+
#[tokio::test]
1641+
async fn test_rebroadcast_sends_old_recent_sends() {
1642+
let (mut manager, requests, mut rx) = create_test_manager();
1643+
1644+
let tx = Transaction {
1645+
version: 10,
1646+
lock_time: 0,
1647+
input: vec![],
1648+
output: vec![],
1649+
special_transaction_payload: None,
1650+
};
1651+
let txid = tx.txid();
1652+
1653+
let t0 = Instant::now();
1654+
let later = t0 + REBROADCAST_INTERVAL + Duration::from_secs(1);
1655+
1656+
manager.transactions.insert(
1657+
txid,
1658+
UnconfirmedTransaction::new(tx, Amount::from_sat(0), false, true, Vec::new(), -100_000),
1659+
);
1660+
manager.recent_sends.insert(txid, t0);
1661+
1662+
manager.rebroadcast_if_due_at(&requests, later).await;
1663+
1664+
// Should have sent a BroadcastMessage for the transaction
1665+
let msg = rx.try_recv().expect("expected a rebroadcast message");
1666+
assert!(
1667+
matches!(msg, NetworkRequest::BroadcastMessage(NetworkMessage::Tx(_))),
1668+
"expected BroadcastMessage(Tx), got {:?}",
1669+
msg
1670+
);
1671+
1672+
// Timestamp should be reset to `later`, so a second call at the same instant
1673+
// must not rebroadcast.
1674+
manager.rebroadcast_if_due_at(&requests, later).await;
1675+
assert!(rx.try_recv().is_err(), "should not rebroadcast immediately after reset");
1676+
}
1677+
1678+
#[tokio::test]
1679+
async fn test_rebroadcast_skips_recent_transactions() {
1680+
let (mut manager, requests, mut rx) = create_test_manager();
1681+
1682+
let tx = Transaction {
1683+
version: 11,
1684+
lock_time: 0,
1685+
input: vec![],
1686+
output: vec![],
1687+
special_transaction_payload: None,
1688+
};
1689+
let txid = tx.txid();
1690+
1691+
// Add a transaction that was just sent (within the rebroadcast interval)
1692+
manager.transactions.insert(
1693+
txid,
1694+
UnconfirmedTransaction::new(tx, Amount::from_sat(0), false, true, Vec::new(), -50_000),
1695+
);
1696+
manager.recent_sends.insert(txid, Instant::now());
1697+
1698+
manager.rebroadcast_if_due(&requests).await;
1699+
1700+
assert!(rx.try_recv().is_err(), "recently sent transactions should not be rebroadcast");
1701+
}
1702+
16061703
#[test]
16071704
fn test_peer_disconnect_keeps_other_peers_intact() {
16081705
let (mut manager, _requests, _rx) = create_test_manager();

dash-spv/src/sync/mempool/sync_manager.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ impl<W: WalletInterface + 'static> SyncManager for MempoolManager<W> {
117117
// Send queued getdata requests now that slots may have freed up
118118
self.send_queued(requests).await?;
119119

120+
// Rebroadcast unconfirmed self-sent transactions on a randomized interval
121+
self.rebroadcast_if_due(requests).await;
122+
120123
// Rebuild bloom filter if the wallet's monitored set has changed.
121124
//
122125
// We poll the revision counter rather than using push-based wallet events

0 commit comments

Comments
 (0)