Skip to content

Commit 2f8809d

Browse files
committed
Merge pull request #769 from obscuren/develop
core: transaction queue
2 parents 2fe54ab + 7f14fbd commit 2f8809d

File tree

12 files changed

+261
-66
lines changed

12 files changed

+261
-66
lines changed

common/natspec/natspec_e2e_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ func (self *testFrontend) applyTxs() {
220220
block := self.ethereum.ChainManager().NewBlock(cb)
221221
coinbase := self.stateDb.GetStateObject(cb)
222222
coinbase.SetGasPool(big.NewInt(10000000))
223-
txs := self.ethereum.TxPool().GetTransactions()
223+
txs := self.ethereum.TxPool().GetQueuedTransactions()
224224

225225
for i := 0; i < len(txs); i++ {
226226
for _, tx := range txs {

core/block_processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st
258258
state.Sync()
259259

260260
// Remove transactions from the pool
261-
sm.txpool.RemoveSet(block.Transactions())
261+
sm.txpool.RemoveTransactions(block.Transactions())
262262

263263
// This puts transactions in a extra db for rpc
264264
for i, tx := range block.Transactions() {

core/chain_makers.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ func makeChain(bman *BlockProcessor, parent *types.Block, max int, db common.Dat
108108
// Create a new chain manager starting from given block
109109
// Effectively a fork factory
110110
func newChainManager(block *types.Block, eventMux *event.TypeMux, db common.Database) *ChainManager {
111-
bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: GenesisBlock(db), eventMux: eventMux}
111+
genesis := GenesisBlock(db)
112+
bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: eventMux}
113+
bc.txState = state.ManageState(state.New(genesis.Root(), db))
112114
bc.futureBlocks = NewBlockCache(1000)
113115
if block == nil {
114116
bc.Reset()

core/chain_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
576576
})
577577

578578
self.setTransState(state.New(block.Root(), self.stateDb))
579-
self.setTxState(state.New(block.Root(), self.stateDb))
579+
self.txState.SetState(state.New(block.Root(), self.stateDb))
580580

581581
queue[i] = ChainEvent{block, logs}
582582
queueEvent.canonicalCount++

core/error.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (err *NonceErr) Error() string {
8181
}
8282

8383
func NonceError(is, exp uint64) *NonceErr {
84-
return &NonceErr{Message: fmt.Sprintf("Transaction w/ invalid nonce (%d / %d)", is, exp), Is: is, Exp: exp}
84+
return &NonceErr{Message: fmt.Sprintf("Transaction w/ invalid nonce. tx=%d state=%d)", is, exp), Is: is, Exp: exp}
8585
}
8686

8787
func IsNonceErr(err error) bool {

core/transaction_pool.go

Lines changed: 117 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"errors"
55
"fmt"
66
"math/big"
7+
"sort"
78
"sync"
9+
"time"
810

911
"github.com/ethereum/go-ethereum/common"
1012
"github.com/ethereum/go-ethereum/core/state"
@@ -17,7 +19,7 @@ import (
1719

1820
var (
1921
ErrInvalidSender = errors.New("Invalid sender")
20-
ErrImpossibleNonce = errors.New("Impossible nonce")
22+
ErrNonce = errors.New("Nonce too low")
2123
ErrNonExistentAccount = errors.New("Account does not exist")
2224
ErrInsufficientFunds = errors.New("Insufficient funds")
2325
ErrIntrinsicGas = errors.New("Intrinsic gas too low")
@@ -54,20 +56,43 @@ type TxPool struct {
5456
txs map[common.Hash]*types.Transaction
5557
invalidHashes *set.Set
5658

59+
queue map[common.Address]types.Transactions
60+
5761
subscribers []chan TxMsg
5862

5963
eventMux *event.TypeMux
6064
}
6165

6266
func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn) *TxPool {
63-
return &TxPool{
67+
txPool := &TxPool{
6468
txs: make(map[common.Hash]*types.Transaction),
69+
queue: make(map[common.Address]types.Transactions),
6570
queueChan: make(chan *types.Transaction, txPoolQueueSize),
6671
quit: make(chan bool),
6772
eventMux: eventMux,
6873
invalidHashes: set.New(),
6974
currentState: currentStateFn,
7075
}
76+
return txPool
77+
}
78+
79+
func (pool *TxPool) Start() {
80+
// Queue timer will tick so we can attempt to move items from the queue to the
81+
// main transaction pool.
82+
queueTimer := time.NewTicker(300 * time.Millisecond)
83+
// Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce)
84+
removalTimer := time.NewTicker(1 * time.Second)
85+
done:
86+
for {
87+
select {
88+
case <-queueTimer.C:
89+
pool.checkQueue()
90+
case <-removalTimer.C:
91+
pool.validatePool()
92+
case <-pool.quit:
93+
break done
94+
}
95+
}
7196
}
7297

7398
func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
@@ -100,16 +125,12 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
100125
}
101126

102127
if pool.currentState().GetNonce(from) > tx.Nonce() {
103-
return ErrImpossibleNonce
128+
return ErrNonce
104129
}
105130

106131
return nil
107132
}
108133

109-
func (self *TxPool) addTx(tx *types.Transaction) {
110-
self.txs[tx.Hash()] = tx
111-
}
112-
113134
func (self *TxPool) add(tx *types.Transaction) error {
114135
hash := tx.Hash()
115136

@@ -127,7 +148,7 @@ func (self *TxPool) add(tx *types.Transaction) error {
127148
return err
128149
}
129150

130-
self.addTx(tx)
151+
self.queueTx(tx)
131152

132153
var toname string
133154
if to := tx.To(); to != nil {
@@ -144,9 +165,6 @@ func (self *TxPool) add(tx *types.Transaction) error {
144165
glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
145166
}
146167

147-
// Notify the subscribers
148-
go self.eventMux.Post(TxPreEvent{tx})
149-
150168
return nil
151169
}
152170

@@ -189,34 +207,108 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) {
189207
return
190208
}
191209

192-
func (self *TxPool) RemoveSet(txs types.Transactions) {
193-
self.mu.Lock()
194-
defer self.mu.Unlock()
195-
for _, tx := range txs {
196-
delete(self.txs, tx.Hash())
210+
func (self *TxPool) GetQueuedTransactions() types.Transactions {
211+
self.mu.RLock()
212+
defer self.mu.RUnlock()
213+
214+
var txs types.Transactions
215+
for _, ts := range self.queue {
216+
txs = append(txs, ts...)
197217
}
218+
219+
return txs
198220
}
199221

200-
func (self *TxPool) InvalidateSet(hashes *set.Set) {
222+
func (self *TxPool) RemoveTransactions(txs types.Transactions) {
201223
self.mu.Lock()
202224
defer self.mu.Unlock()
203225

204-
hashes.Each(func(v interface{}) bool {
205-
delete(self.txs, v.(common.Hash))
206-
return true
207-
})
208-
self.invalidHashes.Merge(hashes)
226+
for _, tx := range txs {
227+
delete(self.txs, tx.Hash())
228+
}
209229
}
210230

211231
func (pool *TxPool) Flush() {
212232
pool.txs = make(map[common.Hash]*types.Transaction)
213233
}
214234

215-
func (pool *TxPool) Start() {
216-
}
217-
218235
func (pool *TxPool) Stop() {
219236
pool.Flush()
237+
close(pool.quit)
220238

221239
glog.V(logger.Info).Infoln("TX Pool stopped")
222240
}
241+
242+
func (self *TxPool) queueTx(tx *types.Transaction) {
243+
from, _ := tx.From()
244+
self.queue[from] = append(self.queue[from], tx)
245+
}
246+
247+
func (pool *TxPool) addTx(tx *types.Transaction) {
248+
if _, ok := pool.txs[tx.Hash()]; !ok {
249+
pool.txs[tx.Hash()] = tx
250+
// Notify the subscribers. This event is posted in a goroutine
251+
// because it's possible that somewhere during the post "Remove transaction"
252+
// gets called which will then wait for the global tx pool lock and deadlock.
253+
go pool.eventMux.Post(TxPreEvent{tx})
254+
}
255+
}
256+
257+
// check queue will attempt to insert
258+
func (pool *TxPool) checkQueue() {
259+
pool.mu.Lock()
260+
defer pool.mu.Unlock()
261+
262+
statedb := pool.currentState()
263+
for address, txs := range pool.queue {
264+
sort.Sort(types.TxByNonce{txs})
265+
266+
var (
267+
nonce = statedb.GetNonce(address)
268+
start int
269+
)
270+
// Clean up the transactions first and determine the start of the nonces
271+
for _, tx := range txs {
272+
if tx.Nonce() >= nonce {
273+
break
274+
}
275+
start++
276+
}
277+
pool.queue[address] = txs[start:]
278+
279+
// expected nonce
280+
enonce := nonce
281+
for _, tx := range pool.queue[address] {
282+
// If the expected nonce does not match up with the next one
283+
// (i.e. a nonce gap), we stop the loop
284+
if enonce != tx.Nonce() {
285+
break
286+
}
287+
enonce++
288+
289+
pool.addTx(tx)
290+
}
291+
//pool.queue[address] = txs[i:]
292+
// delete the entire queue entry if it's empty. There's no need to keep it
293+
if len(pool.queue[address]) == 0 {
294+
delete(pool.queue, address)
295+
}
296+
}
297+
}
298+
299+
func (pool *TxPool) validatePool() {
300+
pool.mu.Lock()
301+
defer pool.mu.Unlock()
302+
303+
statedb := pool.currentState()
304+
for hash, tx := range pool.txs {
305+
from, _ := tx.From()
306+
if nonce := statedb.GetNonce(from); nonce > tx.Nonce() {
307+
if glog.V(logger.Debug) {
308+
glog.Infof("removed tx (%x) from pool due to nonce error. state=%d tx=%d\n", hash[:4], nonce, tx.Nonce())
309+
}
310+
311+
delete(pool.txs, hash)
312+
}
313+
}
314+
}

core/transaction_pool_test.go

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,57 @@ func TestInvalidTransactions(t *testing.T) {
5656
tx.SignECDSA(key)
5757

5858
err = pool.Add(tx)
59-
if err != ErrImpossibleNonce {
60-
t.Error("expected", ErrImpossibleNonce)
59+
if err != ErrNonce {
60+
t.Error("expected", ErrNonce)
61+
}
62+
}
63+
64+
func TestTransactionQueue(t *testing.T) {
65+
pool, key := setupTxPool()
66+
tx := transaction()
67+
tx.SignECDSA(key)
68+
from, _ := tx.From()
69+
pool.currentState().AddBalance(from, big.NewInt(1))
70+
pool.queueTx(tx)
71+
72+
pool.checkQueue()
73+
if len(pool.txs) != 1 {
74+
t.Error("expected valid txs to be 1 is", len(pool.txs))
75+
}
76+
77+
tx = transaction()
78+
tx.SignECDSA(key)
79+
from, _ = tx.From()
80+
pool.currentState().SetNonce(from, 10)
81+
tx.SetNonce(1)
82+
pool.queueTx(tx)
83+
pool.checkQueue()
84+
if _, ok := pool.txs[tx.Hash()]; ok {
85+
t.Error("expected transaction to be in tx pool")
86+
}
87+
88+
if len(pool.queue[from]) != 0 {
89+
t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
90+
}
91+
92+
pool, key = setupTxPool()
93+
tx1, tx2, tx3 := transaction(), transaction(), transaction()
94+
tx2.SetNonce(10)
95+
tx3.SetNonce(11)
96+
tx1.SignECDSA(key)
97+
tx2.SignECDSA(key)
98+
tx3.SignECDSA(key)
99+
pool.queueTx(tx1)
100+
pool.queueTx(tx2)
101+
pool.queueTx(tx3)
102+
from, _ = tx1.From()
103+
pool.checkQueue()
104+
105+
if len(pool.txs) != 1 {
106+
t.Error("expected tx pool to be 1 =")
107+
}
108+
109+
if len(pool.queue[from]) != 3 {
110+
t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
61111
}
62112
}

0 commit comments

Comments
 (0)