From 5e35a610c378736e5a541470768e76d7c6f56c71 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Fri, 25 Apr 2025 10:54:12 +0800 Subject: [PATCH] core/filtermaps: fix deadlock in filtermap callback --- core/filtermaps/filtermaps.go | 50 ++++++++++++++++++++---------- core/filtermaps/indexer.go | 6 +++- core/filtermaps/matcher_backend.go | 18 +++++++---- 3 files changed, 50 insertions(+), 24 deletions(-) diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index 920167ca8d59..3da7f4b721b6 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -85,11 +85,17 @@ type FilterMaps struct { // fields written by the indexer and read by matcher backend. Indexer can // read them without a lock and write them under indexLock write lock. // Matcher backend can read them under indexLock read lock. - indexLock sync.RWMutex - indexedRange filterMapsRange - cleanedEpochsBefore uint32 // all unindexed data cleaned before this point - indexedView *ChainView // always consistent with the log index - hasTempRange bool + indexLock sync.RWMutex + indexedRange filterMapsRange + indexedView *ChainView // always consistent with the log index + hasTempRange bool + + // cleanedEpochsBefore indicates that all unindexed data before this point + // has been cleaned. + // + // This field is only accessed and modified within tryUnindexTail, so no + // explicit locking is required. + cleanedEpochsBefore uint32 
filterMapCache *lru.Cache[uint32, filterMap] @@ -248,15 +254,16 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f }, // deleting last unindexed epoch might have been interrupted by shutdown cleanedEpochsBefore: max(rs.MapsFirst>>params.logMapsPerEpoch, 1) - 1, - historyCutoff: historyCutoff, - finalBlock: finalBlock, - matcherSyncCh: make(chan *FilterMapsMatcherBackend), - matchers: make(map[*FilterMapsMatcherBackend]struct{}), - filterMapCache: lru.NewCache[uint32, filterMap](cachedFilterMaps), - lastBlockCache: lru.NewCache[uint32, lastBlockOfMap](cachedLastBlocks), - lvPointerCache: lru.NewCache[uint64, uint64](cachedLvPointers), - baseRowsCache: lru.NewCache[uint64, [][]uint32](cachedBaseRows), - renderSnapshots: lru.NewCache[uint64, *renderedMap](cachedRenderSnapshots), + + historyCutoff: historyCutoff, + finalBlock: finalBlock, + matcherSyncCh: make(chan *FilterMapsMatcherBackend), + matchers: make(map[*FilterMapsMatcherBackend]struct{}), + filterMapCache: lru.NewCache[uint32, filterMap](cachedFilterMaps), + lastBlockCache: lru.NewCache[uint32, lastBlockOfMap](cachedLastBlocks), + lvPointerCache: lru.NewCache[uint64, uint64](cachedLvPointers), + baseRowsCache: lru.NewCache[uint64, [][]uint32](cachedBaseRows), + renderSnapshots: lru.NewCache[uint64, *renderedMap](cachedRenderSnapshots), } // Set initial indexer target. @@ -444,6 +451,7 @@ func (f *FilterMaps) safeDeleteWithLogs(deleteFn func(db ethdb.KeyValueStore, ha // setRange updates the indexed chain view and covered range and also adds the // changes to the given batch. +// // Note that this function assumes that the index write lock is being held. 
func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newView *ChainView, newRange filterMapsRange, isTempRange bool) { f.indexedView = newView @@ -477,6 +485,7 @@ func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newView *ChainView, ne // Note that this function assumes that the log index structure is consistent // with the canonical chain at the point where the given log value index points. // If this is not the case then an invalid result or an error may be returned. +// // Note that this function assumes that the indexer read lock is being held when // called from outside the indexerLoop goroutine. func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) { @@ -655,6 +664,7 @@ func (f *FilterMaps) mapRowIndex(mapIndex, rowIndex uint32) uint64 { // getBlockLvPointer returns the starting log value index where the log values // generated by the given block are located. If blockNumber is beyond the current // head then the first unoccupied log value index is returned. +// // Note that this function assumes that the indexer read lock is being held when // called from outside the indexerLoop goroutine. 
func (f *FilterMaps) getBlockLvPointer(blockNumber uint64) (uint64, error) { @@ -762,7 +772,7 @@ func (f *FilterMaps) deleteTailEpoch(epoch uint32) (bool, error) { return false, errors.New("invalid tail epoch number") } // remove index data - if err := f.safeDeleteWithLogs(func(db ethdb.KeyValueStore, hashScheme bool, stopCb func(bool) bool) error { + deleteFn := func(db ethdb.KeyValueStore, hashScheme bool, stopCb func(bool) bool) error { first := f.mapRowIndex(firstMap, 0) count := f.mapRowIndex(firstMap+f.mapsPerEpoch, 0) - first if err := rawdb.DeleteFilterMapRows(f.db, common.NewRange(first, count), hashScheme, stopCb); err != nil { @@ -786,10 +796,13 @@ func (f *FilterMaps) deleteTailEpoch(epoch uint32) (bool, error) { f.lvPointerCache.Remove(blockNumber) } return nil - }, fmt.Sprintf("Deleting tail epoch #%d", epoch), func() bool { + } + action := fmt.Sprintf("Deleting tail epoch #%d", epoch) + stopFn := func() bool { f.processEvents() return f.stop || !f.targetHeadIndexed() - }); err == nil { + } + if err := f.safeDeleteWithLogs(deleteFn, action, stopFn); err == nil { // everything removed; mark as cleaned and report success if f.cleanedEpochsBefore == epoch { f.cleanedEpochsBefore = epoch + 1 @@ -808,6 +821,9 @@ func (f *FilterMaps) deleteTailEpoch(epoch uint32) (bool, error) { } // exportCheckpoints exports epoch checkpoints in the format used by checkpoints.go. +// +// Note: acquiring the indexLock read lock is unnecessary here, as this function +// is always called within the indexerLoop. 
func (f *FilterMaps) exportCheckpoints() { finalLvPtr, err := f.getBlockLvPointer(f.finalBlock + 1) if err != nil { diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 787197345a09..3ec49ca11665 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -43,6 +43,8 @@ func (f *FilterMaps) indexerLoop() { log.Info("Started log indexer") for !f.stop { + // Note: acquiring the indexLock read lock is unnecessary here, + // as the `indexedRange` is accessed within the indexerLoop. if !f.indexedRange.initialized { if f.targetView.HeadNumber() == 0 { // initialize when chain head is available @@ -105,7 +107,7 @@ type targetUpdate struct { historyCutoff, finalBlock uint64 } -// SetTargetView sets a new target chain view for the indexer to render. +// SetTarget sets a new target chain view for the indexer to render. // Note that SetTargetView never blocks. func (f *FilterMaps) SetTarget(targetView *ChainView, historyCutoff, finalBlock uint64) { if targetView == nil { @@ -178,6 +180,8 @@ func (f *FilterMaps) processSingleEvent(blocking bool) bool { if f.stop { return false } + // Note: acquiring the indexLock read lock is unnecessary here, + // as this function is always called within the indexerLoop. if !f.hasTempRange { for _, mb := range f.matcherSyncRequests { mb.synced() diff --git a/core/filtermaps/matcher_backend.go b/core/filtermaps/matcher_backend.go index ee18a0694c43..9751783754f7 100644 --- a/core/filtermaps/matcher_backend.go +++ b/core/filtermaps/matcher_backend.go @@ -111,17 +111,17 @@ func (fm *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex // synced signals to the matcher that has triggered a synchronisation that it // has been finished and the log index is consistent with the chain head passed // as a parameter. +// // Note that if the log index head was far behind the chain head then it might not // be synced up to the given head in a single step. 
Still, the latest chain head // should be passed as a parameter and the existing log index should be consistent // with that chain. +// +// Note: acquiring the indexLock read lock is unnecessary here, as this function +// is always called within the indexerLoop. func (fm *FilterMapsMatcherBackend) synced() { - fm.f.indexLock.RLock() fm.f.matchersLock.Lock() - defer func() { - fm.f.matchersLock.Unlock() - fm.f.indexLock.RUnlock() - }() + defer fm.f.matchersLock.Unlock() indexedBlocks := fm.f.indexedRange.blocks if !fm.f.indexedRange.headIndexed && !indexedBlocks.IsEmpty() { @@ -154,6 +154,8 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange case <-ctx.Done(): return SyncRange{}, ctx.Err() case <-fm.f.disabledCh: + // Note: acquiring the indexLock read lock is unnecessary here, + // as the indexer has already been terminated. return SyncRange{IndexedView: fm.f.indexedView}, nil } select { @@ -162,6 +164,8 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange case <-ctx.Done(): return SyncRange{}, ctx.Err() case <-fm.f.disabledCh: + // Note: acquiring the indexLock read lock is unnecessary here, + // as the indexer has already been terminated. return SyncRange{IndexedView: fm.f.indexedView}, nil } } @@ -170,7 +174,9 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange // valid range with the current indexed range. This function should be called // whenever a part of the log index has been removed, before adding new blocks // to it. -// Note that this function assumes that the index read lock is being held. +// +// Note: acquiring the indexLock read lock is unnecessary here, as this function +// is always called within the indexerLoop. func (f *FilterMaps) updateMatchersValidRange() { f.matchersLock.Lock() defer f.matchersLock.Unlock()