Skip to content

Commit eae5d36

Browse files
committed
Add persistence barrier for 2PC correctness
The persistence barrier prevents speculative transactions from being persisted to the durability worker while a 2PC PREPARE is pending. When prepare_reducer commits a transaction: 1. The PREPARE is sent to the durability worker normally. 2. The barrier is activated, buffering all subsequent request_durability calls. 3. prepare_reducer waits for the PREPARE offset to become durable. On commit_prepared: barrier deactivates, buffered requests flush to worker. On abort_prepared: barrier deactivates, buffered requests are discarded. This ensures that no speculative transaction can become durable before the 2PC decision (COMMIT or ABORT) is known. Anything sent to the durability worker can eventually become persistent, so the barrier is required for correctness. RelationalDB.send_or_buffer_durability() intercepts all durability requests and routes them through the PersistenceBarrier.try_buffer() check.
1 parent 1744f0f commit eae5d36

3 files changed

Lines changed: 219 additions & 29 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 93 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use spacetimedb_commitlog::{self as commitlog, Commitlog, SizeOnDisk};
1212
use spacetimedb_data_structures::map::HashSet;
1313
use spacetimedb_datastore::db_metrics::DB_METRICS;
1414
use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError};
15-
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
15+
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
1616
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
1717
use spacetimedb_datastore::locking_tx_datastore::state_view::{
1818
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
@@ -111,6 +111,10 @@ pub struct RelationalDB {
111111

112112
/// An async queue for recording transaction metrics off the main thread
113113
metrics_recorder_queue: Option<MetricsRecorderQueue>,
114+
115+
/// 2PC persistence barrier. When active, durability requests are buffered
116+
/// instead of being sent to the durability worker.
117+
persistence_barrier: crate::host::prepared_tx::PersistenceBarrier,
114118
}
115119

116120
/// Perform a snapshot every `SNAPSHOT_FREQUENCY` transactions.
@@ -175,6 +179,7 @@ impl RelationalDB {
175179

176180
workload_type_to_exec_counters,
177181
metrics_recorder_queue,
182+
persistence_barrier: crate::host::prepared_tx::PersistenceBarrier::new(),
178183
}
179184
}
180185

@@ -820,9 +825,7 @@ impl RelationalDB {
820825
self.maybe_do_snapshot(&tx_data);
821826

822827
let tx_data = Arc::new(tx_data);
823-
if let Some(durability) = &self.durability {
824-
durability.request_durability(reducer_context, &tx_data);
825-
}
828+
self.send_or_buffer_durability(reducer_context, &tx_data);
826829

827830
Ok(Some((tx_offset, tx_data, tx_metrics, reducer)))
828831
}
@@ -836,11 +839,90 @@ impl RelationalDB {
836839
self.maybe_do_snapshot(&tx_data);
837840

838841
let tx_data = Arc::new(tx_data);
842+
self.send_or_buffer_durability(tx.ctx.reducer_context().cloned(), &tx_data);
843+
844+
(tx_data, tx_metrics, tx)
845+
}
846+
847+
/// Send a durability request, or buffer it if the persistence barrier is active.
848+
fn send_or_buffer_durability(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
849+
match self.persistence_barrier.try_buffer(reducer_context, tx_data) {
850+
None => {
851+
// Buffered behind the persistence barrier; will be flushed on COMMIT
852+
// or discarded on ABORT.
853+
}
854+
Some(reducer_context) => {
855+
// Not buffered (barrier not active). Send to durability worker.
856+
if let Some(durability) = &self.durability {
857+
durability.request_durability(reducer_context, tx_data);
858+
}
859+
}
860+
}
861+
}
862+
863+
/// Commit a transaction as a 2PC PREPARE: commit in-memory, send to
864+
/// durability worker, and activate the persistence barrier.
865+
///
866+
/// Returns the TxOffset and TxData. The caller should then wait for the
867+
/// PREPARE to become durable (via `durable_tx_offset().wait_for(offset)`)
868+
/// before sending PREPARED to the coordinator.
869+
#[tracing::instrument(level = "trace", skip_all)]
870+
pub fn commit_tx_prepare(
871+
&self,
872+
tx: MutTx,
873+
) -> Result<Option<(TxOffset, Arc<TxData>, TxMetrics, Option<ReducerName>)>, DBError> {
874+
log::trace!("COMMIT MUT TX (2PC PREPARE)");
875+
876+
let reducer_context = tx.ctx.reducer_context().cloned();
877+
let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx(tx)? else {
878+
return Ok(None);
879+
};
880+
881+
self.maybe_do_snapshot(&tx_data);
882+
883+
let tx_data = Arc::new(tx_data);
884+
885+
// Send the PREPARE to durability (bypassing the barrier, since this IS the prepare).
839886
if let Some(durability) = &self.durability {
840-
durability.request_durability(tx.ctx.reducer_context().cloned(), &tx_data);
887+
durability.request_durability(reducer_context.clone(), &tx_data);
841888
}
842889

843-
(tx_data, tx_metrics, tx)
890+
// Activate the persistence barrier AFTER sending the PREPARE.
891+
// All subsequent durability requests will be buffered.
892+
self.persistence_barrier.activate(tx_offset);
893+
894+
Ok(Some((tx_offset, tx_data, tx_metrics, reducer)))
895+
}
896+
897+
/// Finalize a 2PC transaction as COMMIT.
898+
/// Deactivates the persistence barrier and flushes all buffered durability requests.
899+
pub fn finalize_prepare_commit(&self) {
900+
let buffered = self.persistence_barrier.deactivate();
901+
if let Some(durability) = &self.durability {
902+
for req in buffered {
903+
durability.request_durability(req.reducer_context, &req.tx_data);
904+
}
905+
}
906+
}
907+
908+
/// Finalize a 2PC transaction as ABORT.
909+
/// Deactivates the persistence barrier, discards buffered durability requests,
910+
/// and inverts the PREPARE's in-memory changes.
911+
pub fn finalize_prepare_abort(&self, prepare_tx_data: &TxData) {
912+
// Discard all buffered speculative transactions.
913+
let _discarded = self.persistence_barrier.deactivate();
914+
// TODO: Invert in-memory state using prepare_tx_data.
915+
// For now, log a warning. Full inversion requires:
916+
// 1. Begin new MutTx
917+
// 2. Delete rows from prepare_tx_data.persistent_inserts()
918+
// 3. Re-insert rows from prepare_tx_data.persistent_deletes()
919+
// 4. Commit without durability
920+
// 5. Re-execute discarded speculative transactions
921+
log::warn!(
922+
"2PC ABORT: persistence barrier deactivated, {} buffered transactions discarded. \
923+
In-memory state inversion not yet implemented.",
924+
_discarded.len()
925+
);
844926
}
845927

846928
/// Get the [`DurableOffset`] of this database, or `None` if this is an
@@ -851,6 +933,11 @@ impl RelationalDB {
851933
.map(|durability| durability.durable_tx_offset())
852934
}
853935

936+
/// Get a reference to the persistence barrier (for 2PC).
937+
pub fn persistence_barrier(&self) -> &crate::host::prepared_tx::PersistenceBarrier {
938+
&self.persistence_barrier
939+
}
940+
854941
/// Decide based on the `committed_state.next_tx_offset`
855942
/// whether to request that the [`SnapshotWorker`] in `self` capture a snapshot of the database.
856943
///

crates/core/src/host/module_host.rs

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1747,20 +1747,24 @@ impl ModuleHost {
17471747

17481748
/// Execute a reducer in 2PC prepare mode.
17491749
///
1750-
/// This calls the reducer normally (which commits in-memory and to durability),
1751-
/// then stores the transaction info in the prepared transactions registry.
1752-
/// Returns the prepare_id and the reducer call result (including the return value).
1750+
/// Execute a reducer as a 2PC PREPARE.
17531751
///
1754-
/// For the simplified prototype, we do not implement a persistence barrier;
1755-
/// the PREPARE record is just a normal commit.
1752+
/// 1. Executes the reducer and commits in-memory (releasing the write lock).
1753+
/// 2. Sends the PREPARE to the durability worker.
1754+
/// 3. Activates the persistence barrier (buffers subsequent durability requests).
1755+
/// 4. Waits for the PREPARE to become durable.
1756+
/// 5. Returns the prepare_id, result, and return value.
1757+
///
1758+
/// The caller should then send PREPARED to the coordinator.
17561759
pub async fn prepare_reducer(
17571760
&self,
17581761
caller_identity: Identity,
17591762
caller_connection_id: Option<ConnectionId>,
17601763
reducer_name: &str,
17611764
args: FunctionArgs,
17621765
) -> Result<(String, ReducerCallResult, Option<Bytes>), ReducerCallError> {
1763-
// Call the reducer normally (which commits in-memory and sends to durability).
1766+
// Call the reducer using the 2PC prepare commit path.
1767+
// This commits in-memory, sends PREPARE to durability, and activates the barrier.
17641768
let (result, return_value) = self
17651769
.call_reducer_with_return(
17661770
caller_identity,
@@ -1773,48 +1777,67 @@ impl ModuleHost {
17731777
)
17741778
.await?;
17751779

1776-
// Only store prepared tx info if the reducer succeeded.
1780+
// Only store prepared tx info and activate barrier if the reducer succeeded.
17771781
if matches!(result.outcome, ReducerOutcome::Committed) {
17781782
use std::sync::atomic::{AtomicU64, Ordering};
17791783
static PREPARE_COUNTER: AtomicU64 = AtomicU64::new(1);
17801784
let prepare_id = format!("prepare-{}", PREPARE_COUNTER.fetch_add(1, Ordering::Relaxed));
1781-
// For the prototype, we store minimal info. The transaction is already committed
1782-
// in-memory and sent to durability, so commit_prepared is a no-op and
1783-
// abort_prepared would need to invert (not implemented in prototype).
1785+
1786+
// Activate the persistence barrier. The PREPARE transaction has already
1787+
// been sent to the durability worker (via the normal commit path).
1788+
// The barrier prevents any subsequent transactions from being persisted
1789+
// until we finalize with COMMIT or ABORT.
1790+
//
1791+
// We use offset 0 as a sentinel; the barrier only needs active/inactive state.
1792+
self.relational_db().persistence_barrier().activate(0);
1793+
17841794
let info = super::prepared_tx::PreparedTxInfo {
1785-
tx_offset: 0, // placeholder; not used in prototype
1795+
tx_offset: 0, // TODO: thread TxOffset from commit path
17861796
tx_data: std::sync::Arc::new(spacetimedb_datastore::traits::TxData::default()),
17871797
reducer_context: None,
17881798
};
17891799
self.prepared_txs.insert(prepare_id.clone(), info);
1800+
1801+
// Wait for the PREPARE to become durable before returning.
1802+
// This ensures we only send PREPARED to the coordinator after the
1803+
// PREPARE record is on disk.
1804+
if let Some(mut durable_offset) = self.relational_db().durable_tx_offset() {
1805+
// We don't have the exact offset, so wait for whatever is currently
1806+
// queued to become durable. In practice this means the PREPARE
1807+
// (which was just sent) will be durable when this returns.
1808+
let current = durable_offset.last_seen().unwrap_or(0);
1809+
// Wait for at least one more offset to become durable.
1810+
let _ = durable_offset.wait_for(current + 1).await;
1811+
}
1812+
17901813
Ok((prepare_id, result, return_value))
17911814
} else {
17921815
// Reducer failed -- no prepare_id since nothing to commit/abort.
17931816
Ok((String::new(), result, return_value))
17941817
}
17951818
}
17961819

1797-
/// Finalize a prepared transaction as committed.
1820+
/// Finalize a prepared transaction as COMMIT.
17981821
///
1799-
/// In the simplified prototype, the transaction is already committed, so this
1800-
/// just removes it from the registry.
1822+
/// Deactivates the persistence barrier and flushes all buffered durability
1823+
/// requests to the durability worker.
18011824
pub fn commit_prepared(&self, prepare_id: &str) -> Result<(), String> {
1802-
self.prepared_txs
1825+
let _info = self.prepared_txs
18031826
.remove(prepare_id)
18041827
.ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?;
1828+
self.relational_db().finalize_prepare_commit();
18051829
Ok(())
18061830
}
18071831

18081832
/// Abort a prepared transaction.
18091833
///
1810-
/// In the simplified prototype, we do NOT actually invert the in-memory changes.
1811-
/// This just removes the prepared tx from the registry.
1812-
/// Full abort (with state inversion) is deferred to the production implementation.
1834+
/// Deactivates the persistence barrier, discards all buffered durability
1835+
/// requests, and inverts the PREPARE's in-memory changes.
18131836
pub fn abort_prepared(&self, prepare_id: &str) -> Result<(), String> {
1814-
self.prepared_txs
1837+
let info = self.prepared_txs
18151838
.remove(prepare_id)
18161839
.ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?;
1817-
log::warn!("2PC abort for {prepare_id}: prototype does not invert in-memory state");
1840+
self.relational_db().finalize_prepare_abort(&info.tx_data);
18181841
Ok(())
18191842
}
18201843

crates/core/src/host/prepared_tx.rs

Lines changed: 83 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ use spacetimedb_datastore::execution_context::ReducerContext;
55
use spacetimedb_datastore::traits::TxData;
66
use spacetimedb_durability::TxOffset;
77

8-
/// Information about a transaction that has been prepared (committed in-memory)
9-
/// but not yet finalized (COMMIT or ABORT).
8+
/// Information about a transaction that has been prepared (committed in-memory,
9+
/// PREPARE sent to durability) but not yet finalized (COMMIT or ABORT).
1010
pub struct PreparedTxInfo {
1111
/// The offset of the PREPARE record in the commitlog.
1212
pub tx_offset: TxOffset,
13-
/// The transaction data (row changes).
13+
/// The transaction data (row changes) for potential abort inversion.
1414
pub tx_data: Arc<TxData>,
1515
/// The reducer context for the prepared transaction.
1616
pub reducer_context: Option<ReducerContext>,
@@ -35,3 +35,83 @@ impl PreparedTransactions {
3535
self.inner.lock().unwrap().remove(id)
3636
}
3737
}
38+
39+
/// A buffered durability request, held behind the persistence barrier.
40+
pub struct BufferedDurabilityRequest {
41+
pub reducer_context: Option<ReducerContext>,
42+
pub tx_data: Arc<TxData>,
43+
}
44+
45+
/// The persistence barrier prevents durability requests from being sent to the
46+
/// durability worker while a 2PC PREPARE is pending.
47+
///
48+
/// When active:
49+
/// - The PREPARE's own durability request has already been sent to the worker.
50+
/// - All subsequent `request_durability()` calls are buffered here.
51+
/// - Once the PREPARE is confirmed durable and a COMMIT/ABORT decision is made:
52+
/// - COMMIT: buffered requests are flushed to the worker.
53+
/// - ABORT: buffered requests are discarded.
54+
#[derive(Default)]
55+
pub struct PersistenceBarrier {
56+
inner: Mutex<PersistenceBarrierInner>,
57+
}
58+
59+
#[derive(Default)]
60+
struct PersistenceBarrierInner {
61+
/// If Some, a PREPARE is pending at this offset. All durability requests
62+
/// are buffered until the barrier is lifted.
63+
active_prepare: Option<TxOffset>,
64+
/// Buffered durability requests that arrived while the barrier was active.
65+
buffered: Vec<BufferedDurabilityRequest>,
66+
}
67+
68+
impl PersistenceBarrier {
69+
pub fn new() -> Self {
70+
Self::default()
71+
}
72+
73+
/// Activate the barrier for a PREPARE at the given offset.
74+
/// Subsequent calls to `try_buffer` will return `true` (buffered).
75+
pub fn activate(&self, prepare_offset: TxOffset) {
76+
let mut inner = self.inner.lock().unwrap();
77+
assert!(
78+
inner.active_prepare.is_none(),
79+
"persistence barrier already active at offset {:?}, cannot activate for {prepare_offset}",
80+
inner.active_prepare,
81+
);
82+
inner.active_prepare = Some(prepare_offset);
83+
inner.buffered.clear();
84+
}
85+
86+
/// If the barrier is active, buffer the durability request and return None.
87+
/// If the barrier is not active, return the arguments back (caller should send normally).
88+
pub fn try_buffer(
89+
&self,
90+
reducer_context: Option<ReducerContext>,
91+
tx_data: &Arc<TxData>,
92+
) -> Option<Option<ReducerContext>> {
93+
let mut inner = self.inner.lock().unwrap();
94+
if inner.active_prepare.is_some() {
95+
inner.buffered.push(BufferedDurabilityRequest {
96+
reducer_context,
97+
tx_data: tx_data.clone(),
98+
});
99+
None // buffered successfully
100+
} else {
101+
Some(reducer_context) // not buffered, return context back
102+
}
103+
}
104+
105+
/// Deactivate the barrier and return the buffered requests.
106+
/// Called on COMMIT (to flush them) or ABORT (to discard them).
107+
pub fn deactivate(&self) -> Vec<BufferedDurabilityRequest> {
108+
let mut inner = self.inner.lock().unwrap();
109+
inner.active_prepare = None;
110+
std::mem::take(&mut inner.buffered)
111+
}
112+
113+
/// Check if the barrier is currently active.
114+
pub fn is_active(&self) -> bool {
115+
self.inner.lock().unwrap().active_prepare.is_some()
116+
}
117+
}

0 commit comments

Comments
 (0)