diff --git a/api/http/server.go b/api/http/server.go
index e13fb11b42..8d8c78f9ea 100644
--- a/api/http/server.go
+++ b/api/http/server.go
@@ -42,8 +42,11 @@ import (
 	"github.com/ethersphere/swarm/chunk"
 	"github.com/ethersphere/swarm/log"
 	"github.com/ethersphere/swarm/sctx"
+	"github.com/ethersphere/swarm/spancontext"
 	"github.com/ethersphere/swarm/storage"
 	"github.com/ethersphere/swarm/storage/feed"
+	"github.com/opentracing/opentracing-go"
+	olog "github.com/opentracing/opentracing-go/log"
 	"github.com/rs/cors"
 )
 
@@ -300,6 +303,15 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
 	log.Debug("handle.post.files", "ruid", ruid)
 	postFilesCount.Inc(1)
 
+	ctx, sp := spancontext.StartSpan(r.Context(), "handle.post.files")
+	defer sp.Finish()
+
+	quitChan := make(chan struct{})
+	defer close(quitChan)
+
+	// periodically monitor the tag for this upload and log its state to the `handle.post.files` span
+	go periodicTagTrace(s.api.Tags, sctx.GetTag(ctx), quitChan, sp)
+
 	contentType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
 	if err != nil {
 		postFilesFail.Inc(1)
@@ -315,7 +327,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
 
 	var addr storage.Address
 	if uri.Addr != "" && uri.Addr != "encrypt" {
-		addr, err = s.api.Resolve(r.Context(), uri.Addr)
+		addr, err = s.api.Resolve(ctx, uri.Addr)
 		if err != nil {
 			postFilesFail.Inc(1)
 			respondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusInternalServerError)
@@ -323,7 +335,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
 		}
 		log.Debug("resolved key", "ruid", ruid, "key", addr)
 	} else {
-		addr, err = s.api.NewManifest(r.Context(), toEncrypt)
+		addr, err = s.api.NewManifest(ctx, toEncrypt)
 		if err != nil {
 			postFilesFail.Inc(1)
 			respondError(w, r, err.Error(), http.StatusInternalServerError)
@@ -331,7 +343,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
 		}
 		log.Debug("new manifest", "ruid", ruid, "key", addr)
 	}
-	newAddr, err := s.api.UpdateManifest(r.Context(), addr, func(mw *api.ManifestWriter) error {
+	newAddr, err := s.api.UpdateManifest(ctx, addr, func(mw *api.ManifestWriter) error {
 		switch contentType {
 		case "application/x-tar":
 			_, err := s.handleTarUpload(r, mw)
@@ -935,3 +947,29 @@ func (lrw *loggingResponseWriter) WriteHeader(code int) {
 func isDecryptError(err error) bool {
 	return strings.Contains(err.Error(), api.ErrDecrypt.Error())
 }
+
+// periodicTagTrace queries the tag every 2 seconds and logs its state to the span
+func periodicTagTrace(tags *chunk.Tags, tagUid uint32, q chan struct{}, sp opentracing.Span) {
+	f := func() {
+		tag, err := tags.Get(tagUid)
+		if err != nil {
+			log.Error("error while getting tag", "tagUid", tagUid, "err", err)
+			return
+		}
+
+		sp.LogFields(olog.String("tag state", fmt.Sprintf("split=%d stored=%d seen=%d synced=%d", tag.Get(chunk.StateSplit), tag.Get(chunk.StateStored), tag.Get(chunk.StateSeen), tag.Get(chunk.StateSynced))))
+	}
+
+	for {
+		select {
+		case <-q:
+			f()
+
+			return
+		default:
+			f()
+
+			time.Sleep(2 * time.Second)
+		}
+	}
+}
diff --git a/chunk/chunk.go b/chunk/chunk.go
index c44292bb92..c14572333a 100644
--- a/chunk/chunk.go
+++ b/chunk/chunk.go
@@ -38,11 +38,14 @@ var (
 type Chunk interface {
 	Address() Address
 	Data() []byte
+	TagID() uint32
+	WithTagID(t uint32) Chunk
 }
 
 type chunk struct {
 	addr  Address
 	sdata []byte
+	tagID uint32
 }
 
 func NewChunk(addr Address, data []byte) Chunk {
@@ -52,6 +55,11 @@ func NewChunk(addr Address, data []byte) Chunk {
 	}
 }
 
+func (c *chunk) WithTagID(t uint32) Chunk {
+	c.tagID = t
+	return c
+}
+
 func (c *chunk) Address() Address {
 	return c.addr
 }
@@ -60,6 +68,10 @@ func (c *chunk) Data() []byte {
 	return c.sdata
 }
 
+func (c *chunk) TagID() uint32 {
+	return c.tagID
+}
+
 func (self *chunk) String() string {
 	return fmt.Sprintf("Address: %v Chunksize: %v", self.addr.Log(), len(self.sdata))
 }
diff --git a/chunk/tags.go b/chunk/tags.go
index aa88b5eddd..fc660db4d7 100644
--- a/chunk/tags.go
+++ b/chunk/tags.go
@@ -29,14 +29,12 @@ import (
 // Tags hold tag information indexed by a unique random uint32
 type Tags struct {
 	tags *sync.Map
-	rng  *rand.Rand
 }
 
 // NewTags creates a tags object
 func NewTags() *Tags {
 	return &Tags{
 		tags: &sync.Map{},
-		rng:  rand.New(rand.NewSource(time.Now().Unix())),
 	}
 }
 
@@ -44,7 +42,7 @@ func NewTags() *Tags {
 // it returns an error if the tag with this name already exists
 func (ts *Tags) New(s string, total int64) (*Tag, error) {
 	t := &Tag{
-		Uid:       ts.rng.Uint32(),
+		Uid:       uint32(rand.Int31()),
 		Name:      s,
 		startedAt: time.Now(),
 		total:     total,
diff --git a/network/stream/delivery.go b/network/stream/delivery.go
index 794f9c9f03..bd74738c8c 100644
--- a/network/stream/delivery.go
+++ b/network/stream/delivery.go
@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common/hexutil"
 	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethersphere/swarm/chunk"
@@ -243,26 +244,26 @@ func (d *Delivery) FindPeer(ctx context.Context, req *storage.Request) (*Peer, e
 
 		// skip peers that we have already tried
 		if req.SkipPeer(id.String()) {
-			log.Trace("findpeer skip peer", "peer", id, "ref", req.Addr.String())
+			log.Trace("findpeer skip peer", "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
			return true
 		}
 
 		if myPo < depth { // chunk is NOT within the neighbourhood
 			if po <= myPo { // always choose a peer strictly closer to chunk than us
-				log.Trace("findpeer1a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+				log.Trace("findpeer1a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
 				return false
 			} else {
-				log.Trace("findpeer1b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+				log.Trace("findpeer1b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
 			}
 		} else { // chunk IS WITHIN neighbourhood
 			if po < depth { // do not select peer outside the neighbourhood. But allows peers further from the chunk than us
-				log.Trace("findpeer2a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+				log.Trace("findpeer2a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
 				return false
 			} else if po <= originPo { // avoid loop in neighbourhood, so not forward when a request comes from the neighbourhood
-				log.Trace("findpeer2b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+				log.Trace("findpeer2b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
 				return false
 			} else {
-				log.Trace("findpeer2c", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+				log.Trace("findpeer2c", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
 			}
 		}
 
diff --git a/network/stream/syncer.go b/network/stream/syncer.go
index 6fc1202ad3..723d300ae3 100644
--- a/network/stream/syncer.go
+++ b/network/stream/syncer.go
@@ -128,12 +128,6 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
 			// This is the most naive approach to label the chunk as synced
 			// allowing it to be garbage collected. A proper way requires
 			// validating that the chunk is successfully stored by the peer.
-			err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address)
-			if err != nil {
-				metrics.GetOrRegisterCounter("syncer.set-next-batch.set-sync-err", nil).Inc(1)
-				log.Debug("syncer pull subscription - err setting chunk as synced", "correlateId", s.correlateId, "err", err)
-				return nil, 0, 0, err
-			}
 			batchSize++
 			if batchStartID == nil {
 				// set batch start id only if
diff --git a/pss/pss.go b/pss/pss.go
index bf45444858..d0997ab1f2 100644
--- a/pss/pss.go
+++ b/pss/pss.go
@@ -686,7 +686,7 @@ func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool {
 		}
 	}
 	if !isPssEnabled {
-		log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
+		log.Warn("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps, "addr", fmt.Sprintf("%x", sp.BzzAddr.Address()))
 		return false
 	}
 
diff --git a/pss/pubsub.go b/pss/pubsub.go
new file mode 100644
index 0000000000..03ebf9ee5f
--- /dev/null
+++ b/pss/pubsub.go
@@ -0,0 +1,55 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package pss
+
+import "github.com/ethereum/go-ethereum/p2p"
+
+// PubSub implements the pushsync.PubSub interface using pss
+type PubSub struct {
+	pss *Pss
+}
+
+// NewPubSub creates a new PubSub
+func NewPubSub(p *Pss) *PubSub {
+	return &PubSub{
+		pss: p,
+	}
+}
+
+// BaseAddr returns the Kademlia base address
+func (p *PubSub) BaseAddr() []byte {
+	return p.pss.Kademlia.BaseAddr()
+}
+
+// Register registers a handler
+func (p *PubSub) Register(topic string, prox bool, handler func(msg []byte, p *p2p.Peer) error) func() {
+	f := func(msg []byte, peer *p2p.Peer, _ bool, _ string) error {
+		return handler(msg, peer)
+	}
+	h := NewHandler(f).WithRaw()
+	if prox {
+		h = h.WithProxBin()
+	}
+	pt := BytesToTopic([]byte(topic))
+	return p.pss.Register(&pt, h)
+}
+
+// Send sends a message using pss SendRaw
+func (p *PubSub) Send(to []byte, topic string, msg []byte) error {
+	pt := BytesToTopic([]byte(topic))
+	return p.pss.SendRaw(PssAddress(to), pt, msg)
+}
diff --git a/shed/index.go b/shed/index.go
index 38afbce4ca..e9a7d0308c 100644
--- a/shed/index.go
+++ b/shed/index.go
@@ -41,6 +41,7 @@ type Item struct {
 	AccessTimestamp int64
 	StoreTimestamp  int64
 	BinID           uint64
+	Tag             uint32
 }
 
 // Merge is a helper method to construct a new
@@ -62,6 +63,9 @@ func (i Item) Merge(i2 Item) (new Item) {
 	if i.BinID == 0 {
 		i.BinID = i2.BinID
 	}
+	if i.Tag == 0 {
+		i.Tag = i2.Tag
+	}
 	return i
 }
diff --git a/storage/hasherstore.go b/storage/hasherstore.go
index 0ac8cc408d..50f6278155 100644
--- a/storage/hasherstore.go
+++ b/storage/hasherstore.go
@@ -183,7 +183,7 @@ func (h *hasherStore) createHash(chunkData ChunkData) Address {
 
 func (h *hasherStore) createChunk(chunkData ChunkData) Chunk {
 	hash := h.createHash(chunkData)
-	chunk := NewChunk(hash, chunkData)
+	chunk := NewChunk(hash, chunkData).WithTagID(h.tag.Uid)
 	return chunk
 }
 
diff --git a/storage/localstore/localstore.go b/storage/localstore/localstore.go
index 265cd045ed..09f6ec7e5c 100644
--- a/storage/localstore/localstore.go
+++ b/storage/localstore/localstore.go
@@ -1,4 +1,4 @@
-// Copyright 2018 The go-ethereum Authors
+// Copyright 2019 The go-ethereum Authors
 // This file is part of the go-ethereum library.
 //
 // The go-ethereum library is free software: you can redistribute it and/or modify
@@ -300,9 +300,12 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
 			return e, nil
 		},
 		EncodeValue: func(fields shed.Item) (value []byte, err error) {
-			return nil, nil
+			tag := make([]byte, 4)
+			binary.BigEndian.PutUint32(tag, fields.Tag)
+			return tag, nil
 		},
 		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+			e.Tag = binary.BigEndian.Uint32(value)
 			return e, nil
 		},
 	})
@@ -367,6 +370,7 @@ func chunkToItem(ch chunk.Chunk) shed.Item {
 	return shed.Item{
 		Address: ch.Address(),
 		Data:    ch.Data(),
+		Tag:     ch.TagID(),
 	}
 }
 
diff --git a/storage/localstore/subscription_push.go b/storage/localstore/subscription_push.go
index c8dd5cf215..94da228d30 100644
--- a/storage/localstore/subscription_push.go
+++ b/storage/localstore/subscription_push.go
@@ -75,7 +75,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
 				}
 
 				select {
-				case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data):
+				case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data).WithTagID(dataItem.Tag):
 					count++
 					// set next iteration start item
 					// when its chunk is successfully sent to channel
diff --git a/storage/netstore.go b/storage/netstore.go
index 8f92d7aefd..6b2978db04 100644
--- a/storage/netstore.go
+++ b/storage/netstore.go
@@ -23,14 +23,13 @@ import (
 	"sync"
 	"time"
 
+	"github.com/ethereum/go-ethereum/metrics"
+	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethersphere/swarm/chunk"
 	"github.com/ethersphere/swarm/log"
 	"github.com/ethersphere/swarm/network/timeouts"
 	"github.com/ethersphere/swarm/spancontext"
 	lru "github.com/hashicorp/golang-lru"
-
-	"github.com/ethereum/go-ethereum/metrics"
-	"github.com/ethereum/go-ethereum/p2p/enode"
 	olog "github.com/opentracing/opentracing-go/log"
 	"github.com/syndtr/goleveldb/leveldb"
 	"golang.org/x/sync/singleflight"
@@ -107,7 +106,7 @@ func (n *NetStore) Put(ctx context.Context, mode chunk.ModePut, ch Chunk) (bool,
 	n.putMu.Lock()
 	defer n.putMu.Unlock()
 
-	log.Trace("netstore.put", "ref", ch.Address().String(), "mode", mode)
+	log.Trace("netstore.put", "ref", ch.Address().String(), "self", n.localID.String(), "mode", mode)
 
 	// put the chunk to the localstore, there should be no error
 	exists, err := n.Store.Put(ctx, mode, ch)
@@ -151,7 +150,7 @@ func (n *NetStore) Get(ctx context.Context, mode chunk.ModeGet, req *Request) (C
 
 	ref := req.Addr
 
-	log.Trace("netstore.get", "ref", ref.String())
+	log.Trace("netstore.get", "ref", ref.String(), "self", n.localID.String())
 
 	ch, err := n.Store.Get(ctx, mode, ref)
 	if err != nil {
@@ -160,7 +159,7 @@ func (n *NetStore) Get(ctx context.Context, mode chunk.ModeGet, req *Request) (C
 			log.Error("localstore get error", "err", err)
 		}
 
-		log.Trace("netstore.chunk-not-in-localstore", "ref", ref.String())
+		log.Trace("netstore.chunk-not-in-localstore", "ref", ref.String(), "self", n.localID.String())
 
 		v, err, _ := n.requestGroup.Do(ref.String(), func() (interface{}, error) {
 			// currently we issue a retrieve request if a fetcher
@@ -198,7 +197,7 @@ func (n *NetStore) Get(ctx context.Context, mode chunk.ModeGet, req *Request) (C
 
 		c := v.(Chunk)
 
-		log.Trace("netstore.singleflight returned", "ref", ref.String(), "err", err)
+		log.Trace("netstore.singleflight returned", "ref", ref.String(), "self", n.localID.String(), "err", err)
 
 		return c, nil
 	}
@@ -211,7 +210,7 @@ func (n *NetStore) Get(ctx context.Context, mode chunk.ModeGet, req *Request) (C
 	return ch, nil
 }
 
-// RemoteFetch is handling the retry mechanism when making a chunk request to our peers.
+// RemoteFetch handles the retry mechanism when making a chunk request to our peers.
 // For a given chunk Request, we call RemoteGet, which selects the next eligible peer and
 // issues a RetrieveRequest and we wait for a delivery. If a delivery doesn't arrive within the SearchTimeout
 // we retry.
@@ -237,13 +236,11 @@ func (n *NetStore) RemoteFetch(ctx context.Context, req *Request, fi *Fetcher) e
 			log.Trace(err.Error(), "ref", ref)
 			osp.LogFields(olog.String("err", err.Error()))
 			osp.Finish()
-			return ErrNoSuitablePeer
+		} else {
+			// add peer to the set of peers to skip from now
+			log.Trace("remote.fetch, adding peer to skip", "ref", ref, "peer", currentPeer.String())
+			req.PeersToSkip.Store(currentPeer.String(), time.Now())
 		}
-
-		// add peer to the set of peers to skip from now
-		log.Trace("remote.fetch, adding peer to skip", "ref", ref, "peer", currentPeer.String())
-		req.PeersToSkip.Store(currentPeer.String(), time.Now())
-
 		select {
 		case <-fi.Delivered:
 			log.Trace("remote.fetch, chunk delivered", "ref", ref)
@@ -256,7 +253,6 @@ func (n *NetStore) RemoteFetch(ctx context.Context, req *Request, fi *Fetcher) e
 
 			osp.LogFields(olog.Bool("timeout", true))
 			osp.Finish()
-			break
 		case <-ctx.Done(): // global fetcher timeout
 			log.Trace("remote.fetch, fail", "ref", ref)
 			metrics.GetOrRegisterCounter("remote.fetch.timeout.global", nil).Inc(1)
@@ -290,7 +286,7 @@ func (n *NetStore) GetOrCreateFetcher(ctx context.Context, ref Address, interest
 	f = NewFetcher()
 	v, loaded := n.fetchers.Get(ref.String())
 
-	log.Trace("netstore.has-with-callback.loadorstore", "ref", ref.String(), "loaded", loaded)
+	log.Trace("netstore.has-with-callback.loadorstore", "ref", ref.String(), "loaded", loaded, "self", n.localID.String())
 	if loaded {
 		f = v.(*Fetcher)
 	} else {
diff --git a/storage/pushsync/protocol.go b/storage/pushsync/protocol.go
new file mode 100644
index 0000000000..3285f84833
--- /dev/null
+++ b/storage/pushsync/protocol.go
@@ -0,0 +1,90 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
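+
+// Package pushsync implements chunk push syncing over pss: a Pusher pushes
+// chunks to their address neighbourhood and a Storer saves them and responds
+// with statements of custody receipts (see pusher.go and storer.go).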
+package pushsync
+
+import (
+	"crypto/rand"
+
+	"github.com/ethereum/go-ethereum/common/hexutil"
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/rlp"
+)
+
+const (
+	pssChunkTopic   = "PUSHSYNC_CHUNKS"   // pss topic for chunks
+	pssReceiptTopic = "PUSHSYNC_RECEIPTS" // pss topic for statement of custody receipts
+)
+
+// PubSub is a Postal Service interface needed to send/receive chunks and receipts for push syncing
+type PubSub interface {
+	Register(topic string, prox bool, handler func(msg []byte, p *p2p.Peer) error) func()
+	Send(to []byte, topic string, msg []byte) error
+	BaseAddr() []byte
+}
+
+// chunkMsg is the message construct to send chunks to their local neighbourhood
+type chunkMsg struct {
+	Addr   []byte // chunk address
+	Data   []byte // chunk data
+	Origin []byte // originator - needed for sending the receipt back to the origin
+	Nonce  []byte // nonce to make multiple instances of send immune to the deduplication cache
+}
+
+// receiptMsg is a statement of custody response to receiving a push-synced chunk
+// it is currently a notification only (contains no proof) sent to the originator
+// Nonce is there to make multiple responses immune to the deduplication cache
+type receiptMsg struct {
+	Addr  []byte
+	Nonce []byte
+}
+
+func decodeChunkMsg(msg []byte) (*chunkMsg, error) {
+	var chmsg chunkMsg
+	err := rlp.DecodeBytes(msg, &chmsg)
+	if err != nil {
+		return nil, err
+	}
+	return &chmsg, nil
+}
+
+func decodeReceiptMsg(msg []byte) (*receiptMsg, error) {
+	var rmsg receiptMsg
+	err := rlp.DecodeBytes(msg, &rmsg)
+	if err != nil {
+		return nil, err
+	}
+	return &rmsg, nil
+}
+
+// newNonce creates a random nonce;
+// even without a proof of custody it is important, otherwise resending a chunk is deduplicated by pss
+func newNonce() []byte {
+	buf := make([]byte, 32)
+	t := 0
+	for t < len(buf) {
+		n, _ := rand.Read(buf[t:])
+		t += n
+	}
+	return buf
+}
+
+func label(b []byte) string {
+	return hexutil.Encode(b[:8])
+}
diff --git a/storage/pushsync/pusher.go b/storage/pushsync/pusher.go
new file mode 100644
index 0000000000..1f1dbda53f
--- /dev/null
+++ b/storage/pushsync/pusher.go
@@ -0,0 +1,275 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package pushsync
+
+import (
+	"context"
+	"time"
+
+	"github.com/ethereum/go-ethereum/metrics"
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/rlp"
+	"github.com/ethersphere/swarm/chunk"
+	"github.com/ethersphere/swarm/log"
+	"github.com/ethersphere/swarm/storage"
+)
+
+// DB interface implemented by localstore
+type DB interface {
+	// subscribe to chunks to be push synced - iterates from earliest to newest
+	SubscribePush(context.Context) (<-chan storage.Chunk, func())
+	// called to set a chunk as synced - and allow it to be garbage collected
+	// TODO this should take ... last argument to delete many in one batch
+	Set(context.Context, chunk.ModeSet, storage.Address) error
+}
+
+// Pusher takes care of the push syncing
+type Pusher struct {
+	store    DB                     // localstore DB
+	tags     *chunk.Tags            // tags to update counts
+	quit     chan struct{}          // channel to signal quitting on all loops
+	pushed   map[string]*pushedItem // cache of items push-synced
+	receipts chan chunk.Address     // channel to receive receipts
+	ps       PubSub                 // PubSub interface to send chunks and receive receipts
+}
+
+var (
+	retryInterval = 2 * time.Second // seconds to wait before retrying a sync
+)
+
+// pushedItem captures the info needed by the pusher about a chunk during the
+// push-sync--receipt roundtrip
+type pushedItem struct {
+	tag    *chunk.Tag // tag for the chunk
+	sentAt time.Time  // most recently sent at time
+	synced bool       // set when chunk got synced
+}
+
+// NewPusher constructs a Pusher and starts up the push sync protocol
+// takes
+// - a DB interface to subscribe to the push sync index to allow iterating over recently stored chunks
+// - a pubsub interface to send chunks and receive statements of custody
+// - a tags object to update chunk state counts on
+func NewPusher(store DB, ps PubSub, tags *chunk.Tags) *Pusher {
+	p := &Pusher{
+		store:    store,
+		tags:     tags,
+		quit:     make(chan struct{}),
+		pushed:   make(map[string]*pushedItem),
+		receipts: make(chan chunk.Address),
+		ps:       ps,
+	}
+	go p.sync()
+	return p
+}
+
+// Close closes the pusher
+func (p *Pusher) Close() {
+	close(p.quit)
+}
+
+// sync starts a forever loop that pushes chunks to their neighbourhood
+// and receives receipts (statements of custody) for them.
+// chunks that are not acknowledged with a receipt are retried
+// not earlier than retryInterval after they were last pushed
+// the routine also updates counts of states on a tag in order
+// to monitor the proportion of saved, sent and synced chunks of
+// a file or collection
+func (p *Pusher) sync() {
+	var chunks <-chan chunk.Chunk
+	var cancel, stop func()
+	var ctx context.Context
+	var synced []storage.Address
+
+	// timer
+	timer := time.NewTimer(0)
+	defer timer.Stop()
+
+	// register handler for pssReceiptTopic on pss pubsub
+	deregister := p.ps.Register(pssReceiptTopic, false, func(msg []byte, _ *p2p.Peer) error {
+		return p.handleReceiptMsg(msg)
+	})
+	defer deregister()
+
+	chunksInBatch := -1
+	var batchStartTime time.Time
+
+	for {
+		select {
+
+		// retry interval timer triggers starting from new
+		case <-timer.C:
+			metrics.GetOrRegisterCounter("pusher.subscribe-push", nil).Inc(1)
+			// TODO: implement some smart retry strategy relying on sent/synced ratio change
+			// if subscribe was running, stop it
+			if stop != nil {
+				stop()
+			}
+			for _, addr := range synced {
+				// set chunk status to synced, insert to db GC index
+				if err := p.store.Set(context.Background(), chunk.ModeSetSync, addr); err != nil {
+					log.Warn("error setting chunk to synced", "addr", addr, "err", err)
+					continue
+				}
+				delete(p.pushed, addr.Hex())
+			}
+			// reset synced list
+			synced = nil
+
+			// we don't want to record the first iteration
+			if chunksInBatch != -1 {
+				// this measurement is not a timer, but we want a histogram, so it fits the data structure
+				metrics.GetOrRegisterResettingTimer("pusher.subscribe-push.chunks-in-batch.hist", nil).Update(time.Duration(chunksInBatch))
+
+				metrics.GetOrRegisterResettingTimer("pusher.subscribe-push.chunks-in-batch.time", nil).UpdateSince(batchStartTime)
+				metrics.GetOrRegisterCounter("pusher.subscribe-push.chunks-in-batch", nil).Inc(int64(chunksInBatch))
+			}
+			chunksInBatch = 0
+			batchStartTime = time.Now()
+
+			// and start iterating on the Push index from the beginning
+			ctx, cancel = context.WithCancel(context.Background())
+			chunks, stop = p.store.SubscribePush(ctx)
+			// reset timer to go off after retryInterval
+			timer.Reset(retryInterval)
+
+		// handle incoming chunks
+		case ch, more := <-chunks:
+			chunksInBatch++
+			// if no more, set to nil and wait for timer
+			if !more {
+				chunks = nil
+				continue
+			}
+
+			metrics.GetOrRegisterCounter("pusher.send-chunk", nil).Inc(1)
+			// if there is no need to sync this chunk then continue
+			if !p.needToSync(ch) {
+				continue
+			}
+
+			metrics.GetOrRegisterCounter("pusher.send-chunk.send-to-sync", nil).Inc(1)
+			// send the chunk; a send error is logged but not fatal
+			if err := p.sendChunkMsg(ch); err != nil {
+				log.Error("error sending chunk", "addr", ch.Address(), "err", err)
+			}
+
+		// handle incoming receipts
+		case addr := <-p.receipts:
+			metrics.GetOrRegisterCounter("pusher.receipts.all", nil).Inc(1)
+			log.Debug("synced", "addr", addr)
+			// ignore if no receipt is expected
+			item, found := p.pushed[addr.Hex()]
+			if !found {
+				metrics.GetOrRegisterCounter("pusher.receipts.not-found", nil).Inc(1)
+				log.Debug("not wanted or already got... ignore", "addr", addr)
+				continue
+			}
+			if item.synced {
+				metrics.GetOrRegisterCounter("pusher.receipts.already-synced", nil).Inc(1)
+				log.Debug("just synced... ignore", "addr", addr)
+				continue
+			}
+			metrics.GetOrRegisterCounter("pusher.receipts.synced", nil).Inc(1)
+			// collect synced addresses
+			synced = append(synced, addr)
+			// set synced flag
+			item.synced = true
+			// increment synced count for the tag if it exists
+			if item.tag != nil {
+				item.tag.Inc(chunk.StateSynced)
+			}
+
+		case <-p.quit:
+			// if there was a subscription, cancel it
+			if cancel != nil {
+				cancel()
+			}
+			return
+		}
+	}
+}
+
+// handleReceiptMsg is a handler for pssReceiptTopic that
+// - deserialises receiptMsg and
+// - sends the receipted address on a channel
+func (p *Pusher) handleReceiptMsg(msg []byte) error {
+	receipt, err := decodeReceiptMsg(msg)
+	if err != nil {
+		return err
+	}
+	log.Debug("Handler", "receipt", label(receipt.Addr), "self", label(p.ps.BaseAddr()))
+	p.PushReceipt(receipt.Addr)
+	return nil
+}
+
+// PushReceipt just inserts the address into the receipts channel
+// it is also called by the push sync Storer if the originator and the storer are identical
+func (p *Pusher) PushReceipt(addr []byte) {
+	select {
+	case p.receipts <- addr:
+	case <-p.quit:
+	}
+}
+
+// sendChunkMsg sends chunks to their destination
+// using the PubSub interface Send method (e.g., pss neighbourhood addressing)
+func (p *Pusher) sendChunkMsg(ch chunk.Chunk) error {
+	cmsg := &chunkMsg{
+		Origin: p.ps.BaseAddr(),
+		Addr:   ch.Address()[:],
+		Data:   ch.Data(),
+		Nonce:  newNonce(),
+	}
+	msg, err := rlp.EncodeToBytes(cmsg)
+	if err != nil {
+		return err
+	}
+	log.Debug("send chunk", "addr", label(ch.Address()), "self", label(p.ps.BaseAddr()))
+	return p.ps.Send(ch.Address()[:], pssChunkTopic, msg)
+}
+
+// needToSync checks if a chunk needs to be push-synced:
+// * if not sent yet OR
+// * if sent but more than retryInterval ago, so it needs resending
+func (p *Pusher) needToSync(ch chunk.Chunk) bool {
+	item, found := p.pushed[ch.Address().Hex()]
+	// has been pushed already
+	if found {
+		// has synced already since subscribe was called
+		if item.synced {
+			return false
+		}
+		// too early to retry
+		if item.sentAt.Add(retryInterval).After(time.Now()) {
+			return false
+		}
+	} else {
+		// first time encountered: create and remember the item
+		tag, _ := p.tags.Get(ch.TagID())
+		item = &pushedItem{
+			tag: tag,
+		}
+		// increment SENT count on the tag if it exists
+		if item.tag != nil {
+			item.tag.Inc(chunk.StateSent)
+		}
+		p.pushed[ch.Address().Hex()] = item
+	}
+	item.sentAt = time.Now()
+	return true
+}
diff --git a/storage/pushsync/pusher_test.go b/storage/pushsync/pusher_test.go
new file mode 100644
index 0000000000..ba1d1ec31d
--- /dev/null
+++ b/storage/pushsync/pusher_test.go
@@ -0,0 +1,418 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+package pushsync
+
+import (
+	"context"
+	"encoding/binary"
+	"encoding/hex"
+	"flag"
+	"fmt"
+	"math/rand"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/p2p/enode"
+	"github.com/ethereum/go-ethereum/rlp"
+	"github.com/ethersphere/swarm/chunk"
+	"github.com/ethersphere/swarm/storage"
+	colorable "github.com/mattn/go-colorable"
+)
+
+var (
+	loglevel = flag.Int("loglevel", 3, "verbosity of logs")
+)
+
+func init() {
+	flag.Parse()
+	log.PrintOrigins(true)
+	log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
+}
+
+// loopBack implements PubSub as a central subscription engine,
+// i.e. a msg sent is received by all handlers registered for the topic
+type loopBack struct {
+	async    bool
+	addr     []byte
+	handlers map[string][]func(msg []byte, p *p2p.Peer) error
+}
+
+func newLoopBack(async bool) *loopBack {
+	return &loopBack{
+		async:    async,
+		addr:     make([]byte, 32),
+		handlers: make(map[string][]func(msg []byte, p *p2p.Peer) error),
+	}
+}
+
+// Register subscribes to a topic with a handler
+func (lb *loopBack) Register(topic string, _ bool, handler func(msg []byte, p *p2p.Peer) error) func() {
+	lb.handlers[topic] = append(lb.handlers[topic], handler)
+	return func() {}
+}
+
+// Send publishes a msg with a topic and directly calls registered handlers with
+// that topic
+func (lb *loopBack) Send(to []byte, topic string, msg []byte) error {
+	if lb.async {
+		go lb.send(to, topic, msg)
+		return nil
+	}
+	return lb.send(to, topic, msg)
+}
+
+func (lb *loopBack) send(to []byte, topic string, msg []byte) error {
+	p := p2p.NewPeer(enode.ID{}, "", nil)
+	for _, handler := range lb.handlers[topic] {
+		if err := handler(msg, p); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// BaseAddr is needed to implement the PubSub interface
+func (lb *loopBack) BaseAddr() []byte {
+	return lb.addr
+}
+
+// testPushSyncIndex mocks localstore and provides subscription and setting synced status
+// it implements the DB interface
+type testPushSyncIndex struct {
+	i, total int
+	tagIDs   []uint32
+	tags     *chunk.Tags
+	sent     *sync.Map // to store the time of send for retries
+	synced   chan int  // to check that the right amount of chunks is synced
+}
+
+func newTestPushSyncIndex(chunkCnt int, tagIDs []uint32, tags *chunk.Tags, sent *sync.Map) *testPushSyncIndex {
+	return &testPushSyncIndex{
+		i:      0,
+		total:  chunkCnt,
+		tagIDs: tagIDs,
+		tags:   tags,
+		sent:   sent,
+		synced: make(chan int),
+	}
+}
+
+// SubscribePush allows iteration on the hashes and mocks the behaviour of the localstore
+// push index
+// we keep track of an index so that each call to SubscribePush knows where to start
+// generating the new fake hashes
+// Before the new fake hashes it iterates over hashes not synced yet
+func (t *testPushSyncIndex) SubscribePush(context.Context) (<-chan storage.Chunk, func()) {
+	chunks := make(chan storage.Chunk)
+	tagCnt := len(t.tagIDs)
+	quit := make(chan struct{})
+	stop := func() { close(quit) }
+	go func() {
+		// feed fake chunks into the db, hashes encode the order so that
+		// it can be traced
+		feed := func(i int) bool {
+			// generate fake hashes that encode the chunk order
+			addr := make([]byte, 32)
+			binary.BigEndian.PutUint64(addr, uint64(i))
+			// remember when the chunk was put
+			// if sent again, don't modify the time
+			t.sent.Store(i, time.Now())
+			// increment stored count on the tag
+			tagID := t.tagIDs[i%tagCnt]
+			if tag, _ := t.tags.Get(tagID); tag != nil {
+				tag.Inc(chunk.StateStored)
+			}
+			select {
+			// chunks have no data and belong to tag i%tagCnt
+			case chunks <- storage.NewChunk(addr, nil).WithTagID(tagID):
+				return true
+			case <-quit:
+				return false
+			}
+		}
+		// push the chunks already pushed but not yet synced
+		t.sent.Range(func(k, _ interface{}) bool {
+			log.Debug("resending", "cur", k)
+			return feed(k.(int))
+		})
+		// generate the new chunks from t.i
+		for t.i < t.total && feed(t.i) {
+			t.i++
+		}
+
+		log.Debug("sent all chunks", "total", t.total)
+		close(chunks)
+	}()
+	return chunks, stop
+}
+
+func (t *testPushSyncIndex) Set(ctx context.Context, _ chunk.ModeSet, addr storage.Address) error {
+	cur := int(binary.BigEndian.Uint64(addr[:8]))
+	t.sent.Delete(cur)
+	t.synced <- cur
+	log.Debug("set chunk synced", "cur", cur, "addr", addr)
+	return nil
+}
+
+var (
+	maxDelay       = 210 // max delay in milliseconds
+	minDelay       = 1   // min delay in milliseconds
+	retentionLimit = 200 // ~5% of msgs lost
+)
+
+// delayResponse, when called, mocks connection latency/throughput
+func delayResponse() bool {
+	delay := rand.Intn(maxDelay) + minDelay
+	time.Sleep(time.Duration(delay) * time.Millisecond)
+	return delay < retentionLimit
+}
+
+// TestPusher tests the correct behaviour of Pusher
+// in the context of inserting n chunks
+// receipt response model: the pushed chunk's receipt is sent back
+// after a random delay
+// The test checks:
+// - if the sync function is called on chunks in order of insertion (FIFO)
+// - repeated sending is attempted only if retryInterval time has passed
+// - already synced chunks are not resynced
+// - if no more data is inserted, the db is emptied shortly
+func TestPusher(t *testing.T) {
+	timeout := 10 * time.Second
+	chunkCnt := 200
+	tagCnt := 4
+
+	errc := make(chan error)
+	sent := &sync.Map{}
+	sendTimes := make(map[int]time.Time)
+	synced := make(map[int]int)
+	quit := make(chan struct{})
+	defer close(quit)
+
+	errf := func(s string, vals ...interface{}) {
+		select {
+		case errc <- fmt.Errorf(s, vals...):
+		case <-quit:
+		}
+	}
+
+	ps := newLoopBack(false)
+
+	max := 0 // the highest index sent so far
+	respond := func(msg []byte, _ *p2p.Peer) error {
+		chmsg, err := decodeChunkMsg(msg)
+		if err != nil {
+			errf("error decoding chunk message: %v", err)
+			return nil
+		}
+		// check outgoing chunk messages
+		cur := int(binary.BigEndian.Uint64(chmsg.Addr[:8]))
+		if cur > max {
+			errf("incorrect order of chunks from db chunk #%d before #%d", cur, max)
+			return nil
+		}
+		v, found := sent.Load(cur)
+		previouslySentAt, repeated := sendTimes[cur]
+		if !found {
+			if !repeated {
+				errf("chunk #%d not sent but received", cur)
+			}
+			return nil
+		}
+		sentAt := v.(time.Time)
+		if repeated {
+			// expect at least retryInterval since the previous push
+			if expectedAt := previouslySentAt.Add(retryInterval); expectedAt.After(sentAt) {
+				errf("resync chunk #%d too early. previously sent at %v, next at %v < expected at %v", cur, previouslySentAt, sentAt, expectedAt)
+				return nil
+			}
+		}
+		// remember the latest time sent
+		sendTimes[cur] = sentAt
+		max++
+		// respond ~ mock storer protocol
+		go func() {
+			receipt := &receiptMsg{Addr: chmsg.Addr}
+			rmsg, err := rlp.EncodeToBytes(receipt)
+			if err != nil {
+				errf("error encoding receipt message: %v", err)
+			}
+			log.Debug("chunk sent", "addr", hex.EncodeToString(receipt.Addr))
+			// random delay to allow retries
+			if !delayResponse() {
+				log.Debug("chunk/receipt lost", "addr", hex.EncodeToString(receipt.Addr))
+				return
+			}
+			log.Debug("store chunk, send receipt", "addr", hex.EncodeToString(receipt.Addr))
+			err = ps.Send(chmsg.Origin, pssReceiptTopic, rmsg)
+			if err != nil {
+				errf("error sending receipt message: %v", err)
+			}
+		}()
+		return nil
+	}
+	// register the respond function
+	ps.Register(pssChunkTopic, false, respond)
+	tags, tagIDs := setupTags(chunkCnt, tagCnt)
+	// construct the mock push sync index iterator
+	tp := newTestPushSyncIndex(chunkCnt, tagIDs, tags, sent)
+	// start push syncing in a go routine
+	p := NewPusher(tp, ps, tags)
+	defer p.Close()
+	// collect synced chunks until all chunks are synced
+	// wait on errc for errors on any thread
+	// otherwise time out
+	for {
+		select {
+		case i := <-tp.synced:
+			sent.Delete(i)
+			n := synced[i]
+			synced[i] = n + 1
+			if len(synced) == chunkCnt {
+				expTotal := chunkCnt / tagCnt
+				checkTags(t, expTotal, tagIDs[:tagCnt-1], tags)
+				return
+			}
+		case err := <-errc:
+			if err != nil {
+				t.Fatal(err)
+			}
+		case <-time.After(timeout):
+			t.Fatalf("timeout waiting for all chunks to be synced")
+		}
+	}
+
+}
+
+// setupTags constructs a tags object and creates tagCnt - 1 tags on it;
+// the sequential fake chunk i will be tagged with tag i%tagCnt
+func setupTags(chunkCnt, tagCnt int) (tags *chunk.Tags, tagIDs []uint32) {
+	// construct tags object
+	tags = chunk.NewTags()
+	// all but one tag is created
+	for i := 0; i < tagCnt-1; i++ {
+		tags.New("", int64(chunkCnt/tagCnt))
+	}
+	// extract tag ids
+	tags.Range(func(k, _ interface{}) bool {
+		tagIDs = append(tagIDs, k.(uint32))
+		return true
+	})
+	// add an extra id for which no tag exists
+	return tags, append(tagIDs, 0)
+}
+
+func checkTags(t *testing.T, expTotal int, tagIDs []uint32, tags *chunk.Tags) {
+	for _, tagID := range tagIDs {
+		tag, err := tags.Get(tagID)
+		if err != nil {
+			t.Fatalf("expected no error getting tag '%v', got %v", tagID, err)
+		}
+		n, total, err := tag.Status(chunk.StateSent)
+		if err != nil {
+			t.Fatalf("getting status for tag '%v', expected no error, got %v", tagID, err)
+		}
+		if int(n) != expTotal {
+			t.Fatalf("expected Sent count on tag '%v' to be %v, got %v", tagID, expTotal, n)
+		}
+		if int(total) != expTotal {
+			t.Fatalf("expected Sent total on tag '%v' to be %v, got %v", tagID, expTotal, total)
+		}
+		n, total, err = tag.Status(chunk.StateSynced)
+		if err != nil {
+			t.Fatalf("getting status for tag '%v', expected no error, got %v", tagID, err)
+		}
+		if int(n) != expTotal {
+			t.Fatalf("expected Synced count on tag '%v' to be %v, got %v", tagID, expTotal, n)
+		}
+		if int(total) != expTotal {
+			t.Fatalf("expected Synced total on tag '%v' to be %v, got %v", tagID, expTotal, total)
+		}
+	}
+}
+
+type testStore struct {
+	store *sync.Map
+}
+
+func (t *testStore) Put(_ context.Context, _ chunk.ModePut, ch chunk.Chunk) (bool, error) {
+	cur := binary.BigEndian.Uint64(ch.Address()[:8])
+	var storedCnt uint32 = 1
+	v, loaded := t.store.LoadOrStore(cur, &storedCnt)
+	if loaded {
+		atomic.AddUint32(v.(*uint32), 1)
+	}
+	return false, nil
+}
+
+// TestPushSyncAndStoreWithLoopbackPubSub tests the push sync protocol:
+// the push syncer node communicates with storers via a mock PubSub
+func TestPushSyncAndStoreWithLoopbackPubSub(t *testing.T) {
+	timeout := 10 * time.Second
+	chunkCnt := 2000
+	tagCnt := 4
+	storerCnt := 3
+	sent := &sync.Map{}
+	store := &sync.Map{}
+	// mock pubsub messenger
+	ps := newLoopBack(true)
+
+	tags, tagIDs := setupTags(chunkCnt, tagCnt)
+	// construct the mock push sync index iterator
+	tp := newTestPushSyncIndex(chunkCnt, tagIDs, tags, sent)
+	// start push syncing in a go routine
+	p := NewPusher(tp, ps, tags)
+	defer p.Close()
+
+	// set up a number of storers
+	storers := make([]*Storer, storerCnt)
+	for i := 0; i < storerCnt; i++ {
+		storers[i] = NewStorer(&testStore{store}, ps, p.PushReceipt)
+	}
+
+	synced := 0
+	for {
+		select {
+		case i := <-tp.synced:
+			synced++
+			sent.Delete(i)
+			if synced == chunkCnt {
+				expTotal := chunkCnt / tagCnt
+				checkTags(t, expTotal, tagIDs[:tagCnt-1], tags)
+				for i := uint64(0); i < uint64(chunkCnt); i++ {
+					v, ok := store.Load(i)
+					if !ok {
+						t.Fatalf("chunk %v not stored", i)
+					}
+					if cnt := *(v.(*uint32)); cnt != uint32(storerCnt) {
+						t.Fatalf("chunk %v expected to be saved %v times, got %v", i, storerCnt, cnt)
+					}
+				}
+				return
+			}
+		case <-time.After(timeout):
+			t.Fatalf("timeout waiting for all chunks to be synced")
+		}
+	}
+
+}
diff --git a/storage/pushsync/pushsync_simulation_test.go b/storage/pushsync/pushsync_simulation_test.go
new file mode 100644
index 0000000000..eda7a870d8
--- /dev/null
+++ b/storage/pushsync/pushsync_simulation_test.go
@@ -0,0 +1,279 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
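+
+// This file exercises the full push-sync roundtrip (Pusher, Storer, pss PubSub)
+// on a simulated network whose connectivity is loaded from a snapshot.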
+
+package pushsync
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"math/rand"
+	"os"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common/hexutil"
+	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/node"
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/p2p/enode"
+	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+	"github.com/ethersphere/swarm/chunk"
+	"github.com/ethersphere/swarm/log"
+	"github.com/ethersphere/swarm/network"
+	"github.com/ethersphere/swarm/network/simulation"
+	"github.com/ethersphere/swarm/network/stream"
+	"github.com/ethersphere/swarm/pss"
+	"github.com/ethersphere/swarm/state"
+	"github.com/ethersphere/swarm/storage"
+	"github.com/ethersphere/swarm/storage/localstore"
+)
+
+var (
+	bucketKeyPushSyncer = simulation.BucketKey("pushsyncer")
+	bucketKeyNetStore   = simulation.BucketKey("netstore")
+)
+
+// test syncer using pss
+// the test
+// * creates a simulation with connectivity loaded from a snapshot
+// * for each trial, two nodes are chosen randomly, an uploader and a downloader
+// * the uploader uploads a number of chunks
+// * waits until the uploaded chunks are synced
+// * the downloader downloads the chunks
+// Trials are run concurrently
+func TestPushSyncSimulation(t *testing.T) {
+	nodeCnt := 64
+	chunkCnt := 32
+	trials := 32
+	testSyncerWithPubSub(t, nodeCnt, chunkCnt, trials, newServiceFunc)
+}
+
+func testSyncerWithPubSub(t *testing.T, nodeCnt, chunkCnt, trials int, sf simulation.ServiceFunc) {
+	sim := simulation.New(map[string]simulation.ServiceFunc{
+		"streamer": sf,
+	})
+	defer sim.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+	defer cancel()
+	err := sim.UploadSnapshot(ctx, fmt.Sprintf("../../network/stream/testing/snapshot_%d.json", nodeCnt))
+	if err != nil {
+		t.Fatal(err)
+	}
+	log.Info("Snapshot loaded")
+	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+		errc := make(chan error)
+		for i := 0; i < trials; i++ {
+			go uploadAndDownload(ctx, sim, errc, nodeCnt, chunkCnt, i)
+		}
+		i := 0
+		for err := range errc {
+			if err != nil {
+				return err
+			}
+			i++
+			if i >= trials {
+				break
+			}
+		}
+		return nil
+	})
+	if result.Error != nil {
+		t.Fatalf("simulation error: %v", result.Error)
+	}
+}
+
+// pickNodes selects 2 distinct random indexes out of n
+func pickNodes(n int) (i, j int) {
+	i = rand.Intn(n)
+	j = rand.Intn(n - 1)
+	if j >= i {
+		j++
+	}
+	return
+}
+
+func uploadAndDownload(ctx context.Context, sim *simulation.Simulation, errc chan error, nodeCnt, chunkCnt, i int) {
+	// choose 2 random nodes as uploader and downloader
+	u, d := pickNodes(nodeCnt)
+	// setup uploader node
+	uid := sim.UpNodeIDs()[u]
+	val, _ := sim.NodeItem(uid, bucketKeyPushSyncer)
+	p := val.(*Pusher)
+	// setup downloader node
+	did := sim.UpNodeIDs()[d]
+	// the created tag indicates the uploader and downloader nodes
+	tagname := fmt.Sprintf("tag-%v-%v-%d", label(uid[:]), label(did[:]), i)
+	log.Debug("uploading", "peer", uid, "chunks", chunkCnt, "tagname", tagname)
+	tag, what, err := upload(ctx, p.store.(*localstore.DB), p.tags, tagname, chunkCnt)
+	if err != nil {
+		select {
+		case errc <- err:
+		case <-ctx.Done():
+			return
+		}
+		return
+	}
+
+	// wait till synced
+	for {
+		n, total, err := tag.Status(chunk.StateSynced)
+		if err == nil && n == total {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	log.Debug("synced", "peer", uid, "chunks", chunkCnt, "tagname", tagname)
+	log.Debug("downloading", "peer", did, "chunks", chunkCnt, "tagname", tagname)
+	val, _ = sim.NodeItem(did, bucketKeyNetStore)
+	netstore := val.(*storage.NetStore)
+	select {
+	case errc <- download(ctx, netstore, what):
+	case <-ctx.Done():
+	}
+	log.Debug("downloaded", "peer", did, "chunks", chunkCnt, "tagname", tagname)
+}
+
+// newServiceFunc constructs a minimal service needed for a simulation test for Push Sync, namely:
+// localstore, netstore, stream and pss
+func newServiceFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
+	// setup localstore
+	n := ctx.Config.Node()
+	addr := network.NewAddr(n)
+	dir, err := ioutil.TempDir("", "localstore-test")
+	if err != nil {
+		return nil, nil, err
+	}
+	lstore, err := localstore.New(dir, addr.Over(), nil)
+	if err != nil {
+		os.RemoveAll(dir)
+		return nil, nil, err
+	}
+
+	// setup pss
+	kadParams := network.NewKadParams()
+	kad := network.NewKademlia(addr.Over(), kadParams)
+	bucket.Store(simulation.BucketKeyKademlia, kad)
+
+	privKey, err := crypto.GenerateKey()
+	if err != nil {
+		return nil, nil, err
+	}
+	pssp := pss.NewPssParams().WithPrivateKey(privKey)
+	ps, err := pss.NewPss(kad, pssp)
+	if err != nil {
+		return nil, nil, err
+	}
+	// setup netstore
+	netStore := storage.NewNetStore(lstore, enode.HexID(hexutil.Encode(kad.BaseAddr())))
+	// streamer
+	delivery := stream.NewDelivery(kad, netStore)
+	netStore.RemoteGet = delivery.RequestFromPeers
+
+	bucket.Store(bucketKeyNetStore, netStore)
+
+	noSyncing := &stream.RegistryOptions{Syncing: stream.SyncingDisabled, SyncUpdateDelay: 50 * time.Millisecond}
+	r := stream.NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), noSyncing, nil)
+
+	pubSub := pss.NewPubSub(ps)
+
+	// set up syncer
+	p := NewPusher(lstore, pubSub, chunk.NewTags())
+	bucket.Store(bucketKeyPushSyncer, p)
+
+	// setup storer
+	s := NewStorer(netStore, pubSub, p.PushReceipt)
+
+	cleanup := func() {
+		p.Close()
+		s.Close()
+		netStore.Close()
+		r.Close()
+		os.RemoveAll(dir)
+	}
+
+	return &StreamerAndPss{r, ps}, cleanup, nil
+}
+
+// StreamerAndPss implements the node.Service interface
+type StreamerAndPss struct {
+	*stream.Registry
+	pss *pss.Pss
+}
+
+func (s *StreamerAndPss) Protocols() []p2p.Protocol {
+	return append(s.Registry.Protocols(), s.pss.Protocols()...)
+}
+
+func (s *StreamerAndPss) Start(srv *p2p.Server) error {
+	err := s.Registry.Start(srv)
+	if err != nil {
+		return err
+	}
+	return s.pss.Start(srv)
+}
+
+func (s *StreamerAndPss) Stop() error {
+	err := s.Registry.Stop()
+	if err != nil {
+		return err
+	}
+	return s.pss.Stop()
+}
+
+func upload(ctx context.Context, store Store, tags *chunk.Tags, tagname string, n int) (tag *chunk.Tag, addrs []storage.Address, err error) {
+	tag, err = tags.New(tagname, int64(n))
+	if err != nil {
+		return nil, nil, err
+	}
+	for i := 0; i < n; i++ {
+		ch := storage.GenerateRandomChunk(int64(chunk.DefaultSize))
+		addrs = append(addrs, ch.Address())
+		store.Put(ctx, chunk.ModePutUpload, ch.WithTagID(tag.Uid))
+		tag.Inc(chunk.StateStored)
+	}
+	return tag, addrs, nil
+}
+
+func download(ctx context.Context, store *storage.NetStore, addrs []storage.Address) error {
+	errc := make(chan error)
+	for _, addr := range addrs {
+		go func(addr storage.Address) {
+			_, err := store.Get(ctx, chunk.ModeGetRequest, storage.NewRequest(addr))
+			select {
+			case errc <- err:
+			case <-ctx.Done():
+			}
+		}(addr)
+	}
+	i := 0
+	for err := range errc {
+		if err != nil {
+			return err
+		}
+		i++
+		if i == len(addrs) {
+			break
+		}
+	}
+	return nil
+}
diff --git a/storage/pushsync/storer.go b/storage/pushsync/storer.go
new file mode 100644
index 0000000000..c0a966ac8b
--- /dev/null
+++ b/storage/pushsync/storer.go
@@ -0,0 +1,120 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package pushsync
+
+import (
+	"bytes"
+	"context"
+	"encoding/hex"
+	"fmt"
+
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/rlp"
+	"github.com/ethersphere/swarm/chunk"
+	"github.com/ethersphere/swarm/log"
+	"github.com/ethersphere/swarm/storage"
+)
+
+// Store is the storage interface to save chunks
+// NetStore implements this interface
+type Store interface {
+	Put(context.Context, chunk.ModePut, chunk.Chunk) (bool, error)
+}
+
+// Storer is the object used by the push-sync server side protocol
+type Storer struct {
+	store       Store             // store to put chunks in, and retrieve them from
+	ps          PubSub            // pubsub interface to receive chunks and send receipts
+	deregister  func()            // deregister the registered handler when Storer is closed
+	pushReceipt func(addr []byte) // to be called...
+}
+
+// NewStorer constructs a Storer
+// Storers run on storer nodes to handle the reception of push-synced chunks
+// that fall within their area of responsibility.
+// The protocol makes sure that
+// - the chunks are stored and synced to their nearest neighbours and
+// - a statement of custody receipt is sent as a response to the originator
+// it sets a cancel function that deregisters the handler
+func NewStorer(store Store, ps PubSub, pushReceipt func(addr []byte)) *Storer {
+	s := &Storer{
+		store:       store,
+		ps:          ps,
+		pushReceipt: pushReceipt,
+	}
+	s.deregister = ps.Register(pssChunkTopic, true, func(msg []byte, _ *p2p.Peer) error {
+		return s.handleChunkMsg(msg)
+	})
+	return s
+}
+
+// Close needs to be called to deregister the handler
+func (s *Storer) Close() {
+	s.deregister()
+}
+
+// handleChunkMsg is called by the pss dispatcher on pssChunkTopic msgs
+// - deserialises chunkMsg and
+// - calls storer.processChunkMsg function
+func (s *Storer) handleChunkMsg(msg []byte) error {
+	chmsg, err := decodeChunkMsg(msg)
+	if err != nil {
+		return err
+	}
+	log.Debug("Handler", "chunk", label(chmsg.Addr), "origin", label(chmsg.Origin), "self", fmt.Sprintf("%x", s.ps.BaseAddr()))
+	return s.processChunkMsg(chmsg)
+}
+
+// processChunkMsg processes a chunk received via pss pssChunkTopic
+// these chunk messages are sent to their address as destination
+// using neighbourhood addressing. Therefore nodes only handle
+// chunks that fall within their area of responsibility.
+// Upon reception the chunk is saved and a statement of custody
+// receipt message is sent as a response to the originator.
+func (s *Storer) processChunkMsg(chmsg *chunkMsg) error {
+	// TODO: double check if it falls in area of responsibility
+	ch := storage.NewChunk(chmsg.Addr, chmsg.Data)
+	if _, err := s.store.Put(context.TODO(), chunk.ModePutSync, ch); err != nil {
+		return err
+	}
+	log.Debug("push sync storer", "addr", label(chmsg.Addr), "to", label(chmsg.Origin), "self", hex.EncodeToString(s.ps.BaseAddr()))
+	// TODO: check if originator or relayer is a nearest neighbour then return
+	// otherwise send back receipt
+	return s.sendReceiptMsg(chmsg)
+}
+
+// sendReceiptMsg sends a statement of custody receipt message
+// to the originator of a push-synced chunk message.
+// Including a unique nonce makes the receipt immune to the deduplication cache
+func (s *Storer) sendReceiptMsg(chmsg *chunkMsg) error {
+	// if the origin is self, use the direct channel, no pubsub send needed
+	if bytes.Equal(chmsg.Origin, s.ps.BaseAddr()) {
+		go s.pushReceipt(chmsg.Addr)
+		return nil
+	}
+	rmsg := &receiptMsg{
+		Addr:  chmsg.Addr,
+		Nonce: newNonce(),
+	}
+	msg, err := rlp.EncodeToBytes(rmsg)
+	if err != nil {
+		return err
+	}
+	to := chmsg.Origin
+	log.Debug("send receipt", "addr", label(chmsg.Addr), "to", label(to), "self", hex.EncodeToString(s.ps.BaseAddr()))
+	return s.ps.Send(to, pssReceiptTopic, msg)
+}
diff --git a/swarm.go b/swarm.go
index d6f40beed3..6f86d683f7 100644
--- a/swarm.go
+++ b/swarm.go
@@ -30,11 +30,6 @@ import (
 	"time"
 	"unicode"
 
-	"github.com/ethersphere/swarm/chunk"
-
-	"github.com/ethersphere/swarm/storage/feed"
-	"github.com/ethersphere/swarm/storage/localstore"
-
 	"github.com/ethereum/go-ethereum/accounts/abi/bind"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/ethclient"
@@ -44,6 +39,7 @@ import (
 	"github.com/ethereum/go-ethereum/rpc"
 	"github.com/ethersphere/swarm/api"
 	httpapi "github.com/ethersphere/swarm/api/http"
+	"github.com/ethersphere/swarm/chunk"
 	"github.com/ethersphere/swarm/contracts/chequebook"
 	"github.com/ethersphere/swarm/contracts/ens"
 	"github.com/ethersphere/swarm/fuse"
@@ -54,7 +50,10 @@ import (
 	"github.com/ethersphere/swarm/pss"
 	"github.com/ethersphere/swarm/state"
 	"github.com/ethersphere/swarm/storage"
+	"github.com/ethersphere/swarm/storage/feed"
+	"github.com/ethersphere/swarm/storage/localstore"
 	"github.com/ethersphere/swarm/storage/mock"
+	"github.com/ethersphere/swarm/storage/pushsync"
 	"github.com/ethersphere/swarm/swap"
 	"github.com/ethersphere/swarm/tracing"
 )
@@ -79,6 +78,8 @@ type Swarm struct {
 	netStore           *storage.NetStore
 	sfs                *fuse.SwarmFS // need this to cleanup all the active mounts on node exit
 	ps                 *pss.Pss
+	pushSync           *pushsync.Pusher
+	storer             *pushsync.Storer
 	swap               *swap.Swap
 	stateStore         *state.DBStore
 	accountingMetrics  *protocols.AccountingMetrics
@@ -225,6 +226,10 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
 		pss.SetHandshakeController(self.ps, pss.NewHandshakeParams())
 	}
 
+	pubsub := pss.NewPubSub(self.ps)
+	self.pushSync = pushsync.NewPusher(localStore, pubsub, tags)
+	self.storer = pushsync.NewStorer(self.netStore, pubsub, self.pushSync.PushReceipt)
+
 	self.api = api.NewAPI(self.fileStore, self.dns, feedsHandler, self.privateKey, tags)
 
 	self.sfs = fuse.NewSwarmFS(self.api)
@@ -454,6 +459,9 @@ func (s *Swarm) Stop() error {
 		s.stateStore.Close()
 	}
 
+	s.pushSync.Close()
+	s.storer.Close()
+
 	for _, cleanF := range s.cleanupFuncs {
 		err = cleanF()
 		if err != nil {