diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index f5bc648c..abfe3d22 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -257,13 +257,7 @@ impl NodeBuilder { self } - pub fn try_build( - self, - ) -> Result<( - Node, - tokio::sync::mpsc::Receiver, - tokio::sync::mpsc::Sender, - )> { + pub fn try_build(self) -> Result<(Node, P2PHandle)> { let Self { port, mut listen_addrs, @@ -315,8 +309,7 @@ impl NodeBuilder { outgoing_message_rx, cancellation_token: cancellation_token.unwrap_or_default(), }, - incoming_message_rx, - outgoing_message_tx, + P2PHandle::new(incoming_message_rx, outgoing_message_tx), )) } } @@ -341,11 +334,10 @@ mod test { #[tokio::test] async fn two_nodes_can_connect_and_do_request_response() { - let (node1, mut incoming_message_rx1, outgoing_message_tx1) = - NodeBuilder::new().with_get_task_logs().try_build().unwrap(); + let (node1, mut p2p_handle1) = NodeBuilder::new().with_get_task_logs().try_build().unwrap(); let node1_peer_id = node1.peer_id(); - let (node2, mut incoming_message_rx2, outgoing_message_tx2) = NodeBuilder::new() + let (node2, mut p2p_handle2) = NodeBuilder::new() .with_get_task_logs() .with_bootnodes(node1.multiaddrs()) .try_build() @@ -360,11 +352,12 @@ mod test { // send request from node1->node2 let request = message::Request::GetTaskLogs; - outgoing_message_tx1 + p2p_handle1 + .outgoing_sender .send(request.into_outgoing_message(node2_peer_id, vec![])) .await .unwrap(); - let message = incoming_message_rx2.recv().await.unwrap(); + let message = p2p_handle2.incoming_receiver.recv().await.unwrap(); assert_eq!(message.peer, node1_peer_id); let libp2p::request_response::Message::Request { request_id: _, @@ -378,11 +371,12 @@ mod test { // send response from node2->node1 let response = message::Response::GetTaskLogs(message::GetTaskLogsResponse::Ok("logs".to_string())); - outgoing_message_tx2 + p2p_handle2 + .outgoing_sender .send(response.into_outgoing_message(channel)) .await .unwrap(); - let message = incoming_message_rx1.recv().await.unwrap(); + let message = p2p_handle1.incoming_receiver.recv().await.unwrap(); assert_eq!(message.peer, node2_peer_id); let libp2p::request_response::Message::Response { request_id: _, diff --git a/crates/p2p/src/message/mod.rs b/crates/p2p/src/message/mod.rs index 74b09c5a..e9048dff 100644 --- a/crates/p2p/src/message/mod.rs +++ b/crates/p2p/src/message/mod.rs @@ -7,6 +7,24 @@ mod hardware_challenge; pub use hardware_challenge::*; +#[derive(Debug)] +pub struct P2PHandle { + pub incoming_receiver: tokio::sync::mpsc::Receiver, + pub outgoing_sender: tokio::sync::mpsc::Sender, +} + +impl P2PHandle { + pub fn new( + incoming_receiver: tokio::sync::mpsc::Receiver, + outgoing_sender: tokio::sync::mpsc::Sender, + ) -> Self { + Self { + incoming_receiver, + outgoing_sender, + } + } +} + #[derive(Debug)] pub struct IncomingMessage { pub peer: PeerId, diff --git a/crates/shared/src/p2p/service.rs b/crates/shared/src/p2p/service.rs index bf776009..b0a4322d 100644 --- a/crates/shared/src/p2p/service.rs +++ b/crates/shared/src/p2p/service.rs @@ -109,13 +109,18 @@ fn build_p2p_node( cancellation_token: CancellationToken, protocols: Protocols, ) -> Result<(Node, Receiver, Sender)> { - NodeBuilder::new() + let builder = NodeBuilder::new() .with_keypair(keypair) .with_port(port) .with_authentication() .with_protocols(protocols) .with_cancellation_token(cancellation_token) - .try_build() + .try_build()?; + Ok(( + builder.0, + builder.1.incoming_receiver, + builder.1.outgoing_sender, + )) } #[derive(Clone)] diff --git a/crates/worker/src/p2p/mod.rs b/crates/worker/src/p2p/mod.rs index 94fe10a3..b40a21a5 100644 --- a/crates/worker/src/p2p/mod.rs +++ b/crates/worker/src/p2p/mod.rs @@ -4,6 +4,7 @@ use futures::stream::FuturesUnordered; use p2p::InviteRequestUrl; use p2p::Node; use p2p::NodeBuilder; +use p2p::P2PHandle; use p2p::PeerId; use p2p::Response; use p2p::{IncomingMessage, Libp2pIncomingMessage, OutgoingMessage}; @@ -43,16 +44,15 @@ impl Service { provider_wallet: Wallet, cancellation_token: CancellationToken, ) -> Result { - let (node, incoming_messages, outgoing_messages) = - build_p2p_node(keypair, port, cancellation_token.clone()) - .context("failed to build p2p node")?; + let (node, p2p_handle) = build_p2p_node(keypair, port, cancellation_token.clone()) + .context("failed to build p2p node")?; Ok(Self { node, - incoming_messages, + incoming_messages: p2p_handle.incoming_receiver, cancellation_token, context: Context::new( wallet, - outgoing_messages, + p2p_handle.outgoing_sender, validator_addresses, docker_service, heartbeat_service, @@ -111,8 +111,8 @@ fn build_p2p_node( keypair: p2p::Keypair, port: u16, cancellation_token: CancellationToken, -) -> Result<(Node, Receiver, Sender)> { - let (node, incoming_message_rx, outgoing_message_tx) = NodeBuilder::new() +) -> Result<(Node, P2PHandle)> { + let (node, p2p_handle) = NodeBuilder::new() .with_keypair(keypair) .with_port(port) .with_authentication() @@ -123,7 +123,7 @@ fn build_p2p_node( .with_cancellation_token(cancellation_token) .try_build() .context("failed to build p2p node")?; - Ok((node, incoming_message_rx, outgoing_message_tx)) + Ok((node, p2p_handle)) } #[derive(Clone)]