Skip to content

Commit 231288d

Browse files
committed
Use blocking call with timeout instead of retries
After each retry it waited a fixed amount of time. This caused it to retry at least once per message, that is, a 50 ms extra sleep per job. Now it blocks for a configurable time if no messages are arriving. No retry is needed and jobs are processes as fast as they arrive. Signed-off-by: Javi Fontan <[email protected]>
1 parent 100479d commit 231288d

File tree

1 file changed

+10
-29
lines changed

1 file changed

+10
-29
lines changed

amqp/amqp.go

Lines changed: 10 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ var DefaultConfiguration Configuration
3535
// Configuration AMQP configuration settings, this settings are set using the
3636
// envinroment varabiles.
3737
type Configuration struct {
38-
BuriedQueueSuffix string `envconfig:"BURIED_QUEUE_SUFFIX" default:".buriedQueue"`
39-
BuriedExchangeSuffix string `envconfig:"BURIED_EXCHANGE_SUFFIX" default:".buriedExchange"`
40-
BuriedNonBlockingRetries int `envconfig:"BURIED_BLOCKING_RETRIES" default:"3"`
38+
BuriedQueueSuffix string `envconfig:"BURIED_QUEUE_SUFFIX" default:".buriedQueue"`
39+
BuriedExchangeSuffix string `envconfig:"BURIED_EXCHANGE_SUFFIX" default:".buriedExchange"`
40+
BuriedTimeout int `envconfig:"BURIED_TIMEOUT" default:"500"`
4141

4242
RetriesHeader string `envconfig:"RETRIES_HEADER" default:"x-retries"`
4343
ErrorHeader string `envconfig:"ERROR_HEADER" default:"x-error-type"`
@@ -345,41 +345,22 @@ func (q *Queue) RepublishBuried(conditions ...queue.RepublishConditionFunc) erro
345345

346346
defer iter.Close()
347347

348-
retries := 0
348+
timeout := time.Duration(DefaultConfiguration.BuriedTimeout) * time.Millisecond
349+
349350
var notComplying []*queue.Job
350351
var errorsPublishing []*jobErr
351352
for {
352-
j, err := iter.(*JobIter).nextNonBlocking()
353+
j, err := iter.(*JobIter).nextWithTimeout(timeout)
353354
if err != nil {
354355
return err
355356
}
356357

357358
if j == nil {
358-
log.With(log.Fields{
359-
"retries": retries,
360-
}).Debugf("received empty job")
361-
362-
// check (in non blocking mode) up to DefaultConfiguration.BuriedNonBlockingRetries
363-
// with a small delay between them just in case some job is
364-
// arriving, return if there is nothing after all the retries
365-
// (meaning: BuriedQueue is surely empty or any arriving jobs will
366-
// have to wait to the next call).
367-
if retries > DefaultConfiguration.BuriedNonBlockingRetries {
368-
log.With(log.Fields{
369-
"retries": retries,
370-
"max-retries": DefaultConfiguration.BuriedNonBlockingRetries,
371-
}).Debugf("maximum number of retries reached")
359+
log.Debugf("no more jobs in the buried queue")
372360

373-
break
374-
}
375-
376-
time.Sleep(50 * time.Millisecond)
377-
retries++
378-
continue
361+
break
379362
}
380363

381-
retries = 0
382-
383364
if err = j.Ack(); err != nil {
384365
return err
385366
}
@@ -541,15 +522,15 @@ func (i *JobIter) Next() (*queue.Job, error) {
541522
return fromDelivery(&d)
542523
}
543524

544-
func (i *JobIter) nextNonBlocking() (*queue.Job, error) {
525+
func (i *JobIter) nextWithTimeout(timeout time.Duration) (*queue.Job, error) {
545526
select {
546527
case d, ok := <-i.c:
547528
if !ok {
548529
return nil, queue.ErrAlreadyClosed.New()
549530
}
550531

551532
return fromDelivery(&d)
552-
default:
533+
case <-time.After(timeout):
553534
return nil, nil
554535
}
555536
}

0 commit comments

Comments
 (0)