Skip to content

Commit 304ab28

Browse files
authored
Merge pull request #13 from carlosms/bug-memory-next
Fix memory queue not blocking on Next
2 parents 32a0a06 + c297a37 commit 304ab28

File tree

3 files changed

+58
-28
lines changed

3 files changed

+58
-28
lines changed

memory/memory.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package memory
22

33
import (
4+
"io"
45
"sync"
56
"time"
67

@@ -161,7 +162,7 @@ func (i *JobIter) next() (*queue.Job, error) {
161162
i.Lock()
162163
defer i.Unlock()
163164
if len(i.q.jobs) <= i.q.idx {
164-
return nil, nil
165+
return nil, io.EOF
165166
}
166167

167168
j := i.q.jobs[i.q.idx]

memory/memory_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ type MemorySuite struct {
2020

2121
func (s *MemorySuite) SetupSuite() {
2222
s.BrokerURI = "memory://"
23-
s.AdvWindowNotSupported = true
2423
}
2524

2625
func (s *MemorySuite) TestIntegration() {

test/suite.go

Lines changed: 56 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@ type QueueSuite struct {
2727
suite.Suite
2828
r rand.Rand
2929

30-
TxNotSupported bool
31-
AdvWindowNotSupported bool
32-
BrokerURI string
30+
TxNotSupported bool
31+
BrokerURI string
3332

3433
Broker queue.Broker
3534
}
@@ -63,7 +62,7 @@ func (s *QueueSuite) TestConsume_empty() {
6362
assert.NoError(iter.Close())
6463
}
6564

66-
func (s *QueueSuite) TestJobIter_Next_empty() {
65+
func (s *QueueSuite) TestJobIter_Next_closed() {
6766
assert := assert.New(s.T())
6867

6968
qName := NewName()
@@ -81,6 +80,50 @@ func (s *QueueSuite) TestJobIter_Next_empty() {
8180
<-done
8281
}
8382

83+
func (s *QueueSuite) TestJobIter_Next_empty() {
84+
assert := assert.New(s.T())
85+
86+
qName := NewName()
87+
q, err := s.Broker.Queue(qName)
88+
assert.NoError(err)
89+
assert.NotNil(q)
90+
91+
advertisedWindow := 1
92+
iter, err := q.Consume(advertisedWindow)
93+
assert.NoError(err)
94+
assert.NotNil(iter)
95+
96+
nJobs := 0
97+
98+
done := make(chan struct{})
99+
go func() {
100+
j, err := iter.Next()
101+
assert.NoError(err)
102+
assert.NotNil(j)
103+
104+
nJobs += 1
105+
done <- struct{}{}
106+
}()
107+
108+
time.Sleep(50 * time.Millisecond)
109+
110+
assert.Equal(0, nJobs)
111+
112+
j, err := queue.NewJob()
113+
assert.NoError(err)
114+
115+
err = j.Encode(1)
116+
assert.NoError(err)
117+
118+
err = q.Publish(j)
119+
assert.NoError(err)
120+
121+
<-done
122+
123+
assert.Equal(1, nJobs)
124+
assert.NoError(iter.Close())
125+
}
126+
84127
func (s *QueueSuite) TestJob_Reject_no_requeue() {
85128
assert := assert.New(s.T())
86129

@@ -110,17 +153,10 @@ func (s *QueueSuite) TestJob_Reject_no_requeue() {
110153
err = j.Reject(false)
111154
assert.NoError(err)
112155

113-
if s.AdvWindowNotSupported {
114-
j, err := iter.Next()
115-
assert.Nil(j)
116-
assert.NoError(err)
117-
assert.NoError(iter.Close())
118-
} else {
119-
done := s.checkNextClosed(iter)
120-
time.Sleep(50 * time.Millisecond)
121-
assert.NoError(iter.Close())
122-
<-done
123-
}
156+
done := s.checkNextClosed(iter)
157+
time.Sleep(50 * time.Millisecond)
158+
assert.NoError(iter.Close())
159+
<-done
124160
}
125161

126162
func (s *QueueSuite) TestJob_Reject_requeue() {
@@ -385,17 +421,11 @@ func (s *QueueSuite) TestTransaction_Error() {
385421
advertisedWindow := 1
386422
i, err := q.Consume(advertisedWindow)
387423
assert.NoError(err)
388-
if s.AdvWindowNotSupported {
389-
j, err := i.Next()
390-
assert.Nil(j)
391-
assert.NoError(err)
392-
assert.NoError(i.Close())
393-
} else {
394-
done := s.checkNextClosed(i)
395-
time.Sleep(50 * time.Millisecond)
396-
assert.NoError(i.Close())
397-
<-done
398-
}
424+
425+
done := s.checkNextClosed(i)
426+
time.Sleep(50 * time.Millisecond)
427+
assert.NoError(i.Close())
428+
<-done
399429
}
400430

401431
func (s *QueueSuite) TestTransaction() {

0 commit comments

Comments
 (0)