Skip to content

Commit be4a56a

Browse files
committed
dead code
1 parent 8fb3e92 commit be4a56a

5 files changed

Lines changed: 59 additions & 142 deletions

File tree

crates/core/src/host/instance_env.rs

Lines changed: 0 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1125,94 +1125,6 @@ impl InstanceEnv {
11251125
result
11261126
}
11271127
}
1128-
1129-
/// Commit all prepared participants (called after coordinator's reducer succeeds).
1130-
pub fn commit_all_prepared(&mut self) -> impl Future<Output = ()> + use<> {
1131-
let participants = mem::take(&mut self.prepared_participants);
1132-
let client = self.replica_ctx.call_reducer_client.clone();
1133-
let router = self.replica_ctx.call_reducer_router.clone();
1134-
let auth_token = self.replica_ctx.call_reducer_auth_token.clone();
1135-
1136-
async move {
1137-
for (db_identity, prepare_id) in participants {
1138-
let base_url = match router.resolve_base_url(db_identity).await {
1139-
Ok(url) => url,
1140-
Err(e) => {
1141-
log::error!("2PC commit: failed to resolve base URL for {db_identity}: {e}");
1142-
continue;
1143-
}
1144-
};
1145-
let url = format!(
1146-
"{}/v1/database/{}/2pc/commit/{}",
1147-
base_url,
1148-
db_identity.to_hex(),
1149-
prepare_id,
1150-
);
1151-
let mut req = client.post(&url);
1152-
if let Some(ref token) = auth_token {
1153-
req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}"));
1154-
}
1155-
match req.send().await {
1156-
Ok(resp) if resp.status().is_success() => {
1157-
log::info!("2PC commit: committed {prepare_id} on {db_identity}");
1158-
}
1159-
Ok(resp) => {
1160-
log::error!(
1161-
"2PC commit: failed for {prepare_id} on {db_identity}: status {}",
1162-
resp.status()
1163-
);
1164-
}
1165-
Err(e) => {
1166-
log::error!("2PC commit: transport error for {prepare_id} on {db_identity}: {e}");
1167-
}
1168-
}
1169-
}
1170-
}
1171-
}
1172-
1173-
/// Abort all prepared participants (called when coordinator's reducer fails).
1174-
pub fn abort_all_prepared(&mut self) -> impl Future<Output = ()> + use<> {
1175-
let participants = mem::take(&mut self.prepared_participants);
1176-
let client = self.replica_ctx.call_reducer_client.clone();
1177-
let router = self.replica_ctx.call_reducer_router.clone();
1178-
let auth_token = self.replica_ctx.call_reducer_auth_token.clone();
1179-
1180-
async move {
1181-
for (db_identity, prepare_id) in participants {
1182-
let base_url = match router.resolve_base_url(db_identity).await {
1183-
Ok(url) => url,
1184-
Err(e) => {
1185-
log::error!("2PC abort: failed to resolve base URL for {db_identity}: {e}");
1186-
continue;
1187-
}
1188-
};
1189-
let url = format!(
1190-
"{}/v1/database/{}/2pc/abort/{}",
1191-
base_url,
1192-
db_identity.to_hex(),
1193-
prepare_id,
1194-
);
1195-
let mut req = client.post(&url);
1196-
if let Some(ref token) = auth_token {
1197-
req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}"));
1198-
}
1199-
match req.send().await {
1200-
Ok(resp) if resp.status().is_success() => {
1201-
log::info!("2PC abort: aborted {prepare_id} on {db_identity}");
1202-
}
1203-
Ok(resp) => {
1204-
log::error!(
1205-
"2PC abort: failed for {prepare_id} on {db_identity}: status {}",
1206-
resp.status()
1207-
);
1208-
}
1209-
Err(e) => {
1210-
log::error!("2PC abort: transport error for {prepare_id} on {db_identity}: {e}");
1211-
}
1212-
}
1213-
}
1214-
}
1215-
}
12161128
}
12171129

12181130
/// Default timeout for HTTP requests performed by [`InstanceEnv::http_request`].

crates/core/src/host/module_host.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1928,7 +1928,10 @@ impl ModuleHost {
19281928
let participant_identity = match Identity::from_hex(&row.participant_identity_hex) {
19291929
Ok(id) => id,
19301930
Err(e) => {
1931-
log::error!("recover_2pc_coordinator: invalid participant identity hex {}: {e}", row.participant_identity_hex);
1931+
log::error!(
1932+
"recover_2pc_coordinator: invalid participant identity hex {}: {e}",
1933+
row.participant_identity_hex
1934+
);
19321935
continue;
19331936
}
19341937
};
@@ -1959,7 +1962,10 @@ impl ModuleHost {
19591962
}
19601963
}
19611964
Ok(resp) => {
1962-
log::warn!("recover_2pc_coordinator: commit for {prepare_id} returned {}", resp.status());
1965+
log::warn!(
1966+
"recover_2pc_coordinator: commit for {prepare_id} returned {}",
1967+
resp.status()
1968+
);
19631969
}
19641970
Err(e) => {
19651971
log::warn!("recover_2pc_coordinator: transport error for {prepare_id}: {e}");
@@ -1994,14 +2000,18 @@ impl ModuleHost {
19942000
let coordinator_identity = match Identity::from_hex(&row.coordinator_identity_hex) {
19952001
Ok(id) => id,
19962002
Err(e) => {
1997-
log::error!("recover_2pc_participant: invalid coordinator identity hex for {original_prepare_id}: {e}");
2003+
log::error!(
2004+
"recover_2pc_participant: invalid coordinator identity hex for {original_prepare_id}: {e}"
2005+
);
19982006
continue;
19992007
}
20002008
};
20012009
let caller_identity = match Identity::from_hex(&row.caller_identity_hex) {
20022010
Ok(id) => id,
20032011
Err(e) => {
2004-
log::error!("recover_2pc_participant: invalid caller identity hex for {original_prepare_id}: {e}");
2012+
log::error!(
2013+
"recover_2pc_participant: invalid caller identity hex for {original_prepare_id}: {e}"
2014+
);
20052015
continue;
20062016
}
20072017
};
@@ -2051,7 +2061,8 @@ impl ModuleHost {
20512061
auth_token.clone(),
20522062
coordinator_identity,
20532063
&original_prepare_id,
2054-
).await;
2064+
)
2065+
.await;
20552066
match decision {
20562067
Some(commit) => {
20572068
if commit {

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,10 @@ async fn send_ack_commit_to_coordinator(
432432
log::info!("2PC ack-commit: notified coordinator for {prepare_id}");
433433
}
434434
Ok(resp) => {
435-
log::warn!("2PC ack-commit: coordinator returned {} for {prepare_id}", resp.status());
435+
log::warn!(
436+
"2PC ack-commit: coordinator returned {} for {prepare_id}",
437+
resp.status()
438+
);
436439
}
437440
Err(e) => {
438441
log::warn!("2PC ack-commit: transport error for {prepare_id}: {e}");
@@ -709,8 +712,7 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
709712
let _ = prepared_tx.send((res, return_value));
710713

711714
// Step 4: wait for coordinator's decision (B never aborts on its own).
712-
let commit =
713-
Self::wait_for_2pc_decision(decision_rx, &prepare_id, coordinator_identity, &replica_ctx);
715+
let commit = Self::wait_for_2pc_decision(decision_rx, &prepare_id, coordinator_identity, &replica_ctx);
714716

715717
if commit {
716718
// Delete the marker in the same tx as the reducer changes (atomic commit).
@@ -1196,9 +1198,11 @@ impl InstanceCommon {
11961198
// B acknowledged COMMIT — remove coordinator log entry
11971199
// (best-effort; recovery will clean up on restart if missed).
11981200
if committed {
1199-
if let Err(e) = stdb.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |del_tx| {
1200-
Ok(del_tx.delete_st_2pc_coordinator_log(prepare_id)?)
1201-
}) {
1201+
if let Err(e) = stdb
1202+
.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |del_tx| {
1203+
Ok(del_tx.delete_st_2pc_coordinator_log(prepare_id)?)
1204+
})
1205+
{
12021206
log::warn!("delete_st_2pc_coordinator_log failed for {prepare_id}: {e}");
12031207
}
12041208
}

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@ use crate::{
2222
system_tables::{
2323
with_sys_table_buf, St2pcCoordinatorLogFields, St2pcCoordinatorLogRow, St2pcStateFields, St2pcStateRow,
2424
StClientFields, StClientRow, StColumnAccessorFields, StColumnAccessorRow, StColumnFields, StColumnRow,
25-
StConstraintFields, StConstraintRow, StEventTableRow,
26-
StFields as _, StIndexAccessorFields, StIndexAccessorRow, StIndexFields, StIndexRow, StRowLevelSecurityFields,
27-
StRowLevelSecurityRow, StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow,
28-
StTableAccessorFields, StTableAccessorRow, StTableFields, StTableRow, SystemTable,
29-
ST_2PC_COORDINATOR_LOG_ID, ST_2PC_STATE_ID, ST_CLIENT_ID, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID,
25+
StConstraintFields, StConstraintRow, StEventTableRow, StFields as _, StIndexAccessorFields, StIndexAccessorRow,
26+
StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, StScheduledFields, StScheduledRow,
27+
StSequenceFields, StSequenceRow, StTableAccessorFields, StTableAccessorRow, StTableFields, StTableRow,
28+
SystemTable, ST_2PC_COORDINATOR_LOG_ID, ST_2PC_STATE_ID, ST_CLIENT_ID, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID,
3029
ST_CONSTRAINT_ID, ST_EVENT_TABLE_ID, ST_INDEX_ACCESSOR_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID,
3130
ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ACCESSOR_ID, ST_TABLE_ID,
3231
},
@@ -2771,9 +2770,7 @@ impl MutTxId {
27712770
self.insert_via_serialize_bsatn(ST_2PC_COORDINATOR_LOG_ID, row)
27722771
.map(|_| ())
27732772
.inspect_err(|e| {
2774-
log::error!(
2775-
"insert_st_2pc_coordinator_log: failed for prepare_id ({participant_prepare_id}): {e}"
2776-
);
2773+
log::error!("insert_st_2pc_coordinator_log: failed for prepare_id ({participant_prepare_id}): {e}");
27772774
})
27782775
}
27792776

@@ -2785,9 +2782,7 @@ impl MutTxId {
27852782
St2pcCoordinatorLogFields::ParticipantPrepareId.col_id(),
27862783
&AlgebraicValue::String(participant_prepare_id.into()),
27872784
) {
2788-
log::error!(
2789-
"delete_st_2pc_coordinator_log: no row for prepare_id ({participant_prepare_id}): {e}"
2790-
);
2785+
log::error!("delete_st_2pc_coordinator_log: no row for prepare_id ({participant_prepare_id}): {e}");
27912786
}
27922787
Ok(())
27932788
}

crates/smoketests/tests/smoketests/cross_db_2pc_recovery.rs

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,7 @@ fn setup_two_banks(test: &mut Smoketest, pid: u32, suffix: &str) -> (String, Str
166166
fn test_2pc_committed_data_survives_restart() {
167167
require_local_server!();
168168
let pid = std::process::id();
169-
let mut test = Smoketest::builder()
170-
.module_code(MODULE_CODE)
171-
.autopublish(false)
172-
.build();
169+
let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build();
173170

174171
let (db_a_identity, db_b_identity) = setup_two_banks(&mut test, pid, "dur");
175172

@@ -178,8 +175,16 @@ fn test_2pc_committed_data_survives_restart() {
178175
.expect("transfer_funds failed");
179176

180177
// Verify pre-restart state.
181-
assert_eq!(alice_balance(&test, &db_a_identity), 150, "A should have 150 before restart");
182-
assert_eq!(alice_balance(&test, &db_b_identity), 50, "B should have 50 before restart");
178+
assert_eq!(
179+
alice_balance(&test, &db_a_identity),
180+
150,
181+
"A should have 150 before restart"
182+
);
183+
assert_eq!(
184+
alice_balance(&test, &db_b_identity),
185+
50,
186+
"B should have 50 before restart"
187+
);
183188

184189
// Restart the server — exercises recovery path even though there's nothing to recover.
185190
test.restart_server();
@@ -207,18 +212,23 @@ fn test_2pc_committed_data_survives_restart() {
207212
fn test_2pc_aborted_state_survives_restart() {
208213
require_local_server!();
209214
let pid = std::process::id();
210-
let mut test = Smoketest::builder()
211-
.module_code(MODULE_CODE)
212-
.autopublish(false)
213-
.build();
215+
let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build();
214216

215217
let (db_a_identity, db_b_identity) = setup_two_banks(&mut test, pid, "abort-dur");
216218

217219
// Try to transfer 200 — B only has 100, so the remote debit panics → abort.
218220
let _ = test.call("transfer_funds", &[&db_b_identity, "alice", "alice", "200"]);
219221

220-
assert_eq!(alice_balance(&test, &db_a_identity), 100, "A should still be 100 after abort");
221-
assert_eq!(alice_balance(&test, &db_b_identity), 100, "B should still be 100 after abort");
222+
assert_eq!(
223+
alice_balance(&test, &db_a_identity),
224+
100,
225+
"A should still be 100 after abort"
226+
);
227+
assert_eq!(
228+
alice_balance(&test, &db_b_identity),
229+
100,
230+
"B should still be 100 after abort"
231+
);
222232

223233
test.restart_server();
224234

@@ -243,10 +253,7 @@ fn test_2pc_aborted_state_survives_restart() {
243253
#[test]
244254
fn test_2pc_status_endpoint_unknown_returns_abort() {
245255
let pid = std::process::id();
246-
let mut test = Smoketest::builder()
247-
.module_code(MODULE_CODE)
248-
.autopublish(false)
249-
.build();
256+
let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build();
250257

251258
let (db_a_identity, _db_b_identity) = setup_two_banks(&mut test, pid, "status");
252259

@@ -284,10 +291,7 @@ fn test_2pc_status_endpoint_unknown_returns_abort() {
284291
fn test_2pc_atomicity_under_crash() {
285292
require_local_server!();
286293
let pid = std::process::id();
287-
let mut test = Smoketest::builder()
288-
.module_code(MODULE_CODE)
289-
.autopublish(false)
290-
.build();
294+
let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build();
291295

292296
let (db_a_identity, db_b_identity) = setup_two_banks(&mut test, pid, "crash");
293297

@@ -340,10 +344,7 @@ fn test_2pc_atomicity_under_crash() {
340344
fn test_2pc_coordinator_recovery() {
341345
require_local_server!();
342346
let pid = std::process::id();
343-
let mut test = Smoketest::builder()
344-
.module_code(MODULE_CODE)
345-
.autopublish(false)
346-
.build();
347+
let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build();
347348

348349
let (db_a_identity, db_b_identity) = setup_two_banks(&mut test, pid, "coord-rec");
349350

@@ -378,10 +379,7 @@ fn test_2pc_coordinator_recovery() {
378379
let bal_a = alice_balance(&test, &db_a_identity);
379380
let bal_b = alice_balance(&test, &db_b_identity);
380381

381-
assert_eq!(
382-
bal_a, 150,
383-
"A should have committed (alice_a=150) before crash"
384-
);
382+
assert_eq!(bal_a, 150, "A should have committed (alice_a=150) before crash");
385383
assert_eq!(
386384
bal_b, 50,
387385
"B should have committed via coordinator recovery (alice_b=50), got {bal_b}"
@@ -407,10 +405,7 @@ fn test_2pc_coordinator_recovery() {
407405
fn test_2pc_participant_recovery_polls_and_aborts() {
408406
require_local_server!();
409407
let pid = std::process::id();
410-
let mut test = Smoketest::builder()
411-
.module_code(MODULE_CODE)
412-
.autopublish(false)
413-
.build();
408+
let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build();
414409

415410
let (db_a_identity, db_b_identity) = setup_two_banks(&mut test, pid, "part-rec");
416411

0 commit comments

Comments
 (0)