Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions core/rawdb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,6 @@ func (b *tableBatch) Delete(key []byte) error {
return b.batch.Delete(append([]byte(b.prefix), key...))
}

// KeyCount retrieves the number of keys queued up for writing.
func (b *tableBatch) KeyCount() int {
return b.batch.KeyCount()
}

// ValueSize retrieves the amount of data queued up for writing.
func (b *tableBatch) ValueSize() int {
return b.batch.ValueSize()
Expand Down
2 changes: 1 addition & 1 deletion core/state/snapshot/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix
}
// Verify the snapshot segment with range prover, ensure that all flat states
// in this range correspond to merkle trie.
_, cont, err := trie.VerifyRangeProof(root, origin, last, keys, vals, proof)
cont, err := trie.VerifyRangeProof(root, origin, last, keys, vals, proof)
return &proofResult{
keys: keys,
vals: vals,
Expand Down
121 changes: 64 additions & 57 deletions eth/protocols/snap/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,8 @@ type storageResponse struct {
accounts []common.Hash // Account hashes requested, may be only partially filled
roots []common.Hash // Storage roots requested, may be only partially filled

hashes [][]common.Hash // Storage slot hashes in the returned range
slots [][][]byte // Storage slot values in the returned range
nodes []ethdb.KeyValueStore // Database containing the reconstructed trie nodes
hashes [][]common.Hash // Storage slot hashes in the returned range
slots [][][]byte // Storage slot values in the returned range

cont bool // Whether the last storage range has a continuation
}
Expand Down Expand Up @@ -680,12 +679,22 @@ func (s *Syncer) loadSyncStatus() {
}
s.tasks = progress.Tasks
for _, task := range s.tasks {
task.genBatch = s.db.NewBatch()
task.genBatch = ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
task.genTrie = trie.NewStackTrie(task.genBatch)

for _, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
subtask.genBatch = s.db.NewBatch()
subtask.genBatch = ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
subtask.genTrie = trie.NewStackTrie(task.genBatch)
}
}
Expand Down Expand Up @@ -729,7 +738,12 @@ func (s *Syncer) loadSyncStatus() {
// Make sure we don't overflow if the step is not a proper divisor
last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
}
batch := s.db.NewBatch()
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
s.tasks = append(s.tasks, &accountTask{
Next: next,
Last: last,
Expand All @@ -746,19 +760,14 @@ func (s *Syncer) loadSyncStatus() {
func (s *Syncer) saveSyncStatus() {
// Serialize any partial progress to disk before spinning down
for _, task := range s.tasks {
keys, bytes := task.genBatch.KeyCount(), task.genBatch.ValueSize()
if err := task.genBatch.Write(); err != nil {
log.Error("Failed to persist account slots", "err", err)
}
s.accountBytes += common.StorageSize(keys*common.HashLength + bytes)

for _, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
keys, bytes := subtask.genBatch.KeyCount(), subtask.genBatch.ValueSize()
if err := subtask.genBatch.Write(); err != nil {
log.Error("Failed to persist storage slots", "err", err)
}
s.accountBytes += common.StorageSize(keys*common.HashLength + bytes)
}
}
}
Expand Down Expand Up @@ -1763,12 +1772,15 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
if res.subTask != nil {
res.subTask.req = nil
}
batch := s.db.NewBatch()

batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
var (
slots int
nodes int
bytes common.StorageSize
slots int
oldStorageBytes = s.storageBytes
)
// Iterate over all the accounts and reconstruct their storage tries from the
// delivered slots
Expand Down Expand Up @@ -1829,7 +1841,12 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
r := newHashRange(lastKey, chunks)

// Our first task is the one that was just filled by this response.
batch := s.db.NewBatch()
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
tasks = append(tasks, &storageTask{
Next: common.Hash{},
Last: r.End(),
Expand All @@ -1838,7 +1855,12 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
genTrie: trie.NewStackTrie(batch),
})
for r.Next() {
batch := s.db.NewBatch()
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
tasks = append(tasks, &storageTask{
Next: r.Start(),
Last: r.End(),
Expand Down Expand Up @@ -1883,27 +1905,23 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
}
}
}
// Iterate over all the reconstructed trie nodes and push them to disk
// if the contract is fully delivered. If it's chunked, the trie nodes
// will be reconstructed later.
// Iterate over all the complete contracts, reconstruct the trie nodes and
// push them to disk. If the contract is chunked, the trie nodes will be
// reconstructed later.
slots += len(res.hashes[i])

if i < len(res.hashes)-1 || res.subTask == nil {
it := res.nodes[i].NewIterator(nil, nil)
for it.Next() {
batch.Put(it.Key(), it.Value())

bytes += common.StorageSize(common.HashLength + len(it.Value()))
nodes++
tr := trie.NewStackTrie(batch)
for j := 0; j < len(res.hashes[i]); j++ {
tr.Update(res.hashes[i][j][:], res.slots[i][j])
}
it.Release()
tr.Commit()
}
// Persist the received storage segements. These flat state maybe
// outdated during the sync, but it can be fixed later during the
// snapshot generation.
for j := 0; j < len(res.hashes[i]); j++ {
rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j])
bytes += common.StorageSize(1 + 2*common.HashLength + len(res.slots[i][j]))

// If we're storing large contracts, generate the trie nodes
// on the fly to not trash the gluing points
Expand All @@ -1926,25 +1944,20 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
}
}
}
if data := res.subTask.genBatch.ValueSize(); data > ethdb.IdealBatchSize || res.subTask.done {
keys := res.subTask.genBatch.KeyCount()
if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize || res.subTask.done {
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}
res.subTask.genBatch.Reset()

bytes += common.StorageSize(keys*common.HashLength + data)
nodes += keys
}
}
// Flush anything written just now and update the stats
if err := batch.Write(); err != nil {
log.Crit("Failed to persist storage slots", "err", err)
}
s.storageSynced += uint64(slots)
s.storageBytes += bytes

log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "nodes", nodes, "bytes", bytes)
log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "bytes", s.storageBytes-oldStorageBytes)

// If this delivery completed the last pending task, forward the account task
// to the next chunk
Expand Down Expand Up @@ -2042,18 +2055,20 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
// Persist the received account segements. These flat state maybe
// outdated during the sync, but it can be fixed later during the
// snapshot generation.
var (
nodes int
bytes common.StorageSize
)
batch := s.db.NewBatch()
oldAccountBytes := s.accountBytes

batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
for i, hash := range res.hashes {
if task.needCode[i] || task.needState[i] {
break
}
slim := snapshot.SlimAccountRLP(res.accounts[i].Nonce, res.accounts[i].Balance, res.accounts[i].Root, res.accounts[i].CodeHash)
rawdb.WriteAccountSnapshot(batch, hash, slim)
bytes += common.StorageSize(1 + common.HashLength + len(slim))

// If the task is complete, drop it into the stack trie to generate
// account trie nodes for it
Expand All @@ -2069,7 +2084,6 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
if err := batch.Write(); err != nil {
log.Crit("Failed to persist accounts", "err", err)
}
s.accountBytes += bytes
s.accountSynced += uint64(len(res.accounts))

// Task filling persisted, push it the chunk marker forward to the first
Expand All @@ -2091,17 +2105,13 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
log.Error("Failed to commit stack account", "err", err)
}
}
if data := task.genBatch.ValueSize(); data > ethdb.IdealBatchSize || task.done {
keys := task.genBatch.KeyCount()
if task.genBatch.ValueSize() > ethdb.IdealBatchSize || task.done {
if err := task.genBatch.Write(); err != nil {
log.Error("Failed to persist stack account", "err", err)
}
task.genBatch.Reset()

nodes += keys
bytes += common.StorageSize(keys*common.HashLength + data)
}
log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "bytes", bytes)
log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "bytes", s.accountBytes-oldAccountBytes)
}

// OnAccounts is a callback method to invoke when a range of accounts are
Expand Down Expand Up @@ -2176,7 +2186,7 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
if len(keys) > 0 {
end = keys[len(keys)-1]
}
_, cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb)
cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb)
if err != nil {
logger.Warn("Account range failed proof", "err", err)
// Signal this request as failed, and ready for rescheduling
Expand Down Expand Up @@ -2393,10 +2403,8 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
s.lock.Unlock()

// Reconstruct the partial tries from the response and verify them
var (
dbs = make([]ethdb.KeyValueStore, len(hashes))
cont bool
)
var cont bool

for i := 0; i < len(hashes); i++ {
// Convert the keys and proofs into an internal format
keys := make([][]byte, len(hashes[i]))
Expand All @@ -2413,7 +2421,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
if len(nodes) == 0 {
// No proof has been attached, the response must cover the entire key
// space and hash to the origin root.
dbs[i], _, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil)
_, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil)
if err != nil {
s.scheduleRevertStorageRequest(req) // reschedule request
logger.Warn("Storage slots failed proof", "err", err)
Expand All @@ -2428,7 +2436,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
if len(keys) > 0 {
end = keys[len(keys)-1]
}
dbs[i], cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb)
cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb)
if err != nil {
s.scheduleRevertStorageRequest(req) // reschedule request
logger.Warn("Storage range failed proof", "err", err)
Expand All @@ -2444,7 +2452,6 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
roots: req.roots,
hashes: hashes,
slots: slots,
nodes: dbs,
cont: cont,
}
select {
Expand Down
28 changes: 25 additions & 3 deletions ethdb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ const IdealBatchSize = 100 * 1024
type Batch interface {
KeyValueWriter

// KeyCount retrieves the number of keys queued up for writing.
KeyCount() int

// ValueSize retrieves the amount of data queued up for writing.
ValueSize() int

Expand All @@ -47,3 +44,28 @@ type Batcher interface {
// until a final write is called.
NewBatch() Batch
}

// HookedBatch wraps an arbitrary batch where each operation may be hooked into
// to monitor from black box code.
type HookedBatch struct {
Batch

OnPut func(key []byte, value []byte) // Callback if a key is inserted
OnDelete func(key []byte) // Callback if a key is deleted
}

// Put inserts the given value into the key-value data store.
func (b HookedBatch) Put(key []byte, value []byte) error {
if b.OnPut != nil {
b.OnPut(key, value)
}
return b.Batch.Put(key, value)
}

// Delete removes the key from the key-value data store.
func (b HookedBatch) Delete(key []byte) error {
if b.OnDelete != nil {
b.OnDelete(key)
}
return b.Batch.Delete(key)
}
9 changes: 1 addition & 8 deletions ethdb/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,6 @@ func (db *Database) meter(refresh time.Duration) {
type batch struct {
db *leveldb.DB
b *leveldb.Batch
keys int
size int
}

Expand All @@ -462,16 +461,10 @@ func (b *batch) Put(key, value []byte) error {
// Delete inserts the a key removal into the batch for later committing.
func (b *batch) Delete(key []byte) error {
b.b.Delete(key)
b.keys++
b.size += len(key)
return nil
}

// KeyCount retrieves the number of keys queued up for writing.
func (b *batch) KeyCount() int {
return b.keys
}

// ValueSize retrieves the amount of data queued up for writing.
func (b *batch) ValueSize() int {
return b.size
Expand All @@ -485,7 +478,7 @@ func (b *batch) Write() error {
// Reset resets the batch for reuse.
func (b *batch) Reset() {
b.b.Reset()
b.keys, b.size = 0, 0
b.size = 0
}

// Replay replays the batch contents.
Expand Down
Loading