diff --git a/mempool/mempool.go b/mempool/mempool.go
index 8a5f660a..3c28cbb0 100644
--- a/mempool/mempool.go
+++ b/mempool/mempool.go
@@ -6,6 +6,7 @@ package mempool
 import (
 	"container/heap"
 	"fmt"
+	"sync"
 
 	"github.com/ava-labs/avalanchego/ids"
@@ -85,6 +86,7 @@ func (th *internalTxHeap) Has(id ids.ID) bool {
 }
 
 type Mempool struct {
+	mu      sync.RWMutex
 	maxSize int
 	maxHeap *internalTxHeap
 	minHeap *internalTxHeap
@@ -109,6 +111,8 @@ func (th *Mempool) Add(tx *chain.Transaction) bool {
 	// Optimistically add tx to mempool
 	difficulty := tx.Difficulty()
 	oldLen := th.Len()
+
+	th.mu.Lock()
 	heap.Push(th.maxHeap, &txEntry{
 		id:         txID,
 		difficulty: difficulty,
@@ -121,6 +125,8 @@ func (th *Mempool) Add(tx *chain.Transaction) bool {
 		tx:    tx,
 		index: oldLen,
 	})
+	th.mu.Unlock()
+
 	// Remove the lowest paying tx
 	//
 	// Note: we do this after adding the new transaction in case it is the new
@@ -136,19 +142,25 @@ func (th *Mempool) Add(tx *chain.Transaction) bool {
 
 // Assumes there is non-zero items in [Mempool]
 func (th *Mempool) PeekMax() (*chain.Transaction, uint64) {
+	th.mu.RLock()
 	txEntry := th.maxHeap.items[0]
+	th.mu.RUnlock()
 	return txEntry.tx, txEntry.difficulty
 }
 
 // Assumes there is non-zero items in [Mempool]
 func (th *Mempool) PeekMin() (*chain.Transaction, uint64) {
+	th.mu.RLock()
 	txEntry := th.minHeap.items[0]
+	th.mu.RUnlock()
 	return txEntry.tx, txEntry.difficulty
 }
 
 // Assumes there is non-zero items in [Mempool]
 func (th *Mempool) PopMax() (*chain.Transaction, uint64) {
+	th.mu.RLock()
 	item := th.maxHeap.items[0]
+	th.mu.RUnlock()
 	return th.Remove(item.id), item.difficulty
 }
 
@@ -158,6 +170,9 @@ func (th *Mempool) PopMin() *chain.Transaction {
 }
 
 func (th *Mempool) Remove(id ids.ID) *chain.Transaction {
+	th.mu.Lock()
+	defer th.mu.Unlock()
+
 	maxEntry, ok := th.maxHeap.Get(id)
 	if !ok {
 		return nil
@@ -175,18 +190,22 @@ func (th *Mempool) Remove(id ids.ID) *chain.Transaction {
 
 // TODO: remember to prune
 func (th *Mempool) Prune(validHashes ids.Set) {
+	th.mu.RLock()
 	toRemove := []ids.ID{}
 	for _, txE := range th.maxHeap.items {
 		if !validHashes.Contains(txE.tx.GetBlockID()) {
 			toRemove = append(toRemove, txE.id)
 		}
 	}
+	th.mu.RUnlock()
 	for _, txID := range toRemove {
 		th.Remove(txID)
 	}
 }
 
 func (th *Mempool) Len() int {
+	th.mu.RLock()
+	defer th.mu.RUnlock()
 	return th.maxHeap.Len()
 }
 
@@ -199,5 +218,7 @@ func (th *Mempool) Get(id ids.ID) (*chain.Transaction, bool) {
 }
 
 func (th *Mempool) Has(id ids.ID) bool {
+	th.mu.RLock()
+	defer th.mu.RUnlock()
 	return th.maxHeap.Has(id)
 }
diff --git a/tests/integration/integration_test.go b/tests/integration/integration_test.go
index c1dffa25..2ee098f0 100644
--- a/tests/integration/integration_test.go
+++ b/tests/integration/integration_test.go
@@ -50,7 +50,7 @@ func init() {
 	flag.IntVar(
 		&vms,
 		"vms",
-		1,
+		3,
 		"number of VMs to create",
 	)
 	flag.Uint64Var(
@@ -70,30 +70,28 @@ func init() {
 var (
 	priv *crypto.PrivateKey
 
-	// clients for embedded VMs
-	// TODO: test against external endpoints
-	clients []client.Client
-
 	// when used with embedded VMs
 	genesisBytes []byte
 	instances    []instance
-	toEngines    []chan common.Message
 )
 
 type instance struct {
+	nodeID     ids.ShortID
 	vm         *vm.VM
+	toEngine   chan common.Message
 	httpServer *httptest.Server
+	cli        client.Client // clients for embedded VMs
 }
 
 var _ = ginkgo.BeforeSuite(func() {
+	gomega.Ω(vms).Should(gomega.BeNumerically(">", 1))
+
 	var err error
 	priv, err = crypto.NewPrivateKey()
 	gomega.Ω(err).Should(gomega.BeNil())
 
 	// create embedded VMs
-	clients = make([]client.Client, vms)
 	instances = make([]instance, vms)
-	toEngines = make([]chan common.Message, vms)
 
 	blk := &chain.StatefulBlock{
 		Tmstmp: time.Now().Unix(),
@@ -103,17 +101,23 @@ var _ = ginkgo.BeforeSuite(func() {
 	genesisBytes, err = chain.Marshal(blk)
 	gomega.Ω(err).Should(gomega.BeNil())
 
-	ctx := &snow.Context{
-		NetworkID: 1,
-		SubnetID:  ids.GenerateTestID(),
-		ChainID:   ids.GenerateTestID(),
-		NodeID:    ids.ShortID{1, 2, 3},
-	}
+	networkID := uint32(1)
+	subnetID := ids.GenerateTestID()
+	chainID := ids.GenerateTestID()
+
+	app := &appSender{}
 	for i := range instances {
-		db := manager.NewMemDB(avago_version.CurrentDatabase)
+		ctx := &snow.Context{
+			NetworkID: networkID,
+			SubnetID:  subnetID,
+			ChainID:   chainID,
+			NodeID:    ids.GenerateTestShortID(),
+		}
+
+		toEngine := make(chan common.Message, 1)
+		db := manager.NewMemDB(avago_version.CurrentDatabase)
+
+		// TODO: test appsender
 		v := &vm.VM{}
 		err := v.Initialize(
 			ctx,
@@ -123,20 +127,29 @@ var _ = ginkgo.BeforeSuite(func() {
 			nil,
 			toEngine,
 			nil,
-			nil,
+			app,
 		)
 		gomega.Ω(err).Should(gomega.BeNil())
 
+		// never trigger periodic batch gossip/block builds
+		// to make testing more deterministic
+		v.SetWorkInterval(24 * time.Hour)
+
 		var hd map[string]*common.HTTPHandler
 		hd, err = v.CreateHandlers()
 		gomega.Ω(err).Should(gomega.BeNil())
 
 		httpServer := httptest.NewServer(hd[""].Handler)
-		instances[i] = instance{vm: v, httpServer: httpServer}
-		clients[i] = client.New(httpServer.URL, "", requestTimeout)
-		toEngines[i] = toEngine
+		instances[i] = instance{
+			nodeID:     ctx.NodeID,
+			vm:         v,
+			toEngine:   toEngine,
+			httpServer: httpServer,
+			cli:        client.New(httpServer.URL, "", requestTimeout),
+		}
 	}
+	app.instances = instances
 
 	color.Blue("created %d VMs", vms)
 })
@@ -150,7 +163,8 @@ var _ = ginkgo.AfterSuite(func() {
 
 var _ = ginkgo.Describe("[Ping]", func() {
 	ginkgo.It("can ping", func() {
-		for _, cli := range clients {
+		for _, inst := range instances {
+			cli := inst.cli
 			ok, err := cli.Ping()
 			gomega.Ω(ok).Should(gomega.BeTrue())
 			gomega.Ω(err).Should(gomega.BeNil())
@@ -160,12 +174,58 @@ var _ = ginkgo.Describe("[ClaimTx]", func() {
 	ginkgo.It("get currently preferred block ID", func() {
-		for _, cli := range clients {
+		for _, inst := range instances {
+			cli := inst.cli
 			_, err := cli.Preferred()
 			gomega.Ω(err).Should(gomega.BeNil())
 		}
 	})
 
+	ginkgo.It("Gossip ClaimTx to a different node", func() {
+		pfx := []byte(fmt.Sprintf("%10d", time.Now().UnixNano()))
+		claimTx := &chain.ClaimTx{
+			BaseTx: &chain.BaseTx{
+				Sender: priv.PublicKey().Bytes(),
+				Prefix: pfx,
+			},
+		}
+
+		ginkgo.By("mine and issue ClaimTx", func() {
+			ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+			_, err := client.MineSignIssueTx(ctx, instances[0].cli, claimTx, priv)
+			cancel()
+			gomega.Ω(err).Should(gomega.BeNil())
+		})
+
+		ginkgo.By("send gossip from node 0 to 1", func() {
+			err := instances[0].vm.GossipTxs(false)
+			gomega.Ω(err).Should(gomega.BeNil())
+		})
+
+		ginkgo.By("receive gossip in the node 1, and signal block build", func() {
+			instances[1].vm.NotifyBlockReady()
+			<-instances[1].toEngine
+		})
+
+		ginkgo.By("build block in the node 1", func() {
+			blk, err := instances[1].vm.BuildBlock()
+			gomega.Ω(err).To(gomega.BeNil())
+
+			gomega.Ω(blk.Verify()).To(gomega.BeNil())
+			gomega.Ω(blk.Status()).To(gomega.Equal(choices.Processing))
+
+			err = instances[1].vm.SetPreference(blk.ID())
+			gomega.Ω(err).To(gomega.BeNil())
+
+			gomega.Ω(blk.Accept()).To(gomega.BeNil())
+			gomega.Ω(blk.Status()).To(gomega.Equal(choices.Accepted))
+
+			lastAccepted, err := instances[1].vm.LastAccepted()
+			gomega.Ω(err).To(gomega.BeNil())
+			gomega.Ω(lastAccepted).To(gomega.Equal(blk.ID()))
+		})
+	})
+
 	ginkgo.It("fail ClaimTx with no block ID", func() {
 		utx := &chain.ClaimTx{
 			BaseTx: &chain.BaseTx{
@@ -184,12 +244,12 @@ var _ = ginkgo.Describe("[ClaimTx]", func() {
 		err = tx.Init()
 		gomega.Ω(err).Should(gomega.BeNil())
 
-		_, err = clients[0].IssueTx(tx.Bytes())
+		_, err = instances[0].cli.IssueTx(tx.Bytes())
 		gomega.Ω(err.Error()).Should(gomega.Equal(chain.ErrInvalidBlockID.Error()))
 	})
 
-	ginkgo.It("ClaimTx with valid PoW", func() {
-		pfx := []byte(fmt.Sprintf("%10d", ginkgo.GinkgoRandomSeed()))
+	ginkgo.It("Claim/SetTx with valid PoW in a single node", func() {
+		pfx := []byte(fmt.Sprintf("%10d", time.Now().UnixNano()))
 		claimTx := &chain.ClaimTx{
 			BaseTx: &chain.BaseTx{
 				Sender: priv.PublicKey().Bytes(),
@@ -198,11 +258,11 @@ var _ = ginkgo.Describe("[ClaimTx]", func() {
 		}
 
 		ginkgo.By("mine and accept block with the first ClaimTx", func() {
-			mineAndExpectBlkAccept(clients[0], instances[0].vm, claimTx, toEngines[0])
+			mineAndExpectBlkAccept(instances[0].cli, instances[0].vm, claimTx, instances[0].toEngine)
 		})
 
 		ginkgo.By("check prefix after ClaimTx has been accepted", func() {
-			pf, err := clients[0].PrefixInfo(pfx)
+			pf, err := instances[0].cli.PrefixInfo(pfx)
 			gomega.Ω(err).To(gomega.BeNil())
 			gomega.Ω(pf.Keys).To(gomega.Equal(int64(1)))
 			gomega.Ω(pf.Owner).To(gomega.Equal(priv.PublicKey().Bytes()))
@@ -222,16 +282,44 @@ var _ = ginkgo.Describe("[ClaimTx]", func() {
 		time.Sleep(5 * time.Second)
 
 		ginkgo.By("mine and accept block with a new SetTx", func() {
-			mineAndExpectBlkAccept(clients[0], instances[0].vm, setTx, toEngines[0])
+			mineAndExpectBlkAccept(instances[0].cli, instances[0].vm, setTx, instances[0].toEngine)
 		})
 
 		ginkgo.By("read back from VM with range query", func() {
-			kvs, err := clients[0].Range(pfx, k)
+			kvs, err := instances[0].cli.Range(pfx, k)
 			gomega.Ω(err).To(gomega.BeNil())
 			gomega.Ω(kvs[0].Key).To(gomega.Equal(append(append(pfx, parser.Delimiter), k...)))
 			gomega.Ω(kvs[0].Value).To(gomega.Equal(v))
 		})
 	})
+
+	ginkgo.It("fail Gossip ClaimTx to a stale node when missing previous blocks", func() {
+		pfx := []byte(fmt.Sprintf("%10d", time.Now().UnixNano()))
+		claimTx := &chain.ClaimTx{
+			BaseTx: &chain.BaseTx{
+				Sender: priv.PublicKey().Bytes(),
+				Prefix: pfx,
+			},
+		}
+
+		ginkgo.By("mine and issue ClaimTx", func() {
+			ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+			_, err := client.MineSignIssueTx(ctx, instances[0].cli, claimTx, priv)
+			cancel()
+			gomega.Ω(err).Should(gomega.BeNil())
+		})
+
+		// since the block from previous test spec has not been replicated yet
+		ginkgo.By("send gossip from node 0 to 1 should fail on server-side since 1 doesn't have the block yet", func() {
+			err := instances[0].vm.GossipTxs(false)
+			gomega.Ω(err).Should(gomega.BeNil())
+
+			// mempool in 1 should be empty, since gossip/submit failed
+			gomega.Ω(instances[1].vm.Mempool().Len()).Should(gomega.Equal(0))
+		})
+	})
+
+	// TODO: full replicate blocks between nodes
 })
 
 func mineAndExpectBlkAccept(
@@ -260,6 +348,9 @@ func mineAndExpectBlkAccept(
 	_, err = cli.IssueTx(tx.Bytes())
 	gomega.Ω(err).To(gomega.BeNil())
 
+	// manually signal ready
+	vm.NotifyBlockReady()
+	// manually ack ready sig as in engine
 	<-toEngine
 
 	blk, err := vm.BuildBlock()
@@ -279,4 +370,21 @@
 	gomega.Ω(lastAccepted).To(gomega.Equal(blk.ID()))
 }
 
-// TODO: test with multiple VMs
+var _ common.AppSender = &appSender{}
+
+type appSender struct {
+	next      int
+	instances []instance
+}
+
+func (app *appSender) SendAppGossip(appGossipBytes []byte) error {
+	n := len(app.instances)
+	sender := app.instances[app.next].nodeID
+	app.next++
+	app.next %= n
+	return app.instances[app.next].vm.AppGossip(sender, appGossipBytes)
+}
+
+func (app *appSender) SendAppRequest(_ ids.ShortSet, _ uint32, _ []byte) error { return nil }
+func (app *appSender) SendAppResponse(_ ids.ShortID, _ uint32, _ []byte) error { return nil }
+func (app *appSender) SendAppGossipSpecific(_ ids.ShortSet, _ []byte) error    { return nil }
diff --git a/vm/block_builder.go b/vm/block_builder.go
new file mode 100644
index 00000000..9c632ffd
--- /dev/null
+++ b/vm/block_builder.go
@@ -0,0 +1,117 @@
+// Copyright (C) 2019-2021, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package vm
+
+import (
+	"time"
+
+	"github.com/ava-labs/avalanchego/snow/engine/common"
+	log "github.com/inconshreveable/log15"
+)
+
+// SetWorkInterval updates the block build/gossip interval.
+func (vm *VM) SetWorkInterval(d time.Duration) {
+	vm.workInterval = d
+}
+
+// NotifyBlockReady signals the avalanchego engine
+// to build a block from pending transactions.
+func (vm *VM) NotifyBlockReady() {
+	select {
+	case vm.toEngine <- common.PendingTxs:
+	default:
+		log.Debug("dropping message to consensus engine")
+	}
+}
+
+const blockBuildTimeout = time.Second
+
+// run waits for a block-build timeout before gossiping in order to avoid
+// unnecessary/redundant gossip: we shouldn't gossip anything that was just
+// included in a block. To make this more deterministic, we signal block
+// readiness and wait until "BuildBlock" is triggered by avalanchego. The
+// mempool is shared between "chain.BuildBlock" and "GossipTxs", so once a
+// tx is included in a block it won't be included in the following
+// "GossipTxs". However, we still cache recently gossiped txs in "GossipTxs"
+// to further protect the node from being DDoSed via repeated gossip failures.
+func (vm *VM) run() {
+	log.Debug("starting run loops")
+	defer close(vm.donecRun)
+
+	t := time.NewTimer(vm.workInterval)
+	defer t.Stop()
+
+	buildBlk := true
+	for {
+		select {
+		case <-t.C:
+		case <-vm.stopc:
+			return
+		}
+		t.Reset(vm.workInterval)
+		if vm.mempool.Len() == 0 {
+			continue
+		}
+
+		// TODO: this is async, verify we aren't currently
+		// building a block
+		if buildBlk {
+			// as soon as we receive at least one transaction,
+			// trigger "BuildBlock" from avalanchego on the local node
+			// ref. "plugin/evm.blockBuilder.markBuilding"
+			vm.NotifyBlockReady()
+
+			// wait for this node to build a block
+			// rather than trigger gossip immediately
+			// TODO: "blockBuilder" read may be stale
+			// due to lack of request ID for each "common.PendingTxs"
+			// just wait some time for best efforts
+			select {
+			case <-vm.blockBuilder:
+				log.Debug("engine just called BuildBlock")
+			case <-time.After(blockBuildTimeout):
+				// did not build a block, but still gossip
+				log.Debug("timed out waiting for BuildBlock from engine")
+			case <-vm.stopc:
+				return
+			}
+
+			// next iteration should be gossip
+			buildBlk = false
+			continue
+		}
+
+		// we shouldn't gossip anything included in the block;
+		// that is handled via the mempool + block-build wait above
+		_ = vm.GossipTxs(false)
+		buildBlk = true
+	}
+}
+
+// regossip periodically force-regossips pending transactions,
+// but less aggressively than the run loop.
+func (vm *VM) regossip() {
+	log.Debug("starting regossip loops")
+	defer close(vm.donecRegossip)
+
+	// should retry less aggressively
+	t := time.NewTimer(vm.regossipInterval)
+	defer t.Stop()
+
+	for {
+		select {
+		case <-t.C:
+		case <-vm.stopc:
+			return
+		}
+		t.Reset(vm.regossipInterval)
+		if vm.mempool.Len() == 0 {
+			continue
+		}
+
+		_ = vm.GossipTxs(true)
+	}
+}
diff --git a/vm/network.go b/vm/network.go
new file mode 100644
index 00000000..b3359337
--- /dev/null
+++ b/vm/network.go
@@ -0,0 +1,102 @@
+// Copyright (C) 2019-2021, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package vm
+
+import (
+	"github.com/ava-labs/avalanchego/ids"
+
+	"github.com/ava-labs/quarkvm/chain"
+	log "github.com/inconshreveable/log15"
+)
+
+// GossipTxs triggers "AppGossip" on the pending transactions in the mempool.
+// When "force" is true, transactions are re-gossiped even if recently gossiped.
+func (vm *VM) GossipTxs(force bool) error {
+	if vm.appSender == nil {
+		return nil
+	}
+	txs := []*chain.Transaction{}
+	for vm.mempool.Len() > 0 && len(txs) < chain.TargetTransactions {
+		tx, _ := vm.mempool.PopMax()
+		if !force {
+			// skip if recently gossiped
+			// to further protect the node from being
+			// DDoSed via repeated gossip failures
+			if _, exists := vm.gossipedTxs.Get(tx.ID()); exists {
+				log.Debug("already gossiped, skipping", "txId", tx.ID())
+				vm.mempool.Add(tx)
+				continue
+			}
+		}
+		// force regossip (but less aggressively with greater interval)
+		vm.gossipedTxs.Put(tx.ID(), nil)
+		txs = append(txs, tx)
+	}
+
+	b, err := chain.Marshal(txs)
+	if err != nil {
+		log.Warn("failed to marshal txs", "error", err)
+	} else {
+		log.Debug("sending AppGossip",
+			"txs", len(txs),
+			"size", len(b),
+		)
+		err = vm.appSender.SendAppGossip(b)
+	}
+	if err == nil {
+		return nil
+	}
+
+	log.Warn(
+		"GossipTxs failed; txs back to mempool",
+		"error", err,
+	)
+	for _, tx := range txs {
+		vm.mempool.Add(tx)
+	}
+	return err
+}
+
+// AppGossip handles incoming "AppGossip" messages, parses them into
+// transactions, and submits them to the mempool. The "AppGossip" message is
+// sent by another node's quarkvm instance via "common.AppSender" so that its
+// pending txs are forwarded to this node (validator).
+//
+// implements "snowmanblock.ChainVM.common.VM.AppHandler"
+// assumes gossip via proposervm has been activated
+// ref. "avalanchego/vms/platformvm/network.AppGossip"
+// ref. "coreth/plugin/evm.GossipHandler.HandleEthTxs"
+func (vm *VM) AppGossip(nodeID ids.ShortID, msg []byte) error {
+	log.Debug("AppGossip message handler",
+		"sender", nodeID,
+		"receiver", vm.ctx.NodeID,
+		"bytes", len(msg),
+	)
+
+	txs := make([]*chain.Transaction, 0)
+	if _, err := chain.Unmarshal(msg, &txs); err != nil {
+		log.Debug(
+			"AppGossip provided invalid txs",
+			"peerID", nodeID,
+			"err", err,
+		)
+		return nil
+	}
+
+	// submit incoming gossip
+	log.Debug("AppGossip transactions are being submitted", "txs", len(txs))
+	if errs := vm.Submit(txs...); len(errs) > 0 {
+		for _, err := range errs {
+			log.Debug(
+				"AppGossip failed to submit txs",
+				"peerID", nodeID,
+				"err", err,
+			)
+		}
+	}
+
+	// only log the error to prevent the VM from being shut down
+	// when "AppGossip" returns an error
+	// TODO: gracefully handle "AppGossip" failures?
+	return nil
+}
diff --git a/vm/service.go b/vm/service.go
index 5c5a5704..eef87096 100644
--- a/vm/service.go
+++ b/vm/service.go
@@ -5,6 +5,7 @@ package vm
 
 import (
 	"errors"
+	"fmt"
 	"net/http"
 
 	"github.com/ava-labs/avalanchego/ids"
@@ -56,9 +57,15 @@ func (svc *Service) IssueTx(_ *http.Request, args *IssueTxArgs, reply *IssueTxRe
 	}
 	reply.TxID = tx.ID()
 
-	err := svc.vm.Submit(tx)
-	reply.Success = err == nil
-	return err
+	errs := svc.vm.Submit(tx)
+	reply.Success = len(errs) == 0
+	if reply.Success {
+		return nil
+	}
+	if len(errs) == 1 {
+		return errs[0]
+	}
+	return fmt.Errorf("%v", errs)
 }
 
 type CheckTxArgs struct {
@@ -168,7 +175,7 @@ type RangeReply struct {
 }
 
 func (svc *Service) Range(_ *http.Request, args *RangeArgs, reply *RangeReply) (err error) {
-	log.Info("range query for key %q and range end %q", args.Key, args.RangeEnd)
+	log.Debug("range query", "key", string(args.Key), "rangeEnd", string(args.RangeEnd))
 	opts := make([]chain.OpOption, 0)
 	if len(args.RangeEnd) > 0 {
 		opts = append(opts, chain.WithRangeEnd(args.RangeEnd))
diff --git a/vm/vm.go b/vm/vm.go
index 991f2cf8..2dc65a20 100644
--- a/vm/vm.go
+++ b/vm/vm.go
@@ -8,6 +8,7 @@ import (
 	"net/http"
 	"time"
 
+	"github.com/ava-labs/avalanchego/cache"
 	"github.com/ava-labs/avalanchego/database"
 	"github.com/ava-labs/avalanchego/database/manager"
 	"github.com/ava-labs/avalanchego/database/versiondb"
@@ -29,6 +30,9 @@ import (
 const (
 	Name = "quarkvm"
 
+	defaultWorkInterval     = 100 * time.Millisecond
+	defaultRegossipInterval = time.Second
+
 	mempoolSize = 1024
 )
@@ -38,9 +42,14 @@ var (
 )
 
 type VM struct {
-	ctx     *snow.Context
-	db      database.Database
-	mempool *mempool.Mempool
+	ctx *snow.Context
+	db  database.Database
+
+	workInterval     time.Duration
+	regossipInterval time.Duration
+	mempool          *mempool.Mempool
+	appSender        common.AppSender
+	gossipedTxs      *cache.LRU
 
 	// Block ID --> Block
 	// Each element is a block that passed verification but
@@ -48,14 +57,22 @@ type VM struct {
 	verifiedBlocks map[ids.ID]*chain.StatelessBlock
 
 	toEngine chan<- common.Message
+	// signaled when "BuildBlock" is triggered by the engine
+	blockBuilder chan struct{}
 
 	preferred    ids.ID
 	lastAccepted ids.ID
 
 	minDifficulty uint64
 	minBlockCost  uint64
+
+	stopc         chan struct{}
+	donecRun      chan struct{}
+	donecRegossip chan struct{}
 }
 
+const gossipedTxsLRUSize = 512
+
 // implements "snowmanblock.ChainVM.common.VM"
 func (vm *VM) Initialize(
 	ctx *snow.Context,
@@ -65,15 +82,25 @@ func (vm *VM) Initialize(
 	configBytes []byte,
 	toEngine chan<- common.Message,
 	_ []*common.Fx,
-	_ common.AppSender,
+	appSender common.AppSender,
 ) error {
 	log.Info("initializing quarkvm", "version", version.Version)
 
 	vm.ctx = ctx
 	vm.db = dbManager.Current().Database
+
+	// TODO: make this configurable via genesis
+	vm.workInterval = defaultWorkInterval
+	vm.regossipInterval = defaultRegossipInterval
+
 	vm.mempool = mempool.New(mempoolSize)
+	vm.appSender = appSender
+	vm.gossipedTxs = &cache.LRU{Size: gossipedTxsLRUSize}
+
 	vm.verifiedBlocks = make(map[ids.ID]*chain.StatelessBlock)
+
 	vm.toEngine = toEngine
+	vm.blockBuilder = make(chan struct{}, 1)
 
 	// Try to load last accepted
 	has, err := chain.HasLastAccepted(vm.db)
@@ -112,6 +139,13 @@ func (vm *VM) Initialize(
 	vm.preferred, vm.lastAccepted = gBlkID, gBlkID
 	vm.minDifficulty, vm.minBlockCost = genesisBlk.Difficulty, genesisBlk.Cost
 	log.Info("initialized quarkvm from genesis", "block", gBlkID)
+
+	vm.stopc = make(chan struct{})
+	vm.donecRun = make(chan struct{})
+	vm.donecRegossip = make(chan struct{})
+
+	go vm.run()
+	go vm.regossip()
 	return nil
 }
@@ -127,6 +161,9 @@ func (vm *VM) Bootstrapped() error {
 
 // implements "snowmanblock.ChainVM.common.VM"
 func (vm *VM) Shutdown() error {
+	close(vm.stopc)
+	<-vm.donecRun
+	<-vm.donecRegossip
 	if vm.ctx == nil {
 		return nil
 	}
@@ -176,12 +213,6 @@ func (vm *VM) AppResponse(nodeID ids.ShortID, requestID uint32, response []byte)
 	return nil
 }
 
-// implements "snowmanblock.ChainVM.commom.VM.AppHandler"
-func (vm *VM) AppGossip(nodeID ids.ShortID, msg []byte) error {
-	// TODO: gossip txs
-	return nil
-}
-
 // implements "snowmanblock.ChainVM.commom.VM.health.Checkable"
 func (vm *VM) HealthCheck() (interface{}, error) {
 	return http.StatusOK, nil
@@ -238,39 +269,60 @@ func (vm *VM) ParseBlock(source []byte) (snowman.Block, error) {
 }
 
 // implements "snowmanblock.ChainVM"
+// called via "avalanchego" node over RPC
 func (vm *VM) BuildBlock() (snowman.Block, error) {
-	return chain.BuildBlock(vm, vm.preferred)
-}
-
-func (vm *VM) Submit(tx *chain.Transaction) error {
-	if err := tx.Init(); err != nil {
-		return err
+	log.Debug("BuildBlock triggered")
+	blk, err := chain.BuildBlock(vm, vm.preferred)
+	if err != nil {
+		log.Warn("BuildBlock failed", "error", err)
+	} else {
+		log.Debug("BuildBlock success", "blockId", blk.ID())
 	}
-	if err := tx.ExecuteBase(); err != nil {
-		return err
+	select {
+	case vm.blockBuilder <- struct{}{}:
+	default:
 	}
+	return blk, err
+}
+
+func (vm *VM) Submit(txs ...*chain.Transaction) (errs []error) {
 	blk, err := vm.GetBlock(vm.preferred)
 	if err != nil {
-		return err
+		return []error{err}
 	}
 	now := time.Now().Unix()
-	context, err := vm.ExecutionContext(now, blk.(*chain.StatelessBlock))
+	ctx, err := vm.ExecutionContext(now, blk.(*chain.StatelessBlock))
 	if err != nil {
-		return err
+		return []error{err}
 	}
 	vdb := versiondb.New(vm.db)
 	defer vdb.Close() // TODO: need to do everywhere?
-	if err := tx.Execute(vdb, now, context); err != nil {
+
+	for _, tx := range txs {
+		if serr := vm.submit(tx, vdb, now, ctx); serr != nil {
+			log.Debug("failed to submit transaction",
+				"tx", tx.ID(),
+				"error", serr,
+			)
+			errs = append(errs, serr)
+			continue
+		}
+		vdb.Abort()
+	}
+	return errs
+}
+
+func (vm *VM) submit(tx *chain.Transaction, db database.Database, blkTime int64, ctx *chain.Context) error {
+	if err := tx.Init(); err != nil {
 		return err
 	}
-	if added := vm.mempool.Add(tx); !added {
-		// Don't gossip if not added
-		return nil
+	if err := tx.ExecuteBase(); err != nil {
+		return err
 	}
-
-	// TODO: do on a block timer
-	// TODO: wait to gossip if can create a block
-	vm.notifyBlockReady()
+	if err := tx.Execute(db, blkTime, ctx); err != nil {
+		return err
+	}
+	vm.mempool.Add(tx)
 	return nil
 }
@@ -287,11 +339,3 @@ func (vm *VM) SetPreference(id ids.ID) error {
 func (vm *VM) LastAccepted() (ids.ID, error) {
 	return vm.lastAccepted, nil
 }
-
-func (vm *VM) notifyBlockReady() {
-	select {
-	case vm.toEngine <- common.PendingTxs:
-	default:
-		log.Debug("dropping message to consensus engine")
-	}
-}
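
Note on the mempool locking introduced above: every read path (PeekMax, PeekMin, Len, Has) takes the read lock and every mutating path (Add, Remove) takes the write lock around the container/heap calls. The following is a minimal, standalone sketch of that pattern using only the standard library. It is illustrative only: the names (pool, intHeap) are made up and it is not quarkvm code, and unlike the PR's PopMax/Remove split it simply holds the write lock across the pop.

package main

import (
	"container/heap"
	"fmt"
	"sync"
)

// intHeap is a max-heap of ints built on container/heap.
type intHeap []int

func (h intHeap) Len() int           { return len(h) }
func (h intHeap) Less(i, j int) bool { return h[i] > h[j] } // max-heap ordering
func (h intHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *intHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// pool guards the heap with a sync.RWMutex: writers (Add, PopMax) take the
// write lock, readers (PeekMax, Len) take the read lock.
type pool struct {
	mu sync.RWMutex
	h  intHeap
}

func (p *pool) Add(v int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	heap.Push(&p.h, v)
}

func (p *pool) PeekMax() int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.h[0] // assumes a non-empty pool, like Mempool.PeekMax
}

func (p *pool) PopMax() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return heap.Pop(&p.h).(int)
}

func (p *pool) Len() int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.h.Len()
}

func main() {
	p := &pool{}
	for _, v := range []int{3, 1, 7} {
		p.Add(v)
	}
	fmt.Println(p.Len(), p.PeekMax(), p.PopMax()) // 3 7 7
}

Keeping the lock scope tight around individual heap operations matches the mempool diff, but it also means multi-step sequences (such as PopMax's peek followed by Remove) are not atomic as a whole.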