diff --git a/api/api.go b/api/api.go index 8f816415ac..c79f2eba87 100644 --- a/api/api.go +++ b/api/api.go @@ -266,7 +266,6 @@ func (a *API) Resolve(ctx context.Context, address string) (storage.Address, err return resolved[:], nil } -// func tld(address string) (tld string) { splitAddress := strings.Split(address, ".") if len(splitAddress) > 1 { diff --git a/api/config.go b/api/config.go index 3672f55b05..1a24bd6b46 100644 --- a/api/config.go +++ b/api/config.go @@ -54,8 +54,8 @@ type Config struct { // Swap configs SwapBackendURL string // Ethereum API endpoint SwapEnabled bool // whether SWAP incentives are enabled - SwapPaymentThreshold uint64 // honey amount at which a payment is triggered - SwapDisconnectThreshold uint64 // honey amount at which a peer disconnects + SwapPaymentThreshold uint // honey amount at which a payment is triggered + SwapDisconnectThreshold uint // honey amount at which a peer disconnects SwapInitialDeposit uint64 // initial deposit amount to the chequebook SwapLogPath string // dir to swap related audit logs Contract common.Address // address of the chequebook contract diff --git a/bzzeth/bzzeth_test.go b/bzzeth/bzzeth_test.go index 9b423e6f2a..9f8ed64f0d 100644 --- a/bzzeth/bzzeth_test.go +++ b/bzzeth/bzzeth_test.go @@ -87,7 +87,7 @@ func newTestNetworkStore(t *testing.T) (prvkey *ecdsa.PrivateKey, netStore *stor } netStore = storage.NewNetStore(localStore, bzzAddr, enode.ID{}) - r := retrieval.New(kad, netStore, bzzAddr, nil) + r := retrieval.New(kad, netStore, bzzAddr, nil, 0) netStore.RemoteGet = r.RequestFromPeers cleanup = func() { diff --git a/cmd/swarm/config.go b/cmd/swarm/config.go index e502d2270e..60a95dc7a4 100644 --- a/cmd/swarm/config.go +++ b/cmd/swarm/config.go @@ -213,10 +213,10 @@ func flagsOverride(currentConfig *bzzapi.Config, ctx *cli.Context) *bzzapi.Confi if initialDepo := ctx.GlobalUint64(SwarmSwapInitialDepositFlag.Name); initialDepo != 0 { currentConfig.SwapInitialDeposit = initialDepo } - if paymentThreshold := ctx.GlobalUint64(SwarmSwapPaymentThresholdFlag.Name); paymentThreshold != 0 { + if paymentThreshold := ctx.GlobalUint(SwarmSwapPaymentThresholdFlag.Name); paymentThreshold != 0 { currentConfig.SwapPaymentThreshold = paymentThreshold } - if disconnectThreshold := ctx.GlobalUint64(SwarmSwapDisconnectThresholdFlag.Name); disconnectThreshold != 0 { + if disconnectThreshold := ctx.GlobalUint(SwarmSwapDisconnectThresholdFlag.Name); disconnectThreshold != 0 { currentConfig.SwapDisconnectThreshold = disconnectThreshold } if ctx.GlobalIsSet(SwarmNoSyncFlag.Name) { diff --git a/cmd/swarm/config_test.go b/cmd/swarm/config_test.go index 28dd14177b..e63d89a060 100644 --- a/cmd/swarm/config_test.go +++ b/cmd/swarm/config_test.go @@ -238,8 +238,8 @@ func TestConfigCmdLineOverrides(t *testing.T) { fmt.Sprintf("--%s", utils.DataDirFlag.Name), dir, fmt.Sprintf("--%s", utils.IPCPathFlag.Name), conf.IPCPath, "--verbosity", fmt.Sprintf("%d", *testutil.Loglevel), - fmt.Sprintf("--%s", SwarmSwapPaymentThresholdFlag.Name), strconv.FormatUint(swap.DefaultPaymentThreshold+1, 10), - fmt.Sprintf("--%s", SwarmSwapDisconnectThresholdFlag.Name), strconv.FormatUint(swap.DefaultDisconnectThreshold+1, 10), + fmt.Sprintf("--%s", SwarmSwapPaymentThresholdFlag.Name), strconv.FormatUint(uint64(swap.DefaultPaymentThreshold)+1, 10), + fmt.Sprintf("--%s", SwarmSwapDisconnectThresholdFlag.Name), strconv.FormatUint(uint64(swap.DefaultDisconnectThreshold)+1, 10), fmt.Sprintf("--%s", SwarmEnablePinningFlag.Name), } diff --git a/network/retrieval/peer.go b/network/retrieval/peer.go index b19278e4e3..efdf227583 100644 --- a/network/retrieval/peer.go +++ b/network/retrieval/peer.go @@ -28,21 +28,26 @@ import ( "github.com/ethersphere/swarm/storage" ) -// Peer wraps BzzPeer with a contextual logger and tracks open +var ErrCannotFindRUID = errors.New("cannot find ruid") +var ErrNoAddressMatch = errors.New("retrieve request found but address does not match") + +// Peer wraps BzzPeer with a contextual logger and tracks open. TODO: update comment // retrievals for that peer type Peer struct { *network.BzzPeer - logger log.Logger // logger with base and peer address - mtx sync.Mutex // synchronize retrievals - retrievals map[uint]chunk.Address // current ongoing retrievals + logger log.Logger // logger with base and peer address + mtx sync.Mutex // synchronize retrievals + retrievals map[uint]chunk.Address // current ongoing retrievals + priceInformation []uint // price per Proximity order } // NewPeer is the constructor for Peer func NewPeer(peer *network.BzzPeer, baseKey []byte) *Peer { return &Peer{ - BzzPeer: peer, - logger: log.New("base", hex.EncodeToString(baseKey)[:16], "peer", peer.ID().String()[:16]), - retrievals: make(map[uint]chunk.Address), + BzzPeer: peer, + logger: log.New("base", hex.EncodeToString(baseKey)[:16], "peer", peer.ID().String()[:16]), + retrievals: make(map[uint]chunk.Address), + priceInformation: make([]uint, 10), //TODO: more intelligent way of creating this array. Load historical data from store } } @@ -57,16 +62,23 @@ func (p *Peer) addRetrieval(ruid uint, addr storage.Address) { // chunkReceived is called upon ChunkDelivery message reception // it is meant to idenfify unsolicited chunk deliveries func (p *Peer) checkRequest(ruid uint, addr storage.Address) error { + // 0 means synthetic chunk + if ruid == uint(0) { + return nil + } p.mtx.Lock() defer p.mtx.Unlock() v, ok := p.retrievals[ruid] if !ok { - return errors.New("cannot find ruid") + return ErrCannotFindRUID } - delete(p.retrievals, ruid) // since we got the delivery we wanted - it is safe to delete the retrieve request if !bytes.Equal(v, addr) { - return errors.New("retrieve request found but address does not match") + return ErrNoAddressMatch } return nil } + +func (p *Peer) deleteRequest(ruid uint) { + delete(p.retrievals, ruid) +} diff --git a/network/retrieval/retrieve.go b/network/retrieval/retrieve.go index 74b2a31793..d09886cd41 100644 --- a/network/retrieval/retrieve.go +++ b/network/retrieval/retrieve.go @@ -39,7 +39,6 @@ import ( "github.com/ethersphere/swarm/p2p/protocols" "github.com/ethersphere/swarm/spancontext" "github.com/ethersphere/swarm/storage" - "github.com/ethersphere/swarm/swap" opentracing "github.com/opentracing/opentracing-go" olog "github.com/opentracing/opentracing-go/log" ) @@ -63,30 +62,20 @@ var ( Messages: []interface{}{ ChunkDelivery{}, RetrieveRequest{}, + NewPrice{}, }, } ErrNoPeerFound = errors.New("no peer found") ) -// Price is the method through which a message type marks itself -// as implementing the protocols.Price protocol and thus -// as swap-enabled message -func (rr *RetrieveRequest) Price() *protocols.Price { - return &protocols.Price{ - Value: swap.RetrieveRequestPrice, - PerByte: false, - Payer: protocols.Sender, - } -} - // Price is the method through which a message type marks itself // as implementing the protocols.Price protocol and thus // as swap-enabled message func (cd *ChunkDelivery) Price() *protocols.Price { return &protocols.Price{ - Value: swap.ChunkDeliveryPrice, - PerByte: true, + Value: cd.price, + PerByte: false, Payer: protocols.Receiver, } } @@ -95,6 +84,7 @@ func (cd *ChunkDelivery) Price() *protocols.Price { type Retrieval struct { netStore *storage.NetStore kad *network.Kademlia + margin uint mtx sync.RWMutex // protect peer map peers map[enode.ID]*Peer // compatible peers spec *protocols.Spec // protocol spec @@ -103,10 +93,11 @@ type Retrieval struct { } // New returns a new instance of the retrieval protocol handler -func New(kad *network.Kademlia, ns *storage.NetStore, baseKey []byte, balance protocols.Balance) *Retrieval { +func New(kad *network.Kademlia, ns *storage.NetStore, baseKey []byte, balance protocols.Balance, margin uint) *Retrieval { r := &Retrieval{ netStore: ns, kad: kad, + margin: margin, peers: make(map[enode.ID]*Peer), spec: spec, logger: log.New("base", hex.EncodeToString(baseKey)[:16]), @@ -140,6 +131,29 @@ func (r *Retrieval) getPeer(id enode.ID) *Peer { return r.peers[id] } +// get the pric for requesting a chunk from peer +func (r *Retrieval) getPeerPrice(peer enode.ID, chunkAddr chunk.Address) uint { + r.mtx.RLock() + defer r.mtx.RUnlock() + //TODO: does this return 0 if the price was never initialized? + return r.peers[peer].priceInformation[chunk.Proximity(chunkAddr, peer.Bytes())] //TODO: is peer.Bytes() the correct address to use? +} + +func (r *Retrieval) setPeerPrice(peer enode.ID, chunkAddr chunk.Address, newPrice uint) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.peers[peer].priceInformation[chunk.Proximity(chunkAddr, peer.Bytes())] = newPrice +} + +// returns true if the price +func (r *Retrieval) verifyPrice(offeredPrice uint, wantedPrice uint) bool { + if offeredPrice >= wantedPrice { + return true + } else { + return false + } +} + // Run is being dispatched when 2 nodes connect func (r *Retrieval) Run(bp *network.BzzPeer) error { sp := NewPeer(bp, r.kad.BaseAddr()) @@ -158,6 +172,9 @@ func (r *Retrieval) handleMsg(p *Peer) func(context.Context, interface{}) error go r.handleRetrieveRequest(ctx, p, msg) case *ChunkDelivery: go r.handleChunkDelivery(ctx, p, msg) + + case *NewPrice: + go r.handleNewPrice(ctx, p, msg) } return nil } @@ -298,10 +315,31 @@ func (r *Retrieval) findPeer(ctx context.Context, req *storage.Request) (retPeer return retPeer, nil } +// handleNewPrice updates the priceInformation for the particular peer. If there are outstanding requests for the particular chunk, we handle them. +func (r *Retrieval) handleNewPrice(ctx context.Context, p *Peer, msg *NewPrice) { + // update price + r.setPeerPrice(p.ID(), msg.Addr, msg.Price) + err := r.peers[p.ID()].checkRequest(msg.Ruid, msg.Addr) + // outstanding request + if err == nil { + // TODO: compare newPrice with margin + // if newPrice + margin >= outstanding request price => issue new request + // else => send NewPrice message + } + // TODO: look up outstanding chunk requests, verify wether the newPrice + margin >= the price of the outstanding request. If so, resend with new price, if not, send newPrice message ourself to Origininator of request +} + // handleRetrieveRequest handles an incoming retrieve request from a certain Peer // if the chunk is found in the localstore it is served immediately, otherwise // it results in a new retrieve request to candidate peers in our kademlia func (r *Retrieval) handleRetrieveRequest(ctx context.Context, p *Peer, msg *RetrieveRequest) { + // return directly with a NewPrice msg if msg.price is not bigger than margin + wantedPrice := r.margin + ok := r.verifyPrice(msg.Price, wantedPrice) + if !ok { + r.createAndSendNewPrice(ctx, p, msg.Ruid, msg.Addr, wantedPrice) + } + p.logger.Debug("retrieval.handleRetrieveRequest", "ref", msg.Addr) handleRetrieveRequestMsgCount.Inc(1) @@ -313,13 +351,15 @@ func (r *Retrieval) handleRetrieveRequest(ctx context.Context, p *Peer, msg *Ret defer osp.Finish() - ctx, cancel := context.WithTimeout(ctx, timeouts.FetcherGlobalTimeout) - defer cancel() - req := &storage.Request{ Addr: msg.Addr, + Ruid: msg.Ruid, + Price: msg.Price, Origin: p.ID(), } + ctx, cancel := context.WithTimeout(ctx, timeouts.FetcherGlobalTimeout) + defer cancel() + chunk, err := r.netStore.Get(ctx, chunk.ModeGetRequest, req) if err != nil { retrieveChunkFail.Inc(1) @@ -328,9 +368,9 @@ func (r *Retrieval) handleRetrieveRequest(ctx context.Context, p *Peer, msg *Ret } p.logger.Trace("retrieval.handleRetrieveRequest - delivery", "ref", msg.Addr) - deliveryMsg := &ChunkDelivery{ Ruid: msg.Ruid, + price: msg.Price, Addr: chunk.Address(), SData: chunk.Data(), } @@ -350,6 +390,9 @@ func (r *Retrieval) handleRetrieveRequest(ctx context.Context, p *Peer, msg *Ret func (r *Retrieval) handleChunkDelivery(ctx context.Context, p *Peer, msg *ChunkDelivery) { p.logger.Debug("retrieval.handleChunkDelivery", "ref", msg.Addr) err := p.checkRequest(msg.Ruid, msg.Addr) + if err == ErrNoAddressMatch { + p.deleteRequest(msg.Ruid) + } if err != nil { unsolicitedChunkDelivery.Inc(1) p.logger.Error("unsolicited chunk delivery from peer", "ruid", msg.Ruid, "addr", msg.Addr, "err", err) @@ -397,7 +440,7 @@ func (r *Retrieval) RequestFromPeers(ctx context.Context, req *storage.Request, const maxFindPeerRetries = 5 retries := 0 - + //TODO: make FINDPEER request based on r.Peer.priceInformation FINDPEER: sp, err := r.findPeer(ctx, req) if err != nil { @@ -418,9 +461,23 @@ FINDPEER: goto FINDPEER } + //check if the price is ok, if not ok send a NewPrice msg + wantedPrice := r.margin + r.getPeerPrice(sp.ID(), req.Addr) + ok := r.verifyPrice(req.Price, wantedPrice) + if !ok { + r.createAndSendNewPrice(ctx, protoPeer, req.Ruid, req.Addr, wantedPrice) + } + + // create a non-zero ruid (zero specifices synthetic chunk) + ruid := uint(rand.Uint32()) + for ruid == 0 { + ruid = uint(rand.Uint32()) + } + ret := &RetrieveRequest{ - Ruid: uint(rand.Uint32()), - Addr: req.Addr, + Ruid: ruid, + Price: r.getPeerPrice(sp.ID(), req.Addr), + Addr: req.Addr, } protoPeer.logger.Trace("sending retrieve request", "ref", ret.Addr, "origin", localID, "ruid", ret.Ruid) protoPeer.addRetrieval(ret.Ruid, ret.Addr) @@ -434,6 +491,15 @@ FINDPEER: return &spID, nil } +func (r *Retrieval) createAndSendNewPrice(ctx context.Context, destinationPeer *Peer, ruid uint, addr storage.Address, wantedPrice uint) error { + newPriceMsg := &NewPrice{ + Ruid: ruid, + Price: wantedPrice, + Addr: addr, + } + return destinationPeer.Send(ctx, newPriceMsg) +} + func (r *Retrieval) Start(server *p2p.Server) error { r.logger.Info("starting bzz-retrieve") return nil diff --git a/network/retrieval/retrieve_test.go b/network/retrieval/retrieve_test.go index 538cd0c2e2..93d969abe7 100644 --- a/network/retrieval/retrieve_test.go +++ b/network/retrieval/retrieve_test.go @@ -449,7 +449,7 @@ func TestRequestFromPeers(t *testing.T) { to.On(peer) - s := New(to, nil, to.BaseAddr(), nil) + s := New(to, nil, to.BaseAddr(), nil, 0) req := storage.NewRequest(storage.Address(hash0[:])) id, err := s.findPeer(context.Background(), req) @@ -465,14 +465,9 @@ func TestRequestFromPeers(t *testing.T) { //TestHasPriceImplementation is to check that Retrieval provides priced messages func TestHasPriceImplementation(t *testing.T) { price := (&ChunkDelivery{}).Price() - if price == nil || price.Value == 0 { + if price == nil { t.Fatal("No prices set for chunk delivery msg") } - - price = (&RetrieveRequest{}).Price() - if price == nil || price.Value == 0 { - t.Fatal("No prices set for retrieve requests") - } } func newBzzRetrieveWithLocalstore(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { @@ -502,7 +497,7 @@ func newBzzRetrieveWithLocalstore(ctx *adapters.ServiceContext, bucket *sync.Map return nil, nil, err } - r := New(kad, netStore, kad.BaseAddr(), nil) + r := New(kad, netStore, kad.BaseAddr(), nil, 0) netStore.RemoteGet = r.RequestFromPeers bucket.Store(bucketKeyFileStore, fileStore) bucket.Store(bucketKeyNetstore, netStore) @@ -621,7 +616,7 @@ func newRetrievalTester(t *testing.T, prvkey *ecdsa.PrivateKey, netStore *storag prvkey = key } - r := New(kad, netStore, kad.BaseAddr(), nil) + r := New(kad, netStore, kad.BaseAddr(), nil, 0) protocolTester := p2ptest.NewProtocolTester(prvkey, 1, r.runProtocol) return protocolTester, r, protocolTester.Stop, nil diff --git a/network/retrieval/wire.go b/network/retrieval/wire.go index fcf98f762f..80a794f0bd 100644 --- a/network/retrieval/wire.go +++ b/network/retrieval/wire.go @@ -20,13 +20,22 @@ import "github.com/ethersphere/swarm/storage" // RetrieveRequest is the protocol msg for chunk retrieve requests type RetrieveRequest struct { - Ruid uint - Addr storage.Address + Ruid uint // unique identifier, to protect agains unsollicited chunks + Price uint // the best-effort price of the requested ChunkDelivery + Addr storage.Address // the address of the requested chunk } // ChunkDelivery is the protocol msg for delivering a solicited chunk to a peer type ChunkDelivery struct { - Ruid uint - Addr storage.Address - SData []byte + Ruid uint // unique identifier, to protect agains unsollicited chunks + price uint // the agreed-upon price of the ChunkDelivery + Addr storage.Address // the address of the chunk + SData []byte // the chunk +} + +// NewPrice is the protocol msg to notify a peer about a price for a certain chunk (distance) +type NewPrice struct { + Ruid uint // unique identifier. 0 signifisies synthetic chunk + Price uint // the proposed price for the chunk (distance) + Addr storage.Address // the address of the chunk, may be synthetic (does not correspond to a real chunk) } diff --git a/network/stream/common_test.go b/network/stream/common_test.go index 2bbccc3600..9a0518b845 100644 --- a/network/stream/common_test.go +++ b/network/stream/common_test.go @@ -126,7 +126,7 @@ func newSyncSimServiceFunc(o *SyncSimServiceOptions) func(ctx *adapters.ServiceC bucket.Store(bucketKeyFileStore, fileStore) bucket.Store(bucketKeyLocalStore, localStore) - ret := retrieval.New(kad, netStore, kad.BaseAddr(), nil) + ret := retrieval.New(kad, netStore, kad.BaseAddr(), nil, 0) netStore.RemoteGet = ret.RequestFromPeers if o.InitialChunkCount > 0 { diff --git a/p2p/protocols/accounting.go b/p2p/protocols/accounting.go index 1bb18a5ace..e6db6ce4de 100644 --- a/p2p/protocols/accounting.go +++ b/p2p/protocols/accounting.go @@ -62,7 +62,7 @@ const ( // Price represents the costs of a message type Price struct { - Value uint64 + Value uint PerByte bool // True if the price is per byte or for unit Payer Payer } @@ -77,7 +77,7 @@ type Price struct { func (p *Price) For(payer Payer, size uint32) int64 { price := p.Value if p.PerByte { - price *= uint64(size) + price *= uint(size) } if p.Payer == payer { return 0 - int64(price) diff --git a/p2p/protocols/accounting_test.go b/p2p/protocols/accounting_test.go index 2b452e255e..d0dd2c0d8d 100644 --- a/p2p/protocols/accounting_test.go +++ b/p2p/protocols/accounting_test.go @@ -58,7 +58,7 @@ type nilPriceMsg struct{} func (m *perBytesMsgReceiverPays) Price() *Price { return &Price{ PerByte: true, - Value: uint64(100), + Value: uint(100), Payer: Receiver, } } @@ -66,7 +66,7 @@ func (m *perBytesMsgReceiverPays) Price() *Price { func (m *perBytesMsgSenderPays) Price() *Price { return &Price{ PerByte: true, - Value: uint64(100), + Value: uint(100), Payer: Sender, } } @@ -74,7 +74,7 @@ func (m *perBytesMsgSenderPays) Price() *Price { func (m *perUnitMsgReceiverPays) Price() *Price { return &Price{ PerByte: false, - Value: uint64(99), + Value: uint(99), Payer: Receiver, } } @@ -82,7 +82,7 @@ func (m *perUnitMsgReceiverPays) Price() *Price { func (m *perUnitMsgSenderPays) Price() *Price { return &Price{ PerByte: false, - Value: uint64(99), + Value: uint(99), Payer: Sender, } } @@ -90,7 +90,7 @@ func (m *perUnitMsgSenderPays) Price() *Price { func (m *zeroPriceMsg) Price() *Price { return &Price{ PerByte: false, - Value: uint64(0), + Value: uint(0), Payer: Sender, } } diff --git a/pushsync/simulation_test.go b/pushsync/simulation_test.go index f558b2957b..9f3776819d 100644 --- a/pushsync/simulation_test.go +++ b/pushsync/simulation_test.go @@ -201,7 +201,7 @@ func newServiceFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Servic bucket.Store(bucketKeyNetStore, netStore) - r := retrieval.New(kad, netStore, kad.BaseAddr(), nil) + r := retrieval.New(kad, netStore, kad.BaseAddr(), nil, 0) netStore.RemoteGet = r.RequestFromPeers pubSub := pss.NewPubSub(ps, 1*time.Second) diff --git a/storage/netstore.go b/storage/netstore.go index 0aee4ba1d3..b4874d6d48 100644 --- a/storage/netstore.go +++ b/storage/netstore.go @@ -157,12 +157,11 @@ func (n *NetStore) Get(ctx context.Context, mode chunk.ModeGet, req *Request) (C ref := req.Addr ch, err := n.Store.Get(ctx, mode, ref) + if err != nil && err != ErrChunkNotFound && err != leveldb.ErrNotFound { + n.logger.Error("localstore get error", "err", err) + //TODO: return error here? + } if err != nil { - // TODO: fix comparison - we should be comparing against leveldb.ErrNotFound, this error should be wrapped. - if err != ErrChunkNotFound && err != leveldb.ErrNotFound { - n.logger.Error("localstore get error", "err", err) - } - n.logger.Trace("netstore.chunk-not-in-localstore", "ref", ref.String()) v, err, _ := n.requestGroup.Do(ref.String(), func() (interface{}, error) { diff --git a/storage/request.go b/storage/request.go index 47bb66cb19..44218a2c58 100644 --- a/storage/request.go +++ b/storage/request.go @@ -29,7 +29,9 @@ import ( // These could have also been added as part of the interface of NetStore.Get, but a request struct seemed // like a better option type Request struct { - Addr Address // chunk address + Addr Address // chunk address + Ruid uint // Ruid of the originating RetriveRequest + Price uint // the price offered by Origin Origin enode.ID // who is sending us that request? we compare Origin to the suggested peer from RequestFromPeers PeersToSkip sync.Map // peers not to request chunk from } diff --git a/swap/config.go b/swap/config.go index ab7aae6e09..050c75a55e 100644 --- a/swap/config.go +++ b/swap/config.go @@ -25,7 +25,7 @@ import ( const ( // Thresholds which trigger payment or disconnection. The unit is in honey (internal accounting unit) // DefaultPaymentThreshold is set to be equivalent to requesting and serving 10mb of data (2441 chunks (4096 bytes) = 10 mb, 10^7 bytes = 10 mb) - DefaultPaymentThreshold = 2441*RetrieveRequestPrice + (10^7)*ChunkDeliveryPrice // 4096 * 2441 = 10 mb, + DefaultPaymentThreshold = uint(2441*RetrieveRequestPrice + (10^7)*ChunkDeliveryPrice) // 4096 * 2441 = 10 mb, DefaultDisconnectThreshold = 20 * DefaultPaymentThreshold // DefaultInitialDepositAmount is the default amount to send to the contract when initially deploying // NOTE: deliberate value for now; needs experimentation diff --git a/swap/protocol_test.go b/swap/protocol_test.go index 596868ee7c..58c4f7c1b4 100644 --- a/swap/protocol_test.go +++ b/swap/protocol_test.go @@ -352,7 +352,7 @@ func TestTriggerPaymentThreshold(t *testing.T) { // set the balance to manually be at PaymentThreshold overDraft := 42 - expectedAmount := uint64(overDraft) + DefaultPaymentThreshold + expectedAmount := uint(overDraft) + DefaultPaymentThreshold creditor.setBalance(-int64(DefaultPaymentThreshold)) // we expect a cheque at the end of the test, but not yet @@ -376,11 +376,11 @@ func TestTriggerPaymentThreshold(t *testing.T) { t.Fatal("Expected pending cheque") } - if pending.CumulativePayout != expectedAmount { + if pending.CumulativePayout != uint64(expectedAmount) { t.Fatalf("Expected cheque cumulative payout to be %d, but is %d", expectedAmount, pending.CumulativePayout) } - if pending.Honey != expectedAmount { + if pending.Honey != uint64(expectedAmount) { t.Fatalf("Expected cheque honey to be %d, but is %d", expectedAmount, pending.Honey) } diff --git a/swap/simulations_test.go b/swap/simulations_test.go index aefc479730..fe8232c469 100644 --- a/swap/simulations_test.go +++ b/swap/simulations_test.go @@ -275,7 +275,7 @@ func TestPingPongChequeSimulation(t *testing.T) { counter := cter.(metrics.Counter) counter.Clear() var lastCount int64 - expectedPayout1, expectedPayout2 := DefaultPaymentThreshold+1, DefaultPaymentThreshold+1 + expectedPayout1, expectedPayout2 := uint64(DefaultPaymentThreshold)+1, uint64(DefaultPaymentThreshold)+1 _, err = sim.AddNodesAndConnectFull(nodeCount) if err != nil { @@ -334,7 +334,7 @@ func TestPingPongChequeSimulation(t *testing.T) { t.Fatal(err) } lastCount += 1 - expectedPayout1 += DefaultPaymentThreshold + 1 + expectedPayout1 += uint64(DefaultPaymentThreshold) + 1 } else { if err := p1Peer.Send(context.Background(), &testMsgBigPrice{}); err != nil { t.Fatal(err) @@ -343,7 +343,7 @@ func TestPingPongChequeSimulation(t *testing.T) { t.Fatal(err) } lastCount += 1 - expectedPayout2 += DefaultPaymentThreshold + 1 + expectedPayout2 += uint64(DefaultPaymentThreshold) + 1 } } @@ -356,7 +356,7 @@ func TestPingPongChequeSimulation(t *testing.T) { t.Fatal(err) } - expected := uint64(maxCheques) / 2 * (DefaultPaymentThreshold + 1) + expected := uint64(maxCheques) / 2 * (uint64(DefaultPaymentThreshold) + 1) if ch1.CumulativePayout != expected { t.Fatalf("expected cumulative payout to be %d, but is %d", expected, ch1.CumulativePayout) } @@ -404,7 +404,7 @@ func TestMultiChequeSimulation(t *testing.T) { counter := cter.(metrics.Counter) counter.Clear() var lastCount int64 - expectedPayout := DefaultPaymentThreshold + 1 + expectedPayout := uint64(DefaultPaymentThreshold) + 1 _, err = sim.AddNodesAndConnectFull(nodeCount) if err != nil { @@ -465,7 +465,7 @@ func TestMultiChequeSimulation(t *testing.T) { t.Fatal(err) } lastCount += 1 - expectedPayout += DefaultPaymentThreshold + 1 + expectedPayout += uint64(DefaultPaymentThreshold) + 1 } // check balances: @@ -496,9 +496,9 @@ func TestMultiChequeSimulation(t *testing.T) { } // check also the actual expected amount - expectedPayout = uint64(maxCheques) * (DefaultPaymentThreshold + 1) + expectedPayout = uint64(uint(maxCheques) * (DefaultPaymentThreshold + 1)) - if cheque2.CumulativePayout != expectedPayout { + if cheque2.CumulativePayout != uint64(expectedPayout) { t.Fatalf("Expected %d in cumulative payout, got %d", expectedPayout, cheque1.CumulativePayout) } diff --git a/swap/swap_test.go b/swap/swap_test.go index ba0b4ba28d..6a71ba6b49 100644 --- a/swap/swap_test.go +++ b/swap/swap_test.go @@ -595,7 +595,7 @@ func TestPaymentThreshold(t *testing.T) { var cheque *Cheque _ = swap.store.Get(pendingChequeKey(testPeer.Peer.ID()), &cheque) - if cheque.CumulativePayout != DefaultPaymentThreshold { + if cheque.CumulativePayout != uint64(DefaultPaymentThreshold) { t.Fatal() } } diff --git a/swarm.go b/swarm.go index c6b7f2a1f4..773ba37f9c 100644 --- a/swarm.go +++ b/swarm.go @@ -70,6 +70,7 @@ var ( startCounter = metrics.NewRegisteredCounter("stack,start", nil) stopCounter = metrics.NewRegisteredCounter("stack,stop", nil) uptimeGauge = metrics.NewRegisteredGauge("stack.uptime", nil) + margin = uint(500) // TODO: make this configurable ) // Swarm abstracts the complete Swarm stack @@ -241,7 +242,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e nodeID := config.Enode.ID() self.netStore = storage.NewNetStore(lstore, bzzconfig.OverlayAddr, nodeID) - self.retrieval = retrieval.New(to, self.netStore, bzzconfig.OverlayAddr, self.swap) // nodeID.Bytes()) + self.retrieval = retrieval.New(to, self.netStore, bzzconfig.OverlayAddr, self.swap, margin) // nodeID.Bytes()) self.netStore.RemoteGet = self.retrieval.RequestFromPeers feedsHandler.SetStore(self.netStore) @@ -317,6 +318,31 @@ func parseResolverAPIAddress(s string) (tld, endpoint string, addr common.Addres return } +// parseRnsAPIAddress parses string according to format +// [contract-addr@]url and returns RNSClientConfig structure +// with endpoint and contract address +func parseRnsAPIAddress(s string) (endpoint string, addr common.Address) { + isAllLetterString := func(s string) bool { + for _, r := range s { + if !unicode.IsLetter(r) { + return false + } + } + return true + } + endpoint = s + if i := strings.Index(endpoint, ":"); i > 0 { + if isAllLetterString(endpoint[:i]) && len(endpoint) > i+2 && endpoint[i+1:i+3] != "//" { + endpoint = endpoint[i+1:] + } + } + if i := strings.Index(endpoint, "@"); i > 0 { + addr = common.HexToAddress(endpoint[:i]) + endpoint = endpoint[i+1:] + } + return +} + // ensClient provides functionality for api.ResolveValidator type ensClient struct { *ens.ENS