Skip to content

Commit 9968095

Browse files
committed
Merge branch 'jdetter/tpcc' into jsdt/2pc-wound-wait
2 parents 33ee988 + 3fec4df commit 9968095

157 files changed

Lines changed: 13140 additions & 3417 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ members = [
5252
"modules/sdk-test-view-pk",
5353
"modules/sdk-test-event-table",
5454
"modules/tpcc",
55+
"modules/tpcc-metrics",
5556
"sdks/rust/tests/test-client",
5657
"sdks/rust/tests/test-counter",
5758
"sdks/rust/tests/connect_disconnect_client",
@@ -158,7 +159,7 @@ anymap = "0.12"
158159
arrayvec = "0.7.2"
159160
async-stream = "0.3.6"
160161
async-trait = "0.1.68"
161-
axum = { version = "0.7", features = ["tracing"] }
162+
axum = { version = "0.7", features = ["tracing", "http2"] }
162163
axum-extra = { version = "0.9", features = ["typed-header"] }
163164
backtrace = "0.3.66"
164165
base64 = "0.21.2"

crates/bindings/src/remote_reducer.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,22 @@ impl core::fmt::Display for RemoteCallError {
5252
/// - `reducer_name`: the name of the reducer to invoke (must be valid UTF-8).
5353
/// - `args`: BSATN-encoded reducer arguments.
5454
///
55-
/// Returns `Ok(())` when the remote reducer ran and succeeded.
55+
/// Returns `Ok(bytes)` when the remote reducer ran and succeeded, with `bytes` being the reducer's output.
5656
/// Returns `Err(RemoteCallError::Failed(msg))` when the reducer ran but returned an error.
5757
/// Returns `Err(RemoteCallError::NotFound(msg))` when the database or reducer does not exist.
5858
/// Returns `Err(RemoteCallError::Unreachable(msg))` on transport failure (connection refused, timeout, …).
59-
pub fn call_reducer_on_db(database_identity: Identity, reducer_name: &str, args: &[u8]) -> Result<(), RemoteCallError> {
59+
pub fn call_reducer_on_db(
60+
database_identity: Identity,
61+
reducer_name: &str,
62+
args: &[u8],
63+
) -> Result<Vec<u8>, RemoteCallError> {
6064
let identity_bytes = database_identity.to_byte_array();
6165
match spacetimedb_bindings_sys::call_reducer_on_db(identity_bytes, reducer_name, args) {
6266
Ok((status, body_source)) => {
6367
if status < 300 {
64-
return Ok(());
68+
let mut out = Vec::new();
69+
read_bytes_source_into(body_source, &mut out);
70+
return Ok(out);
6571
}
6672
// Decode the response body as the error message.
6773
let msg = if body_source == spacetimedb_bindings_sys::raw::BytesSource::INVALID {

crates/client-api/src/routes/database.rs

Lines changed: 63 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
148148
.and_then(|v| v.to_str().ok())
149149
.and_then(|s| s.parse::<GlobalTxId>().ok());
150150

151-
let args = parse_call_args(content_type, body)?;
151+
let (args, want_bsatn) = parse_call_args(content_type, body)?;
152152

153153
// HTTP callers always need a connection ID to provide to connect/disconnect,
154154
// so generate one.
@@ -202,8 +202,14 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
202202

203203
match result {
204204
Ok(CallResult::Reducer(result)) => {
205-
let (status, body) =
206-
reducer_outcome_response(&module, &owner_identity, &reducer, result.outcome, reducer_return_value)?;
205+
let (status, body) = reducer_outcome_response(
206+
&module,
207+
&owner_identity,
208+
&reducer,
209+
result.outcome,
210+
reducer_return_value,
211+
want_bsatn,
212+
)?;
207213
Ok((
208214
status,
209215
TypedHeader(SpacetimeEnergyUsed(result.energy_used)),
@@ -216,7 +222,7 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
216222
// Procedures don't assign a special meaning to error returns, unlike reducers,
217223
// as there's no transaction for them to automatically abort.
218224
// Instead, we just pass on their return value with the OK status so long as we successfully invoked the procedure.
219-
let (status, body) = procedure_outcome_response(result.return_val);
225+
let (status, body) = procedure_outcome_response(result.return_val, want_bsatn);
220226
Ok((
221227
status,
222228
TypedHeader(SpacetimeExecutionDurationMicros(result.execution_duration)),
@@ -232,13 +238,15 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
232238
///
233239
/// - `application/json` → [`FunctionArgs::Json`] (UTF-8 required).
234240
/// - `application/octet-stream` → [`FunctionArgs::Bsatn`] (raw BSATN bytes).
235-
fn parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::response::Result<FunctionArgs> {
241+
///
242+
/// Also returns `want_bsatn`, a bool, which is true if and only if the response should be BSATN-encoded.
243+
fn parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::response::Result<(FunctionArgs, bool)> {
236244
if content_type == headers::ContentType::json() {
237245
let s = bytestring::ByteString::try_from(body)
238246
.map_err(|_| (StatusCode::BAD_REQUEST, "request body is not valid UTF-8").into_response())?;
239-
Ok(FunctionArgs::Json(s))
247+
Ok((FunctionArgs::Json(s), false))
240248
} else if content_type == headers::ContentType::octet_stream() {
241-
Ok(FunctionArgs::Bsatn(body))
249+
Ok((FunctionArgs::Bsatn(body), true))
242250
} else {
243251
Err((
244252
StatusCode::BAD_REQUEST,
@@ -258,12 +266,12 @@ fn parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::res
258266
pub async fn prepare<S: ControlStateDelegate + NodeDelegate>(
259267
State(worker_ctx): State<S>,
260268
Extension(auth): Extension<SpacetimeAuth>,
261-
headers: HeaderMap,
262269
Path(CallParams {
263270
name_or_identity,
264271
reducer,
265272
}): Path<CallParams>,
266273
TypedHeader(content_type): TypedHeader<headers::ContentType>,
274+
headers: axum::http::HeaderMap,
267275
body: Bytes,
268276
) -> axum::response::Result<impl IntoResponse> {
269277
let args = parse_call_args(content_type, body)?;
@@ -273,19 +281,27 @@ pub async fn prepare<S: ControlStateDelegate + NodeDelegate>(
273281
.and_then(|v| v.to_str().ok())
274282
.and_then(|s| s.parse::<GlobalTxId>().ok());
275283

284+
// The coordinator sends its actual database identity in `X-Coordinator-Identity`.
285+
// Without this, `anon_auth_middleware` gives the HTTP caller an ephemeral random
286+
// identity, which gets stored in `st_2pc_state` and breaks recovery polling.
287+
let coordinator_identity = headers
288+
.get("X-Coordinator-Identity")
289+
.and_then(|v| v.to_str().ok())
290+
.and_then(|s| spacetimedb_lib::Identity::from_hex(s).ok());
291+
276292
let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?;
277293

278294
// 2PC prepare is a server-to-server call; no client lifecycle management needed.
279295
// call_identity_connected/disconnected submit jobs to the module's executor, which
280296
// will be blocked holding the 2PC write lock after prepare_reducer returns — deadlock.
281297
let result = module
282-
.prepare_reducer(caller_identity, None, tx_id, &reducer, args)
298+
.prepare_reducer(caller_identity, None, tx_id, &reducer, args.0, coordinator_identity)
283299
.await;
284300

285301
match result {
286302
Ok((prepare_id, rcr, return_value)) => {
287303
let (status, body) =
288-
reducer_outcome_response(&module, &owner_identity, &reducer, rcr.outcome, return_value)?;
304+
reducer_outcome_response(&module, &owner_identity, &reducer, rcr.outcome, return_value, args.1)?;
289305
let mut response = (
290306
status,
291307
TypedHeader(SpacetimeEnergyUsed(rcr.energy_used)),
@@ -436,12 +452,18 @@ pub async fn wound_2pc<S: ControlStateDelegate + NodeDelegate>(
436452
Ok(StatusCode::OK)
437453
}
438454

455+
/// Encode a reducer return value as an HTTP response.
456+
///
457+
/// If the outcome is an error, return a raw string with `application/text`. Ignore `want_bsatn` in this case.
458+
/// If the outcome is successful, and `want_bsatn`, send BSATN with `application/octet-stream`.
459+
/// If the outcome is successful, and not `want_bsatn`, send JSON with `application/json`.
439460
fn reducer_outcome_response(
440461
module: &ModuleHost,
441462
owner_identity: &Identity,
442463
reducer: &str,
443464
outcome: ReducerOutcome,
444465
reducer_return_value: Option<Bytes>,
466+
want_bsatn: bool,
445467
) -> axum::response::Result<(StatusCode, axum::response::Response)> {
446468
match outcome {
447469
ReducerOutcome::Committed => {
@@ -453,18 +475,25 @@ fn reducer_outcome_response(
453475
};
454476

455477
if let Some(bytes) = reducer_return_value.filter(|value| !value.is_empty()) {
456-
let seed = sats::WithTypespace::new(module.info.module_def.typespace(), &return_value);
457-
let mut reader = &bytes[..];
458-
let value: AlgebraicValue = seed.deserialize(bsatn::Deserializer::new(&mut reader)).map_err(|err| {
459-
(
460-
StatusCode::INTERNAL_SERVER_ERROR,
461-
format!("Failed to decode reducer return value: {err}"),
462-
)
463-
})?;
464-
Ok((
465-
StatusCode::OK,
466-
axum::Json(sats::serde::SerdeWrapper(value)).into_response(),
467-
))
478+
if want_bsatn {
479+
Ok((StatusCode::OK, bytes.into_response()))
480+
} else {
481+
let seed = sats::WithTypespace::new(module.info.module_def.typespace(), &return_value);
482+
let mut reader = &bytes[..];
483+
let value: AlgebraicValue =
484+
seed.deserialize(bsatn::Deserializer::new(&mut reader)).map_err(|err| {
485+
(
486+
StatusCode::INTERNAL_SERVER_ERROR,
487+
format!("Failed to decode reducer return value: {err}"),
488+
)
489+
})?;
490+
Ok((
491+
StatusCode::OK,
492+
axum::Json(sats::serde::SerdeWrapper(value)).into_response(),
493+
))
494+
}
495+
} else if want_bsatn {
496+
Ok((StatusCode::OK, Vec::<u8>::new().into_response()))
468497
} else {
469498
Ok((StatusCode::OK, "".into_response()))
470499
}
@@ -550,10 +579,20 @@ pub enum DBCallErr {
550579
InstanceNotScheduled,
551580
}
552581

553-
fn procedure_outcome_response(return_val: AlgebraicValue) -> (StatusCode, axum::response::Response) {
582+
/// Encode a procedure's result as an HTTP response.
583+
///
584+
/// If `want_bsatn`, send BSATN bytes as `application/octet-stream`.
585+
/// If not `want_bsatn`, send JSON as `application/json`.
586+
fn procedure_outcome_response(return_val: AlgebraicValue, want_bsatn: bool) -> (StatusCode, axum::response::Response) {
554587
(
555588
StatusCode::OK,
556-
axum::Json(sats::serde::SerdeWrapper(return_val)).into_response(),
589+
if want_bsatn {
590+
bsatn::to_vec(&return_val)
591+
.expect("BSATN serializing an AlgebraicValue should be infallible")
592+
.into_response()
593+
} else {
594+
axum::Json(sats::serde::SerdeWrapper(return_val)).into_response()
595+
},
557596
)
558597
}
559598

crates/core/src/config.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,12 @@ impl MetadataFile {
5656
path.write(self.to_string())
5757
}
5858

59-
fn check_compatibility(previous: &Self, current: &Self) -> anyhow::Result<()> {
59+
fn check_compatibility(previous: &Self, current: &Self, metafile: &Path) -> anyhow::Result<()> {
6060
anyhow::ensure!(
6161
previous.edition == current.edition,
62-
"metadata.toml indicates that this database is from a different \
62+
"metadata.toml at {} indicates that this database is from a different \
6363
edition of SpacetimeDB (running {:?}, but this database is {:?})",
64+
metafile.display(),
6465
current.edition,
6566
previous.edition,
6667
);
@@ -110,8 +111,8 @@ impl MetadataFile {
110111
/// `self` is the metadata file read from a database, and current is
111112
/// the default metadata file that the active database version would
112113
/// right to a new database.
113-
pub fn check_compatibility_and_update(mut self, current: Self) -> anyhow::Result<Self> {
114-
Self::check_compatibility(&self, &current)?;
114+
pub fn check_compatibility_and_update(mut self, current: Self, metafile: &Path) -> anyhow::Result<Self> {
115+
Self::check_compatibility(&self, &current, metafile)?;
115116
// bump the version in the file only if it's being run in a newer database.
116117
self.version = std::cmp::max(self.version, current.version);
117118
Ok(self)
@@ -344,67 +345,67 @@ mod tests {
344345
fn check_metadata_compatibility_checking() {
345346
assert_eq!(
346347
mkmeta(1, 0, 0)
347-
.check_compatibility_and_update(mkmeta(1, 0, 1))
348+
.check_compatibility_and_update(mkmeta(1, 0, 1), Path::new("metadata.toml"))
348349
.unwrap()
349350
.version,
350351
mkver(1, 0, 1)
351352
);
352353
assert_eq!(
353354
mkmeta(1, 0, 1)
354-
.check_compatibility_and_update(mkmeta(1, 0, 0))
355+
.check_compatibility_and_update(mkmeta(1, 0, 0), Path::new("metadata.toml"))
355356
.unwrap()
356357
.version,
357358
mkver(1, 0, 1)
358359
);
359360

360361
mkmeta(1, 1, 0)
361-
.check_compatibility_and_update(mkmeta(1, 0, 5))
362+
.check_compatibility_and_update(mkmeta(1, 0, 5), Path::new("metadata.toml"))
362363
.unwrap_err();
363364
mkmeta(2, 0, 0)
364-
.check_compatibility_and_update(mkmeta(1, 3, 5))
365+
.check_compatibility_and_update(mkmeta(1, 3, 5), Path::new("metadata.toml"))
365366
.unwrap_err();
366367
assert_eq!(
367368
mkmeta(1, 12, 0)
368-
.check_compatibility_and_update(mkmeta(2, 0, 0))
369+
.check_compatibility_and_update(mkmeta(2, 0, 0), Path::new("metadata.toml"))
369370
.unwrap()
370371
.version,
371372
mkver(2, 0, 0)
372373
);
373374
mkmeta(2, 0, 0)
374-
.check_compatibility_and_update(mkmeta(3, 0, 0))
375+
.check_compatibility_and_update(mkmeta(3, 0, 0), Path::new("metadata.toml"))
375376
.unwrap_err();
376377
}
377378

378379
#[test]
379380
fn check_metadata_compatibility_prerelease() {
380381
mkmeta(1, 9, 0)
381-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"))
382+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"), Path::new("metadata.toml"))
382383
.unwrap();
383384

384385
mkmeta_pre(2, 0, 0, "rc1")
385-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"))
386+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"), Path::new("metadata.toml"))
386387
.unwrap();
387388

388389
mkmeta_pre(2, 0, 0, "rc1")
389-
.check_compatibility_and_update(mkmeta(2, 0, 1))
390+
.check_compatibility_and_update(mkmeta(2, 0, 1), Path::new("metadata.toml"))
390391
.unwrap();
391392

392393
mkmeta_pre(2, 0, 0, "rc1")
393-
.check_compatibility_and_update(mkmeta(2, 0, 0))
394+
.check_compatibility_and_update(mkmeta(2, 0, 0), Path::new("metadata.toml"))
394395
.unwrap();
395396

396397
// Now check some failures..
397398

398399
mkmeta_pre(2, 0, 0, "rc1")
399-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc2"))
400+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc2"), Path::new("metadata.toml"))
400401
.unwrap_err();
401402

402403
mkmeta_pre(2, 0, 0, "rc2")
403-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"))
404+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"), Path::new("metadata.toml"))
404405
.unwrap_err();
405406

406407
mkmeta(2, 0, 0)
407-
.check_compatibility_and_update(mkmeta_pre(2, 1, 0, "rc1"))
408+
.check_compatibility_and_update(mkmeta_pre(2, 1, 0, "rc1"), Path::new("metadata.toml"))
408409
.unwrap_err();
409410
}
410411

0 commit comments

Comments
 (0)