diff --git a/api/config.go b/api/config.go index d343c1c5be..96ae459763 100644 --- a/api/config.go +++ b/api/config.go @@ -54,7 +54,7 @@ type Config struct { *network.HiveParams Swap *swap.LocalProfile - Pss *pss.PssParams + Pss *pss.Params Contract common.Address EnsRoot common.Address EnsAPIs []string @@ -87,7 +87,7 @@ func NewConfig() (c *Config) { FileStoreParams: storage.NewFileStoreParams(), HiveParams: network.NewHiveParams(), Swap: swap.NewDefaultSwapParams(), - Pss: pss.NewPssParams(), + Pss: pss.NewParams(), ListenAddr: DefaultHTTPListenAddr, Port: DefaultHTTPPort, Path: node.DefaultDataDir(), diff --git a/pss/client/client_test.go b/pss/client/client_test.go index 5a26a14b82..54b87a152e 100644 --- a/pss/client/client_test.go +++ b/pss/client/client_test.go @@ -258,9 +258,9 @@ func newServices() adapters.Services { if err != nil { return nil, err } - psparams := pss.NewPssParams().WithPrivateKey(privkey) + psparams := pss.NewParams().WithPrivateKey(privkey) pskad := kademlia(ctx.Config.ID) - ps, err := pss.NewPss(pskad, psparams) + ps, err := pss.New(pskad, psparams) if err != nil { return nil, err } diff --git a/pss/forwarding_test.go b/pss/forwarding_test.go index a95020d2c9..3c3e38b402 100644 --- a/pss/forwarding_test.go +++ b/pss/forwarding_test.go @@ -324,8 +324,8 @@ func addPeers(kad *network.Kademlia, addresses []pot.Address) { func createPss(t *testing.T, kad *network.Kademlia) *Pss { privKey, err := crypto.GenerateKey() - pssp := NewPssParams().WithPrivateKey(privKey) - ps, err := NewPss(kad, pssp) + pssp := NewParams().WithPrivateKey(privKey) + ps, err := New(kad, pssp) if err != nil { t.Fatal(err.Error()) } diff --git a/pss/keystore.go b/pss/keystore.go index 3e0f7773b8..0e1049e7fb 100644 --- a/pss/keystore.go +++ b/pss/keystore.go @@ -33,18 +33,18 @@ type KeyStore struct { w *whisper.Whisper // key and encryption backend mx sync.RWMutex - pubKeyPool map[string]map[Topic]*pssPeer // mapping of hex public keys to peer address by topic. - symKeyPool map[string]map[Topic]*pssPeer // mapping of symkeyids to peer address by topic. - symKeyDecryptCache []*string // fast lookup of symkeys recently used for decryption; last used is on top of stack - symKeyDecryptCacheCursor int // modular cursor pointing to last used, wraps on symKeyDecryptCache array + pubKeyPool map[string]map[Topic]*peer // mapping of hex public keys to peer address by topic. + symKeyPool map[string]map[Topic]*peer // mapping of symkeyids to peer address by topic. + symKeyDecryptCache []*string // fast lookup of symkeys recently used for decryption; last used is on top of stack + symKeyDecryptCacheCursor int // modular cursor pointing to last used, wraps on symKeyDecryptCache array } func loadKeyStore() *KeyStore { return &KeyStore{ w: whisper.New(&whisper.DefaultConfig), - pubKeyPool: make(map[string]map[Topic]*pssPeer), - symKeyPool: make(map[string]map[Topic]*pssPeer), + pubKeyPool: make(map[string]map[Topic]*peer), + symKeyPool: make(map[string]map[Topic]*peer), symKeyDecryptCache: make([]*string, defaultSymKeyCacheCapacity), } } @@ -65,14 +65,14 @@ func (ks *KeyStore) isPubKeyStored(key string) bool { return ok } -func (ks *KeyStore) getPeerSym(symkeyid string, topic Topic) (*pssPeer, bool) { +func (ks *KeyStore) getPeerSym(symkeyid string, topic Topic) (*peer, bool) { ks.mx.RLock() defer ks.mx.RUnlock() psp, ok := ks.symKeyPool[symkeyid][topic] return psp, ok } -func (ks *KeyStore) getPeerPub(pubkeyid string, topic Topic) (*pssPeer, bool) { +func (ks *KeyStore) getPeerPub(pubkeyid string, topic Topic) (*peer, bool) { ks.mx.RLock() defer ks.mx.RUnlock() psp, ok := ks.pubKeyPool[pubkeyid][topic] @@ -91,12 +91,12 @@ func (ks *KeyStore) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, addre return fmt.Errorf("invalid public key: %v", pubkey) } pubkeyid := common.ToHex(pubkeybytes) - psp := &pssPeer{ + psp := &peer{ address: address, } ks.mx.Lock() if _, ok := ks.pubKeyPool[pubkeyid]; !ok { - ks.pubKeyPool[pubkeyid] = make(map[Topic]*pssPeer) + ks.pubKeyPool[pubkeyid] = make(map[Topic]*peer) } ks.pubKeyPool[pubkeyid][topic] = psp ks.mx.Unlock() @@ -107,13 +107,13 @@ func (ks *KeyStore) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, addre // adds a symmetric key to the pss key pool, and optionally adds the key to the // collection of keys used to attempt symmetric decryption of incoming messages func (ks *KeyStore) addSymmetricKeyToPool(keyid string, topic Topic, address PssAddress, addtocache bool, protected bool) { - psp := &pssPeer{ + psp := &peer{ address: address, protected: protected, } ks.mx.Lock() if _, ok := ks.symKeyPool[keyid]; !ok { - ks.symKeyPool[keyid] = make(map[Topic]*pssPeer) + ks.symKeyPool[keyid] = make(map[Topic]*peer) } ks.symKeyPool[keyid][topic] = psp ks.mx.Unlock() diff --git a/pss/notify/notify_test.go b/pss/notify/notify_test.go index 5bd028e7ad..d8c056aec9 100644 --- a/pss/notify/notify_test.go +++ b/pss/notify/notify_test.go @@ -238,11 +238,11 @@ func newServices(allowRaw bool) adapters.Services { if err != nil { return nil, err } - pssp := pss.NewPssParams().WithPrivateKey(privkey) + pssp := pss.NewParams().WithPrivateKey(privkey) pssp.MsgTTL = time.Second * 30 pssp.AllowRaw = allowRaw pskad := kademlia(ctx.Config.ID) - ps, err := pss.NewPss(pskad, pssp) + ps, err := pss.New(pskad, pssp) if err != nil { return nil, err } diff --git a/pss/prox_test.go b/pss/prox_test.go index 6436035a65..2fb7bcaa05 100644 --- a/pss/prox_test.go +++ b/pss/prox_test.go @@ -421,7 +421,7 @@ func newProxServices(td *testData, allowRaw bool, handlerContextFuncs map[Topic] defer cancel() keys, err := wapi.NewKeyPair(ctxlocal) privkey, err := w.GetPrivateKey(keys) - pssp := NewPssParams().WithPrivateKey(privkey) + pssp := NewParams().WithPrivateKey(privkey) pssp.AllowRaw = allowRaw bzzPrivateKey, err := simulation.BzzPrivateKeyFromConfig(ctx.Config) if err != nil { @@ -430,7 +430,7 @@ func newProxServices(td *testData, allowRaw bool, handlerContextFuncs map[Topic] bzzKey := network.PrivateKeyToBzzKey(bzzPrivateKey) pskad := kademlia(ctx.Config.ID, bzzKey) b.Store(simulation.BucketKeyKademlia, pskad) - ps, err := NewPss(pskad, pssp) + ps, err := New(pskad, pssp) if err != nil { return nil, nil, err } diff --git a/pss/pss.go b/pss/pss.go index bf45444858..0c3027187c 100644 --- a/pss/pss.go +++ b/pss/pss.go @@ -44,7 +44,7 @@ import ( const ( defaultPaddingByteSize = 16 - DefaultMsgTTL = time.Second * 120 + defaultMsgTTL = time.Second * 120 defaultDigestCacheTTL = time.Second * 10 defaultSymKeyCacheCapacity = 512 digestLength = 32 // byte length of digest used for pss cache (currently same as swarm chunk hash) @@ -53,8 +53,8 @@ const ( defaultMaxMsgSize = 1024 * 1024 defaultCleanInterval = time.Second * 60 * 10 defaultOutboxCapacity = 100000 - pssProtocolName = "pss" - pssVersion = 2 + protocolName = "pss" + protocolVersion = 2 hasherCount = 8 ) @@ -62,10 +62,19 @@ var ( addressLength = len(pot.Address{}) ) +var spec = &protocols.Spec{ + Name: protocolName, + Version: protocolVersion, + MaxMsgSize: defaultMaxMsgSize, + Messages: []interface{}{ + PssMsg{}, + }, +} + // cache is used for preventing backwards routing // will also be instrumental in flood guard mechanism // and mailbox implementation -type pssCacheEntry struct { +type cacheEntry struct { expiresAt time.Time } @@ -79,14 +88,14 @@ type senderPeer interface { // per-key peer related information // member `protected` prevents garbage collection of the instance -type pssPeer struct { +type peer struct { lastSeen time.Time address PssAddress protected bool } // Pss configuration parameters -type PssParams struct { +type Params struct { MsgTTL time.Duration CacheTTL time.Duration privateKey *ecdsa.PrivateKey @@ -95,22 +104,21 @@ type PssParams struct { } // Sane defaults for Pss -func NewPssParams() *PssParams { - return &PssParams{ - MsgTTL: DefaultMsgTTL, +func NewParams() *Params { + return &Params{ + MsgTTL: defaultMsgTTL, CacheTTL: defaultDigestCacheTTL, SymKeyCacheCapacity: defaultSymKeyCacheCapacity, } } -func (params *PssParams) WithPrivateKey(privatekey *ecdsa.PrivateKey) *PssParams { +func (params *Params) WithPrivateKey(privatekey *ecdsa.PrivateKey) *Params { params.privateKey = privatekey return params } -// Toplevel pss object, takes care of message sending, receiving, decryption and encryption, message handler dispatchers and message forwarding. -// -// Implements node.Service +// Pss is the top-level struct, which takes care of message sending, receiving, decryption and encryption, message handler dispatchers +// and message forwarding. Implements node.Service type Pss struct { *network.Kademlia // we can get the Kademlia address from this *KeyStore @@ -119,15 +127,16 @@ type Pss struct { auxAPIs []rpc.API // builtins (handshake, test) can add APIs // sending and forwarding - fwdPool map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer - fwdPoolMu sync.RWMutex - fwdCache map[pssDigest]pssCacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg + peers map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer + peersMu sync.RWMutex + + fwdCache map[digest]cacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg fwdCacheMu sync.RWMutex cacheTTL time.Duration // how long to keep messages in fwdCache (not implemented) msgTTL time.Duration paddingByteSize int capstring string - outbox chan *PssMsg + outbox chan *outboxMsg // message handling handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle() @@ -148,13 +157,13 @@ func (p *Pss) String() string { // // In addition to params, it takes a swarm network Kademlia // and a FileStore storage for message cache storage. -func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) { +func New(k *network.Kademlia, params *Params) (*Pss, error) { if params.privateKey == nil { return nil, errors.New("missing private key for pss") } - cap := p2p.Cap{ - Name: pssProtocolName, - Version: pssVersion, + c := p2p.Cap{ + Name: protocolName, + Version: protocolVersion, } ps := &Pss{ Kademlia: k, @@ -163,13 +172,13 @@ func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) { privateKey: params.privateKey, quitC: make(chan struct{}), - fwdPool: make(map[string]*protocols.Peer), - fwdCache: make(map[pssDigest]pssCacheEntry), + peers: make(map[string]*protocols.Peer), + fwdCache: make(map[digest]cacheEntry), cacheTTL: params.CacheTTL, msgTTL: params.MsgTTL, paddingByteSize: defaultPaddingByteSize, - capstring: cap.String(), - outbox: make(chan *PssMsg, defaultOutboxCapacity), + capstring: c.String(), + outbox: make(chan *outboxMsg, defaultOutboxCapacity), handlers: make(map[Topic]map[*handler]bool), topicHandlerCaps: make(map[Topic]*handlerCaps), @@ -214,11 +223,14 @@ func (p *Pss) Start(srv *p2p.Server) error { for { select { case msg := <-p.outbox: - err := p.forward(msg) + metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(len(p.outbox))) + + err := p.forward(msg.msg) if err != nil { log.Error(err.Error()) metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1) } + metrics.GetOrRegisterResettingTimer("pss.handle.outbox", nil).UpdateSince(msg.startedAt) case <-p.quitC: return } @@ -235,32 +247,42 @@ func (p *Pss) Stop() error { return nil } -var pssSpec = &protocols.Spec{ - Name: pssProtocolName, - Version: pssVersion, - MaxMsgSize: defaultMaxMsgSize, - Messages: []interface{}{ - PssMsg{}, - }, -} - func (p *Pss) Protocols() []p2p.Protocol { return []p2p.Protocol{ { - Name: pssSpec.Name, - Version: pssSpec.Version, - Length: pssSpec.Length(), + Name: spec.Name, + Version: spec.Version, + Length: spec.Length(), Run: p.Run, }, } } func (p *Pss) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error { - pp := protocols.NewPeer(peer, rw, pssSpec) - p.fwdPoolMu.Lock() - p.fwdPool[peer.Info().ID] = pp - p.fwdPoolMu.Unlock() - return pp.Run(p.handlePssMsg) + pp := protocols.NewPeer(peer, rw, spec) + p.addPeer(pp) + defer p.removePeer(pp) + return pp.Run(p.handle) +} + +func (p *Pss) getPeer(peer *protocols.Peer) (pp *protocols.Peer, ok bool) { + p.peersMu.RLock() + defer p.peersMu.RUnlock() + pp, ok = p.peers[peer.Peer.Info().ID] + return +} + +func (p *Pss) addPeer(peer *protocols.Peer) { + p.peersMu.Lock() + defer p.peersMu.Unlock() + p.peers[peer.Peer.Info().ID] = peer +} + +func (p *Pss) removePeer(peer *protocols.Peer) { + p.peersMu.Lock() + defer p.peersMu.Unlock() + log.Trace("removing peer", "id", peer.Peer.Info().ID) + delete(p.peers, peer.Peer.Info().ID) } func (p *Pss) APIs() []rpc.API { @@ -373,11 +395,12 @@ func (p *Pss) deregister(topic *Topic, hndlr *handler) { // Check if address partially matches // If yes, it CAN be for us, and we process it // Only passes error to pss protocol handler if payload is not valid pssmsg -func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { - metrics.GetOrRegisterCounter("pss.handlepssmsg", nil).Inc(1) +func (p *Pss) handle(ctx context.Context, msg interface{}) error { + defer metrics.GetOrRegisterResettingTimer("pss.handle", nil).UpdateSince(time.Now()) + pssmsg, ok := msg.(*PssMsg) if !ok { - return fmt.Errorf("invalid message type. Expected *PssMsg, got %T ", msg) + return fmt.Errorf("invalid message type. Expected *PssMsg, got %T", msg) } log.Trace("handler", "self", label(p.Kademlia.BaseAddr()), "topic", label(pssmsg.Payload.Topic[:])) if int64(pssmsg.Expire) < time.Now().Unix() { @@ -433,7 +456,7 @@ func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { // Attempts symmetric and asymmetric decryption with stored keys. // Dispatches message to all handlers matching the message topic func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error { - metrics.GetOrRegisterCounter("pss.process", nil).Inc(1) + defer metrics.GetOrRegisterResettingTimer("pss.process", nil).UpdateSince(time.Now()) var err error var recvmsg *whisper.ReceivedMessage @@ -481,6 +504,8 @@ func (p *Pss) getHandlers(topic Topic) (ret []*handler) { } func (p *Pss) executeHandlers(topic Topic, payload []byte, from PssAddress, raw bool, prox bool, asymmetric bool, keyid string) { + defer metrics.GetOrRegisterResettingTimer("pss.execute-handlers", nil).UpdateSince(time.Now()) + handlers := p.getHandlers(topic) peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{}) for _, h := range handlers { @@ -529,8 +554,13 @@ func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool { ///////////////////////////////////////////////////////////////////// func (p *Pss) enqueue(msg *PssMsg) error { + defer metrics.GetOrRegisterResettingTimer("pss.enqueue", nil).UpdateSince(time.Now()) + + outboxmsg := newOutboxMsg(msg) select { - case p.outbox <- msg: + case p.outbox <- outboxmsg: + metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(len(p.outbox))) + return nil default: } @@ -543,9 +573,12 @@ func (p *Pss) enqueue(msg *PssMsg) error { // // Will fail if raw messages are disallowed func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error { + defer metrics.GetOrRegisterResettingTimer("pss.send.raw", nil).UpdateSince(time.Now()) + if err := validateAddress(address); err != nil { return err } + pssMsgParams := &msgParams{ raw: true, } @@ -553,24 +586,15 @@ func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error { Data: msg, Topic: whisper.TopicType(topic), } + pssMsg := newPssMsg(pssMsgParams) pssMsg.To = address pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix()) pssMsg.Payload = payload + p.addFwdCache(pssMsg) - err := p.enqueue(pssMsg) - if err != nil { - return err - } - // if we have a proxhandler on this topic - // also deliver message to ourselves - if capabilities, ok := p.getTopicHandlerCaps(topic); ok { - if p.isSelfPossibleRecipient(pssMsg, true) && capabilities.prox { - return p.process(pssMsg, true, true) - } - } - return nil + return p.enqueue(pssMsg) } // Send a message using symmetric encryption @@ -659,16 +683,8 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by pssMsg.To = to pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix()) pssMsg.Payload = envelope - err = p.enqueue(pssMsg) - if err != nil { - return err - } - if capabilities, ok := p.getTopicHandlerCaps(topic); ok { - if p.isSelfPossibleRecipient(pssMsg, true) && capabilities.prox { - return p.process(pssMsg, true, true) - } - } - return nil + + return p.enqueue(pssMsg) } // sendFunc is a helper function that tries to send a message and returns true on success. @@ -691,9 +707,11 @@ func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool { } // get the protocol peer from the forwarding peer cache - p.fwdPoolMu.RLock() - pp := p.fwdPool[sp.Info().ID] - p.fwdPoolMu.RUnlock() + pp, ok := p.getPeer(sp.BzzPeer.Peer) + if !ok { + log.Warn("peer no longer in our list, dropping message") + return false + } err := pp.Send(context.TODO(), msg) if err != nil { @@ -798,17 +816,17 @@ func label(b []byte) string { // add a message to the cache func (p *Pss) addFwdCache(msg *PssMsg) error { - metrics.GetOrRegisterCounter("pss.addfwdcache", nil).Inc(1) + defer metrics.GetOrRegisterResettingTimer("pss.addfwdcache", nil).UpdateSince(time.Now()) - var entry pssCacheEntry + var entry cacheEntry var ok bool p.fwdCacheMu.Lock() defer p.fwdCacheMu.Unlock() - digest := p.digest(msg) + digest := p.msgDigest(msg) if entry, ok = p.fwdCache[digest]; !ok { - entry = pssCacheEntry{} + entry = cacheEntry{} } entry.expiresAt = time.Now().Add(p.cacheTTL) p.fwdCache[digest] = entry @@ -820,7 +838,7 @@ func (p *Pss) checkFwdCache(msg *PssMsg) bool { p.fwdCacheMu.Lock() defer p.fwdCacheMu.Unlock() - digest := p.digest(msg) + digest := p.msgDigest(msg) entry, ok := p.fwdCache[digest] if ok { if entry.expiresAt.After(time.Now()) { @@ -834,19 +852,19 @@ func (p *Pss) checkFwdCache(msg *PssMsg) bool { } // Digest of message -func (p *Pss) digest(msg *PssMsg) pssDigest { +func (p *Pss) msgDigest(msg *PssMsg) digest { return p.digestBytes(msg.serialize()) } -func (p *Pss) digestBytes(msg []byte) pssDigest { +func (p *Pss) digestBytes(msg []byte) digest { hasher := p.hashPool.Get().(hash.Hash) defer p.hashPool.Put(hasher) hasher.Reset() hasher.Write(msg) - digest := pssDigest{} + d := digest{} key := hasher.Sum(nil) - copy(digest[:], key[:digestLength]) - return digest + copy(d[:], key[:digestLength]) + return d } func validateAddress(addr PssAddress) error { diff --git a/pss/pss_test.go b/pss/pss_test.go index 0a8595b357..218f6423c1 100644 --- a/pss/pss_test.go +++ b/pss/pss_test.go @@ -169,7 +169,7 @@ func TestCache(t *testing.T) { } ps := newTestPss(privkey, nil, nil) defer ps.Stop() - pp := NewPssParams().WithPrivateKey(privkey) + pp := NewParams().WithPrivateKey(privkey) data := []byte("foo") datatwo := []byte("bar") datathree := []byte("baz") @@ -203,20 +203,20 @@ func TestCache(t *testing.T) { To: to, } - digest := ps.digest(msg) + digestone := ps.msgDigest(msg) if err != nil { t.Fatalf("could not store cache msgone: %v", err) } - digesttwo := ps.digest(msgtwo) + digesttwo := ps.msgDigest(msgtwo) if err != nil { t.Fatalf("could not store cache msgtwo: %v", err) } - digestthree := ps.digest(msgthree) + digestthree := ps.msgDigest(msgthree) if err != nil { t.Fatalf("could not store cache msgthree: %v", err) } - if digest == digesttwo { + if digestone == digesttwo { t.Fatalf("different msgs return same hash: %d", digesttwo) } @@ -268,8 +268,8 @@ func TestAddressMatch(t *testing.T) { t.Fatalf("Could not generate private key: %v", err) } privkey, err := w.GetPrivateKey(keys) - pssp := NewPssParams().WithPrivateKey(privkey) - ps, err := NewPss(kad, pssp) + pssp := NewParams().WithPrivateKey(privkey) + ps, err := New(kad, pssp) if err != nil { t.Fatal(err.Error()) } @@ -306,145 +306,6 @@ func TestAddressMatch(t *testing.T) { } -// test that message is handled by sender if a prox handler exists and sender is in prox of message -func TestProxShortCircuit(t *testing.T) { - - // sender node address - localAddr := network.RandomAddr().Over() - localPotAddr := pot.NewAddressFromBytes(localAddr) - - // set up kademlia - kadParams := network.NewKadParams() - kad := network.NewKademlia(localAddr, kadParams) - peerCount := kad.MinBinSize + 1 - - // set up pss - privKey, err := crypto.GenerateKey() - pssp := NewPssParams().WithPrivateKey(privKey) - ps, err := NewPss(kad, pssp) - if err != nil { - t.Fatal(err.Error()) - } - - // create kademlia peers, so we have peers both inside and outside minproxlimit - var peers []*network.Peer - proxMessageAddress := pot.RandomAddressAt(localPotAddr, peerCount).Bytes() - distantMessageAddress := pot.RandomAddressAt(localPotAddr, 0).Bytes() - - for i := 0; i < peerCount; i++ { - rw := &p2p.MsgPipeRW{} - ptpPeer := p2p.NewPeer(enode.ID{}, "wanna be with me? [ ] yes [ ] no", []p2p.Cap{}) - protoPeer := protocols.NewPeer(ptpPeer, rw, &protocols.Spec{}) - peerAddr := pot.RandomAddressAt(localPotAddr, i) - bzzPeer := &network.BzzPeer{ - Peer: protoPeer, - BzzAddr: &network.BzzAddr{ - OAddr: peerAddr.Bytes(), - UAddr: []byte(fmt.Sprintf("%x", peerAddr[:])), - }, - } - peer := network.NewPeer(bzzPeer, kad) - kad.On(peer) - peers = append(peers, peer) - } - - // register it marking prox capability - delivered := make(chan struct{}) - rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { - log.Trace("in allowraw handler") - delivered <- struct{}{} - return nil - } - topic := BytesToTopic([]byte{0x2a}) - hndlrProxDereg := ps.Register(&topic, &handler{ - f: rawHandlerFunc, - caps: &handlerCaps{ - raw: true, - prox: true, - }, - }) - defer hndlrProxDereg() - - // send message too far away for sender to be in prox - // reception of this message should time out - errC := make(chan error) - go func() { - err := ps.SendRaw(distantMessageAddress, topic, []byte("foo")) - if err != nil { - errC <- err - } - }() - - ctx, cancel := context.WithTimeout(context.TODO(), time.Second) - defer cancel() - select { - case <-delivered: - t.Fatal("raw distant message delivered") - case err := <-errC: - t.Fatal(err) - case <-ctx.Done(): - } - - // send message that should be within sender prox - // this message should be delivered - go func() { - err := ps.SendRaw(proxMessageAddress, topic, []byte("bar")) - if err != nil { - errC <- err - } - }() - - ctx, cancel = context.WithTimeout(context.TODO(), time.Second) - defer cancel() - select { - case <-delivered: - case err := <-errC: - t.Fatal(err) - case <-ctx.Done(): - t.Fatal("raw timeout") - } - - // try the same prox message with sym and asym send - proxAddrPss := PssAddress(proxMessageAddress) - symKeyId, err := ps.GenerateSymmetricKey(topic, proxAddrPss, true) - go func() { - err := ps.SendSym(symKeyId, topic, []byte("baz")) - if err != nil { - errC <- err - } - }() - ctx, cancel = context.WithTimeout(context.TODO(), time.Second) - defer cancel() - select { - case <-delivered: - case err := <-errC: - t.Fatal(err) - case <-ctx.Done(): - t.Fatal("sym timeout") - } - - err = ps.SetPeerPublicKey(&privKey.PublicKey, topic, proxAddrPss) - if err != nil { - t.Fatal(err) - } - pubKeyId := hexutil.Encode(crypto.FromECDSAPub(&privKey.PublicKey)) - go func() { - err := ps.SendAsym(pubKeyId, topic, []byte("xyzzy")) - if err != nil { - errC <- err - } - }() - ctx, cancel = context.WithTimeout(context.TODO(), time.Second) - defer cancel() - select { - case <-delivered: - case err := <-errC: - t.Fatal(err) - case <-ctx.Done(): - t.Fatal("asym timeout") - } -} - // verify that node can be set as recipient regardless of explicit message address match if minimum one handler of a topic is explicitly set to allow it // note that in these tests we use the raw capability on handlers for convenience func TestAddressMatchProx(t *testing.T) { @@ -461,8 +322,8 @@ func TestAddressMatchProx(t *testing.T) { // set up pss privKey, err := crypto.GenerateKey() - pssp := NewPssParams().WithPrivateKey(privKey) - ps, err := NewPss(kad, pssp) + pssp := NewParams().WithPrivateKey(privKey) + ps, err := New(kad, pssp) if err != nil { t.Fatal(err.Error()) } @@ -571,7 +432,7 @@ func TestAddressMatchProx(t *testing.T) { } log.Trace("withprox addrs", "local", localAddr, "remote", remoteAddr) - ps.handlePssMsg(context.TODO(), pssMsg) + ps.handle(context.TODO(), pssMsg) if (!expects[i] && prevReceive != receives) || (expects[i] && prevReceive == receives) { t.Fatalf("expected distance %d recipient %v when prox is set for handler", distance, expects[i]) } @@ -602,7 +463,7 @@ func TestAddressMatchProx(t *testing.T) { } log.Trace("withprox addrs", "local", localAddr, "remote", remoteAddr) - ps.handlePssMsg(context.TODO(), pssMsg) + ps.handle(context.TODO(), pssMsg) if (!expects[i] && prevReceive != receives) || (expects[i] && prevReceive == receives) { t.Fatalf("expected distance %d recipient %v when prox is set for handler", distance, expects[i]) } @@ -626,7 +487,7 @@ func TestAddressMatchProx(t *testing.T) { } log.Trace("noprox addrs", "local", localAddr, "remote", remoteAddr) - ps.handlePssMsg(context.TODO(), pssMsg) + ps.handle(context.TODO(), pssMsg) if receives != 0 { t.Fatalf("expected distance %d to not be recipient when prox is not set for handler", distance) } @@ -646,7 +507,7 @@ func TestMessageProcessing(t *testing.T) { addr := make([]byte, 32) addr[0] = 0x01 - ps := newTestPss(privkey, network.NewKademlia(addr, network.NewKadParams()), NewPssParams()) + ps := newTestPss(privkey, network.NewKademlia(addr, network.NewKadParams()), NewParams()) defer ps.Stop() // message should pass @@ -657,11 +518,11 @@ func TestMessageProcessing(t *testing.T) { Topic: [4]byte{}, Data: []byte{0x66, 0x6f, 0x6f}, } - if err := ps.handlePssMsg(context.TODO(), msg); err != nil { + if err := ps.handle(context.TODO(), msg); err != nil { t.Fatal(err.Error()) } tmr := time.NewTimer(time.Millisecond * 100) - var outmsg *PssMsg + var outmsg *outboxMsg select { case outmsg = <-ps.outbox: case <-tmr.C: @@ -674,7 +535,7 @@ func TestMessageProcessing(t *testing.T) { // message should pass and queue due to partial length msg.To = addr[0:1] msg.Payload.Data = []byte{0x78, 0x79, 0x80, 0x80, 0x79} - if err := ps.handlePssMsg(context.TODO(), msg); err != nil { + if err := ps.handle(context.TODO(), msg); err != nil { t.Fatal(err.Error()) } tmr.Reset(time.Millisecond * 100) @@ -697,7 +558,7 @@ func TestMessageProcessing(t *testing.T) { // full address mismatch should put message in queue msg.To[0] = 0xff - if err := ps.handlePssMsg(context.TODO(), msg); err != nil { + if err := ps.handle(context.TODO(), msg); err != nil { t.Fatal(err.Error()) } tmr.Reset(time.Millisecond * 10) @@ -720,7 +581,7 @@ func TestMessageProcessing(t *testing.T) { // expired message should be dropped msg.Expire = uint32(time.Now().Add(-time.Second).Unix()) - if err := ps.handlePssMsg(context.TODO(), msg); err != nil { + if err := ps.handle(context.TODO(), msg); err != nil { t.Fatal(err.Error()) } tmr.Reset(time.Millisecond * 10) @@ -740,17 +601,19 @@ func TestMessageProcessing(t *testing.T) { }{ pssMsg: &PssMsg{}, } - if err := ps.handlePssMsg(context.TODO(), fckedupmsg); err == nil { + if err := ps.handle(context.TODO(), fckedupmsg); err == nil { t.Fatalf("expected error from processMsg but error nil") } // outbox full should return error msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix()) + + omsg := newOutboxMsg(msg) for i := 0; i < defaultOutboxCapacity; i++ { - ps.outbox <- msg + ps.outbox <- omsg } msg.Payload.Data = []byte{0x62, 0x61, 0x72} - err = ps.handlePssMsg(context.TODO(), msg) + err = ps.handle(context.TODO(), msg) if err == nil { t.Fatal("expected error when mailbox full, but was nil") } @@ -899,7 +762,7 @@ func TestPeerCapabilityMismatch(t *testing.T) { // one peer has a mismatching version of pss wrongpssaddr := network.RandomAddr() wrongpsscap := p2p.Cap{ - Name: pssProtocolName, + Name: protocolName, Version: 0, } nid := enode.ID{0x01} @@ -979,7 +842,7 @@ func TestRawAllow(t *testing.T) { pssMsg.Payload = &whisper.Envelope{ Topic: whisper.TopicType(topic), } - ps.handlePssMsg(context.TODO(), pssMsg) + ps.handle(context.TODO(), pssMsg) if receives > 0 { t.Fatalf("Expected handler not to be executed with raw cap off") } @@ -995,7 +858,7 @@ func TestRawAllow(t *testing.T) { // should work now pssMsg.Payload.Data = []byte("Raw Deal") - ps.handlePssMsg(context.TODO(), pssMsg) + ps.handle(context.TODO(), pssMsg) if receives == 0 { t.Fatalf("Expected handler to be executed with raw cap on") } @@ -1006,7 +869,7 @@ func TestRawAllow(t *testing.T) { // check that raw messages fail again pssMsg.Payload.Data = []byte("Raw Trump") - ps.handlePssMsg(context.TODO(), pssMsg) + ps.handle(context.TODO(), pssMsg) if receives != prevReceives { t.Fatalf("Expected handler not to be executed when raw handler is retracted") } @@ -1781,7 +1644,7 @@ func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) { keys, err := wapi.NewKeyPair(ctx) privkey, err := w.GetPrivateKey(keys) if cachesize > 0 { - ps = newTestPss(privkey, nil, &PssParams{SymKeyCacheCapacity: int(cachesize)}) + ps = newTestPss(privkey, nil, &Params{SymKeyCacheCapacity: int(cachesize)}) } else { ps = newTestPss(privkey, nil, nil) } @@ -1865,7 +1728,7 @@ func benchmarkSymkeyBruteforceSameaddr(b *testing.B) { keys, err := wapi.NewKeyPair(ctx) privkey, err := w.GetPrivateKey(keys) if cachesize > 0 { - ps = newTestPss(privkey, nil, &PssParams{SymKeyCacheCapacity: int(cachesize)}) + ps = newTestPss(privkey, nil, &Params{SymKeyCacheCapacity: int(cachesize)}) } else { ps = newTestPss(privkey, nil, nil) } @@ -1930,7 +1793,7 @@ func setupNetwork(numnodes int, allowRaw bool) (clients []*rpc.Client, err error }) for i := 0; i < numnodes; i++ { nodeconf := adapters.RandomNodeConfig() - nodeconf.Services = []string{"bzz", pssProtocolName} + nodeconf.Services = []string{"bzz", protocolName} nodes[i], err = net.NewNodeWithConfig(nodeconf) if err != nil { return nil, fmt.Errorf("error creating node 1: %v", err) @@ -1977,7 +1840,7 @@ func newServices(allowRaw bool) adapters.Services { return kademlias[id] } return adapters.Services{ - pssProtocolName: func(ctx *adapters.ServiceContext) (node.Service, error) { + protocolName: func(ctx *adapters.ServiceContext) (node.Service, error) { // execadapter does not exec init() initTest() @@ -1985,10 +1848,10 @@ func newServices(allowRaw bool) adapters.Services { defer cancel() keys, err := wapi.NewKeyPair(ctxlocal) privkey, err := w.GetPrivateKey(keys) - pssp := NewPssParams().WithPrivateKey(privkey) + pssp := NewParams().WithPrivateKey(privkey) pssp.AllowRaw = allowRaw pskad := kademlia(ctx.Config.ID) - ps, err := NewPss(pskad, pssp) + ps, err := New(pskad, pssp) if err != nil { return nil, err } @@ -2042,7 +1905,7 @@ func newServices(allowRaw bool) adapters.Services { } } -func newTestPss(privkey *ecdsa.PrivateKey, kad *network.Kademlia, ppextra *PssParams) *Pss { +func newTestPss(privkey *ecdsa.PrivateKey, kad *network.Kademlia, ppextra *Params) *Pss { nid := enode.PubkeyToIDV4(&privkey.PublicKey) // set up routing if kademlia is not passed to us if kad == nil { @@ -2052,11 +1915,11 @@ func newTestPss(privkey *ecdsa.PrivateKey, kad *network.Kademlia, ppextra *PssPa } // create pss - pp := NewPssParams().WithPrivateKey(privkey) + pp := NewParams().WithPrivateKey(privkey) if ppextra != nil { pp.SymKeyCacheCapacity = ppextra.SymKeyCacheCapacity } - ps, err := NewPss(kad, pp) + ps, err := New(kad, pp) if err != nil { return nil } diff --git a/pss/types.go b/pss/types.go index ad68ade260..759d6329bd 100644 --- a/pss/types.go +++ b/pss/types.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -87,7 +88,7 @@ func (a *PssAddress) UnmarshalJSON(input []byte) error { } // holds the digest of a message used for caching -type pssDigest [digestLength]byte +type digest [digestLength]byte // conceals bitwise operations on the control flags byte type msgParams struct { @@ -117,6 +118,18 @@ func (m *msgParams) Bytes() (paramBytes []byte) { return paramBytes } +type outboxMsg struct { + msg *PssMsg + startedAt time.Time +} + +func newOutboxMsg(msg *PssMsg) *outboxMsg { + return &outboxMsg{ + msg: msg, + startedAt: time.Now(), + } +} + // PssMsg encapsulates messages transported over pss. type PssMsg struct { To []byte diff --git a/swarm.go b/swarm.go index c8052d3bed..81809e74f8 100644 --- a/swarm.go +++ b/swarm.go @@ -219,7 +219,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e self.bzz = network.NewBzz(bzzconfig, to, self.stateStore, self.streamer.GetSpec(), self.streamer.Run) // Pss = postal service over swarm (devp2p over bzz) - self.ps, err = pss.NewPss(to, config.Pss) + self.ps, err = pss.New(to, config.Pss) if err != nil { return nil, err }