Skip to content

Commit 95d112a

Browse files
committed
avoid blocking co-ordinator
1 parent c2b4f97 commit 95d112a

2 files changed

Lines changed: 79 additions & 79 deletions

File tree

crates/core/src/host/module_host.rs

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1826,21 +1826,7 @@ impl ModuleHost {
18261826
Ok::<(), ReducerCallError>(())
18271827
},
18281828
// JS modules: no 2PC support yet.
1829-
async |(p, _pid, _cid, ptx, _drx), inst| {
1830-
let (res, rv) = inst.call_reducer(p).await.map(|r| (r, None)).unwrap_or_else(|e| {
1831-
log::error!("prepare_reducer JS fallback: {e}");
1832-
(
1833-
ReducerCallResult {
1834-
outcome: ReducerOutcome::Failed(Box::new(Box::from("reducer error"))),
1835-
energy_used: EnergyQuanta::ZERO,
1836-
execution_duration: Default::default(),
1837-
},
1838-
None,
1839-
)
1840-
});
1841-
let _ = ptx.send((res, rv));
1842-
Ok(())
1843-
},
1829+
async |(p, _pid, _cid, ptx, _drx), inst| Err(ReducerCallError::NoSuchReducer),
18441830
)
18451831
.await;
18461832
});

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 78 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::client::ClientActorId;
44
use crate::database_logger;
55
use crate::energy::{EnergyMonitor, FunctionBudget, FunctionFingerprint};
66
use crate::error::DBError;
7+
use crate::host::block_on_scoped;
78
use crate::host::host_controller::CallProcedureReturn;
89
use crate::host::instance_env::{InstanceEnv, TxSlot};
910
use crate::host::module_common::{build_common_module_from_raw, ModuleCommon};
@@ -13,7 +14,6 @@ use crate::host::module_host::{
1314
ViewCallResult, ViewCommand, ViewCommandResult, ViewOutcome,
1415
};
1516
use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams};
16-
use crate::host::block_on_scoped;
1717
use crate::host::{
1818
ArgsTuple, ModuleHost, ProcedureCallError, ProcedureCallResult, ReducerCallError, ReducerCallResult, ReducerId,
1919
ReducerOutcome, Scheduler, UpdateDatabaseResult,
@@ -790,13 +790,16 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
790790
let auth_token = replica_ctx.call_reducer_auth_token.clone();
791791
let prepare_id_owned = prepare_id.to_owned();
792792
loop {
793-
let decision = block_on_scoped(&handle, Self::query_coordinator_status(
794-
&client,
795-
&router,
796-
auth_token.clone(),
797-
coordinator_identity,
798-
&prepare_id_owned,
799-
));
793+
let decision = block_on_scoped(
794+
&handle,
795+
Self::query_coordinator_status(
796+
&client,
797+
&router,
798+
auth_token.clone(),
799+
coordinator_identity,
800+
&prepare_id_owned,
801+
),
802+
);
800803
match decision {
801804
Some(commit) => return commit,
802805
None => std::thread::sleep(Duration::from_secs(5)),
@@ -1151,68 +1154,79 @@ impl InstanceCommon {
11511154
if !prepared_participants.is_empty() {
11521155
let committed = matches!(event.status, EventStatus::Committed(_));
11531156
let stdb = self.info.subscriptions.relational_db().clone();
1154-
1155-
let replica_ctx = inst.replica_ctx().clone();
11561157
let handle = tokio::runtime::Handle::current();
1157-
block_on_scoped(&handle, async {
1158-
// Wait for A's coordinator log (committed atomically with the tx) to be
1159-
// durable before sending COMMIT to B. This guarantees that if A crashes
1160-
// after sending COMMIT, recovery can retransmit from the durable log.
1161-
if committed && let Some(mut durable_offset) = stdb.durable_tx_offset() {
1162-
if let Ok(offset) = commit_tx_offset.await {
1163-
let _ = durable_offset.wait_for(offset).await;
1164-
}
1158+
1159+
// Wait for A's coordinator log (committed atomically with the tx) to be
1160+
// durable before sending COMMIT to B. This guarantees that if A crashes
1161+
// after sending COMMIT, recovery can retransmit from the durable log.
1162+
// Only needed for COMMIT — ABORT carries no durability requirement.
1163+
if committed {
1164+
if let Some(mut durable_offset) = stdb.durable_tx_offset() {
1165+
block_on_scoped(&handle, async move {
1166+
if let Ok(offset) = commit_tx_offset.await {
1167+
let _ = durable_offset.wait_for(offset).await;
11651168
}
1169+
});
1170+
}
1171+
}
11661172

1167-
let client = replica_ctx.call_reducer_client.clone();
1168-
let router = replica_ctx.call_reducer_router.clone();
1169-
let auth_token = replica_ctx.call_reducer_auth_token.clone();
1170-
for (db_identity, prepare_id) in &prepared_participants {
1171-
let action = if committed { "commit" } else { "abort" };
1172-
let base_url = match router.resolve_base_url(*db_identity).await {
1173-
Ok(url) => url,
1174-
Err(e) => {
1175-
log::error!("2PC {action}: failed to resolve base URL for {db_identity}: {e}");
1176-
continue;
1177-
}
1178-
};
1179-
let url = format!(
1180-
"{}/v1/database/{}/2pc/{}/{}",
1181-
base_url,
1182-
db_identity.to_hex(),
1183-
action,
1184-
prepare_id,
1185-
);
1186-
let mut req = client.post(&url);
1187-
if let Some(ref token) = auth_token {
1188-
req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}"));
1189-
}
1190-
match req.send().await {
1191-
Ok(resp) if resp.status().is_success() => {
1192-
log::info!("2PC {action}: {prepare_id} on {db_identity}");
1193-
// B acknowledged COMMIT — remove coordinator log entry
1194-
// (best-effort; recovery will clean up on restart if missed).
1195-
if committed {
1196-
if let Err(e) = stdb
1197-
.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |del_tx| {
1198-
Ok(del_tx.delete_st_2pc_coordinator_log(prepare_id)?)
1199-
})
1200-
{
1201-
log::warn!("delete_st_2pc_coordinator_log failed for {prepare_id}: {e}");
1202-
}
1203-
}
1204-
}
1205-
Ok(resp) => {
1206-
log::error!(
1207-
"2PC {action}: failed for {prepare_id} on {db_identity}: status {}",
1208-
resp.status()
1209-
);
1210-
}
1211-
Err(e) => {
1212-
log::error!("2PC {action}: transport error for {prepare_id} on {db_identity}: {e}");
1173+
// Fire-and-forget: send COMMIT/ABORT to each participant.
1174+
// The coordinator log (written atomically with A's tx above) is the
1175+
// durability guarantee — if a send fails, recover_2pc_coordinator retransmits
1176+
// on restart and recover_2pc_participant polls the status endpoint.
1177+
// Blocking the executor here would stall A's next reducer for up to
1178+
// 30 s × number of participants with no correctness benefit.
1179+
let replica_ctx = inst.replica_ctx().clone();
1180+
handle.spawn(async move {
1181+
let client = replica_ctx.call_reducer_client.clone();
1182+
let router = replica_ctx.call_reducer_router.clone();
1183+
let auth_token = replica_ctx.call_reducer_auth_token.clone();
1184+
for (db_identity, prepare_id) in &prepared_participants {
1185+
let action = if committed { "commit" } else { "abort" };
1186+
let base_url = match router.resolve_base_url(*db_identity).await {
1187+
Ok(url) => url,
1188+
Err(e) => {
1189+
log::error!("2PC {action}: failed to resolve base URL for {db_identity}: {e}");
1190+
continue;
1191+
}
1192+
};
1193+
let url = format!(
1194+
"{}/v1/database/{}/2pc/{}/{}",
1195+
base_url,
1196+
db_identity.to_hex(),
1197+
action,
1198+
prepare_id,
1199+
);
1200+
let mut req = client.post(&url);
1201+
if let Some(ref token) = auth_token {
1202+
req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}"));
1203+
}
1204+
match req.send().await {
1205+
Ok(resp) if resp.status().is_success() => {
1206+
log::info!("2PC {action}: {prepare_id} on {db_identity}");
1207+
// B acknowledged COMMIT — remove coordinator log entry
1208+
// (best-effort; recovery will clean up on restart if missed).
1209+
if committed {
1210+
if let Err(e) = stdb
1211+
.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |del_tx| {
1212+
Ok(del_tx.delete_st_2pc_coordinator_log(prepare_id)?)
1213+
})
1214+
{
1215+
log::warn!("delete_st_2pc_coordinator_log failed for {prepare_id}: {e}");
12131216
}
12141217
}
12151218
}
1219+
Ok(resp) => {
1220+
log::error!(
1221+
"2PC {action}: failed for {prepare_id} on {db_identity}: status {}",
1222+
resp.status()
1223+
);
1224+
}
1225+
Err(e) => {
1226+
log::error!("2PC {action}: transport error for {prepare_id} on {db_identity}: {e}");
1227+
}
1228+
}
1229+
}
12161230
});
12171231
}
12181232

0 commit comments

Comments
 (0)