Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion pipe/iocopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"io"
"os"
"sync"
)

// ioCopier is a stage that copies its stdin to a specified
Expand All @@ -15,6 +16,20 @@ type ioCopier struct {
err error
}

// copyBufPool hands out reusable 32KiB scratch buffers for
// io.CopyBuffer, so that concurrent ioCopier stages in a busy
// pipeline don't pay a fresh heap allocation on every copy.
var copyBufPool = sync.Pool{
	New: func() any {
		// Store a *[]byte rather than a bare []byte so the slice
		// header isn't re-boxed into an interface on every Put
		// (staticcheck SA6002).
		buf := make([]byte, 32<<10)
		return &buf
	},
}

// readerOnly wraps an io.Reader, hiding any other interfaces (such as
// WriterTo) so that io.CopyBuffer is forced to use the provided buffer.
type readerOnly struct{ io.Reader }

func newIOCopier(w io.WriteCloser) *ioCopier {
return &ioCopier{
w: w,
Expand All @@ -29,7 +44,31 @@ func (s *ioCopier) Name() string {
// This method always returns `nil, nil`.
func (s *ioCopier) Start(_ context.Context, _ Env, r io.ReadCloser) (io.ReadCloser, error) {
go func() {
_, err := io.Copy(s.w, r)
var err error

// Unwrap nopWriteCloser to see if the underlying writer
// supports ReaderFrom (e.g., for zero-copy network I/O).
var dst io.Writer = s.w
if nwc, ok := s.w.(nopWriteCloser); ok {
dst = nwc.Writer
}

if rf, ok := dst.(io.ReaderFrom); ok {
// Call ReadFrom directly, bypassing io.Copy's
// WriterTo check so that ReadFrom sees the
// original reader type (needed for zero-copy).
_, err = rf.ReadFrom(r)
} else {
bp := copyBufPool.Get().(*[]byte)
// Strip all interfaces except Read from r so that
// io.CopyBuffer always uses the provided pool buffer.
// Without this, *os.File's WriterTo (added in Go 1.26)
// causes CopyBuffer to call File.WriteTo, which falls
// back to io.Copy with a fresh allocation, bypassing
// the pool entirely.
_, err = io.CopyBuffer(dst, readerOnly{r}, *bp)
copyBufPool.Put(bp)
}
// We don't consider `ErrClosed` an error (FIXME: is this
// correct?):
if err != nil && !errors.Is(err, os.ErrClosed) {
Expand Down
122 changes: 122 additions & 0 deletions pipe/iocopier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package pipe

import (
"bytes"
"context"
"io"
"os"
"runtime"
"sync/atomic"
"testing"
)

// TestIOCopierPoolBufferUsed verifies that ioCopier uses the sync.Pool
// buffer rather than allocating a fresh one. On Go 1.26+, *os.File
// implements WriterTo, which causes io.CopyBuffer to bypass the
// provided pool buffer entirely. Instead, File.WriteTo →
// genericWriteTo → io.Copy allocates a fresh 32KB buffer on every call.
func TestIOCopierPoolBufferUsed(t *testing.T) {
	const payload = "hello from pipe\n"

	// Pre-warm the pool so Get doesn't allocate.
	copyBufPool.Put(copyBufPool.New())

	// Warm up: run once to stabilize lazy init.
	pr, pw, err := os.Pipe()
	if err != nil {
		t.Fatal(err)
	}
	// Feed the payload and close the write end so the copier sees EOF.
	go func() {
		_, _ = pw.Write([]byte(payload))
		pw.Close()
	}()
	var warmBuf bytes.Buffer
	c := newIOCopier(nopWriteCloser{&warmBuf})
	_, _ = c.Start(context.TODO(), Env{}, pr)
	_ = c.Wait()

	// Now measure: run the copy and check how many bytes were allocated.
	// If the pool buffer is bypassed, a fresh 32KB buffer is allocated.
	pr, pw, err = os.Pipe()
	if err != nil {
		t.Fatal(err)
	}
	go func() {
		_, _ = pw.Write([]byte(payload))
		pw.Close()
	}()
	var buf bytes.Buffer
	c = newIOCopier(nopWriteCloser{&buf})

	// GC clears sync.Pool, so re-warm it afterward to isolate the
	// measurement from pool repopulation overhead.
	runtime.GC()
	copyBufPool.Put(copyBufPool.New())

	// NOTE(review): MemStats.TotalAlloc is process-wide, so allocations
	// from concurrently running tests or runtime goroutines would
	// inflate the delta below — confirm this test never runs alongside
	// t.Parallel siblings.
	var m1, m2 runtime.MemStats
	runtime.ReadMemStats(&m1)

	_, _ = c.Start(context.TODO(), Env{}, pr)
	_ = c.Wait()

	// NOTE(review): TotalAlloc is cumulative and never decreases, so
	// this GC does not change the measured delta — presumably kept for
	// symmetry with the pre-measurement GC; verify it's needed.
	runtime.GC()
	runtime.ReadMemStats(&m2)

	if buf.String() != payload {
		t.Fatalf("unexpected output: %q", buf.String())
	}

	allocBytes := m2.TotalAlloc - m1.TotalAlloc
	// A bypassed pool buffer causes ~32KB of allocation.
	// With the pool buffer working, we expect well under 32KB.
	// The measured delta also includes the writer goroutine and pipe
	// bookkeeping, which the 16KB headroom is expected to absorb.
	const maxBytes = 16 * 1024
	if allocBytes > maxBytes {
		t.Errorf("ioCopier allocated %d bytes during copy (max %d); "+
			"pool buffer may be bypassed by *os.File WriterTo",
			allocBytes, maxBytes)
	}
}

// readFromWriter is a test writer that implements io.ReaderFrom and
// records whether ReadFrom was called.
type readFromWriter struct {
bytes.Buffer
readFromCalled atomic.Bool
}

func (w *readFromWriter) ReadFrom(r io.Reader) (int64, error) {
w.readFromCalled.Store(true)
return w.Buffer.ReadFrom(r)
}

func (w *readFromWriter) Close() error { return nil }

// TestIOCopierUsesReadFrom checks that ioCopier takes the ReaderFrom
// fast path when the destination writer provides one, even through a
// nopWriteCloser wrapper (the shape produced by WithStdout).
func TestIOCopierUsesReadFrom(t *testing.T) {
	const payload = "hello readfrom\n"

	rd, wr, err := os.Pipe()
	if err != nil {
		t.Fatal(err)
	}

	// Feed the payload into the pipe from a separate goroutine and
	// close the write end so the copier sees EOF.
	go func() {
		_, _ = wr.Write([]byte(payload))
		wr.Close()
	}()

	dst := &readFromWriter{}
	copier := newIOCopier(nopWriteCloser{dst})
	_, _ = copier.Start(context.TODO(), Env{}, rd)
	_ = copier.Wait()

	if got := dst.String(); got != payload {
		t.Fatalf("unexpected output: %q", got)
	}
	if !dst.readFromCalled.Load() {
		t.Error("ioCopier did not call ReadFrom on destination; " +
			"nopWriteCloser may be hiding the ReaderFrom interface")
	}
}
Loading