Skip to content

Commit 0e29f32

Browse files
branchseerclaude
andauthored
refactor(execute): decompose spawn into single-concern modules (#343)
## Summary - Split the ~200-line `spawn()` in `session::execute::spawn` (which mixed fspy/tokio command construction, Windows Job Object, FD_CLOEXEC workaround, stdio piping, and cancellation) into four focused files under `session::execute`: - `spawn.rs` — now just abstracts fspy vs tokio, returning `ChildHandle { stdout, stderr, wait }` with a cancellation-aware `wait` future. - `pipe.rs` — `pipe_stdio()` drains stdout/stderr, with optional capture for cache replay. Houses `StdOutput` / `OutputKind`. - `tracked_accesses.rs` — `TrackedPathAccesses::from_raw()` normalizes raw fspy accesses. Houses `PathRead`. - `win_job.rs` — Windows Job Object RAII, moved out of `mod.rs`. - `execute_spawn` composes the pieces and measures duration end-to-end (spawn no longer tracks time). - Group pipe-mode state into a local `Pipe` enum (`Piped { stdio_config, capture }` / `Inherited`) so related state lives in one variant. The `Inherited` arm drops `stdio_config` eagerly, matching existing behavior. - Compute `SpawnStdio` directly from the cache/suggestion condition (no intermediate `use_piped: bool`). `SpawnStdio` now derives `Copy/Clone/PartialEq/Eq/Debug`. ## Test plan - [x] `just check` - [x] `just lint` - [x] `cargo test` (all library/unit tests, including `malformed_windows_drive_path_after_workspace_strip_is_ignored` which moved with `normalize_tracked_workspace_path`) - [x] `cargo test -p vite_task_bin --test e2e_snapshots` — full e2e coverage (148 tests) exercising piped + fspy + cache miss/hit, inherited stdio, cancellation/process-tree kill, and read/write-overlap not-cached paths - [ ] Cross-target lint: `just lint-linux` / `just lint-windows` (CI will cover) 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d19333d commit 0e29f32

12 files changed

Lines changed: 697 additions & 597 deletions

File tree

crates/vite_task/src/session/cache/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use wincode::{
2323
io::{Reader, Writer},
2424
};
2525

26-
use super::execute::{fingerprint::PostRunFingerprint, spawn::StdOutput};
26+
use super::execute::{fingerprint::PostRunFingerprint, pipe::StdOutput};
2727

2828
/// Cache lookup key identifying a task's execution configuration.
2929
///

crates/vite_task/src/session/execute/fingerprint.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use vite_path::{AbsolutePath, RelativePathBuf};
1616
use vite_str::Str;
1717
use wincode::{SchemaRead, SchemaWrite};
1818

19-
use super::spawn::PathRead;
19+
use super::tracked_accesses::PathRead;
2020
use crate::{collections::HashMap, session::cache::InputChangeKind};
2121

2222
/// Post-run fingerprint capturing file state after execution.
@@ -53,7 +53,8 @@ pub enum DirEntryKind {
5353
impl PostRunFingerprint {
5454
/// Creates a new fingerprint from path accesses after task execution.
5555
///
56-
/// Negative glob filtering is done upstream in `spawn_with_tracking`.
56+
/// Negative glob filtering is done upstream (see
57+
/// [`super::tracked_accesses::TrackedPathAccesses::from_raw`]).
5758
/// Paths already present in `globbed_inputs` are skipped — they are
5859
/// already tracked by the prerun glob fingerprint, and the read-write
5960
/// overlap check in `execute_spawn` guarantees the task did not modify

crates/vite_task/src/session/execute/mod.rs

Lines changed: 197 additions & 277 deletions
Large diffs are not rendered by default.
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
//! Drain child stdout/stderr concurrently to writers, with optional capture.
2+
3+
use std::io::Write;
4+
5+
use serde::Serialize;
6+
use tokio::{
7+
io::AsyncReadExt as _,
8+
process::{ChildStderr, ChildStdout},
9+
};
10+
use tokio_util::sync::CancellationToken;
11+
use wincode::{SchemaRead, SchemaWrite};
12+
13+
/// Output kind for stdout/stderr
14+
#[derive(Debug, PartialEq, Eq, Clone, Copy, SchemaWrite, SchemaRead, Serialize)]
15+
pub enum OutputKind {
16+
StdOut,
17+
StdErr,
18+
}
19+
20+
/// Output chunk with stream kind
21+
#[derive(Debug, SchemaWrite, SchemaRead, Serialize, Clone)]
22+
pub struct StdOutput {
23+
pub kind: OutputKind,
24+
pub content: Vec<u8>,
25+
}
26+
27+
/// Downstream destinations for bytes read from the child's stdout/stderr:
28+
/// two pass-through writers plus an optional capture buffer (populated in
29+
/// place during drain for cache replay).
30+
pub struct PipeSinks<'a> {
31+
pub stdout_writer: &'a mut dyn Write,
32+
pub stderr_writer: &'a mut dyn Write,
33+
pub capture: Option<&'a mut Vec<StdOutput>>,
34+
}
35+
36+
/// Drain the child's stdout/stderr concurrently into `sinks`.
37+
///
38+
/// Bytes are written through `sinks.stdout_writer` / `sinks.stderr_writer` in
39+
/// real time and, when `sinks.capture` is `Some`, also appended (with adjacent
40+
/// same-kind chunks coalesced) for cache replay.
41+
///
42+
/// On cancellation: returns `Ok(())` without killing the child — the caller
43+
/// drives the child's cancellation-aware `wait` future next, which observes the
44+
/// same already-fired token and performs the kill. Dropping `stdout`/`stderr`
45+
/// closes the pipe read ends (EPIPE on Unix, `ERROR_BROKEN_PIPE` on Windows).
46+
#[tracing::instrument(level = "debug", skip_all)]
47+
pub async fn pipe_stdio(
48+
mut stdout: ChildStdout,
49+
mut stderr: ChildStderr,
50+
mut sinks: PipeSinks<'_>,
51+
cancellation_token: CancellationToken,
52+
) -> std::io::Result<()> {
53+
let mut stdout_buf = [0u8; 8192];
54+
let mut stderr_buf = [0u8; 8192];
55+
let mut stdout_done = false;
56+
let mut stderr_done = false;
57+
58+
loop {
59+
if stdout_done && stderr_done {
60+
return Ok(());
61+
}
62+
tokio::select! {
63+
result = stdout.read(&mut stdout_buf), if !stdout_done => {
64+
match result? {
65+
0 => stdout_done = true,
66+
n => {
67+
let bytes = &stdout_buf[..n];
68+
sinks.stdout_writer.write_all(bytes)?;
69+
sinks.stdout_writer.flush()?;
70+
if let Some(capture) = &mut sinks.capture {
71+
append_output_chunk(capture, OutputKind::StdOut, bytes);
72+
}
73+
}
74+
}
75+
}
76+
result = stderr.read(&mut stderr_buf), if !stderr_done => {
77+
match result? {
78+
0 => stderr_done = true,
79+
n => {
80+
let bytes = &stderr_buf[..n];
81+
sinks.stderr_writer.write_all(bytes)?;
82+
sinks.stderr_writer.flush()?;
83+
if let Some(capture) = &mut sinks.capture {
84+
append_output_chunk(capture, OutputKind::StdErr, bytes);
85+
}
86+
}
87+
}
88+
}
89+
() = cancellation_token.cancelled() => {
90+
return Ok(());
91+
}
92+
}
93+
}
94+
}
95+
96+
fn append_output_chunk(capture: &mut Vec<StdOutput>, kind: OutputKind, bytes: &[u8]) {
97+
if let Some(last) = capture.last_mut()
98+
&& last.kind == kind
99+
{
100+
last.content.extend_from_slice(bytes);
101+
} else {
102+
capture.push(StdOutput { kind, content: bytes.to_vec() });
103+
}
104+
}

0 commit comments

Comments
 (0)