Skip to content

Commit 39ac0e2

Browse files
Pipelined 2PC with durability barrier (#4728)
## Summary - Split 2PC into two consecutive rounds: Round 1 (memory commit, lock released) and Round 2 (persistence, no locks held) - Durability barrier prevents tainted transactions from reaching disk while 2PC is pending - PREPARE PERSIST commitlog entry contains only reducer inputs (st_2pc_state); COMMIT PERSIST contains actual row changes - Confirmed-read clients naturally wait for COMMIT PERSIST durability via the durable offset mechanism - TLA+ spec (in private repo) verified with 6 invariants including BarrierSafety, 60 distinct states ## TODOs - Trigger module restart on persistence abort to flush tainted in-memory state - Retry limit for send_prepared_to_persist_to_coordinator - Handle prepare_id mismatch in recovery (new vs original ID) ## Test plan - Existing 2PC smoke tests (cross_db_2pc, cross_db_2pc_recovery) - TLA+ verification - Verify barrier prevents COMMIT PERSIST from reaching disk prematurely - Verify confirmed-read clients don't see changes until COMMIT PERSIST is durable --------- Co-authored-by: Shubham Mishra <shivam828787@gmail.com>
1 parent 3fec4df commit 39ac0e2

10 files changed

Lines changed: 721 additions & 227 deletions

File tree

crates/client-api/src/routes/database.rs

Lines changed: 60 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -406,6 +406,54 @@ pub async fn ack_commit_2pc<S: ControlStateDelegate + NodeDelegate>(
406406
Ok(StatusCode::OK)
407407
}
408408

409+
/// 2PC prepared-to-persist endpoint (pipelined 2PC Round 2).
410+
///
411+
/// Called by participant B after its PREPARE_PERSIST is durable, to notify
412+
/// coordinator A that B is ready for COMMIT_PERSIST.
413+
///
414+
/// `POST /v1/database/:name_or_identity/2pc/prepared-to-persist/:prepare_id`
415+
pub async fn prepared_to_persist_2pc<S: ControlStateDelegate + NodeDelegate>(
416+
State(worker_ctx): State<S>,
417+
Extension(_auth): Extension<SpacetimeAuth>,
418+
Path(TwoPcParams {
419+
name_or_identity,
420+
prepare_id,
421+
}): Path<TwoPcParams>,
422+
) -> axum::response::Result<impl IntoResponse> {
423+
let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;
424+
425+
module.signal_persist_prepared(&prepare_id).map_err(|e| {
426+
log::warn!("2PC prepared-to-persist: {e}");
427+
(StatusCode::NOT_FOUND, e).into_response()
428+
})?;
429+
430+
Ok(StatusCode::OK)
431+
}
432+
433+
/// 2PC commit-persist endpoint (pipelined 2PC Round 2).
434+
///
435+
/// Called by coordinator A to tell participant B that A's COMMIT_PERSIST is durable
436+
/// and B should finalize (delete PREPARE marker, flush deferred transactions).
437+
///
438+
/// `POST /v1/database/:name_or_identity/2pc/commit-persist/:prepare_id`
439+
pub async fn commit_persist_2pc<S: ControlStateDelegate + NodeDelegate>(
440+
State(worker_ctx): State<S>,
441+
Extension(_auth): Extension<SpacetimeAuth>,
442+
Path(TwoPcParams {
443+
name_or_identity,
444+
prepare_id,
445+
}): Path<TwoPcParams>,
446+
) -> axum::response::Result<impl IntoResponse> {
447+
let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;
448+
449+
module.commit_persist_prepared(&prepare_id).map_err(|e| {
450+
log::error!("2PC commit-persist failed: {e}");
451+
(StatusCode::NOT_FOUND, e).into_response()
452+
})?;
453+
454+
Ok(StatusCode::OK)
455+
}
456+
409457
/// Encode a reducer return value as an HTTP response.
410458
///
411459
/// If the outcome is an error, return a raw string with `application/text`. Ignore `want_bsatn` in this case.
@@ -1445,6 +1493,10 @@ pub struct DatabaseRoutes<S> {
14451493
pub status_2pc_get: MethodRouter<S>,
14461494
/// POST: /database/:name_or_identity/2pc/ack-commit/:prepare_id
14471495
pub ack_commit_2pc_post: MethodRouter<S>,
1496+
/// POST: /database/:name_or_identity/2pc/prepared-to-persist/:prepare_id
1497+
pub prepared_to_persist_2pc_post: MethodRouter<S>,
1498+
/// POST: /database/:name_or_identity/2pc/commit-persist/:prepare_id
1499+
pub commit_persist_2pc_post: MethodRouter<S>,
14481500
}
14491501

14501502
impl<S> Default for DatabaseRoutes<S>
@@ -1475,6 +1527,8 @@ where
14751527
abort_2pc_post: post(abort_2pc::<S>),
14761528
status_2pc_get: get(status_2pc::<S>),
14771529
ack_commit_2pc_post: post(ack_commit_2pc::<S>),
1530+
prepared_to_persist_2pc_post: post(prepared_to_persist_2pc::<S>),
1531+
commit_persist_2pc_post: post(commit_persist_2pc::<S>),
14781532
}
14791533
}
14801534
}
@@ -1504,7 +1558,12 @@ where
15041558
.route("/2pc/commit/:prepare_id", self.commit_2pc_post)
15051559
.route("/2pc/abort/:prepare_id", self.abort_2pc_post)
15061560
.route("/2pc/status/:prepare_id", self.status_2pc_get)
1507-
.route("/2pc/ack-commit/:prepare_id", self.ack_commit_2pc_post);
1561+
.route("/2pc/ack-commit/:prepare_id", self.ack_commit_2pc_post)
1562+
.route(
1563+
"/2pc/prepared-to-persist/:prepare_id",
1564+
self.prepared_to_persist_2pc_post,
1565+
)
1566+
.route("/2pc/commit-persist/:prepare_id", self.commit_persist_2pc_post);
15081567

15091568
axum::Router::new()
15101569
.route("/", self.root_post)

crates/core/src/db/relational_db.rs

Lines changed: 128 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -94,6 +94,20 @@ pub const ONLY_MODULE_VERSION: &str = "0.0.1";
9494
/// for each entry in [`ConnectedClients`].
9595
pub type ConnectedClients = HashSet<(Identity, ConnectionId)>;
9696

97+
/// Durability barrier for pipelined 2PC.
98+
///
99+
/// Supports multiple concurrent 2PC transactions. Each active barrier is
100+
/// identified by its tx_offset. Transactions above the *minimum* active
101+
/// barrier offset are deferred. When a barrier is cleared, pending
102+
/// transactions up to the new minimum (or all, if no barriers remain)
103+
/// are flushed to the durability worker.
104+
struct DurabilityBarrier {
105+
/// Active barrier offsets (one per in-flight 2PC transaction).
106+
active: std::collections::BTreeSet<u64>,
107+
/// Transactions deferred by the barrier.
108+
pending: Vec<(Option<ReducerContext>, Arc<TxData>)>,
109+
}
110+
97111
pub struct RelationalDB {
98112
database_identity: Identity,
99113
owner_identity: Identity,
@@ -112,6 +126,10 @@ pub struct RelationalDB {
112126

113127
/// An async queue for recording transaction metrics off the main thread
114128
metrics_recorder_queue: Option<MetricsRecorderQueue>,
129+
130+
/// Pipelined 2PC durability barrier.
131+
/// When set, transactions past the barrier offset are deferred (not sent to disk).
132+
durability_barrier: std::sync::Mutex<Option<DurabilityBarrier>>,
115133
}
116134

117135
/// Perform a snapshot every `SNAPSHOT_FREQUENCY` transactions.
@@ -176,6 +194,7 @@ impl RelationalDB {
176194

177195
workload_type_to_exec_counters,
178196
metrics_recorder_queue,
197+
durability_barrier: std::sync::Mutex::new(None),
179198
}
180199
}
181200

@@ -464,6 +483,14 @@ impl RelationalDB {
464483
self.with_auto_commit(Workload::Internal, |tx| tx.scan_st_2pc_state().map_err(DBError::from))
465484
}
466485

486+
/// The offset that will be assigned to the next committed transaction.
487+
///
488+
/// Safe to call while holding the write lock (MutTxId) -- the offset won't change
489+
/// until the write lock is released via commit.
490+
pub fn next_tx_offset(&self) -> u64 {
491+
self.inner.next_tx_offset()
492+
}
493+
467494
/// Read any 2PC coordinator log entries that have not yet been acknowledged by their
468495
/// participants. Used on coordinator crash-recovery to retransmit COMMIT decisions.
469496
pub fn pending_2pc_coordinator_commits(
@@ -841,9 +868,7 @@ impl RelationalDB {
841868
self.maybe_do_snapshot(&tx_data);
842869

843870
let tx_data = Arc::new(tx_data);
844-
if let Some(durability) = &self.durability {
845-
durability.request_durability(reducer_context, &tx_data);
846-
}
871+
self.request_durability_maybe_barrier(reducer_context, &tx_data);
847872

848873
Ok(Some((tx_offset, tx_data, tx_metrics, reducer)))
849874
}
@@ -857,9 +882,7 @@ impl RelationalDB {
857882
self.maybe_do_snapshot(&tx_data);
858883

859884
let tx_data = Arc::new(tx_data);
860-
if let Some(durability) = &self.durability {
861-
durability.request_durability(tx.ctx.reducer_context().cloned(), &tx_data);
862-
}
885+
self.request_durability_maybe_barrier(tx.ctx.reducer_context().cloned(), &tx_data);
863886

864887
(tx_data, tx_metrics, tx)
865888
}
@@ -869,8 +892,106 @@ impl RelationalDB {
869892
/// Used by the 2PC participant path to make the `st_2pc_state` PREPARE marker durable
870893
/// while the main write lock is still held (i.e. without going through a full commit).
871894
pub fn request_durability_for_tx_data(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
895+
self.request_durability_maybe_barrier(reducer_context, tx_data);
896+
}
897+
898+
/// Send a tx to the durability worker, unless a durability barrier is active
899+
/// and the tx's offset exceeds the minimum active barrier. In that case, defer the tx.
900+
fn request_durability_maybe_barrier(
901+
&self,
902+
reducer_context: Option<ReducerContext>,
903+
tx_data: &Arc<TxData>,
904+
) {
905+
let Some(durability) = &self.durability else {
906+
return;
907+
};
908+
909+
let mut barrier = self.durability_barrier.lock().unwrap();
910+
if let Some(ref mut b) = *barrier {
911+
if let Some(&min_barrier) = b.active.first() {
912+
if let Some(offset) = tx_data.tx_offset() {
913+
if offset > min_barrier {
914+
// Past the lowest active barrier: defer.
915+
b.pending.push((reducer_context, tx_data.clone()));
916+
return;
917+
}
918+
}
919+
}
920+
}
921+
// At or before the barrier (or no barrier): normal path.
922+
durability.request_durability(reducer_context, tx_data);
923+
}
924+
925+
/// Set a durability barrier at `barrier_offset`.
926+
///
927+
/// Transactions at this offset pass through to the durability worker normally.
928+
/// Transactions with higher offsets are deferred until all barriers are cleared.
929+
/// Multiple concurrent barriers are supported; the effective barrier is the minimum.
930+
///
931+
/// Call while holding the database write lock to prevent races.
932+
pub fn set_durability_barrier(&self, barrier_offset: u64) {
933+
let mut barrier = self.durability_barrier.lock().unwrap();
934+
let b = barrier.get_or_insert_with(|| DurabilityBarrier {
935+
active: std::collections::BTreeSet::new(),
936+
pending: Vec::new(),
937+
});
938+
b.active.insert(barrier_offset);
939+
}
940+
941+
/// Abort a durability barrier, discarding ALL deferred transactions.
942+
///
943+
/// Used when Round 2 of pipelined 2PC aborts. All transactions behind the
944+
/// barrier are tainted (they may have read data from the aborted 2PC tx)
945+
/// and must not reach disk. On restart, the in-memory state is lost and
946+
/// the pipeline is effectively flushed.
947+
pub fn abort_durability_barrier(&self, barrier_offset: u64) {
948+
let mut barrier = self.durability_barrier.lock().unwrap();
949+
let Some(ref mut b) = *barrier else {
950+
return;
951+
};
952+
b.active.remove(&barrier_offset);
953+
if b.active.is_empty() {
954+
// Drop all pending transactions -- they are tainted.
955+
*barrier = None;
956+
}
957+
// If other barriers remain, the pending list stays (those transactions
958+
// are still blocked by the other barriers and will be resolved by them).
959+
}
960+
961+
/// Clear one durability barrier, flushing deferred transactions that are now
962+
/// below the new minimum barrier (or all if no barriers remain).
963+
pub fn clear_durability_barrier(&self, barrier_offset: u64) {
964+
let to_flush = {
965+
let mut barrier = self.durability_barrier.lock().unwrap();
966+
let Some(ref mut b) = *barrier else {
967+
return;
968+
};
969+
b.active.remove(&barrier_offset);
970+
if b.active.is_empty() {
971+
// No more barriers: flush everything.
972+
let pending = std::mem::take(&mut b.pending);
973+
*barrier = None;
974+
pending
975+
} else {
976+
// Flush pending transactions up to the new minimum barrier.
977+
let &new_min = b.active.first().unwrap();
978+
let mut flush = Vec::new();
979+
b.pending.retain(|(ctx, td)| {
980+
if let Some(offset) = td.tx_offset() {
981+
if offset <= new_min {
982+
flush.push((ctx.clone(), td.clone()));
983+
return false;
984+
}
985+
}
986+
true
987+
});
988+
flush
989+
}
990+
};
872991
if let Some(durability) = &self.durability {
873-
durability.request_durability(reducer_context, tx_data);
992+
for (reducer_context, tx_data) in to_flush {
993+
durability.request_durability(reducer_context, &tx_data);
994+
}
874995
}
875996
}
876997

crates/core/src/host/host_controller.rs

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -727,6 +727,8 @@ async fn make_replica_ctx(
727727
call_reducer_client,
728728
call_reducer_router,
729729
call_reducer_auth_token,
730+
prepared_txs: crate::host::prepared_tx::PreparedTransactions::new(),
731+
on_panic: std::sync::Arc::new(std::sync::OnceLock::new()),
730732
})
731733
}
732734

@@ -830,14 +832,23 @@ impl<F: Fn() + Send + Sync + 'static> ModuleLauncher<F> {
830832
)
831833
.await
832834
.map(Arc::new)?;
835+
836+
// Share the unregister function with the replica context so that async tasks
837+
// (e.g., 2PC Round 2) can trigger module restart without holding the executor thread.
838+
let on_panic = std::sync::Arc::new(self.on_panic);
839+
let _ = replica_ctx.on_panic.set(Box::new({
840+
let op = on_panic.clone();
841+
move || op()
842+
}));
843+
833844
let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db().clone());
834845
let (program, module_host) = make_module_host(
835846
self.runtimes.clone(),
836847
replica_ctx.clone(),
837848
scheduler.clone(),
838849
self.program,
839850
self.energy_monitor,
840-
self.on_panic,
851+
move || on_panic(),
841852
self.core,
842853
)
843854
.await?;

crates/core/src/host/instance_env.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1509,6 +1509,8 @@ mod test {
15091509
call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()),
15101510
call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")),
15111511
call_reducer_auth_token: None,
1512+
prepared_txs: crate::host::prepared_tx::PreparedTransactions::new(),
1513+
on_panic: std::sync::Arc::new(std::sync::OnceLock::new()),
15121514
},
15131515
runtime,
15141516
))

0 commit comments

Comments (0)