
Commit 6147323

Add distributed deadlock detection via call edge tracking
Before making a cross-database reducer call, register an edge A -> B with a
CallEdgeTracker. If adding the edge would create a cycle (distributed deadlock),
the tracker returns an error. The caller retries with exponential backoff
(5 attempts), then fails with a deadlock error.

New trait CallEdgeTracker in core::host::call_edge_tracker with:

- register_edge(call_id, caller, callee) -> Result<()>
- unregister_edge(call_id) -> Result<()>
- unregister_all_edges() -> Result<()> (crash cleanup on startup)

NoopCallEdgeTracker for standalone (always allows calls). The cloud
implementation will call control DB reducers for cycle detection.

Also added register/unregister_reducer_call_edge methods to the
ControlStateWriteAccess trait (no-op in standalone).
1 parent ee07f87 commit 6147323

7 files changed

Lines changed: 178 additions & 3 deletions
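For orientation, here is a minimal sketch of the caller-side protocol described in the commit message above, written against the `CallEdgeTracker` trait this commit adds. The real call sites, including the retry/backoff loop, are in the `instance_env.rs` hunks below; `call_with_edge_tracking` and `do_cross_db_call` are hypothetical names used only for illustration.

```rust
use spacetimedb_lib::Identity;

use crate::host::call_edge_tracker::CallEdgeTracker;

// Hypothetical stand-in for the HTTP reducer call the host actually makes.
async fn do_cross_db_call(_callee: Identity) -> anyhow::Result<()> {
    Ok(())
}

// The shape of the protocol: register the edge, make the call, always
// unregister. The real code additionally retries registration with
// exponential backoff before giving up with a deadlock error.
async fn call_with_edge_tracking(
    tracker: &dyn CallEdgeTracker,
    caller: Identity,
    callee: Identity,
) -> anyhow::Result<()> {
    let call_id = uuid::Uuid::new_v4().to_string();
    // Fails if adding caller -> callee would close a cycle in the call graph.
    tracker.register_edge(&call_id, caller, callee).await?;
    let result = do_cross_db_call(callee).await;
    // Remove the edge whether the call succeeded or failed.
    let _ = tracker.unregister_edge(&call_id).await;
    result
}
```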


crates/client-api/src/lib.rs

Lines changed: 34 additions & 0 deletions
@@ -344,6 +344,23 @@ pub trait ControlStateWriteAccess: Send + Sync {
         owner_identity: &Identity,
         domain_names: &[DomainName],
     ) -> anyhow::Result<SetDomainsResult>;
+
+    // Cross-database call edge tracking (distributed deadlock detection)
+
+    /// Register a cross-database call edge for deadlock detection.
+    /// Returns `Err` if the edge would create a cycle (distributed deadlock).
+    async fn register_reducer_call_edge(
+        &self,
+        call_id: &str,
+        caller: &Identity,
+        callee: &Identity,
+    ) -> anyhow::Result<()>;
+
+    /// Unregister a cross-database call edge after the call completes.
+    async fn unregister_reducer_call_edge(&self, call_id: &str) -> anyhow::Result<()>;
+
+    /// Unregister all edges for this node (crash cleanup on startup).
+    async fn unregister_all_reducer_call_edges_for_node(&self) -> anyhow::Result<()>;
 }

 #[async_trait]

@@ -454,6 +471,23 @@ impl<T: ControlStateWriteAccess + ?Sized> ControlStateWriteAccess for Arc<T> {
             .replace_dns_records(database_identity, owner_identity, domain_names)
             .await
     }
+
+    async fn register_reducer_call_edge(
+        &self,
+        call_id: &str,
+        caller: &Identity,
+        callee: &Identity,
+    ) -> anyhow::Result<()> {
+        (**self).register_reducer_call_edge(call_id, caller, callee).await
+    }
+
+    async fn unregister_reducer_call_edge(&self, call_id: &str) -> anyhow::Result<()> {
+        (**self).unregister_reducer_call_edge(call_id).await
+    }
+
+    async fn unregister_all_reducer_call_edges_for_node(&self) -> anyhow::Result<()> {
+        (**self).unregister_all_reducer_call_edges_for_node().await
+    }
 }

 #[async_trait]
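`unregister_all_reducer_call_edges_for_node` exists so a restarting node can drop edges left behind by a crash. This commit only adds the trait method (a no-op in standalone); a minimal sketch of a startup call site follows, with `cleanup_stale_call_edges` and its caller assumed for illustration.

```rust
// Hypothetical startup hook (not part of this commit): a restarting node clears
// any call edges it registered before a crash, so stale edges cannot block
// future cross-database calls. Uses the ControlStateWriteAccess trait above.
async fn cleanup_stale_call_edges<C>(ctx: &C) -> anyhow::Result<()>
where
    C: ControlStateWriteAccess + ?Sized,
{
    ctx.unregister_all_reducer_call_edges_for_node().await
}
```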

crates/core/src/host/call_edge_tracker.rs

Lines changed: 59 additions & 0 deletions

@@ -0,0 +1,59 @@
+/// Trait for tracking cross-database call edges for distributed deadlock detection.
+///
+/// Before making a cross-database reducer call, the caller registers an edge
+/// A -> B (caller -> callee). If this would create a cycle in the call graph,
+/// the registration fails with an error, indicating a potential distributed deadlock.
+///
+/// Implementations differ between deployment modes:
+///
+/// - **Standalone** -- [`NoopCallEdgeTracker`] always returns `Ok(())`.
+///   Single-node deployments cannot have distributed deadlocks.
+///
+/// - **Cluster** -- Calls a reducer on the control database that inserts the edge
+///   and runs cycle detection. Returns `Err` if a cycle is found.
+use spacetimedb_lib::Identity;
+use std::future::Future;
+use std::pin::Pin;
+
+pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
+
+pub trait CallEdgeTracker: Send + Sync + 'static {
+    /// Register a call edge: `caller` is about to call `callee`.
+    ///
+    /// Returns `Ok(())` if the edge was registered (no cycle).
+    /// Returns `Err` if registering this edge would create a cycle.
+    fn register_edge<'a>(
+        &'a self,
+        call_id: &'a str,
+        caller: Identity,
+        callee: Identity,
+    ) -> BoxFuture<'a, anyhow::Result<()>>;
+
+    /// Unregister a call edge after the call completes (success or failure).
+    fn unregister_edge<'a>(&'a self, call_id: &'a str) -> BoxFuture<'a, anyhow::Result<()>>;
+
+    /// Unregister all edges for this node (crash cleanup on startup).
+    fn unregister_all_edges<'a>(&'a self) -> BoxFuture<'a, anyhow::Result<()>>;
+}
+
+/// No-op implementation for standalone (single-node) deployments.
+pub struct NoopCallEdgeTracker;
+
+impl CallEdgeTracker for NoopCallEdgeTracker {
+    fn register_edge<'a>(
+        &'a self,
+        _call_id: &'a str,
+        _caller: Identity,
+        _callee: Identity,
+    ) -> BoxFuture<'a, anyhow::Result<()>> {
+        Box::pin(async { Ok(()) })
+    }
+
+    fn unregister_edge<'a>(&'a self, _call_id: &'a str) -> BoxFuture<'a, anyhow::Result<()>> {
+        Box::pin(async { Ok(()) })
+    }
+
+    fn unregister_all_edges<'a>(&'a self) -> BoxFuture<'a, anyhow::Result<()>> {
+        Box::pin(async { Ok(()) })
+    }
+}
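The cluster implementation is not part of this commit; per the commit message it will call control-database reducers for cycle detection. To illustrate the check such an implementation performs, here is an in-memory sketch against the trait above. The type, its storage, and the DFS are assumptions rather than the cloud code, and it assumes `Identity` implements `Copy`, `Eq`, `Hash`, and `Display` (the hunks below already pass it by value and format it).

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;

use spacetimedb_lib::Identity;

use crate::host::call_edge_tracker::{BoxFuture, CallEdgeTracker};

/// Illustrative only: keeps live edges in memory and rejects a registration
/// that would close a cycle.
#[derive(Default)]
pub struct InMemoryCallEdgeTracker {
    /// call_id -> (caller, callee) for every in-flight cross-database call.
    edges: Mutex<HashMap<String, (Identity, Identity)>>,
}

impl InMemoryCallEdgeTracker {
    /// Adding `caller -> callee` closes a cycle iff `caller` is already
    /// reachable from `callee` through the existing edges (simple DFS).
    fn creates_cycle(
        edges: &HashMap<String, (Identity, Identity)>,
        caller: Identity,
        callee: Identity,
    ) -> bool {
        let mut stack = vec![callee];
        let mut seen = HashSet::new();
        while let Some(node) = stack.pop() {
            if node == caller {
                return true;
            }
            if seen.insert(node) {
                stack.extend(edges.values().filter(|(from, _)| *from == node).map(|(_, to)| *to));
            }
        }
        false
    }
}

impl CallEdgeTracker for InMemoryCallEdgeTracker {
    fn register_edge<'a>(
        &'a self,
        call_id: &'a str,
        caller: Identity,
        callee: Identity,
    ) -> BoxFuture<'a, anyhow::Result<()>> {
        Box::pin(async move {
            let mut edges = self.edges.lock().unwrap();
            if Self::creates_cycle(&edges, caller, callee) {
                anyhow::bail!("call edge {caller} -> {callee} would create a cycle");
            }
            edges.insert(call_id.to_owned(), (caller, callee));
            Ok(())
        })
    }

    fn unregister_edge<'a>(&'a self, call_id: &'a str) -> BoxFuture<'a, anyhow::Result<()>> {
        Box::pin(async move {
            self.edges.lock().unwrap().remove(call_id);
            Ok(())
        })
    }

    fn unregister_all_edges<'a>(&'a self) -> BoxFuture<'a, anyhow::Result<()>> {
        Box::pin(async move {
            self.edges.lock().unwrap().clear();
            Ok(())
        })
    }
}
```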

crates/core/src/host/host_controller.rs

Lines changed: 1 addition & 0 deletions
@@ -729,6 +729,7 @@ async fn make_replica_ctx(
        call_reducer_auth_token,
        prepared_txs: crate::host::prepared_tx::PreparedTransactions::new(),
        on_panic: std::sync::Arc::new(std::sync::OnceLock::new()),
+       call_edge_tracker: std::sync::Arc::new(crate::host::call_edge_tracker::NoopCallEdgeTracker),
    })
 }

crates/core/src/host/instance_env.rs

Lines changed: 60 additions & 3 deletions
@@ -1011,15 +1011,39 @@ impl InstanceEnv {
         let client = self.replica_ctx.call_reducer_client.clone();
         let router = self.replica_ctx.call_reducer_router.clone();
         let reducer_name = reducer_name.to_owned();
-        // Node-level auth token: a single token minted at startup and shared by all replicas
-        // on this node. Passed as a Bearer token so `anon_auth_middleware` on the target node
-        // accepts the request without generating a fresh ephemeral identity per call.
         let auth_token = self.replica_ctx.call_reducer_auth_token.clone();
         let caller_identity = self.replica_ctx.database.database_identity;
+        let edge_tracker = self.replica_ctx.call_edge_tracker.clone();

         async move {
             let start = Instant::now();

+            // Register call edge for distributed deadlock detection.
+            // Retry with backoff if a cycle is detected.
+            let call_id = uuid::Uuid::new_v4().to_string();
+            const MAX_RETRIES: u32 = 5;
+            const BACKOFF_MS: u64 = 100;
+            for attempt in 0..MAX_RETRIES {
+                match edge_tracker.register_edge(&call_id, caller_identity, database_identity).await {
+                    Ok(()) => break,
+                    Err(e) if attempt < MAX_RETRIES - 1 => {
+                        log::warn!(
+                            "Cycle detected on call {caller_identity} -> {database_identity} \
+                             (attempt {attempt}): {e}; retrying"
+                        );
+                        tokio::time::sleep(std::time::Duration::from_millis(
+                            BACKOFF_MS * 2u64.pow(attempt),
+                        ))
+                        .await;
+                    }
+                    Err(e) => {
+                        return Err(NodesError::HttpError(format!(
+                            "distributed deadlock detected: {caller_identity} -> {database_identity}: {e}"
+                        )));
+                    }
+                }
+            }
+
             let base_url = router
                 .resolve_base_url(database_identity)
                 .await

@@ -1048,6 +1072,9 @@ impl InstanceEnv {
             }
             .await;

+            // Unregister the call edge (regardless of success/failure).
+            let _ = edge_tracker.unregister_edge(&call_id).await;
+
             WORKER_METRICS
                 .cross_db_reducer_calls_total
                 .with_label_values(&caller_identity)

@@ -1082,10 +1109,36 @@ impl InstanceEnv {
         let reducer_name = reducer_name.to_owned();
         let auth_token = self.replica_ctx.call_reducer_auth_token.clone();
         let caller_identity = self.replica_ctx.database.database_identity;
+        let edge_tracker = self.replica_ctx.call_edge_tracker.clone();

         async move {
             let start = Instant::now();

+            // Register call edge for distributed deadlock detection.
+            let call_id = uuid::Uuid::new_v4().to_string();
+            const MAX_RETRIES: u32 = 5;
+            const BACKOFF_MS: u64 = 100;
+            for attempt in 0..MAX_RETRIES {
+                match edge_tracker.register_edge(&call_id, caller_identity, database_identity).await {
+                    Ok(()) => break,
+                    Err(e) if attempt < MAX_RETRIES - 1 => {
+                        log::warn!(
+                            "Cycle detected on 2PC call {caller_identity} -> {database_identity} \
+                             (attempt {attempt}): {e}; retrying"
+                        );
+                        tokio::time::sleep(std::time::Duration::from_millis(
+                            BACKOFF_MS * 2u64.pow(attempt),
+                        ))
+                        .await;
+                    }
+                    Err(e) => {
+                        return Err(NodesError::HttpError(format!(
+                            "distributed deadlock detected: {caller_identity} -> {database_identity}: {e}"
+                        )));
+                    }
+                }
+            }
+
             let base_url = router
                 .resolve_base_url(database_identity)
                 .await

@@ -1120,6 +1173,9 @@ impl InstanceEnv {
             }
             .await;

+            // Unregister the call edge (regardless of success/failure).
+            let _ = edge_tracker.unregister_edge(&call_id).await;
+
             WORKER_METRICS
                 .cross_db_reducer_calls_total
                 .with_label_values(&caller_identity)

@@ -1511,6 +1567,7 @@ mod test {
            call_reducer_auth_token: None,
            prepared_txs: crate::host::prepared_tx::PreparedTransactions::new(),
            on_panic: std::sync::Arc::new(std::sync::OnceLock::new()),
+           call_edge_tracker: Arc::new(crate::host::call_edge_tracker::NoopCallEdgeTracker),
        },
        runtime,
    ))

crates/core/src/host/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -45,6 +45,7 @@ pub mod prepared_tx;
 pub mod scheduler;
 pub mod wasmtime;

+pub mod call_edge_tracker;
 // Visible for integration testing.
 pub mod instance_env;
 pub mod reducer_router;

crates/core/src/replica_context.rs

Lines changed: 4 additions & 0 deletions
@@ -3,6 +3,7 @@ use spacetimedb_commitlog::SizeOnDisk;
 use super::database_logger::DatabaseLogger;
 use crate::db::relational_db::RelationalDB;
 use crate::error::DBError;
+use crate::host::call_edge_tracker::CallEdgeTracker;
 use crate::host::prepared_tx::PreparedTransactions;
 use crate::host::reducer_router::ReducerCallRouter;
 use crate::messages::control_db::Database;

@@ -72,6 +73,9 @@ pub struct ReplicaContext {
     /// async task that can't panic on the WASM executor thread (e.g., 2PC persistence
     /// abort in Round 2). Set once by `launch_module`; empty in tests.
     pub on_panic: Arc<OnceLock<Box<dyn Fn() + Send + Sync + 'static>>>,
+    /// Distributed deadlock detection: tracks cross-database call edges.
+    /// Standalone uses [`crate::host::call_edge_tracker::NoopCallEdgeTracker`].
+    pub call_edge_tracker: Arc<dyn CallEdgeTracker>,
 }

 impl ReplicaContext {

crates/standalone/src/lib.rs

Lines changed: 19 additions & 0 deletions
@@ -484,6 +484,25 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv {
             .control_db
             .spacetime_replace_domains(database_identity, owner_identity, domain_names)?)
     }
+
+    // Distributed deadlock detection: no-op in standalone (single node).
+
+    async fn register_reducer_call_edge(
+        &self,
+        _call_id: &str,
+        _caller: &Identity,
+        _callee: &Identity,
+    ) -> anyhow::Result<()> {
+        Ok(())
+    }
+
+    async fn unregister_reducer_call_edge(&self, _call_id: &str) -> anyhow::Result<()> {
+        Ok(())
+    }
+
+    async fn unregister_all_reducer_call_edges_for_node(&self) -> anyhow::Result<()> {
+        Ok(())
+    }
 }

 impl spacetimedb_client_api::Authorization for StandaloneEnv {
