perf(weave): defer output `_save_nested_objects` in client (#6740)
Draft
gtarpenning wants to merge 4 commits into master from
Conversation
Two-part change. Together they cut per-op overhead on the calling thread (the asyncio loop in async workloads) without introducing the latent thread-pool starvation that an earlier draft (#6738 history) tripped on.

## Part 1 — defer the output tree walks into the worker

`_save_nested_objects(output)` and `map_to_refs(output)` were running on the calling thread before `finish_call` returned. For ops with nested weave Objects/Tables in the output, this dominates per-op CPU on the asyncio loop. Both are now deferred into the existing `send_end_call` future. `call.output` is still set synchronously, so user code reading it immediately after `finish_call` returns still observes the postprocessed value (refs may attach a moment later).

## Part 2 — structural digest gating in `_save_object_basic` / `_save_table`

Without this, deferring the walks reproduces a real deadlock: `_save_object_basic` does `defer(send_obj_create)`, where `send_obj_create` calls `to_json(parent_with_refs)`, and `to_json` synchronously waits on each nested ref's `digest_future.result()`. Under high concurrency every main-pool worker can land in `to_json` at once, each blocked on a child whose `obj_create` is queued *behind* it in the same executor — classic thread-pool starvation. Master happens to escape by ~1 worker of timing headroom; the deferral consumes it. `test_evaluation_performance::paused_client` makes this deterministic.

Fix: add `_collect_pending_digests(val)`, which walks `val` and returns any unresolved digest/row-digest futures held by nested Refs. Then, in the save sites:

- `_save_object_basic`: after `map_to_refs`, collect pending digests; if any, schedule `send_obj_create` via `future_executor.then(pending, ...)` instead of `defer(...)`. The worker registers a callback and returns to the pool; the continuation fires once digests resolve, by which point `to_json` is non-blocking.
  Same idea applied to `_save_table` (simple path), `_create_table_with_parallel_chunks` (per-chunk), and the first chunk of `_create_table_with_incremental_updates`.
- `finish_call`: split `send_end_call` into Phase 1 (`schedule_send`: walks output, builds `output_as_refs`, collects pending digests) and Phase 2 (`finalize_send`: redact + `to_json` + `server.call_end`). If there are pending digests, Phase 2 is chained via `then(pending, finalize_send)`; the Phase 1 worker returns immediately. A barrier `end_complete_future` is tracked in the executor's active set (`_track_future`) so `_flush()` waits for the full pipeline, and `on_end_complete` fires exactly once on actual call_end completion.

## Tests

`tests/trace/test_save_nested_defer.py`:

- 12 unit tests for `_collect_pending_digests` covering primitives, containers, ObjectRef/OpRef/TableRef (with and without resolved digests and row_digests), nested ObjectRecord, cycles, and dict/list/tuple walks.
- 4 integration tests against a paused-server fixture: a single op finish doesn't deadlock; 20 concurrent async finishes drain via the then-chain; call_end errors still log (regression guard for `log_exception=True`); `finish_call` returns before the deferred work runs.

Existing suites pass with no modification:

- Local: `trace` shard (839 passed, 0 failed).
- Local: `trace_calls_complete_only` shard (839 passed, 0 failed).
- Local: `test_evaluation_performance::test_evaluation_performance` 5x in a row at default parallelism (the deadlock fixture; would hang without the structural gate).
- Local: `test_evaluation_performance::test_evaluation_resilience` — the exception-logging path still produces 4 unique error log keys (file_create, obj_create, table_create, call_end), not 3.
- Local: `test_tracing_resilience.py` (29/29 passed).
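The structural walk described in Part 2 can be sketched as a standalone toy. `PendingRef` and `collect_pending_digests` below are illustrative stand-ins for weave's Ref types and `_collect_pending_digests`, not the actual implementation:

```python
# Toy version of the pending-digest walk (illustrative names, not weave code).
from concurrent.futures import Future
from typing import Any, Optional

class PendingRef:
    """Stand-in for a weave Ref that carries a digest future."""
    def __init__(self, digest_future: Future) -> None:
        self.digest_future = digest_future

def collect_pending_digests(val: Any, seen: Optional[set] = None) -> list:
    """Return every unresolved digest future reachable from `val`.

    Cycle-safe via the `seen` id-set; refs are treated as leaves and
    already-resolved futures are skipped.
    """
    if seen is None:
        seen = set()
    if id(val) in seen:
        return []
    seen.add(id(val))
    if isinstance(val, PendingRef):
        return [] if val.digest_future.done() else [val.digest_future]
    if isinstance(val, dict):
        return [f for v in val.values() for f in collect_pending_digests(v, seen)]
    if isinstance(val, (list, tuple)):
        return [f for v in val for f in collect_pending_digests(v, seen)]
    return []  # primitives contribute nothing

# One resolved ref, one pending ref, nested in containers:
resolved: Future = Future()
resolved.set_result("abc123")
pending: Future = Future()
value = {"a": [PendingRef(resolved)], "b": (PendingRef(pending), 1)}
print(len(collect_pending_digests(value)))  # 1 -> only the unresolved future
```

The done-future short-circuit is what keeps the fast path fast: a value whose digests have all resolved yields an empty list and can be dispatched with a plain `defer`.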
## Bench (heavy GIL, 1460 ops/step, 30-step median per-step)

| Branch                           | Median step | p50 stall | Notes    |
|----------------------------------|------------:|----------:|----------|
| Master                           | ~1.0s       | 176ms     | baseline |
| Internal-only (no defer, #5/6/7) | ~1.0s       | 170ms     | #6738    |
| **This PR (defer + gating)**     | ~0.4s       | 38ms      |          |

The headline win is `p50 stall = 38ms` — the calling thread spends ~5x less time in tracing per op. The step-time average is dominated by occasional GC / mempool stalls in the 30-step run; the median is the stable comparison.
Drop unused `noqa: E731` (replaced lambda with named def) and apply ruff format to the new test file.
mypy 1.19.1 can't infer the lambda's signature in `_create_table_with_parallel_chunks` because it's passed positionally to `then(...)` whose `g` is a generic `Callable[[list[T]], U]`. Named def with explicit annotations fixes the "Cannot infer type of lambda" diagnostic.
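The pattern can be illustrated with a toy generic `then`; this is a simplified stand-in for weave's future executor, not its real signature:

```python
# Toy reproduction of the inference issue: mypy cannot infer a lambda passed
# positionally to a generic callback parameter, but a named def with explicit
# annotations type-checks cleanly. `then` here is illustrative only.
from concurrent.futures import Future
from typing import Callable, TypeVar

T = TypeVar("T")
U = TypeVar("U")

def then(futures: list[Future], g: Callable[[list[T]], U]) -> U:
    # Toy semantics: resolve synchronously and invoke the continuation.
    return g([f.result() for f in futures])

# mypy reports "Cannot infer type of lambda" for then(futs, lambda rs: len(rs)).
# A named def with explicit annotations resolves the diagnostic:
def count_results(results: list[str]) -> int:
    return len(results)

f: Future = Future()
f.set_result("digest")
print(then([f], count_results))  # 1
```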
- Extract a `_defer_after_pending_digests` helper to remove the four-place duplication of the pending-digest dispatch in `_save_object_basic`, `_save_table`, and the chunked-table paths.
- Narrow `except BaseException` to `except Exception` in `finalize_send` and `schedule_send` so KeyboardInterrupt/SystemExit propagate normally.
- Hoist `import asyncio` / `import logging` to the top of the test file.
- Replace the smoke-test-only `test_call_end_error_is_logged_once` with a real regression guard: inject a call_end failure and assert the "Task failed" log lands; mark with `disable_logging_error_check`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
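The pending-digest dispatch that the helper centralizes can be sketched like this; all names (`dispatch_send`, `defer`, `then`) are illustrative stand-ins, not weave's executor API:

```python
# Sketch of the then-vs-defer dispatch rule applied at every save site:
# if the value holds unresolved digest futures, chain the send as a
# continuation instead of occupying a worker that would block in to_json.
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable

executor = ThreadPoolExecutor(max_workers=2)

def defer(fn: Callable[[], None]) -> None:
    """Fast path: schedule the send on a worker immediately."""
    executor.submit(fn)

def then(pending: list, fn: Callable[[], None]) -> None:
    """Run `fn` on a worker only once every future in `pending` resolves.

    The caller registers callbacks and returns; no worker slot is held.
    """
    remaining = [len(pending)]
    lock = threading.Lock()
    def on_done(_f: Future) -> None:
        with lock:
            remaining[0] -= 1
            fire = remaining[0] == 0
        if fire:
            executor.submit(fn)
    for f in pending:
        f.add_done_callback(on_done)

def dispatch_send(pending: list, send: Callable[[], None]) -> None:
    if pending:
        then(pending, send)  # gated: continuation fires after digests resolve
    else:
        defer(send)          # nothing pending: schedule directly

# Demo: the send only runs after the simulated digest future resolves.
ran = threading.Event()
digest: Future = Future()
dispatch_send([digest], ran.set)
assert not ran.wait(0.1)  # still gated on the digest
digest.set_result("abc123")
assert ran.wait(2.0)      # continuation fired on a worker
print("send ran after digest resolved")
```

Because the gate is checked before scheduling, a worker never enters the send while a digest it needs is still owed by another queued task.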
TL;DR
Cuts per-op calling-thread CPU ~5× in async tracing (p50 stall 176ms → 38ms in the heavy-GIL bench) by deferring the output-side `_save_nested_objects` + `map_to_refs` + `to_json` from `finish_call` into the worker pool. A structural fix to `_save_object_basic` / `_save_table` keeps a naive defer from deadlocking on thread-pool starvation. Builds on #6738 (the conservative subset). Linked: WB-33844.
How the work moves off the calling thread
What changes
- `finish_call`: `_save_nested_objects`, `map_to_refs`, redact, and `to_json` move into the worker pool. `call.output` is still set synchronously, so user code reading it after `finish_call` returns is unchanged.
- `_save_object_basic`, `_save_table`, and the chunked-table paths now go through one `_defer_after_pending_digests` helper that pre-walks the value and uses `then(pending, send)` instead of `defer(send)`. The worker registering the callback returns to the pool instead of blocking inside `to_json -> digest.result()`.
- `send_end_call` is split into Phase 1 (`schedule_send` — walks + collect_pending) and Phase 2 (`finalize_send` — redact + `to_json` + `call_end`). A barrier `end_complete_future` is tracked in the executor so `_flush()` waits for the whole pipeline; `log_exception=True` keeps the same "Task failed" log line existing resilience tests assert on.

Calling-thread contract (unchanged)
- `postprocess_output` and `_on_finish_handler` still raise on the calling thread.
- `call.output` is set to the postprocessed value before `finish_call` returns.
- `call.summary` is computed and set before `finish_call` returns.

Risks
- Output refs now attach to `call.output` after `finish_call` returns. Today (master) refs attach before `finish_call` returns. User code that synchronously inspects `output.ref` immediately after `finish_call` may briefly see no ref attached. Behavior change; not expected to affect any current pattern, but called out.
- Touches `_save_object_basic`, `_save_table`, both chunked-table paths, and `finish_call`.
- `_track_future` is private. Used deliberately to register the barrier future; will need an update if the futures module evolves.

Bench
Heavy GIL pressure (1460 ops/step, 30 steps). Median per-step is the stable comparison; avg is dominated by occasional GC stalls under heavy GIL.
* Max under heavy GIL is dominated by occasional consecutive bad steps (a 9.54s outlier appears once in the 30-step run, then the system recovers). Median/p50 are the meaningful numbers — calling-thread time per op is cut roughly 5×.
Tests
`tests/trace/test_save_nested_defer.py` (16 tests):

- `_collect_pending_digests` covering primitives, nested containers, ObjectRef/OpRef/TableRef (resolved + pending), TableRef with both digest and row_digests pending, ObjectRecord, cycles, and the resolved-future short-circuit.
- A `call_end` failure surfaces as "Task failed" via `_track_future` (regression guard for `log_exception=True`); `finish_call` returns before the deferred work runs.

Existing suites green:
- `trace` shard (839 passed), `trace_calls_complete_only` shard (839 passed).
- `test_evaluation_performance` 5× consecutive at default parallelism (the deadlock fixture; would hang without the structural gate).
- `test_tracing_resilience` (29/29).

Test plan
- `trace` and `trace_calls_complete_only` shards green
- Deadlock fixture (`test_evaluation_performance`) green at default parallelism, 5× consecutive

Related
- Bench script: `~/Desktop/local_testing/weavetest/qa-scripts/bench_async_overhead.py` + `bench_history.jsonl`.

Deep dive: why naive defer deadlocks (and how the gate fixes it)
Part 1 alone — just deferring the output walks, no gate — is a one-line change, but it reproduces a real deadlock against `test_evaluation_performance::paused_client`: `_save_object_basic`'s `send_obj_create` calls `to_json(parent_with_refs)`, which synchronously waits on each nested ref's `digest_future.result()`. Under high concurrency every main-pool worker can land in `to_json` at once, each blocked on a child whose `obj_create` is queued behind it in the same executor — classic thread-pool starvation. Master happens to escape by ~1 worker of timing headroom; the deferral consumes it.

Part 2 — this PR — restructures the dispatch so workers don't synchronously block on digest futures. At every save site that calls `to_json(value_with_refs)`, we pre-walk `value` for unresolved digest futures via `_collect_pending_digests`. If any are pending, we register the save via `future_executor.then(pending, send)` instead of `defer(send)`. The worker that wanted to schedule the save attaches a callback and returns to the pool; the continuation runs once digests resolve, by which point `to_json` is non-blocking. No worker holds a slot waiting for another worker.

`_collect_pending_digests` walks any value (primitives, dicts, lists, tuples, `ObjectRecord`, Refs) and collects unresolved digest / row-digest futures. It is cycle-safe via a `seen` set, skips done futures, and treats Refs as leaves (it does not recurse into their fields).
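The starvation mechanism is easy to reproduce in miniature. This toy (assumed shapes, not weave code) shrinks "every worker occupied" down to a one-worker pool: the parent blocks the only worker waiting on a digest whose producing task is queued behind it, so neither can make progress:

```python
# Minimal reproduction of thread-pool starvation: a worker blocks on a
# future whose producing task is queued behind it in the same pool.
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutTimeout

pool = ThreadPoolExecutor(max_workers=1)  # "every worker occupied", shrunk to 1
child_digest: Future = Future()

def parent_to_json() -> str:
    # Blocks the only worker waiting on the child's digest...
    return child_digest.result(timeout=0.5)

def child_obj_create() -> None:
    child_digest.set_result("abc123")

parent = pool.submit(parent_to_json)
pool.submit(child_obj_create)  # ...which is queued behind it: starvation.
try:
    parent.result()
    print("completed")
except FutTimeout:
    print("starved: child obj_create never ran while parent held the worker")
pool.shutdown(wait=True)
```

With the gate, the parent would never be submitted while `child_digest` is pending; it would be attached as a continuation via `then`, leaving the worker free to run `child_obj_create` first.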