Skip to content

Commit d0cbf19

Browse files
committed
Add CallEdgeTracker trait for distributed deadlock detection
When database A calls a reducer on database B, register the edge A -> B with a CallEdgeTracker. If the edge would create a cycle (B is already waiting for A), return CycleDetected error to abort the call before it deadlocks. - New trait CallEdgeTracker with register_edge, unregister_edge, is_cancelled - NoopCallEdgeTracker for standalone (always succeeds, no cycle detection) - Edge registration in call_reducer_on_db and call_reducer_on_db_2pc - New NodesError::CycleDetected variant - New errno::CYCLE_DETECTED (22) for wasm ABI - Cloud implementation will provide a real tracker that talks to the control DB for cycle detection
1 parent ee07f87 commit d0cbf19

9 files changed

Lines changed: 109 additions & 2 deletions

File tree

crates/bindings-sys/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1520,7 +1520,7 @@ pub fn call_reducer_on_db(
15201520
// on transport failure. Unlike other ABI functions, a non-zero return value here
15211521
// does NOT indicate a generic errno — it's the HTTP status code. Only HTTP_ERROR
15221522
// specifically signals a transport-level failure.
1523-
if status == Errno::HTTP_ERROR.code() {
1523+
if status == Errno::HTTP_ERROR.code() || status == Errno::CYCLE_DETECTED.code() {
15241524
Err(out)
15251525
} else {
15261526
Ok((status, out))
@@ -1551,7 +1551,7 @@ pub fn call_reducer_on_db_2pc(
15511551
&mut out,
15521552
)
15531553
};
1554-
if status == Errno::HTTP_ERROR.code() {
1554+
if status == Errno::HTTP_ERROR.code() || status == Errno::CYCLE_DETECTED.code() {
15551555
Err(out)
15561556
} else {
15571557
Ok((status, out))

crates/core/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,8 @@ pub enum NodesError {
277277
ScheduleError(#[source] ScheduleError),
278278
#[error("HTTP request failed: {0}")]
279279
HttpError(String),
280+
#[error("distributed deadlock detected: call would create a cycle")]
281+
CycleDetected,
280282
}
281283

282284
impl From<DBError> for NodesError {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
//! Distributed deadlock detection via call-edge tracking.
2+
//!
3+
//! When database A calls a reducer on database B, the edge A -> B is registered
4+
//! with the control DB. If inserting the edge creates a cycle in the call graph,
5+
//! the call that created the cycle is cancelled.
6+
//!
7+
//! The [`CallEdgeTracker`] trait provides the interface. Standalone uses a no-op
8+
//! implementation; cloud implements cycle detection via the control DB.
9+
10+
use spacetimedb_lib::Identity;
11+
12+
/// Error returned when registering a call edge would create a cycle.
13+
#[derive(Debug)]
14+
pub struct CycleDetected;
15+
16+
impl std::fmt::Display for CycleDetected {
17+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18+
write!(f, "distributed deadlock detected: call would create a cycle")
19+
}
20+
}
21+
22+
impl std::error::Error for CycleDetected {}
23+
24+
/// Tracks cross-database call edges for distributed deadlock detection.
25+
///
26+
/// Methods are blocking (not async) because they are called from the database
27+
/// thread, which must not enter an async runtime.
28+
pub trait CallEdgeTracker: Send + Sync {
29+
/// Register that `caller` is about to call a reducer on `target`.
30+
///
31+
/// Returns `Err(CycleDetected)` if the edge would create a cycle in the
32+
/// call graph (i.e. `target` is already waiting, directly or transitively,
33+
/// for `caller`).
34+
fn register_edge(&self, caller: Identity, target: Identity) -> Result<(), CycleDetected>;
35+
36+
/// Remove the edge after the call completes (success or failure).
37+
fn unregister_edge(&self, caller: Identity, target: Identity);
38+
39+
/// Check whether a pending call from `caller` to `target` has been
40+
/// asynchronously cancelled (e.g. because a cycle was detected after
41+
/// the edge was registered).
42+
fn is_cancelled(&self, caller: Identity, target: Identity) -> bool;
43+
}
44+
45+
/// No-op implementation for standalone (single-node) deployments.
46+
///
47+
/// Always succeeds, never detects cycles. Distributed deadlocks are not
48+
/// possible when the control DB is not involved.
49+
pub struct NoopCallEdgeTracker;
50+
51+
impl CallEdgeTracker for NoopCallEdgeTracker {
52+
fn register_edge(&self, _caller: Identity, _target: Identity) -> Result<(), CycleDetected> {
53+
Ok(())
54+
}
55+
56+
fn unregister_edge(&self, _caller: Identity, _target: Identity) {}
57+
58+
fn is_cancelled(&self, _caller: Identity, _target: Identity) -> bool {
59+
false
60+
}
61+
}

crates/core/src/host/host_controller.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,7 @@ async fn make_replica_ctx(
729729
call_reducer_auth_token,
730730
prepared_txs: crate::host::prepared_tx::PreparedTransactions::new(),
731731
on_panic: std::sync::Arc::new(std::sync::OnceLock::new()),
732+
call_edge_tracker: std::sync::Arc::new(crate::host::call_edge_tracker::NoopCallEdgeTracker),
732733
})
733734
}
734735

crates/core/src/host/instance_env.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1016,10 +1016,16 @@ impl InstanceEnv {
10161016
// accepts the request without generating a fresh ephemeral identity per call.
10171017
let auth_token = self.replica_ctx.call_reducer_auth_token.clone();
10181018
let caller_identity = self.replica_ctx.database.database_identity;
1019+
let edge_tracker = self.replica_ctx.call_edge_tracker.clone();
10191020

10201021
async move {
10211022
let start = Instant::now();
10221023

1024+
// Register call edge for distributed deadlock detection.
1025+
edge_tracker
1026+
.register_edge(caller_identity, database_identity)
1027+
.map_err(|_| NodesError::CycleDetected)?;
1028+
10231029
let base_url = router
10241030
.resolve_base_url(database_identity)
10251031
.await
@@ -1048,6 +1054,9 @@ impl InstanceEnv {
10481054
}
10491055
.await;
10501056

1057+
// Unregister the edge now that the call has completed.
1058+
edge_tracker.unregister_edge(caller_identity, database_identity);
1059+
10511060
WORKER_METRICS
10521061
.cross_db_reducer_calls_total
10531062
.with_label_values(&caller_identity)
@@ -1082,10 +1091,16 @@ impl InstanceEnv {
10821091
let reducer_name = reducer_name.to_owned();
10831092
let auth_token = self.replica_ctx.call_reducer_auth_token.clone();
10841093
let caller_identity = self.replica_ctx.database.database_identity;
1094+
let edge_tracker = self.replica_ctx.call_edge_tracker.clone();
10851095

10861096
async move {
10871097
let start = Instant::now();
10881098

1099+
// Register call edge for distributed deadlock detection.
1100+
edge_tracker
1101+
.register_edge(caller_identity, database_identity)
1102+
.map_err(|_| NodesError::CycleDetected)?;
1103+
10891104
let base_url = router
10901105
.resolve_base_url(database_identity)
10911106
.await
@@ -1120,6 +1135,9 @@ impl InstanceEnv {
11201135
}
11211136
.await;
11221137

1138+
// Unregister the edge now that the call has completed.
1139+
edge_tracker.unregister_edge(caller_identity, database_identity);
1140+
11231141
WORKER_METRICS
11241142
.cross_db_reducer_calls_total
11251143
.with_label_values(&caller_identity)
@@ -1511,6 +1529,7 @@ mod test {
15111529
call_reducer_auth_token: None,
15121530
prepared_txs: crate::host::prepared_tx::PreparedTransactions::new(),
15131531
on_panic: std::sync::Arc::new(std::sync::OnceLock::new()),
1532+
call_edge_tracker: std::sync::Arc::new(crate::host::call_edge_tracker::NoopCallEdgeTracker),
15141533
},
15151534
runtime,
15161535
))

crates/core/src/host/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ where
3636
})
3737
}
3838

39+
pub mod call_edge_tracker;
3940
mod disk_storage;
4041
mod host_controller;
4142
mod module_common;

crates/core/src/host/wasmtime/wasm_instance_env.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2016,6 +2016,15 @@ impl WasmInstanceEnv {
20162016
bytes_source.0.write_to(mem, out)?;
20172017
Ok(errno::HTTP_ERROR.get() as u32)
20182018
}
2019+
Err(NodesError::CycleDetected) => {
2020+
let err_msg = "distributed deadlock detected: call would create a cycle";
2021+
let err_bytes = bsatn::to_vec(&err_msg).with_context(|| {
2022+
"Failed to BSATN-serialize cycle detection error"
2023+
})?;
2024+
let bytes_source = WasmInstanceEnv::create_bytes_source(env, err_bytes.into())?;
2025+
bytes_source.0.write_to(mem, out)?;
2026+
Ok(errno::CYCLE_DETECTED.get() as u32)
2027+
}
20192028
Err(e) => Err(WasmError::Db(e)),
20202029
}
20212030
})
@@ -2081,6 +2090,15 @@ impl WasmInstanceEnv {
20812090
bytes_source.0.write_to(mem, out)?;
20822091
Ok(errno::HTTP_ERROR.get() as u32)
20832092
}
2093+
Err(NodesError::CycleDetected) => {
2094+
let err_msg = "distributed deadlock detected: call would create a cycle";
2095+
let err_bytes = bsatn::to_vec(&err_msg).with_context(|| {
2096+
"Failed to BSATN-serialize cycle detection error"
2097+
})?;
2098+
let bytes_source = WasmInstanceEnv::create_bytes_source(env, err_bytes.into())?;
2099+
bytes_source.0.write_to(mem, out)?;
2100+
Ok(errno::CYCLE_DETECTED.get() as u32)
2101+
}
20842102
Err(e) => Err(WasmError::Db(e)),
20852103
}
20862104
})

crates/core/src/replica_context.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use spacetimedb_commitlog::SizeOnDisk;
33
use super::database_logger::DatabaseLogger;
44
use crate::db::relational_db::RelationalDB;
55
use crate::error::DBError;
6+
use crate::host::call_edge_tracker::CallEdgeTracker;
67
use crate::host::prepared_tx::PreparedTransactions;
78
use crate::host::reducer_router::ReducerCallRouter;
89
use crate::messages::control_db::Database;
@@ -72,6 +73,9 @@ pub struct ReplicaContext {
7273
/// async task that can't panic on the WASM executor thread (e.g., 2PC persistence
7374
/// abort in Round 2). Set once by `launch_module`; empty in tests.
7475
pub on_panic: Arc<OnceLock<Box<dyn Fn() + Send + Sync + 'static>>>,
76+
/// Distributed deadlock detection via call-edge tracking.
77+
/// Standalone: no-op. Cloud: registers edges with control DB for cycle detection.
78+
pub call_edge_tracker: Arc<dyn CallEdgeTracker>,
7579
}
7680

7781
impl ReplicaContext {

crates/primitives/src/errno.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ macro_rules! errnos {
3535
"ABI call can only be made while within a read-only transaction"
3636
),
3737
HTTP_ERROR(21, "The HTTP request failed"),
38+
CYCLE_DETECTED(22, "Distributed deadlock detected: call would create a cycle"),
3839
);
3940
};
4041
}

0 commit comments

Comments
 (0)