diff --git a/pipe/iocopier.go b/pipe/iocopier.go index 78a9143..ae9769d 100644 --- a/pipe/iocopier.go +++ b/pipe/iocopier.go @@ -5,6 +5,7 @@ import ( "errors" "io" "os" + "sync" ) // ioCopier is a stage that copies its stdin to a specified @@ -15,6 +16,20 @@ type ioCopier struct { err error } +// copyBufPool reuses 32KB buffers across io.CopyBuffer calls, avoiding +// a fresh heap allocation per copy. This matters in high-throughput +// pipelines where many ioCopier stages run concurrently. +var copyBufPool = sync.Pool{ + New: func() any { + b := make([]byte, 32*1024) + return &b + }, +} + +// readerOnly wraps an io.Reader, hiding any other interfaces (such as +// WriterTo) so that io.CopyBuffer is forced to use the provided buffer. +type readerOnly struct{ io.Reader } + func newIOCopier(w io.WriteCloser) *ioCopier { return &ioCopier{ w: w, @@ -29,7 +44,31 @@ func (s *ioCopier) Name() string { // This method always returns `nil, nil`. func (s *ioCopier) Start(_ context.Context, _ Env, r io.ReadCloser) (io.ReadCloser, error) { go func() { - _, err := io.Copy(s.w, r) + var err error + + // Unwrap nopWriteCloser to see if the underlying writer + // supports ReaderFrom (e.g., for zero-copy network I/O). + var dst io.Writer = s.w + if nwc, ok := s.w.(nopWriteCloser); ok { + dst = nwc.Writer + } + + if rf, ok := dst.(io.ReaderFrom); ok { + // Call ReadFrom directly, bypassing io.Copy's + // WriterTo check so that ReadFrom sees the + // original reader type (needed for zero-copy). + _, err = rf.ReadFrom(r) + } else { + bp := copyBufPool.Get().(*[]byte) + // Strip all interfaces except Read from r so that + // io.CopyBuffer always uses the provided pool buffer. + // Without this, *os.File's WriterTo (added in Go 1.22) + // causes CopyBuffer to call File.WriteTo, which falls + // back to io.Copy with a fresh allocation, bypassing + // the pool entirely. 
+ _, err = io.CopyBuffer(dst, readerOnly{r}, *bp) + copyBufPool.Put(bp) + } // We don't consider `ErrClosed` an error (FIXME: is this // correct?): if err != nil && !errors.Is(err, os.ErrClosed) { diff --git a/pipe/iocopier_test.go b/pipe/iocopier_test.go new file mode 100644 index 0000000..64dceb4 --- /dev/null +++ b/pipe/iocopier_test.go @@ -0,0 +1,122 @@ +package pipe + +import ( + "bytes" + "context" + "io" + "os" + "runtime" + "sync/atomic" + "testing" +) + +// TestIOCopierPoolBufferUsed verifies that ioCopier uses the sync.Pool +// buffer rather than allocating a fresh one. On Go 1.22+, *os.File +// implements WriterTo, which causes io.CopyBuffer to bypass the +// provided pool buffer entirely. Instead, File.WriteTo → +// genericWriteTo → io.Copy allocates a fresh 32KB buffer on every call. +func TestIOCopierPoolBufferUsed(t *testing.T) { + const payload = "hello from pipe\n" + + // Pre-warm the pool so Get doesn't allocate. + copyBufPool.Put(copyBufPool.New()) + + // Warm up: run once to stabilize lazy init. + pr, pw, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + go func() { + _, _ = pw.Write([]byte(payload)) + pw.Close() + }() + var warmBuf bytes.Buffer + c := newIOCopier(nopWriteCloser{struct{ io.Writer }{&warmBuf}}) + _, _ = c.Start(context.TODO(), Env{}, pr) + _ = c.Wait() + + // Now measure: run the copy and check how many bytes were allocated. + // If the pool buffer is bypassed, a fresh 32KB buffer is allocated. + // The destination is wrapped in struct{ io.Writer } to hide + // bytes.Buffer's ReadFrom; otherwise ioCopier would dispatch to the + // ReaderFrom branch and the pooled CopyBuffer path under test would + // never run. + pr, pw, err = os.Pipe() + if err != nil { + t.Fatal(err) + } + go func() { + _, _ = pw.Write([]byte(payload)) + pw.Close() + }() + var buf bytes.Buffer + c = newIOCopier(nopWriteCloser{struct{ io.Writer }{&buf}}) + + // GC clears sync.Pool, so re-warm it afterward to isolate the + // measurement from pool repopulation overhead. 
+ runtime.GC() + copyBufPool.Put(copyBufPool.New()) + + var m1, m2 runtime.MemStats + runtime.ReadMemStats(&m1) + + _, _ = c.Start(context.TODO(), Env{}, pr) + _ = c.Wait() + + runtime.GC() + runtime.ReadMemStats(&m2) + + if buf.String() != payload { + t.Fatalf("unexpected output: %q", buf.String()) + } + + allocBytes := m2.TotalAlloc - m1.TotalAlloc + // A bypassed pool buffer causes ~32KB of allocation. + // With the pool buffer working, we expect well under 32KB. + const maxBytes = 16 * 1024 + if allocBytes > maxBytes { + t.Errorf("ioCopier allocated %d bytes during copy (max %d); "+ + "pool buffer may be bypassed by *os.File WriterTo", + allocBytes, maxBytes) + } +} + +// readFromWriter is a test writer that implements io.ReaderFrom and +// records whether ReadFrom was called. +type readFromWriter struct { + bytes.Buffer + readFromCalled atomic.Bool +} + +func (w *readFromWriter) ReadFrom(r io.Reader) (int64, error) { + w.readFromCalled.Store(true) + return w.Buffer.ReadFrom(r) +} + +func (w *readFromWriter) Close() error { return nil } + +// TestIOCopierUsesReadFrom verifies that ioCopier dispatches to +// ReaderFrom when the destination writer supports it, even when +// wrapped in nopWriteCloser (as happens with WithStdout). +func TestIOCopierUsesReadFrom(t *testing.T) { + const payload = "hello readfrom\n" + + pr, pw, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + go func() { + _, _ = pw.Write([]byte(payload)) + pw.Close() + }() + + w := &readFromWriter{} + c := newIOCopier(nopWriteCloser{w}) + _, _ = c.Start(context.TODO(), Env{}, pr) + _ = c.Wait() + + if w.String() != payload { + t.Fatalf("unexpected output: %q", w.String()) + } + + if !w.readFromCalled.Load() { + t.Error("ioCopier did not call ReadFrom on destination; " + + "nopWriteCloser may be hiding the ReaderFrom interface") + } +}