225 changes: 172 additions & 53 deletions eth/downloader/downloader.go
@@ -138,7 +138,10 @@ type Downloader struct {
receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks

// for stateFetcher
// State sync
pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
pivotLock sync.RWMutex // Lock protecting pivot header reads from updates

stateSyncStart chan *stateSync
trackStateReq chan *stateReq
stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
@@ -451,10 +454,17 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
}(time.Now())

// Look up the sync boundaries: the common ancestor and the target block
latest, err := d.fetchHeight(p)
latest, pivot, err := d.fetchHead(p)
if err != nil {
return err
}
if mode == FastSync && pivot == nil {
// If no pivot block was returned, the head is below the min full block
// threshold (i.e. new chain). In that case we won't really fast sync
// anyway, but still need a valid pivot block to avoid some code hitting
// nil panics on an access.
pivot = d.blockchain.CurrentBlock().Header()
}
height := latest.Number.Uint64()

origin, err := d.findAncestor(p, latest)
@@ -469,22 +479,21 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
d.syncStatsLock.Unlock()

// Ensure our origin point is below any fast sync pivot point
pivot := uint64(0)
if mode == FastSync {
if height <= uint64(fsMinFullBlocks) {
origin = 0
} else {
pivot = height - uint64(fsMinFullBlocks)
if pivot <= origin {
origin = pivot - 1
pivotNumber := pivot.Number.Uint64()
if pivotNumber <= origin {
origin = pivotNumber - 1
}
// Write out the pivot into the database so a rollback beyond it will
// reenable fast sync
rawdb.WriteLastPivotNumber(d.stateDB, pivot)
rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber)
}
}
d.committed = 1
if mode == FastSync && pivot != 0 {
if mode == FastSync && pivot.Number.Uint64() != 0 {
d.committed = 0
}
if mode == FastSync {
@@ -530,13 +539,17 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
d.syncInitHook(origin, height)
}
fetchers := []func() error{
func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
func() error { return d.processHeaders(origin+1, pivot, td) },
func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
func() error { return d.processHeaders(origin+1, td) },
}
if mode == FastSync {
fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
d.pivotLock.Lock()
d.pivotHeader = pivot
d.pivotLock.Unlock()

fetchers = append(fetchers, func() error { return d.processFastSyncContent() })
} else if mode == FullSync {
fetchers = append(fetchers, d.processFullSyncContent)
}
@@ -617,46 +630,63 @@ func (d *Downloader) Terminate() {
d.Cancel()
}

// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
p.log.Debug("Retrieving remote chain height")
// fetchHead retrieves the head header and prior pivot block (if available) from
// a remote peer.
func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *types.Header, err error) {
p.log.Debug("Retrieving remote chain head")
mode := d.getMode()

// Request the advertised remote head block and wait for the response
head, _ := p.peer.Head()
go p.peer.RequestHeadersByHash(head, 1, 0, false)
latest, _ := p.peer.Head()
fetch := 1
if mode == FastSync {
fetch = 2 // head + pivot headers
}
go p.peer.RequestHeadersByHash(latest, fetch, fsMinFullBlocks-1, true)

ttl := d.requestTTL()
timeout := time.After(ttl)
mode := d.getMode()
for {
select {
case <-d.cancelCh:
return nil, errCanceled
return nil, nil, errCanceled

case packet := <-d.headerCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
// Make sure the peer gave us at least one and at most the requested headers
headers := packet.(*headerPack).headers
if len(headers) != 1 {
p.log.Warn("Multiple headers for single request", "headers", len(headers))
return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
if len(headers) == 0 || len(headers) > fetch {
return nil, nil, fmt.Errorf("%w: returned headers %d != requested %d", errBadPeer, len(headers), fetch)
}
// The first header needs to be the head, validate against the checkpoint
// and request. If only 1 header was returned, make sure there's no pivot
// or there was not one requested.
head := headers[0]
if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint {
p.log.Warn("Remote head below checkpoint", "number", head.Number, "hash", head.Hash())
return nil, errUnsyncedPeer
return nil, nil, fmt.Errorf("%w: remote head %d below checkpoint %d", errUnsyncedPeer, head.Number, d.checkpoint)
}
if len(headers) == 1 {
if mode == FastSync && head.Number.Uint64() > uint64(fsMinFullBlocks) {
return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer)
}
p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", head.Hash())
return head, nil, nil
}
// At this point we have 2 headers in total and the first is the
// validated head of the chain. Check the pivot number and return.
pivot := headers[1]
if pivot.Number.Uint64() != head.Number.Uint64()-uint64(fsMinFullBlocks) {
return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-uint64(fsMinFullBlocks))
}
p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
return head, nil
return head, pivot, nil

case <-timeout:
p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
return nil, errTimeout
return nil, nil, errTimeout

case <-d.bodyCh:
case <-d.receiptCh:
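
As a rough aside on the combined request above: fetchHead relies on the GetBlockHeaders skip/reverse semantics, so with the usual fsMinFullBlocks = 64 a request for 2 headers starting at the announced head with skip = fsMinFullBlocks-1 and reverse = true returns the head itself plus the header exactly 64 blocks below it, which is the expected pivot checked a few lines above. A minimal, self-contained sketch of that arithmetic (illustrative only, assuming that constant value):

// Sketch of the head/pivot geometry assumed by fetchHead; not part of the diff.
package main

import "fmt"

const fsMinFullBlocks = 64 // assumed value of the downloader constant

func main() {
	head := uint64(1_000_000)           // announced remote head number
	skip := uint64(fsMinFullBlocks - 1) // skip passed to RequestHeadersByHash
	pivot := head - (skip + 1)          // second header returned with reverse=true
	fmt.Println(pivot == head-fsMinFullBlocks) // true: pivot sits fsMinFullBlocks below the head
}
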
@@ -871,14 +901,14 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
case <-d.cancelCh:
return 0, errCanceled

case packer := <-d.headerCh:
case packet := <-d.headerCh:
// Discard anything not from the origin peer
if packer.PeerId() != p.id {
log.Debug("Received headers from incorrect peer", "peer", packer.PeerId())
if packet.PeerId() != p.id {
log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
headers := packer.(*headerPack).headers
headers := packet.(*headerPack).headers
if len(headers) != 1 {
p.log.Warn("Multiple headers for single request", "headers", len(headers))
return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
@@ -937,12 +967,13 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
// other peers are only accepted if they map cleanly to the skeleton. If no one
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error {
func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
p.log.Debug("Directing header downloads", "origin", from)
defer p.log.Debug("Header download terminated")

// Create a timeout timer, and the associated header fetcher
skeleton := true // Skeleton assembly phase or finishing up
pivoting := false // Whether the next request is pivot verification
request := time.Now() // time of the last skeleton fetch request
timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
<-timeout.C // timeout channel should be initially empty
@@ -963,6 +994,20 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
}
}
getNextPivot := func() {
pivoting = true
request = time.Now()

ttl = d.requestTTL()
timeout.Reset(ttl)

d.pivotLock.RLock()
pivot := d.pivotHeader.Number.Uint64()
d.pivotLock.RUnlock()

p.log.Trace("Fetching next pivot header", "number", pivot+uint64(fsMinFullBlocks))
go p.peer.RequestHeadersByNumber(pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep
}
// Start pulling the header chain skeleton until all is done
ancestor := from
getHeaders(from)
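
A note on the "move +64 when it's 2x64-8 deep" comment in the getNextPivot helper above: with the usual fsMinFullBlocks = 64, the verification request asks for the header at pivot+64 and, via skip = fsMinFullBlocks-9 = 55, the header at pivot+2*64-8 = pivot+120; if a peer can serve both, the head has moved far enough past the current pivot that it is treated as stale and advanced by 64. A minimal sketch of that geometry (illustrative only, assuming that constant value):

// Illustrative check of the getNextPivot request geometry; not part of the diff.
package main

import "fmt"

const fsMinFullBlocks = 64 // assumed downloader constant

func main() {
	pivot := uint64(500_000)
	first := pivot + fsMinFullBlocks                 // requested start: the candidate new pivot
	second := first + uint64(fsMinFullBlocks-9) + 1  // skip = fsMinFullBlocks-9 => next returned header
	fmt.Println(first == pivot+64)                   // true
	fmt.Println(second == pivot+2*fsMinFullBlocks-8) // true: the "2x64-8 deep" confirmer
}
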
@@ -982,8 +1027,46 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
headerReqTimer.UpdateSince(request)
timeout.Stop()

// If the pivot is being checked, move if it became stale and run the real retrieval
var pivot uint64

d.pivotLock.RLock()
if d.pivotHeader != nil {
pivot = d.pivotHeader.Number.Uint64()
}
d.pivotLock.RUnlock()

if pivoting {
if packet.Items() == 2 {
// Retrieve the headers and do some sanity checks, just in case
headers := packet.(*headerPack).headers

if have, want := headers[0].Number.Uint64(), pivot+uint64(fsMinFullBlocks); have != want {
log.Warn("Peer sent invalid next pivot", "have", have, "want", want)
return fmt.Errorf("%w: next pivot number %d != requested %d", errInvalidChain, have, want)
}
if have, want := headers[1].Number.Uint64(), pivot+2*uint64(fsMinFullBlocks)-8; have != want {
log.Warn("Peer sent invalid pivot confirmer", "have", have, "want", want)
return fmt.Errorf("%w: next pivot confirmer number %d != requested %d", errInvalidChain, have, want)
}
log.Warn("Pivot seemingly stale, moving", "old", pivot, "new", headers[0].Number)
pivot = headers[0].Number.Uint64()
Contributor:
At this point, any two headers received will cause us to do WriteLastPivotNumber to db. I'm thinking this might lead to some internal assumptions / corruption failing if, for example, a peer moves the pivot back to genesis, or block 5...? If we get moved back to genesis, then we're done..?
So shouldn't we also check that the returned headers match the numbers we requested?

Member Author:
Hmm, yeah. The problem is that checking the number is not particularly useful: I could just give you the correct number and a junk root hash. We could add at least PoW verification (besides the number), but that will not work on Clique networks (acceptable?).

Member Author:
Checked the header numbers in this PR for now. PoW checks would need to push the consensus engine into the downloader too.


d.pivotLock.Lock()
d.pivotHeader = headers[0]
d.pivotLock.Unlock()

// Write out the pivot into the database so a rollback beyond
// it will reenable fast sync and update the state root that
// the state syncer will be downloading.
rawdb.WriteLastPivotNumber(d.stateDB, pivot)
}
pivoting = false
getHeaders(from)
continue
}
// If the skeleton's finished, pull any remaining head headers directly from the origin
if packet.Items() == 0 && skeleton {
if skeleton && packet.Items() == 0 {
skeleton = false
getHeaders(from)
continue
@@ -1061,7 +1144,14 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
return errCanceled
}
from += uint64(len(headers))
getHeaders(from)

// If we're still filling the fast-sync skeleton, check pivot staleness
// before continuing to the next skeleton fill
if skeleton && pivot > 0 {
getNextPivot()
} else {
getHeaders(from)
}
} else {
// No headers delivered, or all of them being delayed, sleep a bit and retry
p.log.Trace("All headers delayed, waiting")
@@ -1390,7 +1480,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Keep a count of uncertain headers to roll back
var (
rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis)
@@ -1493,6 +1583,14 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
// In case of header only syncing, validate the chunk immediately
if mode == FastSync || mode == LightSync {
// If we're importing pure headers, verify based on their recentness
var pivot uint64

d.pivotLock.RLock()
if d.pivotHeader != nil {
pivot = d.pivotHeader.Number.Uint64()
}
d.pivotLock.RUnlock()

frequency := fsHeaderCheckFrequency
if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
frequency = 1
@@ -1609,10 +1707,13 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {

// processFastSyncContent takes fetch results from the queue and writes them to the
// database. It also controls the synchronisation of state nodes of the pivot block.
func (d *Downloader) processFastSyncContent(latest *types.Header) error {
func (d *Downloader) processFastSyncContent() error {
// Start syncing state of the reported head block. This should get us most of
// the state of the pivot block.
sync := d.syncState(latest.Root)
d.pivotLock.RLock()
sync := d.syncState(d.pivotHeader.Root)
d.pivotLock.RUnlock()

defer func() {
// The `sync` object is replaced every time the pivot moves. We need to
// defer close the very last active one, hence the lazy evaluation vs.
@@ -1627,12 +1728,6 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
}
go closeOnErr(sync)

// Figure out the ideal pivot block. Note, that this goalpost may move if the
// sync takes long enough for the chain head to move significantly.
pivot := uint64(0)
if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
pivot = height - uint64(fsMinFullBlocks)
}
// To cater for moving pivot points, track the pivot block and subsequently
// accumulated download results separately.
var (
@@ -1659,22 +1754,46 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
if oldPivot != nil {
// If we haven't downloaded the pivot block yet, check pivot staleness
// notifications from the header downloader
d.pivotLock.RLock()
pivot := d.pivotHeader
d.pivotLock.RUnlock()

if oldPivot == nil {
if pivot.Root != sync.root {
sync.Cancel()
sync = d.syncState(pivot.Root)

go closeOnErr(sync)
}
} else {
results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
}
// Split around the pivot block and process the two sides via fast/full sync
if atomic.LoadInt32(&d.committed) == 0 {
latest = results[len(results)-1].Header
if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
pivot = height - uint64(fsMinFullBlocks)
latest := results[len(results)-1].Header
// If the height is above the pivot block by 2 sets, it means the pivot
// became stale in the network and was garbage collected, so move to a
// new pivot.
//
// Note, we have `reorgProtHeaderDelay` blocks withheld; those need to be
// taken into account, otherwise we're detecting the pivot move too late
// and will drop peers due to unavailable state!
if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*uint64(fsMinFullBlocks)-uint64(reorgProtHeaderDelay) {
log.Warn("Pivot became stale, moving", "old", pivot.Number.Uint64(), "new", height-uint64(fsMinFullBlocks)+uint64(reorgProtHeaderDelay))
pivot = results[len(results)-1-fsMinFullBlocks+reorgProtHeaderDelay].Header // must exist as lower old pivot is uncommitted

d.pivotLock.Lock()
d.pivotHeader = pivot
d.pivotLock.Unlock()

// Write out the pivot into the database so a rollback beyond it will
// reenable fast sync
rawdb.WriteLastPivotNumber(d.stateDB, pivot)
rawdb.WriteLastPivotNumber(d.stateDB, pivot.Number.Uint64())
}
}
P, beforeP, afterP := splitAroundPivot(pivot, results)
P, beforeP, afterP := splitAroundPivot(pivot.Number.Uint64(), results)
if err := d.commitFastSyncData(beforeP, sync); err != nil {
return err
}
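
For context on the staleness threshold and index arithmetic above (illustrative only, assuming the usual fsMinFullBlocks = 64 and reorgProtHeaderDelay = 2): the move triggers once the newest delivered block is 2*64-2 = 126 blocks past the current pivot, and the replacement pivot is taken 64-2 = 62 entries below the newest result, compensating for the reorgProtHeaderDelay headers that the comment above notes are withheld. A minimal sketch of those numbers:

// Rough numbers behind the in-flight pivot move in processFastSyncContent; not part of the diff.
package main

import "fmt"

const (
	fsMinFullBlocks      = 64 // assumed downloader constants
	reorgProtHeaderDelay = 2
)

func main() {
	pivot := uint64(500_000)

	// Staleness trigger: the newest delivered block sits at least
	// 2*fsMinFullBlocks-reorgProtHeaderDelay past the current pivot.
	height := pivot + 2*fsMinFullBlocks - reorgProtHeaderDelay
	fmt.Println(height) // 500126

	// Replacement pivot: fsMinFullBlocks-reorgProtHeaderDelay entries below the
	// newest result, matching results[len(results)-1-fsMinFullBlocks+reorgProtHeaderDelay].
	newPivot := height - fsMinFullBlocks + reorgProtHeaderDelay
	fmt.Println(newPivot, newPivot == pivot+fsMinFullBlocks) // 500064 true
}
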