Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/natspec/natspec_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (self *testFrontend) applyTxs() {
block := self.ethereum.ChainManager().NewBlock(cb)
coinbase := self.stateDb.GetStateObject(cb)
coinbase.SetGasPool(big.NewInt(10000000))
txs := self.ethereum.TxPool().GetTransactions()
txs := self.ethereum.TxPool().GetQueuedTransactions()

for i := 0; i < len(txs); i++ {
for _, tx := range txs {
Expand Down
2 changes: 1 addition & 1 deletion core/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st
state.Sync()

// Remove transactions from the pool
sm.txpool.RemoveSet(block.Transactions())
sm.txpool.RemoveTransactions(block.Transactions())

// This puts transactions in a extra db for rpc
for i, tx := range block.Transactions() {
Expand Down
4 changes: 3 additions & 1 deletion core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func makeChain(bman *BlockProcessor, parent *types.Block, max int, db common.Dat
// Create a new chain manager starting from given block
// Effectively a fork factory
func newChainManager(block *types.Block, eventMux *event.TypeMux, db common.Database) *ChainManager {
bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: GenesisBlock(db), eventMux: eventMux}
genesis := GenesisBlock(db)
bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: eventMux}
bc.txState = state.ManageState(state.New(genesis.Root(), db))
bc.futureBlocks = NewBlockCache(1000)
if block == nil {
bc.Reset()
Expand Down
2 changes: 1 addition & 1 deletion core/chain_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
})

self.setTransState(state.New(block.Root(), self.stateDb))
self.setTxState(state.New(block.Root(), self.stateDb))
self.txState.SetState(state.New(block.Root(), self.stateDb))

queue[i] = ChainEvent{block, logs}
queueEvent.canonicalCount++
Expand Down
2 changes: 1 addition & 1 deletion core/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (err *NonceErr) Error() string {
}

func NonceError(is, exp uint64) *NonceErr {
return &NonceErr{Message: fmt.Sprintf("Transaction w/ invalid nonce (%d / %d)", is, exp), Is: is, Exp: exp}
return &NonceErr{Message: fmt.Sprintf("Transaction w/ invalid nonce. tx=%d state=%d)", is, exp), Is: is, Exp: exp}
}

func IsNonceErr(err error) bool {
Expand Down
142 changes: 117 additions & 25 deletions core/transaction_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"errors"
"fmt"
"math/big"
"sort"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
Expand All @@ -17,7 +19,7 @@ import (

var (
ErrInvalidSender = errors.New("Invalid sender")
ErrImpossibleNonce = errors.New("Impossible nonce")
ErrNonce = errors.New("Nonce too low")
ErrNonExistentAccount = errors.New("Account does not exist")
ErrInsufficientFunds = errors.New("Insufficient funds")
ErrIntrinsicGas = errors.New("Intrinsic gas too low")
Expand Down Expand Up @@ -54,20 +56,43 @@ type TxPool struct {
txs map[common.Hash]*types.Transaction
invalidHashes *set.Set

queue map[common.Address]types.Transactions

subscribers []chan TxMsg

eventMux *event.TypeMux
}

func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn) *TxPool {
return &TxPool{
txPool := &TxPool{
txs: make(map[common.Hash]*types.Transaction),
queue: make(map[common.Address]types.Transactions),
queueChan: make(chan *types.Transaction, txPoolQueueSize),
quit: make(chan bool),
eventMux: eventMux,
invalidHashes: set.New(),
currentState: currentStateFn,
}
return txPool
}

func (pool *TxPool) Start() {
// Queue timer will tick so we can attempt to move items from the queue to the
// main transaction pool.
queueTimer := time.NewTicker(300 * time.Millisecond)
// Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce)
removalTimer := time.NewTicker(1 * time.Second)
done:
for {
select {
case <-queueTimer.C:
pool.checkQueue()
case <-removalTimer.C:
pool.validatePool()
case <-pool.quit:
break done
}
}
}

func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
Expand Down Expand Up @@ -100,16 +125,12 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
}

if pool.currentState().GetNonce(from) > tx.Nonce() {
return ErrImpossibleNonce
return ErrNonce
}

return nil
}

func (self *TxPool) addTx(tx *types.Transaction) {
self.txs[tx.Hash()] = tx
}

func (self *TxPool) add(tx *types.Transaction) error {
hash := tx.Hash()

Expand All @@ -127,7 +148,7 @@ func (self *TxPool) add(tx *types.Transaction) error {
return err
}

self.addTx(tx)
self.queueTx(tx)

var toname string
if to := tx.To(); to != nil {
Expand All @@ -144,9 +165,6 @@ func (self *TxPool) add(tx *types.Transaction) error {
glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
}

// Notify the subscribers
go self.eventMux.Post(TxPreEvent{tx})

return nil
}

Expand Down Expand Up @@ -189,34 +207,108 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) {
return
}

func (self *TxPool) RemoveSet(txs types.Transactions) {
self.mu.Lock()
defer self.mu.Unlock()
for _, tx := range txs {
delete(self.txs, tx.Hash())
func (self *TxPool) GetQueuedTransactions() types.Transactions {
self.mu.RLock()
defer self.mu.RUnlock()

var txs types.Transactions
for _, ts := range self.queue {
txs = append(txs, ts...)
}

return txs
}

func (self *TxPool) InvalidateSet(hashes *set.Set) {
func (self *TxPool) RemoveTransactions(txs types.Transactions) {
self.mu.Lock()
defer self.mu.Unlock()

hashes.Each(func(v interface{}) bool {
delete(self.txs, v.(common.Hash))
return true
})
self.invalidHashes.Merge(hashes)
for _, tx := range txs {
delete(self.txs, tx.Hash())
}
}

func (pool *TxPool) Flush() {
pool.txs = make(map[common.Hash]*types.Transaction)
}

func (pool *TxPool) Start() {
}

func (pool *TxPool) Stop() {
pool.Flush()
close(pool.quit)

glog.V(logger.Info).Infoln("TX Pool stopped")
}

func (self *TxPool) queueTx(tx *types.Transaction) {
from, _ := tx.From()
self.queue[from] = append(self.queue[from], tx)
}

func (pool *TxPool) addTx(tx *types.Transaction) {
if _, ok := pool.txs[tx.Hash()]; !ok {
pool.txs[tx.Hash()] = tx
// Notify the subscribers. This event is posted in a goroutine
// because it's possible that somewhere during the post "Remove transaction"
// gets called which will then wait for the global tx pool lock and deadlock.
go pool.eventMux.Post(TxPreEvent{tx})
}
}

// check queue will attempt to insert
func (pool *TxPool) checkQueue() {
pool.mu.Lock()
defer pool.mu.Unlock()

statedb := pool.currentState()
for address, txs := range pool.queue {
sort.Sort(types.TxByNonce{txs})

var (
nonce = statedb.GetNonce(address)
start int
)
// Clean up the transactions first and determine the start of the nonces
for _, tx := range txs {
if tx.Nonce() >= nonce {
break
}
start++
}
pool.queue[address] = txs[start:]

// expected nonce
enonce := nonce
for _, tx := range pool.queue[address] {
// If the expected nonce does not match up with the next one
// (i.e. a nonce gap), we stop the loop
if enonce != tx.Nonce() {
break
}
enonce++

pool.addTx(tx)
}
//pool.queue[address] = txs[i:]
// delete the entire queue entry if it's empty. There's no need to keep it
if len(pool.queue[address]) == 0 {
delete(pool.queue, address)
}
}
}

func (pool *TxPool) validatePool() {
pool.mu.Lock()
defer pool.mu.Unlock()

statedb := pool.currentState()
for hash, tx := range pool.txs {
from, _ := tx.From()
if nonce := statedb.GetNonce(from); nonce > tx.Nonce() {
if glog.V(logger.Debug) {
glog.Infof("removed tx (%x) from pool due to nonce error. state=%d tx=%d\n", hash[:4], nonce, tx.Nonce())
}

delete(pool.txs, hash)
}
}
}
54 changes: 52 additions & 2 deletions core/transaction_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,57 @@ func TestInvalidTransactions(t *testing.T) {
tx.SignECDSA(key)

err = pool.Add(tx)
if err != ErrImpossibleNonce {
t.Error("expected", ErrImpossibleNonce)
if err != ErrNonce {
t.Error("expected", ErrNonce)
}
}

func TestTransactionQueue(t *testing.T) {
pool, key := setupTxPool()
tx := transaction()
tx.SignECDSA(key)
from, _ := tx.From()
pool.currentState().AddBalance(from, big.NewInt(1))
pool.queueTx(tx)

pool.checkQueue()
if len(pool.txs) != 1 {
t.Error("expected valid txs to be 1 is", len(pool.txs))
}

tx = transaction()
tx.SignECDSA(key)
from, _ = tx.From()
pool.currentState().SetNonce(from, 10)
tx.SetNonce(1)
pool.queueTx(tx)
pool.checkQueue()
if _, ok := pool.txs[tx.Hash()]; ok {
t.Error("expected transaction to be in tx pool")
}

if len(pool.queue[from]) != 0 {
t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
}

pool, key = setupTxPool()
tx1, tx2, tx3 := transaction(), transaction(), transaction()
tx2.SetNonce(10)
tx3.SetNonce(11)
tx1.SignECDSA(key)
tx2.SignECDSA(key)
tx3.SignECDSA(key)
pool.queueTx(tx1)
pool.queueTx(tx2)
pool.queueTx(tx3)
from, _ = tx1.From()
pool.checkQueue()

if len(pool.txs) != 1 {
t.Error("expected tx pool to be 1 =")
}

if len(pool.queue[from]) != 3 {
t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
}
}
Loading