Skip to content

Commit b88d4ae

Browse files
committed
regular 2pc
1 parent ef1c969 commit b88d4ae

6 files changed

Lines changed: 443 additions & 318 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 17 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use spacetimedb_commitlog::{self as commitlog, Commitlog, SizeOnDisk};
1212
use spacetimedb_data_structures::map::HashSet;
1313
use spacetimedb_datastore::db_metrics::DB_METRICS;
1414
use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError};
15-
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
15+
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
1616
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
1717
use spacetimedb_datastore::locking_tx_datastore::state_view::{
1818
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
@@ -76,79 +76,6 @@ type RowCountFn = Arc<dyn Fn(TableId, &str) -> i64 + Send + Sync>;
7676
/// The type of transactions committed by [RelationalDB].
7777
pub type Txdata = commitlog::payload::Txdata<ProductValue>;
7878

79-
/// A buffered durability request, held behind the persistence barrier.
80-
pub struct BufferedDurabilityRequest {
81-
pub reducer_context: Option<ReducerContext>,
82-
pub tx_data: Arc<TxData>,
83-
}
84-
85-
/// The persistence barrier prevents durability requests from being sent to the
86-
/// durability worker while a 2PC PREPARE is pending.
87-
///
88-
/// When active:
89-
/// - The PREPARE's own durability request has already been sent to the worker.
90-
/// - All subsequent durability requests are buffered here.
91-
/// - Once the PREPARE is confirmed durable and a COMMIT/ABORT decision is made:
92-
/// - COMMIT: buffered requests are flushed to the worker.
93-
/// - ABORT: buffered requests are discarded.
94-
#[derive(Default)]
95-
pub struct PersistenceBarrier {
96-
inner: std::sync::Mutex<PersistenceBarrierInner>,
97-
}
98-
99-
#[derive(Default)]
100-
struct PersistenceBarrierInner {
101-
/// Whether the barrier is active. When active, all durability requests
102-
/// are buffered instead of being sent to the worker.
103-
active: bool,
104-
/// Buffered durability requests that arrived while the barrier was active.
105-
buffered: Vec<BufferedDurabilityRequest>,
106-
}
107-
108-
impl PersistenceBarrier {
109-
pub fn new() -> Self {
110-
Self::default()
111-
}
112-
113-
/// Activate the barrier. All subsequent durability requests will be buffered.
114-
///
115-
/// Called after committing in-memory and sending PREPARE to the durability
116-
/// worker. No race is possible because this runs on the same thread that
117-
/// just released the write lock, before any other transaction can commit.
118-
pub fn activate(&self) {
119-
let mut inner = self.inner.lock().unwrap();
120-
assert!(!inner.active, "persistence barrier already active");
121-
inner.active = true;
122-
inner.buffered.clear();
123-
}
124-
125-
/// If the barrier is active, buffer the durability request and return None.
126-
/// If inactive, return the arguments back (caller should send normally).
127-
pub fn try_buffer(
128-
&self,
129-
reducer_context: Option<ReducerContext>,
130-
tx_data: &Arc<TxData>,
131-
) -> Option<Option<ReducerContext>> {
132-
let mut inner = self.inner.lock().unwrap();
133-
if inner.active {
134-
inner.buffered.push(BufferedDurabilityRequest {
135-
reducer_context,
136-
tx_data: tx_data.clone(),
137-
});
138-
None
139-
} else {
140-
Some(reducer_context)
141-
}
142-
}
143-
144-
/// Deactivate the barrier and return the buffered requests.
145-
/// Called on COMMIT (to flush them) or ABORT (to discard them).
146-
pub fn deactivate(&self) -> Vec<BufferedDurabilityRequest> {
147-
let mut inner = self.inner.lock().unwrap();
148-
inner.active = false;
149-
std::mem::take(&mut inner.buffered)
150-
}
151-
}
15279

15380
/// We've added a module version field to the system tables, but we don't yet
15481
/// have the infrastructure to support multiple versions.
@@ -185,10 +112,6 @@ pub struct RelationalDB {
185112

186113
/// An async queue for recording transaction metrics off the main thread
187114
metrics_recorder_queue: Option<MetricsRecorderQueue>,
188-
189-
/// 2PC persistence barrier. When active, durability requests are buffered
190-
/// instead of being sent to the durability worker.
191-
persistence_barrier: PersistenceBarrier,
192115
}
193116

194117
/// Perform a snapshot every `SNAPSHOT_FREQUENCY` transactions.
@@ -253,7 +176,6 @@ impl RelationalDB {
253176

254177
workload_type_to_exec_counters,
255178
metrics_recorder_queue,
256-
persistence_barrier: PersistenceBarrier::new(),
257179
}
258180
}
259181

@@ -532,6 +454,17 @@ impl RelationalDB {
532454
Ok(self.with_read_only(Workload::Internal, |tx| self.inner.program(tx))?)
533455
}
534456

457+
/// Read any 2PC participant transactions that were in PREPARE state when the database
458+
/// last shut down (or crashed). Each returned string is a `prepare_id`.
459+
///
460+
/// If non-empty, the caller must resume these transactions: retransmit PREPARED to
461+
/// the coordinator and await a COMMIT or ABORT decision before allowing normal operation.
462+
pub fn pending_2pc_prepares(&self) -> Result<Vec<String>, DBError> {
463+
self.with_auto_commit(Workload::Internal, |tx| {
464+
tx.scan_st_2pc_state().map_err(DBError::from)
465+
})
466+
}
467+
535468
/// Read the set of clients currently connected to the database.
536469
pub fn connected_clients(&self) -> Result<ConnectedClients, DBError> {
537470
self.with_read_only(Workload::Internal, |tx| {
@@ -899,7 +832,9 @@ impl RelationalDB {
899832
self.maybe_do_snapshot(&tx_data);
900833

901834
let tx_data = Arc::new(tx_data);
902-
self.send_or_buffer_durability(reducer_context, &tx_data);
835+
if let Some(durability) = &self.durability {
836+
durability.request_durability(reducer_context, &tx_data);
837+
}
903838

904839
Ok(Some((tx_offset, tx_data, tx_metrics, reducer)))
905840
}
@@ -913,64 +848,11 @@ impl RelationalDB {
913848
self.maybe_do_snapshot(&tx_data);
914849

915850
let tx_data = Arc::new(tx_data);
916-
self.send_or_buffer_durability(tx.ctx.reducer_context().cloned(), &tx_data);
917-
918-
(tx_data, tx_metrics, tx)
919-
}
920-
921-
/// Send a durability request, or buffer it if the persistence barrier is active.
922-
fn send_or_buffer_durability(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
923-
match self.persistence_barrier.try_buffer(reducer_context, tx_data) {
924-
None => {
925-
// Buffered behind the persistence barrier.
926-
}
927-
Some(reducer_context) => {
928-
// Barrier not active. Send to durability worker.
929-
if let Some(durability) = &self.durability {
930-
durability.request_durability(reducer_context, tx_data);
931-
}
932-
}
933-
}
934-
}
935-
936-
/// Activate the persistence barrier for a 2PC PREPARE.
937-
///
938-
/// Call this AFTER committing in-memory and sending PREPARE to the
939-
/// durability worker. All subsequent durability requests will be buffered
940-
/// until `finalize_prepare_commit()` or `finalize_prepare_abort()`.
941-
pub fn activate_persistence_barrier(&self) {
942-
self.persistence_barrier.activate();
943-
}
944-
945-
/// Finalize a 2PC transaction as COMMIT.
946-
/// Deactivates the persistence barrier and flushes all buffered durability requests.
947-
pub fn finalize_prepare_commit(&self) {
948-
let buffered = self.persistence_barrier.deactivate();
949851
if let Some(durability) = &self.durability {
950-
for req in buffered {
951-
durability.request_durability(req.reducer_context, &req.tx_data);
952-
}
852+
durability.request_durability(tx.ctx.reducer_context().cloned(), &tx_data);
953853
}
954-
}
955854

956-
/// Finalize a 2PC transaction as ABORT.
957-
/// Deactivates the persistence barrier, discards buffered durability requests,
958-
/// and inverts the PREPARE's in-memory changes.
959-
pub fn finalize_prepare_abort(&self, prepare_tx_data: &TxData) {
960-
// Discard all buffered speculative transactions.
961-
let _discarded = self.persistence_barrier.deactivate();
962-
// TODO: Invert in-memory state using prepare_tx_data.
963-
// For now, log a warning. Full inversion requires:
964-
// 1. Begin new MutTx
965-
// 2. Delete rows from prepare_tx_data.persistent_inserts()
966-
// 3. Re-insert rows from prepare_tx_data.persistent_deletes()
967-
// 4. Commit without durability
968-
// 5. Re-execute discarded speculative transactions
969-
log::warn!(
970-
"2PC ABORT: persistence barrier deactivated, {} buffered transactions discarded. \
971-
In-memory state inversion not yet implemented.",
972-
_discarded.len()
973-
);
855+
(tx_data, tx_metrics, tx)
974856
}
975857

976858
/// Get the [`DurableOffset`] of this database, or `None` if this is an
@@ -981,11 +863,6 @@ impl RelationalDB {
981863
.map(|durability| durability.durable_tx_offset())
982864
}
983865

984-
/// Get a reference to the persistence barrier (for 2PC).
985-
pub fn persistence_barrier(&self) -> &PersistenceBarrier {
986-
&self.persistence_barrier
987-
}
988-
989866
/// Decide based on the `committed_state.next_tx_offset`
990867
/// whether to request that the [`SnapshotWorker`] in `self` capture a snapshot of the database.
991868
///

0 commit comments

Comments
 (0)