Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ type Pipeliner interface {
// If a certain Redis command is not yet supported, you can use Do to execute it.
Do(ctx context.Context, args ...interface{}) *Cmd

// Process puts the commands to be executed into the pipeline buffer.
// Process queues the cmd for later execution.
Process(ctx context.Context, cmd Cmder) error

// BatchProcess adds multiple commands to be executed into the pipeline buffer.
BatchProcess(ctx context.Context, cmd ...Cmder) error

// Discard discards all commands in the pipeline buffer that have not yet been executed.
Discard()

Expand Down Expand Up @@ -79,7 +82,12 @@ func (c *Pipeline) Do(ctx context.Context, args ...interface{}) *Cmd {

// Process queues the cmd for later execution.
func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error {
c.cmds = append(c.cmds, cmd)
return c.BatchProcess(ctx, cmd)
}

// BatchProcess queues multiple cmds for later execution.
func (c *Pipeline) BatchProcess(ctx context.Context, cmd ...Cmder) error {
c.cmds = append(c.cmds, cmd...)
return nil
}

Expand Down
19 changes: 19 additions & 0 deletions pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,25 @@ var _ = Describe("pipelining", func() {
err := pipe.Do(ctx).Err()
Expect(err).To(Equal(errors.New("redis: please enter the command to be executed")))
})

It("should process", func() {
err := pipe.Process(ctx, redis.NewCmd(ctx, "asking"))
Expect(err).To(BeNil())
Expect(pipe.Cmds()).To(HaveLen(1))
})

It("should batchProcess", func() {
err := pipe.BatchProcess(ctx, redis.NewCmd(ctx, "asking"))
Expect(err).To(BeNil())
Expect(pipe.Cmds()).To(HaveLen(1))

pipe.Discard()
Expect(pipe.Cmds()).To(HaveLen(0))

err = pipe.BatchProcess(ctx, redis.NewCmd(ctx, "asking"), redis.NewCmd(ctx, "set", "key", "value"))
Expect(err).To(BeNil())
Expect(pipe.Cmds()).To(HaveLen(2))
})
}

Describe("Pipeline", func() {
Expand Down
Loading