
Commit 172e961

karalabe authored and shekhirin committed
eth: use the last announced finalized block as the sync ancient limit (ethereum#26685)
1 parent 4ef10e7 commit 172e961

8 files changed: +104, -55 lines

eth/catalyst/api.go

Lines changed: 14 additions & 2 deletions

@@ -237,15 +237,27 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
 		log.Warn("Forkchoice requested unknown head", "hash", update.HeadBlockHash)
 		return engine.STATUS_SYNCING, nil
 	}
+	// If the finalized hash is known, we can direct the downloader to move
+	// potentially more data to the freezer from the get go.
+	finalized := api.remoteBlocks.get(update.FinalizedBlockHash)
+
 	// Header advertised via a past newPayload request. Start syncing to it.
 	// Before we do however, make sure any legacy sync in switched off so we
 	// don't accidentally have 2 cycles running.
 	if merger := api.eth.Merger(); !merger.TDDReached() {
 		merger.ReachTTD()
 		api.eth.Downloader().Cancel()
 	}
-	log.Info("Forkchoice requested sync to new head", "number", header.Number, "hash", header.Hash())
-	if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), header); err != nil {
+	context := []interface{}{"number", header.Number, "hash", header.Hash()}
+	if update.FinalizedBlockHash != (common.Hash{}) {
+		if finalized == nil {
+			context = append(context, []interface{}{"finalized", "unknown"}...)
+		} else {
+			context = append(context, []interface{}{"finalized", finalized.Number}...)
+		}
+	}
+	log.Info("Forkchoice requested sync to new head", context...)
+	if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), header, finalized); err != nil {
 		return engine.STATUS_SYNCING, err
 	}
 	return engine.STATUS_SYNCING, nil
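The context slice built in the new code follows go-ethereum's logging convention of alternating key/value pairs passed variadically. Below is a minimal, self-contained sketch of the same pattern; the logInfo helper and the literal values are illustrative stand-ins, not the real log package:

package main

import "fmt"

// logInfo mimics the shape of geth's log.Info: a message followed by
// alternating key/value context pairs.
func logInfo(msg string, ctx ...interface{}) {
	fmt.Print(msg)
	for i := 0; i+1 < len(ctx); i += 2 {
		fmt.Printf(" %v=%v", ctx[i], ctx[i+1])
	}
	fmt.Println()
}

func main() {
	// Base context always carries the head number and hash.
	context := []interface{}{"number", 17_000_000, "hash", "0xabc..."}

	// The finalized entry is appended only when a finalized hash was announced,
	// mirroring the forkchoiceUpdated logic above; here we assume it is unknown.
	context = append(context, []interface{}{"finalized", "unknown"}...)

	logInfo("Forkchoice requested sync to new head", context...)
}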

eth/catalyst/queue.go

Lines changed: 6 additions & 3 deletions

@@ -31,9 +31,12 @@ import (
 const maxTrackedPayloads = 10
 
 // maxTrackedHeaders is the maximum number of executed payloads the execution
-// engine tracks before evicting old ones. Ideally we should only ever track the
-// latest one; but have a slight wiggle room for non-ideal conditions.
-const maxTrackedHeaders = 10
+// engine tracks before evicting old ones. These are tracked outside the chain
+// during initial sync to allow ForkchoiceUpdate to reference past blocks via
+// hashes only. For the sync target it would be enough to track only the latest
+// header, but snap sync also needs the latest finalized height for the ancient
+// limit.
+const maxTrackedHeaders = 96
 
 // payloadQueueItem represents an id->payload tuple to store until it's retrieved
 // or evicted.
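Why 96? The diff does not say, but a plausible reading, and it is only our inference, is that 96 slots is three 32-slot epochs on the beacon chain, comfortably more than the roughly two epochs by which a finalized block trails the head, so the finalized hash named in a forkchoiceUpdated call should still be resolvable from the tracker. A minimal sketch of a bounded, newest-first tracker with eviction (the headerTracker type is hypothetical, not the actual headerQueue):

package main

import "fmt"

const maxTrackedHeaders = 96

// headerTracker keeps the most recently inserted items first and evicts the
// oldest once capacity is exceeded; block numbers stand in for full headers.
type headerTracker struct {
	items []uint64
}

func (t *headerTracker) put(number uint64) {
	t.items = append([]uint64{number}, t.items...) // newest first
	if len(t.items) > maxTrackedHeaders {
		t.items = t.items[:maxTrackedHeaders] // evict the oldest
	}
}

func (t *headerTracker) get(number uint64) bool {
	for _, n := range t.items {
		if n == number {
			return true
		}
	}
	return false
}

func main() {
	var t headerTracker
	for n := uint64(1); n <= 200; n++ {
		t.put(n)
	}
	fmt.Println(t.get(100)) // false: more than 96 blocks behind, evicted
	fmt.Println(t.get(150)) // true: still within the tracked window
}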

eth/catalyst/tester.go

Lines changed: 1 addition & 1 deletion

@@ -75,7 +75,7 @@ func (tester *FullSyncTester) Start() error {
 	}
 	// Trigger beacon sync with the provided block header as
 	// trusted chain head.
-	err := tester.api.eth.Downloader().BeaconSync(downloader.FullSync, tester.block.Header())
+	err := tester.api.eth.Downloader().BeaconSync(downloader.FullSync, tester.block.Header(), nil)
 	if err != nil {
 		log.Info("Failed to beacon sync", "err", err)
 	}

eth/downloader/beaconsync.go

Lines changed: 8 additions & 8 deletions

@@ -151,8 +151,8 @@ func (d *Downloader) SetBadBlockCallback(onBadBlock badBlockFn) {
 //
 // Internally backfilling and state sync is done the same way, but the header
 // retrieval and scheduling is replaced.
-func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error {
-	return d.beaconSync(mode, head, true)
+func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header, final *types.Header) error {
+	return d.beaconSync(mode, head, final, true)
 }
 
 // BeaconExtend is an optimistic version of BeaconSync, where an attempt is made
@@ -162,7 +162,7 @@ func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error {
 // This is useful if a beacon client is feeding us large chunks of payloads to run,
 // but is not setting the head after each.
 func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error {
-	return d.beaconSync(mode, head, false)
+	return d.beaconSync(mode, head, nil, false)
 }
 
 // beaconSync is the post-merge version of the chain synchronization, where the
@@ -171,7 +171,7 @@ func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error {
 //
 // Internally backfilling and state sync is done the same way, but the header
 // retrieval and scheduling is replaced.
-func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, force bool) error {
+func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, final *types.Header, force bool) error {
 	// When the downloader starts a sync cycle, it needs to be aware of the sync
 	// mode to use (full, snap). To keep the skeleton chain oblivious, inject the
 	// mode into the backfiller directly.
@@ -181,7 +181,7 @@ func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, force bool) e
 	d.skeleton.filler.(*beaconBackfiller).setMode(mode)
 
 	// Signal the skeleton sync to switch to a new head, however it wants
-	if err := d.skeleton.Sync(head, force); err != nil {
+	if err := d.skeleton.Sync(head, final, force); err != nil {
 		return err
 	}
 	return nil
@@ -207,7 +207,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
 	number := chainHead.Number.Uint64()
 
 	// Retrieve the skeleton bounds and ensure they are linked to the local chain
-	beaconHead, beaconTail, err := d.skeleton.Bounds()
+	beaconHead, beaconTail, _, err := d.skeleton.Bounds()
 	if err != nil {
 		// This is a programming error. The chain backfiller was called with an
 		// invalid beacon sync state. Ideally we would panic here, but erroring
@@ -272,7 +272,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
 // until sync errors or is finished.
 func (d *Downloader) fetchBeaconHeaders(from uint64) error {
 	var head *types.Header
-	_, tail, err := d.skeleton.Bounds()
+	_, tail, _, err := d.skeleton.Bounds()
 	if err != nil {
 		return err
 	}
@@ -292,7 +292,7 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
 	for {
 		// Some beacon headers might have appeared since the last cycle, make
 		// sure we're always syncing to all available ones
-		head, _, err = d.skeleton.Bounds()
+		head, _, _, err = d.skeleton.Bounds()
 		if err != nil {
 			return err
 		}
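Worth noting is the asymmetry the two entry points now encode: only the forced head switch (BeaconSync) threads a finalized header through, while optimistic extension (BeaconExtend) always passes nil. A condensed sketch of that plumbing, with the real downloader and header types swapped for toy stand-ins:

package main

import "fmt"

type header struct{ number uint64 }

// syncTo stands in for Downloader.beaconSync: final may be nil, either because
// no finalized block has been announced yet or because the caller is only
// extending the current sync optimistically.
func syncTo(head, final *header, force bool) {
	if final != nil {
		fmt.Printf("sync to %d (finalized %d, force=%v)\n", head.number, final.number, force)
		return
	}
	fmt.Printf("sync to %d (no finalized, force=%v)\n", head.number, force)
}

// beaconSync mirrors the shape of Downloader.BeaconSync: a forced head switch
// that also carries the latest finalized header, if any.
func beaconSync(head, final *header) { syncTo(head, final, true) }

// beaconExtend mirrors the shape of Downloader.BeaconExtend: optimistic
// extension only, so no finalized header is threaded through.
func beaconExtend(head *header) { syncTo(head, nil, false) }

func main() {
	beaconSync(&header{number: 200}, &header{number: 136})
	beaconExtend(&header{number: 201})
}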

eth/downloader/downloader.go

Lines changed: 37 additions & 22 deletions

@@ -480,7 +480,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
 	}(time.Now())
 
 	// Look up the sync boundaries: the common ancestor and the target block
-	var latest, pivot *types.Header
+	var latest, pivot, final *types.Header
 	if !beaconMode {
 		// In legacy mode, use the master peer to retrieve the headers from
 		latest, pivot, err = d.fetchHead(p)
@@ -489,7 +489,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
 		}
 	} else {
 		// In beacon mode, use the skeleton chain to retrieve the headers from
-		latest, _, err = d.skeleton.Bounds()
+		latest, _, final, err = d.skeleton.Bounds()
 		if err != nil {
 			return err
 		}
@@ -499,7 +499,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
 		// Retrieve the pivot header from the skeleton chain segment but
 		// fallback to local chain if it's not found in skeleton space.
 		if pivot = d.skeleton.Header(number); pivot == nil {
-			_, oldest, _ := d.skeleton.Bounds() // error is already checked
+			_, oldest, _, _ := d.skeleton.Bounds() // error is already checked
 			if number < oldest.Number.Uint64() {
 				count := int(oldest.Number.Uint64() - number) // it's capped by fsMinFullBlocks
 				headers := d.readHeaderRange(oldest, count)
@@ -567,26 +567,41 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
 		d.committed = 0
 	}
 	if mode == SnapSync {
-		// Set the ancient data limitation.
-		// If we are running snap sync, all block data older than ancientLimit will be
-		// written to the ancient store. More recent data will be written to the active
-		// database and will wait for the freezer to migrate.
+		// Set the ancient data limitation. If we are running snap sync, all block
+		// data older than ancientLimit will be written to the ancient store. More
+		// recent data will be written to the active database and will wait for the
+		// freezer to migrate.
 		//
-		// If there is a checkpoint available, then calculate the ancientLimit through
-		// that. Otherwise calculate the ancient limit through the advertised height
-		// of the remote peer.
+		// If the network is post-merge, use either the last announced finalized
+		// block as the ancient limit, or if we haven't yet received one, the head
+		// minus a max fork ancestry limit. One quirky case is if we've already
+		// passed the finalized block, in which case skeleton.Bounds will return
+		// nil and we'll revert to head - 90K. That's fine, we're finishing sync anyway.
 		//
-		// The reason for picking checkpoint first is that a malicious peer can give us
-		// a fake (very high) height, forcing the ancient limit to also be very high.
-		// The peer would start to feed us valid blocks until head, resulting in all of
-		// the blocks might be written into the ancient store. A following mini-reorg
-		// could cause issues.
-		if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
-			d.ancientLimit = d.checkpoint
-		} else if height > fullMaxForkAncestry+1 {
-			d.ancientLimit = height - fullMaxForkAncestry - 1
+		// For non-merged networks, if there is a checkpoint available, then calculate
+		// the ancientLimit through that. Otherwise calculate the ancient limit through
+		// the advertised height of the remote peer. This is mostly a fallback for
+		// legacy networks, but should eventually be dropped. TODO(karalabe).
+		if beaconMode {
+			// Beacon sync, use the latest finalized block as the ancient limit
+			// or a reasonable height if no finalized block is yet announced.
+			if final != nil {
+				d.ancientLimit = final.Number.Uint64()
+			} else if height > fullMaxForkAncestry+1 {
+				d.ancientLimit = height - fullMaxForkAncestry - 1
+			} else {
+				d.ancientLimit = 0
+			}
 		} else {
-			d.ancientLimit = 0
+			// Legacy sync, use any hardcoded checkpoints or the best announcement
+			// we have from the remote peer. TODO(karalabe): Drop this pathway.
+			if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
+				d.ancientLimit = d.checkpoint
+			} else if height > fullMaxForkAncestry+1 {
+				d.ancientLimit = height - fullMaxForkAncestry - 1
+			} else {
+				d.ancientLimit = 0
+			}
 		}
 	}
 	frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
@@ -1566,7 +1581,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
 
 	// In post-merge, notify the engine API of encountered bad chains
 	if d.badBlock != nil {
-		head, _, err := d.skeleton.Bounds()
+		head, _, _, err := d.skeleton.Bounds()
 		if err != nil {
 			log.Error("Failed to retrieve beacon bounds for bad block reporting", "err", err)
 		} else {
@@ -1860,7 +1875,7 @@ func (d *Downloader) reportSnapSyncProgress(force bool) {
 		return
 	}
 	// Retrieve the current chain head and calculate the ETA
-	latest, _, err := d.skeleton.Bounds()
+	latest, _, _, err := d.skeleton.Bounds()
 	if err != nil {
 		// We're going to cheat for non-merged networks, but that's fine
 		latest = d.pivotHeader
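The heart of the change is the ancient-limit selection above, which is effectively a small pure decision function. The sketch below restates that branching on its own, assuming fullMaxForkAncestry equals the 90,000-block figure the "90K" comment alludes to; ancientLimit here is a hypothetical standalone helper, not a function in the codebase:

package main

import "fmt"

const fullMaxForkAncestry = 90000 // assumed value of the geth constant

// ancientLimit mirrors the snap-sync freezer threshold chosen in
// Downloader.syncWithPeer. final is nil when no finalized block was announced
// (or it already fell out of the skeleton's sync range).
func ancientLimit(beaconMode bool, final *uint64, height, checkpoint uint64) uint64 {
	if beaconMode {
		// Post-merge: trust the announced finalized block outright.
		if final != nil {
			return *final
		}
		if height > fullMaxForkAncestry+1 {
			return height - fullMaxForkAncestry - 1
		}
		return 0
	}
	// Legacy networks: prefer the hardcoded checkpoint over the peer's
	// advertised height, since the latter can be maliciously inflated.
	if checkpoint != 0 && checkpoint > fullMaxForkAncestry+1 {
		return checkpoint
	}
	if height > fullMaxForkAncestry+1 {
		return height - fullMaxForkAncestry - 1
	}
	return 0
}

func main() {
	final := uint64(16_800_000)
	fmt.Println(ancientLimit(true, &final, 16_800_100, 0)) // finalized wins: 16800000
	fmt.Println(ancientLimit(true, nil, 16_800_100, 0))    // head - 90K - 1: 16710099
}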

eth/downloader/downloader_test.go

Lines changed: 1 addition & 1 deletion

@@ -1478,7 +1478,7 @@ func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) {
 		if c.local > 0 {
 			tester.chain.InsertChain(chain.blocks[1 : c.local+1])
 		}
-		if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header()); err != nil {
+		if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
 			t.Fatalf("Failed to beacon sync chain %v %v", c.name, err)
 		}
 		select {

eth/downloader/skeleton.go

Lines changed: 32 additions & 13 deletions

@@ -102,13 +102,15 @@ type subchain struct {
 // suspended skeleton sync without prior knowledge of all prior suspension points.
 type skeletonProgress struct {
 	Subchains []*subchain // Disjoint subchains downloaded until now
+	Finalized *uint64     // Last known finalized block number
 }
 
 // headUpdate is a notification that the beacon sync should switch to a new target.
 // The update might request whether to forcefully change the target, or only try to
 // extend it and fail if it's not possible.
 type headUpdate struct {
 	header *types.Header // Header to update the sync target to
+	final  *types.Header // Finalized header to use as thresholds
 	force  bool          // Whether to force the update or only extend if possible
 	errc   chan error    // Channel to signal acceptance of the new head
 }
@@ -321,12 +323,12 @@ func (s *skeleton) Terminate() error {
 //
 // This method does not block, rather it just waits until the syncer receives the
 // fed header. What the syncer does with it is the syncer's problem.
-func (s *skeleton) Sync(head *types.Header, force bool) error {
+func (s *skeleton) Sync(head *types.Header, final *types.Header, force bool) error {
 	log.Trace("New skeleton head announced", "number", head.Number, "hash", head.Hash(), "force", force)
 	errc := make(chan error)
 
 	select {
-	case s.headEvents <- &headUpdate{header: head, force: force, errc: errc}:
+	case s.headEvents <- &headUpdate{header: head, final: final, force: force, errc: errc}:
 		return <-errc
 	case <-s.terminated:
 		return errTerminated
@@ -437,7 +439,7 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
 			// we don't seamlessly integrate reorgs to keep things simple. If the
 			// network starts doing many mini reorgs, it might be worthwhile handling
 			// a limited depth without an error.
-			if reorged := s.processNewHead(event.header, event.force); reorged {
+			if reorged := s.processNewHead(event.header, event.final, event.force); reorged {
 				// If a reorg is needed, and we're forcing the new head, signal
 				// the syncer to tear down and start over. Otherwise, drop the
 				// non-force reorg.
@@ -590,7 +592,17 @@ func (s *skeleton) saveSyncStatus(db ethdb.KeyValueWriter) {
 // accepts and integrates it into the skeleton or requests a reorg. Upon reorg,
 // the syncer will tear itself down and restart with a fresh head. It is simpler
 // to reconstruct the sync state than to mutate it and hope for the best.
-func (s *skeleton) processNewHead(head *types.Header, force bool) bool {
+func (s *skeleton) processNewHead(head *types.Header, final *types.Header, force bool) bool {
+	// If a new finalized block was announced, update the sync process independent
+	// of what happens with the sync head below
+	if final != nil {
+		if number := final.Number.Uint64(); s.progress.Finalized == nil || *s.progress.Finalized != number {
+			s.progress.Finalized = new(uint64)
+			*s.progress.Finalized = final.Number.Uint64()
+
+			s.saveSyncStatus(s.db)
+		}
+	}
 	// If the header cannot be inserted without interruption, return an error for
 	// the outer loop to tear down the skeleton sync and restart it
 	number := head.Number.Uint64()
@@ -1150,38 +1162,45 @@ func (s *skeleton) cleanStales(filled *types.Header) error {
 	return nil
 }
 
-// Bounds retrieves the current head and tail tracked by the skeleton syncer.
-// This method is used by the backfiller, whose life cycle is controlled by the
-// skeleton syncer.
+// Bounds retrieves the current head and tail tracked by the skeleton syncer
+// and optionally the last known finalized header if any was announced and if
+// it is still in the sync range. This method is used by the backfiller, whose
+// life cycle is controlled by the skeleton syncer.
 //
 // Note, the method will not use the internal state of the skeleton, but will
 // rather blindly pull stuff from the database. This is fine, because the back-
 // filler will only run when the skeleton chain is fully downloaded and stable.
 // There might be new heads appended, but those are atomic from the perspective
 // of this method. Any head reorg will first tear down the backfiller and only
 // then make the modification.
-func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, err error) {
+func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, final *types.Header, err error) {
 	// Read the current sync progress from disk and figure out the current head.
 	// Although there's a lot of error handling here, these are mostly as sanity
 	// checks to avoid crashing if a programming error happens. These should not
 	// happen in live code.
 	status := rawdb.ReadSkeletonSyncStatus(s.db)
 	if len(status) == 0 {
-		return nil, nil, errors.New("beacon sync not yet started")
+		return nil, nil, nil, errors.New("beacon sync not yet started")
 	}
 	progress := new(skeletonProgress)
 	if err := json.Unmarshal(status, progress); err != nil {
-		return nil, nil, err
+		return nil, nil, nil, err
 	}
 	head = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Head)
 	if head == nil {
-		return nil, nil, fmt.Errorf("head skeleton header %d is missing", progress.Subchains[0].Head)
+		return nil, nil, nil, fmt.Errorf("head skeleton header %d is missing", progress.Subchains[0].Head)
 	}
 	tail = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Tail)
 	if tail == nil {
-		return nil, nil, fmt.Errorf("tail skeleton header %d is missing", progress.Subchains[0].Tail)
+		return nil, nil, nil, fmt.Errorf("tail skeleton header %d is missing", progress.Subchains[0].Tail)
+	}
+	if progress.Finalized != nil && tail.Number.Uint64() <= *progress.Finalized && *progress.Finalized <= head.Number.Uint64() {
+		final = rawdb.ReadSkeletonHeader(s.db, *progress.Finalized)
+		if final == nil {
+			return nil, nil, nil, fmt.Errorf("finalized skeleton header %d is missing", *progress.Finalized)
+		}
 	}
-	return head, tail, nil
+	return head, tail, final, nil
 }
 
 // Header retrieves a specific header tracked by the skeleton syncer. This method
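One design detail worth spelling out: Finalized is a *uint64 rather than a plain uint64, which we read as a backward-compatibility measure, since sync statuses persisted before this change lack the field entirely and should unmarshal to nil rather than look like "finalized at block 0". A self-contained round-trip check of that behavior (the subchain stand-in is simplified; the real struct carries more fields):

package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-in for the skeleton's subchain bookkeeping.
type subchain struct {
	Head, Tail uint64
}

// skeletonProgress mirrors the updated struct from eth/downloader/skeleton.go.
type skeletonProgress struct {
	Subchains []*subchain // Disjoint subchains downloaded until now
	Finalized *uint64     // Last known finalized block number
}

func main() {
	// A status persisted before this change carries no Finalized field at all.
	legacy := []byte(`{"Subchains":[{"Head":200,"Tail":100}]}`)

	var progress skeletonProgress
	if err := json.Unmarshal(legacy, &progress); err != nil {
		panic(err)
	}
	fmt.Println(progress.Finalized == nil) // true: no finalized block ever announced

	// Once one is announced, processNewHead sets the pointer and re-persists.
	progress.Finalized = new(uint64)
	*progress.Finalized = 136
	blob, _ := json.Marshal(progress)
	fmt.Println(string(blob)) // ...,"Finalized":136}
}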

eth/downloader/skeleton_test.go

Lines changed: 5 additions & 5 deletions

@@ -370,7 +370,7 @@ func TestSkeletonSyncInit(t *testing.T) {
 
 		skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller())
 		skeleton.syncStarting = func() { close(wait) }
-		skeleton.Sync(tt.head, true)
+		skeleton.Sync(tt.head, nil, true)
 
 		<-wait
 		skeleton.Terminate()
@@ -484,10 +484,10 @@ func TestSkeletonSyncExtend(t *testing.T) {
 
 		skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller())
 		skeleton.syncStarting = func() { close(wait) }
-		skeleton.Sync(tt.head, true)
+		skeleton.Sync(tt.head, nil, true)
 
 		<-wait
-		if err := skeleton.Sync(tt.extend, false); err != tt.err {
+		if err := skeleton.Sync(tt.extend, nil, false); err != tt.err {
 			t.Errorf("test %d: extension failure mismatch: have %v, want %v", i, err, tt.err)
 		}
 		skeleton.Terminate()
@@ -859,7 +859,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
 		}
 		// Create a skeleton sync and run a cycle
 		skeleton := newSkeleton(db, peerset, drop, filler)
-		skeleton.Sync(tt.head, true)
+		skeleton.Sync(tt.head, nil, true)
 
 		var progress skeletonProgress
 		// Wait a bit (bleah) for the initial sync loop to go to idle. This might
@@ -910,7 +910,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
 		}
 		// Apply the post-init events if there's any
 		if tt.newHead != nil {
-			skeleton.Sync(tt.newHead, true)
+			skeleton.Sync(tt.newHead, nil, true)
 		}
 		if tt.newPeer != nil {
 			if err := peerset.Register(newPeerConnection(tt.newPeer.id, eth.ETH66, tt.newPeer, log.New("id", tt.newPeer.id))); err != nil {
