Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions les/vflux/server/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,15 @@ func (n *nodeBalance) estimatePriority(capacity uint64, addBalance int64, future
if bias > 0 {
b = n.reducedBalance(b, now+mclock.AbsTime(future), bias, capacity, 0)
}
// Note: we subtract one from the estimated priority in order to ensure that biased
// estimates are always lower than actual priorities, even if the bias is very small.
pri := n.balanceToPriority(now, b, capacity)
// Ensure that biased estimates are always lower than actual priorities, even if
// the bias is very small.
// This ensures that two nodes will not ping-pong update signals forever if both of
// them have zero estimated priority drop in the projected future.
pri := n.balanceToPriority(now, b, capacity) - 1
current := n.balanceToPriority(now, n.balance, capacity)
if pri >= current {
pri = current - 1
}
if update {
n.addCallback(balanceCallbackUpdate, pri, n.signalPriorityUpdate)
}
Expand Down
6 changes: 3 additions & 3 deletions les/vflux/server/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func TestEstimatedPriority(t *testing.T) {
{time.Second, 3 * time.Second, 1000000000, 48},

// All positive balance is used up
{time.Second * 55, 0, 0, 0},
{time.Second * 55, 0, 0, -1},

// 1 minute estimated time cost, 4/58 * 10^9 estimated request cost per sec.
{0, time.Minute, 0, -int64(time.Minute) - int64(time.Second)*120/29},
Expand All @@ -292,8 +292,8 @@ func TestEstimatedPriority(t *testing.T) {
b.clock.Run(i.runTime)
node.RequestServed(i.reqCost)
priority := node.estimatePriority(1000000000, 0, i.futureTime, 0, false)
if priority != i.priority-1 {
t.Fatalf("Estimated priority mismatch, want %v, got %v", i.priority-1, priority)
if priority != i.priority {
t.Fatalf("Estimated priority mismatch, want %v, got %v", i.priority, priority)
}
}
}
Expand Down
9 changes: 1 addition & 8 deletions les/vflux/server/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,7 @@ func NewClientPool(balanceDb ethdb.KeyValueStore, minCap uint64, connectedBias t
if c, ok := ns.GetField(node, setup.clientField).(clientPeer); ok {
timeout = c.InactiveAllowance()
}
if timeout > 0 {
ns.AddTimeout(node, setup.inactiveFlag, timeout)
} else {
// Note: if capacity is immediately available then priorityPool will set the active
// flag simultaneously with removing the inactive flag and therefore this will not
// initiate disconnection
ns.SetStateSub(node, nodestate.Flags{}, setup.inactiveFlag, 0)
}
ns.AddTimeout(node, setup.inactiveFlag, timeout)
}
if oldState.Equals(setup.inactiveFlag) && newState.Equals(setup.inactiveFlag.Or(setup.priorityFlag)) {
ns.SetStateSub(node, setup.inactiveFlag, nodestate.Flags{}, 0) // priority gained; remove timeout
Expand Down
11 changes: 8 additions & 3 deletions les/vflux/server/clientpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,12 +326,13 @@ func TestPaidClientKickedOut(t *testing.T) {
if cap := connect(pool, newPoolTestPeer(11, kickedCh)); cap == 0 {
t.Fatalf("Free client should be accepted")
}
clock.Run(0)
select {
case id := <-kickedCh:
if id != 0 {
t.Fatalf("Kicked client mismatch, want %v, got %v", 0, id)
}
case <-time.NewTimer(time.Second).C:
default:
t.Fatalf("timeout")
}
}
Expand Down Expand Up @@ -399,23 +400,27 @@ func TestFreeClientKickedOut(t *testing.T) {
if cap := connect(pool, newPoolTestPeer(10, kicked)); cap != 0 {
t.Fatalf("New free client should be rejected")
}
clock.Run(0)
select {
case <-kicked:
case <-time.NewTimer(time.Second).C:
default:
t.Fatalf("timeout")
}
disconnect(pool, newPoolTestPeer(10, kicked))
clock.Run(5 * time.Minute)
for i := 0; i < 10; i++ {
connect(pool, newPoolTestPeer(i+10, kicked))

}
clock.Run(0)

for i := 0; i < 10; i++ {
select {
case id := <-kicked:
if id >= 10 {
t.Fatalf("Old client should be kicked, now got: %d", id)
}
case <-time.NewTimer(time.Second).C:
default:
t.Fatalf("timeout")
}
}
Expand Down