Skip to content

Commit ffcb5ed

Browse files
committed
fix prepare
1 parent b88d4ae commit ffcb5ed

4 files changed

Lines changed: 118 additions & 46 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use spacetimedb_commitlog::{self as commitlog, Commitlog, SizeOnDisk};
1212
use spacetimedb_data_structures::map::HashSet;
1313
use spacetimedb_datastore::db_metrics::DB_METRICS;
1414
use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError};
15-
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
15+
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
1616
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
1717
use spacetimedb_datastore::locking_tx_datastore::state_view::{
1818
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
@@ -855,6 +855,16 @@ impl RelationalDB {
855855
(tx_data, tx_metrics, tx)
856856
}
857857

858+
/// Forward a pre-built `TxData` directly to the durability worker.
859+
///
860+
/// Used by the 2PC participant path to make the `st_2pc_state` PREPARE marker durable
861+
/// while the main write lock is still held (i.e. without going through a full commit).
862+
pub fn request_durability_for_tx_data(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
863+
if let Some(durability) = &self.durability {
864+
durability.request_durability(reducer_context, tx_data);
865+
}
866+
}
867+
858868
/// Get the [`DurableOffset`] of this database, or `None` if this is an
859869
/// in-memory instance.
860870
pub fn durable_tx_offset(&self) -> Option<DurableOffset> {

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 51 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -568,88 +568,94 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
568568

569569
/// Run the reducer as a 2PC participant PREPARE.
570570
///
571-
/// Holds the write lock (MutTxId) open until a COMMIT or ABORT decision arrives.
572-
/// The flow:
573-
/// 1. Run reducer (no commit).
574-
/// 2. If reducer failed: send failure via `prepared_tx`; rollback; return.
575-
/// 3. If reducer succeeded: insert `st_2pc_state` row; send PREPARED result via `prepared_tx`.
576-
/// 4. Block on `decision_rx`:
577-
/// - `true` (COMMIT): commit via `commit_and_broadcast_event`, then delete `st_2pc_state`.
578-
/// - `false` (ABORT) or channel closed: roll back.
579571
/// Run the reducer as a 2PC participant PREPARE.
580572
///
581573
/// Holds the write lock (MutTxId) open until a COMMIT or ABORT decision arrives.
582574
/// The flow:
583-
/// 1. Run reducer (no commit).
575+
/// 1. Run reducer (no commit); hold open MutTxId (write lock).
584576
/// 2. If reducer failed: send failure via `prepared_tx`; rollback; return.
585-
/// 3. If reducer succeeded: insert `st_2pc_state` row; send PREPARED result via `prepared_tx`.
586-
/// 4. Block on `decision_rx`:
587-
/// - `true` (COMMIT): commit via `commit_and_broadcast_event`, then delete `st_2pc_state`.
588-
/// - `false` (ABORT) or channel closed: roll back.
577+
/// 3. If reducer succeeded: call `flush_2pc_prepare_marker` — inserts `st_2pc_state`
578+
/// directly into committed state (bumps tx_offset), returns `TxData` for the marker.
579+
/// Forward the `TxData` to the durability worker so the PREPARE is in the commitlog.
580+
/// The write lock remains held throughout.
581+
/// 4. Signal PREPARED via `prepared_tx`.
582+
/// 5. Block on `decision_rx`:
583+
/// - `true` (COMMIT): commit main tx (reducer changes get the next tx_offset), then
584+
/// delete `st_2pc_state` in a new tx.
585+
/// - `false` (ABORT) or channel closed: roll back main tx; delete `st_2pc_state` in
586+
/// a new tx (the marker row is already in committed state from step 3).
589587
pub fn call_reducer_prepare_and_hold(
590588
&mut self,
591589
params: CallReducerParams,
592590
prepare_id: String,
593591
prepared_tx: tokio::sync::oneshot::Sender<(ReducerCallResult, Option<Bytes>)>,
594592
decision_rx: std::sync::mpsc::Receiver<bool>,
595593
) {
596-
let (mut tx, event, client, trapped) =
597-
crate::callgrind_flag::invoke_allowing_callgrind(|| {
598-
self.common.run_reducer_no_commit(None, params, &mut self.instance)
599-
});
594+
let stdb = self.instance.replica_ctx().relational_db().clone();
595+
596+
// Step 1: run the reducer and hold the write lock open.
597+
let (mut tx, event, client, trapped) = crate::callgrind_flag::invoke_allowing_callgrind(|| {
598+
self.common.run_reducer_no_commit(None, params, &mut self.instance)
599+
});
600600
self.trapped = trapped;
601601

602602
let energy_quanta_used = event.energy_quanta_used;
603603
let total_duration = event.host_execution_duration;
604604

605605
if !matches!(event.status, EventStatus::Committed(_)) {
606-
// Reducer failed — roll back and signal failure to the waiter.
606+
// Reducer failed — roll back and signal failure; no marker was written.
607607
let res = ReducerCallResult {
608608
outcome: ReducerOutcome::from(&event.status),
609609
energy_used: energy_quanta_used,
610610
execution_duration: total_duration,
611611
};
612612
let return_value = event.reducer_return_value.clone();
613613
let _ = prepared_tx.send((res, return_value));
614-
// commit_and_broadcast_event handles rollback for non-Committed status.
615-
commit_and_broadcast_event(&self.common.info.subscriptions, client, event, tx);
614+
let _ = stdb.rollback_mut_tx(tx);
616615
return;
617616
}
618617

619-
// Insert the st_2pc_state marker into the held tx atomically with the reducer's changes.
620-
if let Err(e) = tx.insert_st_2pc_state(&prepare_id) {
621-
log::error!("call_reducer_prepare_and_hold: failed to insert st_2pc_state for {prepare_id}: {e}");
622-
}
618+
// Step 3: flush the st_2pc_state marker directly into committed state, assign
619+
// a tx_offset, and forward to durability — all while holding the write lock.
620+
let marker_tx_data = match tx.flush_2pc_prepare_marker(&prepare_id) {
621+
Ok(td) => std::sync::Arc::new(td),
622+
Err(e) => {
623+
log::error!("call_reducer_prepare_and_hold: flush_2pc_prepare_marker failed for {prepare_id}: {e}");
624+
let _ = stdb.rollback_mut_tx(tx);
625+
return;
626+
}
627+
};
628+
stdb.request_durability_for_tx_data(None, &marker_tx_data);
623629

630+
// Step 4: signal PREPARED.
624631
let res = ReducerCallResult {
625632
outcome: ReducerOutcome::from(&event.status),
626633
energy_used: energy_quanta_used,
627634
execution_duration: total_duration,
628635
};
629636
let return_value = event.reducer_return_value.clone();
630-
// Signal PREPARED — the coordinator can now send COMMIT or ABORT.
631637
let _ = prepared_tx.send((res, return_value));
632638

633-
// Block the executor thread until we receive a decision.
639+
// Step 5: block the executor thread until we receive a decision.
634640
let commit = decision_rx.recv().unwrap_or(false);
635641

636642
if commit {
643+
// Delete the marker in the same tx as the reducer changes so they are
644+
// committed atomically. The row is in committed state (inserted by
645+
// flush_2pc_prepare_marker), so delete_st_2pc_state finds it via iter.
646+
if let Err(e) = tx.delete_st_2pc_state(&prepare_id) {
647+
log::error!("call_reducer_prepare_and_hold: failed to delete st_2pc_state for {prepare_id}: {e}");
648+
}
637649
commit_and_broadcast_event(&self.common.info.subscriptions, client, event, tx);
638-
639-
// Delete the st_2pc_state row in a new tx so recovery knows COMMIT is done.
640-
let stdb = self.instance.replica_ctx().relational_db();
650+
} else {
651+
// ABORT: roll back reducer changes (tx_state discarded).
652+
// The marker row is already in committed state; clean it up in a new tx.
653+
let _ = stdb.rollback_mut_tx(tx);
641654
if let Err(e) = stdb.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |del_tx| {
642655
Ok(del_tx.delete_st_2pc_state(&prepare_id)?)
643656
}) {
644-
log::error!("call_reducer_prepare_and_hold: failed to delete st_2pc_state for {prepare_id}: {e}");
657+
log::error!("call_reducer_prepare_and_hold: abort: failed to delete st_2pc_state for {prepare_id}: {e}");
645658
}
646-
} else {
647-
// ABORT: roll back by passing a failure event.
648-
let abort_event = ModuleEvent {
649-
status: EventStatus::FailedInternal("2PC abort".into()),
650-
..event
651-
};
652-
commit_and_broadcast_event(&self.common.info.subscriptions, None, abort_event, tx);
653659
}
654660
}
655661

@@ -975,10 +981,7 @@ impl InstanceCommon {
975981
);
976982
let mut req = client.post(&url);
977983
if let Some(ref token) = auth_token {
978-
req = req.header(
979-
http::header::AUTHORIZATION,
980-
format!("Bearer {token}"),
981-
);
984+
req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}"));
982985
}
983986
match req.send().await {
984987
Ok(resp) if resp.status().is_success() => {
@@ -991,9 +994,7 @@ impl InstanceCommon {
991994
);
992995
}
993996
Err(e) => {
994-
log::error!(
995-
"2PC {action}: transport error for {prepare_id} on {db_identity}: {e}"
996-
);
997+
log::error!("2PC {action}: transport error for {prepare_id} on {db_identity}: {e}");
997998
}
998999
}
9991000
}
@@ -1026,7 +1027,12 @@ impl InstanceCommon {
10261027
tx: Option<MutTxId>,
10271028
params: CallReducerParams,
10281029
inst: &mut I,
1029-
) -> (MutTxId, ModuleEvent, Option<Arc<crate::client::ClientConnectionSender>>, bool) {
1030+
) -> (
1031+
MutTxId,
1032+
ModuleEvent,
1033+
Option<Arc<crate::client::ClientConnectionSender>>,
1034+
bool,
1035+
) {
10301036
let CallReducerParams {
10311037
timestamp,
10321038
caller_identity,

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1415,6 +1415,37 @@ impl CommittedState {
14151415
self.tables.insert(table_id, Self::make_table(schema));
14161416
}
14171417

1418+
/// Insert a single row directly into the committed state, bypassing `TxState`.
1419+
///
1420+
/// Assigns the next `tx_offset` to the resulting `TxData` and increments the counter.
1421+
/// The write lock (and therefore the transaction) is **not** released.
1422+
///
1423+
/// Used by the 2PC participant path to flush the `st_2pc_state` PREPARE marker to the
1424+
/// commitlog (via the durability worker) while keeping the reducer's write lock open,
1425+
/// so that no other transaction can interleave between PREPARE and COMMIT/ABORT.
1426+
pub(super) fn insert_row_and_consume_offset(
1427+
&mut self,
1428+
table_id: TableId,
1429+
schema: &Arc<TableSchema>,
1430+
row: &ProductValue,
1431+
) -> Result<TxData> {
1432+
let (table, blob_store, pool) = self.get_table_and_blob_store_or_create(table_id, schema);
1433+
table
1434+
.insert(pool, blob_store, row)
1435+
.map_err(|e| match e {
1436+
InsertError::Duplicate(e) => DatastoreError::from(TableError::Duplicate(e)),
1437+
InsertError::Bflatn(e) => DatastoreError::from(TableError::Bflatn(e)),
1438+
InsertError::IndexError(e) => DatastoreError::from(IndexError::UniqueConstraintViolation(e)),
1439+
})?;
1440+
1441+
let row_arc: Arc<[ProductValue]> = Arc::from([row.clone()]);
1442+
let mut tx_data = TxData::default();
1443+
tx_data.set_inserts_for_table(table_id, &schema.table_name, row_arc);
1444+
tx_data.set_tx_offset(self.next_tx_offset);
1445+
self.next_tx_offset += 1;
1446+
Ok(tx_data)
1447+
}
1448+
14181449
pub(super) fn get_table_and_blob_store_or_create<'this>(
14191450
&'this mut self,
14201451
table_id: TableId,

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2705,6 +2705,31 @@ impl MutTxId {
27052705
})
27062706
}
27072707

2708+
/// Write the `st_2pc_state` PREPARE marker directly to the committed state and allocate a
2709+
/// `tx_offset` for it, **without** releasing the write lock or committing the pending
2710+
/// reducer changes in `tx_state`.
2711+
///
2712+
/// Returns the `TxData` containing just the `st_2pc_state` insert together with its
2713+
/// assigned `tx_offset`. The caller is responsible for forwarding this to the durability
2714+
/// worker so that the PREPARE record becomes durable in the commitlog.
2715+
///
2716+
/// Because the write lock remains held, no other transaction can begin between this call
2717+
/// and the eventual `commit` / `rollback` of the enclosing `MutTxId`. On ABORT the
2718+
/// caller must delete the `st_2pc_state` row in a subsequent transaction (the row was
2719+
/// inserted directly into the committed state and is not part of `tx_state`).
2720+
pub fn flush_2pc_prepare_marker(&mut self, prepare_id: &str) -> Result<TxData> {
2721+
let schema = self
2722+
.committed_state_write_lock
2723+
.get_schema(ST_2PC_STATE_ID)
2724+
.cloned()
2725+
.expect("st_2pc_state system table must exist in committed state");
2726+
let row = ProductValue::from(St2pcStateRow {
2727+
prepare_id: prepare_id.to_owned(),
2728+
});
2729+
self.committed_state_write_lock
2730+
.insert_row_and_consume_offset(ST_2PC_STATE_ID, &schema, &row)
2731+
}
2732+
27082733
/// Delete the `st_2pc_state` row for the given `prepare_id`, called on COMMIT or ABORT.
27092734
pub fn delete_st_2pc_state(&mut self, prepare_id: &str) -> Result<()> {
27102735
if let Err(e) = self.delete_col_eq(

0 commit comments

Comments
 (0)