Skip to content

Commit 8e5666e

Browse files
authored
Refactor/remove run monitor (#127)
TestRunner: Remove RunMonitor. Make all targetHandlers explicitly return errors. Make TestRunner targets a local variable
1 parent c0c55f0 commit 8e5666e

1 file changed

Lines changed: 66 additions & 108 deletions

File tree

pkg/runner/test_runner.go

Lines changed: 66 additions & 108 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@ import (
1010
"encoding/json"
1111
"fmt"
1212
"sync"
13+
"sync/atomic"
1314
"time"
1415

1516
"github.com/insomniacslk/xjson"
@@ -40,13 +41,11 @@ import (
4041
type TestRunner struct {
4142
shutdownTimeout time.Duration // Time to wait for steps runners to finish at the end of the run
4243

43-
steps []*stepState // The pipeline, in order of execution
44-
targets map[string]*targetState // Target state lookup map
44+
steps []*stepState // The pipeline, in order of execution
4545

4646
// One mutex to rule them all, used to serialize access to all the state above.
4747
// Could probably be split into several if necessary.
48-
mu sync.Mutex
49-
monitorCond *sync.Cond // Used to notify the monitor about changes
48+
mu sync.Mutex
5049
}
5150

5251
// targetStepPhase denotes progression of a target through a step
@@ -73,8 +72,6 @@ type targetState struct {
7372
CurPhase targetStepPhase `json:"P,omitempty"` // Current phase of step execution.
7473
Res *xjson.Error `json:"R,omitempty"` // Final result, if reached the end state.
7574
StepsVariables map[string]stepVariables `json:"V,omitempty"` // maps steps onto emitted variables of each
76-
77-
handlerRunning bool
7875
}
7976

8077
// resumeStateStruct is used to serialize runner state to be resumed in the future.
@@ -85,15 +82,15 @@ type resumeStateStruct struct {
8582
}
8683

8784
// Resume state version we are compatible with.
88-
// When imcompatible changes are made to the state format, bump this.
85+
// When incompatible changes are made to the state format, bump this.
8986
// Restoring incompatible state will abort the job.
9087
const resumeStateStructVersion = 2
9188

9289
type TestStepEventsEmitterFactory interface {
9390
New(testStepLabel string) testevent.Emitter
9491
}
9592

96-
// Run is the main enty point of the runner.
93+
// Run is the main entry point of the runner.
9794
func (tr *TestRunner) Run(
9895
ctx xcontext.Context,
9996
t *test.Test, targets []*target.Target,
@@ -105,6 +102,8 @@ func (tr *TestRunner) Run(
105102
runCtx, runCancel := xcontext.WithCancel(ctx)
106103
defer runCancel()
107104

105+
var targetStates map[string]*targetState
106+
108107
// If we have state to resume, parse it.
109108
var rs resumeStateStruct
110109
if len(resumeState) > 0 {
@@ -116,25 +115,25 @@ func (tr *TestRunner) Run(
116115
return nil, nil, fmt.Errorf("incompatible resume state version %d (want %d)",
117116
rs.Version, resumeStateStructVersion)
118117
}
119-
tr.targets = rs.Targets
118+
targetStates = rs.Targets
120119
}
121120

122121
// Set up the targets
123-
if tr.targets == nil {
124-
tr.targets = make(map[string]*targetState)
122+
if targetStates == nil {
123+
targetStates = make(map[string]*targetState)
125124
}
126125

127126
// Initialize remaining fields of the target structures,
128127
// build the map and kick off target processing.
129128
for _, tgt := range targets {
130-
tgs := tr.targets[tgt.ID]
129+
tgs := targetStates[tgt.ID]
131130
if tgs == nil {
132131
tgs = &targetState{
133132
CurPhase: targetStepPhaseInit,
134133
}
135134
}
136135
tgs.tgt = tgt
137-
tr.targets[tgt.ID] = tgs
136+
targetStates[tgt.ID] = tgs
138137
}
139138

140139
stepOutputs, err := newTestStepsVariables(t.TestStepsBundles)
@@ -143,14 +142,15 @@ func (tr *TestRunner) Run(
143142
return nil, nil, err
144143
}
145144

146-
for targetID, targetState := range tr.targets {
145+
for targetID, targetState := range targetStates {
147146
if err := stepOutputs.initTargetStepsVariables(targetID, targetState.StepsVariables); err != nil {
148147
ctx.Errorf("Failed to initialise test steps variables for target: %s: %v", targetID, err)
149148
return nil, nil, err
150149
}
151150
}
152151

153152
// Set up the pipeline
153+
stepsErrorsCh := make(chan error, len(t.TestStepsBundles))
154154
for i, sb := range t.TestStepsBundles {
155155
var srs json.RawMessage
156156
if i < len(rs.StepResumeState) && string(rs.StepResumeState[i]) != "null" {
@@ -159,38 +159,63 @@ func (tr *TestRunner) Run(
159159

160160
// Collect "processed" targets in resume state for a StepRunner
161161
var resumeStateTargets []target.Target
162+
162163
var stepTargetsCount int
163-
for _, tgt := range tr.targets {
164+
for _, tgt := range targetStates {
164165
if tgt.CurStep <= i {
165166
stepTargetsCount++
166167
}
168+
167169
if tgt.CurStep == i && tgt.CurPhase == targetStepPhaseRun {
168170
resumeStateTargets = append(resumeStateTargets, *tgt.tgt)
169171
}
170172
}
171173

172174
// Step handlers will be started from target handlers as targets reach them.
173175
tr.steps = append(tr.steps, newStepState(i, stepTargetsCount, sb, emitterFactory, stepOutputs, srs, resumeStateTargets, func(err error) {
174-
tr.monitorCond.Signal()
176+
stepsErrorsCh <- err
175177
}))
176178
}
177179

178-
for _, tgs := range tr.targets {
179-
tgs.handlerRunning = true
180-
181-
go func(state *targetState) {
182-
tr.targetHandler(runCtx, state)
183-
}(tgs)
180+
targetErrors := make(chan error, len(targetStates))
181+
targetErrorsCount := int32(len(targetStates))
182+
for _, tgs := range targetStates {
183+
go func(ctx xcontext.Context, state *targetState, targetErrors chan<- error) {
184+
targetErr := tr.handleTarget(ctx, state)
185+
if targetErr != nil {
186+
runCtx.Errorf("Target %s reported an error: %v", state.tgt.ID, targetErr)
187+
}
188+
targetErrors <- targetErr
189+
if atomic.AddInt32(&targetErrorsCount, -1) == 0 {
190+
close(targetErrors)
191+
}
192+
}(runCtx, tgs, targetErrors)
184193
}
185194

186-
// Run until no more progress can be made.
187-
runErr := tr.runMonitor(ctx)
188-
if runErr != nil {
189-
ctx.Errorf("monitor returned error: %q, canceling", runErr)
190-
for _, ss := range tr.steps {
191-
ss.ForceStop()
195+
runErr := func() error {
196+
var resultErr error
197+
for {
198+
var (
199+
runErr error
200+
ok bool
201+
)
202+
select {
203+
case runErr, ok = <-targetErrors:
204+
if !ok {
205+
return resultErr
206+
}
207+
case runErr = <-stepsErrorsCh:
208+
}
209+
if runErr != nil && runErr != xcontext.ErrPaused && resultErr == nil {
210+
resultErr = runErr
211+
212+
ctx.Errorf("Got error: %v, canceling", runErr)
213+
for _, ss := range tr.steps {
214+
ss.ForceStop()
215+
}
216+
}
192217
}
193-
}
218+
}()
194219

195220
// Wait for step runners and readers to exit.
196221
stepResumeStates, err := tr.waitSteps(ctx)
@@ -216,7 +241,7 @@ func (tr *TestRunner) Run(
216241
resumeOk := runErr == nil
217242
numInFlightTargets := 0
218243
for i, tgt := range targets {
219-
tgs := tr.targets[tgt.ID]
244+
tgs := targetStates[tgt.ID]
220245
tgs.StepsVariables, err = stepOutputs.getTargetStepsVariables(tgt.ID)
221246
if err != nil {
222247
ctx.Errorf("Failed to get steps variables: %v", err)
@@ -254,7 +279,7 @@ func (tr *TestRunner) Run(
254279
}
255280
rs := &resumeStateStruct{
256281
Version: resumeStateStructVersion,
257-
Targets: tr.targets,
282+
Targets: targetStates,
258283
StepResumeState: stepResumeStates,
259284
}
260285
resumeState, runErr = json.Marshal(rs)
@@ -268,7 +293,7 @@ func (tr *TestRunner) Run(
268293
}
269294

270295
targetsResults := make(map[string]error)
271-
for id, state := range tr.targets {
296+
for id, state := range targetStates {
272297
if state.Res != nil {
273298
targetsResults[id] = state.Res.Unwrap()
274299
} else if state.CurStep == len(tr.steps)-1 && state.CurPhase == targetStepPhaseEnd {
@@ -310,15 +335,11 @@ func (tr *TestRunner) waitSteps(ctx xcontext.Context) ([]json.RawMessage, error)
310335
return resumeStates, resultErr
311336
}
312337

313-
// targetHandler takes a single target through each step of the pipeline in sequence.
338+
// handleTarget takes a single target through each step of the pipeline in sequence.
314339
// It injects the target, waits for the result, then moves on to the next step.
315-
func (tr *TestRunner) targetHandler(ctx xcontext.Context, tgs *targetState) {
340+
func (tr *TestRunner) handleTarget(ctx xcontext.Context, tgs *targetState) error {
316341
lastDecremented := tgs.CurStep - 1
317342
defer func() {
318-
tr.mu.Lock()
319-
tgs.handlerRunning = false
320-
tr.mu.Unlock()
321-
tr.monitorCond.Signal()
322343
ctx.Debugf("%s: target handler finished", tgs)
323344

324345
for i := lastDecremented + 1; i < len(tr.steps); i++ {
@@ -356,8 +377,10 @@ loop:
356377
tr.mu.Unlock()
357378
break loop
358379
default:
359-
ctx.Errorf("%s: invalid phase %s", tgs, tgs.CurPhase)
360-
break loop
380+
tr.mu.Unlock()
381+
err := fmt.Errorf("%s: invalid phase %s", tgs, tgs.CurPhase)
382+
ctx.Errorf("%v", err)
383+
return err
361384
}
362385
tr.mu.Unlock()
363386
// Make sure we have a step runner active. If not, start one.
@@ -377,7 +400,6 @@ loop:
377400
tgs.CurPhase = targetStepPhaseRun
378401
}
379402
tr.mu.Unlock()
380-
tr.monitorCond.Signal()
381403
}
382404

383405
tr.steps[i].DecreaseLeftTargets()
@@ -395,7 +417,6 @@ loop:
395417
}
396418
tgs.CurPhase = targetStepPhaseEnd
397419
tr.mu.Unlock()
398-
tr.monitorCond.Signal()
399420
err = nil
400421
case <-ss.NotifyStopped():
401422
err = ss.GetError()
@@ -412,12 +433,8 @@ loop:
412433
ctx.Debugf("%s: paused 1", tgs)
413434
case xcontext.ErrCanceled:
414435
ctx.Debugf("%s: canceled 1", tgs)
415-
default:
416-
// TODO: this is a logical error. The step might not have failed.
417-
// targetHandler should return an error instead that should be tracked in runMonitor
418-
ss.SetError(ctx, err)
419436
}
420-
break
437+
return err
421438
}
422439

423440
tr.mu.Lock()
@@ -432,71 +449,13 @@ loop:
432449
}
433450
tr.mu.Unlock()
434451
}
435-
}
436-
437-
// checkStepRunnersFailed checks if any step runner has encountered an error.
438-
func (tr *TestRunner) checkStepRunnersFailed() error {
439-
for _, ss := range tr.steps {
440-
runErr := ss.GetError()
441-
switch runErr {
442-
case nil, xcontext.ErrPaused:
443-
default:
444-
return runErr
445-
}
446-
}
447452
return nil
448453
}
449454

450-
// runMonitor monitors progress of targets through the pipeline
451-
// and closes input channels of the steps to indicate that no more are expected.
452-
// It also monitors steps for critical errors and cancels the whole run.
453-
// Note: input channels remain open when cancellation is requested,
454-
// plugins are expected to handle it explicitly.
455-
func (tr *TestRunner) runMonitor(ctx xcontext.Context) error {
456-
ctx.Debugf("monitor: active")
457-
458-
//
459-
// Monitor steps for incoming errors
460-
//
461-
tr.mu.Lock()
462-
defer tr.mu.Unlock()
463-
464-
var (
465-
pass int
466-
runErr error
467-
)
468-
tgtLoop:
469-
for ; runErr == nil; pass++ {
470-
if runErr = tr.checkStepRunnersFailed(); runErr != nil {
471-
break tgtLoop
472-
}
473-
done := true
474-
for _, tgs := range tr.targets {
475-
ctx.Debugf("monitor pass %d: %s", pass, tgs)
476-
if tgs.handlerRunning && (tgs.CurStep < len(tr.steps) || tgs.CurPhase != targetStepPhaseEnd) {
477-
if tgs.CurPhase == targetStepPhaseRun && tr.steps[tgs.CurStep].GetError() == xcontext.ErrPaused {
478-
// It's been paused, this is fine.
479-
continue
480-
}
481-
done = false
482-
break
483-
}
484-
}
485-
if done {
486-
break
487-
}
488-
// Wait for notification: as progress is being made, we get notified.
489-
tr.monitorCond.Wait()
490-
}
491-
ctx.Debugf("monitor: finished, %v", runErr)
492-
return runErr
493-
}
494-
495455
func NewTestRunnerWithTimeouts(shutdownTimeout time.Duration) *TestRunner {
496456
tr := &TestRunner{
497457
shutdownTimeout: shutdownTimeout,
498458
}
499-
tr.monitorCond = sync.NewCond(&tr.mu)
500459
return tr
501460
}
502461

@@ -533,7 +492,6 @@ func (tgs *targetState) String() string {
533492
} else {
534493
resText = "<nil>"
535494
}
536-
finished := !tgs.handlerRunning
537-
return fmt.Sprintf("[%s %d %s %t %s]",
538-
tgs.tgt, tgs.CurStep, tgs.CurPhase, finished, resText)
495+
return fmt.Sprintf("[%s %d %s %s]",
496+
tgs.tgt, tgs.CurStep, tgs.CurPhase, resText)
539497
}

0 commit comments

Comments
 (0)