Skip to content

Commit fe65fea

Browse files
authored
Merge pull request #126 from linuxboot/refactor/remove_step_stopping_loop
TestRunner: Remove step stopping loop in RunMonitor
2 parents a3f975d + e9a9bd1 commit fe65fea

2 files changed

Lines changed: 42 additions & 63 deletions

File tree

pkg/runner/step_state.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,12 @@ type stepState struct {
2121
stepIndex int // Index of this step in the pipeline.
2222
sb test.TestStepBundle // The test bundle.
2323

24-
ev testevent.Emitter
25-
tsv *testStepsVariables
26-
stepRunner *StepRunner
27-
addTarget AddTargetToStep
28-
stopped chan struct{}
24+
ev testevent.Emitter
25+
tsv *testStepsVariables
26+
stepRunner *StepRunner
27+
addTarget AddTargetToStep
28+
leftTargetsCounter int // the number of targets that will be assigned to the step, when reaches 0 the stepRunner should be stopped
29+
stopped chan struct{}
2930

3031
resumeState json.RawMessage // Resume state passed to and returned by the Run method.
3132
resumeStateTargets []target.Target // Targets that were being processed during pause.
@@ -36,6 +37,7 @@ type stepState struct {
3637

3738
func newStepState(
3839
stepIndex int,
40+
totalTargets int,
3941
sb test.TestStepBundle,
4042
emitterFactory TestStepEventsEmitterFactory,
4143
tsv *testStepsVariables,
@@ -45,6 +47,7 @@ func newStepState(
4547
) *stepState {
4648
return &stepState{
4749
stepIndex: stepIndex,
50+
leftTargetsCounter: totalTargets,
4851
sb: sb,
4952
ev: emitterFactory.New(sb.TestStepLabel),
5053
tsv: tsv,
@@ -75,8 +78,18 @@ func (ss *stepState) WaitResults(ctx context.Context) (stepResult StepResult, er
7578
return ss.stepRunner.WaitResults(ctx)
7679
}
7780

78-
func (ss *stepState) Stop() {
79-
ss.stepRunner.Stop()
81+
// DecreaseLeftTargets stops step runner when there are no more targets left for it in the scope of a Test
82+
func (ss *stepState) DecreaseLeftTargets() {
83+
ss.mu.Lock()
84+
defer ss.mu.Unlock()
85+
86+
ss.leftTargetsCounter--
87+
if ss.leftTargetsCounter < 0 {
88+
panic(fmt.Sprintf("leftTargetsCounter should be >= 0 but is '%d'", ss.leftTargetsCounter))
89+
}
90+
if ss.leftTargetsCounter == 0 {
91+
ss.stepRunner.Stop()
92+
}
8093
}
8194

8295
func (ss *stepState) ForceStop() {

pkg/runner/test_runner.go

Lines changed: 22 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ func (tr *TestRunner) Run(
126126

127127
// Initialize remaining fields of the target structures,
128128
// build the map and kick off target processing.
129-
minStep := len(tr.steps)
130129
for _, tgt := range targets {
131130
tgs := tr.targets[tgt.ID]
132131
if tgs == nil {
@@ -136,10 +135,6 @@ func (tr *TestRunner) Run(
136135
}
137136
tgs.tgt = tgt
138137
tr.targets[tgt.ID] = tgs
139-
140-
if tgs.CurStep < minStep {
141-
minStep = tgs.CurStep
142-
}
143138
}
144139

145140
stepOutputs, err := newTestStepsVariables(t.TestStepsBundles)
@@ -164,14 +159,18 @@ func (tr *TestRunner) Run(
164159

165160
// Collect "processed" targets in resume state for a StepRunner
166161
var resumeStateTargets []target.Target
162+
var stepTargetsCount int
167163
for _, tgt := range tr.targets {
164+
if tgt.CurStep <= i {
165+
stepTargetsCount++
166+
}
168167
if tgt.CurStep == i && tgt.CurPhase == targetStepPhaseRun {
169168
resumeStateTargets = append(resumeStateTargets, *tgt.tgt)
170169
}
171170
}
172171

173172
// Step handlers will be started from target handlers as targets reach them.
174-
tr.steps = append(tr.steps, newStepState(i, sb, emitterFactory, stepOutputs, srs, resumeStateTargets, func(err error) {
173+
tr.steps = append(tr.steps, newStepState(i, stepTargetsCount, sb, emitterFactory, stepOutputs, srs, resumeStateTargets, func(err error) {
175174
tr.monitorCond.Signal()
176175
}))
177176
}
@@ -185,7 +184,7 @@ func (tr *TestRunner) Run(
185184
}
186185

187186
// Run until no more progress can be made.
188-
runErr := tr.runMonitor(ctx, minStep)
187+
runErr := tr.runMonitor(ctx)
189188
if runErr != nil {
190189
ctx.Errorf("monitor returned error: %q, canceling", runErr)
191190
for _, ss := range tr.steps {
@@ -314,12 +313,17 @@ func (tr *TestRunner) waitSteps(ctx xcontext.Context) ([]json.RawMessage, error)
314313
// targetHandler takes a single target through each step of the pipeline in sequence.
315314
// It injects the target, waits for the result, then moves on to the next step.
316315
func (tr *TestRunner) targetHandler(ctx xcontext.Context, tgs *targetState) {
316+
lastDecremented := tgs.CurStep - 1
317317
defer func() {
318318
tr.mu.Lock()
319319
tgs.handlerRunning = false
320320
tr.mu.Unlock()
321321
tr.monitorCond.Signal()
322322
ctx.Debugf("%s: target handler finished", tgs)
323+
324+
for i := lastDecremented + 1; i < len(tr.steps); i++ {
325+
tr.steps[i].DecreaseLeftTargets()
326+
}
323327
}()
324328

325329
ctx = ctx.WithField("target", tgs.tgt.ID)
@@ -365,6 +369,7 @@ loop:
365369
ctx.Debugf("%s: injecting into %s", tgs, ss)
366370
targetNotifier, err = ss.InjectTarget(ctx, tgs.tgt)
367371
}
372+
368373
if err == nil {
369374
tr.mu.Lock()
370375
// By the time we get here the target could have been processed and result posted already, hence the check.
@@ -374,6 +379,10 @@ loop:
374379
tr.mu.Unlock()
375380
tr.monitorCond.Signal()
376381
}
382+
383+
tr.steps[i].DecreaseLeftTargets()
384+
lastDecremented = i
385+
377386
// Await result. It will be communicated to us by the step runner
378387
// and returned in tgs.res.
379388
if err == nil {
@@ -443,62 +452,19 @@ func (tr *TestRunner) checkStepRunnersFailed() error {
443452
// It also monitors steps for critical errors and cancels the whole run.
444453
// Note: input channels remain open when cancellation is requested,
445454
// plugins are expected to handle it explicitly.
446-
func (tr *TestRunner) runMonitor(ctx xcontext.Context, minStep int) error {
455+
func (tr *TestRunner) runMonitor(ctx xcontext.Context) error {
447456
ctx.Debugf("monitor: active")
448-
if minStep < len(tr.steps) {
449-
ctx.Debugf("monitor: starting at step %s", tr.steps[minStep])
450-
}
451-
452-
// Run the main loop.
453-
runErr := func() error {
454-
tr.mu.Lock()
455-
defer tr.mu.Unlock()
456-
457-
pass := 1
458-
var runErr error
459-
stepLoop:
460-
for step := minStep; step < len(tr.steps); pass++ {
461-
ss := tr.steps[step]
462-
ctx.Debugf("monitor pass %d: current step %s", pass, ss)
463-
// Check if all the targets have either made it past the injection phase or terminated.
464-
ok := true
465-
for _, tgs := range tr.targets {
466-
ctx.Debugf("monitor pass %d: %s: %s", pass, ss, tgs)
467-
if !tgs.handlerRunning { // Not running anymore
468-
continue
469-
}
470-
if tgs.CurStep < step || tgs.CurPhase < targetStepPhaseRun {
471-
ctx.Debugf("monitor pass %d: %s: not all targets injected yet (%s)", pass, ss, tgs)
472-
ok = false
473-
break
474-
}
475-
}
476-
if runErr = tr.checkStepRunnersFailed(); runErr != nil {
477-
break stepLoop
478-
}
479-
if !ok {
480-
// Wait for notification: as progress is being made, we get notified.
481-
tr.monitorCond.Wait()
482-
continue
483-
}
484-
// All targets ok, close the step's input channel.
485-
ctx.Debugf("monitor pass %d: %s: no more targets, closing input channel", pass, ss)
486-
ss.Stop()
487-
step++
488-
}
489-
return runErr
490-
}()
491-
if runErr != nil {
492-
return runErr
493-
}
494457

495458
//
496-
// After all targets were sent to the steps we should monitor steps for incoming errors
459+
// Monitor steps for incoming errors
497460
//
498461
tr.mu.Lock()
499462
defer tr.mu.Unlock()
500463

501-
var pass int
464+
var (
465+
pass int
466+
runErr error
467+
)
502468
tgtLoop:
503469
for ; runErr == nil; pass++ {
504470
if runErr = tr.checkStepRunnersFailed(); runErr != nil {

0 commit comments

Comments
 (0)