Skip to content

Commit b1bca08

Browse files
Add MemoryLimitWithObserver to combine limit and observer in one goroutine
MemoryLimitWithObserver provides the same functionality as composing MemoryLimit(MemoryObserver(stage, h), limit, h) but uses a single goroutine instead of two. The combined watcher polls GetRSSAnon once per second, enforces the byte limit, tracks peak RSS, and logs peak memory usage on exit. In gitrpcd, each upload-pack request wraps its git subprocess with MemoryLimit(MemoryObserver(...)), spawning two goroutines that both poll the same PID's RSS every second. During a goroutine spike on github-dfs-dfee0d0 (2026-03-20 ~08:04 UTC), this contributed ~415 goroutines out of 4,430 total. Using the combined API eliminates ~200 of those goroutines. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 903aad4 commit b1bca08

2 files changed

Lines changed: 209 additions & 0 deletions

File tree

pipe/memorylimit.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,104 @@ func killAtLimit(byteLimit uint64, eventHandler func(e *Event)) memoryWatchFunc
8989
}
9090
}
9191

92+
// MemoryLimitWithObserver combines MemoryLimit and MemoryObserver into a single
93+
// stage that uses one goroutine instead of two. It watches the memory usage of
94+
// the stage, kills the process if it exceeds byteLimit, and logs peak memory
95+
// usage when the stage exits.
96+
//
97+
// Use this instead of MemoryLimit(MemoryObserver(stage, h), limit, h) to save
98+
// one goroutine per pipeline stage.
99+
func MemoryLimitWithObserver(stage Stage, byteLimit uint64, eventHandler func(e *Event)) Stage {
100+
limitableStage, ok := stage.(LimitableStage)
101+
if !ok {
102+
eventHandler(&Event{
103+
Command: stage.Name(),
104+
Msg: "invalid pipe.MemoryLimitWithObserver usage",
105+
Err: fmt.Errorf("invalid pipe.MemoryLimitWithObserver usage"),
106+
})
107+
return stage
108+
}
109+
110+
return &memoryWatchStage{
111+
nameSuffix: " with memory limit",
112+
stage: limitableStage,
113+
watch: killAtLimitAndObserve(byteLimit, eventHandler),
114+
}
115+
}
116+
117+
func killAtLimitAndObserve(byteLimit uint64, eventHandler func(e *Event)) memoryWatchFunc {
118+
return func(ctx context.Context, stage LimitableStage) {
119+
var (
120+
maxRSS uint64
121+
samples, errCount, consecutiveErrors int
122+
)
123+
124+
t := time.NewTicker(memoryPollInterval)
125+
defer t.Stop()
126+
127+
for {
128+
select {
129+
case <-ctx.Done():
130+
eventHandler(&Event{
131+
Command: stage.Name(),
132+
Msg: "peak memory usage",
133+
Context: map[string]interface{}{
134+
"max_rss_bytes": maxRSS,
135+
"samples": samples,
136+
"errors": errCount,
137+
},
138+
})
139+
return
140+
case <-t.C:
141+
rss, err := stage.GetRSSAnon(ctx)
142+
if err != nil {
143+
if !errors.Is(err, errProcessInfoMissing) {
144+
errCount++
145+
consecutiveErrors++
146+
if consecutiveErrors == 2 {
147+
eventHandler(&Event{
148+
Command: stage.Name(),
149+
Msg: "error getting RSS",
150+
Err: err,
151+
})
152+
}
153+
}
154+
continue
155+
}
156+
157+
consecutiveErrors = 0
158+
samples++
159+
if rss > maxRSS {
160+
maxRSS = rss
161+
}
162+
163+
if rss >= byteLimit {
164+
eventHandler(&Event{
165+
Command: stage.Name(),
166+
Msg: "stage exceeded allowed memory use",
167+
Err: fmt.Errorf("stage exceeded allowed memory use"),
168+
Context: map[string]interface{}{
169+
"limit": byteLimit,
170+
"used": rss,
171+
},
172+
})
173+
eventHandler(&Event{
174+
Command: stage.Name(),
175+
Msg: "peak memory usage",
176+
Context: map[string]interface{}{
177+
"max_rss_bytes": maxRSS,
178+
"samples": samples,
179+
"errors": errCount,
180+
},
181+
})
182+
stage.Kill(ErrMemoryLimitExceeded)
183+
return
184+
}
185+
}
186+
}
187+
}
188+
}
189+
92190
// MemoryObserver watches memory use of the stage and logs the maximum
93191
// value when the stage exits.
94192
func MemoryObserver(stage Stage, eventHandler func(e *Event)) Stage {

pipe/memorylimit_test.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,117 @@ func (w closeWrapper) Close() error {
121121
return w.close()
122122
}
123123

124+
func TestMemoryLimitWithObserverSimple(t *testing.T) {
125+
t.Parallel()
126+
msg, err := testMemoryLimitWithObserver(t, 400, 10_000_000, pipe.Command("less"))
127+
assert.Contains(t, msg, "exceeded allowed memory")
128+
assert.Contains(t, msg, "limit=10000000")
129+
require.ErrorContains(t, err, "memory limit exceeded")
130+
}
131+
132+
func TestMemoryLimitWithObserverTreeMem(t *testing.T) {
133+
t.Parallel()
134+
msg, err := testMemoryLimitWithObserver(t, 400, 10_000_000, pipe.Command("sh", "-c", "less; :"))
135+
assert.Contains(t, msg, "exceeded allowed memory")
136+
assert.Contains(t, msg, "limit=10000000")
137+
require.ErrorContains(t, err, "memory limit exceeded")
138+
}
139+
140+
func TestMemoryLimitWithObserverBelowLimit(t *testing.T) {
141+
t.Parallel()
142+
rss := testMemoryLimitWithObserverBelowLimit(t, 400, pipe.Command("less"))
143+
require.Greater(t, rss, 400_000_000)
144+
}
145+
146+
func TestMemoryLimitWithObserverBelowLimitTreeMem(t *testing.T) {
147+
t.Parallel()
148+
rss := testMemoryLimitWithObserverBelowLimit(t, 400, pipe.Command("sh", "-c", "less; :"))
149+
require.Greater(t, rss, 400_000_000)
150+
}
151+
152+
func TestMemoryLimitWithObserverLogsPeakOnKill(t *testing.T) {
153+
t.Parallel()
154+
msg, err := testMemoryLimitWithObserver(t, 400, 10_000_000, pipe.Command("less"))
155+
// Verify both limit-exceeded AND peak memory are logged (matching
156+
// the behavior of MemoryLimit(MemoryObserver(...)))
157+
assert.Contains(t, msg, "exceeded allowed memory")
158+
assert.Contains(t, msg, "peak memory usage")
159+
require.ErrorContains(t, err, "memory limit exceeded")
160+
}
161+
162+
func testMemoryLimitWithObserverBelowLimit(t *testing.T, mbs int, stage pipe.Stage) int {
163+
ctx := context.Background()
164+
165+
stdinReader, stdinWriter := io.Pipe()
166+
167+
devNull, err := os.OpenFile("/dev/null", os.O_WRONLY, 0)
168+
require.NoError(t, err)
169+
170+
buf := &bytes.Buffer{}
171+
logger := log.New(buf, "testMemoryObserver", log.Ldate|log.Ltime)
172+
173+
// Use a high limit so it won't be hit — we want to verify the observer part
174+
p := pipe.New(pipe.WithDir("/"), pipe.WithStdin(stdinReader), pipe.WithStdout(devNull))
175+
p.Add(pipe.MemoryLimitWithObserver(stage, 100*1024*1024*1024, LogEventHandler(logger)))
176+
require.NoError(t, p.Start(ctx))
177+
178+
var bytes [1_000_000]byte
179+
for i := 0; i < mbs; i++ {
180+
n, err := stdinWriter.Write(bytes[:])
181+
require.NoError(t, err)
182+
require.Equal(t, len(bytes), n)
183+
}
184+
185+
time.Sleep(2 * time.Second)
186+
187+
require.NoError(t, stdinWriter.Close())
188+
require.NoError(t, p.Wait())
189+
190+
// Verify that peak memory usage was logged (the observer part)
191+
output := buf.String()
192+
assert.Contains(t, output, "peak memory usage")
193+
194+
return maxBytes(output)
195+
}
196+
197+
func testMemoryLimitWithObserver(t *testing.T, mbs int, limit uint64, stage pipe.Stage) (string, error) {
198+
ctx := context.Background()
199+
200+
stdinReader, stdinWriter := io.Pipe()
201+
202+
devNull, err := os.OpenFile("/dev/null", os.O_WRONLY, 0)
203+
require.NoError(t, err)
204+
205+
closedErr := fmt.Errorf("stdout was closed")
206+
stdout := closeWrapper{
207+
Writer: devNull,
208+
close: func() error {
209+
require.NoError(t, stdinReader.CloseWithError(closedErr))
210+
return nil
211+
},
212+
}
213+
214+
buf := &bytes.Buffer{}
215+
logger := log.New(buf, "testMemoryObserver", log.Ldate|log.Ltime)
216+
217+
p := pipe.New(pipe.WithDir("/"), pipe.WithStdin(stdinReader), pipe.WithStdoutCloser(stdout))
218+
p.Add(pipe.MemoryLimitWithObserver(stage, limit, LogEventHandler(logger)))
219+
require.NoError(t, p.Start(ctx))
220+
221+
var bytes [1_000_000]byte
222+
for i := 0; i < mbs; i++ {
223+
_, err := stdinWriter.Write(bytes[:])
224+
if err != nil {
225+
require.ErrorIs(t, err, closedErr)
226+
}
227+
}
228+
229+
require.NoError(t, stdinWriter.Close())
230+
err = p.Wait()
231+
232+
return buf.String(), err
233+
}
234+
124235
func testMemoryLimit(t *testing.T, mbs int, limit uint64, stage pipe.Stage) (string, error) {
125236
ctx := context.Background()
126237

0 commit comments

Comments
 (0)