Skip to content

Commit 6f5ed03

Browse files
authored
kademlia persistence simulation (ethereum#347)
* swarm/network: Kademlia updates to full and SuggestPeer methods * swarm/network: add more tests and fixes * fixed a bug in state store that did not close the file handle on shutdown * added test for kademlia state storage across sessions
1 parent 6f8f818 commit 6f5ed03

File tree

5 files changed

+275
-17
lines changed

5 files changed

+275
-17
lines changed

p2p/simulations/adapters/inproc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ func (self *SimNode) Start(snapshots map[string][]byte) error {
274274
for _, name := range self.config.Services {
275275
if err := self.node.Register(newService(name)); err != nil {
276276
regErr = err
277-
return
277+
break
278278
}
279279
}
280280
})

p2p/simulations/network.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,10 @@ func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error)
8787
if conf.Reachable == nil {
8888
conf.Reachable = func(otherID discover.NodeID) bool {
8989
_, err := self.InitConn(conf.ID, otherID)
90-
return err == nil
90+
if err != nil && bytes.Compare(conf.ID.Bytes(), otherID.Bytes()) < 0 {
91+
return false
92+
}
93+
return true
9194
}
9295
}
9396

@@ -448,25 +451,31 @@ func (self *Network) getConn(oneID, otherID discover.NodeID) *Conn {
448451
// this is cheating as the simulation is used as an oracle and know about
449452
// remote peers attempt to connect to a node which will then not initiate the connection
450453
func (self *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) {
454+
log.Debug(fmt.Sprintf("InitConn(oneID: %v, otherID: %v)", oneID, otherID))
451455
self.lock.Lock()
452456
defer self.lock.Unlock()
453457
if oneID == otherID {
458+
log.Trace(fmt.Sprintf("refusing to connect to self %v", oneID))
454459
return nil, fmt.Errorf("refusing to connect to self %v", oneID)
455460
}
456461
conn, err := self.getOrCreateConn(oneID, otherID)
457462
if err != nil {
458463
return nil, err
459464
}
460465
if time.Since(conn.initiated) < dialBanTimeout {
466+
log.Trace(fmt.Sprintf("connection between %v and %v recently attempted", oneID, otherID))
461467
return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
462468
}
463469
if conn.Up {
470+
log.Trace(fmt.Sprintf("%v and %v already connected", oneID, otherID))
464471
return nil, fmt.Errorf("%v and %v already connected", oneID, otherID)
465472
}
466473
err = conn.nodesUp()
467474
if err != nil {
475+
log.Trace(fmt.Sprintf("nodes not up: %v", err))
468476
return nil, fmt.Errorf("nodes not up: %v", err)
469477
}
478+
log.Debug("InitConn - connection initiated")
470479
conn.initiated = time.Now()
471480
return conn, nil
472481
}

swarm/network/hive.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package network
1818

1919
import (
2020
"fmt"
21-
"math/rand"
2221
"sync"
2322
"time"
2423

@@ -71,7 +70,7 @@ func NewHiveParams() *HiveParams {
7170
Discovery: true,
7271
PeersBroadcastSetSize: 3,
7372
MaxPeersPerRequest: 5,
74-
KeepAliveInterval: 1000 * time.Millisecond,
73+
KeepAliveInterval: 500 * time.Millisecond,
7574
}
7675
}
7776

@@ -102,10 +101,12 @@ func NewHive(params *HiveParams, overlay Overlay, store state.Store) *Hive {
102101
// server is used to connect to a peer based on its NodeID or enode URL
103102
// these are called on the p2p.Server which runs on the node
104103
func (h *Hive) Start(server *p2p.Server) error {
105-
log.Trace(fmt.Sprintf("%08x hive starting", h.BaseAddr()[:4]))
104+
log.Info(fmt.Sprintf("%08x hive starting", h.BaseAddr()[:4]))
106105
// if state store is specified, load peers to prepopulate the overlay address book
107106
if h.Store != nil {
107+
log.Info("detected an existing store. trying to load peers")
108108
if err := h.loadPeers(); err != nil {
109+
log.Error(fmt.Sprintf("%08x hive encoutered an error trying to load peers", h.BaseAddr()[:4]))
109110
return err
110111
}
111112
}
@@ -123,7 +124,12 @@ func (h *Hive) Stop() error {
123124
log.Info(fmt.Sprintf("%08x hive stopping, saving peers", h.BaseAddr()[:4]))
124125
h.ticker.Stop()
125126
if h.Store != nil {
126-
return h.savePeers()
127+
if err := h.savePeers(); err != nil {
128+
return fmt.Errorf("could not save peers to persistence store: %v", err)
129+
}
130+
if err := h.Store.Close(); err != nil {
131+
return fmt.Errorf("could not close file handle to persistence store: %v", err)
132+
}
127133
}
128134
log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4]))
129135
h.EachConn(nil, 255, func(p OverlayConn, _ int, _ bool) bool {
@@ -139,8 +145,9 @@ func (h *Hive) Stop() error {
139145
// at each iteration, ask the overlay driver to suggest the most preferred peer to connect to
140146
// as well as advertises saturation depth if needed
141147
func (h *Hive) connect() {
142-
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
143148
for range h.ticker.C {
149+
log.Trace(fmt.Sprintf("%08x hive connect()", h.BaseAddr()[:4]))
150+
144151
addr, depth, changed := h.SuggestPeer()
145152
if h.Discovery && changed {
146153
NotifyDepth(uint8(depth), h)
@@ -203,14 +210,16 @@ func ToAddr(pa OverlayPeer) *BzzAddr {
203210
// loadPeers, savePeer implement persistence callback/
204211
func (h *Hive) loadPeers() error {
205212
var as []*BzzAddr
206-
207213
err := h.Store.Get("peers", &as)
208214
if err != nil {
209215
if err == state.ErrNotFound {
216+
log.Info(fmt.Sprintf("hive %08x: no persisted peers found", h.BaseAddr()[:4]))
210217
return nil
211218
}
212219
return err
213220
}
221+
log.Info(fmt.Sprintf("hive %08x: peers loaded", h.BaseAddr()[:4]))
222+
214223
return h.Register(toOverlayAddrs(as...))
215224
}
216225

swarm/network/kademlia.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,10 +263,14 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) {
263263
if po >= depth {
264264
return false
265265
}
266-
return f(func(val pot.Val, _ int) bool {
266+
ok := f(func(val pot.Val, _ int) bool {
267267
a = k.callable(val)
268268
return a == nil
269269
})
270+
if !ok {
271+
return false
272+
}
273+
return true
270274
})
271275
// found a candidate
272276
if a != nil {

0 commit comments

Comments
 (0)