Skip to content

Commit 405a7a9

Browse files
committed
update smoketests
1 parent 95d112a commit 405a7a9

5 files changed

Lines changed: 286 additions & 298 deletions

File tree

crates/client-api/src/routes/database.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,18 +256,27 @@ pub async fn prepare<S: ControlStateDelegate + NodeDelegate>(
256256
reducer,
257257
}): Path<CallParams>,
258258
TypedHeader(content_type): TypedHeader<headers::ContentType>,
259+
headers: axum::http::HeaderMap,
259260
body: Bytes,
260261
) -> axum::response::Result<impl IntoResponse> {
261262
let args = parse_call_args(content_type, body)?;
262263
let caller_identity = auth.claims.identity;
263264

265+
// The coordinator sends its actual database identity in `X-Coordinator-Identity`.
266+
// Without this, `anon_auth_middleware` gives the HTTP caller an ephemeral random
267+
// identity, which gets stored in `st_2pc_state` and breaks recovery polling.
268+
let coordinator_identity = headers
269+
.get("X-Coordinator-Identity")
270+
.and_then(|v| v.to_str().ok())
271+
.and_then(|s| spacetimedb_lib::Identity::from_hex(s).ok());
272+
264273
let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?;
265274

266275
// 2PC prepare is a server-to-server call; no client lifecycle management needed.
267276
// call_identity_connected/disconnected submit jobs to the module's executor, which
268277
// will be blocked holding the 2PC write lock after prepare_reducer returns — deadlock.
269278
let result = module
270-
.prepare_reducer(caller_identity, None, &reducer, args)
279+
.prepare_reducer(caller_identity, None, &reducer, args, coordinator_identity)
271280
.await;
272281

273282
match result {

crates/core/src/host/instance_env.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1038,7 +1038,7 @@ impl InstanceEnv {
10381038
.bytes()
10391039
.await
10401040
.map_err(|e| NodesError::HttpError(e.to_string()))?;
1041-
Ok((status, body))
1041+
Ok::<_, NodesError>((status, body))
10421042
}
10431043
.await;
10441044

@@ -1093,6 +1093,7 @@ impl InstanceEnv {
10931093
let mut req = client
10941094
.post(&url)
10951095
.header(http::header::CONTENT_TYPE, "application/octet-stream")
1096+
.header("X-Coordinator-Identity", caller_identity.to_hex().to_string())
10961097
.body(args);
10971098
if let Some(token) = auth_token {
10981099
req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}"));

crates/core/src/host/module_host.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1757,9 +1757,25 @@ impl ModuleHost {
17571757
caller_connection_id: Option<ConnectionId>,
17581758
reducer_name: &str,
17591759
args: FunctionArgs,
1760+
// The actual coordinator database identity (from `X-Coordinator-Identity` header).
1761+
// When `Some`, used for `prepare_id` namespacing and stored in `st_2pc_state` for
1762+
// recovery. Falls back to `caller_identity` when `None` (e.g., internal calls).
1763+
coordinator_identity_override: Option<Identity>,
17601764
) -> Result<(String, ReducerCallResult, Option<Bytes>), ReducerCallError> {
17611765
use std::sync::atomic::{AtomicU64, Ordering};
1762-
static PREPARE_COUNTER: AtomicU64 = AtomicU64::new(1);
1766+
use std::sync::OnceLock;
1767+
// Counter seeded from current time on first use so that restarts begin from a
1768+
// different value than any existing st_2pc_state entries (which hold IDs from
1769+
// previous sessions starting at much smaller counter values).
1770+
static PREPARE_COUNTER: AtomicU64 = AtomicU64::new(0);
1771+
static PREPARE_COUNTER_INIT: OnceLock<()> = OnceLock::new();
1772+
PREPARE_COUNTER_INIT.get_or_init(|| {
1773+
let seed = std::time::SystemTime::now()
1774+
.duration_since(std::time::UNIX_EPOCH)
1775+
.unwrap_or_default()
1776+
.as_micros() as u64;
1777+
PREPARE_COUNTER.store(seed, Ordering::Relaxed);
1778+
});
17631779

17641780
let (reducer_id, reducer_def) = self
17651781
.info
@@ -1788,9 +1804,13 @@ impl ModuleHost {
17881804
args,
17891805
};
17901806

1807+
// Resolve the effective coordinator identity before generating the prepare_id so
1808+
// the prefix is namespaced correctly even when called from the HTTP prepare handler.
1809+
let coordinator_identity = coordinator_identity_override.unwrap_or(caller_identity);
1810+
17911811
// Include the coordinator identity so prepare_ids from different coordinators
17921812
// cannot collide on the participant's st_2pc_state table.
1793-
let coordinator_hex = caller_identity.to_hex();
1813+
let coordinator_hex = coordinator_identity.to_hex();
17941814
let prepare_id = format!(
17951815
"prepare-{}-{}",
17961816
&coordinator_hex.to_string()[..16],
@@ -1815,7 +1835,6 @@ impl ModuleHost {
18151835
let this = self.clone();
18161836
let reducer_name_owned = reducer_def.name.clone();
18171837
let prepare_id_clone = prepare_id.clone();
1818-
let coordinator_identity = caller_identity;
18191838
tokio::spawn(async move {
18201839
let _ = this
18211840
.call(
@@ -1826,7 +1845,7 @@ impl ModuleHost {
18261845
Ok::<(), ReducerCallError>(())
18271846
},
18281847
// JS modules: no 2PC support yet.
1829-
async |(p, _pid, _cid, ptx, _drx), inst| Err(ReducerCallError::NoSuchReducer),
1848+
async |(_p, _pid, _cid, _ptx, _drx), _inst| Err(ReducerCallError::NoSuchReducer),
18301849
)
18311850
.await;
18321851
});
@@ -2008,7 +2027,7 @@ impl ModuleHost {
20082027

20092028
// Step 1: Re-run the reducer to reacquire the write lock.
20102029
let new_prepare_id = match this
2011-
.prepare_reducer(caller_identity, Some(caller_connection_id), &row.reducer_name, args)
2030+
.prepare_reducer(caller_identity, Some(caller_connection_id), &row.reducer_name, args, Some(coordinator_identity))
20122031
.await
20132032
{
20142033
Ok((pid, result, _rv)) if !pid.is_empty() => {

crates/smoketests/tests/smoketests/cross_db_2pc.rs

Lines changed: 115 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,21 @@ use spacetimedb_smoketests::Smoketest;
22

33
/// Module code for the 2PC test.
44
///
5-
/// Both the "bank A" and "bank B" databases use the same module.
5+
/// All three databases (A = coordinator, B and C = participants) use the same module.
66
///
77
/// Tables:
88
/// - `Ledger(account: String PK, balance: i64)` -- stores account balances.
99
///
1010
/// Reducers:
1111
/// - `init`: seeds "alice" with balance 100.
12+
/// - `balance(account) -> i64`: returns the current balance for an account.
1213
/// - `debit(account, amount)`: decrements balance, panics if insufficient funds.
1314
/// - `credit(account, amount)`: increments balance (or inserts if absent).
14-
/// - `transfer_funds(target_hex, from_account, to_account, amount)`:
15-
/// Credits `to_account` locally, then calls `debit` on the remote database
16-
/// using `call_reducer_on_db_2pc`. If the remote debit fails (panic/insufficient funds),
17-
/// the local credit is also rolled back by the 2PC protocol.
15+
/// - `transfer_funds(b_hex, c_hex, from_account, to_account, amount) -> TransferResult`:
16+
/// Credits `amount * 2` to `to_account` locally (collecting `amount` from each of B and C),
17+
/// then calls `debit(from_account, amount)` on both B and C via `call_reducer_on_db_2pc`.
18+
/// If either remote debit fails, all three databases are rolled back atomically.
19+
/// On success, returns the new local balance so the caller can verify without a second query.
1820
const MODULE_CODE: &str = r#"
1921
use spacetimedb::{log, ReducerContext, Table, Identity};
2022
@@ -30,6 +32,14 @@ pub fn init(ctx: &ReducerContext) {
3032
ctx.db.ledger().insert(Ledger { account: "alice".to_string(), balance: 100 });
3133
}
3234
35+
/// Returns the current balance for `account`.
36+
#[spacetimedb::reducer]
37+
pub fn balance(ctx: &ReducerContext, account: String) -> Result<i64, String> {
38+
ctx.db.ledger().account().find(&account)
39+
.map(|r| r.balance)
40+
.ok_or_else(|| format!("account '{}' not found", account))
41+
}
42+
3343
#[spacetimedb::reducer]
3444
pub fn debit(ctx: &ReducerContext, account: String, amount: i64) {
3545
let row = ctx.db.ledger().account().find(&account)
@@ -53,143 +63,131 @@ pub fn credit(ctx: &ReducerContext, account: String, amount: i64) {
5363
}
5464
}
5565
56-
/// Transfer `amount` from `from_account` on the remote database to `to_account` locally.
66+
/// Transfer `amount` from `from_account` on both B and C to `to_account` on A (locally).
67+
///
68+
/// Returns the new local balance of `to_account` so the caller can verify correctness
69+
/// without issuing a separate query.
5770
///
58-
/// Uses 2PC: credits locally first, then calls debit on the remote database via
59-
/// `call_reducer_on_db_2pc`. If the remote debit fails, the coordinator's reducer also
60-
/// fails, triggering abort of all participants.
71+
/// If either remote debit fails (insufficient funds), returns Err and the 2PC protocol
72+
/// rolls back all three databases atomically.
6173
#[spacetimedb::reducer]
62-
pub fn transfer_funds(ctx: &ReducerContext, target_hex: String, from_account: String, to_account: String, amount: i64) {
63-
// Credit locally first.
64-
credit(ctx, to_account.clone(), amount);
65-
66-
// Now call debit on the remote database using 2PC.
67-
let target = Identity::from_hex(&target_hex).expect("invalid target identity hex");
68-
let args = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(from_account, amount)).expect("failed to encode args");
69-
match spacetimedb::remote_reducer::call_reducer_on_db_2pc(target, "debit", &args) {
70-
Ok(()) => {
71-
log::info!("transfer_funds: remote debit succeeded");
72-
}
73-
Err(e) => {
74-
log::error!("transfer_funds: remote debit failed: {}", e);
75-
panic!("remote debit failed: {e}");
76-
}
77-
}
74+
pub fn transfer_funds(ctx: &ReducerContext, b_hex: String, c_hex: String, from_account: String, to_account: String, amount: i64) -> Result<i64, String> {
75+
credit(ctx, to_account.clone(), amount * 2);
76+
77+
let b = Identity::from_hex(&b_hex).map_err(|e| format!("invalid B identity: {e}"))?;
78+
let args_b = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(from_account.clone(), amount)).map_err(|e| format!("failed to encode args: {e}"))?;
79+
spacetimedb::remote_reducer::call_reducer_on_db_2pc(b, "debit", &args_b)
80+
.map_err(|e| format!("debit on B failed: {e}"))?;
81+
log::info!("transfer_funds: debit on B succeeded");
82+
83+
let c = Identity::from_hex(&c_hex).map_err(|e| format!("invalid C identity: {e}"))?;
84+
let args_c = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(from_account, amount)).map_err(|e| format!("failed to encode args: {e}"))?;
85+
spacetimedb::remote_reducer::call_reducer_on_db_2pc(c, "debit", &args_c)
86+
.map_err(|e| format!("debit on C failed: {e}"))?;
87+
log::info!("transfer_funds: debit on C succeeded");
88+
89+
// Return new local balance so the caller can assert correctness immediately.
90+
ctx.db.ledger().account().find(&to_account)
91+
.map(|r| r.balance)
92+
.ok_or_else(|| format!("account '{}' not found after credit", to_account))
7893
}
7994
"#;
8095

81-
/// Happy path: transfer 50 from B's alice to A's alice.
82-
/// After: A alice = 150, B alice = 50.
96+
/// Call `balance(account)` on `db_identity` via the HTTP API and return the i64 result.
97+
fn call_balance(test: &Smoketest, db_identity: &str, account: &str) -> i64 {
98+
let resp = test
99+
.api_call_json(
100+
"POST",
101+
&format!("/v1/database/{db_identity}/call/balance"),
102+
&format!("[\"{account}\"]"),
103+
)
104+
.unwrap_or_else(|e| panic!("balance call failed for {db_identity}: {e}"));
105+
assert!(resp.is_success(), "balance reducer returned {}", resp.status_code);
106+
resp.json()
107+
.unwrap_or_else(|e| panic!("failed to parse balance JSON: {e}"))
108+
.as_i64()
109+
.unwrap_or_else(|| panic!("balance JSON was not an integer"))
110+
}
111+
112+
/// Happy path: transfer 30 from both B's alice and C's alice to A's alice.
113+
///
114+
/// The coordinator reducer returns the new local balance (160), which is used directly
115+
/// to assert A's result. B and C balances are verified via `balance` reducer calls.
116+
///
117+
/// Expected: A=160, B=70, C=70.
83118
#[test]
84119
fn test_cross_db_2pc_happy_path() {
85120
let pid = std::process::id();
86121
let db_a_name = format!("2pc-bank-a-{pid}");
87122
let db_b_name = format!("2pc-bank-b-{pid}");
123+
let db_c_name = format!("2pc-bank-c-{pid}");
88124

89125
let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build();
90126

91-
// Publish bank B (the participant that will be debited).
92-
test.publish_module_named(&db_b_name, false)
93-
.expect("failed to publish bank B");
127+
// Publish participants first, then coordinator.
128+
test.publish_module_named(&db_b_name, false).expect("failed to publish bank B");
94129
let db_b_identity = test.database_identity.clone().expect("bank B identity not set");
95130

96-
// Publish bank A (the coordinator that will be credited).
97-
test.publish_module_named(&db_a_name, false)
98-
.expect("failed to publish bank A");
99-
let _db_a_identity = test.database_identity.clone().expect("bank A identity not set");
100-
101-
// Transfer 50 from B's alice to A's alice.
102-
// The coordinator is bank A. It credits locally, then calls debit on B via 2PC.
103-
test.call("transfer_funds", &[&db_b_identity, "alice", "alice", "50"])
104-
.expect("transfer_funds failed");
105-
106-
// Verify bank A: alice should have 150.
107-
let result_a = test
108-
.spacetime(&[
109-
"sql",
110-
"--server",
111-
&test.server_url,
112-
test.database_identity.as_ref().unwrap(),
113-
"SELECT balance FROM ledger WHERE account = 'alice'",
114-
])
115-
.expect("sql query on bank A failed");
116-
assert!(
117-
result_a.contains("150"),
118-
"Expected bank A alice balance = 150, got:\n{result_a}"
119-
);
120-
121-
// Verify bank B: alice should have 50.
122-
let result_b = test
123-
.spacetime(&[
124-
"sql",
125-
"--server",
126-
&test.server_url,
127-
&db_b_identity,
128-
"SELECT balance FROM ledger WHERE account = 'alice'",
129-
])
130-
.expect("sql query on bank B failed");
131-
assert!(
132-
result_b.contains("50"),
133-
"Expected bank B alice balance = 50, got:\n{result_b}"
134-
);
131+
test.publish_module_named(&db_c_name, false).expect("failed to publish bank C");
132+
let db_c_identity = test.database_identity.clone().expect("bank C identity not set");
133+
134+
test.publish_module_named(&db_a_name, false).expect("failed to publish bank A");
135+
let db_a_identity = test.database_identity.clone().expect("bank A identity not set");
136+
137+
// Call transfer_funds; the return value is A's new alice balance.
138+
let resp = test
139+
.api_call_json(
140+
"POST",
141+
&format!("/v1/database/{db_a_identity}/call/transfer_funds"),
142+
&format!("[\"{db_b_identity}\", \"{db_c_identity}\", \"alice\", \"alice\", 30]"),
143+
)
144+
.expect("transfer_funds call failed");
145+
assert!(resp.is_success(), "transfer_funds failed: {}", resp.status_code);
146+
let new_a_balance = resp.json().expect("invalid JSON").as_i64().expect("not i64");
147+
assert_eq!(new_a_balance, 160, "transfer_funds return value: expected A alice=160");
148+
149+
// Verify B and C via balance reducer.
150+
assert_eq!(call_balance(&test, &db_b_identity, "alice"), 70, "B alice should be 70");
151+
assert_eq!(call_balance(&test, &db_c_identity, "alice"), 70, "C alice should be 70");
135152
}
136153

137-
/// Abort path: try to transfer 200, but B only has 100.
138-
/// The remote debit should fail, causing the coordinator reducer to panic,
139-
/// which should roll back the local credit.
140-
/// After: both A and B should still have alice = 100.
154+
/// Abort path: try to transfer 110 from B and C, but both only have 100.
155+
///
156+
/// B's debit fails (insufficient funds), so the coordinator reducer panics and the
157+
/// 2PC protocol rolls back all three databases. We verify via `balance` reducer calls
158+
/// that every account is still at 100.
159+
///
160+
/// Expected: A=100, B=100, C=100.
141161
#[test]
142162
fn test_cross_db_2pc_abort_insufficient_funds() {
143163
let pid = std::process::id();
144164
let db_a_name = format!("2pc-abort-a-{pid}");
145165
let db_b_name = format!("2pc-abort-b-{pid}");
166+
let db_c_name = format!("2pc-abort-c-{pid}");
146167

147168
let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build();
148169

149-
// Publish bank B.
150-
test.publish_module_named(&db_b_name, false)
151-
.expect("failed to publish bank B");
170+
test.publish_module_named(&db_b_name, false).expect("failed to publish bank B");
152171
let db_b_identity = test.database_identity.clone().expect("bank B identity not set");
153172

154-
// Publish bank A.
155-
test.publish_module_named(&db_a_name, false)
156-
.expect("failed to publish bank A");
157-
158-
// Try to transfer 200 -- B only has 100, so the remote debit will fail.
159-
let result = test.call("transfer_funds", &[&db_b_identity, "alice", "alice", "200"]);
160-
// The call should fail because the remote debit panicked.
161-
assert!(
162-
result.is_err(),
163-
"Expected transfer_funds to fail due to insufficient funds"
164-
);
165-
166-
// Verify bank A: alice should still have 100 (the local credit was rolled back).
167-
let result_a = test
168-
.spacetime(&[
169-
"sql",
170-
"--server",
171-
&test.server_url,
172-
test.database_identity.as_ref().unwrap(),
173-
"SELECT balance FROM ledger WHERE account = 'alice'",
174-
])
175-
.expect("sql query on bank A failed");
176-
assert!(
177-
result_a.contains("100"),
178-
"Expected bank A alice balance = 100 after failed transfer, got:\n{result_a}"
179-
);
180-
181-
// Verify bank B: alice should still have 100.
182-
let result_b = test
183-
.spacetime(&[
184-
"sql",
185-
"--server",
186-
&test.server_url,
187-
&db_b_identity,
188-
"SELECT balance FROM ledger WHERE account = 'alice'",
189-
])
190-
.expect("sql query on bank B failed");
191-
assert!(
192-
result_b.contains("100"),
193-
"Expected bank B alice balance = 100 after failed transfer, got:\n{result_b}"
194-
);
173+
test.publish_module_named(&db_c_name, false).expect("failed to publish bank C");
174+
let db_c_identity = test.database_identity.clone().expect("bank C identity not set");
175+
176+
test.publish_module_named(&db_a_name, false).expect("failed to publish bank A");
177+
let db_a_identity = test.database_identity.clone().expect("bank A identity not set");
178+
179+
// Transfer 110 from each — both only have 100, so B's debit panics → 2PC aborts all.
180+
let resp = test
181+
.api_call_json(
182+
"POST",
183+
&format!("/v1/database/{db_a_identity}/call/transfer_funds"),
184+
&format!("[\"{db_b_identity}\", \"{db_c_identity}\", \"alice\", \"alice\", 110]"),
185+
)
186+
.expect("api_call failed");
187+
assert!(!resp.is_success(), "Expected transfer_funds to fail due to insufficient funds");
188+
189+
// All three accounts must still be at 100.
190+
assert_eq!(call_balance(&test, &db_a_identity, "alice"), 100, "A alice should still be 100");
191+
assert_eq!(call_balance(&test, &db_b_identity, "alice"), 100, "B alice should still be 100");
192+
assert_eq!(call_balance(&test, &db_c_identity, "alice"), 100, "C alice should still be 100");
195193
}

0 commit comments

Comments
 (0)