Skip to content

Commit c13b022

Browse files
committed
Add verbosity to diagnose mis-signaling
Signed-off-by: Dmitrii Okunev <[email protected]>
1 parent 40ff8c6 commit c13b022

File tree

5 files changed

+43
-13
lines changed

5 files changed

+43
-13
lines changed

pkg/jobmanager/jobmanager.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,10 @@ func (jm *JobManager) Run(ctx context.Context, resumeJobs bool) error {
161161
}
162162

163163
apiCtx, apiCancel := context.WithCancel(ctx)
164-
jm.apiCancel = apiCancel
164+
jm.apiCancel = func() {
165+
logging.Debugf(ctx, "cancelling API context")
166+
apiCancel()
167+
}
165168

166169
errCh := make(chan error, 1)
167170
go func() {

pkg/jobmanager/start.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313

1414
"github.com/facebookincubator/go-belt/beltctx"
15+
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
1516
"github.com/facebookincubator/go-belt/tool/experimental/metrics"
1617
"github.com/linuxboot/contest/pkg/api"
1718
"github.com/linuxboot/contest/pkg/job"
@@ -84,9 +85,19 @@ func (jm *JobManager) start(ev *api.Event) *api.EventResponse {
8485
func (jm *JobManager) startJob(ctx context.Context, j *job.Job, resumeState *job.PauseEventPayload) {
8586
jm.jobsMu.Lock()
8687
defer jm.jobsMu.Unlock()
87-
jobCtx, jobCancel := context.WithCancel(ctx)
88+
jobCtx := ctx
89+
jobCtx, jobCancel := context.WithCancel(jobCtx)
8890
jobCtx, jobPause := signaling.WithSignal(jobCtx, signals.Paused)
89-
jm.jobs[j.ID] = &jobInfo{job: j, pause: jobPause, cancel: jobCancel}
91+
jm.jobs[j.ID] = &jobInfo{
92+
job: j,
93+
pause: func() {
94+
logging.Debugf(ctx, "pausing job")
95+
jobPause()
96+
},
97+
cancel: func() {
98+
logging.Debugf(ctx, "cancelling job context")
99+
jobCancel()
100+
}}
90101
go jm.runJob(jobCtx, j, resumeState)
91102
}
92103

@@ -117,11 +128,13 @@ func (jm *JobManager) runJob(ctx context.Context, j *job.Job, resumeState *job.P
117128
logging.Debugf(ctx, "Job %d: runner finished, err %v", j.ID, err)
118129
switch err {
119130
case context.Canceled:
120-
_ = jm.emitEvent(ctx, j.ID, job.EventJobCancelled)
131+
_err := jm.emitEvent(ctx, j.ID, job.EventJobCancelled)
132+
errmon.ObserveErrorCtx(ctx, _err)
121133
return
122134
case signals.Paused:
123135
if err := jm.emitEventPayload(ctx, j.ID, job.EventJobPaused, resumeState); err != nil {
124-
_ = jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, fmt.Errorf("Job %+v failed pausing: %v", j, err))
136+
_err := jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, fmt.Errorf("job %+v failed pausing: %w", j, err))
137+
errmon.ObserveErrorCtx(ctx, _err)
125138
} else {
126139
logging.Infof(ctx, "Successfully paused job %d (run %d, %d targets)", j.ID, resumeState.RunID, len(resumeState.Targets))
127140
logging.Debugf(ctx, "Job %d pause state: %+v", j.ID, resumeState)
@@ -131,17 +144,19 @@ func (jm *JobManager) runJob(ctx context.Context, j *job.Job, resumeState *job.P
131144
select {
132145
case <-signaling.Until(ctx, signals.Paused):
133146
// We were asked to pause but failed to do so.
134-
pauseErr := fmt.Errorf("Job %+v failed pausing: %v", j, err)
147+
pauseErr := fmt.Errorf("job %+v failed pausing: %w", j, err)
135148
logging.Errorf(ctx, "%v", pauseErr)
136-
_ = jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, pauseErr)
149+
_err := jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, pauseErr)
150+
errmon.ObserveErrorCtx(ctx, _err)
137151
return
138152
default:
139153
}
140154
logging.Infof(ctx, "Job %d finished", j.ID)
141155
// at this point it is safe to emit the job status event. Note: this is
142156
// checking `err` from the `jm.jobRunner.Run()` call above.
143157
if err != nil {
144-
_ = jm.emitErrEvent(ctx, j.ID, job.EventJobFailed, fmt.Errorf("Job %d failed after %s: %w", j.ID, duration, err))
158+
_err := jm.emitErrEvent(ctx, j.ID, job.EventJobFailed, fmt.Errorf("job %d failed after %s: %w", j.ID, duration, err))
159+
errmon.ObserveErrorCtx(ctx, _err)
145160
} else {
146161
logging.Infof(ctx, "Job %+v completed after %s", j, duration)
147162
err = jm.emitEvent(ctx, j.ID, job.EventJobCompleted)

pkg/runner/step_state.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,11 @@ func (ss *stepState) Run(ctx context.Context) error {
127127
return nil
128128
}
129129

130-
stepCtx, cancel := context.WithCancel(ctx)
130+
stepCtx, stepCtxCancel := context.WithCancel(ctx)
131+
cancel := func() {
132+
logging.Debugf(stepCtx, "cancelling step context")
133+
stepCtxCancel()
134+
}
131135
stepCtx = beltctx.WithField(stepCtx, "step_index", strconv.Itoa(ss.stepIndex))
132136
stepCtx = beltctx.WithField(stepCtx, "step_label", ss.sb.TestStepLabel)
133137

pkg/runner/test_runner.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,10 @@ func (tr *TestRunner) Run(
105105

106106
// Peel off contexts used for steps and target handlers.
107107
runCtx, runCancel := context.WithCancel(ctx)
108-
defer runCancel()
108+
defer func() {
109+
logging.Debugf(ctx, "run finished, cancelling the context")
110+
runCancel()
111+
}()
109112

110113
var targetStates map[string]*targetState
111114

@@ -238,7 +241,9 @@ func (tr *TestRunner) Run(
238241
if !ok {
239242
return resultErr
240243
}
244+
logging.Tracef(ctx, "got event from targetErrors: %v", runErr)
241245
case runErr = <-stepsErrorsCh:
246+
logging.Tracef(ctx, "got event from stepsErrorsCh: %v", runErr)
242247
}
243248
if runErr != nil && runErr != signals.Paused && resultErr == nil {
244249
resultErr = runErr
@@ -355,7 +360,7 @@ func (tr *TestRunner) waitSteps(ctx context.Context) ([]json.RawMessage, error)
355360
if err != nil {
356361
stepsNeverReturned = append(stepsNeverReturned, ss.GetTestStepLabel())
357362
ss.SetError(ctx, &cerrors.ErrTestStepsNeverReturned{StepNames: []string{ss.GetTestStepLabel()}})
358-
// Stop step context, this will help release the reader.
363+
logging.Debugf(ctx, "stopping step context, this will help release the reader")
359364
ss.ForceStop()
360365
} else if resultErr == nil && result.Err != nil && result.Err != signals.Paused {
361366
resultErr = result.Err
@@ -488,7 +493,7 @@ loop:
488493

489494
case <-ctx.Done():
490495
err = ctx.Err()
491-
logging.Debugf(ctx, "Canceled target context during waiting for target result")
496+
logging.Debugf(ctx, "Canceled target context during waiting for target result: %v", err)
492497
}
493498

494499
case signals.Paused:

tests/integ/jobmanager/common.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/linuxboot/contest/pkg/target"
3636
"github.com/linuxboot/contest/pkg/types"
3737

38+
"github.com/facebookincubator/go-belt/beltctx"
3839
"github.com/facebookincubator/go-belt/tool/logger"
3940
"github.com/linuxboot/contest/plugins/reporters/targetsuccess"
4041
"github.com/linuxboot/contest/plugins/targetlocker/inmemory"
@@ -354,7 +355,9 @@ func (suite *TestJobManagerSuite) initJobManager(instanceTag string) {
354355
require.NoError(suite.T(), err)
355356

356357
suite.jm = jm
357-
suite.jmCtx, suite.jmCancel = context.WithCancel(logging.WithBelt(context.Background(), logger.LevelDebug))
358+
ctx := logging.WithBelt(context.Background(), logger.LevelTrace)
359+
ctx = beltctx.WithField(ctx, "integ-test", suite.T().Name())
360+
suite.jmCtx, suite.jmCancel = context.WithCancel(ctx)
358361
suite.jmCtx, suite.jmPause = signaling.WithSignal(suite.jmCtx, signals.Paused)
359362
}
360363

0 commit comments

Comments
 (0)