network/retrieve: add bzz-retrieve protocol #1589
| ) | ||
|
|
||
| // NodeConfigAtPo brute forces a node config to create a node that has an overlay address at the provided po in relation to the given baseaddr | ||
| func NodeConfigAtPo(t *testing.T, baseaddr []byte, po int) *adapters.NodeConfig { |
I'd prefer if we don't add test code in the main binary... maybe rename this to util_test.go or something ending with _test.go ?
we can't. either it sits in this package or it gets added to the binary. this functionality is super important in our test suite and should be accessible from any package. i don't have any solution for this
I think that if this package is imported only in test files, it will not be included in the binary.
I still don't understand why we need a testutil package, and why can't that live next to the test code. It doesn't appear to be used by anything else, but the retrieval tests.
this is going to be used in the new stream package for certain tests. that's why i put it here so i dont need to rebase or clip this out of the directory just in a few days. that's all
holisticode left a comment
I left a couple of comments; consider my review as non-authoritative, I mainly wanted to update myself on what you were working on
| // findPeer finds a peer we need to ask for a specific chunk from according to our kademlia | ||
| func (r *Retrieval) findPeer(ctx context.Context, req *storage.Request) (retPeer *network.Peer, err error) { | ||
| log.Trace("retrieval.findPeer", "req.Addr", req.Addr) | ||
| osp, _ := ctx.Value("remote.fetch").(opentracing.Span) |
personal opinion: even if the conversion is 100% fool proof (now), I don't like to ignore errors
i wouldn't like to have this function fail on a debugging feature, nor would i like to see anything in the logs as such. leaving it as is.
I guess what Fabio meant is to handle the error, not necessarily to fail the function on a debugging feature. We can log the error if it happens. Then again we know that this is opentracing.Span so I think it is not necessary.
i understand. and i dont want to log anything because it is a very non-critical error and i have a feeling that if we log it - we'll see a lot of log spamming in production
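The point under discussion is the two-value ("comma ok") type assertion on a context value: it never panics, and discarding the boolean simply yields a nil value when the span is absent. A minimal sketch of that behaviour follows; the `span` interface, `noopSpan`, and `spanKey` are hypothetical stand-ins (the PR uses `opentracing.Span` and the string key "remote.fetch"), so this illustrates the language feature rather than the PR's code.

```go
package main

import (
	"context"
	"fmt"
)

// span is a hypothetical stand-in for a tracing span type such as opentracing.Span.
type span interface {
	LogEvent(msg string)
}

// noopSpan is a hypothetical span implementation that does nothing.
type noopSpan struct{}

func (noopSpan) LogEvent(string) {}

// spanKey is a hypothetical context key used only in this sketch.
type spanKey struct{}

// spanFromContext returns the span stored in ctx. If the value is missing or
// has another type, it returns nil and false; the two-value assertion never panics.
func spanFromContext(ctx context.Context) (span, bool) {
	s, ok := ctx.Value(spanKey{}).(span)
	return s, ok
}

// run reports whether a span was found in a context that carries one, and
// whether it was correctly reported missing in an empty context.
func run() (found, missing bool) {
	withSpan := context.WithValue(context.Background(), spanKey{}, noopSpan{})
	_, found = spanFromContext(withSpan)
	_, ok := spanFromContext(context.Background())
	return found, !ok
}

func main() {
	found, missing := run()
	fmt.Println("found in populated context:", found)
	fmt.Println("missing in empty context:", missing)
}
```

Callers that ignore the boolean must still guard against a nil span before using it; whether to log the miss is the judgment call debated above.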
| } | ||
|
|
||
| // skip peers that we have already tried | ||
| if req.SkipPeer(id.String()) { |
bytes signature?
i rather not touch anything in network/stream right now (this change vets it)
| return true | ||
| } | ||
|
|
||
| if myPo < depth { // chunk is NOT within the neighbourhood |
Do we actually have unit tests for all this logic?
no and this is not something i'm going to attend to. this PR's purpose is not to unit test findPeer (there's a specific ticket for that) but to pull out the retrieve protocol.
That being said, I think that the delivery forwarding test provides better testing than we ever had on the functionality of forwarding. plus it somewhat tests generally the logic in this function. In any case, utilizing that test as a starting point we could quite easily test all of the logic in this function.
| r.mtx.Lock() | ||
| defer r.mtx.Unlock() | ||
|
|
||
| return r.peers[id] |
What would happen if we are concurrently handling 2 messages from the same peer, one of them resulting in an error, which would return from the Run method below, and call removePeer, while the other is still processing a message and then calling getPeer and send on a nil pointer?
I think it is a bit dangerous to assume that a peer is always in this slice, this is why I have added checks in the PSS PR.
I guess you can leave it as is, but keep it at the back of your head in case we ever see nil pointer dereferences on Send.
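The concern above can be made concrete with a small sketch of a mutex-guarded peer map whose lookup reports whether the peer is still registered, so a caller can skip the send instead of dereferencing nil. The `registry` and `peer` types and their methods are hypothetical and only mirror the shape of the code under review.

```go
package main

import (
	"fmt"
	"sync"
)

// peer is a hypothetical protocol peer.
type peer struct{ id string }

// send pretends to deliver a message and returns a description of it.
func (p *peer) send(msg string) string { return p.id + " <- " + msg }

// registry is a hypothetical mutex-guarded collection of connected peers.
type registry struct {
	mtx   sync.Mutex
	peers map[string]*peer
}

func newRegistry() *registry {
	return &registry{peers: make(map[string]*peer)}
}

func (r *registry) addPeer(p *peer) {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	r.peers[p.id] = p
}

func (r *registry) removePeer(id string) {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	delete(r.peers, id)
}

// getPeer returns the peer and whether it is still registered, so callers
// cannot mistake a removed peer for a usable one.
func (r *registry) getPeer(id string) (*peer, bool) {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	p, ok := r.peers[id]
	return p, ok
}

func main() {
	r := newRegistry()
	r.addPeer(&peer{id: "a"})
	r.removePeer("a") // e.g. another message handler hit an error and dropped the peer

	if p, ok := r.getPeer("a"); ok {
		fmt.Println(p.send("retrieve request"))
	} else {
		fmt.Println("peer a is gone, skipping send")
	}
}
```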
| return nil, err | ||
| } | ||
|
|
||
| protoPeer := r.getPeer(sp.ID()) |
As @nonsense said in the comment above, protoPeer can be nil. Maybe just explicitly check for that before using it?
ok i add a nil check. now the next question is how the nil should be handled. ideally we should not fail the request but maybe goto the line where findPeer is called
I think you should fail the request if getPeer is nil, because the connection is already dropped due to another request/message problem.
if a node requests a chunk from us, why should we fail because another node disconnected?
Oh, that's right. Then I guess it is fine as you have written it.
this is super ugly.
1. Perform the findPeer call under the peers mutex locked and just use r.peers[..].
2. Just define findPeer as an iterator so retrieving the protocol peer is part of the iterator.
findPeer(ctx, req, func(*Peer)) (*Peer, error) {
})
in fact the send can also error in which case we also need to go further. so 2 is much better.
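One way to read option 2 is sketched below: findPeer walks the candidates and hands each one to a callback that resolves the protocol peer and attempts the send, moving on when the callback reports failure. The callback returning a bool, the `candidates` slice, and the `peer` type are assumptions made for this sketch; the comment only gives the rough signature.

```go
package main

import (
	"errors"
	"fmt"
)

// peer is a hypothetical candidate peer; connected marks whether a send can succeed.
type peer struct {
	id        string
	connected bool
}

var errNoPeer = errors.New("no peer accepted the request")

// findPeer walks candidates in order and hands each one to try. It stops at
// the first peer for which try returns true, so a missing protocol peer or a
// failed send simply moves the iteration on to the next candidate.
func findPeer(candidates []*peer, try func(*peer) bool) (*peer, error) {
	for _, p := range candidates {
		if try(p) {
			return p, nil
		}
	}
	return nil, errNoPeer
}

func main() {
	candidates := []*peer{
		{id: "a", connected: false}, // disconnected in the meantime
		{id: "b", connected: true},
	}
	// The callback stands in for "look up the protocol peer and send the request".
	send := func(p *peer) bool { return p.connected }

	p, err := findPeer(candidates, send)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("request sent to", p.id)
}
```

Because the candidate list is finite, this shape terminates without a goto label or an explicit retry counter.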
|
|
||
| protoPeer := r.getPeer(sp.ID()) | ||
| if protoPeer == nil { | ||
| goto FINDPEER |
Potential endless loop. And looks like a very likely one, to me.
If getPeer returns nil, it means that the connection is dropped and the peer is removed from the collection, so just return an error and that's it.
guys, RequestFromPeer is triggered when we want a chunk from our peers. that is - we want a chunk, or another node asks us for a chunk and we can't deliver so we ask other peers.
the possible race condition, where a peer returned from the kademlia in r.findPeer disconnects before we reach r.getPeer, is so remote that i would barely even consider it feasible. also, the potential very likely infinite loop you're talking about means that every subsequent call to findPeer will yield the same race condition. that is very unlikely.
returning an error when this race condition has occurred makes no sense to me. why should we fail a delivery forwarding request from a certain peer just because another peer disconnected from us? we should just find another peer to request from.
my 2 cents
@acud you're right, we should just find another suggested peer, as you've written it, I was confused when I wrote my comment, it makes sense now.
My reasoning is with this code:
- findPeer returns no error
- getPeer returns nil
-> infinite loop
Why findPeer and getPeer would behave like that does not matter to me in the context of this function, as it is possible with any change to these two functions, however unlikely it is to happen now. When someone changes these two functions, he/she should be aware of the implications for RequestFromPeers. If it is possible, my opinion is that it should be handled.
I just wanted to express my opinion. I am deeply sorry if I am frustrating anybody with this. Whatever you think is correct is ok with me.
janos i have addressed this with a limit to how many retries we should do
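The resolution mentioned here, retrying with a cap instead of an unbounded goto, could look roughly like the following. The limit of 3, the function-valued parameters, and the names `requestFromPeers`, `maxFindPeerRetries`, and `errNoPeer` are assumptions for this sketch; the thread does not show the actual limit or code.

```go
package main

import (
	"errors"
	"fmt"
)

// maxFindPeerRetries is a hypothetical cap on how often a new candidate is requested.
const maxFindPeerRetries = 3

var errNoPeer = errors.New("no suitable peer found")

// peer is a hypothetical protocol peer.
type peer struct{ id string }

// requestFromPeers asks findPeer for a candidate and resolves it via getPeer.
// If the candidate disconnected in the meantime (getPeer returns nil), it asks
// for another candidate, giving up after maxFindPeerRetries attempts.
func requestFromPeers(findPeer func() (string, error), getPeer func(string) *peer) (*peer, error) {
	for attempt := 0; attempt < maxFindPeerRetries; attempt++ {
		id, err := findPeer()
		if err != nil {
			return nil, err
		}
		if p := getPeer(id); p != nil {
			return p, nil
		}
	}
	return nil, errNoPeer
}

func main() {
	connected := map[string]*peer{"c": {id: "c"}}
	candidates := []string{"a", "c"} // "a" has already disconnected
	next := 0
	findPeer := func() (string, error) {
		id := candidates[next%len(candidates)]
		next++
		return id, nil
	}
	getPeer := func(id string) *peer { return connected[id] }

	p, err := requestFromPeers(findPeer, getPeer)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("requesting from", p.id)
}
```

The cap keeps the "find another peer" behaviour argued for above while ruling out the endless loop when findPeer keeps succeeding and getPeer keeps returning nil.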
| log.Debug("retrieval.requestFromPeers", "req.Addr", req.Addr) | ||
| metrics.GetOrRegisterCounter("network.retrieve.requestfrompeers", nil).Inc(1) | ||
|
|
||
| FINDPEER: |
No need for FINDPEER.
Actually @acud is right, we need this. Basically this is a case where the suggested peer from findPeer has been disconnected in the meantime, and we just need a new suggested peer.
| func NodeConfigAtPo(t *testing.T, baseaddr []byte, po int) *adapters.NodeConfig { | ||
| foundPo := -1 | ||
| var conf *adapters.NodeConfig | ||
| for foundPo != po { |
This seems to be mixing enode and bzzaddresses. If you want a bzzaddress with a certain po from another address, you can also use:
pot.RandomAddressAt
why? it creates the kademlia base address using all of this enr voodoo and checks it for the po from the supplied baseaddr, which is also coming out of the kademlia of the reference node. i dont need just a random address, i need the whole config that is used to generate a new node so that when the node is generated, the correct address is bootstrapped into its kademlia
The RandomAddressAt doesn't generate a random address, but one with a given po - same as what you're doing here.
ENR and Enodes are independent from BzzAddr.
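For readers unfamiliar with the term, the proximity order (po) of two addresses is the number of leading bits they share. The sketch below computes it and constructs an address at a requested po from a base address directly, by copying the shared prefix and flipping the next bit. Both `proximity` and `addressAt` are hypothetical helpers written for illustration; they are not the implementation of pot.RandomAddressAt or of NodeConfigAtPo, which brute-forces whole node configs instead.

```go
package main

import (
	"crypto/rand"
	"fmt"
	"math/bits"
)

// proximity returns the number of leading bits a and b have in common.
// Both slices are assumed to have the same length.
func proximity(a, b []byte) int {
	for i := range a {
		if x := a[i] ^ b[i]; x != 0 {
			return i*8 + bits.LeadingZeros8(x)
		}
	}
	return len(a) * 8
}

// getBit reports whether bit i (counted from the most significant bit) is set.
func getBit(b []byte, i int) bool {
	mask := byte(0x80) >> uint(i%8)
	return b[i/8]&mask != 0
}

// setBit sets or clears bit i (counted from the most significant bit).
func setBit(b []byte, i int, v bool) {
	mask := byte(0x80) >> uint(i%8)
	if v {
		b[i/8] |= mask
	} else {
		b[i/8] &^= mask
	}
}

// addressAt returns an address whose proximity order to base is exactly po:
// the first po bits are copied from base, bit po is flipped, the rest is random.
func addressAt(base []byte, po int) ([]byte, error) {
	if po < 0 || po >= len(base)*8 {
		return nil, fmt.Errorf("po %d out of range for %d-bit address", po, len(base)*8)
	}
	addr := make([]byte, len(base))
	if _, err := rand.Read(addr); err != nil {
		return nil, err
	}
	for i := 0; i < po; i++ {
		setBit(addr, i, getBit(base, i))
	}
	setBit(addr, po, !getBit(base, po))
	return addr, nil
}

func main() {
	base := []byte{0xff, 0x00, 0xaa, 0x55}
	addr, err := addressAt(base, 5)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("base=%x addr=%x po=%d\n", base, addr, proximity(base, addr))
}
```

Direct construction only works when the address can be chosen freely; when the overlay address is derived from a node key, as argued in the thread, generating configs until one lands at the wanted po is the remaining option.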
| t.Fatalf("unable to create enode: %v", err) | ||
| } | ||
|
|
||
| n := network.NewAddr(nod) |
@nonsense here you can see that network.NewAddr is called and that should give us the correct overlay address
yeah, no - network.NewAddr(nod) should not be used as discussed in chat - it is very misleading and broken.
| return fileStore.GetAllReferences(context.Background(), reader, false) | ||
| } | ||
|
|
||
| func getChunks(store chunk.Store) (chunks map[string]struct{}, err error) { |
what is this used for? it feels strange you need the subscribePull function here
read the tests and thou shalt understand
| return &spID, nil | ||
| } | ||
|
|
||
| func (r *Retrieval) Start(server *p2p.Server) error { |
no longer convinced that retrieval should be a service. we could just wrap it for simulation testing if needed.
could still keep the Protocols() func or better func (*Retrieval) Protocol() p2p.Protocol {}
but also ok to keep it, just wonder what motivates premature abstraction ;)
..............?
I'm not sure i'm following you. we agreed to implement retrieval as a separate p2p protocol, that's what i did and p2p protocols need to implement node.Service, that's what I did. I don't understand what you want at this stage. please be kind enough to clarify because it seems you want me to scratch this PR? with which alternative? thanks
I am just saying that looking at Start, Stop, APIs, i dont see a compelling case that Retrieval should implement node.Service . Just include Retrieval.Protocol() among the protocols in swarm.go. thats all. But i dont mind
| // where po(fetching,forwarding) = 1 and po(forwarding,uploading) = 1, then uploads chunks to the uploading node, afterwards | ||
| // tries to retrieve the relevant chunks (ones with po = 0 to fetching i.e. no bits in common with fetching and with | ||
| // po >= 1 with uploading i.e. with 1 bit or more in common with the uploading) | ||
| func TestDeliveryForwarding(t *testing.T) { |
having a hard time understanding the generality and relevance of this test
what is there not to understand? findPeer needs a certain topology for the retrieve requests to propagate through the forwarding node. i'm not sure which other explanations you expect and what exactly is unclear? the comment, the code? please be a bit more specific
also, to further clarify, you cannot test retrieval in a snapshot without pull syncing (since due to the logic in findPeer you'll never get to the content unless you build the topology in such a way that every request to every chunk will result in it arriving to the storing node).
so, to test if retrieve requests are being forwarded, a basic topology that adheres to certain POs between nodes is needed, with discovery disabled. This test tests exactly this, just retrieve request forwarding without any other moving parts
|
@zelig you're asking me to refactor |
* 'master' of github.com:ethersphere/swarm: (54 commits)
  api, chunk, cmd, shed, storage: add support for pinning content (ethersphere#1509)
  docs/swarm-guide: cleanup (ethersphere#1620)
  travis: split jobs into different stages (ethersphere#1615)
  simulation: retry if we hit a collision on tcp/udp ports (ethersphere#1616)
  api, chunk: rename Tag.New to Tag.Create (ethersphere#1614)
  pss: instrumentation and refactor (ethersphere#1580)
  api, cmd, network: add --disable-auto-connect flag (ethersphere#1576)
  changelog: fix typo (ethersphere#1605)
  version: update to v0.4.4 unstable (ethersphere#1603)
  swarm: release v0.4.3 (ethersphere#1602)
  network/retrieve: add bzz-retrieve protocol (ethersphere#1589)
  PoC: Network simulation framework (ethersphere#1555)
  network: structured output for kademlia table (ethersphere#1586)
  client: add bzz client, update smoke tests (ethersphere#1582)
  swarm-smoke: fix check max prox hosts for pull/push sync modes (ethersphere#1578)
  cmd/swarm: allow using a network interface by name for nat purposes (ethersphere#1557)
  pss: disable TestForwardBasic (ethersphere#1544)
  api, network: count chunk deliveries per peer (ethersphere#1534)
  network/newstream: new stream! protocol base implementation (ethersphere#1500)
  swarm: fix bzz_info.port when using dynamic port allocation (ethersphere#1537)
  ...
This PR adds a separate bzz-retrieve protocol to Swarm.

The necessity of the protocol was initially identified during the stream protocol rewrite and finally crystallised into its own separate package when retrievals were needed in order to test the new stream.

Since stream addresses chunks in ranges of sequentially incremented indexes, there is in fact no place to treat and request chunks by their content-addressed hash. Furthermore, participating in bzz-retrieve no longer forces a node to participate in stream, which has its own set of implications. This is imperative in order to facilitate finer granularity of feature-set support for adaptive nodes in the future.

This PR does not hardwire the protocol into swarm.go and all of the necessary locations. This is going to be done as part of the new stream protocol PR (with the current codebase, as long as the old stream is used, there's no need to wire the protocol in, since retrieve requests are handled as part of the current stream). This is to keep the diff clean and allow easier review of the new stream protocol.