From 4a00ddca051a099ef847b01484d53656f5266070 Mon Sep 17 00:00:00 2001 From: Matthew A Johnson Date: Mon, 4 May 2026 21:59:47 +0100 Subject: [PATCH] Minor CI fixes and physical-CPU-aware worker sizing Packaging - Ship examples/*.txt in the wheel (move out of examples/assets/, fix the package_data glob, update sketches.py loader paths). Windows / x86 - Add a windows-x86 (CPython 3.12) job to pr_gate.yml. - Fix MSVC volatile warnings in compat.h. - Provide working 64-bit atomics on 32-bit x86: emulate load/store/exchange/fetch_add via _InterlockedCompareExchange64 CAS loops where the Interlocked*64 intrinsics are unavailable. - sched.c: cast atomic args through (void *) to silence MSVC cast-qual warnings. Physical-CPU-aware worker sizing - New boc_physical_cpu_count() with Linux (sysfs + sched_getaffinity), macOS (sysctl hw.physicalcpu_max) and Windows (GetLogicalProcessorInformationEx) backends; exposed as bocpy._core.physical_cpu_count(). - WORKER_COUNT now defaults to physical_cpu_count() - 1, with a BOCPY_WORKERS env override and graceful fallback to sched_getaffinity / multiprocessing. Avoids HT/SMT oversubscription that hurts CPU-bound throughput. Benchmark - Use physical core count for default sweep values, the oversubscription warning and run metadata. - derive_sizes() biases toward more independent rings rather than more chains-per-ring so the chain-ring workload stops starving at higher worker counts. README - New "Scaling with cores" section with a Mermaid speedup chart (~7.5x at 8 workers on the chain-ring benchmark) and the exact reproduction command. Boids example - Add --scale for the rendered triangle size, show FPS alongside behavior/s, and print an average behavior/s summary on a successful recorded run. Misc - Drop the always-skipped TestStealSpuriousFailureStress placeholder in test_scheduler_steal.py. 
Signed-off-by: Matthew A Johnson --- .github/workflows/pr_gate.yml | 25 ++++ README.md | 40 ++++++ examples/benchmark.py | 45 +++++-- examples/boids.py | 49 ++++++- examples/{assets => }/cheese.txt | 0 examples/{assets => }/menu.txt | 0 examples/sketches.py | 4 +- pyproject.toml | 2 +- src/bocpy/_core.c | 43 ++++++- src/bocpy/_core.pyi | 31 +++++ src/bocpy/behaviors.py | 49 ++++++- src/bocpy/compat.c | 213 ++++++++++++++++++++++++++++++- src/bocpy/compat.h | 88 ++++++++++++- src/bocpy/sched.c | 13 +- test/test_scheduler_steal.py | 29 ----- 15 files changed, 567 insertions(+), 64 deletions(-) rename examples/{assets => }/cheese.txt (100%) rename examples/{assets => }/menu.txt (100%) diff --git a/.github/workflows/pr_gate.yml b/.github/workflows/pr_gate.yml index aa54214..f392083 100644 --- a/.github/workflows/pr_gate.yml +++ b/.github/workflows/pr_gate.yml @@ -96,6 +96,31 @@ jobs: - name: Python test run: pytest -vv -s + windows-x86: + runs-on: windows-latest + strategy: + matrix: + python_version: ["3.12"] + steps: + - name: Checkout + uses: actions/checkout@v6 + + - name: Use Python ${{matrix.python_version}} (x86) + uses: actions/setup-python@v6 + with: + python-version: ${{matrix.python_version}} + architecture: x86 + + - name: Get dependencies + run: | + python -m pip install --upgrade pip + + - name: Python build + run: pip install -e .[test] --verbose + + - name: Python test + run: pytest -vv -s + macos-arm64: runs-on: macos-latest strategy: diff --git a/README.md b/README.md index e94c8c3..a9efb44 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,46 @@ the free-threaded build and the relevant CPython APIs stabilise. [pep703]: https://peps.python.org/pep-0703/ [issue5]: https://github.com/microsoft/bocpy/issues/5 +### Scaling with cores + +The chart below shows BOC runtime throughput as the worker count grows from +1 to 8, plotted as **speedup relative to a single worker**. 
Numbers come +from [examples/benchmark.py](examples/benchmark.py) — a chain-ring workload +that exercises the scheduler, two-phase locking, sub-interpreter crossings +and the message queue together — run on CPython 3.14 (mean of 3 repeats, +8 s each). + + +```mermaid +--- +config: + xyChart: + width: 700 + height: 360 +--- +xychart-beta + title "bocpy speedup vs. worker count (chain-ring benchmark, CPython 3.14)" + x-axis "Workers" [1, 2, 3, 4, 5, 6, 7, 8] + y-axis "Speedup vs. 1 worker" 0 --> 9 + bar [1.00, 1.97, 2.94, 3.90, 4.87, 5.82, 6.75, 7.54] + line [1, 2, 3, 4, 5, 6, 7, 8] +``` + + +The line is the ideal `y = x` reference; the bars are measured speedup. Up +to 8 workers, BOC delivers roughly linear scaling on this microbenchmark +(≈7.5× at 8 workers). Real applications carry serial costs that this +benchmark deliberately strips out — see the docstring at the top of +[examples/benchmark.py](examples/benchmark.py) for the load-bearing +caveats. To reproduce: + +```bash +python examples/benchmark.py \ + --sweep-axis workers --sweep-values 1,2,3,4,5,6,7,8 \ + --duration 8 --warmup 2 --repeats 3 \ + --output scaling.json +``` + A behavior can be thought of as a function which depends on zero or more concurrently-owned data objects (which we call **cowns**). As a programmer, you indicate that you want the function to be called once all of those resources are diff --git a/examples/benchmark.py b/examples/benchmark.py index a227ea2..e0e9710 100644 --- a/examples/benchmark.py +++ b/examples/benchmark.py @@ -34,6 +34,7 @@ from bocpy import (Cown, Matrix, noticeboard, notice_write, receive, send, start, wait, when) +from bocpy import _core # Sentinels for the parent/child JSON protocol. Uppercase so the # transpiler keeps them as module-level constants in the worker export. @@ -42,6 +43,21 @@ SCHEMA_VERSION = 1 +def _physical_cpu_count() -> int: + """Return physical core count, falling back to logical or 1. 
+ + Used as the oversubscription threshold so warnings fire when the + requested worker count starts using SMT siblings rather than + waiting until logical cores are exhausted. + + :return: A positive integer. + """ + n = _core.physical_cpu_count() + if n > 0: + return n + return os.cpu_count() or 1 + + # --------------------------------------------------------------------------- # Behavior code (chain workload) # --------------------------------------------------------------------------- @@ -203,11 +219,22 @@ def derive_sizes(cfg: BenchConfig) -> BenchConfig: :return: The same config with ``rings`` / ``chains_per_ring`` set. """ if cfg.chains_per_ring is None: + # Use a small per-ring chain count (4) so chains never collide + # on adjacent slots as they advance. Independent rings carry + # the load instead. cfg.chains_per_ring = max( - 1, cfg.ring_size // (cfg.group_size * cfg.stride * 2)) + 1, cfg.ring_size // (cfg.group_size * cfg.stride * 8)) if cfg.rings is None: - cfg.rings = max(cfg.workers * 4 // cfg.chains_per_ring, - cfg.workers * 2) + # Bias toward more *rings* rather than more chains-per-ring: + # chains on the same ring contend for adjacent slots as they + # advance, so per-ring concurrency is bounded well below + # ``chains_per_ring``. Independent rings, by contrast, never + # collide. Provision at least ``workers * 4`` rings so every + # worker sees a deep, independent supply of ready chains and + # the measured throughput reflects scheduler scaling rather + # than workload starvation. 
+ cfg.rings = max(cfg.workers * 16 // cfg.chains_per_ring, + cfg.workers * 4) return cfg @@ -244,8 +271,9 @@ def emit_soft_warnings(cfg: BenchConfig, cpu_count: int) -> None: print(f"warning: duration={cfg.duration}s is short; results will " "be noisy", file=sys.stderr) if cfg.workers > cpu_count: - print(f"warning: workers={cfg.workers} exceeds cpu_count=" - f"{cpu_count}; oversubscribed", file=sys.stderr) + print(f"warning: workers={cfg.workers} exceeds physical core " + f"count={cpu_count}; oversubscribed (SMT siblings or " + f"hyperthreads will be used)", file=sys.stderr) # --------------------------------------------------------------------------- @@ -862,6 +890,7 @@ def collect_metadata(argv: list, git_sha: Optional[str]) -> dict: "hostname": socket.gethostname(), "platform": sys.platform, "cpu_count": os.cpu_count() or 0, + "physical_cpu_count": _physical_cpu_count(), "python_version": sys.version.split()[0], "python_implementation": sys.implementation.name, "free_threaded": free_threaded, @@ -1054,7 +1083,7 @@ def _default_sweep_values(axis: str) -> list: :param axis: The sweep axis name. :return: A list of default values. """ - cpu = os.cpu_count() or 1 + cpu = _physical_cpu_count() if axis == "workers": return sorted(set([1, 2, 4, 8, min(16, cpu)])) if axis == "iters": @@ -1158,7 +1187,7 @@ def child_main(args) -> int: if err is not None: print(f"benchmark: invalid config: {err}", file=sys.stderr) return 2 - emit_soft_warnings(cfg, os.cpu_count() or 1) + emit_soft_warnings(cfg, _physical_cpu_count()) rep = run_single_point_body(cfg, repeat_index=0) payload = { "inputs": asdict(cfg), @@ -1197,7 +1226,7 @@ def parent_main(args) -> int: return 2 # Pre-spawn validation across every sweep point. 
- cpu = os.cpu_count() or 1 + cpu = _physical_cpu_count() derived_points = [] for value in sweep_values: cfg = cfg_for_axis(base, args.sweep_axis, value) diff --git a/examples/boids.py b/examples/boids.py index 930ee1f..7a0f44e 100644 --- a/examples/boids.py +++ b/examples/boids.py @@ -352,7 +352,7 @@ class Boids(pyglet.window.Window): """Pyglet window that renders a boids simulation.""" def __init__(self, width: int, height: int, num_boids: int, - show_overlay: bool = True): + show_overlay: bool = True, scale: float = 1.0): """Initialize the window and create boids. :param width: Window width in pixels. @@ -360,6 +360,8 @@ def __init__(self, width: int, height: int, num_boids: int, :param num_boids: The number of boids to simulate. :param show_overlay: Whether to render the boid count and behavior-rate overlay in the bottom-left corner. + :param scale: Multiplier applied to the drawn boid triangle + size. ``1.0`` keeps the original 20x14 pixel triangle. """ pyglet.window.Window.__init__(self, width, height, "Boids") pyglet.gl.glClearColor(1, 1, 1, 1) @@ -367,7 +369,11 @@ def __init__(self, width: int, height: int, num_boids: int, self.elapsed = 0 self.simulation = Simulation(num_boids, width, height) self.num_behaviors = 0 + self.num_frames = 0 + self.total_behaviors = 0 + self.total_elapsed = 0.0 self.samples = deque() + self.fps_samples = deque() self.show_overlay = show_overlay if show_overlay: @@ -380,13 +386,24 @@ def __init__(self, width: int, height: int, num_boids: int, "behavior/s: ", font_size=24, x=5, y=50, color=(100, 100, 100, 255)) + + self.fps_label = pyglet.text.Label( + "fps: ", + font_size=24, x=5, y=95, + color=(100, 100, 100, 255)) else: self.num_boids_label = None self.behaviors_label = None + self.fps_label = None self.triangles: pyglet.shapes.Triangle = [] + tip_x = 0.0 + base_x = -20.0 * scale + base_y = 7.0 * scale for _ in range(num_boids): - tri = pyglet.shapes.Triangle(0, 0, -20, +7, -20, -7, + tri = pyglet.shapes.Triangle(tip_x, 0, + 
base_x, +base_y, + base_x, -base_y, color=(55, 255, 255, 255), batch=self.batch) tri.anchor_position = 0, 0 @@ -399,6 +416,7 @@ def on_draw(self): if self.show_overlay: self.num_boids_label.draw() self.behaviors_label.draw() + self.fps_label.draw() def on_close(self): wait() @@ -412,17 +430,26 @@ def update(self, delta_time: float): self.elapsed += delta_time self.simulation.step(self.width, self.height) self.num_behaviors += self.simulation.num_behaviors + self.num_frames += 1 + self.total_behaviors += self.simulation.num_behaviors + self.total_elapsed += delta_time if self.elapsed > 1: self.samples.append(self.num_behaviors / self.elapsed) + self.fps_samples.append(self.num_frames / self.elapsed) self.num_behaviors = 0 + self.num_frames = 0 self.elapsed = 0 if len(self.samples) > 10: self.samples.popleft() + if len(self.fps_samples) > 10: + self.fps_samples.popleft() if len(self.samples) > 3 and self.behaviors_label is not None: behavior_rate = sum(self.samples) / len(self.samples) self.behaviors_label.text = f"behavior/s: {behavior_rate:.0f}" + fps = sum(self.fps_samples) / len(self.fps_samples) + self.fps_label.text = f"fps: {fps:.1f}" positions = self.simulation.positions velocities = self.simulation.velocities @@ -459,6 +486,9 @@ def update(self, delta_time: float): parser.add_argument("--workers", type=int, default=None, help="Number of BOC worker sub-interpreters. 
" "Defaults to bocpy's default (CPU count - 1).") + parser.add_argument("--scale", type=float, default=1.0, + help="Multiplier applied to the drawn boid " + "triangle size (default: 1.0).") args = parser.parse_args() # Validate at the boundary; downstream code (Matrix sizing, hash modulo, @@ -473,6 +503,8 @@ def update(self, delta_time: float): parser.error("--fps must be positive") if args.workers is not None and args.workers <= 0: parser.error("--workers must be positive") + if args.scale <= 0: + parser.error("--scale must be positive") # Start the BOC runtime explicitly so --workers takes effect for every # mode. @@ -486,7 +518,7 @@ def update(self, delta_time: float): # The overlay (boid count / behavior rate) is suppressed in video # mode so the rendered output stays clean. boids = Boids(args.width, args.height, args.boids, - show_overlay=False) + show_overlay=False, scale=args.scale) # Allow graceful close: override on_close to set a flag and return # True so pyglet does not destroy the window mid-frame. The loop @@ -630,11 +662,20 @@ def _on_close(): print(ff_stderr.decode("utf-8", errors="replace"), end="") return + # Successful render: report the average behavior rate over the + # entire run, computed from cumulative counters (not the rolling + # 1s window used for the on-screen overlay). 
+ if boids.total_elapsed > 0: + avg_rate = boids.total_behaviors / boids.total_elapsed + print(f"behavior/s (avg over {boids.total_elapsed:.1f}s of " + f"simulated time): {avg_rate:.0f} " + f"({boids.total_behaviors} behaviors)") + print(f"Wrote {args.output} ({frames_written} frames)" f"{' (interrupted)' if boids.bocpy_video_closing else ''}") return - boids = Boids(args.width, args.height, args.boids) + boids = Boids(args.width, args.height, args.boids, scale=args.scale) pyglet.clock.schedule_interval(boids.update, 1 / args.fps) pyglet.app.run() diff --git a/examples/assets/cheese.txt b/examples/cheese.txt similarity index 100% rename from examples/assets/cheese.txt rename to examples/cheese.txt diff --git a/examples/assets/menu.txt b/examples/menu.txt similarity index 100% rename from examples/assets/menu.txt rename to examples/menu.txt diff --git a/examples/sketches.py b/examples/sketches.py index 12e7f60..4a8f4e0 100644 --- a/examples/sketches.py +++ b/examples/sketches.py @@ -10,7 +10,7 @@ def all_known_cheeses() -> list[str]: """Load the cheese inventory from disk.""" - path = os.path.join(os.path.dirname(__file__), "assets", "cheese.txt") + path = os.path.join(os.path.dirname(__file__), "cheese.txt") with open(path) as file: return [line.strip() for line in file] @@ -26,7 +26,7 @@ def is_available(logger, name: str) -> bool: def menu() -> list[str]: """Load the menu and shuffle it.""" - path = os.path.join(os.path.dirname(__file__), "assets", "menu.txt") + path = os.path.join(os.path.dirname(__file__), "menu.txt") with open(path) as file: menu = [line.strip() for line in file] random.shuffle(menu) diff --git a/pyproject.toml b/pyproject.toml index ae9673c..d4d1584 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,4 +53,4 @@ packages = ["bocpy", "bocpy.examples"] "bocpy.examples" = "examples" [tool.setuptools.package-data] -"bocpy.examples" = ["assets/*.txt"] +"bocpy.examples" = ["*.txt"] diff --git a/src/bocpy/_core.c b/src/bocpy/_core.c index 
089cc6c..97019f5 100644 --- a/src/bocpy/_core.c +++ b/src/bocpy/_core.c @@ -2192,7 +2192,7 @@ static BOCQueue *get_queue_for_tag(PyObject *tag) { // to win the slot against a racing peer; on CAS loss we tag- // release and fall through to the discovery branch below // exactly as the prior code did. - int_least64_t observed = atomic_load_intptr(&qptr->state); + int_least64_t observed = atomic_load(&qptr->state); if (observed == BOC_QUEUE_UNASSIGNED) { // Allocate the tag *before* the CAS so that an allocation failure // (UTF-8 error / OOM in tag_from_PyUnicode) leaves the slot in @@ -3567,6 +3567,15 @@ static PyObject *BehaviorCapsule_schedule(PyObject *op, BehaviorCapsuleObject *capsule = (BehaviorCapsuleObject *)op; BOCBehavior *behavior = capsule->behavior; + // Drain the caller's recycle queue opportunistically. The main + // interpreter ordinarily drains via its own receive() loop; a worker + // that calls @when from inside a behavior body (i.e. is the caller + // here) would otherwise have to wait until it returns to + // _core_scheduler_worker_pop before reclaiming any xidata pushed onto + // its queue by other interpreters. Non-blocking; the recycle queue is + // single-consumer (this interpreter), so the drain is safe. + BOCRecycleQueue_empty(BOC_STATE->recycle_queue, false); + // Build the request array if it has not already been built (e.g. by an // external caller having invoked create_requests first). create_requests // is idempotent only via its own guard; here we just skip if populated. @@ -4171,6 +4180,18 @@ static PyObject *_core_recycle(PyObject *module, PyObject *Py_UNUSED(dummy)) { Py_RETURN_NONE; } +/// @brief Return the best-effort physical CPU count for the process. +/// @details Thin wrapper around @ref boc_physical_cpu_count. Returns 0 +/// if the platform-specific detection failed; the Python caller falls +/// back to the logical CPU count in that case. 
+/// @param module The _core module (unused) +/// @param Py_UNUSED unused arg +/// @return PyLong of the physical core count, or 0 on detection failure. +static PyObject *_core_physical_cpu_count(PyObject *Py_UNUSED(module), + PyObject *Py_UNUSED(dummy)) { + return PyLong_FromLong((long)boc_physical_cpu_count()); +} + /// @brief Returns any cowns for which this module has a weak reference. /// @details These are cowns which were sent by this module to another module /// and have not yet been recycled. As such, those cowns contain XIData which @@ -4502,8 +4523,7 @@ static PyObject *_core_queue_stats(PyObject *Py_UNUSED(module), if (state != BOC_QUEUE_ASSIGNED) { continue; } - BOCTag *tag = - (BOCTag *)atomic_load_explicit(&qptr->tag, memory_order_relaxed); + BOCTag *tag = (BOCTag *)atomic_load_intptr(&qptr->tag); PyObject *tag_obj; if (tag != NULL && tag->str != NULL) { tag_obj = PyUnicode_FromString(tag->str); @@ -4747,6 +4767,18 @@ static PyObject *_core_scheduler_worker_pop(PyObject *Py_UNUSED(module), // Verona closure at `core.h:28-32`. BOCBehavior *behavior; for (;;) { + // Drain this worker interpreter's recycle queue. Cross-interpreter + // cown_acquire pushes the previous owner's xidata onto THAT owner's + // queue; only the owning interpreter is allowed to consume it (the + // recycle queue is single-consumer). Without this drain the worker + // never reclaims xidata that other workers/the main interpreter + // pushed onto its queue, and the corresponding BOCCown weak refs + // (taken by BOCRecycleQueue_register on every cown_release) are + // never released -- a steady leak of one BOCCown per cross-worker + // hop. The legacy receive("boc_behavior") loop drained on every + // spin (see receive_single_tag); the distributed-scheduler worker + // bypasses receive entirely, so the drain has to live here. 
+ BOCRecycleQueue_empty(BOC_STATE->recycle_queue, false); boc_bq_node_t *n = boc_sched_worker_pop_fast(self); if (n == NULL) { n = boc_sched_worker_pop_slow(self); @@ -4871,6 +4903,11 @@ static PyMethodDef _core_module_methods[] = { {"is_primary", _core_is_primary, METH_NOARGS, NULL}, {"index", _core_index, METH_NOARGS, NULL}, {"recycle", _core_recycle, METH_NOARGS, NULL}, + {"physical_cpu_count", _core_physical_cpu_count, METH_NOARGS, + "physical_cpu_count($module, /)\n--\n\n" + "Best-effort count of physical CPU cores available to this process. " + "Returns 0 if detection failed; callers should fall back to the logical " + "CPU count in that case."}, {"cowns", _core_cowns, METH_NOARGS, NULL}, {"set_tags", _core_set_tags, METH_VARARGS, "set_tags($module, tags, /)\n--\n\nAssigns tags to message queues."}, diff --git a/src/bocpy/_core.pyi b/src/bocpy/_core.pyi index 7c39e6b..209f822 100644 --- a/src/bocpy/_core.pyi +++ b/src/bocpy/_core.pyi @@ -64,3 +64,34 @@ def queue_stats() -> list[dict[str, Any]]: :return: A list of per-queue stats dicts. :rtype: list[dict[str, Any]] """ + + +def physical_cpu_count() -> int: + """Return the best-effort count of physical CPU cores available to + this process. + + Unlike ``os.cpu_count()`` and ``len(os.sched_getaffinity(0))``, + excludes hyperthread / SMT siblings so it reflects the count of + independent execution units. Used to size the default worker + pool (see :data:`bocpy.WORKER_COUNT`): oversubscribing CPU-bound + Python workloads on hyperthread siblings often *reduces* + throughput because two siblings on the same physical core fight + for the same execution resources. + + Per-platform sourcing: + + * **Linux**: walks ``/sys/devices/system/cpu/cpu*/topology/thread_siblings_list`` + and intersects with ``sched_getaffinity(0)`` so cgroup / + container CPU restrictions are honoured. + * **macOS**: ``sysctlbyname("hw.physicalcpu_max", ...)`` with + ``"hw.physicalcpu"`` as fallback. 
+ * **Windows**: ``GetLogicalProcessorInformationEx`` with + ``RelationProcessorCore``. + + Returns ``0`` on any detection failure (sysfs unreadable, + sysctl / API failure, etc.); callers should fall back to the + logical CPU count in that case. + + :return: Physical core count, or 0 on failure. + :rtype: int + """ diff --git a/src/bocpy/behaviors.py b/src/bocpy/behaviors.py index 16d91a6..aa902e4 100644 --- a/src/bocpy/behaviors.py +++ b/src/bocpy/behaviors.py @@ -31,12 +31,49 @@ BEHAVIORS = None -WORKER_COUNT: int = 1 -try: - WORKER_COUNT = len(os.sched_getaffinity(0)) - 1 -except AttributeError: - from multiprocessing import cpu_count - WORKER_COUNT = cpu_count() - 1 + + +def _default_worker_count() -> int: + """Pick a sensible default worker count for this process. + + Resolution order: + + 1. ``BOCPY_WORKERS`` environment variable (must parse as a positive + integer; ignored otherwise). + 2. ``_core.physical_cpu_count() - 1`` -- one worker per physical + core, leaving one for the main interpreter. Avoids HT + oversubscription, which on CPU-bound Python workloads commonly + *reduces* throughput because hyperthread siblings on the same + physical core fight for the same execution units. + 3. ``len(os.sched_getaffinity(0)) - 1`` (logical cores minus the + main interpreter) when physical detection is unavailable + (returns 0). + 4. ``multiprocessing.cpu_count() - 1`` as a final portable fallback. + + Always returns at least 1 so a single-core / 2-logical-core + machine still produces a usable runtime. 
+ """ + env = os.environ.get("BOCPY_WORKERS") + if env is not None: + try: + value = int(env) + except ValueError: + value = 0 + if value >= 1: + return value + + physical = _core.physical_cpu_count() + if physical >= 1: + return max(1, physical - 1) + + try: + return max(1, len(os.sched_getaffinity(0)) - 1) + except AttributeError: + from multiprocessing import cpu_count + return max(1, cpu_count() - 1) + + +WORKER_COUNT: int = _default_worker_count() T = TypeVar("T") diff --git a/src/bocpy/compat.c b/src/bocpy/compat.c index 92971ba..312edaa 100644 --- a/src/bocpy/compat.c +++ b/src/bocpy/compat.c @@ -12,11 +12,25 @@ #ifdef _WIN32 int_least64_t atomic_fetch_add(atomic_int_least64_t *ptr, int_least64_t value) { +#if defined(_M_IX86) + int_least64_t old = *ptr; + for (;;) { + int_least64_t prev = InterlockedCompareExchange64(ptr, old + value, old); + if (prev == old) + return old; + old = prev; + } +#else return InterlockedExchangeAdd64(ptr, value); +#endif } int_least64_t atomic_fetch_sub(atomic_int_least64_t *ptr, int_least64_t value) { +#if defined(_M_IX86) + return atomic_fetch_add(ptr, -value); +#else return InterlockedExchangeAdd64(ptr, -value); +#endif } bool atomic_compare_exchange_strong(atomic_int_least64_t *ptr, @@ -32,14 +46,40 @@ bool atomic_compare_exchange_strong(atomic_int_least64_t *ptr, return false; } -int_least64_t atomic_load(atomic_int_least64_t *ptr) { return *ptr; } +int_least64_t atomic_load(atomic_int_least64_t *ptr) { +#if defined(_M_IX86) + return InterlockedCompareExchange64(ptr, 0, 0); +#else + return *ptr; +#endif +} int_least64_t atomic_exchange(atomic_int_least64_t *ptr, int_least64_t value) { +#if defined(_M_IX86) + int_least64_t old = *ptr; + for (;;) { + int_least64_t prev = InterlockedCompareExchange64(ptr, value, old); + if (prev == old) + return old; + old = prev; + } +#else return InterlockedExchange64(ptr, value); +#endif } void atomic_store(atomic_int_least64_t *ptr, int_least64_t value) { +#if defined(_M_IX86) + 
int_least64_t old = *ptr; + for (;;) { + int_least64_t prev = InterlockedCompareExchange64(ptr, value, old); + if (prev == old) + return; + old = prev; + } +#else *ptr = value; +#endif } void thrd_sleep(const struct timespec *duration, struct timespec *remaining) { @@ -49,7 +89,176 @@ void thrd_sleep(const struct timespec *duration, struct timespec *remaining) { Sleep(ms); } -#endif // _WIN32 +// --------------------------------------------------------------------------- +// Physical CPU detection (Windows arm). See compat.h for contract. +// --------------------------------------------------------------------------- + +int boc_physical_cpu_count(void) { + DWORD len = 0; + // First call: query required buffer size. + GetLogicalProcessorInformationEx(RelationProcessorCore, NULL, &len); + if (len == 0) { + return 0; + } + BYTE *buf = (BYTE *)malloc((size_t)len); + if (buf == NULL) { + return 0; + } + if (!GetLogicalProcessorInformationEx( + RelationProcessorCore, (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX *)buf, + &len)) { + free(buf); + return 0; + } + // One record per physical core; count records. + int count = 0; + DWORD off = 0; + while (off < len) { + SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX *info = + (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX *)(buf + off); + if (info->Relationship == RelationProcessorCore) { + ++count; + } + off += info->Size; + } + free(buf); + return count; +} + +#elif defined(__APPLE__) + +// --------------------------------------------------------------------------- +// Physical CPU detection (macOS arm). See compat.h for contract. +// --------------------------------------------------------------------------- + +#include <sys/sysctl.h> + +int boc_physical_cpu_count(void) { + int value = 0; + size_t len = sizeof(value); + // hw.physicalcpu_max reports physical cores in the machine; fall back + // to hw.physicalcpu (cores currently available) if unavailable. 
+ if (sysctlbyname("hw.physicalcpu_max", &value, &len, NULL, 0) == 0 && + value > 0) { + return value; + } + len = sizeof(value); + if (sysctlbyname("hw.physicalcpu", &value, &len, NULL, 0) == 0 && value > 0) { + return value; + } + return 0; +} + +#else // assume Linux / glibc-compatible + +// --------------------------------------------------------------------------- +// Physical CPU detection (Linux arm). See compat.h for contract. +// --------------------------------------------------------------------------- + +#include <ctype.h> +#include <dirent.h> +#include <sched.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +/// @brief Read the first comma/range-separated CPU id from a +/// thread_siblings_list file. +/// @details Sibling lists look like "0,28" or "0-1" or "0". The first +/// id is the canonical leader of the sibling set; counting distinct +/// leaders across all CPUs yields the physical-core count. +/// @return The leader CPU id, or -1 on parse failure. +static int boc_read_first_sibling(const char *path) { + FILE *f = fopen(path, "r"); + if (f == NULL) { + return -1; + } + int id = -1; + if (fscanf(f, "%d", &id) != 1) { + id = -1; + } + fclose(f); + return id; +} + +int boc_physical_cpu_count(void) { + // Snapshot the affinity mask first so cgroup / taskset restrictions + // are honoured. Without this the count would be host-physical even + // inside a container with --cpuset-cpus=0-3. + cpu_set_t affinity; + CPU_ZERO(&affinity); + bool have_affinity = (sched_getaffinity(0, sizeof(affinity), &affinity) == 0); + + // Per-CPU sibling-leader array. Sized to the kernel's maximum + // reasonable CPU id; CPU_SETSIZE is 1024 on glibc, which exceeds + // any current hardware. + enum { MAX_CPU = 4096 }; + int leaders[MAX_CPU]; + int leader_count = 0; + + DIR *d = opendir("/sys/devices/system/cpu"); + if (d == NULL) { + return 0; + } + struct dirent *ent; + while ((ent = readdir(d)) != NULL) { + // Match "cpu<N>" entries only. "cpuidle", "cpufreq", etc. + // share the prefix but are not per-CPU dirs. 
+ if (strncmp(ent->d_name, "cpu", 3) != 0) { + continue; + } + const char *suffix = ent->d_name + 3; + if (*suffix == '\0' || !isdigit((unsigned char)*suffix)) { + continue; + } + char *endp; + long cpu_id = strtol(suffix, &endp, 10); + if (*endp != '\0' || cpu_id < 0 || cpu_id >= MAX_CPU) { + continue; + } + + // Skip CPUs outside our affinity mask: a worker scheduled on + // them could not run, so they don't contribute usable physical + // cores from this process's point of view. + if (have_affinity && !CPU_ISSET((int)cpu_id, &affinity)) { + continue; + } + + char path[256]; + snprintf(path, sizeof(path), + "/sys/devices/system/cpu/cpu%ld/topology/thread_siblings_list", + cpu_id); + int leader = boc_read_first_sibling(path); + if (leader < 0) { + // Topology unreadable (very old kernel, sysfs not mounted, etc.). + // Bail out so the caller falls back to the logical count. + closedir(d); + return 0; + } + + // Linear dedup: physical-core counts on real hardware are small + // (< 256 typical, < 1024 on the largest current servers), so + // O(n^2) over leaders is fine. + bool seen = false; + for (int i = 0; i < leader_count; ++i) { + if (leaders[i] == leader) { + seen = true; + break; + } + } + if (!seen) { + if (leader_count >= MAX_CPU) { + closedir(d); + return 0; + } + leaders[leader_count++] = leader; + } + } + closedir(d); + return leader_count; +} + +#endif // _WIN32 / __APPLE__ / other double boc_now_s(void) { const double S_PER_NS = 1.0e-9; diff --git a/src/bocpy/compat.h b/src/bocpy/compat.h index e0c3da6..0b22207 100644 --- a/src/bocpy/compat.h +++ b/src/bocpy/compat.h @@ -161,14 +161,26 @@ static inline bool atomic_compare_exchange_strong_intptr(atomic_intptr_t *ptr, // All Interlocked* intrinsics on x86/x64 are full barriers, so the // memory_order argument is accepted but ignored. -// Note: atomic_load_explicit is a plain volatile read. On x86/x64 this -// provides acquire semantics due to TSO. 
Correctness of the parking -// protocol relies on the mutex-protected re-check, not on seq_cst ordering. +// Note: atomic_load_explicit uses a plain volatile read on x64 (TSO +// provides acquire semantics). On x86 (32-bit), 64-bit volatile reads +// are not atomic, so the legacy atomic_int_least64_t polyfill here +// loads through InterlockedCompareExchange64(ptr, 0, 0) and forwards +// fetch_add / fetch_sub to the CAS-loop helpers in compat.c. The typed +// boc_atomic_*_u64 API uses the same _InterlockedCompareExchange64 idiom. +#if defined(_M_IX86) +#define atomic_load_explicit(ptr, order) \ + ((int_least64_t)InterlockedCompareExchange64((ptr), 0, 0)) +#define atomic_fetch_add_explicit(ptr, val, order) \ + atomic_fetch_add((ptr), (val)) +#define atomic_fetch_sub_explicit(ptr, val, order) \ + atomic_fetch_sub((ptr), (val)) +#else #define atomic_load_explicit(ptr, order) (*(ptr)) #define atomic_fetch_add_explicit(ptr, val, order) \ InterlockedExchangeAdd64((ptr), (val)) #define atomic_fetch_sub_explicit(ptr, val, order) \ InterlockedExchangeAdd64((ptr), -(val)) +#endif #define memory_order_seq_cst 0 // --------------------------------------------------------------------------- @@ -271,6 +283,13 @@ static inline uint64_t boc_atomic_load_u64_explicit(boc_atomic_u64_t *p, default: return BOC_IL_LOAD64_ACQ(p); } +#elif defined(_M_IX86) + // On x86, a 64-bit volatile read is not atomic (two 32-bit loads). + // Use _InterlockedCompareExchange64(p, 0, 0) which atomically reads + // the value without modifying it (CAS that "replaces" 0 with 0 if + // matched; either way returns the current value). + (void)order; + return (uint64_t)_InterlockedCompareExchange64((volatile __int64 *)p, 0, 0); #else (void)order; return *p; @@ -292,6 +311,18 @@ static inline void boc_atomic_store_u64_explicit(boc_atomic_u64_t *p, (void)_InterlockedExchange64((volatile __int64 *)p, (__int64)v); return; } +#elif defined(_M_IX86) + // On x86, a 64-bit volatile write is not atomic. 
Use an exchange + // via CAS loop to atomically store the value. + (void)order; + __int64 old = *((volatile __int64 *)p); + for (;;) { + __int64 prev = + _InterlockedCompareExchange64((volatile __int64 *)p, (__int64)v, old); + if (prev == old) + return; + old = prev; + } #else (void)order; *p = v; @@ -315,6 +346,18 @@ boc_atomic_exchange_u64_explicit(boc_atomic_u64_t *p, uint64_t v, default: return (uint64_t)_InterlockedExchange64((volatile __int64 *)p, (__int64)v); } +#elif defined(_M_IX86) + // x86 lacks _InterlockedExchange64; emulate with a CAS loop using + // _InterlockedCompareExchange64 (which is available on x86). + (void)order; + __int64 old = *((volatile __int64 *)p); + for (;;) { + __int64 prev = + _InterlockedCompareExchange64((volatile __int64 *)p, (__int64)v, old); + if (prev == old) + return (uint64_t)old; + old = prev; + } #else (void)order; return (uint64_t)_InterlockedExchange64((volatile __int64 *)p, (__int64)v); @@ -375,6 +418,18 @@ boc_atomic_fetch_add_u64_explicit(boc_atomic_u64_t *p, uint64_t v, return (uint64_t)_InterlockedExchangeAdd64((volatile __int64 *)p, (__int64)v); } +#elif defined(_M_IX86) + // x86 lacks _InterlockedExchangeAdd64; emulate with a CAS loop. + (void)order; + __int64 old = *((volatile __int64 *)p); + for (;;) { + __int64 desired = old + (__int64)v; + __int64 prev = + _InterlockedCompareExchange64((volatile __int64 *)p, desired, old); + if (prev == old) + return (uint64_t)old; + old = prev; + } #else (void)order; return (uint64_t)_InterlockedExchangeAdd64((volatile __int64 *)p, (__int64)v); @@ -856,6 +911,33 @@ static inline void boc_atomic_thread_fence_explicit(boc_memory_order_t o) { /// @return the current time double boc_now_s(void); +/// @brief Best-effort count of physical CPU cores available to this process. +/// @details Unlike @c sysconf(_SC_NPROCESSORS_ONLN) / +/// @c os.cpu_count(), this excludes hyperthread / SMT siblings so it +/// matches the count of independent execution units. 
Used to size the +/// default worker pool: oversubscribing CPU-bound Python workloads on +/// HT siblings causes the two siblings on a physical core to fight for +/// the same execution resources, often halving throughput vs. one +/// worker per physical core. +/// +/// **Per-platform behaviour.** +/// - **Linux**: walks @c +/// /sys/devices/system/cpu/cpu*/topology/thread_siblings_list, +/// counts distinct sibling sets, and intersects with +/// @c sched_getaffinity(0) so cgroup / container CPU restrictions +/// are honoured. +/// - **macOS**: @c sysctlbyname("hw.physicalcpu_max", ...) (falling +/// back to @c "hw.physicalcpu"). +/// - **Windows**: @c GetLogicalProcessorInformationEx with +/// @c RelationProcessorCore. +/// +/// On any platform where detection fails (sysfs unreadable, sysctl / +/// API failure), returns 0; callers should fall back to the logical +/// CPU count in that case. +/// @return Number of physical cores available to the process, or 0 +/// on failure. +int boc_physical_cpu_count(void); + /// @brief Returns a monotonic timestamp in nanoseconds. /// @details Uses @c CLOCK_MONOTONIC on POSIX and /// @c QueryPerformanceCounter on Windows. Unlike @ref boc_now_s the diff --git a/src/bocpy/sched.c b/src/bocpy/sched.c index c555a2e..941f061 100644 --- a/src/bocpy/sched.c +++ b/src/bocpy/sched.c @@ -30,7 +30,7 @@ void boc_bq_init(boc_bq_t *q) { // Use relaxed stores during init: callers must publish the queue // through their own release edge before any thread observes it. 
boc_atomic_store_ptr_explicit(&q->front, NULL, BOC_MO_RELAXED); - boc_atomic_store_ptr_explicit(&q->back, &q->front, BOC_MO_RELAXED); + boc_atomic_store_ptr_explicit(&q->back, (void *)&q->front, BOC_MO_RELAXED); } void boc_bq_destroy_assert_empty(boc_bq_t *q) { @@ -69,7 +69,7 @@ void boc_bq_enqueue_segment(boc_bq_t *q, boc_bq_segment_t s) { BOC_SCHED_YIELD(); boc_atomic_ptr_t *b = (boc_atomic_ptr_t *)boc_atomic_exchange_ptr_explicit( - &q->back, s.end, BOC_MO_ACQ_REL); + &q->back, (void *)s.end, BOC_MO_ACQ_REL); BOC_SCHED_YIELD(); @@ -129,9 +129,10 @@ boc_bq_node_t *boc_bq_dequeue(boc_bq_t *q) { // (mpmcq.h:165-176). The expected `back` value is the address of the // singleton node's `next_in_queue` slot; the desired value is the // address of `q->front`, restoring the empty representation. - void *expected = &old_front->next_in_queue; + void *expected = (void *)&old_front->next_in_queue; if (boc_atomic_compare_exchange_strong_ptr_explicit( - &q->back, &expected, &q->front, BOC_MO_ACQ_REL, BOC_MO_RELAXED)) { + &q->back, &expected, (void *)&q->front, BOC_MO_ACQ_REL, + BOC_MO_RELAXED)) { return old_front; } @@ -156,8 +157,8 @@ boc_bq_segment_t boc_bq_dequeue_all(boc_bq_t *q) { BOC_SCHED_YIELD(); boc_atomic_ptr_t *old_back = - (boc_atomic_ptr_t *)boc_atomic_exchange_ptr_explicit(&q->back, &q->front, - BOC_MO_ACQ_REL); + (boc_atomic_ptr_t *)boc_atomic_exchange_ptr_explicit( + &q->back, (void *)&q->front, BOC_MO_ACQ_REL); BOC_SCHED_YIELD(); diff --git a/test/test_scheduler_steal.py b/test/test_scheduler_steal.py index 4afa234..6153fe4 100644 --- a/test/test_scheduler_steal.py +++ b/test/test_scheduler_steal.py @@ -16,10 +16,6 @@ - **Empty-queue race** — starting the runtime with W workers and no work must converge to every worker parked and the process CPU/wall ratio must stay well below 1 (no busy-spinning thieves). 
-- **Spurious-failure stress** — placeholder; activated when bocpy is - built with ``-DBOC_SCHED_SYSTEMATIC`` (Verona-style fault-injection - in the queue links). The flag is off in default builds, so the - test is skipped here. Tests that asserted timing-dependent outcomes (``popped_via_steal > 0`` after a pinned fan-out, ``fairness_arm_fires >= N`` on a busy @@ -225,28 +221,3 @@ def test_empty_queue_does_not_spin(self): f"worker {s['worker_index']} never reached cnd_wait " f"in an idle runtime: {s}" ) - - -# --------------------------------------------------------------------------- -# Spurious-failure stress (gated on the systematic-test build flag) -# --------------------------------------------------------------------------- - - -class TestStealSpuriousFailureStress: - """Reserved for ``-DBOC_SCHED_SYSTEMATIC`` builds. - - Verona's stealing path has three documented spurious-failure - modes (fully empty victim, single-element victim, first link not - yet visible). Verifying convergence under fault-injection - requires building bocpy with the ``BOC_SCHED_SYSTEMATIC`` macro, - which is off in the default editable install. When that build - flavour exists the body of this test should run 100 fan-out - iterations and assert each completes within ``RECEIVE_TIMEOUT``. - """ - - @pytest.mark.skip( - reason="needs -DBOC_SCHED_SYSTEMATIC build flag", - ) - def test_spurious_failure_stress(self): # pragma: no cover - """Placeholder; see class docstring.""" - pass