Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@ import (
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/sctx"
"github.com/ethersphere/swarm/spancontext"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/storage/feed"
"github.com/opentracing/opentracing-go"
olog "github.com/opentracing/opentracing-go/log"
"github.com/rs/cors"
)

Expand Down Expand Up @@ -300,6 +303,15 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
log.Debug("handle.post.files", "ruid", ruid)
postFilesCount.Inc(1)

ctx, sp := spancontext.StartSpan(r.Context(), "handle.post.files")
defer sp.Finish()

quitChan := make(chan struct{})
defer close(quitChan)

// periodically monitor the tag for this upload and log its state to the `handle.post.files` span
go periodicTagTrace(s.api.Tags, sctx.GetTag(ctx), quitChan, sp)

contentType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err != nil {
postFilesFail.Inc(1)
Expand All @@ -315,23 +327,23 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {

var addr storage.Address
if uri.Addr != "" && uri.Addr != "encrypt" {
addr, err = s.api.Resolve(r.Context(), uri.Addr)
addr, err = s.api.Resolve(ctx, uri.Addr)
if err != nil {
postFilesFail.Inc(1)
respondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusInternalServerError)
return
}
log.Debug("resolved key", "ruid", ruid, "key", addr)
} else {
addr, err = s.api.NewManifest(r.Context(), toEncrypt)
addr, err = s.api.NewManifest(ctx, toEncrypt)
if err != nil {
postFilesFail.Inc(1)
respondError(w, r, err.Error(), http.StatusInternalServerError)
return
}
log.Debug("new manifest", "ruid", ruid, "key", addr)
}
newAddr, err := s.api.UpdateManifest(r.Context(), addr, func(mw *api.ManifestWriter) error {
newAddr, err := s.api.UpdateManifest(ctx, addr, func(mw *api.ManifestWriter) error {
switch contentType {
case "application/x-tar":
_, err := s.handleTarUpload(r, mw)
Expand Down Expand Up @@ -935,3 +947,28 @@ func (lrw *loggingResponseWriter) WriteHeader(code int) {
func isDecryptError(err error) bool {
return strings.Contains(err.Error(), api.ErrDecrypt.Error())
}

// periodicTagTrace queries the tag every 2 seconds and logs its state to the span
func periodicTagTrace(tags *chunk.Tags, tagUid uint32, q chan struct{}, sp opentracing.Span) {
f := func() {
tag, err := tags.Get(tagUid)
if err != nil {
log.Error("error while getting tag", "tagUid", tagUid, "err", err)
}

sp.LogFields(olog.String("tag state", fmt.Sprintf("split=%d stored=%d seen=%d synced=%d", tag.Get(chunk.StateSplit), tag.Get(chunk.StateStored), tag.Get(chunk.StateSeen), tag.Get(chunk.StateSynced))))
}

for {
select {
case <-q:
f()

return
default:
f()

time.Sleep(2 * time.Second)
}
}
}
12 changes: 12 additions & 0 deletions chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ var (
type Chunk interface {
Address() Address
Data() []byte
TagID() uint32
WithTagID(t uint32) Chunk
}

type chunk struct {
addr Address
sdata []byte
tagID uint32
}

func NewChunk(addr Address, data []byte) Chunk {
Expand All @@ -52,6 +55,11 @@ func NewChunk(addr Address, data []byte) Chunk {
}
}

func (c *chunk) WithTagID(t uint32) Chunk {
c.tagID = t
return c
}

func (c *chunk) Address() Address {
return c.addr
}
Expand All @@ -60,6 +68,10 @@ func (c *chunk) Data() []byte {
return c.sdata
}

func (c *chunk) TagID() uint32 {
return c.tagID
}

func (self *chunk) String() string {
return fmt.Sprintf("Address: %v Chunksize: %v", self.addr.Log(), len(self.sdata))
}
Expand Down
4 changes: 1 addition & 3 deletions chunk/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,20 @@ import (
// Tags hold tag information indexed by a unique random uint32
type Tags struct {
tags *sync.Map
rng *rand.Rand
}

// NewTags creates a tags object
func NewTags() *Tags {
return &Tags{
tags: &sync.Map{},
rng: rand.New(rand.NewSource(time.Now().Unix())),
}
}

// New creates a new tag, stores it by the name and returns it
// it returns an error if the tag with this name already exists
func (ts *Tags) New(s string, total int64) (*Tag, error) {
t := &Tag{
Uid: ts.rng.Uint32(),
Uid: uint32(rand.Int31()),
Name: s,
startedAt: time.Now(),
total: total,
Expand Down
13 changes: 7 additions & 6 deletions network/stream/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethersphere/swarm/chunk"
Expand Down Expand Up @@ -243,26 +244,26 @@ func (d *Delivery) FindPeer(ctx context.Context, req *storage.Request) (*Peer, e

// skip peers that we have already tried
if req.SkipPeer(id.String()) {
log.Trace("findpeer skip peer", "peer", id, "ref", req.Addr.String())
log.Trace("findpeer skip peer", "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
return true
}

if myPo < depth { // chunk is NOT within the neighbourhood
if po <= myPo { // always choose a peer strictly closer to chunk than us
log.Trace("findpeer1a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
log.Trace("findpeer1a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
return false
} else {
log.Trace("findpeer1b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
log.Trace("findpeer1b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
}
} else { // chunk IS WITHIN neighbourhood
if po < depth { // do not select peer outside the neighbourhood. But allows peers further from the chunk than us
log.Trace("findpeer2a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
log.Trace("findpeer2a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
return false
} else if po <= originPo { // avoid loop in neighbourhood, so not forward when a request comes from the neighbourhood
log.Trace("findpeer2b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
log.Trace("findpeer2b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
return false
} else {
log.Trace("findpeer2c", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
log.Trace("findpeer2c", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "self", hexutil.Encode(d.kad.BaseAddr()), "origin", req.Origin.String(), "peer", id, "ref", req.Addr.String())
}
}

Expand Down
6 changes: 0 additions & 6 deletions network/stream/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,6 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
// This is the most naive approach to label the chunk as synced
// allowing it to be garbage collected. A proper way requires
// validating that the chunk is successfully stored by the peer.
err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address)
if err != nil {
metrics.GetOrRegisterCounter("syncer.set-next-batch.set-sync-err", nil).Inc(1)
log.Debug("syncer pull subscription - err setting chunk as synced", "correlateId", s.correlateId, "err", err)
return nil, 0, 0, err
}
batchSize++
if batchStartID == nil {
// set batch start id only if
Expand Down
2 changes: 1 addition & 1 deletion pss/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool {
}
}
if !isPssEnabled {
log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
log.Warn("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps, "peer", fmt.Sprintf("%x", sp.BzzAddr.Address()))
return false
}

Expand Down
55 changes: 55 additions & 0 deletions pss/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package pss

import "github.com/ethereum/go-ethereum/p2p"

// PubSub implements the pushsync.PubSub interface using pss
type PubSub struct {
pss *Pss
}

// NewPubSub creates a new PubSub
func NewPubSub(p *Pss) *PubSub {
return &PubSub{
pss: p,
}
}

// BaseAddr returns Kademlia base address
func (p *PubSub) BaseAddr() []byte {
return p.pss.Kademlia.BaseAddr()
}

// Register registers a handler
func (p *PubSub) Register(topic string, prox bool, handler func(msg []byte, p *p2p.Peer) error) func() {
f := func(msg []byte, peer *p2p.Peer, _ bool, _ string) error {
return handler(msg, peer)
}
h := NewHandler(f).WithRaw()
if prox {
h = h.WithProxBin()
}
pt := BytesToTopic([]byte(topic))
return p.pss.Register(&pt, h)
}

// Send sends a message using pss SendRaw
func (p *PubSub) Send(to []byte, topic string, msg []byte) error {
pt := BytesToTopic([]byte(topic))
return p.pss.SendRaw(PssAddress(to), pt, msg)
}
4 changes: 4 additions & 0 deletions shed/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Item struct {
AccessTimestamp int64
StoreTimestamp int64
BinID uint64
Tag uint32
}

// Merge is a helper method to construct a new
Expand All @@ -62,6 +63,9 @@ func (i Item) Merge(i2 Item) (new Item) {
if i.BinID == 0 {
i.BinID = i2.BinID
}
if i.Tag == 0 {
i.Tag = i2.Tag
}
return i
}

Expand Down
2 changes: 1 addition & 1 deletion storage/hasherstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (h *hasherStore) createHash(chunkData ChunkData) Address {

func (h *hasherStore) createChunk(chunkData ChunkData) Chunk {
hash := h.createHash(chunkData)
chunk := NewChunk(hash, chunkData)
chunk := NewChunk(hash, chunkData).WithTagID(h.tag.Uid)
return chunk
}

Expand Down
8 changes: 6 additions & 2 deletions storage/localstore/localstore.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018 The go-ethereum Authors
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
Expand Down Expand Up @@ -300,9 +300,12 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
return e, nil
},
EncodeValue: func(fields shed.Item) (value []byte, err error) {
return nil, nil
tag := make([]byte, 4)
binary.BigEndian.PutUint32(tag, fields.Tag)
return tag, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.Tag = binary.BigEndian.Uint32(value)
return e, nil
},
})
Expand Down Expand Up @@ -367,6 +370,7 @@ func chunkToItem(ch chunk.Chunk) shed.Item {
return shed.Item{
Address: ch.Address(),
Data: ch.Data(),
Tag: ch.TagID(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion storage/localstore/subscription_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
}

select {
case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data):
case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data).WithTagID(dataItem.Tag):
count++
// set next iteration start item
// when its chunk is successfully sent to channel
Expand Down
Loading