Skip to content

Commit 2c04a39

Browse files
committed
fix deadlock
1 parent 056f511 commit 2c04a39

6 files changed

Lines changed: 55 additions & 53 deletions

File tree

crates/client-api/src/routes/database.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -263,22 +263,13 @@ pub async fn prepare<S: ControlStateDelegate + NodeDelegate>(
263263

264264
let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?;
265265

266-
let connection_id = generate_random_connection_id();
267-
268-
module
269-
.call_identity_connected(auth.into(), connection_id)
270-
.await
271-
.map_err(client_connected_error_to_response)?;
272-
266+
// 2PC prepare is a server-to-server call; no client lifecycle management needed.
267+
// call_identity_connected/disconnected submit jobs to the module's executor, which
268+
// will be blocked holding the 2PC write lock after prepare_reducer returns — deadlock.
273269
let result = module
274-
.prepare_reducer(caller_identity, Some(connection_id), &reducer, args)
270+
.prepare_reducer(caller_identity, None, &reducer, args)
275271
.await;
276272

277-
module
278-
.call_identity_disconnected(caller_identity, connection_id)
279-
.await
280-
.map_err(client_disconnected_error_to_response)?;
281-
282273
match result {
283274
Ok((prepare_id, rcr, return_value)) => {
284275
let (status, body) =

crates/core/src/host/mod.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use anyhow::Context;
22
use bytes::Bytes;
33
use bytestring::ByteString;
4+
use core::future::Future;
45
use derive_more::Display;
56
use enum_map::Enum;
67
use once_cell::sync::OnceCell;
@@ -10,6 +11,31 @@ use spacetimedb_lib::ProductValue;
1011
use spacetimedb_schema::def::deserialize::{ArgsSeed, FunctionDef};
1112
use spacetimedb_schema::def::ModuleDef;
1213

14+
/// Block on `fut` from a synchronous context that may be inside a Tokio runtime.
15+
///
16+
/// `Handle::block_on` and `block_in_place` both panic when the calling thread is
17+
/// a custom (`std::thread::spawn`) thread that has entered the runtime via
18+
/// `Handle::enter()` — which is exactly the pattern used by `SingleCoreExecutor`.
19+
///
20+
/// The fix (same as the non-2PC `call_reducer_on_db` path): spawn a **scoped**
21+
/// OS thread. The scoped thread starts with no Tokio context, so `Handle::block_on`
22+
/// works normally and drives the future using the **original** runtime's I/O reactor
23+
/// and connection pools.
24+
///
25+
/// Use this for every place in the 2PC / cross-DB call paths that needs to
26+
/// synchronously drive a future from blocking (WASM executor) context.
27+
pub(crate) fn block_on_scoped<F>(handle: &tokio::runtime::Handle, fut: F) -> F::Output
28+
where
29+
F: Future + Send,
30+
F::Output: Send,
31+
{
32+
std::thread::scope(|s| {
33+
s.spawn(|| handle.block_on(fut))
34+
.join()
35+
.expect("block_on_scoped: thread panicked")
36+
})
37+
}
38+
1339
mod disk_storage;
1440
mod host_controller;
1541
mod module_common;

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::host::module_host::{
1313
ViewCallResult, ViewCommand, ViewCommandResult, ViewOutcome,
1414
};
1515
use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams};
16+
use crate::host::block_on_scoped;
1617
use crate::host::{
1718
ArgsTuple, ModuleHost, ProcedureCallError, ProcedureCallResult, ReducerCallError, ReducerCallResult, ReducerId,
1819
ReducerOutcome, Scheduler, UpdateDatabaseResult,
@@ -698,7 +699,7 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
698699
if let Some(prepare_offset) = marker_tx_data.tx_offset() {
699700
if let Some(mut durable) = stdb.durable_tx_offset() {
700701
let handle = tokio::runtime::Handle::current();
701-
let _ = handle.block_on(durable.wait_for(prepare_offset));
702+
let _ = block_on_scoped(&handle, durable.wait_for(prepare_offset));
702703
}
703704
}
704705

@@ -725,11 +726,13 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
725726
// Without this, A could delete its coordinator log entry while B's commit
726727
// is still in-memory — a B crash at that point would leave the tx uncommitted
727728
// with no way to recover (A has already forgotten it committed).
728-
let handle = tokio::runtime::Handle::current();
729729
if let Some(mut durable) = stdb.durable_tx_offset() {
730-
if let Ok(offset) = handle.block_on(commit_result.tx_offset) {
731-
let _ = handle.block_on(durable.wait_for(offset));
732-
}
730+
let handle = tokio::runtime::Handle::current();
731+
block_on_scoped(&handle, async move {
732+
if let Ok(offset) = commit_result.tx_offset.await {
733+
let _ = durable.wait_for(offset).await;
734+
}
735+
});
733736
}
734737

735738
// Notify coordinator that B has committed so it can delete its coordinator log entry.
@@ -738,7 +741,7 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
738741
let router = replica_ctx.call_reducer_router.clone();
739742
let client_http = replica_ctx.call_reducer_client.clone();
740743
let auth_token = replica_ctx.call_reducer_auth_token.clone();
741-
handle.spawn(send_ack_commit_to_coordinator(
744+
tokio::runtime::Handle::current().spawn(send_ack_commit_to_coordinator(
742745
client_http,
743746
router,
744747
auth_token,
@@ -787,19 +790,13 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
787790
let auth_token = replica_ctx.call_reducer_auth_token.clone();
788791
let prepare_id_owned = prepare_id.to_owned();
789792
loop {
790-
let decision = std::thread::scope(|s| {
791-
s.spawn(|| {
792-
handle.block_on(Self::query_coordinator_status(
793-
&client,
794-
&router,
795-
auth_token.clone(),
796-
coordinator_identity,
797-
&prepare_id_owned,
798-
))
799-
})
800-
.join()
801-
.expect("coordinator poll thread panicked")
802-
});
793+
let decision = block_on_scoped(&handle, Self::query_coordinator_status(
794+
&client,
795+
&router,
796+
auth_token.clone(),
797+
coordinator_identity,
798+
&prepare_id_owned,
799+
));
803800
match decision {
804801
Some(commit) => return commit,
805802
None => std::thread::sleep(Duration::from_secs(5)),
@@ -1157,9 +1154,7 @@ impl InstanceCommon {
11571154

11581155
let replica_ctx = inst.replica_ctx().clone();
11591156
let handle = tokio::runtime::Handle::current();
1160-
std::thread::scope(|s| {
1161-
s.spawn(|| {
1162-
handle.block_on(async {
1157+
block_on_scoped(&handle, async {
11631158
// Wait for A's coordinator log (committed atomically with the tx) to be
11641159
// durable before sending COMMIT to B. This guarantees that if A crashes
11651160
// after sending COMMIT, recovery can retransmit from the durable log.
@@ -1218,10 +1213,6 @@ impl InstanceCommon {
12181213
}
12191214
}
12201215
}
1221-
});
1222-
})
1223-
.join()
1224-
.expect("2PC coordination thread panicked");
12251216
});
12261217
}
12271218

crates/core/src/host/wasmtime/wasm_instance_env.rs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1996,19 +1996,11 @@ impl WasmInstanceEnv {
19961996
let args_buf = mem.deref_slice(args_ptr, args_len)?;
19971997
let args = bytes::Bytes::copy_from_slice(args_buf);
19981998

1999-
// Reducers run inside a tokio LocalSet (single-threaded), so block_in_place
2000-
// is unavailable and futures::executor::block_on can't drive tokio I/O.
2001-
// Spawn a new OS thread and call Handle::block_on from there, which is
2002-
// designed to be called from synchronous (non-async) contexts.
20031999
let handle = tokio::runtime::Handle::current();
20042000
let fut = env
20052001
.instance_env
20062002
.call_reducer_on_db(database_identity, &reducer_name, args);
2007-
let result = std::thread::scope(|s| {
2008-
s.spawn(|| handle.block_on(fut))
2009-
.join()
2010-
.expect("call_reducer_on_db: worker thread panicked")
2011-
});
2003+
let result = super::super::block_on_scoped(&handle, fut);
20122004

20132005
match result {
20142006
Ok((status, body)) => {
@@ -2067,11 +2059,7 @@ impl WasmInstanceEnv {
20672059
let fut = env
20682060
.instance_env
20692061
.call_reducer_on_db_2pc(database_identity, &reducer_name, args);
2070-
let result = std::thread::scope(|s| {
2071-
s.spawn(|| handle.block_on(fut))
2072-
.join()
2073-
.expect("call_reducer_on_db_2pc: worker thread panicked")
2074-
});
2062+
let result = super::super::block_on_scoped(&handle, fut);
20752063

20762064
match result {
20772065
Ok((status, body, prepare_id)) => {

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::{
2929
use crate::{
3030
locking_tx_datastore::ViewCallInfo,
3131
system_tables::{
32+
ST_2PC_COORDINATOR_LOG_ID, ST_2PC_COORDINATOR_LOG_IDX, ST_2PC_STATE_ID, ST_2PC_STATE_IDX,
3233
ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ACCESSOR_IDX, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX,
3334
ST_EVENT_TABLE_ID, ST_EVENT_TABLE_IDX, ST_INDEX_ACCESSOR_ID, ST_INDEX_ACCESSOR_IDX, ST_TABLE_ACCESSOR_ID,
3435
ST_TABLE_ACCESSOR_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID,
@@ -478,6 +479,9 @@ impl CommittedState {
478479
self.create_table(ST_INDEX_ACCESSOR_ID, schemas[ST_INDEX_ACCESSOR_IDX].clone());
479480
self.create_table(ST_COLUMN_ACCESSOR_ID, schemas[ST_COLUMN_ACCESSOR_IDX].clone());
480481

482+
self.create_table(ST_2PC_STATE_ID, schemas[ST_2PC_STATE_IDX].clone());
483+
self.create_table(ST_2PC_COORDINATOR_LOG_ID, schemas[ST_2PC_COORDINATOR_LOG_IDX].clone());
484+
481485
// Insert the sequences into `st_sequences`
482486
let (st_sequences, blob_store, pool) =
483487
self.get_table_and_blob_store_or_create(ST_SEQUENCE_ID, &schemas[ST_SEQUENCE_IDX]);

crates/datastore/src/system_tables.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,8 @@ pub(crate) const ST_EVENT_TABLE_IDX: usize = 16;
286286
pub(crate) const ST_TABLE_ACCESSOR_IDX: usize = 17;
287287
pub(crate) const ST_INDEX_ACCESSOR_IDX: usize = 18;
288288
pub(crate) const ST_COLUMN_ACCESSOR_IDX: usize = 19;
289+
pub(crate) const ST_2PC_STATE_IDX: usize = 20;
290+
pub(crate) const ST_2PC_COORDINATOR_LOG_IDX: usize = 21;
289291

290292
macro_rules! st_fields_enum {
291293
($(#[$attr:meta])* enum $ty_name:ident { $($name:expr, $var:ident = $discr:expr,)* }) => {

0 commit comments

Comments
 (0)