This repository was archived by the owner on Apr 4, 2023. It is now read-only.
Merged
21 changes: 21 additions & 0 deletions mempool/mempool.go
@@ -6,6 +6,7 @@ package mempool
import (
"container/heap"
"fmt"
"sync"

"github.com/ava-labs/avalanchego/ids"

@@ -85,6 +86,7 @@ func (th *internalTxHeap) Has(id ids.ID) bool {
}

type Mempool struct {
mu sync.RWMutex
maxSize int
maxHeap *internalTxHeap
minHeap *internalTxHeap
@@ -109,6 +111,8 @@ func (th *Mempool) Add(tx *chain.Transaction) bool {
// Optimistically add tx to mempool
difficulty := tx.Difficulty()
oldLen := th.Len()

th.mu.Lock()
heap.Push(th.maxHeap, &txEntry{
id: txID,
difficulty: difficulty,
@@ -121,6 +125,8 @@ func (th *Mempool) Add(tx *chain.Transaction) bool {
tx: tx,
index: oldLen,
})
th.mu.Unlock()

// Remove the lowest paying tx
//
// Note: we do this after adding the new transaction in case it is the new
@@ -136,19 +142,25 @@ func (th *Mempool) Add(tx *chain.Transaction) bool {

// Assumes there are non-zero items in [Mempool]
func (th *Mempool) PeekMax() (*chain.Transaction, uint64) {
th.mu.RLock()
txEntry := th.maxHeap.items[0]
th.mu.RUnlock()
return txEntry.tx, txEntry.difficulty
}

// Assumes there are non-zero items in [Mempool]
func (th *Mempool) PeekMin() (*chain.Transaction, uint64) {
th.mu.RLock()
txEntry := th.minHeap.items[0]
th.mu.RUnlock()
return txEntry.tx, txEntry.difficulty
}

// Assumes there are non-zero items in [Mempool]
func (th *Mempool) PopMax() (*chain.Transaction, uint64) {
th.mu.RLock()
item := th.maxHeap.items[0]
th.mu.RUnlock()
return th.Remove(item.id), item.difficulty
}

@@ -158,6 +170,9 @@ func (th *Mempool) PopMin() *chain.Transaction {
}

func (th *Mempool) Remove(id ids.ID) *chain.Transaction {
th.mu.Lock()
defer th.mu.Unlock()

maxEntry, ok := th.maxHeap.Get(id)
if !ok {
return nil
@@ -175,18 +190,22 @@ func (th *Mempool) Remove(id ids.ID) *chain.Transaction {

// TODO: remember to prune
func (th *Mempool) Prune(validHashes ids.Set) {
th.mu.RLock()
toRemove := []ids.ID{}
for _, txE := range th.maxHeap.items {
if !validHashes.Contains(txE.tx.GetBlockID()) {
toRemove = append(toRemove, txE.id)
}
}
th.mu.RUnlock()
for _, txID := range toRemove {
th.Remove(txID)
}
}

func (th *Mempool) Len() int {
th.mu.RLock()
defer th.mu.RUnlock()
return th.maxHeap.Len()
}

@@ -199,5 +218,7 @@ func (th *Mempool) Get(id ids.ID) (*chain.Transaction, bool) {
}

func (th *Mempool) Has(id ids.ID) bool {
th.mu.RLock()
defer th.mu.RUnlock()
return th.maxHeap.Has(id)
}
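
The sync.RWMutex added above lets concurrent readers (PeekMax, PeekMin, Len, Has) share the lock while Add and Remove serialize writes. Note that PopMax deliberately releases the read lock before calling Remove: Go's sync.RWMutex is not reentrant, and Remove takes the write lock itself. A minimal race-detector sketch of the guarded API follows; NewMempool and newTestTx are assumptions that do not appear in this diff, while Add, Len, and PopMax mirror the methods shown above. Run it with `go test -race`.

package mempool

import (
	"sync"
	"testing"
)

// Hammer Add from several goroutines, then drain from a single consumer,
// to check the new locking under the race detector.
func TestMempoolConcurrency(t *testing.T) {
	mp := NewMempool(1024) // hypothetical constructor; the diff only shows the maxSize field
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(seed int) {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				mp.Add(newTestTx(seed, j)) // hypothetical helper returning *chain.Transaction
			}
		}(i)
	}
	wg.Wait()
	// drain from a single goroutine; PopMax assumes a non-empty mempool
	for mp.Len() > 0 {
		mp.PopMax()
	}
}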
166 changes: 137 additions & 29 deletions tests/integration/integration_test.go
@@ -50,7 +50,7 @@ func init() {
flag.IntVar(
&vms,
"vms",
1,
3,
"number of VMs to create",
)
flag.Uint64Var(
@@ -70,30 +70,28 @@
var (
priv *crypto.PrivateKey

// clients for embedded VMs
// TODO: test against external endpoints
clients []client.Client

// when used with embedded VMs
genesisBytes []byte
instances []instance
toEngines []chan common.Message
)

type instance struct {
nodeID ids.ShortID
vm *vm.VM
toEngine chan common.Message
httpServer *httptest.Server
cli client.Client // clients for embedded VMs
}

var _ = ginkgo.BeforeSuite(func() {
gomega.Ω(vms).Should(gomega.BeNumerically(">", 1))

var err error
priv, err = crypto.NewPrivateKey()
gomega.Ω(err).Should(gomega.BeNil())

// create embedded VMs
clients = make([]client.Client, vms)
instances = make([]instance, vms)
toEngines = make([]chan common.Message, vms)

blk := &chain.StatefulBlock{
Tmstmp: time.Now().Unix(),
@@ -103,17 +101,23 @@ var _ = ginkgo.BeforeSuite(func() {
genesisBytes, err = chain.Marshal(blk)
gomega.Ω(err).Should(gomega.BeNil())

ctx := &snow.Context{
NetworkID: 1,
SubnetID: ids.GenerateTestID(),
ChainID: ids.GenerateTestID(),
NodeID: ids.ShortID{1, 2, 3},
}
networkID := uint32(1)
subnetID := ids.GenerateTestID()
chainID := ids.GenerateTestID()

app := &appSender{}
for i := range instances {
db := manager.NewMemDB(avago_version.CurrentDatabase)
ctx := &snow.Context{
NetworkID: networkID,
SubnetID: subnetID,
ChainID: chainID,
NodeID: ids.GenerateTestShortID(),
}

toEngine := make(chan common.Message, 1)
db := manager.NewMemDB(avago_version.CurrentDatabase)

// TODO: test appsender
v := &vm.VM{}
err := v.Initialize(
ctx,
@@ -123,20 +127,29 @@
nil,
toEngine,
nil,
nil,
app,
)
gomega.Ω(err).Should(gomega.BeNil())

// never trigger periodic batch gossip/block builds
// to make testing more deterministic
v.SetWorkInterval(24 * time.Hour)

var hd map[string]*common.HTTPHandler
hd, err = v.CreateHandlers()
gomega.Ω(err).Should(gomega.BeNil())

httpServer := httptest.NewServer(hd[""].Handler)
instances[i] = instance{vm: v, httpServer: httpServer}
clients[i] = client.New(httpServer.URL, "", requestTimeout)
toEngines[i] = toEngine
instances[i] = instance{
nodeID: ctx.NodeID,
vm: v,
toEngine: toEngine,
httpServer: httpServer,
cli: client.New(httpServer.URL, "", requestTimeout),
}
}

app.instances = instances
color.Blue("created %d VMs", vms)
})

@@ -150,7 +163,8 @@ var _ = ginkgo.AfterSuite(func() {

var _ = ginkgo.Describe("[Ping]", func() {
ginkgo.It("can ping", func() {
for _, cli := range clients {
for _, inst := range instances {
cli := inst.cli
ok, err := cli.Ping()
gomega.Ω(ok).Should(gomega.BeTrue())
gomega.Ω(err).Should(gomega.BeNil())
@@ -160,12 +174,58 @@ var _ = ginkgo.Describe("[Ping]", func() {

var _ = ginkgo.Describe("[ClaimTx]", func() {
ginkgo.It("get currently preferred block ID", func() {
for _, cli := range clients {
for _, inst := range instances {
cli := inst.cli
_, err := cli.Preferred()
gomega.Ω(err).Should(gomega.BeNil())
}
})

ginkgo.It("Gossip ClaimTx to a different node", func() {
pfx := []byte(fmt.Sprintf("%10d", time.Now().UnixNano()))
claimTx := &chain.ClaimTx{
BaseTx: &chain.BaseTx{
Sender: priv.PublicKey().Bytes(),
Prefix: pfx,
},
}

ginkgo.By("mine and issue ClaimTx", func() {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err := client.MineSignIssueTx(ctx, instances[0].cli, claimTx, priv)
cancel()
gomega.Ω(err).Should(gomega.BeNil())
})

ginkgo.By("send gossip from node 0 to 1", func() {
err := instances[0].vm.GossipTxs(false)
gomega.Ω(err).Should(gomega.BeNil())
})

ginkgo.By("receive gossip in the node 1, and signal block build", func() {
instances[1].vm.NotifyBlockReady()
<-instances[1].toEngine
})

ginkgo.By("build block in the node 1", func() {
blk, err := instances[1].vm.BuildBlock()
gomega.Ω(err).To(gomega.BeNil())

gomega.Ω(blk.Verify()).To(gomega.BeNil())
gomega.Ω(blk.Status()).To(gomega.Equal(choices.Processing))

err = instances[1].vm.SetPreference(blk.ID())
gomega.Ω(err).To(gomega.BeNil())

gomega.Ω(blk.Accept()).To(gomega.BeNil())
gomega.Ω(blk.Status()).To(gomega.Equal(choices.Accepted))

lastAccepted, err := instances[1].vm.LastAccepted()
gomega.Ω(err).To(gomega.BeNil())
gomega.Ω(lastAccepted).To(gomega.Equal(blk.ID()))
})
})

ginkgo.It("fail ClaimTx with no block ID", func() {
utx := &chain.ClaimTx{
BaseTx: &chain.BaseTx{
@@ -184,12 +244,12 @@ var _ = ginkgo.Describe("[ClaimTx]", func() {
err = tx.Init()
gomega.Ω(err).Should(gomega.BeNil())

_, err = clients[0].IssueTx(tx.Bytes())
_, err = instances[0].cli.IssueTx(tx.Bytes())
gomega.Ω(err.Error()).Should(gomega.Equal(chain.ErrInvalidBlockID.Error()))
})

ginkgo.It("ClaimTx with valid PoW", func() {
pfx := []byte(fmt.Sprintf("%10d", ginkgo.GinkgoRandomSeed()))
ginkgo.It("Claim/SetTx with valid PoW in a single node", func() {
pfx := []byte(fmt.Sprintf("%10d", time.Now().UnixNano()))
claimTx := &chain.ClaimTx{
BaseTx: &chain.BaseTx{
Sender: priv.PublicKey().Bytes(),
@@ -198,11 +258,11 @@
}

ginkgo.By("mine and accept block with the first ClaimTx", func() {
mineAndExpectBlkAccept(clients[0], instances[0].vm, claimTx, toEngines[0])
mineAndExpectBlkAccept(instances[0].cli, instances[0].vm, claimTx, instances[0].toEngine)
})

ginkgo.By("check prefix after ClaimTx has been accepted", func() {
pf, err := clients[0].PrefixInfo(pfx)
pf, err := instances[0].cli.PrefixInfo(pfx)
gomega.Ω(err).To(gomega.BeNil())
gomega.Ω(pf.Keys).To(gomega.Equal(int64(1)))
gomega.Ω(pf.Owner).To(gomega.Equal(priv.PublicKey().Bytes()))
@@ -222,16 +282,44 @@
time.Sleep(5 * time.Second)

ginkgo.By("mine and accept block with a new SetTx", func() {
mineAndExpectBlkAccept(clients[0], instances[0].vm, setTx, toEngines[0])
mineAndExpectBlkAccept(instances[0].cli, instances[0].vm, setTx, instances[0].toEngine)
})

ginkgo.By("read back from VM with range query", func() {
kvs, err := clients[0].Range(pfx, k)
kvs, err := instances[0].cli.Range(pfx, k)
gomega.Ω(err).To(gomega.BeNil())
gomega.Ω(kvs[0].Key).To(gomega.Equal(append(append(pfx, parser.Delimiter), k...)))
gomega.Ω(kvs[0].Value).To(gomega.Equal(v))
})
})

ginkgo.It("fail Gossip ClaimTx to a stale node when missing previous blocks", func() {
pfx := []byte(fmt.Sprintf("%10d", time.Now().UnixNano()))
claimTx := &chain.ClaimTx{
BaseTx: &chain.BaseTx{
Sender: priv.PublicKey().Bytes(),
Prefix: pfx,
},
}

ginkgo.By("mine and issue ClaimTx", func() {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err := client.MineSignIssueTx(ctx, instances[0].cli, claimTx, priv)
cancel()
gomega.Ω(err).Should(gomega.BeNil())
})

// since the block from the previous test spec has not been replicated yet
ginkgo.By("gossip from node 0 to node 1 should fail on the server side since node 1 doesn't have the block yet", func() {
err := instances[0].vm.GossipTxs(false)
gomega.Ω(err).Should(gomega.BeNil())

// mempool in node 1 should be empty, since gossip/submit failed
gomega.Ω(instances[1].vm.Mempool().Len()).Should(gomega.Equal(0))
})
})

// TODO: fully replicate blocks between nodes
})

func mineAndExpectBlkAccept(
@@ -260,6 +348,9 @@ func mineAndExpectBlkAccept(
_, err = cli.IssueTx(tx.Bytes())
gomega.Ω(err).To(gomega.BeNil())

// manually signal that a block is ready to be built
vm.NotifyBlockReady()
// manually ack the ready signal, as the consensus engine would
<-toEngine

blk, err := vm.BuildBlock()
@@ -279,4 +370,21 @@
gomega.Ω(lastAccepted).To(gomega.Equal(blk.ID()))
}

// TODO: test with multiple VMs
var _ common.AppSender = &appSender{}

type appSender struct {
next int
instances []instance
}

func (app *appSender) SendAppGossip(appGossipBytes []byte) error {
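// Round-robin: attribute the gossip to the instance at the current
// cursor, then deliver the bytes to the next instance in the ring.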
n := len(app.instances)
sender := app.instances[app.next].nodeID
app.next++
app.next %= n
return app.instances[app.next].vm.AppGossip(sender, appGossipBytes)
}

func (app *appSender) SendAppRequest(_ ids.ShortSet, _ uint32, _ []byte) error { return nil }
func (app *appSender) SendAppResponse(_ ids.ShortID, _ uint32, _ []byte) error { return nil }
func (app *appSender) SendAppGossipSpecific(_ ids.ShortSet, _ []byte) error { return nil }
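
The round-robin appSender above is what lets GossipTxs on one embedded VM land in another VM's AppGossip handler without any real networking. For orientation, here is a condensed sketch (not part of the PR) of the round-trip the new gossip spec drives, using only methods that appear in this diff:

// gossipRoundTrip pushes a's mempool to the next instance in the ring (b,
// when only two instances are wired), then builds and accepts a block on b.
func gossipRoundTrip(a, b instance) error {
	if err := a.vm.GossipTxs(false); err != nil {
		return err
	}
	b.vm.NotifyBlockReady() // signal readiness instead of waiting on timers
	<-b.toEngine            // ack the pending-txs message as the engine would
	blk, err := b.vm.BuildBlock()
	if err != nil {
		return err
	}
	if err := blk.Verify(); err != nil {
		return err
	}
	return blk.Accept()
}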