From 18fc4f6812e90dcf19a1caaa6d3fa92769b7a31f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 3 Apr 2020 14:32:18 +0200 Subject: [PATCH] Make sure we send the validator key to collators on status Before the validator only send the keys if it was updated and thus the collators would "never" be informed about the key of the validator. --- network/src/protocol/mod.rs | 47 +++++++++++++++++++++++++++-------- network/src/protocol/tests.rs | 38 ++++++++++++++++++++++++---- service/src/lib.rs | 8 +++--- 3 files changed, 72 insertions(+), 21 deletions(-) diff --git a/network/src/protocol/mod.rs b/network/src/protocol/mod.rs index ac3b6edfc60c..f9c7fd8b62d9 100644 --- a/network/src/protocol/mod.rs +++ b/network/src/protocol/mod.rs @@ -130,7 +130,7 @@ enum BackgroundToWorkerMsg { } /// Operations that a handle to an underlying network service should provide. -trait NetworkServiceOps: Send + Sync { +pub trait NetworkServiceOps: Send + Sync { /// Report the peer as having a particular positive or negative value. fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange); @@ -193,10 +193,18 @@ impl GossipOps for RegisteredMessageValidator { } /// An async handle to the network service. -#[derive(Clone)] -pub struct Service { +pub struct Service { sender: mpsc::Sender, - network_service: Arc, + network_service: Arc, +} + +impl Clone for Service { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + network_service: self.network_service.clone(), + } + } } /// Registers the protocol. @@ -209,7 +217,7 @@ pub fn start( chain_context: C, api: Arc, executor: SP, -) -> Result where +) -> Result, futures::task::SpawnError> where C: ChainContext + 'static, Api: ProvideRuntimeApi + Send + Sync + 'static, Api::Api: ParachainHost, @@ -292,14 +300,14 @@ pub fn start( } /// The Polkadot protocol status message. -#[derive(Debug, Encode, Decode)] +#[derive(Debug, Encode, Decode, PartialEq)] pub struct Status { version: u32, // protocol version. collating_for: Option<(CollatorId, ParaId)>, } /// Polkadot-specific messages from peer to peer. -#[derive(Debug, Encode, Decode)] +#[derive(Debug, Encode, Decode, PartialEq)] pub enum Message { /// Exchange status with a peer. This should be the first message sent. #[codec(index = "0")] @@ -451,6 +459,11 @@ impl RecentValidatorIds { fn as_slice(&self) -> &[ValidatorId] { &*self.inner } + + /// Returns the last inserted session key. + fn latest(&self) -> Option<&ValidatorId> { + self.inner.last() + } } struct ProtocolHandler { @@ -582,7 +595,19 @@ impl ProtocolHandler { let role = self.collators .on_new_collator(collator_id, para_id, remote.clone()); let service = &self.service; + let send_key = peer.should_send_key(); + if let Some(c_state) = peer.collator_state_mut() { + if send_key { + if let Some(key) = self.local_keys.latest() { + c_state.send_key(key.clone(), |msg| service.write_notification( + remote.clone(), + POLKADOT_ENGINE_ID, + msg.encode(), + )); + } + } + c_state.set_role(role, |msg| service.write_notification( remote.clone(), POLKADOT_ENGINE_ID, @@ -1323,7 +1348,7 @@ struct RouterInner { sender: mpsc::Sender, } -impl Service { +impl Service { /// Register an availablility-store that the network can query. pub fn register_availability_store(&self, store: av_store::Store) { let _ = self.sender.clone() @@ -1373,7 +1398,7 @@ impl Service { } } -impl ParachainNetwork for Service { +impl ParachainNetwork for Service { type Error = mpsc::SendError; type TableRouter = Router; type BuildTableRouter = Pin> + Send>>; @@ -1403,7 +1428,7 @@ impl ParachainNetwork for Service { } } -impl Collators for Service { +impl Collators for Service { type Error = future::Either; type Collation = Pin> + Send>>; @@ -1425,7 +1450,7 @@ impl Collators for Service { } } -impl av_store::ErasureNetworking for Service { +impl av_store::ErasureNetworking for Service { type Error = future::Either; fn fetch_erasure_chunk(&self, candidate_hash: &Hash, index: u32) diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs index 481b27cf727a..991169561668 100644 --- a/network/src/protocol/tests.rs +++ b/network/src/protocol/tests.rs @@ -37,7 +37,7 @@ use futures::executor::LocalPool; use futures::task::LocalSpawnExt; #[derive(Default)] -struct MockNetworkOps { +pub struct MockNetworkOps { recorded: Mutex, } @@ -188,7 +188,7 @@ sp_api::mock_impl_runtime_apis! { } } -impl super::Service { +impl super::Service { async fn connect_peer(&mut self, peer: PeerId, roles: Roles) { self.sender.send(ServiceToWorkerMsg::PeerConnected(peer, roles)).await.unwrap(); } @@ -222,7 +222,7 @@ impl super::Service { } fn test_setup(config: Config) -> ( - Service, + Service, MockGossip, LocalPool, impl Future + 'static, @@ -264,7 +264,7 @@ fn worker_task_shuts_down_when_sender_dropped() { /// is handled. This helper functions checks multiple times that the given instance is dropped. Even /// if the first round fails, the second one should be successful as the consensus instance drop /// should be already handled this time. -fn wait_for_instance_drop(service: &mut Service, pool: &mut LocalPool, instance: Hash) { +fn wait_for_instance_drop(service: &mut Service, pool: &mut LocalPool, instance: Hash) { let mut try_counter = 0; let max_tries = 3; @@ -363,7 +363,6 @@ fn collation_is_received_with_dropped_router() { }))); } - #[test] fn validator_peer_cleaned_up() { let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); @@ -575,3 +574,32 @@ fn fetches_pov_block_from_gossip() { pool.run_until(test_work).unwrap(); } + +#[test] +fn validator_sends_key_to_collator_on_status() { + let (service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); + + let peer = PeerId::random(); + let peer_clone = peer.clone(); + let validator_key = Sr25519Keyring::Alice.pair(); + let validator_id = ValidatorId::from(validator_key.public()); + let validator_id_clone = validator_id.clone(); + let collator_id = CollatorId::from(Sr25519Keyring::Bob.public()); + let para_id = ParaId::from(100); + let mut service_clone = service.clone(); + + pool.spawner().spawn_local(worker_task).unwrap(); + pool.run_until(async move { + service_clone.synchronize(move |proto| { proto.local_keys.insert(validator_id_clone); }).await; + service_clone.connect_peer(peer_clone.clone(), Roles::AUTHORITY).await; + service_clone.peer_message(peer_clone.clone(), Message::Status(Status { + version: VERSION, + collating_for: Some((collator_id, para_id)), + })).await; + }); + + let expected_msg = Message::ValidatorId(validator_id.clone()); + assert!(service.network_service.recorded.lock().notifications.iter().any(|(p, notification)| { + peer == *p && *notification == expected_msg + })); +} diff --git a/service/src/lib.rs b/service/src/lib.rs index 7aa93393546d..277da0cb2722 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -25,7 +25,7 @@ use std::time::Duration; use polkadot_primitives::{parachain, Hash, BlockId, AccountId, Nonce, Balance}; #[cfg(feature = "full-node")] use polkadot_network::{legacy::gossip::Known, protocol as network_protocol}; -use service::{error::{Error as ServiceError}, ServiceBuilder}; +use service::{error::Error as ServiceError, ServiceBuilder}; use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use inherents::InherentDataProviders; use sc_executor::native_executor_instance; @@ -103,11 +103,9 @@ where >::StateBackend: sp_api::StateBackend, {} -pub trait RuntimeExtrinsic: codec::Codec + Send + Sync + 'static -{} +pub trait RuntimeExtrinsic: codec::Codec + Send + Sync + 'static {} -impl RuntimeExtrinsic for E where E: codec::Codec + Send + Sync + 'static -{} +impl RuntimeExtrinsic for E where E: codec::Codec + Send + Sync + 'static {} /// Can be called for a `Configuration` to check if it is a configuration for the `Kusama` network. pub trait IsKusama {