@@ -12,6 +12,7 @@ import (
1212 "time"
1313
1414 "github.com/facebookincubator/go-belt/beltctx"
15+ "github.com/facebookincubator/go-belt/tool/experimental/errmon"
1516 "github.com/facebookincubator/go-belt/tool/experimental/metrics"
1617 "github.com/facebookincubator/go-belt/tool/logger"
1718 "github.com/linuxboot/contest/pkg/api"
@@ -84,9 +85,19 @@ func (jm *JobManager) start(ev *api.Event) *api.EventResponse {
8485func (jm * JobManager ) startJob (ctx context.Context , j * job.Job , resumeState * job.PauseEventPayload ) {
8586 jm .jobsMu .Lock ()
8687 defer jm .jobsMu .Unlock ()
87- jobCtx , jobCancel := context .WithCancel (ctx )
88+ jobCtx := ctx
89+ jobCtx , jobCancel := context .WithCancel (jobCtx )
8890 jobCtx , jobPause := signaller .WithSignal (jobCtx , signals .Paused )
89- jm .jobs [j .ID ] = & jobInfo {job : j , pause : jobPause , cancel : jobCancel }
91+ jm .jobs [j .ID ] = & jobInfo {
92+ job : j ,
93+ pause : func () {
94+ logger .FromCtx (ctx ).Debugf ("pausing job" )
95+ jobPause ()
96+ },
97+ cancel : func () {
98+ logger .FromCtx (ctx ).Debugf ("cancelling job context" )
99+ jobCancel ()
100+ }}
90101 go jm .runJob (jobCtx , j , resumeState )
91102}
92103
@@ -117,11 +128,13 @@ func (jm *JobManager) runJob(ctx context.Context, j *job.Job, resumeState *job.P
117128 logger .FromCtx (ctx ).Debugf ("Job %d: runner finished, err %v" , j .ID , err )
118129 switch err {
119130 case context .Canceled :
120- _ = jm .emitEvent (ctx , j .ID , job .EventJobCancelled )
131+ _err := jm .emitEvent (ctx , j .ID , job .EventJobCancelled )
132+ errmon .ObserveErrorCtx (ctx , _err )
121133 return
122134 case signals .Paused :
123135 if err := jm .emitEventPayload (ctx , j .ID , job .EventJobPaused , resumeState ); err != nil {
124- _ = jm .emitErrEvent (ctx , j .ID , job .EventJobPauseFailed , fmt .Errorf ("Job %+v failed pausing: %v" , j , err ))
136+ _err := jm .emitErrEvent (ctx , j .ID , job .EventJobPauseFailed , fmt .Errorf ("job %+v failed pausing: %w" , j , err ))
137+ errmon .ObserveErrorCtx (ctx , _err )
125138 } else {
126139 logger .FromCtx (ctx ).Infof ("Successfully paused job %d (run %d, %d targets)" , j .ID , resumeState .RunID , len (resumeState .Targets ))
127140 logger .FromCtx (ctx ).Debugf ("Job %d pause state: %+v" , j .ID , resumeState )
@@ -131,17 +144,19 @@ func (jm *JobManager) runJob(ctx context.Context, j *job.Job, resumeState *job.P
131144 select {
132145 case <- signaller .Until (ctx , signals .Paused ):
133146 // We were asked to pause but failed to do so.
134- pauseErr := fmt .Errorf ("Job %+v failed pausing: %v " , j , err )
147+ pauseErr := fmt .Errorf ("job %+v failed pausing: %w " , j , err )
135148 logger .FromCtx (ctx ).Errorf ("%v" , pauseErr )
136- _ = jm .emitErrEvent (ctx , j .ID , job .EventJobPauseFailed , pauseErr )
149+ _err := jm .emitErrEvent (ctx , j .ID , job .EventJobPauseFailed , pauseErr )
150+ errmon .ObserveErrorCtx (ctx , _err )
137151 return
138152 default :
139153 }
140154 logger .FromCtx (ctx ).Infof ("Job %d finished" , j .ID )
141155 // at this point it is safe to emit the job status event. Note: this is
142156 // checking `err` from the `jm.jobRunner.Run()` call above.
143157 if err != nil {
144- _ = jm .emitErrEvent (ctx , j .ID , job .EventJobFailed , fmt .Errorf ("Job %d failed after %s: %w" , j .ID , duration , err ))
158+ _err := jm .emitErrEvent (ctx , j .ID , job .EventJobFailed , fmt .Errorf ("job %d failed after %s: %w" , j .ID , duration , err ))
159+ errmon .ObserveErrorCtx (ctx , _err )
145160 } else {
146161 logger .FromCtx (ctx ).Infof ("Job %+v completed after %s" , j , duration )
147162 err = jm .emitEvent (ctx , j .ID , job .EventJobCompleted )
0 commit comments