Skip to content

Commit c0d12d7

Browse files
committed
Improves panic-handling
- StagePanicHandler siganture is updated to take in `any` to avoid discarding information. The return - Panic is wrapped after calling StagePanicHandler and error is returned for the stage so that we can verify the pipeline's error contains the stage panic. - Skip recovery if panic-handler is not set to ensure previous behavior remains unchanged. - Cleanup resources on any type of exit from stage.Start to ensure. - Move recovery logic to a separate method.
1 parent 9c58e5a commit c0d12d7

3 files changed

Lines changed: 25 additions & 44 deletions

File tree

pipe/function.go

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -52,34 +52,21 @@ func (s *goStage) Start(ctx context.Context, env Env, stdin io.ReadCloser) (io.R
5252

5353
go func() {
5454
defer func() {
55-
if p := recover(); p != nil {
56-
if s.panicHandler == nil {
57-
// Nothing to do, just panic
58-
panic(p)
59-
}
60-
61-
_ = w.Close()
62-
if stdin != nil {
63-
_ = stdin.Close()
55+
// Cleanup resources on exit
56+
if err := w.Close(); err != nil && s.err == nil {
57+
s.err = fmt.Errorf("error closing output pipe for stage %q: %w", s.Name(), err)
58+
}
59+
if stdin != nil {
60+
if err := stdin.Close(); err != nil && s.err == nil {
61+
s.err = fmt.Errorf("error closing stdin for stage %q: %w", s.Name(), err)
6462
}
65-
close(s.done)
66-
67-
err := FromPanicValue(p)
68-
s.err = err
69-
s.panicHandler(err)
7063
}
64+
close(s.done)
7165
}()
7266

67+
defer s.recoverPanic()
68+
7369
s.err = s.f(ctx, env, stdin, w)
74-
if err := w.Close(); err != nil && s.err == nil {
75-
s.err = fmt.Errorf("error closing output pipe for stage %q: %w", s.Name(), err)
76-
}
77-
if stdin != nil {
78-
if err := stdin.Close(); err != nil && s.err == nil {
79-
s.err = fmt.Errorf("error closing stdin for stage %q: %w", s.Name(), err)
80-
}
81-
}
82-
close(s.done)
8370
}()
8471

8572
return r, nil
@@ -89,3 +76,13 @@ func (s *goStage) Wait() error {
8976
<-s.done
9077
return s.err
9178
}
79+
80+
func (s *goStage) recoverPanic() {
81+
if s.panicHandler == nil {
82+
return
83+
}
84+
85+
if p := recover(); p != nil {
86+
s.err = s.panicHandler(p)
87+
}
88+
}

pipe/panic.go

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package pipe
22

3-
import "fmt"
4-
53
// StagePanicHandlerAware is an interface that Stages can implement to receive
64
// a panic handler from the pipeline. This is particularly useful for stages
75
// that execute work in a separate goroutine and need to manage panics occurring
@@ -11,17 +9,4 @@ type StagePanicHandlerAware interface {
119
}
1210

1311
// StagePanicHandler is a function that handles panics in a pipeline's stages.
14-
type StagePanicHandler func(err error)
15-
16-
// FromPanicValue converts a panic value to an error. If the panic value is
17-
// already an error, it returns it directly. Otherwise, it wraps the value in
18-
// a generic error.
19-
func FromPanicValue(p any) error {
20-
var err error
21-
if e, ok := p.(error); ok {
22-
err = e
23-
} else {
24-
err = fmt.Errorf("%v", p)
25-
}
26-
return err
27-
}
12+
type StagePanicHandler func(p any) error

pipe/pipeline_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -462,11 +462,11 @@ func TestFunction(t *testing.T) {
462462
})
463463

464464
t.Run("panic with handler", func(t *testing.T) {
465-
panickedMessage := make(chan error, 1)
466465
p := pipe.New(
467466
pipe.WithDir(dir),
468-
pipe.WithStagePanicHandler(func(err error) {
469-
panickedMessage <- err
467+
pipe.WithStagePanicHandler(func(p any) error {
468+
err := fmt.Errorf("panic handled: %v", p)
469+
return err
470470
}),
471471
)
472472
p.Add(
@@ -480,8 +480,7 @@ func TestFunction(t *testing.T) {
480480
)
481481

482482
out, err := p.Output(ctx)
483-
assert.Error(t, <-panickedMessage)
484-
assert.Error(t, err)
483+
assert.ErrorContains(t, err, "panic handled")
485484
assert.Empty(t, out)
486485
})
487486
}

0 commit comments

Comments
 (0)