diff --git a/api/http/server.go b/api/http/server.go
index e13fb11b42..8d8c78f9ea 100644
--- a/api/http/server.go
+++ b/api/http/server.go
@@ -42,8 +42,11 @@ import (
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/sctx"
+ "github.com/ethersphere/swarm/spancontext"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/storage/feed"
+ "github.com/opentracing/opentracing-go"
+ olog "github.com/opentracing/opentracing-go/log"
"github.com/rs/cors"
)
@@ -300,6 +303,15 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
log.Debug("handle.post.files", "ruid", ruid)
postFilesCount.Inc(1)
+ ctx, sp := spancontext.StartSpan(r.Context(), "handle.post.files")
+ defer sp.Finish()
+
+ quitChan := make(chan struct{})
+ defer close(quitChan)
+
+ // periodically monitor the tag for this upload and log its state to the `handle.post.files` span
+ go periodicTagTrace(s.api.Tags, sctx.GetTag(ctx), quitChan, sp)
+
contentType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err != nil {
postFilesFail.Inc(1)
@@ -315,7 +327,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
var addr storage.Address
if uri.Addr != "" && uri.Addr != "encrypt" {
- addr, err = s.api.Resolve(r.Context(), uri.Addr)
+ addr, err = s.api.Resolve(ctx, uri.Addr)
if err != nil {
postFilesFail.Inc(1)
respondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusInternalServerError)
@@ -323,7 +335,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
}
log.Debug("resolved key", "ruid", ruid, "key", addr)
} else {
- addr, err = s.api.NewManifest(r.Context(), toEncrypt)
+ addr, err = s.api.NewManifest(ctx, toEncrypt)
if err != nil {
postFilesFail.Inc(1)
respondError(w, r, err.Error(), http.StatusInternalServerError)
@@ -331,7 +343,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
}
log.Debug("new manifest", "ruid", ruid, "key", addr)
}
- newAddr, err := s.api.UpdateManifest(r.Context(), addr, func(mw *api.ManifestWriter) error {
+ newAddr, err := s.api.UpdateManifest(ctx, addr, func(mw *api.ManifestWriter) error {
switch contentType {
case "application/x-tar":
_, err := s.handleTarUpload(r, mw)
@@ -935,3 +947,28 @@ func (lrw *loggingResponseWriter) WriteHeader(code int) {
func isDecryptError(err error) bool {
return strings.Contains(err.Error(), api.ErrDecrypt.Error())
}
+
+// periodicTagTrace queries the tag every 2 seconds and logs its state to the span
+func periodicTagTrace(tags *chunk.Tags, tagUid uint32, q chan struct{}, sp opentracing.Span) {
+ f := func() {
+ tag, err := tags.Get(tagUid)
+ if err != nil {
+ log.Error("error while getting tag", "tagUid", tagUid, "err", err)
+ }
+
+ sp.LogFields(olog.String("tag state", fmt.Sprintf("split=%d stored=%d seen=%d synced=%d", tag.Get(chunk.StateSplit), tag.Get(chunk.StateStored), tag.Get(chunk.StateSeen), tag.Get(chunk.StateSynced))))
+ }
+
+ for {
+ select {
+ case <-q:
+ f()
+
+ return
+ default:
+ f()
+
+ time.Sleep(2 * time.Second)
+ }
+ }
+}
diff --git a/chunk/chunk.go b/chunk/chunk.go
index c44292bb92..c14572333a 100644
--- a/chunk/chunk.go
+++ b/chunk/chunk.go
@@ -38,11 +38,14 @@ var (
type Chunk interface {
Address() Address
Data() []byte
+ TagID() uint32
+ WithTagID(t uint32) Chunk
}
type chunk struct {
addr Address
sdata []byte
+ tagID uint32
}
func NewChunk(addr Address, data []byte) Chunk {
@@ -52,6 +55,11 @@ func NewChunk(addr Address, data []byte) Chunk {
}
}
+func (c *chunk) WithTagID(t uint32) Chunk {
+ c.tagID = t
+ return c
+}
+
func (c *chunk) Address() Address {
return c.addr
}
@@ -60,6 +68,10 @@ func (c *chunk) Data() []byte {
return c.sdata
}
+func (c *chunk) TagID() uint32 {
+ return c.tagID
+}
+
func (self *chunk) String() string {
return fmt.Sprintf("Address: %v Chunksize: %v", self.addr.Log(), len(self.sdata))
}
diff --git a/chunk/tags.go b/chunk/tags.go
index aa88b5eddd..fc660db4d7 100644
--- a/chunk/tags.go
+++ b/chunk/tags.go
@@ -29,14 +29,12 @@ import (
// Tags hold tag information indexed by a unique random uint32
type Tags struct {
tags *sync.Map
- rng *rand.Rand
}
// NewTags creates a tags object
func NewTags() *Tags {
return &Tags{
tags: &sync.Map{},
- rng: rand.New(rand.NewSource(time.Now().Unix())),
}
}
@@ -44,7 +42,7 @@ func NewTags() *Tags {
// it returns an error if the tag with this name already exists
func (ts *Tags) New(s string, total int64) (*Tag, error) {
t := &Tag{
- Uid: ts.rng.Uint32(),
+ Uid: uint32(rand.Int31()),
Name: s,
startedAt: time.Now(),
total: total,
diff --git a/network/stream/delivery.go b/network/stream/delivery.go
index 794f9c9f03..bd74738c8c 100644
--- a/network/stream/delivery.go
+++ b/network/stream/delivery.go
@@ -22,6 +22,7 @@ import (
"fmt"
"time"
+ "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethersphere/swarm/chunk"
@@ -243,26 +244,26 @@ func (d *Delivery) FindPeer(ctx context.Context, req *storage.Request) (*Peer, e
// skip peers that we have already tried
if req.SkipPeer(id.String()) {
- log.Trace("findpeer skip peer", "peer", id, "ref", req.Addr.String())
+ log.Trace("findpeer skip peer", "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
return true
}
if myPo < depth { // chunk is NOT within the neighbourhood
if po <= myPo { // always choose a peer strictly closer to chunk than us
- log.Trace("findpeer1a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+ log.Trace("findpeer1a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
return false
} else {
- log.Trace("findpeer1b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+ log.Trace("findpeer1b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
}
} else { // chunk IS WITHIN neighbourhood
if po < depth { // do not select peer outside the neighbourhood. But allows peers further from the chunk than us
- log.Trace("findpeer2a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+ log.Trace("findpeer2a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
return false
} else if po <= originPo { // avoid loop in neighbourhood, so not forward when a request comes from the neighbourhood
- log.Trace("findpeer2b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+ log.Trace("findpeer2b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
return false
} else {
- log.Trace("findpeer2c", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+ log.Trace("findpeer2c", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
}
}
diff --git a/network/stream/syncer.go b/network/stream/syncer.go
index 6fc1202ad3..723d300ae3 100644
--- a/network/stream/syncer.go
+++ b/network/stream/syncer.go
@@ -128,12 +128,6 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
// This is the most naive approach to label the chunk as synced
// allowing it to be garbage collected. A proper way requires
// validating that the chunk is successfully stored by the peer.
- err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address)
- if err != nil {
- metrics.GetOrRegisterCounter("syncer.set-next-batch.set-sync-err", nil).Inc(1)
- log.Debug("syncer pull subscription - err setting chunk as synced", "correlateId", s.correlateId, "err", err)
- return nil, 0, 0, err
- }
batchSize++
if batchStartID == nil {
// set batch start id only if
diff --git a/pss/pss.go b/pss/pss.go
index bf45444858..d0997ab1f2 100644
--- a/pss/pss.go
+++ b/pss/pss.go
@@ -686,7 +686,7 @@ func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool {
}
}
if !isPssEnabled {
- log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
+ log.Warn("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps, "peer", fmt.Sprintf("%x", sp.BzzAddr.Address()))
return false
}
diff --git a/pss/pubsub.go b/pss/pubsub.go
new file mode 100644
index 0000000000..03ebf9ee5f
--- /dev/null
+++ b/pss/pubsub.go
@@ -0,0 +1,55 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package pss
+
+import "github.com/ethereum/go-ethereum/p2p"
+
+// PubSub implements the pushsync.PubSub interface using pss
+type PubSub struct {
+ pss *Pss
+}
+
+// NewPubSub creates a new PubSub
+func NewPubSub(p *Pss) *PubSub {
+ return &PubSub{
+ pss: p,
+ }
+}
+
+// BaseAddr returns Kademlia base address
+func (p *PubSub) BaseAddr() []byte {
+ return p.pss.Kademlia.BaseAddr()
+}
+
+// Register registers a handler
+func (p *PubSub) Register(topic string, prox bool, handler func(msg []byte, p *p2p.Peer) error) func() {
+ f := func(msg []byte, peer *p2p.Peer, _ bool, _ string) error {
+ return handler(msg, peer)
+ }
+ h := NewHandler(f).WithRaw()
+ if prox {
+ h = h.WithProxBin()
+ }
+ pt := BytesToTopic([]byte(topic))
+ return p.pss.Register(&pt, h)
+}
+
+// Send sends a message using pss SendRaw
+func (p *PubSub) Send(to []byte, topic string, msg []byte) error {
+ pt := BytesToTopic([]byte(topic))
+ return p.pss.SendRaw(PssAddress(to), pt, msg)
+}
diff --git a/shed/index.go b/shed/index.go
index 38afbce4ca..e9a7d0308c 100644
--- a/shed/index.go
+++ b/shed/index.go
@@ -41,6 +41,7 @@ type Item struct {
AccessTimestamp int64
StoreTimestamp int64
BinID uint64
+ Tag uint32
}
// Merge is a helper method to construct a new
@@ -62,6 +63,9 @@ func (i Item) Merge(i2 Item) (new Item) {
if i.BinID == 0 {
i.BinID = i2.BinID
}
+ if i.Tag == 0 {
+ i.Tag = i2.Tag
+ }
return i
}
diff --git a/storage/hasherstore.go b/storage/hasherstore.go
index 0ac8cc408d..50f6278155 100644
--- a/storage/hasherstore.go
+++ b/storage/hasherstore.go
@@ -183,7 +183,7 @@ func (h *hasherStore) createHash(chunkData ChunkData) Address {
func (h *hasherStore) createChunk(chunkData ChunkData) Chunk {
hash := h.createHash(chunkData)
- chunk := NewChunk(hash, chunkData)
+ chunk := NewChunk(hash, chunkData).WithTagID(h.tag.Uid)
return chunk
}
diff --git a/storage/localstore/localstore.go b/storage/localstore/localstore.go
index 265cd045ed..09f6ec7e5c 100644
--- a/storage/localstore/localstore.go
+++ b/storage/localstore/localstore.go
@@ -1,4 +1,4 @@
-// Copyright 2018 The go-ethereum Authors
+// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
@@ -300,9 +300,12 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
return e, nil
},
EncodeValue: func(fields shed.Item) (value []byte, err error) {
- return nil, nil
+ tag := make([]byte, 4)
+ binary.BigEndian.PutUint32(tag, fields.Tag)
+ return tag, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+ e.Tag = binary.BigEndian.Uint32(value)
return e, nil
},
})
@@ -367,6 +370,7 @@ func chunkToItem(ch chunk.Chunk) shed.Item {
return shed.Item{
Address: ch.Address(),
Data: ch.Data(),
+ Tag: ch.TagID(),
}
}
diff --git a/storage/localstore/subscription_push.go b/storage/localstore/subscription_push.go
index c8dd5cf215..94da228d30 100644
--- a/storage/localstore/subscription_push.go
+++ b/storage/localstore/subscription_push.go
@@ -75,7 +75,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
}
select {
- case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data):
+ case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data).WithTagID(dataItem.Tag):
count++
// set next iteration start item
// when its chunk is successfully sent to channel
diff --git a/storage/netstore.go b/storage/netstore.go
index 8f92d7aefd..6b2978db04 100644
--- a/storage/netstore.go
+++ b/storage/netstore.go
@@ -23,14 +23,13 @@ import (
"sync"
"time"
+ "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network/timeouts"
"github.com/ethersphere/swarm/spancontext"
lru "github.com/hashicorp/golang-lru"
-
- "github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/p2p/enode"
olog "github.com/opentracing/opentracing-go/log"
"github.com/syndtr/goleveldb/leveldb"
"golang.org/x/sync/singleflight"
@@ -107,7 +106,7 @@ func (n *NetStore) Put(ctx context.Context, mode chunk.ModePut, ch Chunk) (bool,
n.putMu.Lock()
defer n.putMu.Unlock()
- log.Trace("netstore.put", "ref", ch.Address().String(), "mode", mode)
+ log.Trace("netstore.put", "ref", ch.Address().String(), "self", n.localID.String(), "mode", mode)
// put the chunk to the localstore, there should be no error
exists, err := n.Store.Put(ctx, mode, ch)
@@ -151,7 +150,7 @@ func (n *NetStore) Get(ctx context.Context, mode chunk.ModeGet, req *Request) (C
ref := req.Addr
- log.Trace("netstore.get", "ref", ref.String())
+ log.Trace("netstore.get", "ref", ref.String(), "self", n.localID.String())
ch, err := n.Store.Get(ctx, mode, ref)
if err != nil {
@@ -160,7 +159,7 @@ func (n *NetStore) Get(ctx context.Context, mode chunk.ModeGet, req *Request) (C
log.Error("localstore get error", "err", err)
}
- log.Trace("netstore.chunk-not-in-localstore", "ref", ref.String())
+ log.Trace("netstore.chunk-not-in-localstore", "ref", ref.String(), "self", n.localID.String())
v, err, _ := n.requestGroup.Do(ref.String(), func() (interface{}, error) {
// currently we issue a retrieve request if a fetcher
@@ -198,7 +197,7 @@ func (n *NetStore) Get(ctx context.Context, mode chunk.ModeGet, req *Request) (C
c := v.(Chunk)
- log.Trace("netstore.singleflight returned", "ref", ref.String(), "err", err)
+ log.Trace("netstore.singleflight returned", "ref", ref.String(), "self", n.localID.String(), "err", err)
return c, nil
}
@@ -211,7 +210,7 @@ func (n *NetStore) Get(ctx context.Context, mode chunk.ModeGet, req *Request) (C
return ch, nil
}
-// RemoteFetch is handling the retry mechanism when making a chunk request to our peers.
+// RemoteFetch is handling the retry mechanism when making a chunk request to ou r peers.
// For a given chunk Request, we call RemoteGet, which selects the next eligible peer and
// issues a RetrieveRequest and we wait for a delivery. If a delivery doesn't arrive within the SearchTimeout
// we retry.
@@ -237,13 +236,11 @@ func (n *NetStore) RemoteFetch(ctx context.Context, req *Request, fi *Fetcher) e
log.Trace(err.Error(), "ref", ref)
osp.LogFields(olog.String("err", err.Error()))
osp.Finish()
- return ErrNoSuitablePeer
+ } else {
+ // add peer to the set of peers to skip from now
+ log.Trace("remote.fetch, adding peer to skip", "ref", ref, "peer", currentPeer.String())
+ req.PeersToSkip.Store(currentPeer.String(), time.Now())
}
-
- // add peer to the set of peers to skip from now
- log.Trace("remote.fetch, adding peer to skip", "ref", ref, "peer", currentPeer.String())
- req.PeersToSkip.Store(currentPeer.String(), time.Now())
-
select {
case <-fi.Delivered:
log.Trace("remote.fetch, chunk delivered", "ref", ref)
@@ -256,7 +253,6 @@ func (n *NetStore) RemoteFetch(ctx context.Context, req *Request, fi *Fetcher) e
osp.LogFields(olog.Bool("timeout", true))
osp.Finish()
- break
case <-ctx.Done(): // global fetcher timeout
log.Trace("remote.fetch, fail", "ref", ref)
metrics.GetOrRegisterCounter("remote.fetch.timeout.global", nil).Inc(1)
@@ -290,7 +286,7 @@ func (n *NetStore) GetOrCreateFetcher(ctx context.Context, ref Address, interest
f = NewFetcher()
v, loaded := n.fetchers.Get(ref.String())
- log.Trace("netstore.has-with-callback.loadorstore", "ref", ref.String(), "loaded", loaded)
+ log.Trace("netstore.has-with-callback.loadorstore", "ref", ref.String(), "loaded", loaded, "self", n.localID.String())
if loaded {
f = v.(*Fetcher)
} else {
diff --git a/storage/pushsync/protocol.go b/storage/pushsync/protocol.go
new file mode 100644
index 0000000000..3285f84833
--- /dev/null
+++ b/storage/pushsync/protocol.go
@@ -0,0 +1,87 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package pushsync
+
+import (
+ "crypto/rand"
+
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+const (
+ pssChunkTopic = "PUSHSYNC_CHUNKS" // pss topic for chunks
+ pssReceiptTopic = "PUSHSYNC_RECEIPTS" // pss topic for statement of custody receipts
+)
+
+// PubSub is a Postal Service interface needed to send/receive chunks and receipts for push syncing
+type PubSub interface {
+ Register(topic string, prox bool, handler func(msg []byte, p *p2p.Peer) error) func()
+ Send(to []byte, topic string, msg []byte) error
+ BaseAddr() []byte
+}
+
+// chunkMsg is the message construct to send chunks to their local neighbourhood
+type chunkMsg struct {
+ Addr []byte // chunk address
+ Data []byte // chunk data
+ Origin []byte // originator - need this for sending receipt back to origin
+ Nonce []byte // nonce to make multiple instances of send immune to deduplication cache
+}
+
+// receiptMsg is a statement of custody response to receiving a push-synced chunk
+// it is currently a notification only (contains no proof) sent to the originator
+// Nonce is there to make multiple responses immune to deduplication cache
+type receiptMsg struct {
+ Addr []byte
+ Nonce []byte
+}
+
+func decodeChunkMsg(msg []byte) (*chunkMsg, error) {
+ var chmsg chunkMsg
+ err := rlp.DecodeBytes(msg, &chmsg)
+ if err != nil {
+ return nil, err
+ }
+ return &chmsg, nil
+}
+
+func decodeReceiptMsg(msg []byte) (*receiptMsg, error) {
+ var rmsg receiptMsg
+ err := rlp.DecodeBytes(msg, &rmsg)
+ if err != nil {
+ return nil, err
+ }
+ return &rmsg, nil
+}
+
+// newNonce creates a random nonce;
+// even without POC it is important otherwise resending a chunk is deduplicated by pss
+func newNonce() []byte {
+ buf := make([]byte, 32)
+ t := 0
+ for t < len(buf) {
+ n, _ := rand.Read(buf[t:])
+ t += n
+ }
+ return buf
+}
+
+func label(b []byte) string {
+ return hexutil.Encode(b[:8])
+}
diff --git a/storage/pushsync/pusher.go b/storage/pushsync/pusher.go
new file mode 100644
index 0000000000..1f1dbda53f
--- /dev/null
+++ b/storage/pushsync/pusher.go
@@ -0,0 +1,277 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package pushsync
+
+import (
+ "context"
+ "time"
+
+ "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethersphere/swarm/chunk"
+ "github.com/ethersphere/swarm/log"
+ "github.com/ethersphere/swarm/storage"
+)
+
+// DB interface implemented by localstore
+type DB interface {
+ // subscribe to chunk to be push synced - iterates from earliest to newest
+ SubscribePush(context.Context) (<-chan storage.Chunk, func())
+ // called to set a chunk as synced - and allow it to be garbage collected
+ // TODO this should take ... last argument to delete many in one batch
+ Set(context.Context, chunk.ModeSet, storage.Address) error
+}
+
+// Pusher takes care of the push syncing
+type Pusher struct {
+ store DB // localstore DB
+ tags *chunk.Tags // tags to update counts
+ quit chan struct{} // channel to signal quitting on all loops
+ pushed map[string]*pushedItem // cache of items push-synced
+ receipts chan chunk.Address // channel to receive receipts
+ ps PubSub // PubSub interface to send chunks and receive receipts
+}
+
+var (
+ retryInterval = 2 * time.Second // seconds to wait before retry sync
+)
+
+// pushedItem captures the info needed for the pusher about a chunk during the
+// push-sync--receipt roundtrip
+type pushedItem struct {
+ tag *chunk.Tag // tag for the chunk
+ sentAt time.Time // most recently sent at time
+ synced bool // set when chunk got synced
+}
+
+// NewPusher contructs a Pusher and starts up the push sync protocol
+// takes
+// - a DB interface to subscribe to push sync index to allow iterating over recently stored chunks
+// - a pubsub interface to send chunks and receive statements of custody
+// - tags that hold the several tag
+func NewPusher(store DB, ps PubSub, tags *chunk.Tags) *Pusher {
+ p := &Pusher{
+ store: store,
+ tags: tags,
+ quit: make(chan struct{}),
+ pushed: make(map[string]*pushedItem),
+ receipts: make(chan chunk.Address),
+ ps: ps,
+ }
+ go p.sync()
+ return p
+}
+
+// Close closes the pusher
+func (p *Pusher) Close() {
+ close(p.quit)
+}
+
+// sync starts a forever loop that pushes chunks to their neighbourhood
+// and receives receipts (statements of custody) for them.
+// chunks that are not acknowledged with a receipt are retried
+// not earlier than retryInterval after they were last pushed
+// the routine also updates counts of states on a tag in order
+// to monitor the proportion of saved, sent and synced chunks of
+// a file or collection
+func (p *Pusher) sync() {
+ var chunks <-chan chunk.Chunk
+ var cancel, stop func()
+ var ctx context.Context
+ var synced []storage.Address
+
+ // timer
+ timer := time.NewTimer(0)
+ defer timer.Stop()
+
+ // register handler for pssReceiptTopic on pss pubsub
+ deregister := p.ps.Register(pssReceiptTopic, false, func(msg []byte, _ *p2p.Peer) error {
+ return p.handleReceiptMsg(msg)
+ })
+ defer deregister()
+
+ chunksInBatch := -1
+ var batchStartTime time.Time
+
+ for {
+ select {
+
+ // retry interval timer triggers starting from new
+ case <-timer.C:
+ metrics.GetOrRegisterCounter("pusher.subscribe-push", nil).Inc(1)
+ // TODO: implement some smart retry strategy relying on sent/synced ratio change
+ // if subscribe was running, stop it
+ if stop != nil {
+ stop()
+ }
+ for _, addr := range synced {
+ // set chunk status to synced, insert to db GC index
+ if err := p.store.Set(context.Background(), chunk.ModeSetSync, addr); err != nil {
+ log.Warn("error setting chunk to synced", "addr", addr, "err", err)
+ continue
+ }
+ delete(p.pushed, addr.Hex())
+ }
+ // reset synced list
+ synced = nil
+
+ // we don't want to record the first iteration
+ if chunksInBatch != -1 {
+ // this measurement is not a timer, but we want a histogram, so it fits the data structure
+ metrics.GetOrRegisterResettingTimer("pusher.subscribe-push.chunks-in-batch.hist", nil).Update(time.Duration(chunksInBatch))
+
+ metrics.GetOrRegisterResettingTimer("pusher.subscribe-push.chunks-in-batch.time", nil).UpdateSince(batchStartTime)
+ metrics.GetOrRegisterCounter("pusher.subscribe-push.chunks-in-batch", nil).Inc(int64(chunksInBatch))
+ }
+ chunksInBatch = 0
+ batchStartTime = time.Now()
+
+ // and start iterating on Push index from the beginning
+ ctx, cancel = context.WithCancel(context.Background())
+ chunks, stop = p.store.SubscribePush(ctx)
+ // reset timer to go off after retryInterval
+ timer.Reset(retryInterval)
+
+ // handle incoming chunks
+ case ch, more := <-chunks:
+ chunksInBatch++
+ // if no more, set to nil and wait for timer
+ if !more {
+ chunks = nil
+ continue
+ }
+
+ metrics.GetOrRegisterCounter("pusher.send-chunk", nil).Inc(1)
+ // if no need to sync this chunk then continue
+ if !p.needToSync(ch) {
+ continue
+ }
+
+ metrics.GetOrRegisterCounter("pusher.send-chunk.send-to-sync", nil).Inc(1)
+ // send the chunk and ignore the error
+ if err := p.sendChunkMsg(ch); err != nil {
+ log.Error("error sending chunk", "addr", ch.Address(), "err", err)
+ }
+
+ // handle incoming receipts
+ case addr := <-p.receipts:
+ metrics.GetOrRegisterCounter("pusher.receipts.all", nil).Inc(1)
+ log.Debug("synced", "addr", addr)
+ // ignore if already received receipt
+ item, found := p.pushed[addr.Hex()]
+ if !found {
+ metrics.GetOrRegisterCounter("pusher.receipts.not-found", nil).Inc(1)
+ log.Debug("not wanted or already got... ignore", "addr", addr)
+ continue
+ }
+ if item.synced {
+ metrics.GetOrRegisterCounter("pusher.receipts.already-synced", nil).Inc(1)
+ log.Debug("just synced... ignore", "addr", addr)
+ continue
+ }
+ metrics.GetOrRegisterCounter("pusher.receipts.synced", nil).Inc(1)
+ // collect synced addresses
+ synced = append(synced, addr)
+ // set synced flag
+ item.synced = true
+ // increment synced count for the tag if exists
+ if item.tag != nil {
+ item.tag.Inc(chunk.StateSynced)
+ }
+
+ case <-p.quit:
+ // if there was a subscription, cancel it
+ if cancel != nil {
+ cancel()
+ }
+ return
+ }
+ }
+}
+
+// handleReceiptMsg is a handler for pssReceiptTopic that
+// - deserialises receiptMsg and
+// - sends the receipted address on a channel
+func (p *Pusher) handleReceiptMsg(msg []byte) error {
+ receipt, err := decodeReceiptMsg(msg)
+ if err != nil {
+ return err
+ }
+ log.Debug("Handler", "receipt", label(receipt.Addr), "self", label(p.ps.BaseAddr()))
+ p.PushReceipt(receipt.Addr)
+ return nil
+}
+
+// pushReceipt just inserts the address into the channel
+// it is also called by the push sync Storer if the originator and storer identical
+func (p *Pusher) PushReceipt(addr []byte) {
+ select {
+ case p.receipts <- addr:
+ case <-p.quit:
+ }
+}
+
+// sendChunkMsg sends chunks to their destination
+// using the PubSub interface Send method (e.g., pss neighbourhood addressing)
+func (p *Pusher) sendChunkMsg(ch chunk.Chunk) error {
+ cmsg := &chunkMsg{
+ Origin: p.ps.BaseAddr(),
+ Addr: ch.Address()[:],
+ Data: ch.Data(),
+ Nonce: newNonce(),
+ }
+ msg, err := rlp.EncodeToBytes(cmsg)
+ if err != nil {
+ return err
+ }
+ log.Debug("send chunk", "addr", label(ch.Address()), "self", label(p.ps.BaseAddr()))
+ return p.ps.Send(ch.Address()[:], pssChunkTopic, msg)
+}
+
+// needToSync checks if a chunk needs to be push-synced:
+// * if not sent yet OR
+// * if sent but more then retryInterval ago, so need resend
+func (p *Pusher) needToSync(ch chunk.Chunk) bool {
+ item, found := p.pushed[ch.Address().Hex()]
+ // has been pushed already
+ if found {
+ // has synced already since subscribe called
+ if item.synced {
+ return false
+ }
+ // too early to retry
+ if item.sentAt.Add(retryInterval).After(time.Now()) {
+ return false
+ }
+ // first time encountered
+ } else {
+ // remember item
+ tag, _ := p.tags.Get(ch.TagID())
+ item = &pushedItem{
+ tag: tag,
+ }
+ // increment SENT count on tag if it exists
+ if item.tag != nil {
+ item.tag.Inc(chunk.StateSent)
+ }
+ // remember the item
+ p.pushed[ch.Address().Hex()] = item
+ }
+ item.sentAt = time.Now()
+ return true
+}
diff --git a/storage/pushsync/pusher_test.go b/storage/pushsync/pusher_test.go
new file mode 100644
index 0000000000..ba1d1ec31d
--- /dev/null
+++ b/storage/pushsync/pusher_test.go
@@ -0,0 +1,414 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+package pushsync
+
+import (
+ "context"
+ "encoding/binary"
+ "encoding/hex"
+ "flag"
+ "fmt"
+ "math/rand"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethersphere/swarm/chunk"
+ "github.com/ethersphere/swarm/storage"
+ colorable "github.com/mattn/go-colorable"
+)
+
+var (
+ loglevel = flag.Int("loglevel", 3, "verbosity of logs")
+)
+
+func init() {
+ flag.Parse()
+ log.PrintOrigins(true)
+ log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
+}
+
+// loopback implements PubSub as a central subscription engine,
+// ie a msg sent is received by all handlers registered for the topic
+type loopBack struct {
+ async bool
+ addr []byte
+ handlers map[string][]func(msg []byte, p *p2p.Peer) error
+}
+
+func newLoopBack(async bool) *loopBack {
+ return &loopBack{
+ async: async,
+ addr: make([]byte, 32),
+ handlers: make(map[string][]func(msg []byte, p *p2p.Peer) error),
+ }
+}
+
+// Register subscribes to a topic with a handler
+func (lb *loopBack) Register(topic string, _ bool, handler func(msg []byte, p *p2p.Peer) error) func() {
+ lb.handlers[topic] = append(lb.handlers[topic], handler)
+ return func() {}
+}
+
+// Send publishes a msg with a topic and directly calls registered handlers with
+// that topic
+func (lb *loopBack) Send(to []byte, topic string, msg []byte) error {
+ if lb.async {
+ go lb.send(to, topic, msg)
+ return nil
+ }
+ return lb.send(to, topic, msg)
+}
+
+func (lb *loopBack) send(to []byte, topic string, msg []byte) error {
+ p := p2p.NewPeer(enode.ID{}, "", nil)
+ for _, handler := range lb.handlers[topic] {
+ if err := handler(msg, p); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// BaseAddr needed to implement PubSub interface
+func (lb *loopBack) BaseAddr() []byte {
+ return lb.addr
+}
+
+// testPushSyncIndex mocks localstore and provides subscription and setting synced status
+// it implements the DB interface
+type testPushSyncIndex struct {
+ i, total int
+ tagIDs []uint32 //
+ tags *chunk.Tags
+ sent *sync.Map // to store time of send for retry
+ synced chan int // to check if right amount of chunks
+}
+
+func newTestPushSyncIndex(chunkCnt int, tagIDs []uint32, tags *chunk.Tags, sent *sync.Map) *testPushSyncIndex {
+ return &testPushSyncIndex{
+ i: 0,
+ total: chunkCnt,
+ tagIDs: tagIDs,
+ tags: tags,
+ sent: sent,
+ synced: make(chan int),
+ }
+}
+
+// SubscribePush allows iteration on the hashes and mocks the behaviour of localstore
+// push index
+// we keep track of an index so that each call to SubscribePush knows where to start
+// generating the new fake hashes
+// Before the new fake hashes it iterates over hashes not synced yet
+func (t *testPushSyncIndex) SubscribePush(context.Context) (<-chan storage.Chunk, func()) {
+ chunks := make(chan storage.Chunk)
+ tagCnt := len(t.tagIDs)
+ quit := make(chan struct{})
+ stop := func() { close(quit) }
+ go func() {
+ // feed fake chunks into the db, hashes encode the order so that
+ // it can be traced
+ feed := func(i int) bool {
+ // generate fake hashes that encode the chunk order
+ addr := make([]byte, 32)
+ binary.BigEndian.PutUint64(addr, uint64(i))
+ // remember when the chunk was put
+ // if sent again, dont modify the time
+ t.sent.Store(i, time.Now())
+ // increment stored count on tag
+ tagID := t.tagIDs[i%tagCnt]
+ if tag, _ := t.tags.Get(tagID); tag != nil {
+ tag.Inc(chunk.StateStored)
+ }
+ select {
+ // chunks have no data and belong to tag i%tagCount
+ case chunks <- storage.NewChunk(addr, nil).WithTagID(tagID):
+ return true
+ case <-quit:
+ return false
+ }
+ }
+ // push the chunks already pushed but not yet synced
+ t.sent.Range(func(k, _ interface{}) bool {
+ log.Debug("resending", "cur", k)
+ return feed(k.(int))
+ })
+ // generate the new chunks from t.i
+ for t.i < t.total && feed(t.i) {
+ t.i++
+ }
+
+ log.Debug("sent all chunks", "total", t.total)
+ close(chunks)
+ }()
+ return chunks, stop
+}
+
+func (t *testPushSyncIndex) Set(ctx context.Context, _ chunk.ModeSet, addr storage.Address) error {
+ cur := int(binary.BigEndian.Uint64(addr[:8]))
+ t.sent.Delete(cur)
+ t.synced <- cur
+ log.Debug("set chunk synced", "cur", cur, "addr", addr)
+ return nil
+}
+
+var (
+ maxDelay = 210 // max delay in millisecond
+ minDelay = 1 // min delay in millisecond
+ retentionLimit = 200 // ~5% of msg lost
+)
+
+// delayResponse when called mock connection/throughput
+func delayResponse() bool {
+ delay := rand.Intn(maxDelay) + minDelay
+ time.Sleep(time.Duration(delay) * time.Millisecond)
+ return delay < retentionLimit
+}
+
+// TestPusher tests the correct behaviour of Pusher
+// in the context of inserting n chunks
+// receipt response model: the pushed chunk's receipt is sent back
+// after a random delay
+// The test checks:
+// - if sync function is called on chunks in order of insertion (FIFO)
+// - repeated sending is attempted only if retryInterval time passed
+// - already synced chunks are not resynced
+// - if no more data inserted, the db is emptied shortly
+func TestPusher(t *testing.T) {
+ timeout := 10 * time.Second
+ chunkCnt := 200
+ tagCnt := 4
+
+ errc := make(chan error)
+ sent := &sync.Map{}
+ sendTimes := make(map[int]time.Time)
+ synced := make(map[int]int)
+ quit := make(chan struct{})
+ defer close(quit)
+
+ errf := func(s string, vals ...interface{}) {
+ select {
+ case errc <- fmt.Errorf(s, vals...):
+ case <-quit:
+ }
+ }
+
+ ps := newLoopBack(false)
+
+ max := 0 // the highest index sent so far
+ respond := func(msg []byte, _ *p2p.Peer) error {
+ chmsg, err := decodeChunkMsg(msg)
+ if err != nil {
+ errf("error decoding chunk message: %v", err)
+ return nil
+ }
+ // check outgoing chunk messages
+ cur := int(binary.BigEndian.Uint64(chmsg.Addr[:8]))
+ if cur > max {
+ errf("incorrect order of chunks from db chunk #%d before #%d", cur, max)
+ return nil
+ }
+ v, found := sent.Load(cur)
+ previouslySentAt, repeated := sendTimes[cur]
+ if !found {
+ if !repeated {
+ errf("chunk #%d not sent but received", cur)
+ }
+ return nil
+ }
+ sentAt := v.(time.Time)
+ if repeated {
+ // expect at least retryInterval since previous push
+ if expectedAt := previouslySentAt.Add(retryInterval); expectedAt.After(sentAt) {
+ errf("resync chunk #%d too early. previously sent at %v, next at %v < expected at %v", cur, previouslySentAt, sentAt, expectedAt)
+ return nil
+ }
+ }
+ // remember the latest time sent
+ sendTimes[cur] = sentAt
+ max++
+ // respond ~ mock storer protocol
+ go func() {
+ receipt := &receiptMsg{Addr: chmsg.Addr}
+ rmsg, err := rlp.EncodeToBytes(receipt)
+ if err != nil {
+ errf("error encoding receipt message: %v", err)
+ }
+ log.Debug("chunk sent", "addr", hex.EncodeToString(receipt.Addr))
+ // random delay to allow retries
+ if !delayResponse() {
+ log.Debug("chunk/receipt lost", "addr", hex.EncodeToString(receipt.Addr))
+ return
+ }
+ log.Debug("store chunk, send receipt", "addr", hex.EncodeToString(receipt.Addr))
+ err = ps.Send(chmsg.Origin, pssReceiptTopic, rmsg)
+ if err != nil {
+ errf("error sending receipt message: %v", err)
+ }
+ }()
+ return nil
+ }
+ // register the respond function
+ ps.Register(pssChunkTopic, false, respond)
+ tags, tagIDs := setupTags(chunkCnt, tagCnt)
+ // construct the mock push sync index iterator
+ tp := newTestPushSyncIndex(chunkCnt, tagIDs, tags, sent)
+ // start push syncing in a go routine
+ p := NewPusher(tp, ps, tags)
+ defer p.Close()
+ // collect synced chunks until all chunks synced
+ // wait on errc for errors on any thread
+ // otherwise time out
+ for {
+ select {
+ case i := <-tp.synced:
+ sent.Delete(i)
+ n := synced[i]
+ synced[i] = n + 1
+ if len(synced) == chunkCnt {
+ expTotal := chunkCnt / tagCnt
+ checkTags(t, expTotal, tagIDs[:tagCnt-1], tags)
+ return
+ }
+ case err := <-errc:
+ if err != nil {
+ t.Fatal(err)
+ }
+ case <-time.After(timeout):
+ t.Fatalf("timeout waiting for all chunks to be synced")
+ }
+ }
+
+}
+
+// setupTags constructs tags object create tagCnt - 1 tags
+// the sequential fake chunk i will be tagged with i%tagCnt
+func setupTags(chunkCnt, tagCnt int) (tags *chunk.Tags, tagIDs []uint32) {
+ // construct tags object
+ tags = chunk.NewTags()
+ // all but one tag is created
+ for i := 0; i < tagCnt-1; i++ {
+ tags.New("", int64(chunkCnt/tagCnt))
+ }
+ // extract tag ids
+ tags.Range(func(k, _ interface{}) bool {
+ tagIDs = append(tagIDs, k.(uint32))
+ return true
+ })
+ // add an extra for which no tag exists
+ return tags, append(tagIDs, 0)
+}
+
+func checkTags(t *testing.T, expTotal int, tagIDs []uint32, tags *chunk.Tags) {
+ for _, tagID := range tagIDs {
+ tag, err := tags.Get(tagID)
+ if err != nil {
+ t.Fatalf("expected no error getting tag '%v', got %v", tagID, err)
+ }
+ n, total, err := tag.Status(chunk.StateSent)
+ if err != nil {
+ t.Fatalf("getting status for tag '%v', expected no error, got %v", tagID, err)
+ }
+ if int(n) != expTotal {
+ t.Fatalf("expected Sent count on tag '%v' to be %v, got %v", tagID, expTotal, n)
+ }
+ if int(total) != expTotal {
+ t.Fatalf("expected Sent count on tag '%v' to be %v, got %v", tagID, expTotal, n)
+ }
+ n, total, err = tag.Status(chunk.StateSynced)
+ if err != nil {
+ t.Fatalf("getting status for tag '%v', expected no error, got %v", tagID, err)
+ }
+ if int(n) != expTotal {
+ t.Fatalf("expected Sent count on tag '%v' to be %v, got %v", tagID, expTotal, n)
+ }
+ if int(total) != expTotal {
+ t.Fatalf("expected Sent count on tag '%v' to be %v, got %v", tagID, expTotal, n)
+ }
+ }
+}
+
+type testStore struct {
+ store *sync.Map
+}
+
+func (t *testStore) Put(_ context.Context, _ chunk.ModePut, ch chunk.Chunk) (bool, error) {
+ cur := binary.BigEndian.Uint64(ch.Address()[:8])
+ var storedCnt uint32 = 1
+ v, loaded := t.store.LoadOrStore(cur, &storedCnt)
+ if loaded {
+ atomic.AddUint32(v.(*uint32), 1)
+ }
+ return false, nil
+}
+
+// TestPushSyncAndStoreWithLoopbackPubSub tests the push sync protocol
+// push syncer node communicate with storers via mock PubSub
+func TestPushSyncAndStoreWithLoopbackPubSub(t *testing.T) {
+ timeout := 10 * time.Second
+ chunkCnt := 2000
+ tagCnt := 4
+ storerCnt := 3
+ sent := &sync.Map{}
+ store := &sync.Map{}
+ // mock pubsub messenger
+ ps := newLoopBack(true)
+
+ tags, tagIDs := setupTags(chunkCnt, tagCnt)
+ // construct the mock push sync index iterator
+ tp := newTestPushSyncIndex(chunkCnt, tagIDs, tags, sent)
+ // start push syncing in a go routine
+ p := NewPusher(tp, ps, tags)
+ defer p.Close()
+
+ // set up a number of storers
+ storers := make([]*Storer, storerCnt)
+ for i := 0; i < storerCnt; i++ {
+ storers[i] = NewStorer(&testStore{store}, ps, p.PushReceipt)
+ }
+
+ synced := 0
+ for {
+ select {
+ case i := <-tp.synced:
+ synced++
+ sent.Delete(i)
+ if synced == chunkCnt {
+ expTotal := chunkCnt / tagCnt
+ checkTags(t, expTotal, tagIDs[:tagCnt-1], tags)
+ for i := uint64(0); i < uint64(chunkCnt); i++ {
+ v, ok := store.Load(i)
+ if !ok {
+ t.Fatalf("chunk %v not stored", i)
+ }
+ if cnt := *(v.(*uint32)); cnt != uint32(storerCnt) {
+ t.Fatalf("chunk %v expected to be saved %v times, got %v", i, storerCnt, cnt)
+ }
+ }
+ return
+ }
+ case <-time.After(timeout):
+ t.Fatalf("timeout waiting for all chunks to be synced")
+ }
+ }
+
+}
diff --git a/storage/pushsync/pushsync_simulation_test.go b/storage/pushsync/pushsync_simulation_test.go
new file mode 100644
index 0000000000..eda7a870d8
--- /dev/null
+++ b/storage/pushsync/pushsync_simulation_test.go
@@ -0,0 +1,273 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package pushsync
+
+import (
+ "context"
+ "fmt"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+ "github.com/ethersphere/swarm/chunk"
+ "github.com/ethersphere/swarm/log"
+ "github.com/ethersphere/swarm/network"
+ "github.com/ethersphere/swarm/network/simulation"
+ "github.com/ethersphere/swarm/network/stream"
+ "github.com/ethersphere/swarm/pss"
+ "github.com/ethersphere/swarm/state"
+ "github.com/ethersphere/swarm/storage"
+ "github.com/ethersphere/swarm/storage/localstore"
+)
+
+var (
+ bucketKeyPushSyncer = simulation.BucketKey("pushsyncer")
+ bucketKeyNetStore = simulation.BucketKey("netstore")
+)
+
+// test syncer using pss
+// the test
+// * creates a simulation with connectivity loaded from a snapshot
+// * for each trial, two nodes are chosen randomly, an uploader and a downloader
+// * uploader uploads a number of chunks
+// * wait until the uploaded chunks are synced
+// * downloader downloads the chunk
+// Trials are run concurrently
+func TestPushSyncSimulation(t *testing.T) {
+ nodeCnt := 64
+ chunkCnt := 32
+ trials := 32
+ testSyncerWithPubSub(t, nodeCnt, chunkCnt, trials, newServiceFunc)
+}
+
+func testSyncerWithPubSub(t *testing.T, nodeCnt, chunkCnt, trials int, sf simulation.ServiceFunc) {
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": sf,
+ })
+ defer sim.Close()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+ defer cancel()
+ err := sim.UploadSnapshot(ctx, fmt.Sprintf("../../network/stream/testing/snapshot_%d.json", nodeCnt))
+ if err != nil {
+ t.Fatal(err)
+ }
+ log.Info("Snapshot loaded")
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ errc := make(chan error)
+ for i := 0; i < trials; i++ {
+ go uploadAndDownload(ctx, sim, errc, nodeCnt, chunkCnt, i)
+ }
+ i := 0
+ for err := range errc {
+ if err != nil {
+ return err
+ }
+ i++
+ if i >= trials {
+ break
+ }
+ }
+ return nil
+ })
+ if result.Error != nil {
+ t.Fatalf("simulation error: %v", result.Error)
+ }
+}
+
+// pickNodes selects 2 distinct
+func pickNodes(n int) (i, j int) {
+ i = rand.Intn(n)
+ j = rand.Intn(n - 1)
+ if j >= i {
+ j++
+ }
+ return
+}
+
+func uploadAndDownload(ctx context.Context, sim *simulation.Simulation, errc chan error, nodeCnt, chunkCnt, i int) {
+ // chose 2 random nodes as uploader and downloader
+ u, d := pickNodes(nodeCnt)
+ // setup uploader node
+ uid := sim.UpNodeIDs()[u]
+ val, _ := sim.NodeItem(uid, bucketKeyPushSyncer)
+ p := val.(*Pusher)
+ // setup downloader node
+ did := sim.UpNodeIDs()[d]
+ // the created tag indicates the uploader and downloader nodes
+ tagname := fmt.Sprintf("tag-%v-%v-%d", label(uid[:]), label(did[:]), i)
+ log.Debug("uploading", "peer", uid, "chunks", chunkCnt, "tagname", tagname)
+ tag, what, err := upload(ctx, p.store.(*localstore.DB), p.tags, tagname, chunkCnt)
+ if err != nil {
+ select {
+ case errc <- err:
+ case <-ctx.Done():
+ return
+ }
+ return
+ }
+
+ // wait till synced
+ for {
+ n, total, err := tag.Status(chunk.StateSynced)
+ if err == nil && n == total {
+ break
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+
+ log.Debug("synced", "peer", uid, "chunks", chunkCnt, "tagname", tagname)
+ log.Debug("downloading", "peer", did, "chunks", chunkCnt, "tagname", tagname)
+ val, _ = sim.NodeItem(did, bucketKeyNetStore)
+ netstore := val.(*storage.NetStore)
+ select {
+ case errc <- download(ctx, netstore, what):
+ case <-ctx.Done():
+ }
+ log.Debug("downloaded", "peer", did, "chunks", chunkCnt, "tagname", tagname)
+}
+
+// newServiceFunc constructs a minimal service needed for a simulation test for Push Sync, namely:
+// localstore, netstore, stream and pss
+func newServiceFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
+ // setup localstore
+ n := ctx.Config.Node()
+ addr := network.NewAddr(n)
+ dir, err := ioutil.TempDir("", "localstore-test")
+ if err != nil {
+ return nil, nil, err
+ }
+ lstore, err := localstore.New(dir, addr.Over(), nil)
+ if err != nil {
+ os.RemoveAll(dir)
+ return nil, nil, err
+ }
+
+ // setup pss
+ kadParams := network.NewKadParams()
+ kad := network.NewKademlia(addr.Over(), kadParams)
+ bucket.Store(simulation.BucketKeyKademlia, kad)
+
+ privKey, err := crypto.GenerateKey()
+ pssp := pss.NewPssParams().WithPrivateKey(privKey)
+ ps, err := pss.NewPss(kad, pssp)
+ if err != nil {
+ return nil, nil, err
+ }
+ // setup netstore
+ netStore := storage.NewNetStore(lstore, enode.HexID(hexutil.Encode(kad.BaseAddr())))
+ // streamer
+ delivery := stream.NewDelivery(kad, netStore)
+ netStore.RemoteGet = delivery.RequestFromPeers
+
+ bucket.Store(bucketKeyNetStore, netStore)
+
+ noSyncing := &stream.RegistryOptions{Syncing: stream.SyncingDisabled, SyncUpdateDelay: 50 * time.Millisecond}
+ r := stream.NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), noSyncing, nil)
+
+ pubSub := pss.NewPubSub(ps)
+
+ // set up syncer
+ p := NewPusher(lstore, pubSub, chunk.NewTags())
+ bucket.Store(bucketKeyPushSyncer, p)
+
+ // setup storer
+ s := NewStorer(netStore, pubSub, p.PushReceipt)
+
+ cleanup := func() {
+ p.Close()
+ s.Close()
+ netStore.Close()
+ r.Close()
+ os.RemoveAll(dir)
+ }
+
+ return &StreamerAndPss{r, ps}, cleanup, nil
+}
+
+// implements the node.Service interface
+type StreamerAndPss struct {
+ *stream.Registry
+ pss *pss.Pss
+}
+
+func (s *StreamerAndPss) Protocols() []p2p.Protocol {
+ return append(s.Registry.Protocols(), s.pss.Protocols()...)
+}
+
+func (s *StreamerAndPss) Start(srv *p2p.Server) error {
+ err := s.Registry.Start(srv)
+ if err != nil {
+ return err
+ }
+ return s.pss.Start(srv)
+}
+
+func (s *StreamerAndPss) Stop() error {
+ err := s.Registry.Stop()
+ if err != nil {
+ return err
+ }
+ return s.pss.Stop()
+}
+
+func upload(ctx context.Context, store Store, tags *chunk.Tags, tagname string, n int) (tag *chunk.Tag, addrs []storage.Address, err error) {
+ tag, err = tags.New(tagname, int64(n))
+ if err != nil {
+ return nil, nil, err
+ }
+ for i := 0; i < n; i++ {
+ ch := storage.GenerateRandomChunk(int64(chunk.DefaultSize))
+ addrs = append(addrs, ch.Address())
+ store.Put(ctx, chunk.ModePutUpload, ch.WithTagID(tag.Uid))
+ tag.Inc(chunk.StateStored)
+ }
+ return tag, addrs, nil
+}
+
+func download(ctx context.Context, store *storage.NetStore, addrs []storage.Address) error {
+ errc := make(chan error)
+ for _, addr := range addrs {
+ go func(addr storage.Address) {
+ _, err := store.Get(ctx, chunk.ModeGetRequest, storage.NewRequest(addr))
+ select {
+ case errc <- err:
+ case <-ctx.Done():
+ }
+ }(addr)
+ }
+ i := 0
+ for err := range errc {
+ if err != nil {
+ return err
+ }
+ i++
+ if i == len(addrs) {
+ break
+ }
+ }
+ return nil
+}
diff --git a/storage/pushsync/storer.go b/storage/pushsync/storer.go
new file mode 100644
index 0000000000..c0a966ac8b
--- /dev/null
+++ b/storage/pushsync/storer.go
@@ -0,0 +1,120 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package pushsync
+
+import (
+ "bytes"
+ "context"
+ "encoding/hex"
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethersphere/swarm/chunk"
+ "github.com/ethersphere/swarm/log"
+ "github.com/ethersphere/swarm/storage"
+)
+
+// Store is the storage interface to save chunks
+// NetStore implements this interface
+type Store interface {
+ Put(context.Context, chunk.ModePut, chunk.Chunk) (bool, error)
+}
+
+// Storer is the object used by the push-sync server side protocol
+type Storer struct {
+ store Store // store to put chunks in, and retrieve them
+ ps PubSub // pubsub interface to receive chunks and send receipts
+ deregister func() // deregister the registered handler when Storer is closed
+ pushReceipt func(addr []byte) // to be called...
+}
+
+// NewStorer constructs a Storer
+// Storer run storer nodes to handle the reception of push-synced chunks
+// that fall within their area of responsibility.
+// The protocol makes sure that
+// - the chunks are stored and synced to their nearest neighbours and
+// - a statement of custody receipt is sent as a response to the originator
+// it sets a cancel function that deregisters the handler
+func NewStorer(store Store, ps PubSub, pushReceipt func(addr []byte)) *Storer {
+ s := &Storer{
+ store: store,
+ ps: ps,
+ pushReceipt: pushReceipt,
+ }
+ s.deregister = ps.Register(pssChunkTopic, true, func(msg []byte, _ *p2p.Peer) error {
+ return s.handleChunkMsg(msg)
+ })
+ return s
+}
+
+// Close needs to be called to deregister the handler
+func (s *Storer) Close() {
+ s.deregister()
+}
+
+// handleChunkMsg is called by the pss dispatcher on pssChunkTopic msgs
+// - deserialises chunkMsg and
+// - calls storer.processChunkMsg function
+func (s *Storer) handleChunkMsg(msg []byte) error {
+ chmsg, err := decodeChunkMsg(msg)
+ if err != nil {
+ return err
+ }
+ log.Debug("Handler", "chunk", label(chmsg.Addr), "origin", label(chmsg.Origin), "self", fmt.Sprintf("%x", s.ps.BaseAddr()))
+ return s.processChunkMsg(chmsg)
+}
+
+// processChunkMsg processes a chunk received via pss pssChunkTopic
+// these chunk messages are sent to their address as destination
+// using neighbourhood addressing. Therefore nodes only handle
+// chunks that fall within their area of responsibility.
+// Upon receiving the chunk is saved and a statement of custody
+// receipt message is sent as a response to the originator.
+func (s *Storer) processChunkMsg(chmsg *chunkMsg) error {
+ // TODO: double check if it falls in area of responsibility
+ ch := storage.NewChunk(chmsg.Addr, chmsg.Data)
+ if _, err := s.store.Put(context.TODO(), chunk.ModePutSync, ch); err != nil {
+ return err
+ }
+ log.Debug("push sync storer", "addr", label(chmsg.Addr), "to", label(chmsg.Origin), "self", hex.EncodeToString(s.ps.BaseAddr()))
+ // TODO: check if originator or relayer is a nearest neighbour then return
+ // otherwise send back receipt
+ return s.sendReceiptMsg(chmsg)
+}
+
+// sendReceiptMsg sends a statement of custody receipt message
+// to the originator of a push-synced chunk message.
+// Including a unique nonce makes the receipt immune to deduplication cache
+func (s *Storer) sendReceiptMsg(chmsg *chunkMsg) error {
+ // if origin is self, use direct channel, no pubsub send needed
+ if bytes.Equal(chmsg.Origin, s.ps.BaseAddr()) {
+ go s.pushReceipt(chmsg.Addr)
+ return nil
+ }
+ rmsg := &receiptMsg{
+ Addr: chmsg.Addr,
+ Nonce: newNonce(),
+ }
+ msg, err := rlp.EncodeToBytes(rmsg)
+ if err != nil {
+ return err
+ }
+ to := chmsg.Origin
+ log.Debug("send receipt", "addr", label(chmsg.Addr), "to", label(to), "self", hex.EncodeToString(s.ps.BaseAddr()))
+ return s.ps.Send(to, pssReceiptTopic, msg)
+}
diff --git a/swarm.go b/swarm.go
index d6f40beed3..6f86d683f7 100644
--- a/swarm.go
+++ b/swarm.go
@@ -30,11 +30,6 @@ import (
"time"
"unicode"
- "github.com/ethersphere/swarm/chunk"
-
- "github.com/ethersphere/swarm/storage/feed"
- "github.com/ethersphere/swarm/storage/localstore"
-
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
@@ -44,6 +39,7 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethersphere/swarm/api"
httpapi "github.com/ethersphere/swarm/api/http"
+ "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/contracts/chequebook"
"github.com/ethersphere/swarm/contracts/ens"
"github.com/ethersphere/swarm/fuse"
@@ -54,7 +50,10 @@ import (
"github.com/ethersphere/swarm/pss"
"github.com/ethersphere/swarm/state"
"github.com/ethersphere/swarm/storage"
+ "github.com/ethersphere/swarm/storage/feed"
+ "github.com/ethersphere/swarm/storage/localstore"
"github.com/ethersphere/swarm/storage/mock"
+ "github.com/ethersphere/swarm/storage/pushsync"
"github.com/ethersphere/swarm/swap"
"github.com/ethersphere/swarm/tracing"
)
@@ -79,6 +78,8 @@ type Swarm struct {
netStore *storage.NetStore
sfs *fuse.SwarmFS // need this to cleanup all the active mounts on node exit
ps *pss.Pss
+ pushSync *pushsync.Pusher
+ storer *pushsync.Storer
swap *swap.Swap
stateStore *state.DBStore
accountingMetrics *protocols.AccountingMetrics
@@ -225,6 +226,10 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
pss.SetHandshakeController(self.ps, pss.NewHandshakeParams())
}
+ pubsub := pss.NewPubSub(self.ps)
+ self.pushSync = pushsync.NewPusher(localStore, pubsub, tags)
+ self.storer = pushsync.NewStorer(self.netStore, pubsub, self.pushSync.PushReceipt)
+
self.api = api.NewAPI(self.fileStore, self.dns, feedsHandler, self.privateKey, tags)
self.sfs = fuse.NewSwarmFS(self.api)
@@ -454,6 +459,9 @@ func (s *Swarm) Stop() error {
s.stateStore.Close()
}
+ s.pushSync.Close()
+ s.storer.Close()
+
for _, cleanF := range s.cleanupFuncs {
err = cleanF()
if err != nil {