Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 10 additions & 16 deletions crates/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,7 @@ impl NodeBuilder {
self
}

pub fn try_build(
self,
) -> Result<(
Node,
tokio::sync::mpsc::Receiver<IncomingMessage>,
tokio::sync::mpsc::Sender<OutgoingMessage>,
)> {
pub fn try_build(self) -> Result<(Node, P2PHandle)> {
let Self {
port,
mut listen_addrs,
Expand Down Expand Up @@ -315,8 +309,7 @@ impl NodeBuilder {
outgoing_message_rx,
cancellation_token: cancellation_token.unwrap_or_default(),
},
incoming_message_rx,
outgoing_message_tx,
P2PHandle::new(incoming_message_rx, outgoing_message_tx),
))
}
}
Expand All @@ -341,11 +334,10 @@ mod test {

#[tokio::test]
async fn two_nodes_can_connect_and_do_request_response() {
let (node1, mut incoming_message_rx1, outgoing_message_tx1) =
NodeBuilder::new().with_get_task_logs().try_build().unwrap();
let (node1, mut p2p_handle1) = NodeBuilder::new().with_get_task_logs().try_build().unwrap();
let node1_peer_id = node1.peer_id();

let (node2, mut incoming_message_rx2, outgoing_message_tx2) = NodeBuilder::new()
let (node2, mut p2p_handle2) = NodeBuilder::new()
.with_get_task_logs()
.with_bootnodes(node1.multiaddrs())
.try_build()
Expand All @@ -360,11 +352,12 @@ mod test {

// send request from node1->node2
let request = message::Request::GetTaskLogs;
outgoing_message_tx1
p2p_handle1
.outgoing_sender
.send(request.into_outgoing_message(node2_peer_id, vec![]))
.await
.unwrap();
let message = incoming_message_rx2.recv().await.unwrap();
let message = p2p_handle2.incoming_receiver.recv().await.unwrap();
assert_eq!(message.peer, node1_peer_id);
let libp2p::request_response::Message::Request {
request_id: _,
Expand All @@ -378,11 +371,12 @@ mod test {
// send response from node2->node1
let response =
message::Response::GetTaskLogs(message::GetTaskLogsResponse::Ok("logs".to_string()));
outgoing_message_tx2
p2p_handle2
.outgoing_sender
.send(response.into_outgoing_message(channel))
.await
.unwrap();
let message = incoming_message_rx1.recv().await.unwrap();
let message = p2p_handle1.incoming_receiver.recv().await.unwrap();
assert_eq!(message.peer, node2_peer_id);
let libp2p::request_response::Message::Response {
request_id: _,
Expand Down
18 changes: 18 additions & 0 deletions crates/p2p/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,24 @@ mod hardware_challenge;

pub use hardware_challenge::*;

#[derive(Debug)]
pub struct P2PHandle {
pub incoming_receiver: tokio::sync::mpsc::Receiver<IncomingMessage>,
pub outgoing_sender: tokio::sync::mpsc::Sender<OutgoingMessage>,
}

impl P2PHandle {
pub fn new(
incoming_receiver: tokio::sync::mpsc::Receiver<IncomingMessage>,
outgoing_sender: tokio::sync::mpsc::Sender<OutgoingMessage>,
) -> Self {
Self {
incoming_receiver,
outgoing_sender,
}
}
}

#[derive(Debug)]
pub struct IncomingMessage {
pub peer: PeerId,
Expand Down
9 changes: 7 additions & 2 deletions crates/shared/src/p2p/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,18 @@ fn build_p2p_node(
cancellation_token: CancellationToken,
protocols: Protocols,
) -> Result<(Node, Receiver<IncomingMessage>, Sender<OutgoingMessage>)> {
NodeBuilder::new()
let builder = NodeBuilder::new()
.with_keypair(keypair)
.with_port(port)
.with_authentication()
.with_protocols(protocols)
.with_cancellation_token(cancellation_token)
.try_build()
.try_build()?;
Ok((
builder.0,
builder.1.incoming_receiver,
builder.1.outgoing_sender,
))
}

#[derive(Clone)]
Expand Down
16 changes: 8 additions & 8 deletions crates/worker/src/p2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::stream::FuturesUnordered;
use p2p::InviteRequestUrl;
use p2p::Node;
use p2p::NodeBuilder;
use p2p::P2PHandle;
use p2p::PeerId;
use p2p::Response;
use p2p::{IncomingMessage, Libp2pIncomingMessage, OutgoingMessage};
Expand Down Expand Up @@ -43,16 +44,15 @@ impl Service {
provider_wallet: Wallet,
cancellation_token: CancellationToken,
) -> Result<Self> {
let (node, incoming_messages, outgoing_messages) =
build_p2p_node(keypair, port, cancellation_token.clone())
.context("failed to build p2p node")?;
let (node, p2p_handle) = build_p2p_node(keypair, port, cancellation_token.clone())
.context("failed to build p2p node")?;
Ok(Self {
node,
incoming_messages,
incoming_messages: p2p_handle.incoming_receiver,
cancellation_token,
context: Context::new(
wallet,
outgoing_messages,
p2p_handle.outgoing_sender,
validator_addresses,
docker_service,
heartbeat_service,
Expand Down Expand Up @@ -111,8 +111,8 @@ fn build_p2p_node(
keypair: p2p::Keypair,
port: u16,
cancellation_token: CancellationToken,
) -> Result<(Node, Receiver<IncomingMessage>, Sender<OutgoingMessage>)> {
let (node, incoming_message_rx, outgoing_message_tx) = NodeBuilder::new()
) -> Result<(Node, P2PHandle)> {
let (node, p2p_handle) = NodeBuilder::new()
.with_keypair(keypair)
.with_port(port)
.with_authentication()
Expand All @@ -123,7 +123,7 @@ fn build_p2p_node(
.with_cancellation_token(cancellation_token)
.try_build()
.context("failed to build p2p node")?;
Ok((node, incoming_message_rx, outgoing_message_tx))
Ok((node, p2p_handle))
}

#[derive(Clone)]
Expand Down
Loading