Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions ethdb/dbtest/testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,32 @@ func TestDatabaseSuite(t *testing.T, New func() ethdb.KeyValueStore) {
}
}
})

t.Run("OperatonsAfterClose", func(t *testing.T) {
db := New()
db.Put([]byte("key"), []byte("value"))
db.Close()
if _, err := db.Get([]byte("key")); err == nil {
t.Fatalf("expected error on Get after Close")
}
if _, err := db.Has([]byte("key")); err == nil {
t.Fatalf("expected error on Get after Close")
}
if err := db.Put([]byte("key2"), []byte("value2")); err == nil {
t.Fatalf("expected error on Put after Close")
}
if err := db.Delete([]byte("key")); err == nil {
t.Fatalf("expected error on Delete after Close")
}

b := db.NewBatch()
if err := b.Put([]byte("batchkey"), []byte("batchval")); err != nil {
t.Fatalf("expected no error on batch.Put after Close, got %v", err)
}
if err := b.Write(); err == nil {
t.Fatalf("expected error on batch.Write after Close")
}
})
}

// BenchDatabaseSuite runs a suite of benchmarks against a KeyValueStore database
Expand Down
3 changes: 3 additions & 0 deletions ethdb/memorydb/memorydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ func (b *batch) Write() error {
b.db.lock.Lock()
defer b.db.lock.Unlock()

if b.db.db == nil {
return errMemorydbClosed
}
for _, keyvalue := range b.writes {
if keyvalue.delete {
delete(b.db.db, string(keyvalue.key))
Expand Down
49 changes: 39 additions & 10 deletions ethdb/pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ type Database struct {
seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
manualMemAllocGauge metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated

quitLock sync.Mutex // Mutex protecting the quit channel access
quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
closed bool // keep track of whether we're Closed

log log.Logger // Contextual logger tracking the database path

Expand Down Expand Up @@ -221,23 +222,29 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (
func (d *Database) Close() error {
d.quitLock.Lock()
defer d.quitLock.Unlock()

// Allow double closing, simplifies things
if d.quitChan == nil {
if d.closed {
return nil
}
errc := make(chan error)
d.quitChan <- errc
if err := <-errc; err != nil {
d.log.Error("Metrics collection failed", "err", err)
d.closed = true
if d.quitChan != nil {
errc := make(chan error)
d.quitChan <- errc
if err := <-errc; err != nil {
d.log.Error("Metrics collection failed", "err", err)
}
d.quitChan = nil
}
d.quitChan = nil

return d.db.Close()
}

// Has retrieves if a key is present in the key-value store.
func (d *Database) Has(key []byte) (bool, error) {
d.quitLock.RLock()
defer d.quitLock.RUnlock()
if d.closed {
return false, pebble.ErrClosed
}
_, closer, err := d.db.Get(key)
if err == pebble.ErrNotFound {
return false, nil
Expand All @@ -250,6 +257,11 @@ func (d *Database) Has(key []byte) (bool, error) {

// Get retrieves the given key if it's present in the key-value store.
func (d *Database) Get(key []byte) ([]byte, error) {
d.quitLock.RLock()
defer d.quitLock.RUnlock()
if d.closed {
return nil, pebble.ErrClosed
}
dat, closer, err := d.db.Get(key)
if err != nil {
return nil, err
Expand All @@ -262,19 +274,30 @@ func (d *Database) Get(key []byte) ([]byte, error) {

// Put inserts the given value into the key-value store.
func (d *Database) Put(key []byte, value []byte) error {
d.quitLock.RLock()
defer d.quitLock.RUnlock()
if d.closed {
return pebble.ErrClosed
}
return d.db.Set(key, value, pebble.NoSync)
}

// Delete removes the key from the key-value store.
func (d *Database) Delete(key []byte) error {
d.quitLock.RLock()
defer d.quitLock.RUnlock()
if d.closed {
return pebble.ErrClosed
}
return d.db.Delete(key, nil)
}

// NewBatch creates a write-only key-value store that buffers changes to its host
// database until a final write is called.
func (d *Database) NewBatch() ethdb.Batch {
return &batch{
b: d.db.NewBatch(),
b: d.db.NewBatch(),
db: d,
}
}

Expand Down Expand Up @@ -481,6 +504,7 @@ func (d *Database) meter(refresh time.Duration) {
// when Write is called. A batch cannot be used concurrently.
type batch struct {
b *pebble.Batch
db *Database
size int
}

Expand All @@ -505,6 +529,11 @@ func (b *batch) ValueSize() int {

// Write flushes any accumulated data to disk.
func (b *batch) Write() error {
b.db.quitLock.RLock()
defer b.db.quitLock.RUnlock()
if b.db.closed {
return pebble.ErrClosed
}
return b.b.Commit(pebble.NoSync)
}

Expand Down