Skip to content

Commit 1744f0f

Browse files
committed
Implement pipelined 2PC prototype for cross-database atomicity
Add two-phase commit support for distributed transactions. Participant: PreparedTransactions registry, HTTP endpoints for prepare/commit/abort, prepare_reducer() on ModuleHost. Coordinator: call_reducer_on_db_2pc host function (ABI spacetime_10.5), post-commit sends /2pc/commit to participants, on failure sends abort. Bindings: FFI and safe wrapper for call_reducer_on_db_2pc. Smoketests: cross_db_2pc with happy path and abort path.
1 parent c7550a1 commit 1744f0f

14 files changed

Lines changed: 848 additions & 1 deletion

File tree

crates/bindings-sys/src/lib.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -896,6 +896,23 @@ pub mod raw {
896896
args_len: u32,
897897
out: *mut BytesSource,
898898
) -> u16;
899+
900+
/// 2PC variant of `call_reducer_on_db`.
901+
///
902+
/// Calls the target database's `/prepare/{reducer}` endpoint instead of `/call/{reducer}`.
903+
/// On success, the runtime stores the `prepare_id` internally.
904+
/// After the coordinator's reducer commits, all participants are committed.
905+
/// If the coordinator's reducer fails, all participants are aborted.
906+
///
907+
/// Returns and errors are identical to `call_reducer_on_db`.
908+
pub fn call_reducer_on_db_2pc(
909+
identity_ptr: *const u8, // exactly 32 bytes, BSATN-encoded Identity
910+
reducer_ptr: *const u8,
911+
reducer_len: u32,
912+
args_ptr: *const u8,
913+
args_len: u32,
914+
out: *mut BytesSource,
915+
) -> u16;
899916
}
900917

901918
/// What strategy does the database index use?
@@ -1510,6 +1527,37 @@ pub fn call_reducer_on_db(
15101527
}
15111528
}
15121529

1530+
/// 2PC variant of [`call_reducer_on_db`].
1531+
///
1532+
/// Calls `/prepare/{reducer}` on the target database. On success, the runtime
1533+
/// stores the prepare_id internally. After the coordinator's reducer commits,
1534+
/// all participants are committed. On failure, all participants are aborted.
1535+
///
1536+
/// Returns and errors are identical to [`call_reducer_on_db`].
1537+
#[inline]
1538+
pub fn call_reducer_on_db_2pc(
1539+
identity: [u8; 32],
1540+
reducer_name: &str,
1541+
args: &[u8],
1542+
) -> Result<(u16, raw::BytesSource), raw::BytesSource> {
1543+
let mut out = raw::BytesSource::INVALID;
1544+
let status = unsafe {
1545+
raw::call_reducer_on_db_2pc(
1546+
identity.as_ptr(),
1547+
reducer_name.as_ptr(),
1548+
reducer_name.len() as u32,
1549+
args.as_ptr(),
1550+
args.len() as u32,
1551+
&mut out,
1552+
)
1553+
};
1554+
if status == Errno::HTTP_ERROR.code() {
1555+
Err(out)
1556+
} else {
1557+
Ok((status, out))
1558+
}
1559+
}
1560+
15131561
/// Finds the JWT payload associated with `connection_id`.
15141562
/// If nothing is found for the connection, this returns None.
15151563
/// If a payload is found, this will return a valid [`raw::BytesSource`].

crates/bindings/src/remote_reducer.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,43 @@ pub fn call_reducer_on_db(database_identity: Identity, reducer_name: &str, args:
8484
}
8585
}
8686
}
87+
88+
/// Call a reducer on a remote database using the 2PC prepare protocol.
89+
///
90+
/// This is the 2PC variant of [`call_reducer_on_db`]. It calls the target database's
91+
/// `/prepare/{reducer}` endpoint. On success, the runtime stores the prepare_id internally.
92+
/// After the coordinator's reducer commits, all participants are committed automatically.
93+
/// If the coordinator's reducer fails (panics or returns Err), all participants are aborted.
94+
///
95+
/// Returns and errors are identical to [`call_reducer_on_db`].
96+
pub fn call_reducer_on_db_2pc(
97+
database_identity: Identity,
98+
reducer_name: &str,
99+
args: &[u8],
100+
) -> Result<(), RemoteCallError> {
101+
let identity_bytes = database_identity.to_byte_array();
102+
match spacetimedb_bindings_sys::call_reducer_on_db_2pc(identity_bytes, reducer_name, args) {
103+
Ok((status, body_source)) => {
104+
if status < 300 {
105+
return Ok(());
106+
}
107+
let msg = if body_source == spacetimedb_bindings_sys::raw::BytesSource::INVALID {
108+
String::new()
109+
} else {
110+
let mut buf = IterBuf::take();
111+
read_bytes_source_into(body_source, &mut buf);
112+
String::from_utf8_lossy(&buf).into_owned()
113+
};
114+
if status == 404 {
115+
Err(RemoteCallError::NotFound(msg))
116+
} else {
117+
Err(RemoteCallError::Failed(msg))
118+
}
119+
}
120+
Err(err_source) => {
121+
use crate::rt::read_bytes_source_as;
122+
let msg = read_bytes_source_as::<String>(err_source);
123+
Err(RemoteCallError::Unreachable(msg))
124+
}
125+
}
126+
}

crates/client-api/src/routes/database.rs

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,115 @@ fn parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::res
241241
}
242242
}
243243

244+
/// 2PC prepare endpoint: execute a reducer and return a prepare_id.
245+
///
246+
/// `POST /v1/database/:name_or_identity/prepare/:reducer`
247+
///
248+
/// On success, the response includes:
249+
/// - `X-Prepare-Id` header with the prepare_id
250+
/// - Body contains the reducer return value (if any)
251+
pub async fn prepare<S: ControlStateDelegate + NodeDelegate>(
252+
State(worker_ctx): State<S>,
253+
Extension(auth): Extension<SpacetimeAuth>,
254+
Path(CallParams {
255+
name_or_identity,
256+
reducer,
257+
}): Path<CallParams>,
258+
TypedHeader(content_type): TypedHeader<headers::ContentType>,
259+
body: Bytes,
260+
) -> axum::response::Result<impl IntoResponse> {
261+
let args = parse_call_args(content_type, body)?;
262+
let caller_identity = auth.claims.identity;
263+
264+
let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?;
265+
266+
let connection_id = generate_random_connection_id();
267+
268+
module
269+
.call_identity_connected(auth.into(), connection_id)
270+
.await
271+
.map_err(client_connected_error_to_response)?;
272+
273+
let result = module
274+
.prepare_reducer(caller_identity, Some(connection_id), &reducer, args)
275+
.await;
276+
277+
module
278+
.call_identity_disconnected(caller_identity, connection_id)
279+
.await
280+
.map_err(client_disconnected_error_to_response)?;
281+
282+
match result {
283+
Ok((prepare_id, rcr, return_value)) => {
284+
let (status, body) =
285+
reducer_outcome_response(&module, &owner_identity, &reducer, rcr.outcome, return_value)?;
286+
let mut response = (
287+
status,
288+
TypedHeader(SpacetimeEnergyUsed(rcr.energy_used)),
289+
TypedHeader(SpacetimeExecutionDurationMicros(rcr.execution_duration)),
290+
body,
291+
)
292+
.into_response();
293+
if !prepare_id.is_empty() {
294+
response.headers_mut().insert(
295+
"X-Prepare-Id",
296+
http::HeaderValue::from_str(&prepare_id).unwrap(),
297+
);
298+
}
299+
Ok(response)
300+
}
301+
Err(e) => Err(map_reducer_error(e, &reducer).into()),
302+
}
303+
}
304+
305+
#[derive(Deserialize)]
306+
pub struct TwoPcParams {
307+
name_or_identity: NameOrIdentity,
308+
prepare_id: String,
309+
}
310+
311+
/// 2PC commit endpoint: finalize a prepared transaction.
312+
///
313+
/// `POST /v1/database/:name_or_identity/2pc/commit/:prepare_id`
314+
pub async fn commit_2pc<S: ControlStateDelegate + NodeDelegate>(
315+
State(worker_ctx): State<S>,
316+
Extension(_auth): Extension<SpacetimeAuth>,
317+
Path(TwoPcParams {
318+
name_or_identity,
319+
prepare_id,
320+
}): Path<TwoPcParams>,
321+
) -> axum::response::Result<impl IntoResponse> {
322+
let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;
323+
324+
module.commit_prepared(&prepare_id).map_err(|e| {
325+
log::error!("2PC commit failed: {e}");
326+
(StatusCode::NOT_FOUND, e).into_response()
327+
})?;
328+
329+
Ok(StatusCode::OK)
330+
}
331+
332+
/// 2PC abort endpoint: abort a prepared transaction.
333+
///
334+
/// `POST /v1/database/:name_or_identity/2pc/abort/:prepare_id`
335+
pub async fn abort_2pc<S: ControlStateDelegate + NodeDelegate>(
336+
State(worker_ctx): State<S>,
337+
Extension(_auth): Extension<SpacetimeAuth>,
338+
Path(TwoPcParams {
339+
name_or_identity,
340+
prepare_id,
341+
}): Path<TwoPcParams>,
342+
) -> axum::response::Result<impl IntoResponse> {
343+
let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;
344+
345+
module.abort_prepared(&prepare_id).map_err(|e| {
346+
log::error!("2PC abort failed: {e}");
347+
(StatusCode::NOT_FOUND, e).into_response()
348+
})?;
349+
350+
Ok(StatusCode::OK)
351+
}
352+
244353
fn reducer_outcome_response(
245354
module: &ModuleHost,
246355
owner_identity: &Identity,
@@ -1247,6 +1356,12 @@ pub struct DatabaseRoutes<S> {
12471356
pub db_reset: MethodRouter<S>,
12481357
/// GET: /database/: name_or_identity/unstable/timestamp
12491358
pub timestamp_get: MethodRouter<S>,
1359+
/// POST: /database/:name_or_identity/prepare/:reducer
1360+
pub prepare_post: MethodRouter<S>,
1361+
/// POST: /database/:name_or_identity/2pc/commit/:prepare_id
1362+
pub commit_2pc_post: MethodRouter<S>,
1363+
/// POST: /database/:name_or_identity/2pc/abort/:prepare_id
1364+
pub abort_2pc_post: MethodRouter<S>,
12501365
}
12511366

12521367
impl<S> Default for DatabaseRoutes<S>
@@ -1272,6 +1387,9 @@ where
12721387
pre_publish: post(pre_publish::<S>),
12731388
db_reset: put(reset::<S>),
12741389
timestamp_get: get(get_timestamp::<S>),
1390+
prepare_post: post(prepare::<S>),
1391+
commit_2pc_post: post(commit_2pc::<S>),
1392+
abort_2pc_post: post(abort_2pc::<S>),
12751393
}
12761394
}
12771395
}
@@ -1296,7 +1414,10 @@ where
12961414
.route("/sql", self.sql_post)
12971415
.route("/unstable/timestamp", self.timestamp_get)
12981416
.route("/pre_publish", self.pre_publish)
1299-
.route("/reset", self.db_reset);
1417+
.route("/reset", self.db_reset)
1418+
.route("/prepare/:reducer", self.prepare_post)
1419+
.route("/2pc/commit/:prepare_id", self.commit_2pc_post)
1420+
.route("/2pc/abort/:prepare_id", self.abort_2pc_post);
13001421

13011422
axum::Router::new()
13021423
.route("/", self.root_post)

0 commit comments

Comments
 (0)