Skip to content

Commit f0c14b3

Browse files
committed
Merge remote-tracking branch 'origin/jdetter/tpcc' into shub/2pc-regular
2 parents 405a7a9 + 1c3572e commit f0c14b3

152 files changed

Lines changed: 12555 additions & 3324 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ members = [
5252
"modules/sdk-test-view-pk",
5353
"modules/sdk-test-event-table",
5454
"modules/tpcc",
55+
"modules/tpcc-metrics",
5556
"sdks/rust/tests/test-client",
5657
"sdks/rust/tests/test-counter",
5758
"sdks/rust/tests/connect_disconnect_client",

crates/bindings/src/remote_reducer.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,22 @@ impl core::fmt::Display for RemoteCallError {
5252
/// - `reducer_name`: the name of the reducer to invoke (must be valid UTF-8).
5353
/// - `args`: BSATN-encoded reducer arguments.
5454
///
55-
/// Returns `Ok(())` when the remote reducer ran and succeeded.
55+
/// Returns `Ok(bytes)` when the remote reducer ran and succeeded, with `bytes` being the reducer's output.
5656
/// Returns `Err(RemoteCallError::Failed(msg))` when the reducer ran but returned an error.
5757
/// Returns `Err(RemoteCallError::NotFound(msg))` when the database or reducer does not exist.
5858
/// Returns `Err(RemoteCallError::Unreachable(msg))` on transport failure (connection refused, timeout, …).
59-
pub fn call_reducer_on_db(database_identity: Identity, reducer_name: &str, args: &[u8]) -> Result<(), RemoteCallError> {
59+
pub fn call_reducer_on_db(
60+
database_identity: Identity,
61+
reducer_name: &str,
62+
args: &[u8],
63+
) -> Result<Vec<u8>, RemoteCallError> {
6064
let identity_bytes = database_identity.to_byte_array();
6165
match spacetimedb_bindings_sys::call_reducer_on_db(identity_bytes, reducer_name, args) {
6266
Ok((status, body_source)) => {
6367
if status < 300 {
64-
return Ok(());
68+
let mut out = Vec::new();
69+
read_bytes_source_into(body_source, &mut out);
70+
return Ok(out);
6571
}
6672
// Decode the response body as the error message.
6773
let msg = if body_source == spacetimedb_bindings_sys::raw::BytesSource::INVALID {

crates/client-api/src/routes/database.rs

Lines changed: 57 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
142142
) -> axum::response::Result<impl IntoResponse> {
143143
let caller_identity = auth.claims.identity;
144144

145-
let args = parse_call_args(content_type, body)?;
145+
let (args, want_bsatn) = parse_call_args(content_type, body)?;
146146

147147
// HTTP callers always need a connection ID to provide to connect/disconnect,
148148
// so generate one.
@@ -195,8 +195,14 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
195195

196196
match result {
197197
Ok(CallResult::Reducer(result)) => {
198-
let (status, body) =
199-
reducer_outcome_response(&module, &owner_identity, &reducer, result.outcome, reducer_return_value)?;
198+
let (status, body) = reducer_outcome_response(
199+
&module,
200+
&owner_identity,
201+
&reducer,
202+
result.outcome,
203+
reducer_return_value,
204+
want_bsatn,
205+
)?;
200206
Ok((
201207
status,
202208
TypedHeader(SpacetimeEnergyUsed(result.energy_used)),
@@ -209,7 +215,7 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
209215
// Procedures don't assign a special meaning to error returns, unlike reducers,
210216
// as there's no transaction for them to automatically abort.
211217
// Instead, we just pass on their return value with the OK status so long as we successfully invoked the procedure.
212-
let (status, body) = procedure_outcome_response(result.return_val);
218+
let (status, body) = procedure_outcome_response(result.return_val, want_bsatn);
213219
Ok((
214220
status,
215221
TypedHeader(SpacetimeExecutionDurationMicros(result.execution_duration)),
@@ -225,13 +231,15 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
225231
///
226232
/// - `application/json` → [`FunctionArgs::Json`] (UTF-8 required).
227233
/// - `application/octet-stream` → [`FunctionArgs::Bsatn`] (raw BSATN bytes).
228-
fn parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::response::Result<FunctionArgs> {
234+
///
235+
/// Also returns `want_bsatn`, a bool, which is true if and only if the response should be BSATN-encoded.
236+
fn parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::response::Result<(FunctionArgs, bool)> {
229237
if content_type == headers::ContentType::json() {
230238
let s = bytestring::ByteString::try_from(body)
231239
.map_err(|_| (StatusCode::BAD_REQUEST, "request body is not valid UTF-8").into_response())?;
232-
Ok(FunctionArgs::Json(s))
240+
Ok((FunctionArgs::Json(s), false))
233241
} else if content_type == headers::ContentType::octet_stream() {
234-
Ok(FunctionArgs::Bsatn(body))
242+
Ok((FunctionArgs::Bsatn(body), true))
235243
} else {
236244
Err((
237245
StatusCode::BAD_REQUEST,
@@ -276,13 +284,13 @@ pub async fn prepare<S: ControlStateDelegate + NodeDelegate>(
276284
// call_identity_connected/disconnected submit jobs to the module's executor, which
277285
// will be blocked holding the 2PC write lock after prepare_reducer returns — deadlock.
278286
let result = module
279-
.prepare_reducer(caller_identity, None, &reducer, args, coordinator_identity)
287+
.prepare_reducer(caller_identity, None, &reducer, args.0, coordinator_identity)
280288
.await;
281289

282290
match result {
283291
Ok((prepare_id, rcr, return_value)) => {
284292
let (status, body) =
285-
reducer_outcome_response(&module, &owner_identity, &reducer, rcr.outcome, return_value)?;
293+
reducer_outcome_response(&module, &owner_identity, &reducer, rcr.outcome, return_value, args.1)?;
286294
let mut response = (
287295
status,
288296
TypedHeader(SpacetimeEnergyUsed(rcr.energy_used)),
@@ -398,12 +406,18 @@ pub async fn ack_commit_2pc<S: ControlStateDelegate + NodeDelegate>(
398406
Ok(StatusCode::OK)
399407
}
400408

409+
/// Encode a reducer return value as an HTTP response.
410+
///
411+
/// If the outcome is an error, return a raw string with `application/text`. Ignore `want_bsatn` in this case.
412+
/// If the outcome is successful, and `want_bsatn`, send BSATN with `application/octet-stream`.
413+
/// If the outcome is successful, and not `want_bsatn`, send JSON with `application/json`.
401414
fn reducer_outcome_response(
402415
module: &ModuleHost,
403416
owner_identity: &Identity,
404417
reducer: &str,
405418
outcome: ReducerOutcome,
406419
reducer_return_value: Option<Bytes>,
420+
want_bsatn: bool,
407421
) -> axum::response::Result<(StatusCode, axum::response::Response)> {
408422
match outcome {
409423
ReducerOutcome::Committed => {
@@ -415,20 +429,29 @@ fn reducer_outcome_response(
415429
};
416430

417431
if let Some(bytes) = reducer_return_value.filter(|value| !value.is_empty()) {
418-
let seed = sats::WithTypespace::new(module.info.module_def.typespace(), &return_value);
419-
let mut reader = &bytes[..];
420-
let value: AlgebraicValue = seed.deserialize(bsatn::Deserializer::new(&mut reader)).map_err(|err| {
421-
(
422-
StatusCode::INTERNAL_SERVER_ERROR,
423-
format!("Failed to decode reducer return value: {err}"),
424-
)
425-
})?;
426-
Ok((
427-
StatusCode::OK,
428-
axum::Json(sats::serde::SerdeWrapper(value)).into_response(),
429-
))
432+
if want_bsatn {
433+
Ok((StatusCode::OK, bytes.into_response()))
434+
} else {
435+
let seed = sats::WithTypespace::new(module.info.module_def.typespace(), &return_value);
436+
let mut reader = &bytes[..];
437+
let value: AlgebraicValue =
438+
seed.deserialize(bsatn::Deserializer::new(&mut reader)).map_err(|err| {
439+
(
440+
StatusCode::INTERNAL_SERVER_ERROR,
441+
format!("Failed to decode reducer return value: {err}"),
442+
)
443+
})?;
444+
Ok((
445+
StatusCode::OK,
446+
axum::Json(sats::serde::SerdeWrapper(value)).into_response(),
447+
))
448+
}
430449
} else {
431-
Ok((StatusCode::OK, "".into_response()))
450+
if want_bsatn {
451+
Ok((StatusCode::OK, Vec::<u8>::new().into_response()))
452+
} else {
453+
Ok((StatusCode::OK, "".into_response()))
454+
}
432455
}
433456
}
434457
ReducerOutcome::Failed(errmsg) => {
@@ -511,10 +534,20 @@ pub enum DBCallErr {
511534
InstanceNotScheduled,
512535
}
513536

514-
fn procedure_outcome_response(return_val: AlgebraicValue) -> (StatusCode, axum::response::Response) {
537+
/// Encode a procedure's result as an HTTP response.
538+
///
539+
/// If `want_bsatn`, send BSATN bytes as `application/octet-stream`.
540+
/// If not `want_bsatn`, send JSON as `application/json`.
541+
fn procedure_outcome_response(return_val: AlgebraicValue, want_bsatn: bool) -> (StatusCode, axum::response::Response) {
515542
(
516543
StatusCode::OK,
517-
axum::Json(sats::serde::SerdeWrapper(return_val)).into_response(),
544+
if want_bsatn {
545+
bsatn::to_vec(&return_val)
546+
.expect("BSATN serializing an AlgebraicValue should be infallible")
547+
.into_response()
548+
} else {
549+
axum::Json(sats::serde::SerdeWrapper(return_val)).into_response()
550+
},
518551
)
519552
}
520553

crates/core/src/config.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,12 @@ impl MetadataFile {
5656
path.write(self.to_string())
5757
}
5858

59-
fn check_compatibility(previous: &Self, current: &Self) -> anyhow::Result<()> {
59+
fn check_compatibility(previous: &Self, current: &Self, metafile: &Path) -> anyhow::Result<()> {
6060
anyhow::ensure!(
6161
previous.edition == current.edition,
62-
"metadata.toml indicates that this database is from a different \
62+
"metadata.toml at {} indicates that this database is from a different \
6363
edition of SpacetimeDB (running {:?}, but this database is {:?})",
64+
metafile.display(),
6465
current.edition,
6566
previous.edition,
6667
);
@@ -110,8 +111,8 @@ impl MetadataFile {
110111
/// `self` is the metadata file read from a database, and current is
111112
/// the default metadata file that the active database version would
112113
/// right to a new database.
113-
pub fn check_compatibility_and_update(mut self, current: Self) -> anyhow::Result<Self> {
114-
Self::check_compatibility(&self, &current)?;
114+
pub fn check_compatibility_and_update(mut self, current: Self, metafile: &Path) -> anyhow::Result<Self> {
115+
Self::check_compatibility(&self, &current, metafile)?;
115116
// bump the version in the file only if it's being run in a newer database.
116117
self.version = std::cmp::max(self.version, current.version);
117118
Ok(self)
@@ -344,67 +345,67 @@ mod tests {
344345
fn check_metadata_compatibility_checking() {
345346
assert_eq!(
346347
mkmeta(1, 0, 0)
347-
.check_compatibility_and_update(mkmeta(1, 0, 1))
348+
.check_compatibility_and_update(mkmeta(1, 0, 1), Path::new("metadata.toml"))
348349
.unwrap()
349350
.version,
350351
mkver(1, 0, 1)
351352
);
352353
assert_eq!(
353354
mkmeta(1, 0, 1)
354-
.check_compatibility_and_update(mkmeta(1, 0, 0))
355+
.check_compatibility_and_update(mkmeta(1, 0, 0), Path::new("metadata.toml"))
355356
.unwrap()
356357
.version,
357358
mkver(1, 0, 1)
358359
);
359360

360361
mkmeta(1, 1, 0)
361-
.check_compatibility_and_update(mkmeta(1, 0, 5))
362+
.check_compatibility_and_update(mkmeta(1, 0, 5), Path::new("metadata.toml"))
362363
.unwrap_err();
363364
mkmeta(2, 0, 0)
364-
.check_compatibility_and_update(mkmeta(1, 3, 5))
365+
.check_compatibility_and_update(mkmeta(1, 3, 5), Path::new("metadata.toml"))
365366
.unwrap_err();
366367
assert_eq!(
367368
mkmeta(1, 12, 0)
368-
.check_compatibility_and_update(mkmeta(2, 0, 0))
369+
.check_compatibility_and_update(mkmeta(2, 0, 0), Path::new("metadata.toml"))
369370
.unwrap()
370371
.version,
371372
mkver(2, 0, 0)
372373
);
373374
mkmeta(2, 0, 0)
374-
.check_compatibility_and_update(mkmeta(3, 0, 0))
375+
.check_compatibility_and_update(mkmeta(3, 0, 0), Path::new("metadata.toml"))
375376
.unwrap_err();
376377
}
377378

378379
#[test]
379380
fn check_metadata_compatibility_prerelease() {
380381
mkmeta(1, 9, 0)
381-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"))
382+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"), Path::new("metadata.toml"))
382383
.unwrap();
383384

384385
mkmeta_pre(2, 0, 0, "rc1")
385-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"))
386+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"), Path::new("metadata.toml"))
386387
.unwrap();
387388

388389
mkmeta_pre(2, 0, 0, "rc1")
389-
.check_compatibility_and_update(mkmeta(2, 0, 1))
390+
.check_compatibility_and_update(mkmeta(2, 0, 1), Path::new("metadata.toml"))
390391
.unwrap();
391392

392393
mkmeta_pre(2, 0, 0, "rc1")
393-
.check_compatibility_and_update(mkmeta(2, 0, 0))
394+
.check_compatibility_and_update(mkmeta(2, 0, 0), Path::new("metadata.toml"))
394395
.unwrap();
395396

396397
// Now check some failures..
397398

398399
mkmeta_pre(2, 0, 0, "rc1")
399-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc2"))
400+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc2"), Path::new("metadata.toml"))
400401
.unwrap_err();
401402

402403
mkmeta_pre(2, 0, 0, "rc2")
403-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"))
404+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"), Path::new("metadata.toml"))
404405
.unwrap_err();
405406

406407
mkmeta(2, 0, 0)
407-
.check_compatibility_and_update(mkmeta_pre(2, 1, 0, "rc1"))
408+
.check_compatibility_and_update(mkmeta_pre(2, 1, 0, "rc1"), Path::new("metadata.toml"))
408409
.unwrap_err();
409410
}
410411

crates/core/src/db/relational_db.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
1717
use spacetimedb_datastore::locking_tx_datastore::state_view::{
1818
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
1919
};
20-
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
20+
use spacetimedb_datastore::locking_tx_datastore::{IndexScanPointOrRange, MutTxId, TxId};
2121
use spacetimedb_datastore::system_tables::{
2222
system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID,
2323
};
@@ -55,10 +55,11 @@ use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotError, SnapshotReposit
5555
use spacetimedb_table::indexes::RowPointer;
5656
use spacetimedb_table::page_pool::PagePool;
5757
use spacetimedb_table::table::{RowRef, TableScanIter};
58+
use spacetimedb_table::table_index::IndexKey;
5859
use std::borrow::Cow;
5960
use std::io;
6061
use std::num::NonZeroUsize;
61-
use std::ops::{Bound, RangeBounds};
62+
use std::ops::RangeBounds;
6263
use std::sync::Arc;
6364
use tokio::sync::watch;
6465

@@ -1424,32 +1425,24 @@ impl RelationalDB {
14241425
Ok(self.inner.iter_by_col_range_tx(tx, table_id.into(), cols, range)?)
14251426
}
14261427

1427-
pub fn index_scan_range<'a>(
1428+
pub fn index_scan_range<'de, 'a>(
14281429
&'a self,
14291430
tx: &'a MutTx,
14301431
index_id: IndexId,
1431-
prefix: &[u8],
1432+
prefix: &'de [u8],
14321433
prefix_elems: ColId,
1433-
rstart: &[u8],
1434-
rend: &[u8],
1435-
) -> Result<
1436-
(
1437-
TableId,
1438-
Bound<AlgebraicValue>,
1439-
Bound<AlgebraicValue>,
1440-
impl Iterator<Item = RowRef<'a>> + use<'a>,
1441-
),
1442-
DBError,
1443-
> {
1434+
rstart: &'de [u8],
1435+
rend: &'de [u8],
1436+
) -> Result<(TableId, IndexScanPointOrRange<'de, 'a>), DBError> {
14441437
Ok(tx.index_scan_range(index_id, prefix, prefix_elems, rstart, rend)?)
14451438
}
14461439

1447-
pub fn index_scan_point<'a>(
1440+
pub fn index_scan_point<'a, 'p>(
14481441
&'a self,
14491442
tx: &'a MutTx,
14501443
index_id: IndexId,
1451-
point: &[u8],
1452-
) -> Result<(TableId, AlgebraicValue, impl Iterator<Item = RowRef<'a>> + use<'a>), DBError> {
1444+
point: &'p [u8],
1445+
) -> Result<(TableId, IndexKey<'p>, impl Iterator<Item = RowRef<'a>> + use<'a>), DBError> {
14531446
Ok(tx.index_scan_point(index_id, point)?)
14541447
}
14551448

0 commit comments

Comments
 (0)