Skip to content

Commit 88e8a9a

Browse files
authored
fix: process broadcast transactions via dispatch_local (#626)
When the SPV client broadcasts a transaction, connected peers never relay it back because Dash Core marks the sender as already having the transaction. This meant broadcast transactions were invisible in the client's mempool state until a restart triggered a full mempool re-fetch. Add `dispatch_local()` to `NetworkManager` to inject messages into the local message pipeline. `broadcast_transaction()` now checks sync state, broadcasts to peers, then injects the tx locally so `MempoolManager::handle_tx()` processes it through the existing path. The sentinel peer address (`0.0.0.0:0`) signals a self-originated transaction, causing `handle_tx` to call `record_send` automatically. A dedup guard skips transactions already tracked in `mempool_state`. Also adds `SyncError::NotSynced` for rejecting broadcasts before sync completes, and a `create_signed_transaction` test helper for building transactions without broadcasting via `dashd`.
1 parent 1e75084 commit 88e8a9a

9 files changed

Lines changed: 270 additions & 22 deletions

File tree

dash-spv/src/client/transactions.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Transaction-related client APIs (e.g., broadcasting)
22
3-
use crate::error::{Result, SpvError};
3+
use crate::error::{NetworkError, Result, SpvError, SyncError};
44
use crate::network::NetworkManager;
55
use crate::storage::StorageManager;
66
use dashcore::network::message::NetworkMessage;
@@ -12,14 +12,26 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager, H: EventHandler>
1212
DashSpvClient<W, N, S, H>
1313
{
1414
/// Broadcast a transaction to all connected peers.
15+
///
16+
/// The transaction is also injected into the local message pipeline so that
17+
/// the mempool manager processes it immediately.
1518
pub async fn broadcast_transaction(&self, tx: &dashcore::Transaction) -> Result<()> {
19+
if !self.sync_progress().await.is_synced() {
20+
return Err(SpvError::Sync(SyncError::NotSynced));
21+
}
22+
1623
let network_guard = self.network.lock().await;
1724

1825
if network_guard.peer_count() == 0 {
19-
return Err(SpvError::Network(crate::error::NetworkError::NotConnected));
26+
return Err(SpvError::Network(NetworkError::NotConnected));
2027
}
2128

2229
let message = NetworkMessage::Tx(tx.clone());
23-
Ok(network_guard.broadcast(message).await?)
30+
network_guard.broadcast(message).await?;
31+
32+
// Inject locally so the mempool manager picks it up through handle_tx.
33+
network_guard.dispatch_local(NetworkMessage::Tx(tx.clone())).await;
34+
35+
Ok(())
2436
}
2537
}

dash-spv/src/error.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,13 +232,19 @@ pub enum SyncError {
232232
/// Masternode sync failed (QRInfo or MnListDiff processing error)
233233
#[error("Masternode sync failed: {0}")]
234234
MasternodeSyncFailed(String),
235+
236+
/// Operation requires the client to be fully synced
237+
#[error("Client is not synced")]
238+
NotSynced,
235239
}
236240

237241
impl SyncError {
238242
/// Returns a static string representing the error category based on the variant
239243
pub fn category(&self) -> &'static str {
240244
match self {
241-
SyncError::SyncInProgress(_) | SyncError::InvalidState(_) => "state",
245+
SyncError::SyncInProgress(_) | SyncError::InvalidState(_) | SyncError::NotSynced => {
246+
"state"
247+
}
242248
SyncError::Timeout(_) => "timeout",
243249
SyncError::Validation(_) => "validation",
244250
SyncError::MissingDependency(_) => "dependency",
@@ -339,6 +345,7 @@ mod tests {
339345
assert_eq!(SyncError::SyncInProgress(ManagerIdentifier::BlockHeader).category(), "state");
340346
assert_eq!(SyncError::InvalidState("test".to_string()).category(), "state");
341347
assert_eq!(SyncError::MissingDependency("test".to_string()).category(), "dependency");
348+
assert_eq!(SyncError::NotSynced.category(), "state");
342349

343350
// Test deprecated SyncFailed always returns "unknown"
344351
#[allow(deprecated)]

dash-spv/src/network/manager.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Peer network manager for SPV client
22
33
use std::collections::{HashMap, HashSet};
4-
use std::net::SocketAddr;
4+
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
55
use std::path::PathBuf;
66
use std::sync::atomic::{AtomicUsize, Ordering};
77
use std::sync::Arc;
@@ -1329,6 +1329,12 @@ impl NetworkManager for PeerNetworkManager {
13291329
Ok(())
13301330
}
13311331

1332+
async fn dispatch_local(&self, message: NetworkMessage) {
1333+
let local_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
1334+
let msg = Message::new(local_addr, message);
1335+
self.message_dispatcher.lock().await.dispatch(&msg);
1336+
}
1337+
13321338
async fn disconnect_peer(&self, addr: &SocketAddr, reason: &str) -> NetworkResult<()> {
13331339
PeerNetworkManager::disconnect_peer(self, addr, reason)
13341340
.await

dash-spv/src/network/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,12 @@ pub trait NetworkManager: Send + Sync + 'static {
223223
/// Broadcast a message to all connected peers.
224224
async fn broadcast(&self, _message: NetworkMessage) -> NetworkResult<()>;
225225

226+
/// Inject a message into the local message dispatcher as if received from a peer.
227+
///
228+
/// Used for locally-originated messages (e.g., self-broadcast transactions) that
229+
/// should be processed through the same pipeline as peer-received messages.
230+
async fn dispatch_local(&self, message: NetworkMessage);
231+
226232
/// Disconnect a specific peer by address.
227233
async fn disconnect_peer(&self, _addr: &SocketAddr, _reason: &str) -> NetworkResult<()>;
228234

dash-spv/src/sync/mempool/manager.rs

Lines changed: 108 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -296,9 +296,27 @@ impl<W: WalletInterface> MempoolManager<W> {
296296
}
297297

298298
/// Handle a received transaction.
299-
pub(super) async fn handle_tx(&mut self, tx: Transaction) -> SyncResult<Vec<SyncEvent>> {
299+
///
300+
/// When `peer` is the local sentinel address (`0.0.0.0:0`), the transaction
301+
/// is treated as self-originated and recorded in `recent_sends`.
302+
pub(super) async fn handle_tx(
303+
&mut self,
304+
tx: Transaction,
305+
peer: SocketAddr,
306+
) -> SyncResult<Vec<SyncEvent>> {
300307
let txid = tx.txid();
301308
self.pending_requests.remove(&txid);
309+
let is_local = peer.ip().is_unspecified();
310+
311+
// Skip if already tracked (e.g., locally broadcast then received from a peer)
312+
if self.mempool_state.read().await.transactions.contains_key(&txid) {
313+
self.seen_txids.insert(txid, Instant::now());
314+
if is_local {
315+
self.mempool_state.write().await.record_send(txid);
316+
}
317+
return Ok(vec![]);
318+
}
319+
302320
self.seen_txids.insert(txid, Instant::now());
303321
self.progress.add_received(1);
304322

@@ -331,6 +349,9 @@ impl<W: WalletInterface> MempoolManager<W> {
331349
{
332350
let mut state = self.mempool_state.write().await;
333351
state.add_transaction(unconfirmed_tx);
352+
if is_local {
353+
state.record_send(txid);
354+
}
334355
self.progress.set_tracked(state.transactions.len() as u32);
335356
}
336357

@@ -345,6 +366,7 @@ impl<W: WalletInterface> MempoolManager<W> {
345366
let mut state = self.mempool_state.write().await;
346367
for txid in txids {
347368
if state.remove_transaction(txid).is_some() {
369+
state.recent_sends.remove(txid);
348370
removed.push(*txid);
349371
}
350372
}
@@ -365,6 +387,7 @@ impl<W: WalletInterface> MempoolManager<W> {
365387
let mut state = self.mempool_state.write().await;
366388
let instant_lock_opt = if let Some(tx) = state.transactions.get_mut(&txid) {
367389
tx.is_instant_send = true;
390+
state.recent_sends.remove(&txid);
368391
tracing::debug!("Marked mempool tx {} as InstantSend-locked", txid);
369392
Some(instant_lock)
370393
} else if self.pending_is_locks.len() < MAX_PENDING_IS_LOCKS {
@@ -712,7 +735,7 @@ mod tests {
712735
};
713736
let txid = tx.txid();
714737

715-
let events = manager.handle_tx(tx).await.unwrap();
738+
let events = manager.handle_tx(tx, test_socket_address(1)).await.unwrap();
716739
// MockWallet returns is_relevant=false by default
717740
assert!(events.is_empty());
718741
assert_eq!(manager.progress.received(), 1);
@@ -831,7 +854,7 @@ mod tests {
831854
};
832855
let txid = tx.txid();
833856

834-
let events = manager.handle_tx(tx).await.unwrap();
857+
let events = manager.handle_tx(tx, test_socket_address(1)).await.unwrap();
835858
assert!(events.is_empty());
836859

837860
// Verify transaction was stored in mempool state
@@ -840,6 +863,72 @@ mod tests {
840863
assert_eq!(manager.progress.received(), 1);
841864
assert_eq!(manager.progress.relevant(), 1);
842865
assert_eq!(manager.progress.tracked(), 1);
866+
drop(state);
867+
868+
// Processing the same transaction again should be a no-op (dedup guard)
869+
let tx2 = Transaction {
870+
version: 1,
871+
lock_time: 0,
872+
input: vec![],
873+
output: vec![],
874+
special_transaction_payload: None,
875+
};
876+
let events = manager.handle_tx(tx2, test_socket_address(1)).await.unwrap();
877+
assert!(events.is_empty());
878+
879+
let state = manager.mempool_state.read().await;
880+
assert_eq!(state.transactions.len(), 1);
881+
// Progress counters should not have incremented
882+
assert_eq!(manager.progress.received(), 1);
883+
assert_eq!(manager.progress.relevant(), 1);
884+
}
885+
886+
#[tokio::test]
887+
async fn test_handle_tx_local_records_send() {
888+
let (mut manager, _requests, _wallet) = create_relevant_manager();
889+
890+
let tx = Transaction {
891+
version: 2,
892+
lock_time: 0,
893+
input: vec![],
894+
output: vec![],
895+
special_transaction_payload: None,
896+
};
897+
let txid = tx.txid();
898+
899+
// Use the unspecified address to simulate a locally broadcast transaction
900+
let local_addr = SocketAddr::from(([0, 0, 0, 0], 0));
901+
manager.handle_tx(tx, local_addr).await.unwrap();
902+
903+
let state = manager.mempool_state.read().await;
904+
assert!(state.transactions.contains_key(&txid));
905+
assert!(
906+
state.is_recent_send(&txid, Duration::from_secs(10)),
907+
"locally dispatched transaction should be recorded as a recent send"
908+
);
909+
}
910+
911+
#[tokio::test]
912+
async fn test_handle_tx_remote_does_not_record_send() {
913+
let (mut manager, _requests, _wallet) = create_relevant_manager();
914+
915+
let tx = Transaction {
916+
version: 3,
917+
lock_time: 0,
918+
input: vec![],
919+
output: vec![],
920+
special_transaction_payload: None,
921+
};
922+
let txid = tx.txid();
923+
924+
manager.handle_tx(tx, test_socket_address(1)).await.unwrap();
925+
926+
let state = manager.mempool_state.read().await;
927+
assert!(state.transactions.contains_key(&txid));
928+
assert!(
929+
!state.is_recent_send(&txid, Duration::from_secs(10)),
930+
"peer-received transaction should not be recorded as a recent send"
931+
);
843932
}
844933

845934
#[tokio::test]
@@ -859,7 +948,7 @@ mod tests {
859948
manager.pending_requests.insert(txid, Instant::now());
860949
assert!(manager.pending_requests.contains_key(&txid));
861950

862-
manager.handle_tx(tx).await.unwrap();
951+
manager.handle_tx(tx, test_socket_address(1)).await.unwrap();
863952
// Pending request should be cleared regardless of relevance
864953
assert!(!manager.pending_requests.contains_key(&txid));
865954

@@ -933,13 +1022,18 @@ mod tests {
9331022
Vec::new(),
9341023
0,
9351024
));
1025+
state.record_send(txid);
9361026
}
9371027

9381028
manager.process_instant_send(dummy_instant_lock(txid)).await;
9391029

940-
// Verify mempool state also reflects IS flag
1030+
// Verify mempool state reflects IS flag and recent_sends is cleaned up
9411031
let state = manager.mempool_state.read().await;
9421032
assert!(state.transactions.get(&txid).unwrap().is_instant_send);
1033+
assert!(
1034+
!state.recent_sends.contains_key(&txid),
1035+
"IS-locked transaction should be removed from recent_sends"
1036+
);
9431037
drop(state);
9441038

9451039
let wallet = manager.wallet.read().await;
@@ -1127,7 +1221,7 @@ mod tests {
11271221
assert!(manager.pending_is_locks.contains_key(&txid));
11281222

11291223
// Transaction arrives
1130-
manager.handle_tx(tx).await.unwrap();
1224+
manager.handle_tx(tx, test_socket_address(1)).await.unwrap();
11311225

11321226
// Pending IS lock consumed
11331227
assert!(manager.pending_is_locks.is_empty());
@@ -1167,7 +1261,7 @@ mod tests {
11671261
assert!(manager.pending_is_locks.contains_key(&txid));
11681262

11691263
// Transaction arrives but wallet says it's not relevant
1170-
manager.handle_tx(tx).await.unwrap();
1264+
manager.handle_tx(tx, test_socket_address(1)).await.unwrap();
11711265

11721266
// Pending IS lock cleaned up (no leak)
11731267
assert!(manager.pending_is_locks.is_empty());
@@ -1349,7 +1443,7 @@ mod tests {
13491443
w.set_mempool_addresses(vec![addr.clone()]);
13501444
}
13511445

1352-
manager.handle_tx(tx).await.unwrap();
1446+
manager.handle_tx(tx, test_socket_address(1)).await.unwrap();
13531447

13541448
let state = manager.mempool_state.read().await;
13551449
let stored = state.transactions.get(&txid).unwrap();
@@ -1378,7 +1472,7 @@ mod tests {
13781472
w.set_mempool_net_amount(-30000);
13791473
}
13801474

1381-
manager.handle_tx(tx).await.unwrap();
1475+
manager.handle_tx(tx, test_socket_address(1)).await.unwrap();
13821476

13831477
let state = manager.mempool_state.read().await;
13841478
let stored = state.transactions.get(&txid).unwrap();
@@ -1491,6 +1585,9 @@ mod tests {
14911585
));
14921586
}
14931587
assert_eq!(state.transactions.len(), 3);
1588+
// Mark two as recent sends
1589+
state.record_send(txids[0]);
1590+
state.record_send(txids[1]);
14941591
}
14951592

14961593
// Remove 2 of the 3 transactions
@@ -1499,6 +1596,8 @@ mod tests {
14991596
let state = manager.mempool_state.read().await;
15001597
assert_eq!(state.transactions.len(), 1);
15011598
assert!(state.transactions.contains_key(&txids[2]));
1599+
assert!(!state.recent_sends.contains_key(&txids[0]));
1600+
assert!(!state.recent_sends.contains_key(&txids[1]));
15021601
drop(state);
15031602

15041603
assert_eq!(manager.progress.removed(), 2);

dash-spv/src/sync/mempool/sync_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl<W: WalletInterface + 'static> SyncManager for MempoolManager<W> {
5050
) -> SyncResult<Vec<SyncEvent>> {
5151
match msg.inner() {
5252
NetworkMessage::Inv(inv) => self.handle_inv(inv, msg.peer_address(), requests).await,
53-
NetworkMessage::Tx(tx) => self.handle_tx(tx.clone()).await,
53+
NetworkMessage::Tx(tx) => self.handle_tx(tx.clone(), msg.peer_address()).await,
5454
_ => Ok(vec![]),
5555
}
5656
}
@@ -816,7 +816,7 @@ mod tests {
816816
output: vec![],
817817
special_transaction_payload: None,
818818
};
819-
manager.handle_tx(tx).await.unwrap();
819+
manager.handle_tx(tx, test_socket_address(1)).await.unwrap();
820820

821821
let has_filter_load = std::iter::from_fn(|| rx.try_recv().ok()).any(|msg| {
822822
matches!(msg, NetworkRequest::SendMessageToPeer(NetworkMessage::FilterLoad(_), _))

0 commit comments

Comments
 (0)