This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Closed
90 commits
dbb7614
wip clean
acud Feb 26, 2020
33414c5
cleanup
acud Feb 26, 2020
3b89055
forky: reinstate shard field (de)serialisation
acud Feb 27, 2020
d7a8fb9
forky: add NextShard tests
acud Feb 27, 2020
0744fd4
tests failing
acud Feb 27, 2020
f616fff
remove t.run
acud Feb 27, 2020
c3d92c1
remove println
acud Feb 27, 2020
afad773
fix build
acud Feb 27, 2020
6ec5f97
forky: address some PR comments
acud Feb 28, 2020
06a2466
forky/leveldb: instate free slot serialisation
acud Feb 28, 2020
a74470e
fix build
acud Feb 28, 2020
c5cb5b7
wip persistence test
acud Feb 28, 2020
0b24c4b
wip test
acud Feb 28, 2020
9ca5025
add sorting unit test
acud Mar 2, 2020
699940c
add test desc
acud Mar 2, 2020
9f2806c
forky: add free counter persistence e2e test, shard selection logic
acud Mar 2, 2020
b0a2954
pick the smallest shard on no free slots
acud Mar 2, 2020
0a72e7c
fix build
acud Mar 2, 2020
c8fb4d7
next shard unit test
acud Mar 2, 2020
f818729
cleanup, fix free slots in leveldb implementation
acud Mar 2, 2020
aa2b1b0
fix build
acud Mar 2, 2020
50db06f
add metrics for debugging
acud Mar 3, 2020
e7bcded
fix build
acud Mar 3, 2020
b8c1fef
pessimistic locking
acud Mar 3, 2020
fe9c14b
Revert "pessimistic locking"
acud Mar 3, 2020
bebdd45
reenable tests
acud Mar 3, 2020
8bcaabb
try something
acud Mar 3, 2020
2b791ad
Revert "try something"
acud Mar 3, 2020
5cfb4a5
remove caching, use mem
acud Mar 3, 2020
c72a6c3
remove free slots slice
acud Mar 3, 2020
88ab291
Revert "Revert "pessimistic locking""
acud Mar 3, 2020
92886fe
add metrics
acud Mar 3, 2020
8ec201a
fix has metrics
acud Mar 3, 2020
6624115
storage/fcds/test: fix NewFCDSStore path handling
janos Mar 3, 2020
00d4c5a
instrument till you drop
acud Mar 4, 2020
48143c6
still works
acud Mar 4, 2020
193a30a
getting there
acud Mar 4, 2020
231f4f7
remove annoying prints
acud Mar 4, 2020
08ed173
restore gc logic, still ok
acud Mar 4, 2020
db304d8
reinstate reclaimed check
acud Mar 4, 2020
ae3886c
cleaup
acud Mar 4, 2020
e7a7ae2
remove fcds lock, still green
acud Mar 4, 2020
78a12e5
still green
acud Mar 4, 2020
6961a6e
mem passing, leveldb broken
acud Mar 4, 2020
20c729e
leveldb kind of stable, mem ok
acud Mar 4, 2020
ed0f2ca
mem ok
acud Mar 4, 2020
8b6d42f
still ok
acud Mar 4, 2020
7e546ca
still ok
acud Mar 4, 2020
468c061
green still
acud Mar 4, 2020
9661248
janos version of the test
acud Mar 4, 2020
e23853c
Revert "janos version of the test"
acud Mar 4, 2020
ba10c6d
pull lock up
acud Mar 4, 2020
aa8d825
mega ugly but works
acud Mar 4, 2020
4541c9f
clean
acud Mar 4, 2020
86b3a13
prevent double puts into forky for existing chunks
acud Mar 4, 2020
bbaf1c2
wip cleanup
acud Mar 4, 2020
387bbc0
wip cleanup
acud Mar 4, 2020
18855fb
more house cleaning
acud Mar 4, 2020
df3a326
maintain free slots in memory, persist and load from leveldb on batch…
acud Mar 4, 2020
84acb66
fix build
acud Mar 4, 2020
b82eea4
randomize next free shard to reduce contention
acud Mar 4, 2020
e4951ec
wip return locked shard directly
acud Mar 5, 2020
d2e73d9
remove mem test
acud Mar 5, 2020
238c673
try out cancellable offset
acud Mar 9, 2020
95e257d
dont mock me
acud Mar 9, 2020
f4fc2c0
switch back to leveldb
acud Mar 9, 2020
39daa0c
puttopgccheck
acud Mar 13, 2020
2fe7596
forky: remove offset deletion from Set since it should already be del…
acud Mar 13, 2020
da85d55
forky: simplify set, change free offset method
acud Mar 13, 2020
4cba21a
cleanup
acud Mar 13, 2020
0df8e67
cleanup
acud Mar 13, 2020
7d9156c
cleanup
acud Mar 13, 2020
e3d8ccf
cleanup tests
acud Mar 13, 2020
9050b96
cleanup
acud Mar 13, 2020
ece3c7d
cleanup
acud Mar 13, 2020
6068e92
cleanup test vectors
acud Mar 13, 2020
1e0589f
Merge branch 'fcds' into fcds-teenage-mutants
acud Mar 13, 2020
4d4f809
remove mutex
acud Mar 13, 2020
e6e8a71
dont test for no grow on mock
acud Mar 16, 2020
99ac34a
remove error\
acud Mar 16, 2020
219050e
forky: address pr comments
acud Mar 18, 2020
6fb35e5
forky: address pr comments
acud Mar 23, 2020
4700187
Add benchmark to compare to badger
jmozah Mar 24, 2020
7055489
Ignoring setup stage in benchmark timings
jmozah Mar 26, 2020
7771a35
add benchmark from badger branch
acud Mar 26, 2020
d098a92
fcds_test.go
acud Mar 26, 2020
d259aa0
change to 50k-500k-50lakh
acud Mar 26, 2020
315e5cd
better write testing
acud Mar 26, 2020
77b0c85
fix build
acud Mar 26, 2020
51ff0d7
on par
acud Mar 26, 2020
188 changes: 98 additions & 90 deletions storage/fcds/fcds.go
@@ -22,9 +22,11 @@ import (
"io"
"os"
"path/filepath"
"sort"
"sync"
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/log"

"github.com/ethersphere/swarm/chunk"
@@ -36,8 +38,9 @@ import (
type Storer interface {
Get(addr chunk.Address) (ch chunk.Chunk, err error)
Has(addr chunk.Address) (yes bool, err error)
Put(ch chunk.Chunk) (err error)
Put(ch chunk.Chunk) (shard uint8, err error)
Delete(addr chunk.Address) (err error)
ShardSize() (slots []ShardSize, err error)
Count() (count int, err error)
Iterate(func(ch chunk.Chunk) (stop bool, err error)) (err error)
Close() (err error)
@@ -46,7 +49,7 @@ type Storer interface {
var _ Storer = new(Store)

// Number of files that store chunk data.
const shardCount = 32
var ShardCount = uint8(32)

// ErrStoreClosed is returned if store is already closed.
var ErrStoreClosed = errors.New("closed store")
@@ -56,9 +59,6 @@ var ErrStoreClosed = errors.New("closed store")
type Store struct {
shards []shard // relations with shard id and a shard file and their mutexes
meta MetaStore // stores chunk offsets
free []bool // which shards have free offsets
freeMu sync.RWMutex // protects free field
freeCache *offsetCache // optional cache of free offset values
wg sync.WaitGroup // blocks Close until all other method calls are done
maxChunkSize int // maximal chunk data size
quit chan struct{} // quit disables all operations after Close is called
@@ -68,24 +68,11 @@ type Store struct {
// Option is an optional argument passed to New.
type Option func(*Store)

// WithCache is an optional argument to New constructor that enables
// in memory cache of free chunk data positions in files
func WithCache(yes bool) Option {
return func(s *Store) {
if yes {
s.freeCache = newOffsetCache(shardCount)
} else {
s.freeCache = nil
}
}
}

// New constructs a new Store with files at path, with specified max chunk size.
func New(path string, maxChunkSize int, metaStore MetaStore, opts ...Option) (s *Store, err error) {
s = &Store{
shards: make([]shard, shardCount),
shards: make([]shard, ShardCount),
meta: metaStore,
free: make([]bool, shardCount),
maxChunkSize: maxChunkSize,
quit: make(chan struct{}),
}
@@ -95,7 +82,7 @@ func New(path string, maxChunkSize int, metaStore MetaStore, opts ...Option) (s
if err := os.MkdirAll(path, 0777); err != nil {
return nil, err
}
for i := byte(0); i < shardCount; i++ {
for i := byte(0); i < ShardCount; i++ {
s.shards[i].f, err = os.OpenFile(filepath.Join(path, fmt.Sprintf("chunks-%v.db", i)), os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return nil, err
@@ -105,6 +92,21 @@ func New(path string, maxChunkSize int, metaStore MetaStore, opts ...Option) (s
return s, nil
}

func (s *Store) ShardSize() (slots []ShardSize, err error) {
slots = make([]ShardSize, len(s.shards))
for i, sh := range s.shards {
sh.mu.Lock()
fs, err := sh.f.Stat()
sh.mu.Unlock()
if err != nil {
return nil, err
}
slots[i] = ShardSize{Shard: uint8(i), Size: fs.Size()}
}

return slots, nil
}

// Get returns a chunk with data.
func (s *Store) Get(addr chunk.Address) (ch chunk.Chunk, err error) {
if err := s.protect(); err != nil {
@@ -117,18 +119,23 @@ func (s *Store) Get(addr chunk.Address) (ch chunk.Chunk, err error) {
return nil, err
}

sh := s.shards[getShard(addr)]
sh := s.shards[m.Shard]
sh.mu.Lock()
defer sh.mu.Unlock()
Member: Why is this defer removed?

Contributor Author: the idea was not to hold the lock beyond the specific time that the shard is used (i.e. to not hold it for the metric increment or the chunk constructor)

Contributor Author: i will put it back as a defer statement
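
A minimal sketch of the two lock scopes discussed in this thread, assuming a simplified shard wrapper. The package, type, and method names below (lockscope, shardFile, narrowRead, deferredRead) are placeholders for illustration and are not identifiers from this PR:

package lockscope

import (
	"os"
	"sync"
)

type shardFile struct {
	mu *sync.Mutex
	f  *os.File
}

// narrowRead holds the mutex only for the file access itself, so any
// follow-up work (metric increments, constructing the chunk) runs unlocked.
func (s *shardFile) narrowRead(off int64, size int) ([]byte, error) {
	buf := make([]byte, size)
	s.mu.Lock()
	_, err := s.f.ReadAt(buf, off)
	s.mu.Unlock()
	return buf, err
}

// deferredRead is the defer-based variant: simpler and safe on early
// returns or panics, but the lock stays held until the function returns.
func (s *shardFile) deferredRead(off int64, size int) ([]byte, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	buf := make([]byte, size)
	_, err := s.f.ReadAt(buf, off)
	return buf, err
}

Both forms are correct here; the deferred unlock trades a slightly longer critical section for safety on early returns, which is why it was put back.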


data := make([]byte, m.Size)
n, err := sh.f.ReadAt(data, m.Offset)
if err != nil && err != io.EOF {
metrics.GetOrRegisterCounter("fcds/get/error", nil).Inc(1)

return nil, err
}
if n != int(m.Size) {
return nil, fmt.Errorf("incomplete chunk data, read %v of %v", n, m.Size)
}

metrics.GetOrRegisterCounter("fcds/get/ok", nil).Inc(1)

return chunk.NewChunk(addr, data), nil
}

@@ -139,97 +146,111 @@ func (s *Store) Has(addr chunk.Address) (yes bool, err error) {
}
defer s.unprotect()

return s.meta.Has(addr)
_, err = s.getMeta(addr)
if err != nil {
if err == chunk.ErrChunkNotFound {
metrics.GetOrRegisterCounter("fcds/has/no", nil).Inc(1)
return false, nil
}
metrics.GetOrRegisterCounter("fcds/has/err", nil).Inc(1)
return false, err
}
metrics.GetOrRegisterCounter("fcds/has/ok", nil).Inc(1)

return true, nil
}

// Put stores chunk data.
func (s *Store) Put(ch chunk.Chunk) (err error) {
// Returns the shard number into which the chunk was added.
func (s *Store) Put(ch chunk.Chunk) (uint8, error) {
if err := s.protect(); err != nil {
return err
return 0, err
}
defer s.unprotect()

m, err := s.getMeta(ch.Address())
if err == nil {
return m.Shard, nil
}
addr := ch.Address()
data := ch.Data()

size := len(data)
if size > s.maxChunkSize {
return fmt.Errorf("chunk data size %v exceeds %v bytes", size, s.maxChunkSize)
return 0, fmt.Errorf("chunk data size %v exceeds %v bytes", size, s.maxChunkSize)
}

section := make([]byte, s.maxChunkSize)
copy(section, data)

shard := getShard(addr)
sh := s.shards[shard]

has, err := s.meta.Has(addr)
shardId, offset, reclaimed, cancel, err := s.getOffset()
if err != nil {
return err
}
if has {
return nil
return 0, err
}

sh := s.shards[shardId]
sh.mu.Lock()
defer sh.mu.Unlock()

offset, reclaimed, err := s.getOffset(shard)
if err != nil {
return err
if reclaimed {
metrics.GetOrRegisterCounter("fcds/put/reclaimed", nil).Inc(1)
}

if offset < 0 {
metrics.GetOrRegisterCounter("fcds/put/append", nil).Inc(1)
// no free offsets found,
// append the chunk data by
// seeking to the end of the file
offset, err = sh.f.Seek(0, io.SeekEnd)
} else {
metrics.GetOrRegisterCounter("fcds/put/offset", nil).Inc(1)
// seek to the offset position
// to replace the chunk data at that position
_, err = sh.f.Seek(offset, io.SeekStart)
}
if err != nil {
return err
cancel()
return 0, err
}

if _, err = sh.f.Write(section); err != nil {
return err
}
if reclaimed && s.freeCache != nil {
s.freeCache.remove(shard, offset)
cancel()
return 0, err
}
return s.meta.Set(addr, shard, reclaimed, &Meta{

err = s.meta.Set(addr, shardId, reclaimed, &Meta{
Size: uint16(size),
Offset: offset,
Shard: shardId,
})
if err != nil {
cancel()
}

return shardId, err
}

// getOffset returns an offset where chunk data can be written to
// getOffset returns an offset on a shard where chunk data can be written to
// and a flag if the offset is reclaimed from a previously removed chunk.
// If offset is less then 0, no free offsets are available.
func (s *Store) getOffset(shard uint8) (offset int64, reclaimed bool, err error) {
if !s.shardHasFreeOffsets(shard) {
return -1, false, nil
func (s *Store) getOffset() (shard uint8, offset int64, reclaimed bool, cancel func(), err error) {
cancel = func() {}
shard, offset, cancel = s.meta.FreeOffset()
if offset >= 0 {
return shard, offset, true, cancel, nil
}

offset = -1
if s.freeCache != nil {
offset = s.freeCache.get(shard)
// each element Val is the shard size in bytes
shardSizes, err := s.ShardSize()
if err != nil {
return 0, 0, false, cancel, err
}

if offset < 0 {
offset, err = s.meta.FreeOffset(shard)
if err != nil {
return 0, false, err
}
}
if offset < 0 {
s.markShardWithFreeOffsets(shard, false)
return -1, false, nil
}
// sorting them will make the first element the largest shard and the last
// element the smallest shard; pick the smallest
sort.Sort(bySize(shardSizes))

return shardSizes[len(shardSizes)-1].Shard, -1, false, cancel, nil

return offset, true, nil
}

// Delete makes the chunk unavailable.
@@ -239,17 +260,23 @@ func (s *Store) Delete(addr chunk.Address) (err error) {
}
defer s.unprotect()

shard := getShard(addr)
s.markShardWithFreeOffsets(shard, true)
m, err := s.getMeta(addr)
if err != nil {
return err
}

if s.freeCache != nil {
m, err := s.getMeta(addr)
if err != nil {
return err
}
s.freeCache.set(shard, m.Offset)
mu := s.shards[m.Shard].mu
mu.Lock()
defer mu.Unlock()

err = s.meta.Remove(addr, m.Shard)
if err != nil {
metrics.GetOrRegisterCounter("fcds/delete/fail", nil).Inc(1)
return err
}
return s.meta.Remove(addr, shard)

metrics.GetOrRegisterCounter("fcds/delete/ok", nil).Inc(1)
return nil
}

// Count returns a number of stored chunks.
@@ -263,7 +290,6 @@ func (s *Store) Iterate(fn func(chunk.Chunk) (stop bool, err error)) (err error)
return err
}
defer s.unprotect()

for _, sh := range s.shards {
sh.mu.Lock()
}
@@ -275,7 +301,7 @@ func (s *Store) Iterate(fn func(chunk.Chunk) (stop bool, err error)) (err error)

return s.meta.Iterate(func(addr chunk.Address, m *Meta) (stop bool, err error) {
data := make([]byte, m.Size)
_, err = s.shards[getShard(addr)].f.ReadAt(data, m.Offset)
_, err = s.shards[m.Shard].f.ReadAt(data, m.Offset)
if err != nil {
return true, err
}
@@ -338,24 +364,6 @@ func (s *Store) getMeta(addr chunk.Address) (m *Meta, err error) {
return s.meta.Get(addr)
}

func (s *Store) markShardWithFreeOffsets(shard uint8, has bool) {
s.freeMu.Lock()
s.free[shard] = has
s.freeMu.Unlock()
}

func (s *Store) shardHasFreeOffsets(shard uint8) (has bool) {
s.freeMu.RLock()
has = s.free[shard]
s.freeMu.RUnlock()
return has
}

// getShard returns a shard number for the chunk address.
func getShard(addr chunk.Address) (shard uint8) {
return addr[len(addr)-1] % shardCount
}

type shard struct {
f *os.File
mu *sync.Mutex