diff --git a/builder/beacon_client.go b/builder/beacon_client.go index 769a5674c4..bb97ebfa6c 100644 --- a/builder/beacon_client.go +++ b/builder/beacon_client.go @@ -8,6 +8,7 @@ import ( "net/http" "strconv" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -19,82 +20,138 @@ type testBeaconClient struct { slot uint64 } +func (b *testBeaconClient) Stop() { + return +} + func (b *testBeaconClient) isValidator(pubkey PubkeyHex) bool { return true } func (b *testBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) { return PubkeyHex(hexutil.Encode(b.validator.Pk)), nil } -func (b *testBeaconClient) onForkchoiceUpdate() (uint64, error) { - return b.slot, nil +func (b *testBeaconClient) Start() error { + return nil } type BeaconClient struct { - endpoint string + endpoint string + slotsInEpoch uint64 + secondsInSlot uint64 + + mu sync.Mutex + slotProposerMap map[uint64]PubkeyHex - mu sync.Mutex - currentEpoch uint64 - currentSlot uint64 - nextSlotProposer PubkeyHex - slotProposerMap map[uint64]PubkeyHex + closeCh chan struct{} } -func NewBeaconClient(endpoint string) *BeaconClient { +func NewBeaconClient(endpoint string, slotsInEpoch uint64, secondsInSlot uint64) *BeaconClient { return &BeaconClient{ endpoint: endpoint, + slotsInEpoch: slotsInEpoch, + secondsInSlot: secondsInSlot, slotProposerMap: make(map[uint64]PubkeyHex), + closeCh: make(chan struct{}), } } +func (b *BeaconClient) Stop() { + close(b.closeCh) +} + func (b *BeaconClient) isValidator(pubkey PubkeyHex) bool { return true } func (b *BeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) { - /* Only returns proposer if requestedSlot is currentSlot + 1, would be a race otherwise */ b.mu.Lock() defer b.mu.Unlock() - if b.currentSlot+1 != requestedSlot { - return PubkeyHex(""), errors.New("slot out of sync") + nextSlotProposer, found := b.slotProposerMap[requestedSlot] + if !found { + log.Error("inconsistent proposer mapping", "requestSlot", requestedSlot, "slotProposerMap", b.slotProposerMap) + return PubkeyHex(""), errors.New("inconsistent proposer mapping") } - return b.nextSlotProposer, nil + return nextSlotProposer, nil } -/* Returns next slot's proposer pubkey */ -// TODO: what happens if no block for previous slot - should still get next slot -func (b *BeaconClient) onForkchoiceUpdate() (uint64, error) { - b.mu.Lock() - defer b.mu.Unlock() +func (b *BeaconClient) Start() error { + go b.UpdateValidatorMapForever() + return nil +} +func (b *BeaconClient) UpdateValidatorMapForever() { + durationPerSlot := time.Duration(b.secondsInSlot) * time.Second + + prevFetchSlot := uint64(0) + + // fetch current epoch if beacon is online currentSlot, err := fetchCurrentSlot(b.endpoint) if err != nil { - return 0, err + log.Error("could not get current slot", "err", err) + } else { + currentEpoch := currentSlot / b.slotsInEpoch + slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch) + if err != nil { + log.Error("could not fetch validators map", "epoch", currentEpoch, "err", err) + } else { + b.mu.Lock() + b.slotProposerMap = slotProposerMap + b.mu.Unlock() + } } - nextSlot := currentSlot + 1 + retryDelay := time.Second - b.currentSlot = currentSlot - nextSlotEpoch := nextSlot / 32 + // Every half epoch request validators map, polling for the slot + // more frequently to avoid missing updates on errors + timer := time.NewTimer(retryDelay) + defer timer.Stop() + for true { + select { + case <-b.closeCh: + return + case <-timer.C: + } - if nextSlotEpoch != b.currentEpoch { - // TODO: this should be prepared in advance, possibly just fetch for next epoch in advance - slotProposerMap, err := fetchEpochProposersMap(b.endpoint, nextSlotEpoch) + currentSlot, err := fetchCurrentSlot(b.endpoint) if err != nil { - return 0, err + log.Error("could not get current slot", "err", err) + timer.Reset(retryDelay) + continue } - b.currentEpoch = nextSlotEpoch - b.slotProposerMap = slotProposerMap - } + // TODO: should poll after consistent slot within the epoch (slot % slotsInEpoch/2 == 0) + nextFetchSlot := prevFetchSlot + b.slotsInEpoch/2 + if currentSlot < nextFetchSlot { + timer.Reset(time.Duration(nextFetchSlot-currentSlot) * durationPerSlot) + continue + } - nextSlotProposer, found := b.slotProposerMap[nextSlot] - if !found { - log.Error("inconsistent proposer mapping", "currentSlot", currentSlot, "slotProposerMap", b.slotProposerMap) - return 0, errors.New("inconsistent proposer mapping") + currentEpoch := currentSlot / b.slotsInEpoch + slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch+1) + if err != nil { + log.Error("could not fetch validators map", "epoch", currentEpoch+1, "err", err) + timer.Reset(retryDelay) + continue + } + + prevFetchSlot = currentSlot + b.mu.Lock() + // remove previous epoch slots + for k := range b.slotProposerMap { + if k < currentEpoch*b.slotsInEpoch { + delete(b.slotProposerMap, k) + } + } + // update the slot proposer map for next epoch + for k, v := range slotProposerMap { + b.slotProposerMap[k] = v + } + b.mu.Unlock() + + timer.Reset(time.Duration(nextFetchSlot-currentSlot) * durationPerSlot) } - b.nextSlotProposer = nextSlotProposer - return nextSlot, nil } func fetchCurrentSlot(endpoint string) (uint64, error) { diff --git a/builder/beacon_client_test.go b/builder/beacon_client_test.go index 564275e5ad..6073488c30 100644 --- a/builder/beacon_client_test.go +++ b/builder/beacon_client_test.go @@ -174,95 +174,3 @@ func TestFetchEpochProposersMap(t *testing.T) { require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74a"), proposersMap[1]) require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74b"), proposersMap[2]) } - -func TestOnForkchoiceUpdate(t *testing.T) { - mbn := newMockBeaconNode() - defer mbn.srv.Close() - - mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "31", "proposer_index": "1" } } } ] }`) - - mbn.proposerDuties[1] = []byte(`{ - "dependent_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", - "execution_optimistic": false, - "data": [ - { - "pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74a", - "validator_index": "1", - "slot": "31" - }, - { - "pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74b", - "validator_index": "2", - "slot": "32" - }, - { - "pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74c", - "validator_index": "3", - "slot": "33" - } - ] -}`) - - bc := NewBeaconClient(mbn.srv.URL) - slot, err := bc.onForkchoiceUpdate() - require.NoError(t, err) - require.Equal(t, slot, uint64(32)) - - pubkeyHex, err := bc.getProposerForNextSlot(32) - require.NoError(t, err) - require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74b"), pubkeyHex) - - _, err = bc.getProposerForNextSlot(31) - require.EqualError(t, err, "slot out of sync") - - _, err = bc.getProposerForNextSlot(33) - require.EqualError(t, err, "slot out of sync") - - mbn.headersCode = 404 - mbn.headersResp = []byte(`{ "code": 404, "message": "State not found" }`) - - slot, err = NewBeaconClient(mbn.srv.URL).onForkchoiceUpdate() - require.EqualError(t, err, "State not found") - require.Equal(t, slot, uint64(0)) - - // Check that client does not fetch new proposers if epoch did not change - mbn.headersCode = 200 - mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "31", "proposer_index": "1" } } } ] }`) - mbn.proposerDuties[1] = []byte(`{ - "data": [ - { - "pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74d", - "validator_index": "4", - "slot": "32" - } - ] -}`) - - slot, err = bc.onForkchoiceUpdate() - require.NoError(t, err, "") - require.Equal(t, slot, uint64(32)) - - mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "63", "proposer_index": "1" } } } ] }`) - mbn.proposerDuties[2] = []byte(`{ - "data": [ - { - "pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74d", - "validator_index": "4", - "slot": "64" - } - ] -}`) - - slot, err = bc.onForkchoiceUpdate() - require.NoError(t, err, "") - require.Equal(t, slot, uint64(64)) - - pubkeyHex, err = bc.getProposerForNextSlot(64) - require.NoError(t, err) - require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74d"), pubkeyHex) - - // Check proposers map error is routed out - mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "65", "proposer_index": "1" } } } ] }`) - _, err = bc.onForkchoiceUpdate() - require.EqualError(t, err, "inconsistent proposer mapping") -} diff --git a/builder/builder.go b/builder/builder.go index 7a6b1e5aa6..c732bf0553 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -32,12 +32,15 @@ type ValidatorData struct { type IBeaconClient interface { isValidator(pubkey PubkeyHex) bool getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) - onForkchoiceUpdate() (uint64, error) + Start() error + Stop() } type IRelay interface { SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, vd ValidatorData) error GetValidatorForSlot(nextSlot uint64) (ValidatorData, error) + Start() error + Stop() } type IBuilder interface { @@ -89,7 +92,7 @@ func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRe } func (b *Builder) Start() error { - return nil + return b.relay.Start() } func (b *Builder) Stop() error { diff --git a/builder/config.go b/builder/config.go index e22d0c3817..cc445d5efc 100644 --- a/builder/config.go +++ b/builder/config.go @@ -4,6 +4,8 @@ type Config struct { Enabled bool `toml:",omitempty"` EnableValidatorChecks bool `toml:",omitempty"` EnableLocalRelay bool `toml:",omitempty"` + SlotsInEpoch uint64 `toml:",omitempty"` + SecondsInSlot uint64 `toml:",omitempty"` DisableBundleFetcher bool `toml:",omitempty"` DryRun bool `toml:",omitempty"` BuilderSecretKey string `toml:",omitempty"` @@ -23,6 +25,8 @@ var DefaultConfig = Config{ Enabled: false, EnableValidatorChecks: false, EnableLocalRelay: false, + SlotsInEpoch: 32, + SecondsInSlot: 12, DisableBundleFetcher: false, DryRun: false, BuilderSecretKey: "0x2fc12ae741f29701f8e30f5de6350766c020cb80768a0ff01e6838ffd2431e11", diff --git a/builder/local_relay.go b/builder/local_relay.go index 513e341559..d07d463231 100644 --- a/builder/local_relay.go +++ b/builder/local_relay.go @@ -84,6 +84,15 @@ func NewLocalRelay(sk *bls.SecretKey, beaconClient IBeaconClient, builderSigning } } +func (r *LocalRelay) Start() error { + r.beaconClient.Start() + return nil +} + +func (r *LocalRelay) Stop() { + r.beaconClient.Stop() +} + func (r *LocalRelay) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, _ ValidatorData) error { log.Info("submitting block to local relay", "block", msg.ExecutionPayload.BlockHash.String()) return r.submitBlock(msg) diff --git a/builder/relay.go b/builder/relay.go index 5b11c8d9f0..3c7e7d6138 100644 --- a/builder/relay.go +++ b/builder/relay.go @@ -125,6 +125,12 @@ func (r *RemoteRelay) GetValidatorForSlot(nextSlot uint64) (ValidatorData, error return ValidatorData{}, ErrValidatorNotFound } +func (r *RemoteRelay) Start() error { + return nil +} + +func (r *RemoteRelay) Stop() {} + func (r *RemoteRelay) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, _ ValidatorData) error { log.Info("submitting block to remote relay", "endpoint", r.endpoint) code, err := server.SendHTTPRequest(context.TODO(), *http.DefaultClient, http.MethodPost, r.endpoint+"/relay/v1/builder/blocks", msg, nil) diff --git a/builder/relay_aggregator.go b/builder/relay_aggregator.go index d5969982f8..bb141822ed 100644 --- a/builder/relay_aggregator.go +++ b/builder/relay_aggregator.go @@ -24,6 +24,22 @@ func NewRemoteRelayAggregator(primary IRelay, secondary []IRelay) *RemoteRelayAg } } +func (r *RemoteRelayAggregator) Start() error { + for _, relay := range r.relays { + err := relay.Start() + if err != nil { + return err + } + } + return nil +} + +func (r *RemoteRelayAggregator) Stop() { + for _, relay := range r.relays { + relay.Stop() + } +} + func (r *RemoteRelayAggregator) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, registration ValidatorData) error { r.registrationsCacheLock.RLock() defer r.registrationsCacheLock.RUnlock() diff --git a/builder/relay_aggregator_test.go b/builder/relay_aggregator_test.go index 84daed0b8e..d42aff65f0 100644 --- a/builder/relay_aggregator_test.go +++ b/builder/relay_aggregator_test.go @@ -58,6 +58,12 @@ func (r *testRelay) GetValidatorForSlot(nextSlot uint64) (ValidatorData, error) return r.gvsVd, r.gvsErr } +func (r *testRelay) Start() error { + return nil +} + +func (r *testRelay) Stop() {} + func TestRemoteRelayAggregator(t *testing.T) { t.Run("should return error if no relays return validator data", func(t *testing.T) { backend := newTestRelayAggBackend(3) diff --git a/builder/service.go b/builder/service.go index aea8349c13..e343bcfae1 100644 --- a/builder/service.go +++ b/builder/service.go @@ -144,7 +144,7 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error { copy(bellatrixForkVersion[:], bellatrixForkVersionBytes[:4]) proposerSigningDomain := boostTypes.ComputeDomain(boostTypes.DomainTypeBeaconProposer, bellatrixForkVersion, genesisValidatorsRoot) - beaconClient := NewBeaconClient(cfg.BeaconEndpoint) + beaconClient := NewBeaconClient(cfg.BeaconEndpoint, cfg.SlotsInEpoch, cfg.SecondsInSlot) var localRelay *LocalRelay if cfg.EnableLocalRelay { diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 320340897d..1a8e1dcf55 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -166,6 +166,8 @@ var ( utils.BuilderEnableValidatorChecks, utils.BuilderBlockValidationBlacklistSourceFilePath, utils.BuilderEnableLocalRelay, + utils.BuilderSecondsInSlot, + utils.BuilderSlotsInEpoch, utils.BuilderDisableBundleFetcher, utils.BuilderDryRun, utils.BuilderSecretKey, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index d44e1e3491..7b55b48745 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -708,6 +708,18 @@ var ( Usage: "Enable the local relay", Category: flags.BuilderCategory, } + BuilderSlotsInEpoch = &cli.Uint64Flag{ + Name: "builder.slots_in_epoch", + Usage: "Set the number of slots in an epoch in the local relay", + Value: 32, + Category: flags.BuilderCategory, + } + BuilderSecondsInSlot = &cli.Uint64Flag{ + Name: "builder.seconds_in_slot", + Usage: "Set the number of seconds in a slot in the local relay", + Value: 12, + Category: flags.BuilderCategory, + } BuilderDisableBundleFetcher = &cli.BoolFlag{ Name: "builder.no_bundle_fetcher", Usage: "Disable the bundle fetcher", @@ -1577,6 +1589,8 @@ func SetBuilderConfig(ctx *cli.Context, cfg *builder.Config) { cfg.Enabled = ctx.IsSet(BuilderEnabled.Name) cfg.EnableValidatorChecks = ctx.IsSet(BuilderEnableValidatorChecks.Name) cfg.EnableLocalRelay = ctx.IsSet(BuilderEnableLocalRelay.Name) + cfg.SlotsInEpoch = ctx.Uint64(BuilderSlotsInEpoch.Name) + cfg.SecondsInSlot = ctx.Uint64(BuilderSecondsInSlot.Name) cfg.DisableBundleFetcher = ctx.IsSet(BuilderDisableBundleFetcher.Name) cfg.DryRun = ctx.IsSet(BuilderDryRun.Name) cfg.BuilderSecretKey = ctx.String(BuilderSecretKey.Name) diff --git a/miner/worker.go b/miner/worker.go index b0b032a225..aebbb66f6f 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1396,8 +1396,8 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { } defer work.discard() - finalizeFn := func(env *environment, orderCloseTime time.Time, blockBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle) (*types.Block, error) { - block, err := w.finalizeBlock(env, validatorCoinbase) + finalizeFn := func(env *environment, orderCloseTime time.Time, blockBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, noTxs bool) (*types.Block, error) { + block, err := w.finalizeBlock(env, validatorCoinbase, noTxs) if err != nil { log.Error("could not finalize block", "err", err) return nil, err @@ -1412,7 +1412,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { } if params.noTxs { - return finalizeFn(work, time.Now(), nil, nil) + return finalizeFn(work, time.Now(), nil, nil, true) } paymentTxReserve, err := w.proposerTxPrepare(work, &validatorCoinbase) @@ -1436,15 +1436,20 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { return nil, err } + // no bundles or tx from mempool + if len(work.txs) == 0 { + return finalizeFn(work, orderCloseTime, blockBundles, allBundles, true) + } + err = w.proposerTxCommit(work, &validatorCoinbase, paymentTxReserve) if err != nil { return nil, err } - return finalizeFn(work, orderCloseTime, blockBundles, allBundles) + return finalizeFn(work, orderCloseTime, blockBundles, allBundles, false) } -func (w *worker) finalizeBlock(work *environment, validatorCoinbase common.Address) (*types.Block, error) { +func (w *worker) finalizeBlock(work *environment, validatorCoinbase common.Address, noTxs bool) (*types.Block, error) { block, err := w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) if err != nil { return nil, err @@ -1456,6 +1461,10 @@ func (w *worker) finalizeBlock(work *environment, validatorCoinbase common.Addre return block, nil } + if noTxs { + return block, nil + } + blockProfit, err := w.checkProposerPayment(work, validatorCoinbase) if err != nil { return nil, err