Skip to content

Commit b56a180

Browse files
committed
implementation as packages and register pattern
Signed-off-by: Máximo Cuadros <[email protected]>
1 parent dbcfd67 commit b56a180

File tree

9 files changed

+269
-216
lines changed

9 files changed

+269
-216
lines changed

amqp.go renamed to amqp/amqp.go

Lines changed: 36 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,18 @@ import (
88
"sync/atomic"
99
"time"
1010

11-
"gopkg.in/src-d/go-errors.v1"
11+
"gopkg.in/src-d/go-queue.v0"
1212

1313
"github.com/jpillora/backoff"
1414
"github.com/streadway/amqp"
1515
log15 "gopkg.in/inconshreveable/log15.v2"
16+
"gopkg.in/src-d/go-errors.v1"
1617
)
1718

19+
func init() {
20+
queue.Register("amqp", NewAMQPBroker)
21+
}
22+
1823
var consumerSeq uint64
1924

2025
var (
@@ -52,7 +57,7 @@ type connection interface {
5257
}
5358

5459
// NewAMQPBroker creates a new AMQPBroker.
55-
func NewAMQPBroker(url string) (Broker, error) {
60+
func NewAMQPBroker(url string) (queue.Broker, error) {
5661
conn, err := amqp.Dial(url)
5762
if err != nil {
5863
return nil, ErrConnectionFailed.New(err)
@@ -75,7 +80,6 @@ func NewAMQPBroker(url string) (Broker, error) {
7580
}
7681

7782
func connect(url string) (*amqp.Connection, *amqp.Channel) {
78-
7983
var (
8084
conn *amqp.Connection
8185
ch *amqp.Channel
@@ -184,7 +188,7 @@ func (b *AMQPBroker) newBuriedQueue(mainQueueName string) (q amqp.Queue, rex str
184188
}
185189

186190
// Queue returns the queue with the given name.
187-
func (b *AMQPBroker) Queue(name string) (Queue, error) {
191+
func (b *AMQPBroker) Queue(name string) (queue.Queue, error) {
188192
buriedQueue, rex, err := b.newBuriedQueue(name)
189193
if err != nil {
190194
return nil, err
@@ -199,7 +203,7 @@ func (b *AMQPBroker) Queue(name string) (Queue, error) {
199203
amqp.Table{
200204
"x-dead-letter-exchange": rex,
201205
"x-dead-letter-routing-key": name,
202-
"x-max-priority": uint8(PriorityUrgent),
206+
"x-max-priority": uint8(queue.PriorityUrgent),
203207
},
204208
)
205209

@@ -233,9 +237,9 @@ type AMQPQueue struct {
233237
}
234238

235239
// Publish publishes the given Job to the Queue.
236-
func (q *AMQPQueue) Publish(j *Job) error {
237-
if j == nil || len(j.raw) == 0 {
238-
return ErrEmptyJob.New()
240+
func (q *AMQPQueue) Publish(j *queue.Job) error {
241+
if j == nil || j.Size() == 0 {
242+
return queue.ErrEmptyJob.New()
239243
}
240244

241245
headers := amqp.Table{}
@@ -257,18 +261,18 @@ func (q *AMQPQueue) Publish(j *Job) error {
257261
MessageId: j.ID,
258262
Priority: uint8(j.Priority),
259263
Timestamp: j.Timestamp,
260-
ContentType: string(j.contentType),
261-
Body: j.raw,
264+
ContentType: j.ContentType,
265+
Body: j.Raw,
262266
Headers: headers,
263267
},
264268
)
265269
}
266270

267271
// PublishDelayed publishes the given Job with a given delay. Delayed messages
268272
// wont go into the buried queue if they fail.
269-
func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error {
270-
if j == nil || len(j.raw) == 0 {
271-
return ErrEmptyJob.New()
273+
func (q *AMQPQueue) PublishDelayed(j *queue.Job, delay time.Duration) error {
274+
if j == nil || j.Size() == 0 {
275+
return queue.ErrEmptyJob.New()
272276
}
273277

274278
ttl := delay / time.Millisecond
@@ -283,7 +287,7 @@ func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error {
283287
"x-dead-letter-routing-key": q.queue.Name,
284288
"x-message-ttl": int64(ttl),
285289
"x-expires": int64(ttl) * 2,
286-
"x-max-priority": uint8(PriorityUrgent),
290+
"x-max-priority": uint8(queue.PriorityUrgent),
287291
},
288292
)
289293
if err != nil {
@@ -300,20 +304,20 @@ func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error {
300304
MessageId: j.ID,
301305
Priority: uint8(j.Priority),
302306
Timestamp: j.Timestamp,
303-
ContentType: string(j.contentType),
304-
Body: j.raw,
307+
ContentType: j.ContentType,
308+
Body: j.Raw,
305309
},
306310
)
307311
}
308312

309313
type jobErr struct {
310-
job *Job
314+
job *queue.Job
311315
err error
312316
}
313317

314318
// RepublishBuried will republish in the main queue those jobs that timed out without Ack
315319
// or were Rejected with requeue = False and makes comply return true.
316-
func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error {
320+
func (q *AMQPQueue) RepublishBuried(conditions ...queue.RepublishConditionFunc) error {
317321
if q.buriedQueue == nil {
318322
return fmt.Errorf("buriedQueue is nil, called RepublishBuried on the internal buried queue?")
319323
}
@@ -327,7 +331,7 @@ func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error
327331
defer iter.Close()
328332

329333
retries := 0
330-
var notComplying []*Job
334+
var notComplying []*queue.Job
331335
var errorsPublishing []*jobErr
332336
for {
333337
j, err := iter.(*AMQPJobIter).nextNonBlocking()
@@ -355,7 +359,7 @@ func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error
355359
return err
356360
}
357361

358-
if republishConditions(conditions).comply(j) {
362+
if queue.RepublishConditions(conditions).Comply(j) {
359363
if err = q.Publish(j); err != nil {
360364
errorsPublishing = append(errorsPublishing, &jobErr{j, err})
361365
}
@@ -391,7 +395,7 @@ func (q *AMQPQueue) handleRepublishErrors(list []*jobErr) error {
391395
}
392396

393397
// Transaction executes the given callback inside a transaction.
394-
func (q *AMQPQueue) Transaction(txcb TxCallback) error {
398+
func (q *AMQPQueue) Transaction(txcb queue.TxCallback) error {
395399
ch, err := q.conn.connection().Channel()
396400
if err != nil {
397401
return ErrOpenChannel.New(err)
@@ -425,7 +429,7 @@ func (q *AMQPQueue) Transaction(txcb TxCallback) error {
425429

426430
// Implements Queue. The advertisedWindow value will be the exact
427431
// number of undelivered jobs in transit, not just the minium.
428-
func (q *AMQPQueue) Consume(advertisedWindow int) (JobIter, error) {
432+
func (q *AMQPQueue) Consume(advertisedWindow int) (queue.JobIter, error) {
429433
ch, err := q.conn.connection().Channel()
430434
if err != nil {
431435
return nil, ErrOpenChannel.New(err)
@@ -470,20 +474,20 @@ type AMQPJobIter struct {
470474
}
471475

472476
// Next returns the next job in the iter.
473-
func (i *AMQPJobIter) Next() (*Job, error) {
477+
func (i *AMQPJobIter) Next() (*queue.Job, error) {
474478
d, ok := <-i.c
475479
if !ok {
476-
return nil, ErrAlreadyClosed.New()
480+
return nil, queue.ErrAlreadyClosed.New()
477481
}
478482

479483
return fromDelivery(&d)
480484
}
481485

482-
func (i *AMQPJobIter) nextNonBlocking() (*Job, error) {
486+
func (i *AMQPJobIter) nextNonBlocking() (*queue.Job, error) {
483487
select {
484488
case d, ok := <-i.c:
485489
if !ok {
486-
return nil, ErrAlreadyClosed.New()
490+
return nil, queue.ErrAlreadyClosed.New()
487491
}
488492

489493
return fromDelivery(&d)
@@ -518,19 +522,18 @@ func (a *AMQPAcknowledger) Reject(requeue bool) error {
518522
return a.ack.Reject(a.id, requeue)
519523
}
520524

521-
func fromDelivery(d *amqp.Delivery) (*Job, error) {
522-
j, err := NewJob()
525+
func fromDelivery(d *amqp.Delivery) (*queue.Job, error) {
526+
j, err := queue.NewJob()
523527
if err != nil {
524528
return nil, err
525529
}
526530

527531
j.ID = d.MessageId
528-
j.Priority = Priority(d.Priority)
532+
j.Priority = queue.Priority(d.Priority)
529533
j.Timestamp = d.Timestamp
530-
j.contentType = contentType(d.ContentType)
531-
j.acknowledger = &AMQPAcknowledger{d.Acknowledger, d.DeliveryTag}
532-
j.tag = d.DeliveryTag
533-
j.raw = d.Body
534+
j.ContentType = d.ContentType
535+
j.Acknowledger = &AMQPAcknowledger{d.Acknowledger, d.DeliveryTag}
536+
j.Raw = d.Body
534537

535538
if retries, ok := d.Headers[retriesHeader]; ok {
536539
retries, ok := retries.(int32)

amqp_test.go renamed to amqp/amqp_test.go

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,20 @@ import (
88
"github.com/stretchr/testify/assert"
99
"github.com/stretchr/testify/require"
1010
"github.com/stretchr/testify/suite"
11+
queue "gopkg.in/src-d/go-queue.v0"
12+
"gopkg.in/src-d/go-queue.v0/test"
1113
)
1214

1315
func TestAMQPSuite(t *testing.T) {
1416
suite.Run(t, new(AMQPSuite))
1517
}
1618

1719
type AMQPSuite struct {
18-
QueueSuite
20+
test.QueueSuite
1921
}
2022

23+
const testAMQPURI = "amqp://localhost:5672"
24+
2125
func (s *AMQPSuite) SetupSuite() {
2226
s.BrokerURI = testAMQPURI
2327
}
@@ -30,9 +34,9 @@ func TestNewAMQPBroker_bad_url(t *testing.T) {
3034
assert.Nil(b)
3135
}
3236

33-
func sendJobs(assert *assert.Assertions, n int, p Priority, q Queue) {
37+
func sendJobs(assert *assert.Assertions, n int, p queue.Priority, q queue.Queue) {
3438
for i := 0; i < n; i++ {
35-
j, err := NewJob()
39+
j, err := queue.NewJob()
3640
assert.NoError(err)
3741
j.SetPriority(p)
3842
err = j.Encode(i)
@@ -47,18 +51,20 @@ func TestAMQPPriorities(t *testing.T) {
4751

4852
broker, err := NewAMQPBroker(testAMQPURI)
4953
assert.NoError(err)
50-
assert.NotNil(broker)
54+
if !assert.NotNil(broker) {
55+
return
56+
}
5157

52-
name := newName()
58+
name := test.NewName()
5359
q, err := broker.Queue(name)
5460
assert.NoError(err)
5561
assert.NotNil(q)
5662

5763
// Send 50 low priority jobs
58-
sendJobs(assert, 50, PriorityLow, q)
64+
sendJobs(assert, 50, queue.PriorityLow, q)
5965

6066
// Send 50 high priority jobs
61-
sendJobs(assert, 50, PriorityUrgent, q)
67+
sendJobs(assert, 50, queue.PriorityUrgent, q)
6268

6369
// Receive and collect priorities
6470
iter, err := q.Consume(1)
@@ -81,16 +87,16 @@ func TestAMQPPriorities(t *testing.T) {
8187
}
8288

8389
assert.True(sumFirst > sumLast)
84-
assert.Equal(uint(PriorityUrgent)*50, sumFirst)
85-
assert.Equal(uint(PriorityLow)*50, sumLast)
90+
assert.Equal(uint(queue.PriorityUrgent)*50, sumFirst)
91+
assert.Equal(uint(queue.PriorityLow)*50, sumLast)
8692
}
8793

8894
func TestAMQPHeaders(t *testing.T) {
89-
broker, err := NewBroker(testAMQPURI)
95+
broker, err := queue.NewBroker(testAMQPURI)
9096
require.NoError(t, err)
9197
defer func() { require.NoError(t, broker.Close()) }()
9298

93-
queue, err := broker.Queue(newName())
99+
q, err := broker.Queue(test.NewName())
94100
require.NoError(t, err)
95101

96102
tests := []struct {
@@ -121,17 +127,17 @@ func TestAMQPHeaders(t *testing.T) {
121127
}
122128

123129
for i, test := range tests {
124-
job, err := NewJob()
130+
job, err := queue.NewJob()
125131
require.NoError(t, err)
126132

127133
job.Retries = test.retries
128134
job.ErrorType = test.errorType
129135

130136
require.NoError(t, job.Encode(i))
131-
require.NoError(t, queue.Publish(job))
137+
require.NoError(t, q.Publish(job))
132138
}
133139

134-
jobIter, err := queue.Consume(len(tests))
140+
jobIter, err := q.Consume(len(tests))
135141
require.NoError(t, err)
136142

137143
for _, test := range tests {
@@ -147,15 +153,15 @@ func TestAMQPHeaders(t *testing.T) {
147153
}
148154

149155
func TestAMQPRepublishBuried(t *testing.T) {
150-
broker, err := NewBroker(testAMQPURI)
156+
broker, err := queue.NewBroker(testAMQPURI)
151157
require.NoError(t, err)
152158
defer func() { require.NoError(t, broker.Close()) }()
153159

154-
queueName := newName()
155-
queue, err := broker.Queue(queueName)
160+
queueName := test.NewName()
161+
q, err := broker.Queue(queueName)
156162
require.NoError(t, err)
157163

158-
amqpQueue, ok := queue.(*AMQPQueue)
164+
amqpQueue, ok := q.(*AMQPQueue)
159165
require.True(t, ok)
160166

161167
buried := amqpQueue.buriedQueue
@@ -170,29 +176,29 @@ func TestAMQPRepublishBuried(t *testing.T) {
170176
{name: "message 3", payload: "payload 4"},
171177
}
172178

173-
for _, test := range tests {
174-
job, err := NewJob()
179+
for _, utest := range tests {
180+
job, err := queue.NewJob()
175181
require.NoError(t, err)
176182

177-
job.raw = []byte(test.payload)
183+
job.Raw = []byte(utest.payload)
178184

179185
err = buried.Publish(job)
180186
require.NoError(t, err)
181187
time.Sleep(1 * time.Second)
182188
}
183189

184-
var condition RepublishConditionFunc = func(j *Job) bool {
185-
return string(j.raw) == "republish"
190+
var condition queue.RepublishConditionFunc = func(j *queue.Job) bool {
191+
return string(j.Raw) == "republish"
186192
}
187193

188-
err = queue.RepublishBuried(condition)
194+
err = q.RepublishBuried(condition)
189195
require.NoError(t, err)
190196

191-
jobIter, err := queue.Consume(1)
197+
jobIter, err := q.Consume(1)
192198
require.NoError(t, err)
193199
defer func() { require.NoError(t, jobIter.Close()) }()
194200

195201
job, err := jobIter.Next()
196202
require.NoError(t, err)
197-
require.Equal(t, string(job.raw), "republish")
203+
require.Equal(t, string(job.Raw), "republish")
198204
}

0 commit comments

Comments
 (0)