|  | 
| 1 | 1 | package builder | 
| 2 | 2 | 
 | 
| 3 | 3 | import ( | 
|  | 4 | +	"context" | 
| 4 | 5 | 	"errors" | 
|  | 6 | +	"golang.org/x/time/rate" | 
| 5 | 7 | 	"math/big" | 
| 6 | 8 | 	_ "os" | 
| 7 | 9 | 	"sync" | 
| @@ -44,66 +46,53 @@ type IBuilder interface { | 
| 44 | 46 | } | 
| 45 | 47 | 
 | 
| 46 | 48 | type Builder struct { | 
| 47 |  | -	ds                         flashbotsextra.IDatabaseService | 
| 48 |  | -	beaconClient               IBeaconClient | 
| 49 |  | -	relay                      IRelay | 
| 50 |  | -	eth                        IEthereumService | 
| 51 |  | -	resubmitter                Resubmitter | 
| 52 |  | -	blockSubmissionRateLimiter *BlockSubmissionRateLimiter | 
| 53 |  | -	builderSecretKey           *bls.SecretKey | 
| 54 |  | -	builderPublicKey           boostTypes.PublicKey | 
| 55 |  | -	builderSigningDomain       boostTypes.Domain | 
| 56 |  | - | 
| 57 |  | -	bestMu          sync.Mutex | 
| 58 |  | -	bestAttrs       BuilderPayloadAttributes | 
| 59 |  | -	bestBlockProfit *big.Int | 
|  | 49 | +	ds                   flashbotsextra.IDatabaseService | 
|  | 50 | +	relay                IRelay | 
|  | 51 | +	eth                  IEthereumService | 
|  | 52 | +	builderSecretKey     *bls.SecretKey | 
|  | 53 | +	builderPublicKey     boostTypes.PublicKey | 
|  | 54 | +	builderSigningDomain boostTypes.Domain | 
|  | 55 | + | 
|  | 56 | +	limiter *rate.Limiter | 
|  | 57 | + | 
|  | 58 | +	slotMu        sync.Mutex | 
|  | 59 | +	slot          uint64 | 
|  | 60 | +	slotAttrs     []BuilderPayloadAttributes | 
|  | 61 | +	slotCtx       context.Context | 
|  | 62 | +	slotCtxCancel context.CancelFunc | 
| 60 | 63 | } | 
| 61 | 64 | 
 | 
| 62 |  | -func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, bc IBeaconClient, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService) *Builder { | 
|  | 65 | +func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService) *Builder { | 
| 63 | 66 | 	pkBytes := bls.PublicKeyFromSecretKey(sk).Compress() | 
| 64 | 67 | 	pk := boostTypes.PublicKey{} | 
| 65 | 68 | 	pk.FromSlice(pkBytes) | 
| 66 | 69 | 
 | 
|  | 70 | +	slotCtx, slotCtxCancel := context.WithCancel(context.Background()) | 
| 67 | 71 | 	return &Builder{ | 
| 68 |  | -		ds:                         ds, | 
| 69 |  | -		beaconClient:               bc, | 
| 70 |  | -		relay:                      relay, | 
| 71 |  | -		eth:                        eth, | 
| 72 |  | -		resubmitter:                Resubmitter{}, | 
| 73 |  | -		blockSubmissionRateLimiter: NewBlockSubmissionRateLimiter(), | 
| 74 |  | -		builderSecretKey:           sk, | 
| 75 |  | -		builderPublicKey:           pk, | 
| 76 |  | - | 
|  | 72 | +		ds:                   ds, | 
|  | 73 | +		relay:                relay, | 
|  | 74 | +		eth:                  eth, | 
|  | 75 | +		builderSecretKey:     sk, | 
|  | 76 | +		builderPublicKey:     pk, | 
| 77 | 77 | 		builderSigningDomain: builderSigningDomain, | 
| 78 |  | -		bestBlockProfit:      big.NewInt(0), | 
|  | 78 | + | 
|  | 79 | +		limiter:       rate.NewLimiter(rate.Every(time.Second), 1), | 
|  | 80 | +		slot:          0, | 
|  | 81 | +		slotCtx:       slotCtx, | 
|  | 82 | +		slotCtxCancel: slotCtxCancel, | 
| 79 | 83 | 	} | 
| 80 | 84 | } | 
| 81 | 85 | 
 | 
| 82 | 86 | func (b *Builder) Start() error { | 
| 83 |  | -	b.blockSubmissionRateLimiter.Start() | 
| 84 | 87 | 	return nil | 
| 85 | 88 | } | 
| 86 | 89 | 
 | 
| 87 | 90 | func (b *Builder) Stop() error { | 
| 88 |  | -	b.blockSubmissionRateLimiter.Stop() | 
| 89 | 91 | 	return nil | 
| 90 | 92 | } | 
| 91 | 93 | 
 | 
| 92 | 94 | func (b *Builder) onSealedBlock(block *types.Block, bundles []types.SimulatedBundle, proposerPubkey boostTypes.PublicKey, proposerFeeRecipient boostTypes.Address, attrs *BuilderPayloadAttributes) error { | 
| 93 |  | -	b.bestMu.Lock() | 
| 94 |  | -	defer b.bestMu.Unlock() | 
| 95 |  | - | 
| 96 |  | -	// Do not submit blocks that don't improve the profit | 
| 97 |  | -	if b.bestAttrs != *attrs { | 
| 98 |  | -		b.bestAttrs = *attrs | 
| 99 |  | -		b.bestBlockProfit.SetInt64(0) | 
| 100 |  | -	} else { | 
| 101 |  | -		if block.Profit.Cmp(b.bestBlockProfit) <= 0 { | 
| 102 |  | -			log.Info("Ignoring block that is not improving the profit") | 
| 103 |  | -			return nil | 
| 104 |  | -		} | 
| 105 |  | -	} | 
| 106 |  | - | 
|  | 95 | +	start := time.Now() | 
| 107 | 96 | 	executableData := beacon.BlockToExecutableData(block) | 
| 108 | 97 | 	payload, err := executableDataToExecutionPayload(executableData) | 
| 109 | 98 | 	if err != nil { | 
| @@ -152,9 +141,8 @@ func (b *Builder) onSealedBlock(block *types.Block, bundles []types.SimulatedBun | 
| 152 | 141 | 		log.Info("could submit block", "bundles", len(bundles)) | 
| 153 | 142 | 	} | 
| 154 | 143 | 
 | 
| 155 |  | -	log.Info("submitted block", "header", block.Header(), "bid", blockBidMsg) | 
|  | 144 | +	log.Info("submitted block", "header", block.Header(), "bid", blockBidMsg, "time", time.Since(start)) | 
| 156 | 145 | 
 | 
| 157 |  | -	b.bestBlockProfit.Set(block.Profit) | 
| 158 | 146 | 	return nil | 
| 159 | 147 | } | 
| 160 | 148 | 
 | 
| @@ -188,29 +176,100 @@ func (b *Builder) OnPayloadAttribute(attrs *BuilderPayloadAttributes) error { | 
| 188 | 176 | 		return errors.New("parent block not found in blocktree") | 
| 189 | 177 | 	} | 
| 190 | 178 | 
 | 
| 191 |  | -	blockHook := func(block *types.Block, bundles []types.SimulatedBundle) { | 
| 192 |  | -		select { | 
| 193 |  | -		case shouldSubmit := <-b.blockSubmissionRateLimiter.Limit(block): | 
| 194 |  | -			if !shouldSubmit { | 
| 195 |  | -				log.Info("Block rate limited", "blochHash", block.Hash()) | 
| 196 |  | -				return | 
|  | 179 | +	b.slotMu.Lock() | 
|  | 180 | +	defer b.slotMu.Unlock() | 
|  | 181 | + | 
|  | 182 | +	if b.slot != attrs.Slot { | 
|  | 183 | +		if b.slotCtxCancel != nil { | 
|  | 184 | +			b.slotCtxCancel() | 
|  | 185 | +		} | 
|  | 186 | + | 
|  | 187 | +		slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second) | 
|  | 188 | +		b.slot = attrs.Slot | 
|  | 189 | +		b.slotAttrs = nil | 
|  | 190 | +		b.slotCtx = slotCtx | 
|  | 191 | +		b.slotCtxCancel = slotCtxCancel | 
|  | 192 | +	} | 
|  | 193 | + | 
|  | 194 | +	for _, currentAttrs := range b.slotAttrs { | 
|  | 195 | +		if *attrs == currentAttrs { | 
|  | 196 | +			log.Debug("ignoring known payload attribute", "slot", attrs.Slot, "hash", attrs.HeadHash) | 
|  | 197 | +			return nil | 
|  | 198 | +		} | 
|  | 199 | +	} | 
|  | 200 | +	b.slotAttrs = append(b.slotAttrs, *attrs) | 
|  | 201 | + | 
|  | 202 | +	go b.runBuildingJob(b.slotCtx, proposerPubkey, vd.FeeRecipient, attrs) | 
|  | 203 | +	return nil | 
|  | 204 | +} | 
|  | 205 | + | 
|  | 206 | +func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTypes.PublicKey, feeRecipient boostTypes.Address, attrs *BuilderPayloadAttributes) { | 
|  | 207 | +	ctx, cancel := context.WithTimeout(slotCtx, 12*time.Second) | 
|  | 208 | +	defer cancel() | 
|  | 209 | + | 
|  | 210 | +	// Submission queue for the given payload attributes | 
|  | 211 | +	// multiple jobs can run for different attributes fot the given slot | 
|  | 212 | +	// 1. When new block is ready we check if its profit is higher than profit of last best block | 
|  | 213 | +	//    if it is we set queueBest* to values of the new block and notify queueSignal channel. | 
|  | 214 | +	// 2. Submission goroutine waits for queueSignal and submits queueBest* if its more valuable than | 
|  | 215 | +	//    queueLastSubmittedProfit keeping queueLastSubmittedProfit to be the profit of the last submission. | 
|  | 216 | +	//    Submission goroutine is globally rate limited to have fixed rate of submissions for all jobs. | 
|  | 217 | +	var ( | 
|  | 218 | +		queueSignal = make(chan struct{}, 1) | 
|  | 219 | + | 
|  | 220 | +		queueMu                  sync.Mutex | 
|  | 221 | +		queueLastSubmittedProfit = new(big.Int) | 
|  | 222 | +		queueBestProfit          = new(big.Int) | 
|  | 223 | +		queueBestBlock           *types.Block | 
|  | 224 | +		queueBestBundles         []types.SimulatedBundle | 
|  | 225 | +	) | 
|  | 226 | + | 
|  | 227 | +	log.Debug("runBuildingJob", "slot", attrs.Slot, "parent", attrs.HeadHash) | 
|  | 228 | + | 
|  | 229 | +	submitBestBlock := func() { | 
|  | 230 | +		queueMu.Lock() | 
|  | 231 | +		if queueLastSubmittedProfit.Cmp(queueBestProfit) < 0 { | 
|  | 232 | +			err := b.onSealedBlock(queueBestBlock, queueBestBundles, proposerPubkey, feeRecipient, attrs) | 
|  | 233 | +			if err != nil { | 
|  | 234 | +				log.Error("could not run sealed block hook", "err", err) | 
|  | 235 | +			} else { | 
|  | 236 | +				queueLastSubmittedProfit.Set(queueBestProfit) | 
| 197 | 237 | 			} | 
| 198 |  | -		case <-time.After(200 * time.Millisecond): | 
| 199 |  | -			log.Info("Block rate limit timeout, submitting the block anyway") | 
| 200 | 238 | 		} | 
|  | 239 | +		queueMu.Unlock() | 
|  | 240 | +	} | 
| 201 | 241 | 
 | 
| 202 |  | -		err := b.onSealedBlock(block, bundles, proposerPubkey, vd.FeeRecipient, attrs) | 
| 203 |  | -		if err != nil { | 
| 204 |  | -			log.Error("could not run sealed block hook", "err", err) | 
|  | 242 | +	// Empties queue, submits the best block for current job with rate limit (global for all jobs) | 
|  | 243 | +	go runResubmitLoop(ctx, b.limiter, queueSignal, submitBestBlock) | 
|  | 244 | + | 
|  | 245 | +	// Populates queue with submissions that increase block profit | 
|  | 246 | +	blockHook := func(block *types.Block, bundles []types.SimulatedBundle) { | 
|  | 247 | +		if ctx.Err() != nil { | 
|  | 248 | +			return | 
|  | 249 | +		} | 
|  | 250 | + | 
|  | 251 | +		queueMu.Lock() | 
|  | 252 | +		defer queueMu.Unlock() | 
|  | 253 | +		if block.Profit.Cmp(queueBestProfit) > 0 { | 
|  | 254 | +			queueBestBlock = block | 
|  | 255 | +			queueBestBundles = bundles | 
|  | 256 | +			queueBestProfit.Set(block.Profit) | 
|  | 257 | + | 
|  | 258 | +			select { | 
|  | 259 | +			case queueSignal <- struct{}{}: | 
|  | 260 | +			default: | 
|  | 261 | +			} | 
| 205 | 262 | 		} | 
| 206 | 263 | 	} | 
| 207 | 264 | 
 | 
| 208 |  | -	firstBlockResult := b.resubmitter.newTask(12*time.Second, time.Second, func() error { | 
| 209 |  | -		log.Info("Resubmitting build job") | 
| 210 |  | -		return b.eth.BuildBlock(attrs, blockHook) | 
|  | 265 | +	// resubmits block builder requests every second | 
|  | 266 | +	runRetryLoop(ctx, time.Second, func() { | 
|  | 267 | +		log.Debug("retrying BuildBlock", "slot", attrs.Slot, "parent", attrs.HeadHash) | 
|  | 268 | +		err := b.eth.BuildBlock(attrs, blockHook) | 
|  | 269 | +		if err != nil { | 
|  | 270 | +			log.Warn("Failed to build block", "err", err) | 
|  | 271 | +		} | 
| 211 | 272 | 	}) | 
| 212 |  | - | 
| 213 |  | -	return firstBlockResult | 
| 214 | 273 | } | 
| 215 | 274 | 
 | 
| 216 | 275 | func executableDataToExecutionPayload(data *beacon.ExecutableDataV1) (*boostTypes.ExecutionPayload, error) { | 
|  | 
0 commit comments