Skip to content

Commit 74bc3eb

Browse files
committed
recovery
1 parent 1976070 commit 74bc3eb

7 files changed

Lines changed: 606 additions & 114 deletions

File tree

crates/client-api/src/routes/database.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,31 @@ pub async fn abort_2pc<S: ControlStateDelegate + NodeDelegate>(
349349
Ok(StatusCode::OK)
350350
}
351351

352+
/// 2PC coordinator status endpoint.
353+
///
354+
/// Returns `"commit"` if the coordinator has durably decided COMMIT for `prepare_id`,
355+
/// or `"abort"` otherwise. Participant B polls this to recover from a timeout or crash.
356+
///
357+
/// `GET /v1/database/:name_or_identity/2pc/status/:prepare_id`
358+
pub async fn status_2pc<S: ControlStateDelegate + NodeDelegate>(
359+
State(worker_ctx): State<S>,
360+
Extension(_auth): Extension<SpacetimeAuth>,
361+
Path(TwoPcParams {
362+
name_or_identity,
363+
prepare_id,
364+
}): Path<TwoPcParams>,
365+
) -> axum::response::Result<impl IntoResponse> {
366+
let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;
367+
368+
let decision = if module.has_2pc_coordinator_commit(&prepare_id) {
369+
"commit"
370+
} else {
371+
"abort"
372+
};
373+
374+
Ok((StatusCode::OK, decision))
375+
}
376+
352377
fn reducer_outcome_response(
353378
module: &ModuleHost,
354379
owner_identity: &Identity,
@@ -1361,6 +1386,8 @@ pub struct DatabaseRoutes<S> {
13611386
pub commit_2pc_post: MethodRouter<S>,
13621387
/// POST: /database/:name_or_identity/2pc/abort/:prepare_id
13631388
pub abort_2pc_post: MethodRouter<S>,
1389+
/// GET: /database/:name_or_identity/2pc/status/:prepare_id
1390+
pub status_2pc_get: MethodRouter<S>,
13641391
}
13651392

13661393
impl<S> Default for DatabaseRoutes<S>
@@ -1389,6 +1416,7 @@ where
13891416
prepare_post: post(prepare::<S>),
13901417
commit_2pc_post: post(commit_2pc::<S>),
13911418
abort_2pc_post: post(abort_2pc::<S>),
1419+
status_2pc_get: get(status_2pc::<S>),
13921420
}
13931421
}
13941422
}
@@ -1416,7 +1444,8 @@ where
14161444
.route("/reset", self.db_reset)
14171445
.route("/prepare/:reducer", self.prepare_post)
14181446
.route("/2pc/commit/:prepare_id", self.commit_2pc_post)
1419-
.route("/2pc/abort/:prepare_id", self.abort_2pc_post);
1447+
.route("/2pc/abort/:prepare_id", self.abort_2pc_post)
1448+
.route("/2pc/status/:prepare_id", self.status_2pc_get);
14201449

14211450
axum::Router::new()
14221451
.route("/", self.root_post)

crates/core/src/db/relational_db.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -454,14 +454,25 @@ impl RelationalDB {
454454
}
455455

456456
/// Read any 2PC participant transactions that were in PREPARE state when the database
457-
/// last shut down (or crashed). Each returned string is a `prepare_id`.
457+
/// last shut down (or crashed).
458458
///
459-
/// If non-empty, the caller must resume these transactions: retransmit PREPARED to
460-
/// the coordinator and await a COMMIT or ABORT decision before allowing normal operation.
461-
pub fn pending_2pc_prepares(&self) -> Result<Vec<String>, DBError> {
459+
/// Each returned row contains all the information needed to resume the transaction:
460+
/// the prepare_id, coordinator identity, reducer name/args, and caller context.
461+
/// B never aborts on its own — it polls the coordinator for a decision.
462+
pub fn pending_2pc_prepares(&self) -> Result<Vec<spacetimedb_datastore::system_tables::St2pcStateRow>, DBError> {
462463
self.with_auto_commit(Workload::Internal, |tx| tx.scan_st_2pc_state().map_err(DBError::from))
463464
}
464465

466+
/// Read any 2PC coordinator log entries that have not yet been acknowledged by their
467+
/// participants. Used on coordinator crash-recovery to retransmit COMMIT decisions.
468+
pub fn pending_2pc_coordinator_commits(
469+
&self,
470+
) -> Result<Vec<spacetimedb_datastore::system_tables::St2pcCoordinatorLogRow>, DBError> {
471+
self.with_auto_commit(Workload::Internal, |tx| {
472+
tx.scan_st_2pc_coordinator_log().map_err(DBError::from)
473+
})
474+
}
475+
465476
/// Read the set of clients currently connected to the database.
466477
pub fn connected_clients(&self) -> Result<ConnectedClients, DBError> {
467478
self.with_read_only(Workload::Internal, |tx| {

crates/core/src/host/host_controller.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,6 +1141,11 @@ impl Host {
11411141
module_host.clear_all_clients().await?;
11421142

11431143
scheduler_starter.start(&module_host)?;
1144+
1145+
// Crash recovery: retransmit any pending 2PC decisions from before the restart.
1146+
module_host.recover_2pc_coordinator();
1147+
module_host.recover_2pc_participant();
1148+
11441149
let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();
11451150
let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db().clone());
11461151

crates/core/src/host/module_host.rs

Lines changed: 229 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1808,17 +1808,18 @@ impl ModuleHost {
18081808
let this = self.clone();
18091809
let reducer_name_owned = reducer_def.name.clone();
18101810
let prepare_id_clone = prepare_id.clone();
1811+
let coordinator_identity = caller_identity;
18111812
tokio::spawn(async move {
18121813
let _ = this
18131814
.call(
18141815
&reducer_name_owned,
1815-
(params, prepare_id_clone, prepared_tx, decision_rx),
1816-
async |(p, pid, ptx, drx), inst| {
1817-
inst.call_reducer_prepare_and_hold(p, pid, ptx, drx);
1816+
(params, prepare_id_clone, coordinator_identity, prepared_tx, decision_rx),
1817+
async |(p, pid, cid, ptx, drx), inst| {
1818+
inst.call_reducer_prepare_and_hold(p, pid, cid, ptx, drx);
18181819
Ok::<(), ReducerCallError>(())
18191820
},
18201821
// JS modules: no 2PC support yet.
1821-
async |(p, _pid, ptx, _drx), inst| {
1822+
async |(p, _pid, _cid, ptx, _drx), inst| {
18221823
let (res, rv) = inst.call_reducer(p).await.map(|r| (r, None)).unwrap_or_else(|e| {
18231824
log::error!("prepare_reducer JS fallback: {e}");
18241825
(
@@ -1874,6 +1875,230 @@ impl ModuleHost {
18741875
Ok(())
18751876
}
18761877

1878+
/// Check whether `prepare_id` is present in the coordinator log of this database.
1879+
/// Used by participant B to ask coordinator A: "did you commit?"
1880+
pub fn has_2pc_coordinator_commit(&self, prepare_id: &str) -> bool {
1881+
let db = self.relational_db();
1882+
db.pending_2pc_coordinator_commits()
1883+
.map(|rows| rows.iter().any(|r| r.participant_prepare_id == prepare_id))
1884+
.unwrap_or(false)
1885+
}
1886+
1887+
/// Crash recovery for the **coordinator** role.
1888+
///
1889+
/// Scans `st_2pc_coordinator_log` for participants that have not yet acked
1890+
/// COMMIT and retransmits the HTTP commit call. Deletes the log entry on success.
1891+
pub fn recover_2pc_coordinator(&self) {
1892+
let db = self.relational_db().clone();
1893+
let rows = match db.pending_2pc_coordinator_commits() {
1894+
Ok(r) => r,
1895+
Err(e) => {
1896+
log::error!("recover_2pc_coordinator: scan failed: {e}");
1897+
return;
1898+
}
1899+
};
1900+
if rows.is_empty() {
1901+
return;
1902+
}
1903+
let replica_ctx = self.replica_ctx().clone();
1904+
let db2 = db.clone();
1905+
tokio::spawn(async move {
1906+
let client = replica_ctx.call_reducer_client.clone();
1907+
let router = replica_ctx.call_reducer_router.clone();
1908+
let auth_token = replica_ctx.call_reducer_auth_token.clone();
1909+
for row in rows {
1910+
let prepare_id = row.participant_prepare_id.clone();
1911+
let participant_identity = match Identity::from_hex(&row.participant_identity_hex) {
1912+
Ok(id) => id,
1913+
Err(e) => {
1914+
log::error!("recover_2pc_coordinator: invalid participant identity hex {}: {e}", row.participant_identity_hex);
1915+
continue;
1916+
}
1917+
};
1918+
let base_url = match router.resolve_base_url(participant_identity).await {
1919+
Ok(url) => url,
1920+
Err(e) => {
1921+
log::warn!("recover_2pc_coordinator: cannot resolve URL for {participant_identity}: {e}");
1922+
continue;
1923+
}
1924+
};
1925+
let url = format!(
1926+
"{}/v1/database/{}/2pc/commit/{}",
1927+
base_url,
1928+
participant_identity.to_hex(),
1929+
prepare_id,
1930+
);
1931+
let mut req = client.post(&url);
1932+
if let Some(ref token) = auth_token {
1933+
req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}"));
1934+
}
1935+
match req.send().await {
1936+
Ok(resp) if resp.status().is_success() => {
1937+
log::info!("recover_2pc_coordinator: re-committed {prepare_id} on {participant_identity}");
1938+
if let Err(e) = db2.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |tx| {
1939+
Ok(tx.delete_st_2pc_coordinator_log(&prepare_id)?)
1940+
}) {
1941+
log::warn!("recover_2pc_coordinator: delete coordinator log failed for {prepare_id}: {e}");
1942+
}
1943+
}
1944+
Ok(resp) => {
1945+
log::warn!("recover_2pc_coordinator: commit for {prepare_id} returned {}", resp.status());
1946+
}
1947+
Err(e) => {
1948+
log::warn!("recover_2pc_coordinator: transport error for {prepare_id}: {e}");
1949+
}
1950+
}
1951+
}
1952+
});
1953+
}
1954+
1955+
/// Crash recovery for the **participant** role.
1956+
///
1957+
/// Scans `st_2pc_state` for any prepared-but-not-decided transactions, re-runs
1958+
/// each reducer to reacquire the write lock, then polls the coordinator for a decision.
1959+
///
1960+
/// **B never aborts on its own** — only the coordinator's response yields ABORT.
1961+
pub fn recover_2pc_participant(&self) {
1962+
let db = self.relational_db().clone();
1963+
let rows = match db.pending_2pc_prepares() {
1964+
Ok(r) => r,
1965+
Err(e) => {
1966+
log::error!("recover_2pc_participant: scan failed: {e}");
1967+
return;
1968+
}
1969+
};
1970+
if rows.is_empty() {
1971+
return;
1972+
}
1973+
let this = self.clone();
1974+
tokio::spawn(async move {
1975+
for row in rows {
1976+
let original_prepare_id = row.prepare_id.clone();
1977+
let coordinator_identity = match Identity::from_hex(&row.coordinator_identity_hex) {
1978+
Ok(id) => id,
1979+
Err(e) => {
1980+
log::error!("recover_2pc_participant: invalid coordinator identity hex for {original_prepare_id}: {e}");
1981+
continue;
1982+
}
1983+
};
1984+
let caller_identity = match Identity::from_hex(&row.caller_identity_hex) {
1985+
Ok(id) => id,
1986+
Err(e) => {
1987+
log::error!("recover_2pc_participant: invalid caller identity hex for {original_prepare_id}: {e}");
1988+
continue;
1989+
}
1990+
};
1991+
let caller_connection_id = u128::from_str_radix(&row.caller_connection_id_hex, 16)
1992+
.map(ConnectionId::from_u128)
1993+
.unwrap_or(ConnectionId::ZERO);
1994+
let args = FunctionArgs::Bsatn(row.args_bsatn.clone().into());
1995+
1996+
// Step 1: Re-run the reducer to reacquire the write lock.
1997+
let new_prepare_id = match this
1998+
.prepare_reducer(caller_identity, Some(caller_connection_id), &row.reducer_name, args)
1999+
.await
2000+
{
2001+
Ok((pid, result, _rv)) if !pid.is_empty() => {
2002+
log::info!(
2003+
"recover_2pc_participant: re-prepared {original_prepare_id} as {pid}: {:?}",
2004+
result.outcome
2005+
);
2006+
pid
2007+
}
2008+
Ok(_) => {
2009+
// Reducer failed — treat as abort, clean up old marker.
2010+
log::warn!("recover_2pc_participant: reducer failed on re-run for {original_prepare_id}");
2011+
let _ = db.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |tx| {
2012+
Ok(tx.delete_st_2pc_state(&original_prepare_id)?)
2013+
});
2014+
continue;
2015+
}
2016+
Err(e) => {
2017+
log::error!("recover_2pc_participant: prepare_reducer error for {original_prepare_id}: {e:?}");
2018+
continue;
2019+
}
2020+
};
2021+
2022+
// Step 2: Poll coordinator with the ORIGINAL prepare_id until we get a decision.
2023+
// We do this in a separate task so the loop can proceed to the next row.
2024+
let this2 = this.clone();
2025+
let db2 = db.clone();
2026+
let client = this.replica_ctx().call_reducer_client.clone();
2027+
let router = this.replica_ctx().call_reducer_router.clone();
2028+
let auth_token = this.replica_ctx().call_reducer_auth_token.clone();
2029+
tokio::spawn(async move {
2030+
loop {
2031+
let decision = Self::query_coordinator_status_with_client(
2032+
&client,
2033+
&router,
2034+
auth_token.clone(),
2035+
coordinator_identity,
2036+
&original_prepare_id,
2037+
).await;
2038+
match decision {
2039+
Some(commit) => {
2040+
if commit {
2041+
let _ = this2.commit_prepared(&new_prepare_id);
2042+
} else {
2043+
let _ = this2.abort_prepared(&new_prepare_id);
2044+
}
2045+
// Clean up the old st_2pc_state entry.
2046+
let _ = db2.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |tx| {
2047+
Ok(tx.delete_st_2pc_state(&original_prepare_id)?)
2048+
});
2049+
break;
2050+
}
2051+
None => tokio::time::sleep(std::time::Duration::from_secs(5)).await,
2052+
}
2053+
}
2054+
});
2055+
}
2056+
});
2057+
}
2058+
2059+
/// Query `GET /v1/database/{coordinator}/2pc/status/{prepare_id}`.
2060+
///
2061+
/// Returns `Some(true)` = COMMIT, `Some(false)` = ABORT, `None` = transient error (retry).
2062+
async fn query_coordinator_status_with_client(
2063+
client: &reqwest::Client,
2064+
router: &std::sync::Arc<dyn crate::host::reducer_router::ReducerCallRouter>,
2065+
auth_token: Option<String>,
2066+
coordinator_identity: Identity,
2067+
prepare_id: &str,
2068+
) -> Option<bool> {
2069+
let base_url = match router.resolve_base_url(coordinator_identity).await {
2070+
Ok(url) => url,
2071+
Err(e) => {
2072+
log::warn!("2PC recovery status poll: cannot resolve coordinator URL: {e}");
2073+
return None;
2074+
}
2075+
};
2076+
let url = format!(
2077+
"{}/v1/database/{}/2pc/status/{}",
2078+
base_url,
2079+
coordinator_identity.to_hex(),
2080+
prepare_id,
2081+
);
2082+
let mut req = client.get(&url);
2083+
if let Some(token) = &auth_token {
2084+
req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}"));
2085+
}
2086+
match req.send().await {
2087+
Ok(resp) if resp.status().is_success() => {
2088+
let body = resp.text().await.unwrap_or_default();
2089+
Some(body.trim() == "commit")
2090+
}
2091+
Ok(resp) => {
2092+
log::warn!("2PC recovery status poll: coordinator returned {}", resp.status());
2093+
None
2094+
}
2095+
Err(e) => {
2096+
log::warn!("2PC recovery status poll: transport error: {e}");
2097+
None
2098+
}
2099+
}
2100+
}
2101+
18772102
pub async fn call_view_add_single_subscription(
18782103
&self,
18792104
sender: Arc<ClientConnectionSender>,

0 commit comments

Comments
 (0)