diff --git a/Cargo.lock b/Cargo.lock index db15cbe591996..622867dac2c0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6255,7 +6255,6 @@ dependencies = [ name = "sc-network-gossip" version = "0.8.0" dependencies = [ - "futures 0.1.29", "futures 0.3.4", "futures-timer 3.0.1", "libp2p", diff --git a/client/finality-grandpa/src/communication/tests.rs b/client/finality-grandpa/src/communication/tests.rs index 040ee4c7bbd22..5506512b531d1 100644 --- a/client/finality-grandpa/src/communication/tests.rs +++ b/client/finality-grandpa/src/communication/tests.rs @@ -44,20 +44,12 @@ pub(crate) struct TestNetwork { sender: mpsc::UnboundedSender, } -impl TestNetwork { - fn event_stream_03(&self) -> Pin + Send>> { +impl sc_network_gossip::Network for TestNetwork { + fn event_stream(&self) -> Pin + Send>> { let (tx, rx) = mpsc::unbounded(); let _ = self.sender.unbounded_send(Event::EventStream(tx)); Box::pin(rx) } -} - -impl sc_network_gossip::Network for TestNetwork { - fn event_stream(&self) -> Box + Send> { - Box::new( - self.event_stream_03().map(Ok::<_, ()>).compat() - ) - } fn report_peer(&self, who: sc_network::PeerId, cost_benefit: sc_network::ReputationChange) { let _ = self.sender.unbounded_send(Event::Report(who, cost_benefit)); diff --git a/client/network-gossip/Cargo.toml b/client/network-gossip/Cargo.toml index 98b2bd0590af9..8866db1f343db 100644 --- a/client/network-gossip/Cargo.toml +++ b/client/network-gossip/Cargo.toml @@ -7,13 +7,12 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] -log = "0.4.8" -futures = { version = "0.3.1", features = ["compat"] } -wasm-timer = "0.2" +futures = "0.3.1" futures-timer = "3.0.1" -futures01 = { package = "futures", version = "0.1.29" } libp2p = { version = "0.16.0", default-features = false, features = ["libp2p-websocket"] } +log = "0.4.8" lru = "0.1.2" parking_lot = "0.10.0" sc-network = { version = "0.8", path = "../network" } sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" } +wasm-timer = "0.2" diff --git a/client/network-gossip/src/bridge.rs b/client/network-gossip/src/bridge.rs index 87958cbc14563..7968e59d0704e 100644 --- a/client/network-gossip/src/bridge.rs +++ b/client/network-gossip/src/bridge.rs @@ -20,8 +20,7 @@ use crate::state_machine::{ConsensusGossip, TopicNotification, PERIODIC_MAINTENA use sc_network::message::generic::ConsensusMessage; use sc_network::{Event, ReputationChange}; -use futures::{prelude::*, channel::mpsc, compat::Compat01As03}; -use futures01::stream::Stream as Stream01; +use futures::{prelude::*, channel::mpsc}; use libp2p::PeerId; use parking_lot::Mutex; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; @@ -38,7 +37,7 @@ struct GossipEngineInner { state_machine: ConsensusGossip, network: Box + Send>, periodic_maintenance_interval: futures_timer::Delay, - network_event_stream: Compat01As03 + Send>>, + network_event_stream: Pin + Send>>, engine_id: ConsensusEngineId, } @@ -64,7 +63,7 @@ impl GossipEngine { state_machine, network: Box::new(network), periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL), - network_event_stream: Compat01As03::new(network_event_stream), + network_event_stream, engine_id, })); @@ -178,7 +177,7 @@ impl Future for GossipEngineInner { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = &mut *self; - while let Poll::Ready(Some(Ok(event))) = this.network_event_stream.poll_next_unpin(cx) { + while let Poll::Ready(Some(event)) = this.network_event_stream.poll_next_unpin(cx) { match event { Event::NotificationStreamOpened { remote, engine_id: msg_engine_id, roles } => { if msg_engine_id != this.engine_id { diff --git a/client/network-gossip/src/lib.rs b/client/network-gossip/src/lib.rs index 705a27210ac53..c4f057a775f47 100644 --- a/client/network-gossip/src/lib.rs +++ b/client/network-gossip/src/lib.rs @@ -61,7 +61,7 @@ pub use self::validator::{DiscardAll, MessageIntent, Validator, ValidatorContext use futures::prelude::*; use sc_network::{specialization::NetworkSpecialization, Event, ExHashT, NetworkService, PeerId, ReputationChange}; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; -use std::sync::Arc; +use std::{pin::Pin, sync::Arc}; mod bridge; mod state_machine; @@ -70,7 +70,7 @@ mod validator; /// Abstraction over a network. pub trait Network { /// Returns a stream of events representing what happens on the network. - fn event_stream(&self) -> Box + Send>; + fn event_stream(&self) -> Pin + Send>>; /// Adjust the reputation of a node. fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange); @@ -97,8 +97,8 @@ pub trait Network { } impl, H: ExHashT> Network for Arc> { - fn event_stream(&self) -> Box + Send> { - Box::new(NetworkService::event_stream(self).map(|v| Ok::<_, ()>(v)).compat()) + fn event_stream(&self) -> Pin + Send>> { + Box::pin(NetworkService::event_stream(self)) } fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange) {