Skip to content

Commit b1477a6

Browse files
committed
ack
1 parent 74bc3eb commit b1477a6

2 files changed

Lines changed: 86 additions & 1 deletion

File tree

crates/client-api/src/routes/database.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,30 @@ pub async fn status_2pc<S: ControlStateDelegate + NodeDelegate>(
374374
Ok((StatusCode::OK, decision))
375375
}
376376

377+
/// 2PC commit-ack endpoint.
378+
///
379+
/// Called by participant B after it commits via the status-poll recovery path,
380+
/// so that the coordinator can delete its `st_2pc_coordinator_log` entry.
381+
///
382+
/// `POST /v1/database/:name_or_identity/2pc/ack-commit/:prepare_id`
383+
pub async fn ack_commit_2pc<S: ControlStateDelegate + NodeDelegate>(
384+
State(worker_ctx): State<S>,
385+
Extension(_auth): Extension<SpacetimeAuth>,
386+
Path(TwoPcParams {
387+
name_or_identity,
388+
prepare_id,
389+
}): Path<TwoPcParams>,
390+
) -> axum::response::Result<impl IntoResponse> {
391+
let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;
392+
393+
module.ack_2pc_coordinator_commit(&prepare_id).map_err(|e| {
394+
log::error!("2PC ack-commit failed: {e}");
395+
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
396+
})?;
397+
398+
Ok(StatusCode::OK)
399+
}
400+
377401
fn reducer_outcome_response(
378402
module: &ModuleHost,
379403
owner_identity: &Identity,
@@ -1388,6 +1412,8 @@ pub struct DatabaseRoutes<S> {
13881412
pub abort_2pc_post: MethodRouter<S>,
13891413
/// GET: /database/:name_or_identity/2pc/status/:prepare_id
13901414
pub status_2pc_get: MethodRouter<S>,
1415+
/// POST: /database/:name_or_identity/2pc/ack-commit/:prepare_id
1416+
pub ack_commit_2pc_post: MethodRouter<S>,
13911417
}
13921418

13931419
impl<S> Default for DatabaseRoutes<S>
@@ -1417,6 +1443,7 @@ where
14171443
commit_2pc_post: post(commit_2pc::<S>),
14181444
abort_2pc_post: post(abort_2pc::<S>),
14191445
status_2pc_get: get(status_2pc::<S>),
1446+
ack_commit_2pc_post: post(ack_commit_2pc::<S>),
14201447
}
14211448
}
14221449
}
@@ -1445,7 +1472,8 @@ where
14451472
.route("/prepare/:reducer", self.prepare_post)
14461473
.route("/2pc/commit/:prepare_id", self.commit_2pc_post)
14471474
.route("/2pc/abort/:prepare_id", self.abort_2pc_post)
1448-
.route("/2pc/status/:prepare_id", self.status_2pc_get);
1475+
.route("/2pc/status/:prepare_id", self.status_2pc_get)
1476+
.route("/2pc/ack-commit/:prepare_id", self.ack_commit_2pc_post);
14491477

14501478
axum::Router::new()
14511479
.route("/", self.root_post)

crates/core/src/host/module_host.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1875,6 +1875,16 @@ impl ModuleHost {
18751875
Ok(())
18761876
}
18771877

1878+
/// Delete a coordinator log entry for `prepare_id`.
1879+
/// Called when B has confirmed it committed, so A can stop retransmitting.
1880+
pub fn ack_2pc_coordinator_commit(&self, prepare_id: &str) -> Result<(), anyhow::Error> {
1881+
let db = self.relational_db().clone();
1882+
db.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |tx| {
1883+
tx.delete_st_2pc_coordinator_log(prepare_id)
1884+
.map_err(anyhow::Error::from)
1885+
})
1886+
}
1887+
18781888
/// Check whether `prepare_id` is present in the coordinator log of this database.
18791889
/// Used by participant B to ask coordinator A: "did you commit?"
18801890
pub fn has_2pc_coordinator_commit(&self, prepare_id: &str) -> bool {
@@ -2039,6 +2049,14 @@ impl ModuleHost {
20392049
Some(commit) => {
20402050
if commit {
20412051
let _ = this2.commit_prepared(&new_prepare_id);
2052+
// Tell A we committed so it can delete its coordinator log entry.
2053+
Self::send_ack_commit_to_coordinator(
2054+
&client,
2055+
&router,
2056+
auth_token.clone(),
2057+
coordinator_identity,
2058+
&original_prepare_id,
2059+
).await;
20422060
} else {
20432061
let _ = this2.abort_prepared(&new_prepare_id);
20442062
}
@@ -2099,6 +2117,45 @@ impl ModuleHost {
20992117
}
21002118
}
21012119

2120+
/// POST `POST /v1/database/{coordinator}/2pc/ack-commit/{prepare_id}` to tell A that
2121+
/// B has committed, so A can delete its coordinator log entry.
2122+
async fn send_ack_commit_to_coordinator(
2123+
client: &reqwest::Client,
2124+
router: &std::sync::Arc<dyn crate::host::reducer_router::ReducerCallRouter>,
2125+
auth_token: Option<String>,
2126+
coordinator_identity: Identity,
2127+
prepare_id: &str,
2128+
) {
2129+
let base_url = match router.resolve_base_url(coordinator_identity).await {
2130+
Ok(url) => url,
2131+
Err(e) => {
2132+
log::warn!("2PC ack-commit: cannot resolve coordinator URL: {e}");
2133+
return;
2134+
}
2135+
};
2136+
let url = format!(
2137+
"{}/v1/database/{}/2pc/ack-commit/{}",
2138+
base_url,
2139+
coordinator_identity.to_hex(),
2140+
prepare_id,
2141+
);
2142+
let mut req = client.post(&url);
2143+
if let Some(token) = &auth_token {
2144+
req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}"));
2145+
}
2146+
match req.send().await {
2147+
Ok(resp) if resp.status().is_success() => {
2148+
log::info!("2PC ack-commit: notified coordinator for {prepare_id}");
2149+
}
2150+
Ok(resp) => {
2151+
log::warn!("2PC ack-commit: coordinator returned {} for {prepare_id}", resp.status());
2152+
}
2153+
Err(e) => {
2154+
log::warn!("2PC ack-commit: transport error for {prepare_id}: {e}");
2155+
}
2156+
}
2157+
}
2158+
21022159
pub async fn call_view_add_single_subscription(
21032160
&self,
21042161
sender: Arc<ClientConnectionSender>,

0 commit comments

Comments
 (0)