Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions common/prque/lazyqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type (
// NewLazyQueue creates a new lazy queue
func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue {
q := &LazyQueue{
popQueue: newSstack(nil),
popQueue: newSstack(false, nil),
setIndex: setIndex,
priority: priority,
maxPriority: maxPriority,
Expand All @@ -71,8 +71,8 @@ func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPrior

// Reset clears the contents of the queue
func (q *LazyQueue) Reset() {
q.queue[0] = newSstack(q.setIndex0)
q.queue[1] = newSstack(q.setIndex1)
q.queue[0] = newSstack(q.popQueue.invert, q.setIndex0)
q.queue[1] = newSstack(q.popQueue.invert, q.setIndex1)
}

// Refresh performs queue re-evaluation if necessary
Expand Down Expand Up @@ -124,7 +124,7 @@ func (q *LazyQueue) Pop() (interface{}, int64) {
// highest estimated priority is or -1 if both are empty
func (q *LazyQueue) peekIndex() int {
if q.queue[0].Len() != 0 {
if q.queue[1].Len() != 0 && q.queue[1].blocks[0][0].priority > q.queue[0].blocks[0][0].priority {
if q.queue[1].Len() != 0 && less(q.queue[1].blocks[0][0].priority.(int64), q.queue[0].blocks[0][0].priority.(int64), q.popQueue.invert) {
return 1
}
return 0
Expand All @@ -145,9 +145,9 @@ func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) boo
data := heap.Pop(q.queue[nextIndex]).(*item).value
heap.Push(q.popQueue, &item{data, q.priority(data, now)})
nextIndex = q.peekIndex()
for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) {
for q.popQueue.Len() != 0 && (nextIndex == -1 || !less(q.queue[nextIndex].blocks[0][0].priority.(int64), q.popQueue.blocks[0][0].priority.(int64), q.popQueue.invert)) {
i := heap.Pop(q.popQueue).(*item)
if !callback(i.value, i.priority) {
if !callback(i.value, i.priority.(int64)) {
for q.popQueue.Len() != 0 {
q.Push(heap.Pop(q.popQueue).(*item).value)
}
Expand Down
18 changes: 13 additions & 5 deletions common/prque/prque.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,27 @@ type Prque struct {

// New creates a new priority queue.
func New(setIndex SetIndexCallback) *Prque {
return &Prque{newSstack(setIndex)}
return &Prque{newSstack(false, setIndex)}
}

func NewInverted(setIndex SetIndexCallback) *Prque {
return &Prque{newSstack(true, setIndex)}
}

// Pushes a value with a given priority into the queue, expanding if necessary.
func (p *Prque) Push(data interface{}, priority int64) {
func (p *Prque) Push(data interface{}, priority interface{}) {
heap.Push(p.cont, &item{data, priority})
}

// Peek returns the value with the greates priority but does not pop it off.
func (p *Prque) Peek() (interface{}, int64) {
func (p *Prque) Peek() (interface{}, interface{}) {
item := p.cont.blocks[0][0]
return item.value, item.priority
}

// Pops the value with the greates priority off the stack and returns it.
// Currently no shrinking is done.
func (p *Prque) Pop() (interface{}, int64) {
func (p *Prque) Pop() (interface{}, interface{}) {
item := heap.Pop(p.cont).(*item)
return item.value, item.priority
}
Expand Down Expand Up @@ -74,5 +78,9 @@ func (p *Prque) Size() int {

// Clears the contents of the priority queue.
func (p *Prque) Reset() {
*p = *New(p.cont.setIndex)
if p.cont.invert {
*p = *NewInverted(p.cont.setIndex)
} else {
*p = *New(p.cont.setIndex)
}
}
20 changes: 10 additions & 10 deletions common/prque/prque_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func TestPrque(t *testing.T) {
prevPrio := int64(size + 1)
for !queue.Empty() {
val, prio := queue.Pop()
if prio > prevPrio {
if prio.(int64) > prevPrio {
t.Errorf("invalid priority order: %v after %v.", prio, prevPrio)
}
prevPrio = prio
if val != dict[prio] {
t.Errorf("push/pop mismatch: have %v, want %v.", val, dict[prio])
prevPrio = prio.(int64)
if val != dict[prio.(int64)] {
t.Errorf("push/pop mismatch: have %v, want %v.", val, dict[prio.(int64)])
}
delete(dict, prio)
delete(dict, prio.(int64))
}
}
}
Expand Down Expand Up @@ -77,14 +77,14 @@ func TestReset(t *testing.T) {
prevPrio := int64(size + 1)
for i := 0; i < size/2; i++ {
val, prio := queue.Pop()
if prio > prevPrio {
if prio.(int64) > prevPrio {
t.Errorf("invalid priority order: %v after %v.", prio, prevPrio)
}
prevPrio = prio
if val != dict[prio] {
t.Errorf("push/pop mismatch: have %v, want %v.", val, dict[prio])
prevPrio = prio.(int64)
if val != dict[prio.(int64)] {
t.Errorf("push/pop mismatch: have %v, want %v.", val, dict[prio.(int64)])
}
delete(dict, prio)
delete(dict, prio.(int64))
}
// Reset and ensure it's empty
queue.Reset()
Expand Down
54 changes: 44 additions & 10 deletions common/prque/sstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

package prque

import (
"bytes"
)

// The size of a block of data
const blockSize = 4096

Expand All @@ -19,7 +23,7 @@ const blockSize = 4096
// The difference between the lowest and highest priorities in the queue at any point should be less than 2^63.
type item struct {
value interface{}
priority int64
priority interface{}
}

// SetIndexCallback is called when the element is moved to a new index.
Expand All @@ -35,19 +39,24 @@ type sstack struct {
size int
capacity int
offset int
invert bool // min-heap

blocks [][]*item
active []*item
}

// Creates a new, empty stack.
func newSstack(setIndex SetIndexCallback) *sstack {
result := new(sstack)
result.setIndex = setIndex
result.active = make([]*item, blockSize)
result.blocks = [][]*item{result.active}
result.capacity = blockSize
return result
func newSstack(invert bool, setIndex SetIndexCallback) *sstack {
active := make([]*item, blockSize)
return &sstack{
setIndex: setIndex,
size: 0,
capacity: blockSize,
offset: 0,
invert: invert,
blocks: [][]*item{active},
active: active,
}
}

// Pushes a value onto the stack, expanding it if necessary. Required by
Expand Down Expand Up @@ -94,7 +103,32 @@ func (s *sstack) Len() int {
// Compares the priority of two elements of the stack (higher is first).
// Required by sort.Interface.
func (s *sstack) Less(i, j int) bool {
return (s.blocks[i/blockSize][i%blockSize].priority - s.blocks[j/blockSize][j%blockSize].priority) > 0
iPriority := s.blocks[i/blockSize][i%blockSize].priority
jPriority := s.blocks[j/blockSize][j%blockSize].priority

return less(iPriority, jPriority, s.invert)
}

func less(i, j interface{}, invert bool) bool {
var result bool
switch iPriority := i.(type) {
case int:
// If an type assertion error occurred, check types in Push().
// Same type should be pushed.
jPriority := j.(int)
result = iPriority > jPriority
case int64:
jPriority := j.(int64)
result = iPriority > jPriority
case uint64:
jPriority := j.(uint64)
result = iPriority > jPriority
case []byte:
jPriority := j.([]byte)
result = bytes.Compare(iPriority, jPriority) > 0
}

return result != invert
}

// Swaps two elements in the stack. Required by sort.Interface.
Expand All @@ -110,5 +144,5 @@ func (s *sstack) Swap(i, j int) {

// Resets the stack, effectively clearing its contents.
func (s *sstack) Reset() {
*s = *newSstack(s.setIndex)
*s = *newSstack(s.invert, s.setIndex)
}
6 changes: 3 additions & 3 deletions common/prque/sstack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestSstack(t *testing.T) {
for i := 0; i < size; i++ {
data[i] = &item{rand.Int(), rand.Int63()}
}
stack := newSstack(nil)
stack := newSstack(false, nil)
for rep := 0; rep < 2; rep++ {
// Push all the data into the stack, pop out every second
secs := []*item{}
Expand Down Expand Up @@ -55,7 +55,7 @@ func TestSstackSort(t *testing.T) {
data[i] = &item{rand.Int(), int64(i)}
}
// Push all the data into the stack
stack := newSstack(nil)
stack := newSstack(false, nil)
for _, val := range data {
stack.Push(val)
}
Expand All @@ -76,7 +76,7 @@ func TestSstackReset(t *testing.T) {
for i := 0; i < size; i++ {
data[i] = &item{rand.Int(), rand.Int63()}
}
stack := newSstack(nil)
stack := newSstack(false, nil)
for rep := 0; rep < 2; rep++ {
// Push all the data into the stack, pop out every second
secs := []*item{}
Expand Down
6 changes: 3 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(nil),
triegc: prque.NewInverted(nil),
stateCache: state.NewDatabaseWithConfig(db, &trie.Config{
Cache: cacheConfig.TrieCleanLimit,
Journal: cacheConfig.TrieCleanJournal,
Expand Down Expand Up @@ -1536,7 +1536,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
} else {
// Full but not archive node, do proper garbage collection
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -int64(block.NumberU64()))
bc.triegc.Push(root, block.NumberU64())

if current := block.NumberU64(); current > TriesInMemory {
// If we exceeded our memory allowance, flush matured singleton nodes to disk
Expand Down Expand Up @@ -1572,7 +1572,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
// Garbage collect anything below our required write retention
for !bc.triegc.Empty() {
root, number := bc.triegc.Pop()
if uint64(-number) > chosen {
if number.(uint64) > chosen {
bc.triegc.Push(root, number)
break
}
Expand Down
10 changes: 5 additions & 5 deletions core/rawdb/chain_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,10 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan
// Push the delivery into the queue and process contiguous ranges.
// Since we iterate in reverse, so lower numbers have lower prio, and
// we can use the number directly as prio marker
queue.Push(chanDelivery, int64(chanDelivery.number))
queue.Push(chanDelivery, chanDelivery.number)
for !queue.Empty() {
// If the next available item is gapped, return
if _, priority := queue.Peek(); priority != int64(lastNum-1) {
if _, priority := queue.Peek(); priority != lastNum-1 {
break
}
// For testing
Expand Down Expand Up @@ -293,17 +293,17 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch
// we expect the first number to come in to be [from]. Therefore, setting
// nextNum to from means that the prqueue gap-evaluation will work correctly
nextNum = from
queue = prque.New(nil)
queue = prque.NewInverted(nil)
// for stats reporting
blocks, txs = 0, 0
)
// Otherwise spin up the concurrent iterator and unindexer
for delivery := range hashesCh {
// Push the delivery into the queue and process contiguous ranges.
queue.Push(delivery, -int64(delivery.number))
queue.Push(delivery, delivery.number)
for !queue.Empty() {
// If the next available item is gapped, return
if _, priority := queue.Peek(); -priority != int64(nextNum) {
if _, priority := queue.Peek(); priority.(uint64) != nextNum {
break
}
// For testing
Expand Down
2 changes: 1 addition & 1 deletion core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1284,7 +1284,7 @@ func (pool *TxPool) truncatePending() {
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, int64(list.Len()))
spammers.Push(addr, list.Len())
}
}
// Gradually drop transactions from offenders
Expand Down
Loading