Skip to content

Commit 91a2a7e

Browse files
committed
persistence
1 parent b1477a6 commit 91a2a7e

2 files changed

Lines changed: 90 additions & 52 deletions

File tree

crates/core/src/host/module_host.rs

Lines changed: 3 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2049,14 +2049,9 @@ impl ModuleHost {
20492049
Some(commit) => {
20502050
if commit {
20512051
let _ = this2.commit_prepared(&new_prepare_id);
2052-
// Tell A we committed so it can delete its coordinator log entry.
2053-
Self::send_ack_commit_to_coordinator(
2054-
&client,
2055-
&router,
2056-
auth_token.clone(),
2057-
coordinator_identity,
2058-
&original_prepare_id,
2059-
).await;
2052+
// The actor thread (call_reducer_prepare_and_hold) will wait
2053+
// for B's commit to be durable and then send the ack-commit
2054+
// to the coordinator. Nothing to do here.
20602055
} else {
20612056
let _ = this2.abort_prepared(&new_prepare_id);
20622057
}
@@ -2117,45 +2112,6 @@ impl ModuleHost {
21172112
}
21182113
}
21192114

2120-
/// POST `POST /v1/database/{coordinator}/2pc/ack-commit/{prepare_id}` to tell A that
2121-
/// B has committed, so A can delete its coordinator log entry.
2122-
async fn send_ack_commit_to_coordinator(
2123-
client: &reqwest::Client,
2124-
router: &std::sync::Arc<dyn crate::host::reducer_router::ReducerCallRouter>,
2125-
auth_token: Option<String>,
2126-
coordinator_identity: Identity,
2127-
prepare_id: &str,
2128-
) {
2129-
let base_url = match router.resolve_base_url(coordinator_identity).await {
2130-
Ok(url) => url,
2131-
Err(e) => {
2132-
log::warn!("2PC ack-commit: cannot resolve coordinator URL: {e}");
2133-
return;
2134-
}
2135-
};
2136-
let url = format!(
2137-
"{}/v1/database/{}/2pc/ack-commit/{}",
2138-
base_url,
2139-
coordinator_identity.to_hex(),
2140-
prepare_id,
2141-
);
2142-
let mut req = client.post(&url);
2143-
if let Some(token) = &auth_token {
2144-
req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}"));
2145-
}
2146-
match req.send().await {
2147-
Ok(resp) if resp.status().is_success() => {
2148-
log::info!("2PC ack-commit: notified coordinator for {prepare_id}");
2149-
}
2150-
Ok(resp) => {
2151-
log::warn!("2PC ack-commit: coordinator returned {} for {prepare_id}", resp.status());
2152-
}
2153-
Err(e) => {
2154-
log::warn!("2PC ack-commit: transport error for {prepare_id}: {e}");
2155-
}
2156-
}
2157-
}
2158-
21592115
pub async fn call_view_add_single_subscription(
21602116
&self,
21612117
sender: Arc<ClientConnectionSender>,

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 87 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,47 @@ impl<T: WasmModule> WasmModuleHostActor<T> {
399399
}
400400
}
401401

402+
/// Notify coordinator A that B has committed, so A can delete its coordinator log entry.
403+
///
404+
/// Called AFTER B's commit is durable. Fire-and-forget: failure is tolerated because
405+
/// `recover_2pc_coordinator` on A will retransmit COMMIT on restart.
406+
async fn send_ack_commit_to_coordinator(
407+
client: reqwest::Client,
408+
router: std::sync::Arc<dyn crate::host::reducer_router::ReducerCallRouter>,
409+
auth_token: Option<String>,
410+
coordinator_identity: crate::identity::Identity,
411+
prepare_id: String,
412+
) {
413+
let base_url = match router.resolve_base_url(coordinator_identity).await {
414+
Ok(url) => url,
415+
Err(e) => {
416+
log::warn!("2PC ack-commit: cannot resolve coordinator URL: {e}");
417+
return;
418+
}
419+
};
420+
let url = format!(
421+
"{}/v1/database/{}/2pc/ack-commit/{}",
422+
base_url,
423+
coordinator_identity.to_hex(),
424+
prepare_id,
425+
);
426+
let mut req = client.post(&url);
427+
if let Some(token) = &auth_token {
428+
req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}"));
429+
}
430+
match req.send().await {
431+
Ok(resp) if resp.status().is_success() => {
432+
log::info!("2PC ack-commit: notified coordinator for {prepare_id}");
433+
}
434+
Ok(resp) => {
435+
log::warn!("2PC ack-commit: coordinator returned {} for {prepare_id}", resp.status());
436+
}
437+
Err(e) => {
438+
log::warn!("2PC ack-commit: transport error for {prepare_id}: {e}");
439+
}
440+
}
441+
}
442+
402443
impl<T: WasmModule> WasmModuleHostActor<T> {
403444
fn make_from_instance(&self, mut instance: T::Instance) -> WasmModuleInstance<T::Instance> {
404445
let common = InstanceCommon::new(&self.common);
@@ -648,7 +689,17 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
648689
};
649690
stdb.request_durability_for_tx_data(None, &marker_tx_data);
650691

651-
// Step 3: signal PREPARED.
692+
// Step 3: wait for the PREPARE marker to be durable before signalling PREPARED.
693+
// B must not claim PREPARED until the marker is on disk — if B crashes after
694+
// claiming PREPARED but before the marker is durable, recovery has nothing to recover.
695+
if let Some(prepare_offset) = marker_tx_data.tx_offset() {
696+
if let Some(mut durable) = stdb.durable_tx_offset() {
697+
let handle = tokio::runtime::Handle::current();
698+
let _ = handle.block_on(durable.wait_for(prepare_offset));
699+
}
700+
}
701+
702+
// Step 4: signal PREPARED.
652703
let res = ReducerCallResult {
653704
outcome: ReducerOutcome::from(&event.status),
654705
energy_used: energy_quanta_used,
@@ -666,7 +717,32 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
666717
if let Err(e) = tx.delete_st_2pc_state(&prepare_id) {
667718
log::error!("call_reducer_prepare_and_hold: failed to delete st_2pc_state for {prepare_id}: {e}");
668719
}
669-
commit_and_broadcast_event(&self.common.info.subscriptions, client, event, tx);
720+
let commit_result = commit_and_broadcast_event(&self.common.info.subscriptions, client, event, tx);
721+
722+
// Wait for B's COMMIT to be durable before acking to coordinator.
723+
// Without this, A could delete its coordinator log entry while B's commit
724+
// is still in-memory — a B crash at that point would leave the tx uncommitted
725+
// with no way to recover (A has already forgotten it committed).
726+
let handle = tokio::runtime::Handle::current();
727+
if let Some(mut durable) = stdb.durable_tx_offset() {
728+
if let Ok(offset) = handle.block_on(commit_result.tx_offset) {
729+
let _ = handle.block_on(durable.wait_for(offset));
730+
}
731+
}
732+
733+
// Notify coordinator that B has committed so it can delete its coordinator log entry.
734+
// Fire-and-forget: if this fails, coordinator's recover_2pc_coordinator will retry on
735+
// restart, and B's commit_prepared will then return a harmless "not found" error.
736+
let router = replica_ctx.call_reducer_router.clone();
737+
let client_http = replica_ctx.call_reducer_client.clone();
738+
let auth_token = replica_ctx.call_reducer_auth_token.clone();
739+
handle.spawn(send_ack_commit_to_coordinator(
740+
client_http,
741+
router,
742+
auth_token,
743+
coordinator_identity,
744+
prepare_id,
745+
));
670746
} else {
671747
// ABORT: roll back reducer changes; clean up the already-committed marker.
672748
let _ = stdb.rollback_mut_tx(tx);
@@ -1068,7 +1144,9 @@ impl InstanceCommon {
10681144
}
10691145
}
10701146

1071-
let event = commit_and_broadcast_event(&self.info.subscriptions, client, event, tx).event;
1147+
let commit_result = commit_and_broadcast_event(&self.info.subscriptions, client, event, tx);
1148+
let commit_tx_offset = commit_result.tx_offset;
1149+
let event = commit_result.event;
10721150

10731151
// 2PC post-commit coordination: send COMMIT or ABORT to each participant.
10741152
if !prepared_participants.is_empty() {
@@ -1080,9 +1158,13 @@ impl InstanceCommon {
10801158
std::thread::scope(|s| {
10811159
s.spawn(|| {
10821160
handle.block_on(async {
1161+
// Wait for A's coordinator log (committed atomically with the tx) to be
1162+
// durable before sending COMMIT to B. This guarantees that if A crashes
1163+
// after sending COMMIT, recovery can retransmit from the durable log.
10831164
if committed && let Some(mut durable_offset) = stdb.durable_tx_offset() {
1084-
let current: u64 = durable_offset.last_seen().unwrap_or(0);
1085-
let _ = durable_offset.wait_for(current + 1).await;
1165+
if let Ok(offset) = commit_tx_offset.await {
1166+
let _ = durable_offset.wait_for(offset).await;
1167+
}
10861168
}
10871169

10881170
let client = replica_ctx.call_reducer_client.clone();

0 commit comments

Comments
 (0)