Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 2 additions & 10 deletions client/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,12 @@ pub(crate) struct TestNetwork {
sender: mpsc::UnboundedSender<Event>,
}

impl TestNetwork {
fn event_stream_03(&self) -> Pin<Box<dyn futures::Stream<Item = NetworkEvent> + Send>> {
impl sc_network_gossip::Network<Block> for TestNetwork {
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = NetworkEvent> + Send>> {
let (tx, rx) = mpsc::unbounded();
let _ = self.sender.unbounded_send(Event::EventStream(tx));
Box::pin(rx)
}
}

impl sc_network_gossip::Network<Block> for TestNetwork {
fn event_stream(&self) -> Box<dyn futures01::Stream<Item = NetworkEvent, Error = ()> + Send> {
Box::new(
self.event_stream_03().map(Ok::<_, ()>).compat()
)
}

fn report_peer(&self, who: sc_network::PeerId, cost_benefit: sc_network::ReputationChange) {
let _ = self.sender.unbounded_send(Event::Report(who, cost_benefit));
Expand Down
7 changes: 3 additions & 4 deletions client/network-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ authors = ["Parity Technologies <[email protected]>"]
edition = "2018"

[dependencies]
log = "0.4.8"
futures = { version = "0.3.1", features = ["compat"] }
wasm-timer = "0.2"
futures = "0.3.1"
futures-timer = "3.0.1"
futures01 = { package = "futures", version = "0.1.29" }
libp2p = { version = "0.16.0", default-features = false, features = ["libp2p-websocket"] }
log = "0.4.8"
lru = "0.1.2"
parking_lot = "0.10.0"
sc-network = { version = "0.8", path = "../network" }
sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" }
wasm-timer = "0.2"
9 changes: 4 additions & 5 deletions client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use crate::state_machine::{ConsensusGossip, TopicNotification, PERIODIC_MAINTENA
use sc_network::message::generic::ConsensusMessage;
use sc_network::{Event, ReputationChange};

use futures::{prelude::*, channel::mpsc, compat::Compat01As03};
use futures01::stream::Stream as Stream01;
use futures::{prelude::*, channel::mpsc};
use libp2p::PeerId;
use parking_lot::Mutex;
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
Expand All @@ -38,7 +37,7 @@ struct GossipEngineInner<B: BlockT> {
state_machine: ConsensusGossip<B>,
network: Box<dyn Network<B> + Send>,
periodic_maintenance_interval: futures_timer::Delay,
network_event_stream: Compat01As03<Box<dyn Stream01<Error = (), Item = Event> + Send>>,
network_event_stream: Pin<Box<dyn Stream<Item = Event> + Send>>,
engine_id: ConsensusEngineId,
}

Expand All @@ -64,7 +63,7 @@ impl<B: BlockT> GossipEngine<B> {
state_machine,
network: Box::new(network),
periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL),
network_event_stream: Compat01As03::new(network_event_stream),
network_event_stream,
engine_id,
}));

Expand Down Expand Up @@ -178,7 +177,7 @@ impl<B: BlockT> Future for GossipEngineInner<B> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = &mut *self;

while let Poll::Ready(Some(Ok(event))) = this.network_event_stream.poll_next_unpin(cx) {
while let Poll::Ready(Some(event)) = this.network_event_stream.poll_next_unpin(cx) {
match event {
Event::NotificationStreamOpened { remote, engine_id: msg_engine_id, roles } => {
if msg_engine_id != this.engine_id {
Expand Down
8 changes: 4 additions & 4 deletions client/network-gossip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub use self::validator::{DiscardAll, MessageIntent, Validator, ValidatorContext
use futures::prelude::*;
use sc_network::{specialization::NetworkSpecialization, Event, ExHashT, NetworkService, PeerId, ReputationChange};
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use std::sync::Arc;
use std::{pin::Pin, sync::Arc};

mod bridge;
mod state_machine;
Expand All @@ -70,7 +70,7 @@ mod validator;
/// Abstraction over a network.
pub trait Network<B: BlockT> {
/// Returns a stream of events representing what happens on the network.
fn event_stream(&self) -> Box<dyn futures01::Stream<Item = Event, Error = ()> + Send>;
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>>;

/// Adjust the reputation of a node.
fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange);
Expand All @@ -97,8 +97,8 @@ pub trait Network<B: BlockT> {
}

impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Network<B> for Arc<NetworkService<B, S, H>> {
fn event_stream(&self) -> Box<dyn futures01::Stream<Item = Event, Error = ()> + Send> {
Box::new(NetworkService::event_stream(self).map(|v| Ok::<_, ()>(v)).compat())
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
Box::pin(NetworkService::event_stream(self))
}

fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange) {
Expand Down