Commit 4ca1319

Update 2PC plan: reuse existing blocking pattern for MutTxId
Instead of inventing a new threading model, reuse the same std::thread::scope + blocking_recv pattern that call_reducer_on_db already uses. The participant's thread executes the reducer, sends the result, then blocks on a channel waiting for the next command. The MutTxId stays alive on that same thread. Includes updated ASCII diagram showing the coordinator/participant thread interaction, the session-based HTTP protocol, and how the persistence barrier arms before commit.
1 parent 5516ed3 commit 4ca1319

1 file changed

Lines changed: 73 additions & 33 deletions

crates/core/2PC-IMPLEMENTATION-PLAN.md

@@ -44,46 +44,70 @@ The TPC-C benchmark on branch `origin/phoebe/tpcc/reducer-return-value` (public
 - **Two responses from participant**: The immediate result (step 3) and the later PREPARED notification (step 10). The coordinator collects both: results during reducer execution, PREPARED notifications before deciding COMMIT.
 - **Pipelining benefit**: Locks are held only during reducer execution (steps 1-6), not during persistence (steps 7-14). The persistence and 2PC handshake happen after locks are released on both sides.

-## Holding MutTxId: dedicated blocking thread
+## Holding MutTxId: reuse existing blocking pattern

-`MutTxId` is `!Send` (holds `SharedWriteGuard`). The participant must hold it across multiple CALL requests from the coordinator for serializable isolation. The solution: a **dedicated blocking thread per participant transaction** that holds the `MutTxId` for its entire lifetime. Async HTTP handlers communicate with this thread via channels. The `MutTxId` never crosses a thread boundary or touches an async context.
+`MutTxId` is `!Send` (holds `SharedWriteGuard`). The participant must hold it across multiple CALL requests from the coordinator for serializable isolation.
+
+The codebase already has a blocking pattern: on the coordinator side, `call_reducer_on_db` uses `std::thread::scope` + `Handle::block_on` to block the WASM thread while making an async HTTP call. The same pattern works for the participant: instead of returning from the reducer execution, the participant's thread blocks on a channel (`blocking_recv`) waiting for the next command. The `MutTxId` stays alive on that same thread. No new threading model is needed.

 ```
-HTTP handler (async)          Blocking thread (sync, holds MutTxId)
----------------------         -------------------------------------
-CALL request arrives   --->   receive on channel, execute reducer
-                       <---   send result back on channel
-return HTTP response
-
-CALL request arrives   --->   execute next reducer (same MutTxId)
-                       <---   send result
-return HTTP response
-
-END_CALLS arrives      --->   commit in-memory, release write lock
-                              send PREPARE to durability, barrier up
-                              wait for durability...
-                       <---   send PREPARED
-return HTTP response
-
-COMMIT arrives         --->   send COMMIT to durability, barrier down
-                              thread exits
+Coordinator thread                    Participant thread
+(WASM reducer running,                (holds MutTxId, holds WASM instance)
+ holds coordinator MutTxId)
+
+call_reducer_on_db_2pc()
+  |
+  |-- HTTP POST /2pc/begin/debit ->   spawn thread, create MutTxId
+  |                                   execute reducer
+  |                                   send result via channel
+  |  <-- HTTP response (result        block on channel (blocking_recv)
+  |      + session_id)                  |
+  |                                     |  [MutTxId held, write lock held]
+  |                                     |
+call_reducer_on_db_2pc() (2nd call)     |
+  |                                     |
+  |-- HTTP POST /2pc/{sid}/call/x ->  send command via channel
+  |                                   wake up, execute reducer
+  |                                   send result via channel
+  |  <-- HTTP response                block on channel
+  |                                     |
+reducer finishes                        |
+  |                                     |
+[post-commit coordination]              |
+  |                                     |
+  |-- HTTP POST /2pc/{sid}/end --->   wake up, commit in-memory
+  |                                   release write lock
+  |                                   send PREPARE to durability
+  |                                   barrier up
+  |                                   wait for PREPARE durable...
+  |  <-- HTTP response (PREPARED)     block on channel
+  |                                     |
+  |-- HTTP POST /2pc/{sid}/commit ->  wake up
+  |                                   send COMMIT to durability
+  |                                   barrier down, flush
+  |  <-- HTTP response                thread exits
 ```
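
The participant side of the diagram can be sketched with the standard library alone. This is a minimal illustration, not the plan's implementation: `FakeTx`, `Cmd`, `participant`, and `run_session` are hypothetical names, `std::sync::mpsc` stands in for the async channels and `blocking_recv`, and the "reducer" only appends its name to a log. The point it shows is that a `!Send` value created on the participant thread stays alive across several commands because the thread blocks on the channel instead of returning.

```rust
use std::marker::PhantomData;
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for `MutTxId`. The raw-pointer marker makes it
/// `!Send`, so it cannot leave the thread that created it.
struct FakeTx {
    writes: Vec<String>,
    _not_send: PhantomData<*const ()>,
}

/// Simplified commands (assumption: the plan's `TxCommand` carries more data).
enum Cmd {
    Call { reducer: String, reply: mpsc::Sender<usize> },
    Commit { reply: mpsc::Sender<Vec<String>> },
}

/// Runs on the participant thread: execute the first reducer, reply, then
/// block on the channel until the next command arrives.
fn participant(first: String, first_reply: mpsc::Sender<usize>, rx: mpsc::Receiver<Cmd>) {
    let mut tx = FakeTx { writes: Vec::new(), _not_send: PhantomData };
    tx.writes.push(first);
    let _ = first_reply.send(tx.writes.len());
    // `recv` blocks this thread; `tx` stays alive across iterations.
    while let Ok(cmd) = rx.recv() {
        match cmd {
            Cmd::Call { reducer, reply } => {
                tx.writes.push(reducer);
                let _ = reply.send(tx.writes.len());
            }
            Cmd::Commit { reply } => {
                let _ = reply.send(tx.writes);
                return; // thread exits, transaction state is dropped
            }
        }
    }
}

/// Drives one session the way a coordinator would: begin, call, commit.
fn run_session() -> (usize, usize, Vec<String>) {
    let (cmd_tx, cmd_rx) = mpsc::channel();
    let (first_tx, first_rx) = mpsc::channel();
    let handle = thread::spawn(move || participant("debit".to_string(), first_tx, cmd_rx));
    let first = first_rx.recv().unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    cmd_tx.send(Cmd::Call { reducer: "credit".to_string(), reply: reply_tx }).unwrap();
    let second = reply_rx.recv().unwrap();

    let (done_tx, done_rx) = mpsc::channel();
    cmd_tx.send(Cmd::Commit { reply: done_tx }).unwrap();
    let log = done_rx.recv().unwrap();
    handle.join().unwrap();
    (first, second, log)
}

fn main() {
    let (first, second, log) = run_session();
    println!("{first} {second} {log:?}");
}
```

Only `Send` values (the channel endpoints and the reducer name) cross into the spawned thread; `FakeTx` is constructed inside it, which is why the compiler accepts the closure.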

+### Implementation
+
 On first CALL for a new 2PC transaction:
-1. Spawn a blocking thread (`std::thread::spawn` or `tokio::task::spawn_blocking`)
-2. Thread creates `MutTxId` (acquires write lock)
-3. Thread blocks on a command channel (`mpsc::Receiver<TxCommand>`)
-4. Store the command sender (`mpsc::Sender<TxCommand>`) in a session map keyed by session_id
-5. Return session_id to coordinator along with the first CALL's result
+1. The async HTTP handler spawns a blocking thread (via `std::thread::scope` or `tokio::task::spawn_blocking`)
+2. The blocking thread takes a WASM instance from the module's instance pool
+3. The blocking thread creates `MutTxId` (acquires write lock)
+4. The blocking thread executes the first reducer
+5. The blocking thread sends the result back via a `oneshot` channel
+6. The async HTTP handler receives the result and returns the HTTP response with a `session_id`
+7. The blocking thread blocks on a `mpsc::Receiver<TxCommand>` waiting for the next command
+8. The async HTTP handler stores the `mpsc::Sender<TxCommand>` in a session map keyed by `session_id`

-Subsequent CALLs and END_CALLS look up the session_id, send commands on the channel. The blocking thread processes them sequentially on the same `MutTxId`.
+Subsequent CALLs and END_CALLS look up the `session_id`, send commands on the channel. The blocking thread processes them sequentially on the same `MutTxId`.

-The blocking thread also needs access to a WASM module instance to execute reducers. The instance must be taken from the pool on thread creation and returned on thread exit (after COMMIT or ABORT).
+When the thread exits (after COMMIT or ABORT), it returns the WASM instance to the pool.
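
The session-map lookup described above might look roughly like the following. This is a sketch under assumptions: `Sessions`, `begin`, and `dispatch` are hypothetical names, session ids are plain counters, the command payloads are reduced to a reducer name, and `std::sync::mpsc` stands in for the async channel. Removing the entry on COMMIT or ABORT is also an assumption, chosen because the thread exits after either command.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Mutex;

/// Simplified command type; reply channels are omitted in this sketch.
#[derive(Debug, PartialEq)]
enum TxCommand {
    Call(String),
    EndCalls,
    Commit,
    Abort,
}

/// Hypothetical session map: session_id -> command sender.
#[derive(Default)]
struct Sessions {
    next_id: Mutex<u64>,
    senders: Mutex<HashMap<u64, Sender<TxCommand>>>,
}

impl Sessions {
    /// Registers a new session. Returns its id and the receiver that the
    /// blocking thread would own.
    fn begin(&self) -> (u64, Receiver<TxCommand>) {
        let (tx, rx) = mpsc::channel();
        let mut next = self.next_id.lock().unwrap();
        *next += 1;
        let id = *next;
        self.senders.lock().unwrap().insert(id, tx);
        (id, rx)
    }

    /// Routes a command to the session's thread. COMMIT and ABORT end the
    /// session, so the entry is removed after they are sent.
    fn dispatch(&self, id: u64, cmd: TxCommand) -> Result<(), String> {
        let mut map = self.senders.lock().unwrap();
        let terminal = matches!(cmd, TxCommand::Commit | TxCommand::Abort);
        let sender = map.get(&id).ok_or_else(|| format!("unknown session {id}"))?;
        sender
            .send(cmd)
            .map_err(|_| format!("session {id} thread has exited"))?;
        if terminal {
            map.remove(&id);
        }
        Ok(())
    }
}

fn main() {
    let sessions = Sessions::default();
    let (sid, rx) = sessions.begin();
    sessions.dispatch(sid, TxCommand::Call("credit".to_string())).unwrap();
    sessions.dispatch(sid, TxCommand::EndCalls).unwrap();
    sessions.dispatch(sid, TxCommand::Commit).unwrap();
    // Commands arrive in the order they were dispatched.
    println!("{:?}", rx.try_iter().collect::<Vec<_>>());
    // The session is gone after COMMIT, so a late ABORT is rejected.
    println!("{:?}", sessions.dispatch(sid, TxCommand::Abort));
}
```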

 ```rust
 enum TxCommand {
     Call { reducer: String, args: Bytes, reply: oneshot::Sender<CallResult> },
-    EndCalls { reply: oneshot::Sender<()> },
+    EndCalls { reply: oneshot::Sender<PreparedResult> },
     Commit { reply: oneshot::Sender<()> },
     Abort { reply: oneshot::Sender<()> },
 }
@@ -127,14 +151,30 @@ On replay, when encountering a PREPARE:
 - ABORT: skip the PREPARE
 - No next record (crash): transaction is still in progress, wait for coordinator or timeout and abort

+## Persistence barrier
+
+The barrier in `relational_db.rs` has three states: `Inactive`, `Armed`, `Active`.
+
+- **Inactive**: normal operation, durability requests go through.
+- **Armed**: set BEFORE committing the transaction (while write lock is held). The NEXT durability request (the PREPARE) goes through to the worker and transitions the barrier to Active.
+- **Active**: all subsequent durability requests are buffered.
+
+This ensures no race between the write lock release and the barrier activation. Since the barrier is Armed while the write lock is held, no other transaction can commit and send a durability request before the barrier transitions to Active.
+
+Used by both coordinator and participant:
+- Arm before committing the 2PC transaction
+- The commit's durability request (the PREPARE) transitions Armed -> Active
+- On COMMIT: deactivate, flush buffered requests
+- On ABORT: deactivate, discard buffered requests
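
The three barrier states and their transitions can be modelled as a small state machine. The sketch below is illustrative only: `Barrier`, `arm`, `request`, `flush`, and `discard` are hypothetical names, requests are plain strings, and the `sent` vector stands in for the durability worker. It does not reflect the actual API in `relational_db.rs`.

```rust
/// Barrier states as named in the plan.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Inactive,
    Armed,
    Active,
}

/// Hypothetical barrier; `sent` stands in for the durability worker.
struct Barrier {
    state: State,
    sent: Vec<String>,
    buffered: Vec<String>,
}

impl Barrier {
    fn new() -> Self {
        Barrier { state: State::Inactive, sent: Vec::new(), buffered: Vec::new() }
    }

    /// Called before committing, while the write lock is still held.
    fn arm(&mut self) {
        self.state = State::Armed;
    }

    /// Handles one durability request according to the current state.
    fn request(&mut self, req: &str) {
        match self.state {
            State::Inactive => self.sent.push(req.to_string()),
            State::Armed => {
                // The PREPARE passes through and activates the barrier.
                self.sent.push(req.to_string());
                self.state = State::Active;
            }
            State::Active => self.buffered.push(req.to_string()),
        }
    }

    /// COMMIT: deactivate and flush buffered requests in order.
    fn flush(&mut self) {
        self.sent.append(&mut self.buffered);
        self.state = State::Inactive;
    }

    /// ABORT: deactivate and discard buffered requests.
    fn discard(&mut self) {
        self.buffered.clear();
        self.state = State::Inactive;
    }
}

fn main() {
    let mut b = Barrier::new();
    b.arm();
    b.request("PREPARE");
    b.request("tx-after-prepare");
    println!("{:?} sent={:?} buffered={:?}", b.state, b.sent, b.buffered);
    b.flush();
    println!("{:?} sent={:?}", b.state, b.sent);

    // Abort path: the buffered request never reaches the worker.
    b.arm();
    b.request("PREPARE-2");
    b.request("dropped");
    b.discard();
    println!("{:?} sent={:?}", b.state, b.sent);
}
```

Because `arm` runs while the write lock is held, the first request seen in the `Armed` state can only be the PREPARE of the committing transaction, which is the race-freedom argument made above.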
+
 ## Key files

-- `crates/core/src/db/relational_db.rs` -- PersistenceBarrier, send_or_buffer_durability, finalize_prepare_commit/abort
-- `crates/core/src/host/prepared_tx.rs` -- PreparedTxInfo, PreparedTransactions registry
-- `crates/core/src/host/module_host.rs` -- prepare_reducer, commit_prepared, abort_prepared
-- `crates/core/src/host/wasm_common/module_host_actor.rs` -- coordinator post-commit coordination
+- `crates/core/src/db/relational_db.rs` -- PersistenceBarrier (Inactive/Armed/Active), send_or_buffer_durability, finalize_prepare_commit/abort
+- `crates/core/src/host/prepared_tx.rs` -- TxCommand, TxSession, PreparedTransactions registry, session map
+- `crates/core/src/host/module_host.rs` -- begin_2pc_session, commit_prepared, abort_prepared
+- `crates/core/src/host/wasm_common/module_host_actor.rs` -- coordinator post-commit coordination (END_CALLS, wait PREPARED, COMMIT)
 - `crates/core/src/host/instance_env.rs` -- call_reducer_on_db_2pc, prepared_participants tracking
 - `crates/core/src/host/wasmtime/wasm_instance_env.rs` -- WASM host function
-- `crates/client-api/src/routes/database.rs` -- HTTP endpoints (CALL, END_CALLS, COMMIT, ABORT, PREPARED notification)
+- `crates/client-api/src/routes/database.rs` -- HTTP endpoints: /2pc/begin/:reducer, /2pc/:sid/call/:reducer, /2pc/:sid/end, /2pc/:sid/commit, /2pc/:sid/abort
 - `crates/bindings-sys/src/lib.rs` -- FFI
 - `crates/bindings/src/remote_reducer.rs` -- safe wrapper
