From 010dca575aab8cc2b0e9ffc5052ce649cb083bc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20=C3=81ngel=20Pastor=20Olivar?= Date: Tue, 7 Apr 2026 15:23:47 +0200 Subject: [PATCH 1/6] Added a sync.Pool for 32KB copy buffers Replace the cureent io.Copy with the io.CopyBuffer with pooled buffer --- pipe/iocopier.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pipe/iocopier.go b/pipe/iocopier.go index 78a9143..bdadf33 100644 --- a/pipe/iocopier.go +++ b/pipe/iocopier.go @@ -5,6 +5,7 @@ import ( "errors" "io" "os" + "sync" ) // ioCopier is a stage that copies its stdin to a specified @@ -15,6 +16,16 @@ type ioCopier struct { err error } +// copyBufPool reuses 32KB buffers across io.CopyBuffer calls, avoiding +// a fresh heap allocation per copy. This matters in high-throughput +// pipelines where many ioCopier stages run concurrently. +var copyBufPool = sync.Pool{ + New: func() any { + b := make([]byte, 32*1024) + return &b + }, +} + func newIOCopier(w io.WriteCloser) *ioCopier { return &ioCopier{ w: w, @@ -29,7 +40,9 @@ func (s *ioCopier) Name() string { // This method always returns `nil, nil`. func (s *ioCopier) Start(_ context.Context, _ Env, r io.ReadCloser) (io.ReadCloser, error) { go func() { - _, err := io.Copy(s.w, r) + bp := copyBufPool.Get().(*[]byte) + _, err := io.CopyBuffer(s.w, r, *bp) + copyBufPool.Put(bp) // We don't consider `ErrClosed` an error (FIXME: is this // correct?): if err != nil && !errors.Is(err, os.ErrClosed) { From 22c71eee2692878856d657e47332b57b413ad9a3 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Wed, 8 Apr 2026 13:08:26 +0200 Subject: [PATCH 2/6] Add test for pool buffer bypass via *os.File WriterTo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On Go 1.26+, *os.File implements WriterTo, which causes io.CopyBuffer to bypass the provided pool buffer entirely. Instead, File.WriteTo → genericWriteTo → io.Copy allocates a fresh 32KB buffer on every call. This test detects the bypass by counting allocations during ioCopier copy operations with an *os.File source (which is the common case when the last pipeline stage is a commandStage). The test currently FAILS, demonstrating the problem. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pipe/iocopier_test.go | 75 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 pipe/iocopier_test.go diff --git a/pipe/iocopier_test.go b/pipe/iocopier_test.go new file mode 100644 index 0000000..f164633 --- /dev/null +++ b/pipe/iocopier_test.go @@ -0,0 +1,75 @@ +package pipe + +import ( + "bytes" + "os" + "runtime" + "testing" +) + +// TestIOCopierPoolBufferUsed verifies that ioCopier uses the sync.Pool +// buffer rather than allocating a fresh one. On Go 1.26+, *os.File +// implements WriterTo, which causes io.CopyBuffer to bypass the +// provided pool buffer entirely. Instead, File.WriteTo → +// genericWriteTo → io.Copy allocates a fresh 32KB buffer on every call. +func TestIOCopierPoolBufferUsed(t *testing.T) { + const payload = "hello from pipe\n" + + // Pre-warm the pool so Get doesn't allocate. + copyBufPool.Put(copyBufPool.New()) + + // Warm up: run once to stabilize lazy init. + pr, pw, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + go func() { + pw.Write([]byte(payload)) + pw.Close() + }() + var warmBuf bytes.Buffer + c := newIOCopier(nopWriteCloser{&warmBuf}) + c.Start(nil, Env{}, pr) + c.Wait() + + // Now measure: run the copy and check how many bytes were allocated. + // If the pool buffer is bypassed, a fresh 32KB buffer is allocated. + pr, pw, err = os.Pipe() + if err != nil { + t.Fatal(err) + } + go func() { + pw.Write([]byte(payload)) + pw.Close() + }() + var buf bytes.Buffer + c = newIOCopier(nopWriteCloser{&buf}) + + // GC clears sync.Pool, so re-warm it afterward to isolate the + // measurement from pool repopulation overhead. + runtime.GC() + copyBufPool.Put(copyBufPool.New()) + + var m1, m2 runtime.MemStats + runtime.ReadMemStats(&m1) + + c.Start(nil, Env{}, pr) + c.Wait() + + runtime.GC() + runtime.ReadMemStats(&m2) + + if buf.String() != payload { + t.Fatalf("unexpected output: %q", buf.String()) + } + + allocBytes := m2.TotalAlloc - m1.TotalAlloc + // A bypassed pool buffer causes ~32KB of allocation. + // With the pool buffer working, we expect well under 32KB. + const maxBytes = 16 * 1024 + if allocBytes > maxBytes { + t.Errorf("ioCopier allocated %d bytes during copy (max %d); "+ + "pool buffer may be bypassed by *os.File WriterTo", + allocBytes, maxBytes) + } +} From c5839cb2a4eeaeeefd6f1f3964463339e4333966 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Wed, 8 Apr 2026 13:43:02 +0200 Subject: [PATCH 3/6] Fix pool buffer bypass caused by *os.File WriterTo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On Go 1.26+, *os.File implements WriterTo. When ioCopier's reader is an *os.File (the common case for commandStage), io.CopyBuffer detects WriterTo and calls File.WriteTo instead of using the provided pool buffer. File.WriteTo's sendfile path fails (the dest is not a network connection), so it falls back to genericWriteTo → io.Copy, which allocates a fresh 32KB buffer on every call — defeating the sync.Pool entirely. Fix: wrap the reader in readerOnly{} to strip all interfaces except Read, forcing io.CopyBuffer to use the pool buffer. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pipe/iocopier.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pipe/iocopier.go b/pipe/iocopier.go index bdadf33..a29878c 100644 --- a/pipe/iocopier.go +++ b/pipe/iocopier.go @@ -26,6 +26,10 @@ var copyBufPool = sync.Pool{ }, } +// readerOnly wraps an io.Reader, hiding any other interfaces (such as +// WriterTo) so that io.CopyBuffer is forced to use the provided buffer. +type readerOnly struct{ io.Reader } + func newIOCopier(w io.WriteCloser) *ioCopier { return &ioCopier{ w: w, @@ -41,7 +45,13 @@ func (s *ioCopier) Name() string { func (s *ioCopier) Start(_ context.Context, _ Env, r io.ReadCloser) (io.ReadCloser, error) { go func() { bp := copyBufPool.Get().(*[]byte) - _, err := io.CopyBuffer(s.w, r, *bp) + // Strip all interfaces except Read from r so that + // io.CopyBuffer always uses the provided pool buffer. + // Without this, *os.File's WriterTo (added in Go 1.26) + // causes CopyBuffer to call File.WriteTo, which falls + // back to io.Copy with a fresh allocation, bypassing + // the pool entirely. + _, err := io.CopyBuffer(s.w, readerOnly{r}, *bp) copyBufPool.Put(bp) // We don't consider `ErrClosed` an error (FIXME: is this // correct?): From afe1947d3c8379bb9c77377f27a81b63778ad760 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Wed, 8 Apr 2026 13:15:54 +0200 Subject: [PATCH 4/6] Add test for ReaderFrom hidden by nopWriteCloser When using WithStdout, the writer is wrapped in nopWriteCloser, which hides the ReaderFrom interface of the underlying writer. This prevents io.CopyBuffer from dispatching to ReadFrom for potential zero-copy paths (e.g., when the destination is a network connection or has a custom ReadFrom implementation). This test currently FAILS, demonstrating the problem. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pipe/iocopier_test.go | 46 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/pipe/iocopier_test.go b/pipe/iocopier_test.go index f164633..9f23d86 100644 --- a/pipe/iocopier_test.go +++ b/pipe/iocopier_test.go @@ -2,8 +2,10 @@ package pipe import ( "bytes" + "io" "os" "runtime" + "sync/atomic" "testing" ) @@ -73,3 +75,47 @@ func TestIOCopierPoolBufferUsed(t *testing.T) { allocBytes, maxBytes) } } + +// readFromWriter is a test writer that implements io.ReaderFrom and +// records whether ReadFrom was called. +type readFromWriter struct { + bytes.Buffer + readFromCalled atomic.Bool +} + +func (w *readFromWriter) ReadFrom(r io.Reader) (int64, error) { + w.readFromCalled.Store(true) + return w.Buffer.ReadFrom(r) +} + +func (w *readFromWriter) Close() error { return nil } + +// TestIOCopierUsesReadFrom verifies that ioCopier dispatches to +// ReaderFrom when the destination writer supports it, even when +// wrapped in nopWriteCloser (as happens with WithStdout). +func TestIOCopierUsesReadFrom(t *testing.T) { + const payload = "hello readfrom\n" + + pr, pw, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + go func() { + pw.Write([]byte(payload)) + pw.Close() + }() + + w := &readFromWriter{} + c := newIOCopier(nopWriteCloser{w}) + c.Start(nil, Env{}, pr) + c.Wait() + + if w.Buffer.String() != payload { + t.Fatalf("unexpected output: %q", w.Buffer.String()) + } + + if !w.readFromCalled.Load() { + t.Error("ioCopier did not call ReadFrom on destination; " + + "nopWriteCloser may be hiding the ReaderFrom interface") + } +} From 6e7976a09e911f74b3ef8438a5fd75576f9475ec Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Wed, 8 Apr 2026 13:16:48 +0200 Subject: [PATCH 5/6] Unwrap nopWriteCloser to expose ReaderFrom When the pipeline's stdout is set via WithStdout, the writer is wrapped in nopWriteCloser to add a no-op Close method. This wrapper hides the ReaderFrom interface of the underlying writer, preventing io.CopyBuffer from dispatching to it. Fix: unwrap nopWriteCloser in ioCopier and call ReadFrom directly when available. This enables zero-copy when the destination has a meaningful ReadFrom (e.g., network connections, custom writers). For the pipe-to-pipe *os.File case, File.ReadFrom's zero-copy paths don't yet support pipe sources, so a follow-up commit adds direct splice(2) for that case. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pipe/iocopier.go | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/pipe/iocopier.go b/pipe/iocopier.go index a29878c..ae9769d 100644 --- a/pipe/iocopier.go +++ b/pipe/iocopier.go @@ -44,15 +44,31 @@ func (s *ioCopier) Name() string { // This method always returns `nil, nil`. func (s *ioCopier) Start(_ context.Context, _ Env, r io.ReadCloser) (io.ReadCloser, error) { go func() { - bp := copyBufPool.Get().(*[]byte) - // Strip all interfaces except Read from r so that - // io.CopyBuffer always uses the provided pool buffer. - // Without this, *os.File's WriterTo (added in Go 1.26) - // causes CopyBuffer to call File.WriteTo, which falls - // back to io.Copy with a fresh allocation, bypassing - // the pool entirely. - _, err := io.CopyBuffer(s.w, readerOnly{r}, *bp) - copyBufPool.Put(bp) + var err error + + // Unwrap nopWriteCloser to see if the underlying writer + // supports ReaderFrom (e.g., for zero-copy network I/O). + var dst io.Writer = s.w + if nwc, ok := s.w.(nopWriteCloser); ok { + dst = nwc.Writer + } + + if rf, ok := dst.(io.ReaderFrom); ok { + // Call ReadFrom directly, bypassing io.Copy's + // WriterTo check so that ReadFrom sees the + // original reader type (needed for zero-copy). + _, err = rf.ReadFrom(r) + } else { + bp := copyBufPool.Get().(*[]byte) + // Strip all interfaces except Read from r so that + // io.CopyBuffer always uses the provided pool buffer. + // Without this, *os.File's WriterTo (added in Go 1.26) + // causes CopyBuffer to call File.WriteTo, which falls + // back to io.Copy with a fresh allocation, bypassing + // the pool entirely. + _, err = io.CopyBuffer(dst, readerOnly{r}, *bp) + copyBufPool.Put(bp) + } // We don't consider `ErrClosed` an error (FIXME: is this // correct?): if err != nil && !errors.Is(err, os.ErrClosed) { From 60bd6c3f56e25495c63a0d40c689b94c0ad60946 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Wed, 8 Apr 2026 15:26:11 +0200 Subject: [PATCH 6/6] Fix lint errors in ioCopier tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Check error returns from pw.Write, c.Start, c.Wait. Remove redundant embedded field selectors (w.Buffer.String → w.String). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pipe/iocopier_test.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/pipe/iocopier_test.go b/pipe/iocopier_test.go index 9f23d86..64dceb4 100644 --- a/pipe/iocopier_test.go +++ b/pipe/iocopier_test.go @@ -2,6 +2,7 @@ package pipe import ( "bytes" + "context" "io" "os" "runtime" @@ -26,13 +27,13 @@ func TestIOCopierPoolBufferUsed(t *testing.T) { t.Fatal(err) } go func() { - pw.Write([]byte(payload)) + _, _ = pw.Write([]byte(payload)) pw.Close() }() var warmBuf bytes.Buffer c := newIOCopier(nopWriteCloser{&warmBuf}) - c.Start(nil, Env{}, pr) - c.Wait() + _, _ = c.Start(context.TODO(), Env{}, pr) + _ = c.Wait() // Now measure: run the copy and check how many bytes were allocated. // If the pool buffer is bypassed, a fresh 32KB buffer is allocated. @@ -41,7 +42,7 @@ func TestIOCopierPoolBufferUsed(t *testing.T) { t.Fatal(err) } go func() { - pw.Write([]byte(payload)) + _, _ = pw.Write([]byte(payload)) pw.Close() }() var buf bytes.Buffer @@ -55,8 +56,8 @@ func TestIOCopierPoolBufferUsed(t *testing.T) { var m1, m2 runtime.MemStats runtime.ReadMemStats(&m1) - c.Start(nil, Env{}, pr) - c.Wait() + _, _ = c.Start(context.TODO(), Env{}, pr) + _ = c.Wait() runtime.GC() runtime.ReadMemStats(&m2) @@ -101,17 +102,17 @@ func TestIOCopierUsesReadFrom(t *testing.T) { t.Fatal(err) } go func() { - pw.Write([]byte(payload)) + _, _ = pw.Write([]byte(payload)) pw.Close() }() w := &readFromWriter{} c := newIOCopier(nopWriteCloser{w}) - c.Start(nil, Env{}, pr) - c.Wait() + _, _ = c.Start(context.TODO(), Env{}, pr) + _ = c.Wait() - if w.Buffer.String() != payload { - t.Fatalf("unexpected output: %q", w.Buffer.String()) + if w.String() != payload { + t.Fatalf("unexpected output: %q", w.String()) } if !w.readFromCalled.Load() {