eth/downloader: dynamically move pivot even during chain sync #21529
@@ -138,7 +138,10 @@ type Downloader struct {
 	receiptWakeCh chan bool            // [eth/63] Channel to signal the receipt fetcher of new tasks
 	headerProcCh  chan []*types.Header // [eth/62] Channel to feed the header processor new tasks

-	// for stateFetcher
+	// State sync
+	pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
+	pivotLock   sync.RWMutex  // Lock protecting pivot header reads from updates
+
 	stateSyncStart chan *stateSync
 	trackStateReq  chan *stateReq
 	stateCh        chan dataPack // [eth/63] Channel receiving inbound node state data
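The two new fields are shared by the header fetcher, the header processor and the fast-sync content processor; below is a minimal standalone sketch of the locking discipline the diff uses around them (the `downloader`, `header`, `pivotNumber` and `movePivot` names here are illustrative stand-ins, not the PR's code):

```go
package main

import (
	"fmt"
	"sync"
)

// Standalone sketch (not the PR's code): mimics the new Downloader fields to
// show the access pattern — reads under RLock, pivot moves under Lock.
type header struct{ Number uint64 }

type downloader struct {
	pivotLock   sync.RWMutex
	pivotHeader *header
}

func (d *downloader) pivotNumber() uint64 {
	d.pivotLock.RLock()
	defer d.pivotLock.RUnlock()
	if d.pivotHeader == nil {
		return 0 // no pivot yet (e.g. full sync or a very young chain)
	}
	return d.pivotHeader.Number
}

func (d *downloader) movePivot(h *header) {
	d.pivotLock.Lock()
	d.pivotHeader = h
	d.pivotLock.Unlock()
}

func main() {
	d := &downloader{}
	fmt.Println(d.pivotNumber()) // 0
	d.movePivot(&header{Number: 1_000_064})
	fmt.Println(d.pivotNumber()) // 1000064
}
```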
@@ -451,10 +454,17 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
 	}(time.Now())

 	// Look up the sync boundaries: the common ancestor and the target block
-	latest, err := d.fetchHeight(p)
+	latest, pivot, err := d.fetchHead(p)
 	if err != nil {
 		return err
 	}
+	if mode == FastSync && pivot == nil {
+		// If no pivot block was returned, the head is below the min full block
+		// threshold (i.e. new chain). In that case we won't really fast sync
+		// anyway, but still need a valid pivot block to avoid some code hitting
+		// nil panics on an access.
+		pivot = d.blockchain.CurrentBlock().Header()
+	}
 	height := latest.Number.Uint64()

 	origin, err := d.findAncestor(p, latest)
@@ -469,22 +479,21 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
 	d.syncStatsLock.Unlock()

 	// Ensure our origin point is below any fast sync pivot point
-	pivot := uint64(0)
 	if mode == FastSync {
 		if height <= uint64(fsMinFullBlocks) {
 			origin = 0
 		} else {
-			pivot = height - uint64(fsMinFullBlocks)
-			if pivot <= origin {
-				origin = pivot - 1
+			pivotNumber := pivot.Number.Uint64()
+			if pivotNumber <= origin {
+				origin = pivotNumber - 1
 			}
 			// Write out the pivot into the database so a rollback beyond it will
 			// reenable fast sync
-			rawdb.WriteLastPivotNumber(d.stateDB, pivot)
+			rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber)
 		}
 	}
 	d.committed = 1
-	if mode == FastSync && pivot != 0 {
+	if mode == FastSync && pivot.Number.Uint64() != 0 {
 		d.committed = 0
 	}
 	if mode == FastSync {
@@ -530,13 +539,17 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
 		d.syncInitHook(origin, height)
 	}
 	fetchers := []func() error{
-		func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
-		func() error { return d.fetchBodies(origin + 1) },          // Bodies are retrieved during normal and fast sync
-		func() error { return d.fetchReceipts(origin + 1) },        // Receipts are retrieved during fast sync
-		func() error { return d.processHeaders(origin+1, pivot, td) },
+		func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
+		func() error { return d.fetchBodies(origin + 1) },   // Bodies are retrieved during normal and fast sync
+		func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+		func() error { return d.processHeaders(origin+1, td) },
 	}
 	if mode == FastSync {
-		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
+		d.pivotLock.Lock()
+		d.pivotHeader = pivot
+		d.pivotLock.Unlock()
+
+		fetchers = append(fetchers, func() error { return d.processFastSyncContent() })
 	} else if mode == FullSync {
 		fetchers = append(fetchers, d.processFullSyncContent)
 	}
@@ -617,46 +630,63 @@ func (d *Downloader) Terminate() {
 	d.Cancel()
 }

-// fetchHeight retrieves the head header of the remote peer to aid in estimating
-// the total time a pending synchronisation would take.
-func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
-	p.log.Debug("Retrieving remote chain height")
+// fetchHead retrieves the head header and prior pivot block (if available) from
+// a remote peer.
+func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *types.Header, err error) {
+	p.log.Debug("Retrieving remote chain head")
+	mode := d.getMode()

 	// Request the advertised remote head block and wait for the response
-	head, _ := p.peer.Head()
-	go p.peer.RequestHeadersByHash(head, 1, 0, false)
+	latest, _ := p.peer.Head()
+	fetch := 1
+	if mode == FastSync {
+		fetch = 2 // head + pivot headers
+	}
+	go p.peer.RequestHeadersByHash(latest, fetch, fsMinFullBlocks-1, true)

 	ttl := d.requestTTL()
 	timeout := time.After(ttl)
-	mode := d.getMode()
 	for {
 		select {
 		case <-d.cancelCh:
-			return nil, errCanceled
+			return nil, nil, errCanceled

 		case packet := <-d.headerCh:
 			// Discard anything not from the origin peer
 			if packet.PeerId() != p.id {
 				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
 				break
 			}
-			// Make sure the peer actually gave something valid
+			// Make sure the peer gave us at least one and at most the requested headers
 			headers := packet.(*headerPack).headers
-			if len(headers) != 1 {
-				p.log.Warn("Multiple headers for single request", "headers", len(headers))
-				return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
+			if len(headers) == 0 || len(headers) > fetch {
+				return nil, nil, fmt.Errorf("%w: returned headers %d != requested %d", errBadPeer, len(headers), fetch)
 			}
+			// The first header needs to be the head, validate against the checkpoint
+			// and request. If only 1 header was returned, make sure there's no pivot
+			// or there was not one requested.
 			head := headers[0]
 			if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint {
-				p.log.Warn("Remote head below checkpoint", "number", head.Number, "hash", head.Hash())
-				return nil, errUnsyncedPeer
+				return nil, nil, fmt.Errorf("%w: remote head %d below checkpoint %d", errUnsyncedPeer, head.Number, d.checkpoint)
 			}
+			if len(headers) == 1 {
+				if mode == FastSync && head.Number.Uint64() > uint64(fsMinFullBlocks) {
+					return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer)
+				}
+				p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", head.Hash())
+				return head, nil, nil
+			}
+			// At this point we have 2 headers in total and the first is the
+			// validated head of the chain. Check the pivot number and return.
+			pivot := headers[1]
+			if pivot.Number.Uint64() != head.Number.Uint64()-uint64(fsMinFullBlocks) {
+				return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-uint64(fsMinFullBlocks))
+			}
-			p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
-			return head, nil
+			return head, pivot, nil

 		case <-timeout:
 			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
-			return nil, errTimeout
+			return nil, nil, errTimeout

 		case <-d.bodyCh:
 		case <-d.receiptCh:
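For orientation, the geometry of the new head request: with count=2, skip=fsMinFullBlocks-1 and reverse=true, the peer answers with the head header plus the header exactly fsMinFullBlocks below it, which becomes the initial pivot. A tiny standalone sketch with the downloader's usual constant (fsMinFullBlocks = 64; the block numbers are purely illustrative):

```go
package main

import "fmt"

// Illustrative only: how RequestHeadersByHash(latest, 2, fsMinFullBlocks-1, true)
// maps onto block numbers. With reverse=true and skip=63, header[0] is the head
// and header[1] sits (skip+1) = 64 blocks below it — the expected pivot.
const fsMinFullBlocks = 64

func main() {
	head := uint64(1_000_000)
	pivot := head - uint64(fsMinFullBlocks) // second returned header
	fmt.Println("head:", head, "expected pivot:", pivot) // head: 1000000 expected pivot: 999936
}
```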
@@ -871,14 +901,14 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 		case <-d.cancelCh:
 			return 0, errCanceled

-		case packer := <-d.headerCh:
+		case packet := <-d.headerCh:
 			// Discard anything not from the origin peer
-			if packer.PeerId() != p.id {
-				log.Debug("Received headers from incorrect peer", "peer", packer.PeerId())
+			if packet.PeerId() != p.id {
+				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
 				break
 			}
 			// Make sure the peer actually gave something valid
-			headers := packer.(*headerPack).headers
+			headers := packet.(*headerPack).headers
 			if len(headers) != 1 {
 				p.log.Warn("Multiple headers for single request", "headers", len(headers))
 				return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
@@ -937,12 +967,13 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 // other peers are only accepted if they map cleanly to the skeleton. If no one
 // can fill in the skeleton - not even the origin peer - it's assumed invalid and
 // the origin is dropped.
-func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error {
+func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
 	p.log.Debug("Directing header downloads", "origin", from)
 	defer p.log.Debug("Header download terminated")

 	// Create a timeout timer, and the associated header fetcher
 	skeleton := true            // Skeleton assembly phase or finishing up
+	pivoting := false           // Whether the next request is pivot verification
 	request := time.Now()       // time of the last skeleton fetch request
 	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
 	<-timeout.C                 // timeout channel should be initially empty
@@ -963,6 +994,20 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
 			go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
 		}
 	}
+	getNextPivot := func() {
+		pivoting = true
+		request = time.Now()
+
+		ttl = d.requestTTL()
+		timeout.Reset(ttl)
+
+		d.pivotLock.RLock()
+		pivot := d.pivotHeader.Number.Uint64()
+		d.pivotLock.RUnlock()
+
+		p.log.Trace("Fetching next pivot header", "number", pivot+uint64(fsMinFullBlocks))
+		go p.peer.RequestHeadersByNumber(pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep
+	}
 	// Start pulling the header chain skeleton until all is done
 	ancestor := from
 	getHeaders(from)
@@ -982,8 +1027,46 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
 			headerReqTimer.UpdateSince(request)
 			timeout.Stop()

+			// If the pivot is being checked, move if it became stale and run the real retrieval
+			var pivot uint64
+
+			d.pivotLock.RLock()
+			if d.pivotHeader != nil {
+				pivot = d.pivotHeader.Number.Uint64()
+			}
+			d.pivotLock.RUnlock()
+
+			if pivoting {
+				if packet.Items() == 2 {
+					// Retrieve the headers and do some sanity checks, just in case
+					headers := packet.(*headerPack).headers
+
+					if have, want := headers[0].Number.Uint64(), pivot+uint64(fsMinFullBlocks); have != want {
+						log.Warn("Peer sent invalid next pivot", "have", have, "want", want)
+						return fmt.Errorf("%w: next pivot number %d != requested %d", errInvalidChain, have, want)
+					}
+					if have, want := headers[1].Number.Uint64(), pivot+2*uint64(fsMinFullBlocks)-8; have != want {
+						log.Warn("Peer sent invalid pivot confirmer", "have", have, "want", want)
+						return fmt.Errorf("%w: next pivot confirmer number %d != requested %d", errInvalidChain, have, want)
+					}
+					log.Warn("Pivot seemingly stale, moving", "old", pivot, "new", headers[0].Number)
+					pivot = headers[0].Number.Uint64()
+
+					d.pivotLock.Lock()
+					d.pivotHeader = headers[0]
+					d.pivotLock.Unlock()
+
+					// Write out the pivot into the database so a rollback beyond
+					// it will reenable fast sync and update the state root that
+					// the state syncer will be downloading.
+					rawdb.WriteLastPivotNumber(d.stateDB, pivot)
+				}
+				pivoting = false
+				getHeaders(from)
+				continue
+			}
 			// If the skeleton's finished, pull any remaining head headers directly from the origin
-			if packet.Items() == 0 && skeleton {
+			if skeleton && packet.Items() == 0 {
 				skeleton = false
 				getHeaders(from)
 				continue
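The in-flight staleness probe works purely on block-number arithmetic: getNextPivot requests two headers starting at pivot+fsMinFullBlocks with a skip of fsMinFullBlocks-9, so a full reply proves the chain extends at least 2*fsMinFullBlocks-8 past the old pivot, and the pivot is advanced by fsMinFullBlocks. A standalone sketch of the numbers involved (assuming the usual fsMinFullBlocks = 64; the pivot value is illustrative):

```go
package main

import "fmt"

// Illustrative only: RequestHeadersByNumber(pivot+64, 2, 64-9, false) asks for
// the candidate new pivot and a "confirmer" header 2*64-8 blocks past the old
// pivot. If the peer can serve both, the old pivot is about to fall out of the
// recent-state window and is moved forward by fsMinFullBlocks.
const fsMinFullBlocks = 64

func main() {
	oldPivot := uint64(1000)
	first := oldPivot + fsMinFullBlocks         // 1064: candidate new pivot
	second := first + (fsMinFullBlocks - 9) + 1 // 1120 = oldPivot + 2*64 - 8: the confirmer
	fmt.Println(first, second)                  // both served => pivot moves 1000 -> 1064
}
```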
@@ -1061,7 +1144,14 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
 					return errCanceled
 				}
 				from += uint64(len(headers))
-				getHeaders(from)
+
+				// If we're still skeleton filling fast sync, check pivot staleness
+				// before continuing to the next skeleton filling
+				if skeleton && pivot > 0 {
+					getNextPivot()
+				} else {
+					getHeaders(from)
+				}
 			} else {
 				// No headers delivered, or all of them being delayed, sleep a bit and retry
 				p.log.Trace("All headers delayed, waiting")
@@ -1390,7 +1480,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
 // processHeaders takes batches of retrieved headers from an input channel and
 // keeps processing and scheduling them into the header chain and downloader's
 // queue until the stream ends or a failure occurs.
-func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
+func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 	// Keep a count of uncertain headers to roll back
 	var (
 		rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis)
@@ -1493,6 +1583,14 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 			// In case of header only syncing, validate the chunk immediately
 			if mode == FastSync || mode == LightSync {
 				// If we're importing pure headers, verify based on their recentness
+				var pivot uint64
+
+				d.pivotLock.RLock()
+				if d.pivotHeader != nil {
+					pivot = d.pivotHeader.Number.Uint64()
+				}
+				d.pivotLock.RUnlock()
+
 				frequency := fsHeaderCheckFrequency
 				if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
 					frequency = 1
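The only effect of reading the pivot here is the header verification frequency: batches ending within fsHeaderForceVerify blocks of the (now movable) pivot are verified header-by-header instead of sampled. A standalone sketch assuming the downloader's usual constants (fsHeaderCheckFrequency = 100, fsHeaderForceVerify = 24):

```go
package main

import "fmt"

// Illustrative only, with assumed constant values: once a header batch reaches
// within fsHeaderForceVerify blocks of the pivot, every header is verified
// rather than one in every fsHeaderCheckFrequency.
const (
	fsHeaderCheckFrequency = 100
	fsHeaderForceVerify    = 24
)

func verifyFrequency(lastInChunk, pivot uint64) int {
	if lastInChunk+fsHeaderForceVerify > pivot {
		return 1
	}
	return fsHeaderCheckFrequency
}

func main() {
	fmt.Println(verifyFrequency(900, 1000)) // 100: still far below the pivot
	fmt.Println(verifyFrequency(980, 1000)) // 1: within 24 blocks of the pivot
}
```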
@@ -1609,10 +1707,13 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {

 // processFastSyncContent takes fetch results from the queue and writes them to the
 // database. It also controls the synchronisation of state nodes of the pivot block.
-func (d *Downloader) processFastSyncContent(latest *types.Header) error {
+func (d *Downloader) processFastSyncContent() error {
 	// Start syncing state of the reported head block. This should get us most of
 	// the state of the pivot block.
-	sync := d.syncState(latest.Root)
+	d.pivotLock.RLock()
+	sync := d.syncState(d.pivotHeader.Root)
+	d.pivotLock.RUnlock()
+
 	defer func() {
 		// The `sync` object is replaced every time the pivot moves. We need to
 		// defer close the very last active one, hence the lazy evaluation vs.
@@ -1627,12 +1728,6 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
 	}
 	go closeOnErr(sync)

-	// Figure out the ideal pivot block. Note, that this goalpost may move if the
-	// sync takes long enough for the chain head to move significantly.
-	pivot := uint64(0)
-	if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
-		pivot = height - uint64(fsMinFullBlocks)
-	}
 	// To cater for moving pivot points, track the pivot block and subsequently
 	// accumulated download results separately.
 	var (
@@ -1659,22 +1754,46 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
 		if d.chainInsertHook != nil {
 			d.chainInsertHook(results)
 		}
-		if oldPivot != nil {
+		// If we haven't downloaded the pivot block yet, check pivot staleness
+		// notifications from the header downloader
+		d.pivotLock.RLock()
+		pivot := d.pivotHeader
+		d.pivotLock.RUnlock()
+
+		if oldPivot == nil {
+			if pivot.Root != sync.root {
+				sync.Cancel()
+				sync = d.syncState(pivot.Root)
+
+				go closeOnErr(sync)
+			}
+		} else {
 			results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
 		}
 		// Split around the pivot block and process the two sides via fast/full sync
 		if atomic.LoadInt32(&d.committed) == 0 {
-			latest = results[len(results)-1].Header
-			if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
-				log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
-				pivot = height - uint64(fsMinFullBlocks)
+			latest := results[len(results)-1].Header
+			// If the height is above the pivot block by 2 sets, it means the pivot
+			// became stale in the network and it was garbage collected, move to a
+			// new pivot.
+			//
+			// Note, we have `reorgProtHeaderDelay` number of blocks withheld; those
+			// need to be taken into account, otherwise we're detecting the pivot move
+			// late and will drop peers due to unavailable state!!!
+			if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*uint64(fsMinFullBlocks)-uint64(reorgProtHeaderDelay) {
+				log.Warn("Pivot became stale, moving", "old", pivot.Number.Uint64(), "new", height-uint64(fsMinFullBlocks)+uint64(reorgProtHeaderDelay))
+				pivot = results[len(results)-1-fsMinFullBlocks+reorgProtHeaderDelay].Header // must exist as lower old pivot is uncommitted

+				d.pivotLock.Lock()
+				d.pivotHeader = pivot
+				d.pivotLock.Unlock()
+
 				// Write out the pivot into the database so a rollback beyond it will
 				// reenable fast sync
-				rawdb.WriteLastPivotNumber(d.stateDB, pivot)
+				rawdb.WriteLastPivotNumber(d.stateDB, pivot.Number.Uint64())
 			}
 		}
-		P, beforeP, afterP := splitAroundPivot(pivot, results)
+		P, beforeP, afterP := splitAroundPivot(pivot.Number.Uint64(), results)
 		if err := d.commitFastSyncData(beforeP, sync); err != nil {
 			return err
 		}
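The reworked staleness condition in processFastSyncContent compensates for the reorgProtHeaderDelay headers withheld from delivery: the trigger threshold is lowered by that delay and the replacement pivot is picked that many blocks closer to the head. A standalone sketch of the arithmetic (assuming fsMinFullBlocks = 64 and reorgProtHeaderDelay = 2, the downloader's usual values; the block numbers are illustrative):

```go
package main

import "fmt"

// Illustrative only: pivot staleness arithmetic with assumed constant values.
// Because reorgProtHeaderDelay headers are withheld, the threshold is lowered by
// that amount, otherwise the move is detected too late and peers get dropped for
// already-pruned state.
const (
	fsMinFullBlocks      = 64
	reorgProtHeaderDelay = 2
)

func main() {
	pivot := uint64(1000)
	threshold := pivot + 2*fsMinFullBlocks - reorgProtHeaderDelay // 1126
	height := uint64(1130)                                        // latest delivered block
	if height >= threshold {
		newPivot := height - fsMinFullBlocks + reorgProtHeaderDelay // 1068
		fmt.Println("pivot moves:", pivot, "->", newPivot)
	}
}
```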