diff --git a/network/src/protocol/mod.rs b/network/src/protocol/mod.rs index ac3b6edfc60c..f9c7fd8b62d9 100644 --- a/network/src/protocol/mod.rs +++ b/network/src/protocol/mod.rs @@ -130,7 +130,7 @@ enum BackgroundToWorkerMsg { } /// Operations that a handle to an underlying network service should provide. -trait NetworkServiceOps: Send + Sync { +pub trait NetworkServiceOps: Send + Sync { /// Report the peer as having a particular positive or negative value. fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange); @@ -193,10 +193,18 @@ impl GossipOps for RegisteredMessageValidator { } /// An async handle to the network service. -#[derive(Clone)] -pub struct Service { +pub struct Service { sender: mpsc::Sender, - network_service: Arc, + network_service: Arc, +} + +impl Clone for Service { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + network_service: self.network_service.clone(), + } + } } /// Registers the protocol. @@ -209,7 +217,7 @@ pub fn start( chain_context: C, api: Arc, executor: SP, -) -> Result where +) -> Result, futures::task::SpawnError> where C: ChainContext + 'static, Api: ProvideRuntimeApi + Send + Sync + 'static, Api::Api: ParachainHost, @@ -292,14 +300,14 @@ pub fn start( } /// The Polkadot protocol status message. -#[derive(Debug, Encode, Decode)] +#[derive(Debug, Encode, Decode, PartialEq)] pub struct Status { version: u32, // protocol version. collating_for: Option<(CollatorId, ParaId)>, } /// Polkadot-specific messages from peer to peer. -#[derive(Debug, Encode, Decode)] +#[derive(Debug, Encode, Decode, PartialEq)] pub enum Message { /// Exchange status with a peer. This should be the first message sent. #[codec(index = "0")] @@ -451,6 +459,11 @@ impl RecentValidatorIds { fn as_slice(&self) -> &[ValidatorId] { &*self.inner } + + /// Returns the last inserted session key. + fn latest(&self) -> Option<&ValidatorId> { + self.inner.last() + } } struct ProtocolHandler { @@ -582,7 +595,19 @@ impl ProtocolHandler { let role = self.collators .on_new_collator(collator_id, para_id, remote.clone()); let service = &self.service; + let send_key = peer.should_send_key(); + if let Some(c_state) = peer.collator_state_mut() { + if send_key { + if let Some(key) = self.local_keys.latest() { + c_state.send_key(key.clone(), |msg| service.write_notification( + remote.clone(), + POLKADOT_ENGINE_ID, + msg.encode(), + )); + } + } + c_state.set_role(role, |msg| service.write_notification( remote.clone(), POLKADOT_ENGINE_ID, @@ -1323,7 +1348,7 @@ struct RouterInner { sender: mpsc::Sender, } -impl Service { +impl Service { /// Register an availablility-store that the network can query. pub fn register_availability_store(&self, store: av_store::Store) { let _ = self.sender.clone() @@ -1373,7 +1398,7 @@ impl Service { } } -impl ParachainNetwork for Service { +impl ParachainNetwork for Service { type Error = mpsc::SendError; type TableRouter = Router; type BuildTableRouter = Pin> + Send>>; @@ -1403,7 +1428,7 @@ impl ParachainNetwork for Service { } } -impl Collators for Service { +impl Collators for Service { type Error = future::Either; type Collation = Pin> + Send>>; @@ -1425,7 +1450,7 @@ impl Collators for Service { } } -impl av_store::ErasureNetworking for Service { +impl av_store::ErasureNetworking for Service { type Error = future::Either; fn fetch_erasure_chunk(&self, candidate_hash: &Hash, index: u32) diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs index 481b27cf727a..991169561668 100644 --- a/network/src/protocol/tests.rs +++ b/network/src/protocol/tests.rs @@ -37,7 +37,7 @@ use futures::executor::LocalPool; use futures::task::LocalSpawnExt; #[derive(Default)] -struct MockNetworkOps { +pub struct MockNetworkOps { recorded: Mutex, } @@ -188,7 +188,7 @@ sp_api::mock_impl_runtime_apis! { } } -impl super::Service { +impl super::Service { async fn connect_peer(&mut self, peer: PeerId, roles: Roles) { self.sender.send(ServiceToWorkerMsg::PeerConnected(peer, roles)).await.unwrap(); } @@ -222,7 +222,7 @@ impl super::Service { } fn test_setup(config: Config) -> ( - Service, + Service, MockGossip, LocalPool, impl Future + 'static, @@ -264,7 +264,7 @@ fn worker_task_shuts_down_when_sender_dropped() { /// is handled. This helper functions checks multiple times that the given instance is dropped. Even /// if the first round fails, the second one should be successful as the consensus instance drop /// should be already handled this time. -fn wait_for_instance_drop(service: &mut Service, pool: &mut LocalPool, instance: Hash) { +fn wait_for_instance_drop(service: &mut Service, pool: &mut LocalPool, instance: Hash) { let mut try_counter = 0; let max_tries = 3; @@ -363,7 +363,6 @@ fn collation_is_received_with_dropped_router() { }))); } - #[test] fn validator_peer_cleaned_up() { let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); @@ -575,3 +574,32 @@ fn fetches_pov_block_from_gossip() { pool.run_until(test_work).unwrap(); } + +#[test] +fn validator_sends_key_to_collator_on_status() { + let (service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); + + let peer = PeerId::random(); + let peer_clone = peer.clone(); + let validator_key = Sr25519Keyring::Alice.pair(); + let validator_id = ValidatorId::from(validator_key.public()); + let validator_id_clone = validator_id.clone(); + let collator_id = CollatorId::from(Sr25519Keyring::Bob.public()); + let para_id = ParaId::from(100); + let mut service_clone = service.clone(); + + pool.spawner().spawn_local(worker_task).unwrap(); + pool.run_until(async move { + service_clone.synchronize(move |proto| { proto.local_keys.insert(validator_id_clone); }).await; + service_clone.connect_peer(peer_clone.clone(), Roles::AUTHORITY).await; + service_clone.peer_message(peer_clone.clone(), Message::Status(Status { + version: VERSION, + collating_for: Some((collator_id, para_id)), + })).await; + }); + + let expected_msg = Message::ValidatorId(validator_id.clone()); + assert!(service.network_service.recorded.lock().notifications.iter().any(|(p, notification)| { + peer == *p && *notification == expected_msg + })); +} diff --git a/service/src/lib.rs b/service/src/lib.rs index 7aa93393546d..277da0cb2722 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -25,7 +25,7 @@ use std::time::Duration; use polkadot_primitives::{parachain, Hash, BlockId, AccountId, Nonce, Balance}; #[cfg(feature = "full-node")] use polkadot_network::{legacy::gossip::Known, protocol as network_protocol}; -use service::{error::{Error as ServiceError}, ServiceBuilder}; +use service::{error::Error as ServiceError, ServiceBuilder}; use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use inherents::InherentDataProviders; use sc_executor::native_executor_instance; @@ -103,11 +103,9 @@ where >::StateBackend: sp_api::StateBackend, {} -pub trait RuntimeExtrinsic: codec::Codec + Send + Sync + 'static -{} +pub trait RuntimeExtrinsic: codec::Codec + Send + Sync + 'static {} -impl RuntimeExtrinsic for E where E: codec::Codec + Send + Sync + 'static -{} +impl RuntimeExtrinsic for E where E: codec::Codec + Send + Sync + 'static {} /// Can be called for a `Configuration` to check if it is a configuration for the `Kusama` network. pub trait IsKusama {