Skip to content

Commit b094fae

Browse files
committed
Merge branch 'jdetter/tpcc' of github.com:clockworklabs/SpacetimeDB into jdetter/tpcc
2 parents fcee374 + 2600b2d commit b094fae

47 files changed

Lines changed: 5946 additions & 9 deletions

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ members = [
5252
"modules/sdk-test-view-pk",
5353
"modules/sdk-test-event-table",
5454
"modules/tpcc",
55+
"modules/tpcc-metrics",
5556
"sdks/rust/tests/test-client",
5657
"sdks/rust/tests/test-counter",
5758
"sdks/rust/tests/connect_disconnect_client",

crates/core/src/startup.rs

Lines changed: 168 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,17 @@ pub fn pin_threads_with_reservations(reservations: CoreReservations) -> Cores {
179179
Cores::get(reservations).unwrap_or_default()
180180
}
181181

182+
/// Like [`pin_threads`], but reserves exactly `database_threads` cores for databases.
183+
///
184+
/// Database executors are pinned one-per-core. On Linux, Tokio workers, Tokio
185+
/// blocking threads, and Rayon threads are constrained to the remaining cores
186+
/// via a shared affinity mask so the OS can schedule them freely within that
187+
/// set. No separate IRQ or extra reserved cores are carved out in this mode.
188+
#[must_use]
189+
pub fn pin_threads_with_dedicated_database_cores(database_threads: usize) -> Cores {
190+
Cores::get_with_dedicated_database_cores(database_threads).unwrap_or_default()
191+
}
192+
182193
/// The desired distribution of available cores to purposes.
183194
///
184195
/// Note that, in addition to `reserved`, [`Cores`] reserves two additional
@@ -288,7 +299,11 @@ impl Cores {
288299

289300
let databases = DatabaseCores(databases);
290301
let reserved = (!reserved.is_empty()).then(|| reserved.into());
291-
let rayon = RayonCores((!rayon.is_empty()).then_some(rayon));
302+
let rayon = RayonCores {
303+
dedicated: (!rayon.is_empty()).then_some(rayon),
304+
#[cfg(target_os = "linux")]
305+
shared: None,
306+
};
292307

293308
// see comment on `TokioCores.blocking`
294309
#[cfg(target_os = "linux")]
@@ -302,6 +317,8 @@ impl Cores {
302317
let tokio = TokioCores {
303318
workers: Some(tokio_workers),
304319
#[cfg(target_os = "linux")]
320+
workers_shared: None,
321+
#[cfg(target_os = "linux")]
305322
blocking: remaining,
306323
};
307324

@@ -315,11 +332,74 @@ impl Cores {
315332
})
316333
}
317334

335+
fn get_with_dedicated_database_cores(database_threads: usize) -> Option<Self> {
336+
let cores = Self::get_core_ids()?;
337+
Self::from_core_ids_with_dedicated_database_cores(cores, database_threads)
338+
}
339+
340+
fn from_core_ids_with_dedicated_database_cores(mut cores: Vec<CoreId>, database_threads: usize) -> Option<Self> {
341+
if database_threads == 0 || database_threads >= cores.len() {
342+
return None;
343+
}
344+
345+
let databases = cores.drain(..database_threads).collect_vec();
346+
let shared = cores;
347+
348+
let databases = DatabaseCores(databases);
349+
#[cfg(target_os = "linux")]
350+
let shared_cpuset = cores_to_cpuset(&shared)?;
351+
352+
#[cfg(target_os = "linux")]
353+
let tokio = TokioCores {
354+
workers: None,
355+
workers_shared: Some((shared.len(), shared_cpuset)),
356+
blocking: Some(shared_cpuset),
357+
};
358+
359+
#[cfg(not(target_os = "linux"))]
360+
let tokio = TokioCores {
361+
workers: (!shared.is_empty()).then_some(shared.clone()),
362+
};
363+
364+
let rayon = RayonCores {
365+
#[cfg(not(target_os = "linux"))]
366+
dedicated: (!shared.is_empty()).then_some(shared.clone()),
367+
#[cfg(target_os = "linux")]
368+
dedicated: None,
369+
#[cfg(target_os = "linux")]
370+
shared: Some((shared.len(), shared_cpuset)),
371+
};
372+
373+
Some(Self {
374+
databases,
375+
tokio,
376+
rayon,
377+
reserved: None,
378+
#[cfg(target_os = "linux")]
379+
blocking: Some(shared_cpuset),
380+
})
381+
}
382+
383+
#[cfg(target_os = "linux")]
384+
fn get_core_ids() -> Option<Vec<CoreId>> {
385+
if cfg!(feature = "no-core-pinning") {
386+
return None;
387+
}
388+
389+
let cores = core_affinity::get_core_ids()
390+
.filter(|cores| cores.len() >= 10)?
391+
.into_iter()
392+
.collect_vec();
393+
394+
(!cores.is_empty()).then_some(cores)
395+
}
396+
318397
/// Get the cores of the local host, as reported by the operating system.
319398
///
320399
/// Returns `None` if `num_cpus` is less than 8
321400
/// or if core pinning is disabled.
322401
/// If `Some` is returned, the `Vec` is non-empty.
402+
#[cfg(not(target_os = "linux"))]
323403
pub fn get_core_ids() -> Option<Vec<CoreId>> {
324404
if cfg!(feature = "no-core-pinning") {
325405
return None;
@@ -334,9 +414,19 @@ impl Cores {
334414
}
335415
}
336416

417+
#[cfg(target_os = "linux")]
418+
fn cores_to_cpuset(cores: &[CoreId]) -> Option<nix::sched::CpuSet> {
419+
cores.iter().copied().try_fold(nix::sched::CpuSet::new(), |mut cpuset, core| {
420+
cpuset.set(core.id).ok()?;
421+
Some(cpuset)
422+
})
423+
}
424+
337425
#[derive(Default)]
338426
pub struct TokioCores {
339427
pub workers: Option<Vec<CoreId>>,
428+
#[cfg(target_os = "linux")]
429+
pub workers_shared: Option<(usize, nix::sched::CpuSet)>,
340430
// For blocking threads, we don't want to limit them to a specific number
341431
// and pin them to their own cores - they're supposed to run concurrently
342432
// with each other. However, `core_affinity` doesn't support affinity masks,
@@ -371,25 +461,62 @@ impl TokioCores {
371461
}
372462
}
373463
});
464+
} else {
465+
#[cfg(target_os = "linux")]
466+
if let Some((threads, cpuset)) = self.workers_shared {
467+
builder.worker_threads(threads);
468+
builder.on_thread_start(move || {
469+
let this = nix::unistd::Pid::from_raw(0);
470+
let _ = nix::sched::sched_setaffinity(this, &cpuset);
471+
});
472+
}
374473
}
375474
}
376475
}
377476

378477
#[derive(Default)]
379-
pub struct RayonCores(Option<Vec<CoreId>>);
478+
pub struct RayonCores {
479+
dedicated: Option<Vec<CoreId>>,
480+
#[cfg(target_os = "linux")]
481+
shared: Option<(usize, nix::sched::CpuSet)>,
482+
}
380483

381484
impl RayonCores {
382485
/// Configures a global rayon threadpool, pinning its threads to specific cores.
383486
///
384487
/// All rayon threads will be run with `tokio_handle` entered into.
385488
pub fn configure(self, tokio_handle: &tokio::runtime::Handle) {
489+
let dedicated = self.dedicated;
490+
#[cfg(target_os = "linux")]
491+
let shared = self.shared;
492+
493+
let num_threads = dedicated.as_ref().map_or_else(
494+
|| {
495+
#[cfg(target_os = "linux")]
496+
{
497+
shared.as_ref().map_or(0, |(threads, _)| *threads)
498+
}
499+
#[cfg(not(target_os = "linux"))]
500+
{
501+
0
502+
}
503+
},
504+
|cores| cores.len(),
505+
);
506+
386507
rayon_core::ThreadPoolBuilder::new()
387508
.thread_name(|_idx| "rayon-worker".to_string())
388509
.spawn_handler(thread_spawn_handler(tokio_handle))
389-
.num_threads(self.0.as_ref().map_or(0, |cores| cores.len()))
510+
.num_threads(num_threads)
390511
.start_handler(move |i| {
391-
if let Some(cores) = &self.0 {
512+
if let Some(cores) = &dedicated {
392513
core_affinity::set_for_current(cores[i]);
514+
} else {
515+
#[cfg(target_os = "linux")]
516+
if let Some((_, cpuset)) = &shared {
517+
let this = nix::unistd::Pid::from_raw(0);
518+
let _ = nix::sched::sched_setaffinity(this, cpuset);
519+
}
393520
}
394521
})
395522
.build_global()
@@ -445,3 +572,40 @@ impl DatabaseCores {
445572
JobCores::from_pinned_cores(self.0)
446573
}
447574
}
575+
576+
#[cfg(test)]
577+
mod tests {
578+
use super::*;
579+
580+
#[test]
581+
fn dedicated_database_core_mode_splits_database_and_shared_cores() {
582+
let cores = (0..32).map(|id| CoreId { id }).collect_vec();
583+
let split = Cores::from_core_ids_with_dedicated_database_cores(cores, 12).unwrap();
584+
585+
assert_eq!(split.databases.0.len(), 12);
586+
assert_eq!(split.databases.0[0].id, 0);
587+
assert_eq!(split.databases.0[11].id, 11);
588+
#[cfg(target_os = "linux")]
589+
{
590+
assert!(split.tokio.workers.is_none());
591+
assert_eq!(split.tokio.workers_shared.as_ref().unwrap().0, 20);
592+
assert!(split.rayon.dedicated.is_none());
593+
assert_eq!(split.rayon.shared.as_ref().unwrap().0, 20);
594+
}
595+
#[cfg(not(target_os = "linux"))]
596+
{
597+
assert_eq!(split.tokio.workers.as_ref().unwrap().len(), 20);
598+
assert_eq!(split.tokio.workers.as_ref().unwrap()[0].id, 12);
599+
assert_eq!(split.tokio.workers.as_ref().unwrap()[19].id, 31);
600+
assert_eq!(split.rayon.dedicated.as_ref().unwrap().len(), 20);
601+
}
602+
}
603+
604+
#[test]
605+
fn dedicated_database_core_mode_requires_one_non_database_core() {
606+
let cores = (0..12).map(|id| CoreId { id }).collect_vec();
607+
assert!(Cores::from_core_ids_with_dedicated_database_cores(cores.clone(), 0).is_none());
608+
assert!(Cores::from_core_ids_with_dedicated_database_cores(cores.clone(), 12).is_none());
609+
assert!(Cores::from_core_ids_with_dedicated_database_cores(cores, 13).is_none());
610+
}
611+
}

crates/standalone/src/main.rs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use clap::Command;
1+
use anyhow::Context as _;
2+
use clap::{ArgMatches, Command};
23

34
use spacetimedb::startup;
45
use spacetimedb::util::jobs::JobCores;
@@ -8,8 +9,7 @@ use spacetimedb_standalone::*;
89
use std::panic;
910
use std::process;
1011

11-
async fn async_main(db_cores: JobCores) -> anyhow::Result<()> {
12-
let matches = get_command().get_matches();
12+
async fn async_main(matches: ArgMatches, db_cores: JobCores) -> anyhow::Result<()> {
1313
let (cmd, subcommand_args) = matches.subcommand().unwrap();
1414
exec_subcommand(cmd, subcommand_args, db_cores).await?;
1515
Ok(())
@@ -68,7 +68,27 @@ fn main() -> anyhow::Result<()> {
6868
process::exit(1);
6969
}));
7070

71-
let cores = startup::pin_threads();
71+
let matches = get_command().get_matches();
72+
let cores = match matches.subcommand() {
73+
Some(("start", args)) => {
74+
if let Some(&database_cores) = args.get_one::<usize>("dedicated_database_cores") {
75+
let available = startup::Cores::get_core_ids().context(
76+
"dedicated database core pinning requires core pinning support and at least 10 visible CPUs",
77+
)?;
78+
if database_cores >= available.len() {
79+
anyhow::bail!(
80+
"--dedicated-database-cores={} requires at least one remaining non-database core, but only {} pinnable cores are available",
81+
database_cores,
82+
available.len()
83+
);
84+
}
85+
startup::pin_threads_with_dedicated_database_cores(database_cores)
86+
} else {
87+
startup::pin_threads()
88+
}
89+
}
90+
_ => startup::pin_threads(),
91+
};
7292

7393
// Create a multi-threaded run loop
7494
let mut builder = Builder::new_multi_thread();
@@ -81,7 +101,7 @@ fn main() -> anyhow::Result<()> {
81101
// Keep a handle on the `database_cores` alive outside of `async_main`
82102
// and explicitly drop it to avoid dropping it from an `async` context -
83103
// Tokio gets angry when you drop a runtime within another runtime.
84-
let res = rt.block_on(async_main(database_cores.clone()));
104+
let res = rt.block_on(async_main(matches, database_cores.clone()));
85105
drop(database_cores);
86106

87107
res

crates/standalone/src/subcommands/start.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,15 @@ pub fn cli() -> clap::Command {
9191
.action(SetTrue)
9292
.help("Run in non-interactive mode (fail immediately if port is in use)"),
9393
)
94+
.arg(
95+
Arg::new("dedicated_database_cores")
96+
.long("dedicated-database-cores")
97+
.default_value("13")
98+
.value_parser(clap::value_parser!(usize))
99+
.help(
100+
"Pin exactly this many database executors to dedicated cores. On Linux, Tokio, Rayon, and blocking threads are restricted to the remaining cores while the OS schedules them within that set.",
101+
),
102+
)
94103
// .after_help("Run `spacetime help start` for more detailed information.")
95104
}
96105

modules/tpcc-metrics/Cargo.toml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
[package]
2+
name = "tpcc-metrics"
3+
version.workspace = true
4+
edition.workspace = true
5+
rust-version.workspace = true
6+
7+
[lib]
8+
crate-type = ["cdylib"]
9+
10+
[dependencies]
11+
anyhow.workspace = true
12+
log.workspace = true
13+
spacetimedb = { workspace = true, features = ["unstable"] }
14+
spacetimedb-sats = { workspace = true, features = ["serde"] }
15+
http.workspace = true
16+
serde_json.workspace = true
17+
18+
[lints]
19+
workspace = true

0 commit comments

Comments
 (0)