Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 92 additions & 35 deletions builder/beacon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"strconv"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand All @@ -19,82 +20,138 @@ type testBeaconClient struct {
slot uint64
}

func (b *testBeaconClient) Stop() {
return
}

func (b *testBeaconClient) isValidator(pubkey PubkeyHex) bool {
return true
}
func (b *testBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
return PubkeyHex(hexutil.Encode(b.validator.Pk)), nil
}
func (b *testBeaconClient) onForkchoiceUpdate() (uint64, error) {
return b.slot, nil
func (b *testBeaconClient) Start() error {
return nil
}

type BeaconClient struct {
endpoint string
endpoint string
slotsInEpoch uint64
secondsInSlot uint64

mu sync.Mutex
slotProposerMap map[uint64]PubkeyHex

mu sync.Mutex
currentEpoch uint64
currentSlot uint64
nextSlotProposer PubkeyHex
slotProposerMap map[uint64]PubkeyHex
closeCh chan struct{}
}

func NewBeaconClient(endpoint string) *BeaconClient {
func NewBeaconClient(endpoint string, slotsInEpoch uint64, secondsInSlot uint64) *BeaconClient {
return &BeaconClient{
endpoint: endpoint,
slotsInEpoch: slotsInEpoch,
secondsInSlot: secondsInSlot,
slotProposerMap: make(map[uint64]PubkeyHex),
closeCh: make(chan struct{}),
}
}

func (b *BeaconClient) Stop() {
close(b.closeCh)
}

func (b *BeaconClient) isValidator(pubkey PubkeyHex) bool {
return true
}

func (b *BeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
/* Only returns proposer if requestedSlot is currentSlot + 1, would be a race otherwise */
b.mu.Lock()
defer b.mu.Unlock()

if b.currentSlot+1 != requestedSlot {
return PubkeyHex(""), errors.New("slot out of sync")
nextSlotProposer, found := b.slotProposerMap[requestedSlot]
if !found {
log.Error("inconsistent proposer mapping", "requestSlot", requestedSlot, "slotProposerMap", b.slotProposerMap)
return PubkeyHex(""), errors.New("inconsistent proposer mapping")
}
return b.nextSlotProposer, nil
return nextSlotProposer, nil
}

/* Returns next slot's proposer pubkey */
// TODO: what happens if no block for previous slot - should still get next slot
func (b *BeaconClient) onForkchoiceUpdate() (uint64, error) {
b.mu.Lock()
defer b.mu.Unlock()
func (b *BeaconClient) Start() error {
go b.UpdateValidatorMapForever()
return nil
}

func (b *BeaconClient) UpdateValidatorMapForever() {
durationPerSlot := time.Duration(b.secondsInSlot) * time.Second

prevFetchSlot := uint64(0)

// fetch current epoch if beacon is online
currentSlot, err := fetchCurrentSlot(b.endpoint)
if err != nil {
return 0, err
log.Error("could not get current slot", "err", err)
} else {
currentEpoch := currentSlot / b.slotsInEpoch
slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch)
if err != nil {
log.Error("could not fetch validators map", "epoch", currentEpoch, "err", err)
} else {
b.mu.Lock()
b.slotProposerMap = slotProposerMap
b.mu.Unlock()
}
}

nextSlot := currentSlot + 1
retryDelay := time.Second

b.currentSlot = currentSlot
nextSlotEpoch := nextSlot / 32
// Every half epoch request validators map, polling for the slot
// more frequently to avoid missing updates on errors
timer := time.NewTimer(retryDelay)
defer timer.Stop()
for true {
select {
case <-b.closeCh:
return
case <-timer.C:
}

if nextSlotEpoch != b.currentEpoch {
// TODO: this should be prepared in advance, possibly just fetch for next epoch in advance
slotProposerMap, err := fetchEpochProposersMap(b.endpoint, nextSlotEpoch)
currentSlot, err := fetchCurrentSlot(b.endpoint)
if err != nil {
return 0, err
log.Error("could not get current slot", "err", err)
timer.Reset(retryDelay)
continue
}

b.currentEpoch = nextSlotEpoch
b.slotProposerMap = slotProposerMap
}
// TODO: should poll after consistent slot within the epoch (slot % slotsInEpoch/2 == 0)
nextFetchSlot := prevFetchSlot + b.slotsInEpoch/2
if currentSlot < nextFetchSlot {
timer.Reset(time.Duration(nextFetchSlot-currentSlot) * durationPerSlot)
continue
}

nextSlotProposer, found := b.slotProposerMap[nextSlot]
if !found {
log.Error("inconsistent proposer mapping", "currentSlot", currentSlot, "slotProposerMap", b.slotProposerMap)
return 0, errors.New("inconsistent proposer mapping")
currentEpoch := currentSlot / b.slotsInEpoch
slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch+1)
if err != nil {
log.Error("could not fetch validators map", "epoch", currentEpoch+1, "err", err)
timer.Reset(retryDelay)
continue
}

prevFetchSlot = currentSlot
b.mu.Lock()
// remove previous epoch slots
for k := range b.slotProposerMap {
if k < currentEpoch*b.slotsInEpoch {
delete(b.slotProposerMap, k)
}
}
// update the slot proposer map for next epoch
for k, v := range slotProposerMap {
b.slotProposerMap[k] = v
}
b.mu.Unlock()

timer.Reset(time.Duration(nextFetchSlot-currentSlot) * durationPerSlot)
}
b.nextSlotProposer = nextSlotProposer
return nextSlot, nil
}

func fetchCurrentSlot(endpoint string) (uint64, error) {
Expand Down
92 changes: 0 additions & 92 deletions builder/beacon_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,95 +174,3 @@ func TestFetchEpochProposersMap(t *testing.T) {
require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74a"), proposersMap[1])
require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74b"), proposersMap[2])
}

func TestOnForkchoiceUpdate(t *testing.T) {
mbn := newMockBeaconNode()
defer mbn.srv.Close()

mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "31", "proposer_index": "1" } } } ] }`)

mbn.proposerDuties[1] = []byte(`{
"dependent_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
"execution_optimistic": false,
"data": [
{
"pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74a",
"validator_index": "1",
"slot": "31"
},
{
"pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74b",
"validator_index": "2",
"slot": "32"
},
{
"pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74c",
"validator_index": "3",
"slot": "33"
}
]
}`)

bc := NewBeaconClient(mbn.srv.URL)
slot, err := bc.onForkchoiceUpdate()
require.NoError(t, err)
require.Equal(t, slot, uint64(32))

pubkeyHex, err := bc.getProposerForNextSlot(32)
require.NoError(t, err)
require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74b"), pubkeyHex)

_, err = bc.getProposerForNextSlot(31)
require.EqualError(t, err, "slot out of sync")

_, err = bc.getProposerForNextSlot(33)
require.EqualError(t, err, "slot out of sync")

mbn.headersCode = 404
mbn.headersResp = []byte(`{ "code": 404, "message": "State not found" }`)

slot, err = NewBeaconClient(mbn.srv.URL).onForkchoiceUpdate()
require.EqualError(t, err, "State not found")
require.Equal(t, slot, uint64(0))

// Check that client does not fetch new proposers if epoch did not change
mbn.headersCode = 200
mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "31", "proposer_index": "1" } } } ] }`)
mbn.proposerDuties[1] = []byte(`{
"data": [
{
"pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74d",
"validator_index": "4",
"slot": "32"
}
]
}`)

slot, err = bc.onForkchoiceUpdate()
require.NoError(t, err, "")
require.Equal(t, slot, uint64(32))

mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "63", "proposer_index": "1" } } } ] }`)
mbn.proposerDuties[2] = []byte(`{
"data": [
{
"pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74d",
"validator_index": "4",
"slot": "64"
}
]
}`)

slot, err = bc.onForkchoiceUpdate()
require.NoError(t, err, "")
require.Equal(t, slot, uint64(64))

pubkeyHex, err = bc.getProposerForNextSlot(64)
require.NoError(t, err)
require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74d"), pubkeyHex)

// Check proposers map error is routed out
mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "65", "proposer_index": "1" } } } ] }`)
_, err = bc.onForkchoiceUpdate()
require.EqualError(t, err, "inconsistent proposer mapping")
}
7 changes: 5 additions & 2 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ type ValidatorData struct {
type IBeaconClient interface {
isValidator(pubkey PubkeyHex) bool
getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error)
onForkchoiceUpdate() (uint64, error)
Start() error
Stop()
}

type IRelay interface {
SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, vd ValidatorData) error
GetValidatorForSlot(nextSlot uint64) (ValidatorData, error)
Start() error
Stop()
}

type IBuilder interface {
Expand Down Expand Up @@ -89,7 +92,7 @@ func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRe
}

func (b *Builder) Start() error {
return nil
return b.relay.Start()
}

func (b *Builder) Stop() error {
Expand Down
4 changes: 4 additions & 0 deletions builder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ type Config struct {
Enabled bool `toml:",omitempty"`
EnableValidatorChecks bool `toml:",omitempty"`
EnableLocalRelay bool `toml:",omitempty"`
SlotsInEpoch uint64 `toml:",omitempty"`
SecondsInSlot uint64 `toml:",omitempty"`
DisableBundleFetcher bool `toml:",omitempty"`
DryRun bool `toml:",omitempty"`
BuilderSecretKey string `toml:",omitempty"`
Expand All @@ -23,6 +25,8 @@ var DefaultConfig = Config{
Enabled: false,
EnableValidatorChecks: false,
EnableLocalRelay: false,
SlotsInEpoch: 32,
SecondsInSlot: 12,
DisableBundleFetcher: false,
DryRun: false,
BuilderSecretKey: "0x2fc12ae741f29701f8e30f5de6350766c020cb80768a0ff01e6838ffd2431e11",
Expand Down
9 changes: 9 additions & 0 deletions builder/local_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ func NewLocalRelay(sk *bls.SecretKey, beaconClient IBeaconClient, builderSigning
}
}

func (r *LocalRelay) Start() error {
r.beaconClient.Start()
return nil
}

func (r *LocalRelay) Stop() {
r.beaconClient.Stop()
}

func (r *LocalRelay) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, _ ValidatorData) error {
log.Info("submitting block to local relay", "block", msg.ExecutionPayload.BlockHash.String())
return r.submitBlock(msg)
Expand Down
6 changes: 6 additions & 0 deletions builder/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ func (r *RemoteRelay) GetValidatorForSlot(nextSlot uint64) (ValidatorData, error
return ValidatorData{}, ErrValidatorNotFound
}

func (r *RemoteRelay) Start() error {
return nil
}

func (r *RemoteRelay) Stop() {}

func (r *RemoteRelay) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, _ ValidatorData) error {
log.Info("submitting block to remote relay", "endpoint", r.endpoint)
code, err := server.SendHTTPRequest(context.TODO(), *http.DefaultClient, http.MethodPost, r.endpoint+"/relay/v1/builder/blocks", msg, nil)
Expand Down
16 changes: 16 additions & 0 deletions builder/relay_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,22 @@ func NewRemoteRelayAggregator(primary IRelay, secondary []IRelay) *RemoteRelayAg
}
}

func (r *RemoteRelayAggregator) Start() error {
for _, relay := range r.relays {
err := relay.Start()
if err != nil {
return err
}
}
return nil
}

func (r *RemoteRelayAggregator) Stop() {
for _, relay := range r.relays {
relay.Stop()
}
}

func (r *RemoteRelayAggregator) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, registration ValidatorData) error {
r.registrationsCacheLock.RLock()
defer r.registrationsCacheLock.RUnlock()
Expand Down
6 changes: 6 additions & 0 deletions builder/relay_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ func (r *testRelay) GetValidatorForSlot(nextSlot uint64) (ValidatorData, error)
return r.gvsVd, r.gvsErr
}

func (r *testRelay) Start() error {
return nil
}

func (r *testRelay) Stop() {}

func TestRemoteRelayAggregator(t *testing.T) {
t.Run("should return error if no relays return validator data", func(t *testing.T) {
backend := newTestRelayAggBackend(3)
Expand Down
2 changes: 1 addition & 1 deletion builder/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error {
copy(bellatrixForkVersion[:], bellatrixForkVersionBytes[:4])
proposerSigningDomain := boostTypes.ComputeDomain(boostTypes.DomainTypeBeaconProposer, bellatrixForkVersion, genesisValidatorsRoot)

beaconClient := NewBeaconClient(cfg.BeaconEndpoint)
beaconClient := NewBeaconClient(cfg.BeaconEndpoint, cfg.SlotsInEpoch, cfg.SecondsInSlot)

var localRelay *LocalRelay
if cfg.EnableLocalRelay {
Expand Down
Loading