Commit ef1c969

Simplify PersistenceBarrier to two states: Inactive and Active
Remove the Armed state. No race is possible because the barrier is activated on the same thread that just released the write lock, and the PREPARE is sent to the durability worker directly (not through send_or_buffer_durability) before the barrier activates.
1 parent 4ca1319 commit ef1c969
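
The two-state barrier described in the commit message can be sketched in isolation as follows. This is a minimal illustration, not the code from `relational_db.rs`: `Request`, `Barrier`, `Inner`, and `demo` are hypothetical names, and `Request` stands in for the `ReducerContext` and `Arc<TxData>` pair that the real barrier buffers.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for a durability request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Request(pub u64);

#[derive(Default)]
struct Inner {
    /// When true, requests are buffered instead of being passed through.
    active: bool,
    buffered: Vec<Request>,
}

#[derive(Default)]
pub struct Barrier {
    inner: Mutex<Inner>,
}

impl Barrier {
    /// Inactive -> Active. Panics if the barrier is already active.
    pub fn activate(&self) {
        let mut inner = self.inner.lock().unwrap();
        assert!(!inner.active, "persistence barrier already active");
        inner.active = true;
        inner.buffered.clear();
    }

    /// Returns `None` if the request was buffered (barrier active), or
    /// hands the request back so the caller can send it (barrier inactive).
    pub fn try_buffer(&self, req: Request) -> Option<Request> {
        let mut inner = self.inner.lock().unwrap();
        if inner.active {
            inner.buffered.push(req);
            None
        } else {
            Some(req)
        }
    }

    /// Active -> Inactive. Returns whatever was buffered in the meantime.
    pub fn deactivate(&self) -> Vec<Request> {
        let mut inner = self.inner.lock().unwrap();
        inner.active = false;
        std::mem::take(&mut inner.buffered)
    }
}

/// Walks through pass-through, activation, buffering, and deactivation.
pub fn demo() -> (Option<Request>, Option<Request>, Vec<Request>) {
    let barrier = Barrier::default();
    let before = barrier.try_buffer(Request(1)); // inactive: handed back
    barrier.activate();
    let during = barrier.try_buffer(Request(2)); // active: buffered
    let drained = barrier.deactivate();
    (before, during, drained)
}
```

With a single boolean there is no transition inside `try_buffer`, so whether the PREPARE reaches the worker depends entirely on the caller sending it before `activate` is called.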

4 files changed

Lines changed: 45 additions & 83 deletions

crates/core/2PC-IMPLEMENTATION-PLAN.md

Lines changed: 9 additions & 7 deletions
@@ -153,17 +153,19 @@ On replay, when encountering a PREPARE:
 
 ## Persistence barrier
 
-The barrier in `relational_db.rs` has three states: `Inactive`, `Armed`, `Active`.
+The barrier in `relational_db.rs` has two states: `Inactive` and `Active`.
 
 - **Inactive**: normal operation, durability requests go through.
-- **Armed**: set BEFORE committing the transaction (while write lock is held). The NEXT durability request (the PREPARE) goes through to the worker and transitions the barrier to Active.
-- **Active**: all subsequent durability requests are buffered.
+- **Active**: all durability requests are buffered.
 
-This ensures no race between the write lock release and the barrier activation. Since the barrier is Armed while the write lock is held, no other transaction can commit and send a durability request before the barrier transitions to Active.
+No race is possible because the barrier is activated on the same thread that holds the write lock. The sequence on both coordinator and participant is:
+
+1. Commit in-memory (releases write lock)
+2. Send PREPARE to durability worker (direct call, bypasses barrier)
+3. Activate barrier
+
+Steps 1-3 happen sequentially on one thread. No other transaction can commit between 1 and 3 because steps 2 and 3 are immediate (no async, no lock release between them). By the time another transaction acquires the write lock and commits, the barrier is already active and its durability request is buffered.
 
-Used by both coordinator and participant:
-- Arm before committing the 2PC transaction
-- The commit's durability request (the PREPARE) transitions Armed -> Active
 - On COMMIT: deactivate, flush buffered requests
 - On ABORT: deactivate, discard buffered requests
 

crates/core/src/db/relational_db.rs

Lines changed: 34 additions & 72 deletions
@@ -96,24 +96,12 @@ pub struct PersistenceBarrier {
     inner: std::sync::Mutex<PersistenceBarrierInner>,
 }
 
-#[derive(Default, PartialEq, Eq, Debug, Clone, Copy)]
-enum BarrierState {
-    /// No 2PC in progress. Durability requests go through normally.
-    #[default]
-    Inactive,
-    /// A 2PC is about to commit. The NEXT durability request is the PREPARE
-    /// and should go through to the worker. After that request, the barrier
-    /// transitions to Active automatically.
-    Armed,
-    /// A 2PC PREPARE has been sent to durability. All subsequent durability
-    /// requests are buffered until the barrier is deactivated (COMMIT or ABORT).
-    Active,
-}
-
 #[derive(Default)]
 struct PersistenceBarrierInner {
-    state: BarrierState,
-    /// Buffered durability requests that arrived while the barrier was Active.
+    /// Whether the barrier is active. When active, all durability requests
+    /// are buffered instead of being sent to the worker.
+    active: bool,
+    /// Buffered durability requests that arrived while the barrier was active.
     buffered: Vec<BufferedDurabilityRequest>,
 }
 

@@ -122,62 +110,42 @@ impl PersistenceBarrier {
         Self::default()
     }
 
-    /// Arm the barrier. The next durability request will go through (it's the
-    /// PREPARE), and then the barrier transitions to Active, buffering all
-    /// subsequent requests.
+    /// Activate the barrier. All subsequent durability requests will be buffered.
     ///
-    /// This must be called BEFORE the transaction commits, while the write lock
-    /// is still held. This ensures no other transaction can send a durability
-    /// request between the PREPARE and the barrier activation.
-    pub fn arm(&self) {
+    /// Called after committing in-memory and sending PREPARE to the durability
+    /// worker. No race is possible because this runs on the same thread that
+    /// just released the write lock, before any other transaction can commit.
+    pub fn activate(&self) {
         let mut inner = self.inner.lock().unwrap();
-        assert_eq!(
-            inner.state,
-            BarrierState::Inactive,
-            "persistence barrier must be Inactive to arm, but is {:?}",
-            inner.state,
-        );
-        inner.state = BarrierState::Armed;
+        assert!(!inner.active, "persistence barrier already active");
+        inner.active = true;
         inner.buffered.clear();
     }
 
-    /// Called by `send_or_buffer_durability` for every durability request.
-    ///
-    /// Returns `Some(reducer_context)` if the request should be sent to the
-    /// durability worker (barrier is Inactive, or barrier is Armed and this is
-    /// the PREPARE). Returns `None` if the request was buffered (barrier is Active).
-    pub fn filter_durability_request(
+    /// If the barrier is active, buffer the durability request and return None.
+    /// If inactive, return the arguments back (caller should send normally).
+    pub fn try_buffer(
         &self,
         reducer_context: Option<ReducerContext>,
         tx_data: &Arc<TxData>,
     ) -> Option<Option<ReducerContext>> {
         let mut inner = self.inner.lock().unwrap();
-        match inner.state {
-            BarrierState::Inactive => {
-                // No barrier. Let it through.
-                Some(reducer_context)
-            }
-            BarrierState::Armed => {
-                // This is the PREPARE request. Let it through, then go Active.
-                inner.state = BarrierState::Active;
-                Some(reducer_context)
-            }
-            BarrierState::Active => {
-                // Buffer this request.
-                inner.buffered.push(BufferedDurabilityRequest {
-                    reducer_context,
-                    tx_data: tx_data.clone(),
-                });
-                None
-            }
+        if inner.active {
+            inner.buffered.push(BufferedDurabilityRequest {
+                reducer_context,
+                tx_data: tx_data.clone(),
+            });
+            None
+        } else {
+            Some(reducer_context)
         }
     }
 
     /// Deactivate the barrier and return the buffered requests.
     /// Called on COMMIT (to flush them) or ABORT (to discard them).
     pub fn deactivate(&self) -> Vec<BufferedDurabilityRequest> {
         let mut inner = self.inner.lock().unwrap();
-        inner.state = BarrierState::Inactive;
+        inner.active = false;
         std::mem::take(&mut inner.buffered)
     }
 }
@@ -952,32 +920,26 @@ impl RelationalDB {
 
     /// Send a durability request, or buffer it if the persistence barrier is active.
     fn send_or_buffer_durability(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
-        match self.persistence_barrier.filter_durability_request(reducer_context, tx_data) {
+        match self.persistence_barrier.try_buffer(reducer_context, tx_data) {
+            None => {
+                // Buffered behind the persistence barrier.
+            }
             Some(reducer_context) => {
-                // Either barrier is Inactive (normal path) or Armed (this is the PREPARE).
-                // Send to durability worker.
+                // Barrier not active. Send to durability worker.
                 if let Some(durability) = &self.durability {
                     durability.request_durability(reducer_context, tx_data);
                 }
             }
-            None => {
-                // Buffered behind the persistence barrier (Active state).
-            }
         }
     }
 
-    /// Arm the persistence barrier for a 2PC PREPARE.
-    ///
-    /// Call this BEFORE committing the transaction (while the write lock is
-    /// still held). The next durability request (the PREPARE) will go through
-    /// to the worker normally. After that, all subsequent durability requests
-    /// are buffered until `finalize_prepare_commit()` or `finalize_prepare_abort()`.
+    /// Activate the persistence barrier for a 2PC PREPARE.
     ///
-    /// This ensures no speculative transaction can reach the durability worker
-    /// between the PREPARE and the COMMIT/ABORT decision, even though the
-    /// write lock is released by `commit_tx_downgrade`.
-    pub fn arm_persistence_barrier(&self) {
-        self.persistence_barrier.arm();
+    /// Call this AFTER committing in-memory and sending PREPARE to the
+    /// durability worker. All subsequent durability requests will be buffered
+    /// until `finalize_prepare_commit()` or `finalize_prepare_abort()`.
+    pub fn activate_persistence_barrier(&self) {
+        self.persistence_barrier.activate();
     }
 
     /// Finalize a 2PC transaction as COMMIT.

crates/core/src/host/module_host.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1787,9 +1787,7 @@ impl ModuleHost {
17871787
// been sent to the durability worker (via the normal commit path).
17881788
// The barrier prevents any subsequent transactions from being persisted
17891789
// until we finalize with COMMIT or ABORT.
1790-
//
1791-
// We use offset 0 as a sentinel; the barrier only needs active/inactive state.
1792-
self.relational_db().persistence_barrier().activate(0);
1790+
self.relational_db().activate_persistence_barrier();
17931791

17941792
let info = super::prepared_tx::PreparedTxInfo {
17951793
tx_offset: 0, // TODO: thread TxOffset from commit path

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -993,7 +993,7 @@ impl InstanceCommon {
993993
// (via commit_and_broadcast_event -> commit_tx_downgrade -> send_or_buffer_durability).
994994
// No subsequent transactions should be persisted until we confirm all
995995
// participants are prepared and we decide COMMIT.
996-
stdb.persistence_barrier().activate(0);
996+
stdb.activate_persistence_barrier();
997997
}
998998

999999
let replica_ctx = inst.replica_ctx().clone();
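
The sequence from the plan (send PREPARE directly, activate the barrier, then flush on COMMIT or discard on ABORT) can be sketched end to end as below. This is an illustration under stated assumptions, not the project's code: `Worker`, `Barrier`, `Decision`, `send_or_buffer`, and `run` are hypothetical names, durability requests are modelled as plain strings, and the in-memory commit itself is not modelled.

```rust
use std::sync::Mutex;

/// Hypothetical 2PC outcome.
pub enum Decision {
    Commit,
    Abort,
}

/// Hypothetical durability worker that records what it receives, in order.
#[derive(Default)]
pub struct Worker {
    log: Mutex<Vec<String>>,
}

impl Worker {
    pub fn send(&self, entry: &str) {
        self.log.lock().unwrap().push(entry.to_string());
    }
    pub fn sent(&self) -> Vec<String> {
        self.log.lock().unwrap().clone()
    }
}

#[derive(Default)]
struct Inner {
    active: bool,
    buffered: Vec<String>,
}

#[derive(Default)]
pub struct Barrier {
    inner: Mutex<Inner>,
}

impl Barrier {
    pub fn activate(&self) {
        let mut inner = self.inner.lock().unwrap();
        assert!(!inner.active, "persistence barrier already active");
        inner.active = true;
        inner.buffered.clear();
    }

    /// Buffer the entry while the barrier is active, otherwise forward it.
    pub fn send_or_buffer(&self, worker: &Worker, entry: &str) {
        let mut inner = self.inner.lock().unwrap();
        if inner.active {
            inner.buffered.push(entry.to_string());
        } else {
            worker.send(entry);
        }
    }

    pub fn deactivate(&self) -> Vec<String> {
        let mut inner = self.inner.lock().unwrap();
        inner.active = false;
        std::mem::take(&mut inner.buffered)
    }
}

/// Runs one prepared transaction followed by one speculative transaction,
/// then finalizes. Returns everything the worker received.
pub fn run(decision: Decision) -> Vec<String> {
    let worker = Worker::default();
    let barrier = Barrier::default();

    // Step 2 of the plan: PREPARE goes to the worker directly, bypassing the barrier.
    worker.send("PREPARE");
    // Step 3: activate the barrier.
    barrier.activate();

    // A later transaction commits while the 2PC outcome is still undecided.
    barrier.send_or_buffer(&worker, "tx-A");

    let buffered = barrier.deactivate();
    match decision {
        // COMMIT: flush the buffered requests to the worker in arrival order.
        Decision::Commit => {
            for entry in &buffered {
                worker.send(entry);
            }
        }
        // ABORT: drop the buffered requests without persisting them.
        Decision::Abort => drop(buffered),
    }
    worker.sent()
}
```

The sketch runs on a single thread, so it shows the ordering of steps rather than demonstrating anything about concurrent commits.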
