Skip to content

Commit f8e09cd

Browse files
committed
Add verbosity to diagnose mis-signaling
Signed-off-by: Dmitrii Okunev <[email protected]>
1 parent 8378704 commit f8e09cd

File tree

5 files changed

+43
-13
lines changed

5 files changed

+43
-13
lines changed

pkg/jobmanager/jobmanager.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,10 @@ func (jm *JobManager) Run(ctx context.Context, resumeJobs bool) error {
161161
}
162162

163163
apiCtx, apiCancel := context.WithCancel(ctx)
164-
jm.apiCancel = apiCancel
164+
jm.apiCancel = func() {
165+
logger.FromCtx(ctx).Debugf("cancelling API context")
166+
apiCancel()
167+
}
165168

166169
errCh := make(chan error, 1)
167170
go func() {

pkg/jobmanager/start.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313

1414
"github.com/facebookincubator/go-belt/beltctx"
15+
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
1516
"github.com/facebookincubator/go-belt/tool/experimental/metrics"
1617
"github.com/facebookincubator/go-belt/tool/logger"
1718
"github.com/linuxboot/contest/pkg/api"
@@ -84,9 +85,19 @@ func (jm *JobManager) start(ev *api.Event) *api.EventResponse {
8485
func (jm *JobManager) startJob(ctx context.Context, j *job.Job, resumeState *job.PauseEventPayload) {
8586
jm.jobsMu.Lock()
8687
defer jm.jobsMu.Unlock()
87-
jobCtx, jobCancel := context.WithCancel(ctx)
88+
jobCtx := ctx
89+
jobCtx, jobCancel := context.WithCancel(jobCtx)
8890
jobCtx, jobPause := signaling.WithSignal(jobCtx, signals.Paused)
89-
jm.jobs[j.ID] = &jobInfo{job: j, pause: jobPause, cancel: jobCancel}
91+
jm.jobs[j.ID] = &jobInfo{
92+
job: j,
93+
pause: func() {
94+
logger.FromCtx(ctx).Debugf("pausing job")
95+
jobPause()
96+
},
97+
cancel: func() {
98+
logger.FromCtx(ctx).Debugf("cancelling job context")
99+
jobCancel()
100+
}}
90101
go jm.runJob(jobCtx, j, resumeState)
91102
}
92103

@@ -117,11 +128,13 @@ func (jm *JobManager) runJob(ctx context.Context, j *job.Job, resumeState *job.P
117128
logger.FromCtx(ctx).Debugf("Job %d: runner finished, err %v", j.ID, err)
118129
switch err {
119130
case context.Canceled:
120-
_ = jm.emitEvent(ctx, j.ID, job.EventJobCancelled)
131+
_err := jm.emitEvent(ctx, j.ID, job.EventJobCancelled)
132+
errmon.ObserveErrorCtx(ctx, _err)
121133
return
122134
case signals.Paused:
123135
if err := jm.emitEventPayload(ctx, j.ID, job.EventJobPaused, resumeState); err != nil {
124-
_ = jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, fmt.Errorf("Job %+v failed pausing: %v", j, err))
136+
_err := jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, fmt.Errorf("job %+v failed pausing: %w", j, err))
137+
errmon.ObserveErrorCtx(ctx, _err)
125138
} else {
126139
logger.FromCtx(ctx).Infof("Successfully paused job %d (run %d, %d targets)", j.ID, resumeState.RunID, len(resumeState.Targets))
127140
logger.FromCtx(ctx).Debugf("Job %d pause state: %+v", j.ID, resumeState)
@@ -131,17 +144,19 @@ func (jm *JobManager) runJob(ctx context.Context, j *job.Job, resumeState *job.P
131144
select {
132145
case <-signaling.Until(ctx, signals.Paused):
133146
// We were asked to pause but failed to do so.
134-
pauseErr := fmt.Errorf("Job %+v failed pausing: %v", j, err)
147+
pauseErr := fmt.Errorf("job %+v failed pausing: %w", j, err)
135148
logger.FromCtx(ctx).Errorf("%v", pauseErr)
136-
_ = jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, pauseErr)
149+
_err := jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, pauseErr)
150+
errmon.ObserveErrorCtx(ctx, _err)
137151
return
138152
default:
139153
}
140154
logger.FromCtx(ctx).Infof("Job %d finished", j.ID)
141155
// at this point it is safe to emit the job status event. Note: this is
142156
// checking `err` from the `jm.jobRunner.Run()` call above.
143157
if err != nil {
144-
_ = jm.emitErrEvent(ctx, j.ID, job.EventJobFailed, fmt.Errorf("Job %d failed after %s: %w", j.ID, duration, err))
158+
_err := jm.emitErrEvent(ctx, j.ID, job.EventJobFailed, fmt.Errorf("job %d failed after %s: %w", j.ID, duration, err))
159+
errmon.ObserveErrorCtx(ctx, _err)
145160
} else {
146161
logger.FromCtx(ctx).Infof("Job %+v completed after %s", j, duration)
147162
err = jm.emitEvent(ctx, j.ID, job.EventJobCompleted)

pkg/runner/step_state.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,11 @@ func (ss *stepState) Run(ctx context.Context) error {
127127
return nil
128128
}
129129

130-
stepCtx, cancel := context.WithCancel(ctx)
130+
stepCtx, stepCtxCancel := context.WithCancel(ctx)
131+
cancel := func() {
132+
logger.FromCtx(stepCtx).Debugf("cancelling step context")
133+
stepCtxCancel()
134+
}
131135
stepCtx = beltctx.WithField(stepCtx, "step_index", strconv.Itoa(ss.stepIndex))
132136
stepCtx = beltctx.WithField(stepCtx, "step_label", ss.sb.TestStepLabel)
133137

pkg/runner/test_runner.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,10 @@ func (tr *TestRunner) Run(
103103

104104
// Peel off contexts used for steps and target handlers.
105105
runCtx, runCancel := context.WithCancel(ctx)
106-
defer runCancel()
106+
defer func() {
107+
logger.FromCtx(ctx).Debugf("run finished, cancelling the context")
108+
runCancel()
109+
}()
107110

108111
var targetStates map[string]*targetState
109112

@@ -231,7 +234,9 @@ func (tr *TestRunner) Run(
231234
if !ok {
232235
return resultErr
233236
}
237+
logger.FromCtx(ctx).Tracef("got event from targetErrors: %v", runErr)
234238
case runErr = <-stepsErrorsCh:
239+
logger.FromCtx(ctx).Tracef("got event from stepsErrorsCh: %v", runErr)
235240
}
236241
if runErr != nil && runErr != signals.Paused && resultErr == nil {
237242
resultErr = runErr
@@ -348,7 +353,7 @@ func (tr *TestRunner) waitSteps(ctx context.Context) ([]json.RawMessage, error)
348353
if err != nil {
349354
stepsNeverReturned = append(stepsNeverReturned, ss.GetTestStepLabel())
350355
ss.SetError(ctx, &cerrors.ErrTestStepsNeverReturned{StepNames: []string{ss.GetTestStepLabel()}})
351-
// Stop step context, this will help release the reader.
356+
logger.FromCtx(ctx).Debugf("stopping step context, this will help release the reader")
352357
ss.ForceStop()
353358
} else if resultErr == nil && result.Err != nil && result.Err != signals.Paused {
354359
resultErr = result.Err
@@ -481,7 +486,7 @@ loop:
481486

482487
case <-ctx.Done():
483488
err = ctx.Err()
484-
logger.FromCtx(ctx).Debugf("Canceled target context during waiting for target result")
489+
logger.FromCtx(ctx).Debugf("Canceled target context during waiting for target result: %v", err)
485490
}
486491

487492
case signals.Paused:

tests/integ/jobmanager/common.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/linuxboot/contest/pkg/target"
3636
"github.com/linuxboot/contest/pkg/types"
3737

38+
"github.com/facebookincubator/go-belt/beltctx"
3839
"github.com/facebookincubator/go-belt/tool/logger"
3940
"github.com/linuxboot/contest/plugins/reporters/targetsuccess"
4041
"github.com/linuxboot/contest/plugins/targetlocker/inmemory"
@@ -355,7 +356,9 @@ func (suite *TestJobManagerSuite) initJobManager(instanceTag string) {
355356
require.NoError(suite.T(), err)
356357

357358
suite.jm = jm
358-
suite.jmCtx, suite.jmCancel = context.WithCancel(logging.WithBelt(context.Background(), logger.LevelDebug))
359+
ctx := logging.WithBelt(context.Background(), logger.LevelTrace)
360+
ctx = beltctx.WithField(ctx, "integ-test", suite.T().Name())
361+
suite.jmCtx, suite.jmCancel = context.WithCancel(ctx)
359362
suite.jmCtx, suite.jmPause = signaling.WithSignal(suite.jmCtx, signals.Paused)
360363
}
361364

0 commit comments

Comments
 (0)