Skip to content

Commit e8e861a

Browse files
committed
WIP on wound-wait for 2pc
1 parent 2c04a39 commit e8e861a

22 files changed

Lines changed: 1145 additions & 34 deletions

crates/client-api/src/routes/database.rs

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use axum::response::{ErrorResponse, IntoResponse};
2020
use axum::routing::MethodRouter;
2121
use axum::Extension;
2222
use axum_extra::TypedHeader;
23+
use http::HeaderMap;
2324
use futures::TryStreamExt;
2425
use http::StatusCode;
2526
use log::{info, warn};
@@ -41,7 +42,7 @@ use spacetimedb_lib::bsatn;
4142
use spacetimedb_lib::db::raw_def::v10::RawModuleDefV10;
4243
use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9;
4344
use spacetimedb_lib::de::DeserializeSeed;
44-
use spacetimedb_lib::{sats, AlgebraicValue, Hash, ProductValue, Timestamp};
45+
use spacetimedb_lib::{sats, AlgebraicValue, GlobalTxId, Hash, ProductValue, Timestamp, TX_ID_HEADER};
4546
use spacetimedb_schema::auto_migrate::{
4647
MigrationPolicy as SchemaMigrationPolicy, MigrationToken, PrettyPrintStyle as AutoMigratePrettyPrintStyle,
4748
};
@@ -133,6 +134,7 @@ fn map_procedure_error(e: ProcedureCallError, procedure: &str) -> (StatusCode, S
133134
pub async fn call<S: ControlStateDelegate + NodeDelegate>(
134135
State(worker_ctx): State<S>,
135136
Extension(auth): Extension<SpacetimeAuth>,
137+
headers: HeaderMap,
136138
Path(CallParams {
137139
name_or_identity,
138140
reducer,
@@ -141,6 +143,10 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
141143
body: Bytes,
142144
) -> axum::response::Result<impl IntoResponse> {
143145
let caller_identity = auth.claims.identity;
146+
let tx_id = headers
147+
.get(TX_ID_HEADER)
148+
.and_then(|v| v.to_str().ok())
149+
.and_then(|s| s.parse::<GlobalTxId>().ok());
144150

145151
let args = parse_call_args(content_type, body)?;
146152

@@ -162,6 +168,7 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
162168
.call_reducer_with_return(
163169
caller_identity,
164170
Some(connection_id),
171+
tx_id,
165172
None,
166173
None,
167174
None,
@@ -251,6 +258,7 @@ fn parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::res
251258
pub async fn prepare<S: ControlStateDelegate + NodeDelegate>(
252259
State(worker_ctx): State<S>,
253260
Extension(auth): Extension<SpacetimeAuth>,
261+
headers: HeaderMap,
254262
Path(CallParams {
255263
name_or_identity,
256264
reducer,
@@ -260,14 +268,18 @@ pub async fn prepare<S: ControlStateDelegate + NodeDelegate>(
260268
) -> axum::response::Result<impl IntoResponse> {
261269
let args = parse_call_args(content_type, body)?;
262270
let caller_identity = auth.claims.identity;
271+
let tx_id = headers
272+
.get(TX_ID_HEADER)
273+
.and_then(|v| v.to_str().ok())
274+
.and_then(|s| s.parse::<GlobalTxId>().ok());
263275

264276
let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?;
265277

266278
// 2PC prepare is a server-to-server call; no client lifecycle management needed.
267279
// call_identity_connected/disconnected submit jobs to the module's executor, which
268280
// will be blocked holding the 2PC write lock after prepare_reducer returns — deadlock.
269281
let result = module
270-
.prepare_reducer(caller_identity, None, &reducer, args)
282+
.prepare_reducer(caller_identity, None, tx_id, &reducer, args)
271283
.await;
272284

273285
match result {
@@ -298,6 +310,12 @@ pub struct TwoPcParams {
298310
prepare_id: String,
299311
}
300312

313+
#[derive(Deserialize)]
314+
pub struct GlobalTxParams {
315+
name_or_identity: NameOrIdentity,
316+
global_tx_id: String,
317+
}
318+
301319
/// 2PC commit endpoint: finalize a prepared transaction.
302320
///
303321
/// `POST /v1/database/:name_or_identity/2pc/commit/:prepare_id`
@@ -389,6 +407,30 @@ pub async fn ack_commit_2pc<S: ControlStateDelegate + NodeDelegate>(
389407
Ok(StatusCode::OK)
390408
}
391409

410+
/// 2PC wound endpoint.
411+
///
412+
/// `POST /v1/database/:name_or_identity/2pc/wound/:global_tx_id`
413+
pub async fn wound_2pc<S: ControlStateDelegate + NodeDelegate>(
414+
State(worker_ctx): State<S>,
415+
Extension(_auth): Extension<SpacetimeAuth>,
416+
Path(GlobalTxParams {
417+
name_or_identity,
418+
global_tx_id,
419+
}): Path<GlobalTxParams>,
420+
) -> axum::response::Result<impl IntoResponse> {
421+
let tx_id = global_tx_id
422+
.parse::<GlobalTxId>()
423+
.map_err(|e| (StatusCode::BAD_REQUEST, e).into_response())?;
424+
let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;
425+
426+
module.wound_global_tx(tx_id).await.map_err(|e| {
427+
log::warn!("2PC wound failed for {tx_id}: {e}");
428+
(StatusCode::NOT_FOUND, e).into_response()
429+
})?;
430+
431+
Ok(StatusCode::OK)
432+
}
433+
392434
fn reducer_outcome_response(
393435
module: &ModuleHost,
394436
owner_identity: &Identity,
@@ -426,6 +468,7 @@ fn reducer_outcome_response(
426468
// TODO: different status code? this is what cloudflare uses, sorta
427469
Ok((StatusCode::from_u16(530).unwrap(), (*errmsg).into_response()))
428470
}
471+
ReducerOutcome::Wounded(errmsg) => Ok((StatusCode::CONFLICT, (*errmsg).into_response())),
429472
ReducerOutcome::BudgetExceeded => {
430473
log::warn!("Node's energy budget exceeded for identity: {owner_identity} while executing {reducer}");
431474
Ok((
@@ -1401,6 +1444,8 @@ pub struct DatabaseRoutes<S> {
14011444
pub commit_2pc_post: MethodRouter<S>,
14021445
/// POST: /database/:name_or_identity/2pc/abort/:prepare_id
14031446
pub abort_2pc_post: MethodRouter<S>,
1447+
/// POST: /database/:name_or_identity/2pc/wound/:global_tx_id
1448+
pub wound_2pc_post: MethodRouter<S>,
14041449
/// GET: /database/:name_or_identity/2pc/status/:prepare_id
14051450
pub status_2pc_get: MethodRouter<S>,
14061451
/// POST: /database/:name_or_identity/2pc/ack-commit/:prepare_id
@@ -1433,6 +1478,7 @@ where
14331478
prepare_post: post(prepare::<S>),
14341479
commit_2pc_post: post(commit_2pc::<S>),
14351480
abort_2pc_post: post(abort_2pc::<S>),
1481+
wound_2pc_post: post(wound_2pc::<S>),
14361482
status_2pc_get: get(status_2pc::<S>),
14371483
ack_commit_2pc_post: post(ack_commit_2pc::<S>),
14381484
}
@@ -1463,6 +1509,7 @@ where
14631509
.route("/prepare/:reducer", self.prepare_post)
14641510
.route("/2pc/commit/:prepare_id", self.commit_2pc_post)
14651511
.route("/2pc/abort/:prepare_id", self.abort_2pc_post)
1512+
.route("/2pc/wound/:global_tx_id", self.wound_2pc_post)
14661513
.route("/2pc/status/:prepare_id", self.status_2pc_get)
14671514
.route("/2pc/ack-commit/:prepare_id", self.ack_commit_2pc_post);
14681515

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
# Distributed Wound-Wait for 2PC Deadlock Breaking
2+
3+
## Problem
4+
5+
Distributed reducers can deadlock when one distributed transaction holds a participant lock on one database and waits on another database locked by a younger distributed transaction. Existing 2PC ensures atomic commit/abort, but it does not resolve distributed lock cycles.
6+
7+
## Chosen Model
8+
9+
- Use wound-wait.
10+
- Transaction identity is `GlobalTxId`.
11+
- `GlobalTxId.creator_db` is the authoritative coordinator database.
12+
- Participant and coordinator runtime state are keyed by `GlobalTxId`.
13+
- `prepare_id` remains a participant-local 2PC phase handle only.
14+
- Older transactions wound younger transactions.
15+
- Younger transactions wait behind older transactions.
16+
- Lock acquisition is managed by an owner-aware scheduler that tracks running and pending `GlobalTxId`s.
17+
- A wound RPC is required because a younger lock holder may belong to a distributed transaction coordinated on a different database.
18+
19+
## Runtime Model
20+
21+
- Add a distributed session registry keyed by `GlobalTxId`.
22+
- Session tracks role, state, local `prepare_id`, coordinator identity, participants, and a local wounded/abort signal.
23+
- Add a per-database async lock scheduler for distributed reducer write acquisition.
24+
- Scheduler state includes current running owner, pending queue/set, and wounded marker for the current owner.
25+
- Requesters await scheduler admission before reducer execution starts a local mutable transaction.
26+
27+
## Wound Protocol
28+
29+
Participant detecting conflict compares requester and owner by `GlobalTxId` ordering.
30+
31+
- If requester is older:
32+
- mark local owner as wounded
33+
- send wound RPC to coordinator `GlobalTxId.creator_db` if needed
34+
- keep requester pending until local owner releases
35+
- If requester is younger:
36+
- requester stays pending
37+
38+
Wound RPC is idempotent and targets the distributed session at the coordinator.
39+
40+
Coordinator receiving wound:
41+
42+
- transitions session to `Aborting`
43+
- sets wounded flag
44+
- aborts local execution cooperatively
45+
- fans out abort to known prepared participants
46+
47+
## Safe Points
48+
49+
- Before remote reducer calls
50+
- Before PREPARE / COMMIT path work
51+
- After reducer body returns, before expensive post-processing
52+
53+
On safe-point wound detection:
54+
55+
- rollback local tx
56+
- unregister scheduler ownership
57+
- wake waiters
58+
- surface retryable `wounded` error
59+
60+
## Compatibility
61+
62+
- Keep existing `/2pc/commit`, `/2pc/abort`, `/2pc/status`, and ack-commit flows.
63+
- Add a new wound endpoint.
64+
- `/prepare` must propagate `GlobalTxId` the same way `/call` already does.
65+
- No durable format change unless recovery work later proves it necessary.
66+
67+
## Implementation Sequence
68+
69+
### 1. Propagate `GlobalTxId` through 2PC prepare path
70+
71+
- Update outgoing 2PC prepare requests to send `X-Spacetime-Tx-Id`.
72+
- Update incoming `/prepare/:reducer` to parse `X-Spacetime-Tx-Id`.
73+
- Thread `GlobalTxId` through `prepare_reducer` and any participant execution params.
74+
- Ensure recovered/replayed participant work can recover or reconstruct the same session identity.
75+
76+
### 2. Replace minimal prepared registry with `GlobalTxId` session registry
77+
78+
- Extend the current prepared transaction registry into a session manager keyed by `GlobalTxId`.
79+
- Track:
80+
- role
81+
- state
82+
- local `prepare_id`
83+
- participants
84+
- coordinator identity
85+
- wounded/abort signal
86+
- Provide lookup by both `GlobalTxId` and `prepare_id`.
87+
88+
### 3. Add distributed lock scheduler
89+
90+
- Add an async scheduler in `core`, adjacent to reducer tx startup, not inside raw datastore locking.
91+
- Track running owner and pending `GlobalTxId`s.
92+
- Require distributed reducer write acquisition to await scheduler admission before blocking datastore acquisition.
93+
- Implement wound-wait ordering and wakeup behavior there.
94+
95+
### 4. Add wound RPC endpoint and coordinator handler
96+
97+
- Add `POST /v1/database/:name_or_identity/2pc/wound/:global_tx_id`.
98+
- Parse and route by `GlobalTxId.creator_db`.
99+
- Coordinator session handler must:
100+
- mark session `Aborting`
101+
- set wounded flag
102+
- begin participant abort fanout
103+
- behave idempotently
104+
105+
### 5. Add cooperative abort checks in reducer execution
106+
107+
- Add wound checks at required safe points.
108+
- On wound:
109+
- rollback
110+
- unregister running owner
111+
- notify scheduler waiters
112+
- surface retryable wounded error
113+
114+
### 6. Integrate scheduler + wound with 2PC transitions
115+
116+
- Ensure PREPARE, COMMIT, ABORT, and recovery all keep scheduler and session registry consistent.
117+
- Make local owner release happen on all terminal paths.
118+
- Keep participant recovery compatible with session state.
119+
120+
### 7. Add tests
121+
122+
- Scheduler ordering and wakeup tests
123+
- Local same-database wound tests
124+
- Distributed deadlock cycle tests
125+
- Wound RPC idempotency tests
126+
- Recovery tests for wounded prepared transactions
127+
- Regression tests for existing 2PC success/failure flows
128+
129+
## Acceptance Criteria
130+
131+
- Distributed deadlock cycles are broken deterministically by wound-wait.
132+
- Older distributed transactions eventually proceed without manual intervention.
133+
- Younger distributed transactions abort globally, not just locally.
134+
- `/prepare` and `/call` both carry `GlobalTxId`.
135+
- Existing 2PC happy paths continue to pass.
136+
- Repeated wound or abort requests are safe and idempotent.
137+
138+
## Assumptions
139+
140+
- `GlobalTxId.creator_db` is always the coordinator database.
141+
- `GlobalTxId` ordering is the authoritative age/tie-break rule.
142+
- Cooperative abort at safe points is sufficient for v1; no preemptive interruption is required.
143+
- Lock scheduler state is in-memory runtime state, not durable state.

crates/core/src/client/client_connection.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -852,6 +852,7 @@ impl ClientConnection {
852852
.call_reducer(
853853
self.id.identity,
854854
Some(self.id.connection_id),
855+
None,
855856
caller,
856857
Some(request_id),
857858
Some(timer),
@@ -873,6 +874,7 @@ impl ClientConnection {
873874
.call_reducer(
874875
self.id.identity,
875876
Some(self.id.connection_id),
877+
None,
876878
Some(self.sender()),
877879
Some(request_id),
878880
Some(timer),

crates/core/src/client/messages.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,9 @@ impl ToProtocol for TransactionUpdateMessage {
379379

380380
let status = match &event.status {
381381
EventStatus::Committed(_) => ws_v1::UpdateStatus::Committed(update),
382-
EventStatus::FailedUser(errmsg) | EventStatus::FailedInternal(errmsg) => {
382+
EventStatus::FailedUser(errmsg)
383+
| EventStatus::FailedInternal(errmsg)
384+
| EventStatus::Wounded(errmsg) => {
383385
ws_v1::UpdateStatus::Failed(errmsg.clone().into())
384386
}
385387
EventStatus::OutOfEnergy => ws_v1::UpdateStatus::OutOfEnergy,

crates/core/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,8 @@ pub enum NodesError {
275275
BadIndexType(u8),
276276
#[error("Failed to scheduled timer: {0}")]
277277
ScheduleError(#[source] ScheduleError),
278+
#[error("Distributed transaction wounded: {0}")]
279+
Wounded(String),
278280
#[error("HTTP request failed: {0}")]
279281
HttpError(String),
280282
}

0 commit comments

Comments
 (0)