Skip to content

Commit a01cb47

Browse files
rjl493456442cp-wjhan
authored andcommitted
core/rawdb: untie freezer and ancient chain data (ethereum#24684)
Previously freezer has only been used for storing ancient chain data, while obviously it can be used more. This PR unties the chain data and freezer, keep the minimal freezer structure and move all other logic (like incrementally freezing block data) into a separate structure called ChainFreezer. This PR also extends the database interface by adding a new ancient store function AncientDatadir which can return the root directory of ancient store. The ancient root directory can be used when we want to open some other ancient-stores (e.g. reverse diff freezer).
1 parent 2eb47a4 commit a01cb47

File tree

9 files changed

+401
-310
lines changed

9 files changed

+401
-310
lines changed

cmd/geth/dbcmd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ func inspect(ctx *cli.Context) error {
289289
return rawdb.InspectDatabase(db, prefix, start)
290290
}
291291

292-
func showLeveldbStats(db ethdb.Stater) {
292+
func showLeveldbStats(db ethdb.KeyValueStater) {
293293
if stats, err := db.Stat("leveldb.stats"); err != nil {
294294
log.Warn("Failed to read database stats", "error", err)
295295
} else {

core/rawdb/accessors_chain.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import (
3636
// ReadCanonicalHash retrieves the hash assigned to a canonical block number.
3737
func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash {
3838
var data []byte
39-
db.ReadAncients(func(reader ethdb.AncientReader) error {
39+
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
4040
data, _ = reader.Ancient(freezerHashTable, number)
4141
if len(data) == 0 {
4242
// Get it by hash from leveldb
@@ -332,7 +332,7 @@ func ReadHeaderRange(db ethdb.Reader, number uint64, count uint64) []rlp.RawValu
332332
// ReadHeaderRLP retrieves a block header in its raw RLP database encoding.
333333
func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
334334
var data []byte
335-
db.ReadAncients(func(reader ethdb.AncientReader) error {
335+
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
336336
// First try to look up the data in ancient database. Extra hash
337337
// comparison is necessary since ancient database only maintains
338338
// the canonical data.
@@ -411,7 +411,7 @@ func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number
411411

412412
// isCanon is an internal utility method, to check whether the given number/hash
413413
// is part of the ancient (canon) set.
414-
func isCanon(reader ethdb.AncientReader, number uint64, hash common.Hash) bool {
414+
func isCanon(reader ethdb.AncientReaderOp, number uint64, hash common.Hash) bool {
415415
h, err := reader.Ancient(freezerHashTable, number)
416416
if err != nil {
417417
return false
@@ -425,7 +425,7 @@ func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue
425425
// comparison is necessary since ancient database only maintains
426426
// the canonical data.
427427
var data []byte
428-
db.ReadAncients(func(reader ethdb.AncientReader) error {
428+
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
429429
// Check if the data is in ancients
430430
if isCanon(reader, number, hash) {
431431
data, _ = reader.Ancient(freezerBodiesTable, number)
@@ -442,7 +442,7 @@ func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue
442442
// block at number, in RLP encoding.
443443
func ReadCanonicalBodyRLP(db ethdb.Reader, number uint64) rlp.RawValue {
444444
var data []byte
445-
db.ReadAncients(func(reader ethdb.AncientReader) error {
445+
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
446446
data, _ = reader.Ancient(freezerBodiesTable, number)
447447
if len(data) > 0 {
448448
return nil
@@ -508,7 +508,7 @@ func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
508508
// ReadTdRLP retrieves a block's total difficulty corresponding to the hash in RLP encoding.
509509
func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
510510
var data []byte
511-
db.ReadAncients(func(reader ethdb.AncientReader) error {
511+
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
512512
// Check if the data is in ancients
513513
if isCanon(reader, number, hash) {
514514
data, _ = reader.Ancient(freezerDifficultyTable, number)
@@ -568,7 +568,7 @@ func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool {
568568
// ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding.
569569
func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
570570
var data []byte
571-
db.ReadAncients(func(reader ethdb.AncientReader) error {
571+
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
572572
// Check if the data is in ancients
573573
if isCanon(reader, number, hash) {
574574
data, _ = reader.Ancient(freezerReceiptTable, number)

core/rawdb/chain_freezer.go

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
// Copyright 2022 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package rawdb
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
"sync/atomic"
23+
"time"
24+
25+
"github.com/ethereum/go-ethereum/common"
26+
"github.com/ethereum/go-ethereum/ethdb"
27+
"github.com/ethereum/go-ethereum/log"
28+
"github.com/ethereum/go-ethereum/params"
29+
)
30+
31+
const (
32+
// freezerRecheckInterval is the frequency to check the key-value database for
33+
// chain progression that might permit new blocks to be frozen into immutable
34+
// storage.
35+
freezerRecheckInterval = time.Minute
36+
37+
// freezerBatchLimit is the maximum number of blocks to freeze in one batch
38+
// before doing an fsync and deleting it from the key-value store.
39+
freezerBatchLimit = 30000
40+
)
41+
42+
// chainFreezer is a wrapper of freezer with additional chain freezing feature.
43+
// The background thread will keep moving ancient chain segments from key-value
44+
// database to flat files for saving space on live database.
45+
type chainFreezer struct {
46+
// WARNING: The `threshold` field is accessed atomically. On 32 bit platforms, only
47+
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
48+
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
49+
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)
50+
51+
*Freezer
52+
quit chan struct{}
53+
wg sync.WaitGroup
54+
trigger chan chan struct{} // Manual blocking freeze trigger, test determinism
55+
}
56+
57+
// newChainFreezer initializes the freezer for ancient chain data.
58+
func newChainFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*chainFreezer, error) {
59+
freezer, err := NewFreezer(datadir, namespace, readonly, maxTableSize, tables)
60+
if err != nil {
61+
return nil, err
62+
}
63+
return &chainFreezer{
64+
Freezer: freezer,
65+
threshold: params.FullImmutabilityThreshold,
66+
quit: make(chan struct{}),
67+
trigger: make(chan chan struct{}),
68+
}, nil
69+
}
70+
71+
// Close closes the chain freezer instance and terminates the background thread.
72+
func (f *chainFreezer) Close() error {
73+
err := f.Freezer.Close()
74+
select {
75+
case <-f.quit:
76+
default:
77+
close(f.quit)
78+
}
79+
f.wg.Wait()
80+
return err
81+
}
82+
83+
// freeze is a background thread that periodically checks the blockchain for any
84+
// import progress and moves ancient data from the fast database into the freezer.
85+
//
86+
// This functionality is deliberately broken off from block importing to avoid
87+
// incurring additional data shuffling delays on block propagation.
88+
func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
89+
nfdb := &nofreezedb{KeyValueStore: db}
90+
91+
var (
92+
backoff bool
93+
triggered chan struct{} // Used in tests
94+
)
95+
for {
96+
select {
97+
case <-f.quit:
98+
log.Info("Freezer shutting down")
99+
return
100+
default:
101+
}
102+
if backoff {
103+
// If we were doing a manual trigger, notify it
104+
if triggered != nil {
105+
triggered <- struct{}{}
106+
triggered = nil
107+
}
108+
select {
109+
case <-time.NewTimer(freezerRecheckInterval).C:
110+
backoff = false
111+
case triggered = <-f.trigger:
112+
backoff = false
113+
case <-f.quit:
114+
return
115+
}
116+
}
117+
// Retrieve the freezing threshold.
118+
hash := ReadHeadBlockHash(nfdb)
119+
if hash == (common.Hash{}) {
120+
log.Debug("Current full block hash unavailable") // new chain, empty database
121+
backoff = true
122+
continue
123+
}
124+
number := ReadHeaderNumber(nfdb, hash)
125+
threshold := atomic.LoadUint64(&f.threshold)
126+
frozen := atomic.LoadUint64(&f.frozen)
127+
switch {
128+
case number == nil:
129+
log.Error("Current full block number unavailable", "hash", hash)
130+
backoff = true
131+
continue
132+
133+
case *number < threshold:
134+
log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", threshold)
135+
backoff = true
136+
continue
137+
138+
case *number-threshold <= frozen:
139+
log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", frozen)
140+
backoff = true
141+
continue
142+
}
143+
head := ReadHeader(nfdb, hash, *number)
144+
if head == nil {
145+
log.Error("Current full block unavailable", "number", *number, "hash", hash)
146+
backoff = true
147+
continue
148+
}
149+
150+
// Seems we have data ready to be frozen, process in usable batches
151+
var (
152+
start = time.Now()
153+
first, _ = f.Ancients()
154+
limit = *number - threshold
155+
)
156+
if limit-first > freezerBatchLimit {
157+
limit = first + freezerBatchLimit
158+
}
159+
ancients, err := f.freezeRange(nfdb, first, limit)
160+
if err != nil {
161+
log.Error("Error in block freeze operation", "err", err)
162+
backoff = true
163+
continue
164+
}
165+
166+
// Batch of blocks have been frozen, flush them before wiping from leveldb
167+
if err := f.Sync(); err != nil {
168+
log.Crit("Failed to flush frozen tables", "err", err)
169+
}
170+
171+
// Wipe out all data from the active database
172+
batch := db.NewBatch()
173+
for i := 0; i < len(ancients); i++ {
174+
// Always keep the genesis block in active database
175+
if first+uint64(i) != 0 {
176+
DeleteBlockWithoutNumber(batch, ancients[i], first+uint64(i))
177+
DeleteCanonicalHash(batch, first+uint64(i))
178+
}
179+
}
180+
if err := batch.Write(); err != nil {
181+
log.Crit("Failed to delete frozen canonical blocks", "err", err)
182+
}
183+
batch.Reset()
184+
185+
// Wipe out side chains also and track dangling side chains
186+
var dangling []common.Hash
187+
frozen = atomic.LoadUint64(&f.frozen) // Needs reload after during freezeRange
188+
for number := first; number < frozen; number++ {
189+
// Always keep the genesis block in active database
190+
if number != 0 {
191+
dangling = ReadAllHashes(db, number)
192+
for _, hash := range dangling {
193+
log.Trace("Deleting side chain", "number", number, "hash", hash)
194+
DeleteBlock(batch, hash, number)
195+
}
196+
}
197+
}
198+
if err := batch.Write(); err != nil {
199+
log.Crit("Failed to delete frozen side blocks", "err", err)
200+
}
201+
batch.Reset()
202+
203+
// Step into the future and delete and dangling side chains
204+
if frozen > 0 {
205+
tip := frozen
206+
for len(dangling) > 0 {
207+
drop := make(map[common.Hash]struct{})
208+
for _, hash := range dangling {
209+
log.Debug("Dangling parent from Freezer", "number", tip-1, "hash", hash)
210+
drop[hash] = struct{}{}
211+
}
212+
children := ReadAllHashes(db, tip)
213+
for i := 0; i < len(children); i++ {
214+
// Dig up the child and ensure it's dangling
215+
child := ReadHeader(nfdb, children[i], tip)
216+
if child == nil {
217+
log.Error("Missing dangling header", "number", tip, "hash", children[i])
218+
continue
219+
}
220+
if _, ok := drop[child.ParentHash]; !ok {
221+
children = append(children[:i], children[i+1:]...)
222+
i--
223+
continue
224+
}
225+
// Delete all block data associated with the child
226+
log.Debug("Deleting dangling block", "number", tip, "hash", children[i], "parent", child.ParentHash)
227+
DeleteBlock(batch, children[i], tip)
228+
}
229+
dangling = children
230+
tip++
231+
}
232+
if err := batch.Write(); err != nil {
233+
log.Crit("Failed to delete dangling side blocks", "err", err)
234+
}
235+
}
236+
237+
// Log something friendly for the user
238+
context := []interface{}{
239+
"blocks", frozen - first, "elapsed", common.PrettyDuration(time.Since(start)), "number", frozen - 1,
240+
}
241+
if n := len(ancients); n > 0 {
242+
context = append(context, []interface{}{"hash", ancients[n-1]}...)
243+
}
244+
log.Info("Deep froze chain segment", context...)
245+
246+
// Avoid database thrashing with tiny writes
247+
if frozen-first < freezerBatchLimit {
248+
backoff = true
249+
}
250+
}
251+
}
252+
253+
func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) {
254+
hashes = make([]common.Hash, 0, limit-number)
255+
256+
_, err = f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
257+
for ; number <= limit; number++ {
258+
// Retrieve all the components of the canonical block.
259+
hash := ReadCanonicalHash(nfdb, number)
260+
if hash == (common.Hash{}) {
261+
return fmt.Errorf("canonical hash missing, can't freeze block %d", number)
262+
}
263+
header := ReadHeaderRLP(nfdb, hash, number)
264+
if len(header) == 0 {
265+
return fmt.Errorf("block header missing, can't freeze block %d", number)
266+
}
267+
body := ReadBodyRLP(nfdb, hash, number)
268+
if len(body) == 0 {
269+
return fmt.Errorf("block body missing, can't freeze block %d", number)
270+
}
271+
receipts := ReadReceiptsRLP(nfdb, hash, number)
272+
if len(receipts) == 0 {
273+
return fmt.Errorf("block receipts missing, can't freeze block %d", number)
274+
}
275+
td := ReadTdRLP(nfdb, hash, number)
276+
if len(td) == 0 {
277+
return fmt.Errorf("total difficulty missing, can't freeze block %d", number)
278+
}
279+
280+
// Write to the batch.
281+
if err := op.AppendRaw(freezerHashTable, number, hash[:]); err != nil {
282+
return fmt.Errorf("can't write hash to Freezer: %v", err)
283+
}
284+
if err := op.AppendRaw(freezerHeaderTable, number, header); err != nil {
285+
return fmt.Errorf("can't write header to Freezer: %v", err)
286+
}
287+
if err := op.AppendRaw(freezerBodiesTable, number, body); err != nil {
288+
return fmt.Errorf("can't write body to Freezer: %v", err)
289+
}
290+
if err := op.AppendRaw(freezerReceiptTable, number, receipts); err != nil {
291+
return fmt.Errorf("can't write receipts to Freezer: %v", err)
292+
}
293+
if err := op.AppendRaw(freezerDifficultyTable, number, td); err != nil {
294+
return fmt.Errorf("can't write td to Freezer: %v", err)
295+
}
296+
297+
hashes = append(hashes, hash)
298+
}
299+
return nil
300+
})
301+
302+
return hashes, err
303+
}

0 commit comments

Comments
 (0)