Commit f9fdcf9

Add 2PC implementation plan with corrected protocol
Documents the full pipelined 2PC protocol for coordinator and participant, including the persistence barrier, serializable isolation (participant holds MutTxId across all calls in a coordinator transaction), two-phase participant response (immediate result + deferred PREPARED after durability), abort paths, commitlog format, and replay semantics. Identifies the open problem: MutTxId is !Send but must be held across multiple HTTP requests on the participant side.
1 parent 1448c55 commit f9fdcf9

4 files changed

Lines changed: 216 additions & 86 deletions

Lines changed: 105 additions & 0 deletions
# 2PC Implementation Plan (Pipelined)

## Context

The TPC-C benchmark on branch `origin/phoebe/tpcc/reducer-return-value` (public submodule) uses non-atomic HTTP calls for cross-database operations. We need 2PC so distributed transactions either commit on both databases or neither. Pipelined 2PC is chosen because it avoids blocking on persistence during lock-holding, and the codebase already separates in-memory commit from durability.

## Protocol (Corrected)

### Participant happy path:

1. Receive CALL from coordinator (reducer name + args)
2. Execute reducer (write lock held)
3. Return result to coordinator (write lock still held, transaction still open)
4. Possibly receive more CALLs from coordinator (same transaction, same write lock)
5. Receive END_CALLS from coordinator ("no more reducer calls in this transaction")
6. Commit in-memory (release write lock)
7. Send PREPARE to durability worker
8. **Barrier up** -- no more durability requests go through
9. In background: wait for PREPARE to be durable
10. Once durable: send PREPARED to coordinator
11. Wait for COMMIT or ABORT from coordinator
12. Receive COMMIT
13. Send COMMIT to durability worker
14. **Barrier down** -- flush buffered requests
### Coordinator happy path:

1. Execute reducer, calling participant reducers along the way (participants hold write locks, return results, but don't commit)
2. Reducer succeeds
3. Send END_CALLS to all participants (they can now commit in-memory)
4. Commit coordinator in-memory (release write lock)
5. Send PREPARE to durability worker
6. **Barrier up** -- no more durability requests go through
7. Wait for coordinator's own PREPARE to be durable
8. Wait for all participants to report PREPARED
9. Send COMMIT to all participants
10. Send COMMIT to durability worker
11. **Barrier down** -- flush buffered requests
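The participant's side of the two happy paths can be sketched as a small state machine. This is an illustrative model only -- `Msg`, `ParticipantState`, and `step` are hypothetical names, not types from the codebase:

```rust
// Events as seen by the participant: Call/EndCalls/Commit/Abort arrive from the
// coordinator; Prepared is the participant's own "PREPARE became durable" event.
#[derive(Debug, PartialEq)]
enum Msg { Call, EndCalls, Prepared, Commit, Abort }

#[derive(Debug, PartialEq, Clone, Copy)]
enum ParticipantState {
    /// Write lock held, transaction open; more CALLs may arrive (steps 1-4).
    Executing,
    /// In-memory commit done, PREPARE sent to durability, barrier up (steps 5-9).
    Preparing,
    /// PREPARED sent; waiting for the coordinator's decision (steps 10-11).
    AwaitingDecision,
    /// COMMIT applied, barrier down (steps 12-14).
    Done,
    /// Rolled back.
    Aborted,
}

fn step(state: ParticipantState, msg: &Msg) -> ParticipantState {
    use ParticipantState::*;
    match (state, msg) {
        (Executing, Msg::Call) => Executing,
        (Executing, Msg::EndCalls) => Preparing,
        (Preparing, Msg::Prepared) => AwaitingDecision,
        (AwaitingDecision, Msg::Commit) => Done,
        (Executing, Msg::Abort) | (Preparing, Msg::Abort) | (AwaitingDecision, Msg::Abort) => Aborted,
        (s, m) => panic!("protocol violation: {m:?} in state {s:?}"),
    }
}

fn main() {
    let events = [Msg::Call, Msg::Call, Msg::EndCalls, Msg::Prepared, Msg::Commit];
    let end = events.iter().fold(ParticipantState::Executing, |s, e| step(s, e));
    assert_eq!(end, ParticipantState::Done);
    println!("happy path ok");
}
```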
### Key correctness properties:

- **Serializable isolation**: Participant holds the write lock from CALL through END_CALLS. Multiple CALLs from the same coordinator transaction execute within the same MutTxId on the participant, so the second call sees the first call's writes.
- **Persistence barrier**: After PREPARE is sent to durability (steps 7/8 on the participant, steps 5/6 on the coordinator), no speculative transactions can reach the durability worker until COMMIT or ABORT. Anything sent to the durability worker can eventually become persistent, so the barrier is required.
- **Two responses from participant**: the immediate result (step 3) and the later PREPARED notification (step 10). The coordinator collects both: results during reducer execution, PREPARED notifications before deciding COMMIT.
- **Pipelining benefit**: Locks are held only during reducer execution (steps 1-6), not during persistence (steps 7-14). The persistence and 2PC handshake happen after locks are released on both sides.
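The persistence-barrier property above can be illustrated by a standalone sketch, simplified from the real `PersistenceBarrier` in `relational_db.rs`: requests are plain strings here, and all locking is a single `Mutex`:

```rust
use std::sync::Mutex;

#[derive(Debug, PartialEq, Clone, Copy)]
enum BarrierState { Inactive, Armed, Active }

struct Barrier {
    inner: Mutex<(BarrierState, Vec<String>)>,
}

impl Barrier {
    fn new() -> Self {
        Barrier { inner: Mutex::new((BarrierState::Inactive, Vec::new())) }
    }

    /// Arm before committing, while the write lock is still held.
    fn arm(&self) {
        let mut g = self.inner.lock().unwrap();
        assert_eq!(g.0, BarrierState::Inactive);
        g.0 = BarrierState::Armed;
        g.1.clear();
    }

    /// Returns Some(req) if the request should reach the durability worker,
    /// None if it was buffered behind the barrier.
    fn filter(&self, req: String) -> Option<String> {
        let mut g = self.inner.lock().unwrap();
        match g.0 {
            BarrierState::Inactive => Some(req),
            // The first request after arming is the PREPARE: let it through,
            // then start buffering everything else.
            BarrierState::Armed => { g.0 = BarrierState::Active; Some(req) }
            BarrierState::Active => { g.1.push(req); None }
        }
    }

    /// On COMMIT, flush the returned requests; on ABORT, discard them.
    fn deactivate(&self) -> Vec<String> {
        let mut g = self.inner.lock().unwrap();
        g.0 = BarrierState::Inactive;
        std::mem::take(&mut g.1)
    }
}

fn main() {
    let b = Barrier::new();
    b.arm();
    assert_eq!(b.filter("PREPARE".to_string()), Some("PREPARE".to_string()));
    assert_eq!(b.filter("speculative-tx".to_string()), None); // buffered
    let flushed = b.deactivate();                             // COMMIT path
    assert_eq!(flushed, vec!["speculative-tx".to_string()]);
    println!("barrier ok");
}
```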
### Abort paths:

**Coordinator's reducer fails (step 2):**
- Send ABORT to all participants (they still hold write locks)
- Participants roll back their MutTxId (release write lock, no changes)
- No PREPARE was sent, so no barrier is needed

**Participant's reducer fails (step 2):**
- Participant returns an error to the coordinator
- Coordinator's reducer fails (propagates the error)
- Coordinator sends ABORT to all other participants that succeeded
- Those participants roll back their MutTxId

**Coordinator's PREPARE persists but a participant's PREPARE fails to persist:**
- Participant cannot send PREPARED
- Coordinator times out waiting for PREPARED
- Coordinator sends ABORT to all participants
- Coordinator inverts its own in-memory state and discards buffered durability requests

**Crash during protocol:**
- See proposal §8 for recovery rules
### Open problem: MutTxId is !Send

The participant holds a MutTxId across multiple HTTP requests (CALL, more CALLs, END_CALLS). MutTxId is !Send (it holds a SharedWriteGuard). Options:

1. **Dedicated blocking thread per participant transaction**: spawn_blocking holds the MutTxId and communicates via channels. HTTP handlers send messages; the blocking thread processes them.
2. **Session-based protocol**: The participant creates a session on the first CALL and routes subsequent CALLs and END_CALLS to the same thread/task that holds the MutTxId.
3. **Batch all calls**: The coordinator sends all reducer calls + args in a single request. The participant executes them all, returns all results, then commits. A single HTTP round-trip, no cross-request MutTxId holding.

Option 3 is simplest but prevents the coordinator from making decisions between calls. Option 1 is the most general. TBD.
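Option 1 can be sketched with std threads and channels (the real code would use `tokio::task::spawn_blocking`). The point is that the !Send transaction state is created and dropped on one dedicated thread, and request handlers only ever touch it through messages. `Tx` is an illustrative stand-in for MutTxId, not a real type:

```rust
use std::sync::mpsc;
use std::thread;

// Stand-in for the !Send MutTxId (in reality it holds a lock guard).
struct Tx { writes: Vec<String> }

enum Req {
    Call { reducer: String, reply: mpsc::Sender<String> },
    EndCalls { reply: mpsc::Sender<usize> },
}

fn spawn_participant_tx() -> mpsc::Sender<Req> {
    let (tx, rx) = mpsc::channel::<Req>();
    thread::spawn(move || {
        // The transaction lives entirely on this thread; it never crosses a
        // thread boundary, so !Send is not a problem.
        let mut open_tx = Tx { writes: Vec::new() };
        while let Ok(req) = rx.recv() {
            match req {
                Req::Call { reducer, reply } => {
                    // Execute the reducer inside the open transaction.
                    open_tx.writes.push(reducer.clone());
                    let _ = reply.send(format!("result of {reducer}"));
                }
                Req::EndCalls { reply } => {
                    // Commit in-memory here; report how many calls ran.
                    let _ = reply.send(open_tx.writes.len());
                    break;
                }
            }
        }
    });
    tx
}

fn main() {
    let handle = spawn_participant_tx();
    let (r1, rx1) = mpsc::channel();
    handle.send(Req::Call { reducer: "new_order".into(), reply: r1 }).unwrap();
    assert_eq!(rx1.recv().unwrap(), "result of new_order");

    let (r2, rx2) = mpsc::channel();
    handle.send(Req::EndCalls { reply: r2 }).unwrap();
    assert_eq!(rx2.recv().unwrap(), 1);
    println!("participant tx thread ok");
}
```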
## Commitlog format

- PREPARE record: includes all row changes (inserts/deletes)
- COMMIT record: follows PREPARE, marks the transaction as committed
- ABORT record: follows PREPARE, marks the transaction as aborted
- No other records can appear between PREPARE and COMMIT/ABORT in the durable log (the persistence barrier enforces this)
## Replay semantics

On replay, when encountering a PREPARE:

- Do not apply it to the datastore
- Read the next record:
  - COMMIT: apply the PREPARE's changes
  - ABORT: skip the PREPARE
  - No next record (crash): the transaction is still in progress; wait for the coordinator, or time out and abort
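The replay rule above can be sketched as a loop over records. `Record` and its variants are illustrative names (the real commitlog types live elsewhere); an ordinary single-database `Tx` record is assumed alongside the 2PC records, and changes are plain strings:

```rust
#[derive(Debug)]
enum Record {
    Tx(Vec<&'static str>),      // ordinary transaction: apply immediately
    Prepare(Vec<&'static str>), // 2PC: hold until the outcome record
    Commit,
    Abort,
}

/// Replays the log; returns the applied changes and whether a PREPARE is
/// still pending (i.e. the log ended between PREPARE and its outcome).
fn replay(log: &[Record]) -> (Vec<&'static str>, bool) {
    let mut applied = Vec::new();
    let mut pending: Option<Vec<&'static str>> = None;
    for rec in log {
        match rec {
            Record::Tx(changes) => applied.extend(changes.iter().copied()),
            Record::Prepare(changes) => {
                // Do not apply yet; the next record decides the outcome.
                assert!(pending.is_none(), "PREPARE while a PREPARE is pending");
                pending = Some(changes.clone());
            }
            Record::Commit => applied.extend(pending.take().expect("COMMIT without PREPARE")),
            Record::Abort => { pending.take().expect("ABORT without PREPARE"); }
        }
    }
    let in_doubt = pending.is_some();
    (applied, in_doubt)
}

fn main() {
    let log = [
        Record::Tx(vec!["a"]),
        Record::Prepare(vec!["b"]),
        Record::Commit,
        Record::Prepare(vec!["c"]),
        Record::Abort,
        Record::Prepare(vec!["d"]), // crash: no outcome record follows
    ];
    let (applied, in_doubt) = replay(&log);
    assert_eq!(applied, vec!["a", "b"]);
    assert!(in_doubt); // must ask the coordinator, or time out and abort
    println!("replay ok");
}
```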
## Key files

- `crates/core/src/db/relational_db.rs` -- PersistenceBarrier, arm/deactivate, send_or_buffer_durability
- `crates/core/src/host/prepared_tx.rs` -- PreparedTxInfo, PreparedTransactions registry
- `crates/core/src/host/module_host.rs` -- prepare_reducer, commit_prepared, abort_prepared
- `crates/core/src/host/wasm_common/module_host_actor.rs` -- coordinator post-commit coordination
- `crates/core/src/host/instance_env.rs` -- call_reducer_on_db_2pc, prepared_participants tracking
- `crates/core/src/host/wasmtime/wasm_instance_env.rs` -- WASM host function
- `crates/client-api/src/routes/database.rs` -- HTTP endpoints
- `crates/bindings-sys/src/lib.rs` -- FFI
- `crates/bindings/src/remote_reducer.rs` -- safe wrapper

crates/core/src/db/relational_db.rs

Lines changed: 73 additions & 65 deletions
@@ -96,12 +96,24 @@ pub struct PersistenceBarrier {
     inner: std::sync::Mutex<PersistenceBarrierInner>,
 }

+#[derive(Default, PartialEq, Eq, Debug, Clone, Copy)]
+enum BarrierState {
+    /// No 2PC in progress. Durability requests go through normally.
+    #[default]
+    Inactive,
+    /// A 2PC is about to commit. The NEXT durability request is the PREPARE
+    /// and should go through to the worker. After that request, the barrier
+    /// transitions to Active automatically.
+    Armed,
+    /// A 2PC PREPARE has been sent to durability. All subsequent durability
+    /// requests are buffered until the barrier is deactivated (COMMIT or ABORT).
+    Active,
+}
+
 #[derive(Default)]
 struct PersistenceBarrierInner {
-    /// If Some, a PREPARE is pending at this offset. All durability requests
-    /// are buffered until the barrier is lifted.
-    active_prepare: Option<TxOffset>,
-    /// Buffered durability requests that arrived while the barrier was active.
+    state: BarrierState,
+    /// Buffered durability requests that arrived while the barrier was Active.
     buffered: Vec<BufferedDurabilityRequest>,
 }

@@ -110,48 +122,64 @@ impl PersistenceBarrier {
         Self::default()
     }

-    /// Activate the barrier for a PREPARE at the given offset.
-    pub fn activate(&self, prepare_offset: TxOffset) {
+    /// Arm the barrier. The next durability request will go through (it's the
+    /// PREPARE), and then the barrier transitions to Active, buffering all
+    /// subsequent requests.
+    ///
+    /// This must be called BEFORE the transaction commits, while the write lock
+    /// is still held. This ensures no other transaction can send a durability
+    /// request between the PREPARE and the barrier activation.
+    pub fn arm(&self) {
         let mut inner = self.inner.lock().unwrap();
-        assert!(
-            inner.active_prepare.is_none(),
-            "persistence barrier already active at offset {:?}, cannot activate for {prepare_offset}",
-            inner.active_prepare,
+        assert_eq!(
+            inner.state,
+            BarrierState::Inactive,
+            "persistence barrier must be Inactive to arm, but is {:?}",
+            inner.state,
         );
-        inner.active_prepare = Some(prepare_offset);
+        inner.state = BarrierState::Armed;
         inner.buffered.clear();
     }

-    /// If the barrier is active, buffer the durability request and return None.
-    /// If the barrier is not active, return the arguments back unchanged.
-    pub fn try_buffer(
+    /// Called by `send_or_buffer_durability` for every durability request.
+    ///
+    /// Returns `Some(reducer_context)` if the request should be sent to the
+    /// durability worker (barrier is Inactive, or barrier is Armed and this is
+    /// the PREPARE). Returns `None` if the request was buffered (barrier is Active).
+    pub fn filter_durability_request(
         &self,
         reducer_context: Option<ReducerContext>,
         tx_data: &Arc<TxData>,
     ) -> Option<Option<ReducerContext>> {
         let mut inner = self.inner.lock().unwrap();
-        if inner.active_prepare.is_some() {
-            inner.buffered.push(BufferedDurabilityRequest {
-                reducer_context,
-                tx_data: tx_data.clone(),
-            });
-            None // buffered
-        } else {
-            Some(reducer_context) // not buffered, return back
+        match inner.state {
+            BarrierState::Inactive => {
+                // No barrier. Let it through.
+                Some(reducer_context)
+            }
+            BarrierState::Armed => {
+                // This is the PREPARE request. Let it through, then go Active.
+                inner.state = BarrierState::Active;
+                Some(reducer_context)
+            }
+            BarrierState::Active => {
+                // Buffer this request.
+                inner.buffered.push(BufferedDurabilityRequest {
+                    reducer_context,
+                    tx_data: tx_data.clone(),
+                });
+                None
+            }
         }
     }

     /// Deactivate the barrier and return the buffered requests.
+    /// Called on COMMIT (to flush them) or ABORT (to discard them).
     pub fn deactivate(&self) -> Vec<BufferedDurabilityRequest> {
         let mut inner = self.inner.lock().unwrap();
-        inner.active_prepare = None;
+        inner.state = BarrierState::Inactive;
         std::mem::take(&mut inner.buffered)
     }
-
-    /// Check if the barrier is currently active.
-    pub fn is_active(&self) -> bool {
-        self.inner.lock().unwrap().active_prepare.is_some()
-    }
 }

 /// We've added a module version field to the system tables, but we don't yet
@@ -924,52 +952,32 @@ impl RelationalDB {

     /// Send a durability request, or buffer it if the persistence barrier is active.
     fn send_or_buffer_durability(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
-        match self.persistence_barrier.try_buffer(reducer_context, tx_data) {
-            None => {
-                // Buffered behind the persistence barrier; will be flushed on COMMIT
-                // or discarded on ABORT.
-            }
+        match self.persistence_barrier.filter_durability_request(reducer_context, tx_data) {
             Some(reducer_context) => {
-                // Not buffered (barrier not active). Send to durability worker.
+                // Either barrier is Inactive (normal path) or Armed (this is the PREPARE).
+                // Send to durability worker.
                 if let Some(durability) = &self.durability {
                     durability.request_durability(reducer_context, tx_data);
                 }
             }
+            None => {
+                // Buffered behind the persistence barrier (Active state).
+            }
         }
     }

-    /// Commit a transaction as a 2PC PREPARE: commit in-memory, send to
-    /// durability worker, and activate the persistence barrier.
+    /// Arm the persistence barrier for a 2PC PREPARE.
     ///
-    /// Returns the TxOffset and TxData. The caller should then wait for the
-    /// PREPARE to become durable (via `durable_tx_offset().wait_for(offset)`)
-    /// before sending PREPARED to the coordinator.
-    #[tracing::instrument(level = "trace", skip_all)]
-    pub fn commit_tx_prepare(
-        &self,
-        tx: MutTx,
-    ) -> Result<Option<(TxOffset, Arc<TxData>, TxMetrics, Option<ReducerName>)>, DBError> {
-        log::trace!("COMMIT MUT TX (2PC PREPARE)");
-
-        let reducer_context = tx.ctx.reducer_context().cloned();
-        let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx(tx)? else {
-            return Ok(None);
-        };
-
-        self.maybe_do_snapshot(&tx_data);
-
-        let tx_data = Arc::new(tx_data);
-
-        // Send the PREPARE to durability (bypassing the barrier, since this IS the prepare).
-        if let Some(durability) = &self.durability {
-            durability.request_durability(reducer_context.clone(), &tx_data);
-        }
-
-        // Activate the persistence barrier AFTER sending the PREPARE.
-        // All subsequent durability requests will be buffered.
-        self.persistence_barrier.activate(tx_offset);
-
-        Ok(Some((tx_offset, tx_data, tx_metrics, reducer)))
+    /// Call this BEFORE committing the transaction (while the write lock is
+    /// still held). The next durability request (the PREPARE) will go through
+    /// to the worker normally. After that, all subsequent durability requests
+    /// are buffered until `finalize_prepare_commit()` or `finalize_prepare_abort()`.
+    ///
+    /// This ensures no speculative transaction can reach the durability worker
+    /// between the PREPARE and the COMMIT/ABORT decision, even though the
+    /// write lock is released by `commit_tx_downgrade`.
+    pub fn arm_persistence_barrier(&self) {
+        self.persistence_barrier.arm();
     }

     /// Finalize a 2PC transaction as COMMIT.

crates/core/src/host/module_host.rs

Lines changed: 8 additions & 18 deletions
@@ -1810,11 +1810,6 @@ impl ModuleHost {
             let _ = durable_offset.wait_for(current + 1).await;
         }

-        // PREPARE is now durable. Deactivate the barrier and flush all
-        // buffered speculative transactions to the durability worker.
-        // Subsequent transactions can persist normally until the next PREPARE.
-        self.relational_db().finalize_prepare_commit();
-
         Ok((prepare_id, result, return_value))
     } else {
         // Reducer failed -- no prepare_id since nothing to commit/abort.
@@ -1824,30 +1819,25 @@

     /// Finalize a prepared transaction as COMMIT.
     ///
-    /// The persistence barrier was already deactivated (and buffered requests
-    /// flushed) when the PREPARE became durable in `prepare_reducer`. This
-    /// method just removes the prepared tx from the registry.
-    ///
-    /// TODO: Write a COMMIT record to the commitlog so replay knows to apply
-    /// the PREPARE.
+    /// Deactivates the persistence barrier and flushes all buffered durability
+    /// requests to the durability worker.
     pub fn commit_prepared(&self, prepare_id: &str) -> Result<(), String> {
-        self.prepared_txs
+        let _info = self.prepared_txs
             .remove(prepare_id)
             .ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?;
+        self.relational_db().finalize_prepare_commit();
         Ok(())
     }

     /// Abort a prepared transaction.
     ///
-    /// Inverts the PREPARE's in-memory changes and writes an ABORT record
-    /// so replay knows to skip the PREPARE.
-    ///
-    /// TODO: Actually invert in-memory state and write ABORT to commitlog.
+    /// Deactivates the persistence barrier, discards all buffered durability
+    /// requests, and inverts the PREPARE's in-memory changes.
     pub fn abort_prepared(&self, prepare_id: &str) -> Result<(), String> {
-        let _info = self.prepared_txs
+        let info = self.prepared_txs
             .remove(prepare_id)
             .ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?;
-        log::warn!("2PC abort for {prepare_id}: in-memory inversion not yet implemented");
+        self.relational_db().finalize_prepare_abort(&info.tx_data);
         Ok(())
     }
