20 changes: 12 additions & 8 deletions async_handoff_integration_test.go
@@ -53,8 +53,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
Dialer: func(ctx context.Context) (net.Conn, error) {
return &mockNetConn{addr: "original:6379"}, nil
},
PoolSize: int32(5),
PoolTimeout: time.Second,
PoolSize: int32(5),
MaxConcurrentDials: 5,
PoolTimeout: time.Second,
})

// Add the hook to the pool after creation
@@ -153,8 +154,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
return &mockNetConn{addr: "original:6379"}, nil
},

PoolSize: int32(10),
PoolTimeout: time.Second,
PoolSize: int32(10),
MaxConcurrentDials: 10,
PoolTimeout: time.Second,
})
defer testPool.Close()

@@ -225,8 +227,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
return &mockNetConn{addr: "original:6379"}, nil
},

PoolSize: int32(3),
PoolTimeout: time.Second,
PoolSize: int32(3),
MaxConcurrentDials: 3,
PoolTimeout: time.Second,
})
defer testPool.Close()

@@ -288,8 +291,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
return &mockNetConn{addr: "original:6379"}, nil
},

PoolSize: int32(2),
PoolTimeout: time.Second,
PoolSize: int32(2),
MaxConcurrentDials: 2,
PoolTimeout: time.Second,
})
defer testPool.Close()

22 changes: 12 additions & 10 deletions internal/pool/bench_test.go
@@ -31,11 +31,12 @@ func BenchmarkPoolGetPut(b *testing.B) {
for _, bm := range benchmarks {
b.Run(bm.String(), func(b *testing.B) {
connPool := pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: int32(bm.poolSize),
PoolTimeout: time.Second,
DialTimeout: 1 * time.Second,
ConnMaxIdleTime: time.Hour,
Dialer: dummyDialer,
PoolSize: int32(bm.poolSize),
MaxConcurrentDials: bm.poolSize,
PoolTimeout: time.Second,
DialTimeout: 1 * time.Second,
ConnMaxIdleTime: time.Hour,
})

b.ResetTimer()
@@ -75,11 +76,12 @@ func BenchmarkPoolGetRemove(b *testing.B) {
for _, bm := range benchmarks {
b.Run(bm.String(), func(b *testing.B) {
connPool := pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: int32(bm.poolSize),
PoolTimeout: time.Second,
DialTimeout: 1 * time.Second,
ConnMaxIdleTime: time.Hour,
Dialer: dummyDialer,
PoolSize: int32(bm.poolSize),
MaxConcurrentDials: bm.poolSize,
PoolTimeout: time.Second,
DialTimeout: 1 * time.Second,
ConnMaxIdleTime: time.Hour,
})

b.ResetTimer()
36 changes: 20 additions & 16 deletions internal/pool/buffer_size_test.go
@@ -25,9 +25,10 @@ var _ = Describe("Buffer Size Configuration", func() {

It("should use default buffer sizes when not specified", func() {
connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: int32(1),
PoolTimeout: 1000,
Dialer: dummyDialer,
PoolSize: int32(1),
MaxConcurrentDials: 1,
PoolTimeout: 1000,
})

cn, err := connPool.NewConn(ctx)
@@ -47,11 +48,12 @@ var _ = Describe("Buffer Size Configuration", func() {
customWriteSize := 64 * 1024 // 64KB

connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: int32(1),
PoolTimeout: 1000,
ReadBufferSize: customReadSize,
WriteBufferSize: customWriteSize,
Dialer: dummyDialer,
PoolSize: int32(1),
MaxConcurrentDials: 1,
PoolTimeout: 1000,
ReadBufferSize: customReadSize,
WriteBufferSize: customWriteSize,
})

cn, err := connPool.NewConn(ctx)
@@ -68,11 +70,12 @@ var _ = Describe("Buffer Size Configuration", func() {

It("should handle zero buffer sizes by using defaults", func() {
connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: int32(1),
PoolTimeout: 1000,
ReadBufferSize: 0, // Should use default
WriteBufferSize: 0, // Should use default
Dialer: dummyDialer,
PoolSize: int32(1),
MaxConcurrentDials: 1,
PoolTimeout: 1000,
ReadBufferSize: 0, // Should use default
WriteBufferSize: 0, // Should use default
})

cn, err := connPool.NewConn(ctx)
@@ -104,9 +107,10 @@ var _ = Describe("Buffer Size Configuration", func() {
// Test the scenario where someone creates a pool directly (like in tests)
// without setting ReadBufferSize and WriteBufferSize
connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: int32(1),
PoolTimeout: 1000,
Dialer: dummyDialer,
PoolSize: int32(1),
MaxConcurrentDials: 1,
PoolTimeout: 1000,
// ReadBufferSize and WriteBufferSize are not set (will be 0)
})

5 changes: 3 additions & 2 deletions internal/pool/hooks_test.go
@@ -177,8 +177,9 @@ func TestPoolWithHooks(t *testing.T) {
Dialer: func(ctx context.Context) (net.Conn, error) {
return &net.TCPConn{}, nil // Mock connection
},
PoolSize: 1,
DialTimeout: time.Second,
PoolSize: 1,
MaxConcurrentDials: 1,
DialTimeout: time.Second,
}

pool := NewConnPool(opt)
123 changes: 117 additions & 6 deletions internal/pool/pool.go
@@ -91,6 +91,7 @@ type Options struct {

PoolFIFO bool
PoolSize int32
MaxConcurrentDials int
DialTimeout time.Duration
PoolTimeout time.Duration
MinIdleConns int32
@@ -113,13 +114,73 @@ type lastDialErrorWrap struct {
err error
}

type wantConn struct {
mu sync.Mutex // protects ctx, done and sending of the result
ctx context.Context // context for dial, cleared after delivered or canceled
cancelCtx context.CancelFunc
done bool // true after delivered or canceled
result chan wantConnResult // channel to deliver connection or error
}

// getCtxForDial returns context for dial or nil if connection was delivered or canceled.
func (w *wantConn) getCtxForDial() context.Context {
w.mu.Lock()
defer w.mu.Unlock()

return w.ctx
}

func (w *wantConn) tryDeliver(cn *Conn, err error) bool {
w.mu.Lock()
defer w.mu.Unlock()
if w.done {
return false
}

w.done = true
w.ctx = nil

w.result <- wantConnResult{cn: cn, err: err}
close(w.result)

return true
}

func (w *wantConn) cancel(ctx context.Context, p *ConnPool) {
w.mu.Lock()
var cn *Conn
if w.done {
select {
case result := <-w.result:
cn = result.cn
default:
}
} else {
close(w.result)
}

w.done = true
w.ctx = nil
w.mu.Unlock()

if cn != nil {
p.Put(ctx, cn)
}
}

type wantConnResult struct {
cn *Conn
err error
}

type ConnPool struct {
cfg *Options

dialErrorsNum uint32 // atomic
lastDialError atomic.Value

queue chan struct{}
queue chan struct{}
dialsInProgress chan struct{}

connsMu sync.Mutex
conns map[uint64]*Conn
@@ -145,9 +206,10 @@ func NewConnPool(opt *Options) *ConnPool {
p := &ConnPool{
cfg: opt,

queue: make(chan struct{}, opt.PoolSize),
conns: make(map[uint64]*Conn),
idleConns: make([]*Conn, 0, opt.PoolSize),
queue: make(chan struct{}, opt.PoolSize),
conns: make(map[uint64]*Conn),
dialsInProgress: make(chan struct{}, opt.MaxConcurrentDials),
idleConns: make([]*Conn, 0, opt.PoolSize),
}

// Only create MinIdleConns if explicitly requested (> 0)
@@ -473,9 +535,8 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {

atomic.AddUint32(&p.stats.Misses, 1)

newcn, err := p.newConn(ctx, true)
newcn, err := p.asyncNewConn(ctx)
if err != nil {
p.freeTurn()
return nil, err
}

@@ -495,6 +556,56 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
return newcn, nil
}

func (p *ConnPool) asyncNewConn(ctx context.Context) (*Conn, error) {
// First try to acquire permission to create a connection
select {
case p.dialsInProgress <- struct{}{}:
// Got permission, proceed to create connection
case <-ctx.Done():
p.freeTurn()
return nil, ctx.Err()
}

dialCtx, cancel := context.WithTimeout(context.Background(), p.cfg.DialTimeout)

w := &wantConn{
ctx: dialCtx,
cancelCtx: cancel,
result: make(chan wantConnResult, 1),
}
var err error
defer func() {
if err != nil {
w.cancel(ctx, p)
}
}()

go func(w *wantConn) {
defer w.cancelCtx()
defer func() { <-p.dialsInProgress }() // Release connection creation permission

dialCtx := w.getCtxForDial()
cn, cnErr := p.newConn(dialCtx, true)
delivered := w.tryDeliver(cn, cnErr)
if cnErr == nil && delivered {
return
} else if cnErr == nil && !delivered {
p.Put(dialCtx, cn)
} else { // freeTurn after error
p.freeTurn()
}
}(w)

select {
case <-ctx.Done():
err = ctx.Err()
return nil, err
Comment on lines +601 to +602

Member: Shouldn't it call p.freeTurn here as well? I've only taken a brief look over the PR, not a detailed review yet.

Contributor Author: After starting the goroutine, responsibility for freeTurn() is delegated to that new goroutine.

Member: OK, understood, but I'm wondering how this differs from the current approach, since the turn becomes available only when the connection is either created or fails?

Contributor Author: Yes, you're correct that the turn semantics remain the same: a turn only becomes available after a connection creation attempt concludes, either successfully or with a definitive failure.
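
A minimal, self-contained sketch of the turn ownership being discussed, assuming the turn is a slot in a buffered channel (as queue is in this pool); miniPool, dialResult, and getConn here are illustrative, not the PR's actual code, and the give-up path that the PR handles via wantConn.cancel is deliberately simplified.

// Sketch: the caller acquires a turn before dialing; the dial goroutine frees
// the turn if the dial fails; on success the turn stays held until the
// connection is eventually returned (Put/freeTurn).
package main

import (
	"context"
	"fmt"
	"time"
)

type dialResult struct {
	cn  string
	err error
}

type miniPool struct {
	queue chan struct{} // turn semaphore, capacity == pool size
}

func (p *miniPool) freeTurn() { <-p.queue }

func (p *miniPool) getConn(ctx context.Context, dial func(context.Context) (string, error)) (string, error) {
	// Acquire a turn (simplified waitTurn).
	select {
	case p.queue <- struct{}{}:
	case <-ctx.Done():
		return "", ctx.Err()
	}

	result := make(chan dialResult, 1)

	// Dial in the background; on failure the goroutine, not the caller, frees the turn.
	go func() {
		cn, err := dial(ctx)
		if err != nil {
			p.freeTurn()
		}
		result <- dialResult{cn: cn, err: err}
	}()

	select {
	case r := <-result:
		if r.err != nil {
			return "", r.err // turn was already freed by the dial goroutine
		}
		return r.cn, nil // success: turn stays held until the conn is put back
	case <-ctx.Done():
		// Simplification: turn and any late connection are abandoned here;
		// the PR instead hands them back via wantConn.cancel / Put.
		return "", ctx.Err()
	}
}

func main() {
	p := &miniPool{queue: make(chan struct{}, 1)}
	cn, err := p.getConn(context.Background(), func(ctx context.Context) (string, error) {
		time.Sleep(time.Millisecond)
		return "conn-1", nil
	})
	fmt.Println(cn, err)
	if err == nil {
		p.freeTurn() // stand-in for Put()
	}
}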

Member: OK, I think I understand the idea now.
If, for whatever reason, we fail after the async dialer has started, we error out to the command, but the TCP connection that may have been opened is then stored back in the pool (if there is room). Can't we optimize this a bit more? There are two things we don't know, but that could help us here:

  • Are other dialers executing at the moment? If so, why not hand the initialized connection to someone else who is currently waiting for their own dialer to complete?
  • Is there room in the pool? This doesn't need to be exactly correct, but if there is no room in the pool when the dialer starts, we can assume there won't be when it completes, so we can fail early and not wait for the TCP connection to be created.

WDYT?

Contributor Author:

  • Regarding your first point (connection sharing): I had similar thoughts about a connection-sharing / dial-coalescing mechanism to avoid duplicate work, and it's a great idea. One concern I had when starting this PR is fairness and potential starvation: if we put the successful connection into a channel for all waiting dialers to consume, the lack of priority in Go channels means the earliest request isn't necessarily the first to get the connection. To address this fairness concern, we could use a queue instead of a plain channel, so waiters are served first-come-first-served and starvation is avoided. I will implement this approach in the next iteration of the PR.
  • Regarding your second point (early failure): in the current pool design, the number of turns equals PoolSize, and acquiring a turn is a prerequisite for starting a dialer. This means that by the time a dialer starts, we have already secured a place in the pool for the future connection, so a successfully created connection should always have a slot to be stored. I'm concerned I might be missing something here; could you please help me understand whether my reasoning is correct, or whether there's a gap in my understanding of the current implementation?
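
A rough sketch of the FIFO waiter queue mentioned in the first bullet above, assuming waiters are registered in arrival order and a completed dial is delivered to the oldest one; the dialQueue and waiter types are illustrative, not the PR's code.

// Sketch: a completed dial is handed to the longest-waiting requester, so
// channel-selection randomness cannot starve early waiters.
package main

import (
	"fmt"
	"sync"
)

type conn struct{ id int }

type waiter struct {
	result chan *conn // buffered, capacity 1
}

type dialQueue struct {
	mu      sync.Mutex
	waiters []*waiter // FIFO: index 0 is the oldest waiter
}

// wait registers a new waiter at the tail of the queue.
func (q *dialQueue) wait() *waiter {
	w := &waiter{result: make(chan *conn, 1)}
	q.mu.Lock()
	q.waiters = append(q.waiters, w)
	q.mu.Unlock()
	return w
}

// deliver hands a freshly dialed connection to the oldest waiter, if any;
// it reports whether the connection was consumed.
func (q *dialQueue) deliver(cn *conn) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.waiters) == 0 {
		return false // nobody waiting; the caller can put cn back into the pool
	}
	w := q.waiters[0]
	q.waiters = q.waiters[1:]
	w.result <- cn
	return true
}

func main() {
	q := &dialQueue{}
	first := q.wait()
	second := q.wait()

	q.deliver(&conn{id: 1}) // goes to `first`, not to an arbitrary waiter
	q.deliver(&conn{id: 2})

	fmt.Println((<-first.result).id, (<-second.result).id) // 1 2
}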

Member: @cyningsun, to the second point:
In this PR, and in the case implemented here, you are correct: if we have a turn, there should be a slot in the pool to keep the connection. However, if MaxIdleConns is set, the connection won't be kept in the pool:

if p.cfg.MaxIdleConns == 0 || p.idleConnsLen.Load() < p.cfg.MaxIdleConns {
	// unusable conns are expected to become usable at some point (background process is reconnecting them)
	// put them at the opposite end of the queue
	if !cn.IsUsable() {
		if p.cfg.PoolFIFO {
			p.connsMu.Lock()
			p.idleConns = append(p.idleConns, cn)
			p.connsMu.Unlock()
		} else {
			p.connsMu.Lock()
			p.idleConns = append([]*Conn{cn}, p.idleConns...)
			p.connsMu.Unlock()
		}
	} else {
		p.connsMu.Lock()
		p.idleConns = append(p.idleConns, cn)
		p.connsMu.Unlock()
	}
	p.idleConnsLen.Add(1)
} else {
	p.removeConnWithLock(cn)
	shouldCloseConn = true
}

What we can do, so as not to waste this connection, is to keep it for some time without checking MaxIdleConns on Put, and instead clean up the idle connections in checkMinIdle (which should then be triggered on popIdle?).

Overall, if we have reached the point of creating a new connection, it should be the case that there are no usable idle connections at that time.
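
A rough sketch of that suggestion, assuming Put keeps the connection unconditionally and a later maintenance pass (checkMinIdle / popIdle) trims the excess over MaxIdleConns; the pool, put, and trimIdle names here are illustrative, not go-redis's actual implementation.

// Sketch: keep the returned connection on put and trim lazily, so a freshly
// dialed connection is not dropped just because the idle list is momentarily full.
package main

import "sync"

type Conn struct{ closed bool }

type pool struct {
	mu           sync.Mutex
	idleConns    []*Conn
	maxIdleConns int
}

// put keeps the connection unconditionally instead of dropping it when the
// idle list is already at maxIdleConns.
func (p *pool) put(cn *Conn) {
	p.mu.Lock()
	p.idleConns = append(p.idleConns, cn)
	p.mu.Unlock()
}

// trimIdle closes connections above maxIdleConns; it would be called from the
// idle-maintenance path (checkMinIdle / popIdle) rather than from put.
func (p *pool) trimIdle() {
	p.mu.Lock()
	defer p.mu.Unlock()
	for len(p.idleConns) > p.maxIdleConns {
		cn := p.idleConns[len(p.idleConns)-1]
		p.idleConns = p.idleConns[:len(p.idleConns)-1]
		cn.closed = true // stand-in for closing the underlying net.Conn
	}
}

func main() {
	p := &pool{maxIdleConns: 1}
	p.put(&Conn{})
	p.put(&Conn{}) // temporarily over the limit; kept instead of wasted
	p.trimIdle()   // later maintenance brings the idle list back under the cap
}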

Contributor Author: @ndyakov Ah, thank you for the clarification! I will work on integrating this strategy into the implementation.

case result := <-w.result:
err = result.err
return result.cn, err
}
}

func (p *ConnPool) waitTurn(ctx context.Context) error {
select {
case <-ctx.Done():