Skip to content

Commit de6ed81

Browse files
Log and return ok on missing request_id
1 parent 2166798 commit de6ed81

12 files changed

Lines changed: 194 additions & 91 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -897,11 +897,7 @@ impl RelationalDB {
897897

898898
/// Send a tx to the durability worker, unless a durability barrier is active
899899
/// and the tx's offset exceeds the minimum active barrier. In that case, defer the tx.
900-
fn request_durability_maybe_barrier(
901-
&self,
902-
reducer_context: Option<ReducerContext>,
903-
tx_data: &Arc<TxData>,
904-
) {
900+
fn request_durability_maybe_barrier(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
905901
let Some(durability) = &self.durability else {
906902
return;
907903
};

crates/core/src/host/module_host.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -890,7 +890,6 @@ pub struct ModuleHost {
890890
///
891891
/// When this is true, most operations will fail with [`NoSuchModule`].
892892
closed: Arc<AtomicBool>,
893-
894893
}
895894

896895
impl fmt::Debug for ModuleHost {
@@ -1839,7 +1838,14 @@ impl ModuleHost {
18391838
let _ = this
18401839
.call(
18411840
&reducer_name_owned,
1842-
(params, prepare_id_clone, coordinator_identity, prepared_tx, decision_rx, commit_persist_rx),
1841+
(
1842+
params,
1843+
prepare_id_clone,
1844+
coordinator_identity,
1845+
prepared_tx,
1846+
decision_rx,
1847+
commit_persist_rx,
1848+
),
18431849
async |(p, pid, cid, ptx, drx, cprx), inst| {
18441850
inst.call_reducer_prepare_and_hold(p, pid, cid, ptx, drx, cprx);
18451851
Ok::<(), ReducerCallError>(())
@@ -2056,7 +2062,13 @@ impl ModuleHost {
20562062
// The PREPARE PERSIST only stored the reducer inputs, not the row
20572063
// mutations. We re-run to get a fresh MutTxId with the mutations.
20582064
let new_prepare_id = match this
2059-
.prepare_reducer(caller_identity, Some(caller_connection_id), &row.reducer_name, args, Some(coordinator_identity))
2065+
.prepare_reducer(
2066+
caller_identity,
2067+
Some(caller_connection_id),
2068+
&row.reducer_name,
2069+
args,
2070+
Some(coordinator_identity),
2071+
)
20602072
.await
20612073
{
20622074
Ok((pid, result, _rv)) if !pid.is_empty() => {

crates/core/src/host/prepared_tx.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,19 @@ impl PreparedTransactions {
4747
/// The entry stays for Round 2 (commit_persist).
4848
pub fn send_decision(&self, id: &str, commit: bool) -> Result<(), String> {
4949
let guard = self.inner.lock().unwrap();
50-
let info = guard.get(id).ok_or_else(|| format!("no such prepared transaction: {id}"))?;
50+
let info = guard
51+
.get(id)
52+
.ok_or_else(|| format!("no such prepared transaction: {id}"))?;
5153
let _ = info.decision_sender.send(commit);
5254
Ok(())
5355
}
5456

5557
/// Send a Round 2 COMMIT_PERSIST signal and remove the entry.
5658
pub fn send_commit_persist(&self, id: &str) -> Result<(), String> {
5759
let mut guard = self.inner.lock().unwrap();
58-
let info = guard.remove(id).ok_or_else(|| format!("no such prepared transaction: {id}"))?;
60+
let info = guard
61+
.remove(id)
62+
.ok_or_else(|| format!("no such prepared transaction: {id}"))?;
5963
let _ = info.commit_persist_sender.send(());
6064
Ok(())
6165
}

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -793,12 +793,7 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
793793

794794
// Step 10: wait for COMMIT_PERSIST from coordinator (tokio oneshot).
795795
// Sender dropped without sending ⇒ coordinator restarted / aborted ⇒ treat as abort.
796-
let persist_commit = match tokio::time::timeout(
797-
Duration::from_secs(60),
798-
commit_persist_rx,
799-
)
800-
.await
801-
{
796+
let persist_commit = match tokio::time::timeout(Duration::from_secs(60), commit_persist_rx).await {
802797
Ok(Ok(())) => true,
803798
Ok(Err(_)) => {
804799
// Sender dropped: coordinator crashed / aborted.
@@ -851,19 +846,15 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
851846
if let Err(e) = stdb.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |del_tx| {
852847
Ok(del_tx.delete_st_2pc_state(&prepare_id)?)
853848
}) {
854-
log::error!(
855-
"call_reducer_prepare_and_hold: failed to delete st_2pc_state for {prepare_id}: {e}"
856-
);
849+
log::error!("call_reducer_prepare_and_hold: failed to delete st_2pc_state for {prepare_id}: {e}");
857850
}
858851
} else {
859852
// Round 2 abort: discard all deferred transactions.
860853
stdb.abort_durability_barrier(barrier_offset);
861854
// Trigger module restart via the on_panic hook rather than panicking,
862855
// since we're in an async task outside the WASM executor thread.
863856
if let Some(on_panic) = replica_ctx.on_panic.get() {
864-
log::error!(
865-
"2PC persistence aborted for {prepare_id}: triggering module restart"
866-
);
857+
log::error!("2PC persistence aborted for {prepare_id}: triggering module restart");
867858
on_panic();
868859
}
869860
}
@@ -1215,7 +1206,10 @@ impl InstanceCommon {
12151206
// through), A's own tx must also be deferred. Set barrier at offset - 1
12161207
// so A's tx (at offset) is > barrier and gets deferred.
12171208
let barrier_offset = tx.next_tx_offset().saturating_sub(1);
1218-
self.info.subscriptions.relational_db().set_durability_barrier(barrier_offset);
1209+
self.info
1210+
.subscriptions
1211+
.relational_db()
1212+
.set_durability_barrier(barrier_offset);
12191213
Some(barrier_offset)
12201214
} else {
12211215
None
@@ -1247,7 +1241,9 @@ impl InstanceCommon {
12471241
};
12481242
let url = format!(
12491243
"{}/v1/database/{}/2pc/abort/{}",
1250-
base_url, db_identity.to_hex(), prepare_id,
1244+
base_url,
1245+
db_identity.to_hex(),
1246+
prepare_id,
12511247
);
12521248
let mut req = client.post(&url);
12531249
if let Some(ref token) = auth_token {
@@ -1258,7 +1254,10 @@ impl InstanceCommon {
12581254
log::info!("2PC abort: {prepare_id} on {db_identity}");
12591255
}
12601256
Ok(resp) => {
1261-
log::error!("2PC abort: failed for {prepare_id} on {db_identity}: status {}", resp.status());
1257+
log::error!(
1258+
"2PC abort: failed for {prepare_id} on {db_identity}: status {}",
1259+
resp.status()
1260+
);
12621261
}
12631262
Err(e) => {
12641263
log::error!("2PC abort: transport error for {prepare_id} on {db_identity}: {e}");
@@ -1289,7 +1288,9 @@ impl InstanceCommon {
12891288
};
12901289
let url = format!(
12911290
"{}/v1/database/{}/2pc/commit/{}",
1292-
base_url, db_identity.to_hex(), prepare_id,
1291+
base_url,
1292+
db_identity.to_hex(),
1293+
prepare_id,
12931294
);
12941295
let mut req = client.post(&url);
12951296
if let Some(ref token) = auth_token {
@@ -1300,7 +1301,10 @@ impl InstanceCommon {
13001301
log::info!("2PC commit (Round 1): {prepare_id} on {db_identity}");
13011302
}
13021303
Ok(resp) => {
1303-
log::error!("2PC commit: failed for {prepare_id} on {db_identity}: status {}", resp.status());
1304+
log::error!(
1305+
"2PC commit: failed for {prepare_id} on {db_identity}: status {}",
1306+
resp.status()
1307+
);
13041308
}
13051309
Err(e) => {
13061310
log::error!("2PC commit: transport error for {prepare_id} on {db_identity}: {e}");
@@ -1367,7 +1371,9 @@ impl InstanceCommon {
13671371
};
13681372
let url = format!(
13691373
"{}/v1/database/{}/2pc/commit-persist/{}",
1370-
base_url, db_identity.to_hex(), prepare_id,
1374+
base_url,
1375+
db_identity.to_hex(),
1376+
prepare_id,
13711377
);
13721378
let mut req = client.post(&url);
13731379
if let Some(ref token) = auth_token {
@@ -1377,16 +1383,17 @@ impl InstanceCommon {
13771383
Ok(resp) if resp.status().is_success() => {
13781384
log::info!("2PC commit-persist: {prepare_id} on {db_identity}");
13791385
// Round 2 complete — delete coordinator log entry.
1380-
if let Err(e) = stdb
1381-
.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |del_tx| {
1382-
Ok(del_tx.delete_st_2pc_coordinator_log(prepare_id)?)
1383-
})
1384-
{
1386+
if let Err(e) = stdb.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |del_tx| {
1387+
Ok(del_tx.delete_st_2pc_coordinator_log(prepare_id)?)
1388+
}) {
13851389
log::warn!("delete_st_2pc_coordinator_log failed for {prepare_id}: {e}");
13861390
}
13871391
}
13881392
Ok(resp) => {
1389-
log::error!("2PC commit-persist: failed for {prepare_id} on {db_identity}: status {}", resp.status());
1393+
log::error!(
1394+
"2PC commit-persist: failed for {prepare_id} on {db_identity}: status {}",
1395+
resp.status()
1396+
);
13901397
}
13911398
Err(e) => {
13921399
log::error!("2PC commit-persist: transport error for {prepare_id} on {db_identity}: {e}");

crates/core/src/startup.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -413,10 +413,13 @@ impl Cores {
413413

414414
#[cfg(target_os = "linux")]
415415
fn cores_to_cpuset(cores: &[CoreId]) -> Option<nix::sched::CpuSet> {
416-
cores.iter().copied().try_fold(nix::sched::CpuSet::new(), |mut cpuset, core| {
417-
cpuset.set(core.id).ok()?;
418-
Some(cpuset)
419-
})
416+
cores
417+
.iter()
418+
.copied()
419+
.try_fold(nix::sched::CpuSet::new(), |mut cpuset, core| {
420+
cpuset.set(core.id).ok()?;
421+
Some(cpuset)
422+
})
420423
}
421424

422425
#[cfg(target_os = "linux")]
@@ -591,10 +594,7 @@ mod tests {
591594
#[cfg(target_os = "linux")]
592595
{
593596
assert!(split.tokio.workers.is_none());
594-
assert_eq!(
595-
cpuset_cardinality(split.tokio.blocking.as_ref().unwrap()),
596-
20
597-
);
597+
assert_eq!(cpuset_cardinality(split.tokio.blocking.as_ref().unwrap()), 20);
598598
assert!(split.rayon.dedicated.is_none());
599599
assert_eq!(split.rayon.shared.as_ref().unwrap().0, 20);
600600
}

crates/smoketests/tests/smoketests/cross_db_2pc.rs

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,16 @@ fn test_cross_db_2pc_happy_path() {
125125
let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build();
126126

127127
// Publish participants first, then coordinator.
128-
test.publish_module_named(&db_b_name, false).expect("failed to publish bank B");
128+
test.publish_module_named(&db_b_name, false)
129+
.expect("failed to publish bank B");
129130
let db_b_identity = test.database_identity.clone().expect("bank B identity not set");
130131

131-
test.publish_module_named(&db_c_name, false).expect("failed to publish bank C");
132+
test.publish_module_named(&db_c_name, false)
133+
.expect("failed to publish bank C");
132134
let db_c_identity = test.database_identity.clone().expect("bank C identity not set");
133135

134-
test.publish_module_named(&db_a_name, false).expect("failed to publish bank A");
136+
test.publish_module_named(&db_a_name, false)
137+
.expect("failed to publish bank A");
135138
let db_a_identity = test.database_identity.clone().expect("bank A identity not set");
136139

137140
// Call transfer_funds; the return value is A's new alice balance.
@@ -167,13 +170,16 @@ fn test_cross_db_2pc_abort_insufficient_funds() {
167170

168171
let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build();
169172

170-
test.publish_module_named(&db_b_name, false).expect("failed to publish bank B");
173+
test.publish_module_named(&db_b_name, false)
174+
.expect("failed to publish bank B");
171175
let db_b_identity = test.database_identity.clone().expect("bank B identity not set");
172176

173-
test.publish_module_named(&db_c_name, false).expect("failed to publish bank C");
177+
test.publish_module_named(&db_c_name, false)
178+
.expect("failed to publish bank C");
174179
let db_c_identity = test.database_identity.clone().expect("bank C identity not set");
175180

176-
test.publish_module_named(&db_a_name, false).expect("failed to publish bank A");
181+
test.publish_module_named(&db_a_name, false)
182+
.expect("failed to publish bank A");
177183
let db_a_identity = test.database_identity.clone().expect("bank A identity not set");
178184

179185
// Transfer 110 from each — both only have 100, so B's debit panics → 2PC aborts all.
@@ -184,10 +190,25 @@ fn test_cross_db_2pc_abort_insufficient_funds() {
184190
&format!("[\"{db_b_identity}\", \"{db_c_identity}\", \"alice\", \"alice\", 110]"),
185191
)
186192
.expect("api_call failed");
187-
assert!(!resp.is_success(), "Expected transfer_funds to fail due to insufficient funds");
193+
assert!(
194+
!resp.is_success(),
195+
"Expected transfer_funds to fail due to insufficient funds"
196+
);
188197

189198
// All three accounts must still be at 100.
190-
assert_eq!(call_balance(&test, &db_a_identity, "alice"), 100, "A alice should still be 100");
191-
assert_eq!(call_balance(&test, &db_b_identity, "alice"), 100, "B alice should still be 100");
192-
assert_eq!(call_balance(&test, &db_c_identity, "alice"), 100, "C alice should still be 100");
199+
assert_eq!(
200+
call_balance(&test, &db_a_identity, "alice"),
201+
100,
202+
"A alice should still be 100"
203+
);
204+
assert_eq!(
205+
call_balance(&test, &db_b_identity, "alice"),
206+
100,
207+
"B alice should still be 100"
208+
);
209+
assert_eq!(
210+
call_balance(&test, &db_c_identity, "alice"),
211+
100,
212+
"C alice should still be 100"
213+
);
193214
}

0 commit comments

Comments
 (0)