Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions pipe/iocopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ var copyBufPool = sync.Pool{
},
}

// readerOnly wraps an io.Reader, hiding any other interfaces (such as
// WriterTo) so that io.CopyBuffer is forced to use the provided buffer.
type readerOnly struct{ io.Reader }

func newIOCopier(w io.WriteCloser) *ioCopier {
return &ioCopier{
w: w,
Expand All @@ -40,9 +44,31 @@ func (s *ioCopier) Name() string {
// This method always returns `nil, nil`.
func (s *ioCopier) Start(_ context.Context, _ Env, r io.ReadCloser) (io.ReadCloser, error) {
go func() {
bp := copyBufPool.Get().(*[]byte)
_, err := io.CopyBuffer(s.w, r, *bp)
copyBufPool.Put(bp)
var err error

// Unwrap nopWriteCloser to see if the underlying writer
// supports ReaderFrom (e.g., for zero-copy network I/O).
var dst io.Writer = s.w
if nwc, ok := s.w.(nopWriteCloser); ok {
dst = nwc.Writer
}

if rf, ok := dst.(io.ReaderFrom); ok {
// Call ReadFrom directly, bypassing io.Copy's
// WriterTo check so that ReadFrom sees the
// original reader type (needed for zero-copy).
_, err = rf.ReadFrom(r)
} else {
bp := copyBufPool.Get().(*[]byte)
// Strip all interfaces except Read from r so that
// io.CopyBuffer always uses the provided pool buffer.
// Without this, *os.File's WriterTo (added in Go 1.26)
// causes CopyBuffer to call File.WriteTo, which falls
// back to io.Copy with a fresh allocation, bypassing
// the pool entirely.
_, err = io.CopyBuffer(dst, readerOnly{r}, *bp)
copyBufPool.Put(bp)
}
// We don't consider `ErrClosed` an error (FIXME: is this
// correct?):
if err != nil && !errors.Is(err, os.ErrClosed) {
Expand Down
121 changes: 121 additions & 0 deletions pipe/iocopier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package pipe

import (
"bytes"
"io"
"os"
"runtime"
"sync/atomic"
"testing"
)

// TestIOCopierPoolBufferUsed verifies that ioCopier uses the sync.Pool
// buffer rather than allocating a fresh one. On Go 1.26+, *os.File
// implements WriterTo, which causes io.CopyBuffer to bypass the
// provided pool buffer entirely. Instead, File.WriteTo →
// genericWriteTo → io.Copy allocates a fresh 32KB buffer on every call.
func TestIOCopierPoolBufferUsed(t *testing.T) {
Comment on lines +13 to +18
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is meant to guard against the Go 1.26+ *os.File WriterTo path, but the module targets Go 1.24 (go.mod) where *os.File likely doesn't implement io.WriterTo, so the failure mode may not be exercised in CI. Consider making the test version-independent (e.g., use a custom reader that implements io.WriterTo and would allocate/counter-increment if WriteTo is invoked, or explicitly skip unless the pipe reader implements io.WriterTo).

Copilot uses AI. Check for mistakes.
const payload = "hello from pipe\n"

// Pre-warm the pool so Get doesn't allocate.
copyBufPool.Put(copyBufPool.New())

// Warm up: run once to stabilize lazy init.
pr, pw, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
go func() {
pw.Write([]byte(payload))

Check failure on line 29 in pipe/iocopier_test.go

View workflow job for this annotation

GitHub Actions / golangci

Error return value of `pw.Write` is not checked (errcheck)
pw.Close()
}()
var warmBuf bytes.Buffer
c := newIOCopier(nopWriteCloser{&warmBuf})
c.Start(nil, Env{}, pr)

Check failure on line 34 in pipe/iocopier_test.go

View workflow job for this annotation

GitHub Actions / golangci

Error return value of `c.Start` is not checked (errcheck)
c.Wait()

Check failure on line 35 in pipe/iocopier_test.go

View workflow job for this annotation

GitHub Actions / golangci

Error return value of `c.Wait` is not checked (errcheck)
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These test calls pass a nil context and ignore Start/Wait return values. Using context.Background() (or a cancellable ctx) and asserting Start/Wait errors avoids panics if ioCopier later starts using ctx and prevents silently ignoring failures.

This issue also appears in the following locations of the same file:

  • line 58
  • line 108

Copilot uses AI. Check for mistakes.

// Now measure: run the copy and check how many bytes were allocated.
// If the pool buffer is bypassed, a fresh 32KB buffer is allocated.
pr, pw, err = os.Pipe()
if err != nil {
t.Fatal(err)
}
go func() {
pw.Write([]byte(payload))

Check failure on line 44 in pipe/iocopier_test.go

View workflow job for this annotation

GitHub Actions / golangci

Error return value of `pw.Write` is not checked (errcheck)
pw.Close()
}()
var buf bytes.Buffer
c = newIOCopier(nopWriteCloser{&buf})

// GC clears sync.Pool, so re-warm it afterward to isolate the
// measurement from pool repopulation overhead.
runtime.GC()
copyBufPool.Put(copyBufPool.New())

var m1, m2 runtime.MemStats
runtime.ReadMemStats(&m1)

c.Start(nil, Env{}, pr)

Check failure on line 58 in pipe/iocopier_test.go

View workflow job for this annotation

GitHub Actions / golangci

Error return value of `c.Start` is not checked (errcheck)
c.Wait()

Check failure on line 59 in pipe/iocopier_test.go

View workflow job for this annotation

GitHub Actions / golangci

Error return value of `c.Wait` is not checked (errcheck)

runtime.GC()
runtime.ReadMemStats(&m2)

if buf.String() != payload {
t.Fatalf("unexpected output: %q", buf.String())
}

allocBytes := m2.TotalAlloc - m1.TotalAlloc
// A bypassed pool buffer causes ~32KB of allocation.
// With the pool buffer working, we expect well under 32KB.
const maxBytes = 16 * 1024
if allocBytes > maxBytes {
t.Errorf("ioCopier allocated %d bytes during copy (max %d); "+
"pool buffer may be bypassed by *os.File WriterTo",
allocBytes, maxBytes)
}
}

// readFromWriter is a test writer that implements io.ReaderFrom and
// records whether ReadFrom was called.
type readFromWriter struct {
bytes.Buffer
readFromCalled atomic.Bool
}

func (w *readFromWriter) ReadFrom(r io.Reader) (int64, error) {
w.readFromCalled.Store(true)
return w.Buffer.ReadFrom(r)
}

func (w *readFromWriter) Close() error { return nil }

// TestIOCopierUsesReadFrom verifies that ioCopier dispatches to
// ReaderFrom when the destination writer supports it, even when
// wrapped in nopWriteCloser (as happens with WithStdout).
func TestIOCopierUsesReadFrom(t *testing.T) {
const payload = "hello readfrom\n"

pr, pw, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
go func() {
pw.Write([]byte(payload))

Check failure on line 104 in pipe/iocopier_test.go

View workflow job for this annotation

GitHub Actions / golangci

Error return value of `pw.Write` is not checked (errcheck)
pw.Close()
}()

w := &readFromWriter{}
c := newIOCopier(nopWriteCloser{w})
c.Start(nil, Env{}, pr)

Check failure on line 110 in pipe/iocopier_test.go

View workflow job for this annotation

GitHub Actions / golangci

Error return value of `c.Start` is not checked (errcheck)
c.Wait()

Check failure on line 111 in pipe/iocopier_test.go

View workflow job for this annotation

GitHub Actions / golangci

Error return value of `c.Wait` is not checked (errcheck)

if w.Buffer.String() != payload {

Check failure on line 113 in pipe/iocopier_test.go

View workflow job for this annotation

GitHub Actions / golangci

QF1008: could remove embedded field "Buffer" from selector (staticcheck)
t.Fatalf("unexpected output: %q", w.Buffer.String())
}

if !w.readFromCalled.Load() {
t.Error("ioCopier did not call ReadFrom on destination; " +
"nopWriteCloser may be hiding the ReaderFrom interface")
}
}
Loading