Skip to content
This repository was archived by the owner on Jul 4, 2022. It is now read-only.

Commit d4fbb89

Browse files
authored
client/finality-grandpa: Reintegrate periodic neighbor packet worker (#4631)
The `NeighborPacketWorker` within `client/finality-grandpa` does two things: 1. It receives neighbor packets from components within `client/finality-grandpa`, sends them down to the `GossipEngine` in order for neighboring nodes to receive. 2. It periodically sends out the most recent neighbor packet to the `GossipEngine`. In order to send out packets it had a clone to a `GossipEgine` within an atomic reference counter and a mutex. The `NeighborPacketWorker` was then spawned onto its own asynchronous task. Instead of running in its own task, this patch reintegrates the `NeighborPacketWorker` into the main `client/finality-grandpa` task not requiring the `NeighborPacketWorker` to own a clone of the `GossipEngine`. The greater picture This is a tiny change within a greater refactoring. The overall goal is to **simplify** how finality-grandpa interacts with the network and to **reduce** the amount of **unbounded channels** within the logic. Why no unbounded channels: Bounding channels is needed for backpressure and proper scheduling. With unbounded channels there is no way of telling the producer side to slow down for the consumer side to catch up. Rephrased, there is no way for the scheduler to know when to favour the consumer task over the producer task on a crowded channel and the other way round for an empty channel. Reducing the amount of shared ownership simplifies the logic and enables one to use async-await syntax-suggar, given that one does not need to hold a lock across poll invocations. Using async-await enables one to use bounded channels without complex logic.
1 parent 188d59e commit d4fbb89

File tree

3 files changed

+121
-68
lines changed

3 files changed

+121
-68
lines changed

client/finality-grandpa/src/communication/mod.rs

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,18 @@
2727
//! In the future, there will be a fallback for allowing sending the same message
2828
//! under certain conditions that are used to un-stick the protocol.
2929
30-
use std::sync::Arc;
31-
3230
use futures::{prelude::*, future::Executor as _, sync::mpsc};
33-
use futures03::{compat::Compat, stream::StreamExt, future::FutureExt as _, future::TryFutureExt as _};
31+
use futures03::{
32+
compat::Compat,
33+
stream::StreamExt,
34+
future::{Future as Future03, FutureExt as _, TryFutureExt as _},
35+
};
36+
use log::{debug, trace};
37+
use parking_lot::Mutex;
38+
use std::{pin::Pin, sync::Arc, task::{Context, Poll as Poll03}};
39+
3440
use finality_grandpa::Message::{Prevote, Precommit, PrimaryPropose};
3541
use finality_grandpa::{voter, voter_set::VoterSet};
36-
use log::{debug, trace};
3742
use sc_network::{NetworkService, ReputationChange};
3843
use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
3944
use parity_scale_codec::{Encode, Decode};
@@ -134,9 +139,22 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
134139
service: N,
135140
gossip_engine: GossipEngine<B>,
136141
validator: Arc<GossipValidator<B>>,
142+
143+
/// Sender side of the neighbor packet channel.
144+
///
145+
/// Packets sent into this channel are processed by the `NeighborPacketWorker` and passed on to
146+
/// the underlying `GossipEngine`.
137147
neighbor_sender: periodic::NeighborPacketSender<B>,
148+
149+
/// `NeighborPacketWorker` processing packets sent through the `NeighborPacketSender`.
150+
//
151+
// NetworkBridge is required to be clonable, thus one needs to be able to clone its children,
152+
// thus one has to wrap neighor_packet_worker with an Arc Mutex.
153+
neighbor_packet_worker: Arc<Mutex<periodic::NeighborPacketWorker<B>>>,
138154
}
139155

156+
impl<B: BlockT, N: Network<B>> Unpin for NetworkBridge<B, N> {}
157+
140158
impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
141159
/// Create a new NetworkBridge to the given NetworkService. Returns the service
142160
/// handle.
@@ -195,14 +213,18 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
195213
}
196214
}
197215

198-
let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(gossip_engine.clone());
216+
let (neighbor_packet_worker, neighbor_packet_sender) = periodic::NeighborPacketWorker::new();
199217
let reporting_job = report_stream.consume(gossip_engine.clone());
200218

201-
let bridge = NetworkBridge { service, gossip_engine, validator, neighbor_sender };
219+
let bridge = NetworkBridge {
220+
service,
221+
gossip_engine,
222+
validator,
223+
neighbor_sender: neighbor_packet_sender,
224+
neighbor_packet_worker: Arc::new(Mutex::new(neighbor_packet_worker)),
225+
};
202226

203227
let executor = Compat::new(executor);
204-
executor.execute(Box::new(rebroadcast_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(()))))
205-
.expect("failed to spawn grandpa rebroadcast job task");
206228
executor.execute(Box::new(reporting_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(()))))
207229
.expect("failed to spawn grandpa reporting job task");
208230

@@ -391,6 +413,21 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
391413
}
392414
}
393415

416+
impl<B: BlockT, N: Network<B>> Future03 for NetworkBridge<B, N> {
417+
type Output = Result<(), Error>;
418+
419+
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Self::Output> {
420+
loop {
421+
match futures03::ready!((self.neighbor_packet_worker.lock()).poll_next_unpin(cx)) {
422+
None => return Poll03::Ready(
423+
Err(Error::Network("NeighborPacketWorker stream closed.".into()))
424+
),
425+
Some((to, packet)) => self.gossip_engine.send_message(to, packet.encode()),
426+
}
427+
}
428+
}
429+
}
430+
394431
fn incoming_global<B: BlockT>(
395432
mut gossip_engine: GossipEngine<B>,
396433
topic: B::Hash,
@@ -530,6 +567,7 @@ impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
530567
gossip_engine: self.gossip_engine.clone(),
531568
validator: Arc::clone(&self.validator),
532569
neighbor_sender: self.neighbor_sender.clone(),
570+
neighbor_packet_worker: self.neighbor_packet_worker.clone(),
533571
}
534572
}
535573
}

client/finality-grandpa/src/communication/periodic.rs

Lines changed: 61 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,16 @@
1616

1717
//! Periodic rebroadcast of neighbor packets.
1818
19-
use std::time::{Instant, Duration};
20-
21-
use parity_scale_codec::Encode;
22-
use futures::prelude::*;
23-
use futures::sync::mpsc;
2419
use futures_timer::Delay;
25-
use futures03::future::{FutureExt as _, TryFutureExt as _};
26-
use log::{debug, warn};
20+
use futures03::{channel::mpsc, future::{FutureExt as _}, prelude::*, ready, stream::Stream};
21+
use log::debug;
22+
use std::{pin::Pin, task::{Context, Poll}, time::{Instant, Duration}};
2723

2824
use sc_network::PeerId;
29-
use sc_network_gossip::GossipEngine;
3025
use sp_runtime::traits::{NumberFor, Block as BlockT};
3126
use super::gossip::{NeighborPacket, GossipMessage};
3227

33-
// how often to rebroadcast, if no other
28+
// How often to rebroadcast, in cases where no new packets are created.
3429
const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60);
3530

3631
fn rebroadcast_instant() -> Instant {
@@ -56,56 +51,65 @@ impl<B: BlockT> NeighborPacketSender<B> {
5651
}
5752
}
5853

59-
/// Does the work of sending neighbor packets, asynchronously.
60-
///
61-
/// It may rebroadcast the last neighbor packet periodically when no
62-
/// progress is made.
63-
pub(super) fn neighbor_packet_worker<B>(net: GossipEngine<B>) -> (
64-
impl Future<Item = (), Error = ()> + Send + 'static,
65-
NeighborPacketSender<B>,
66-
) where
67-
B: BlockT,
68-
{
69-
let mut last = None;
70-
let (tx, mut rx) = mpsc::unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>();
71-
let mut delay = Delay::new(REBROADCAST_AFTER);
72-
73-
let work = futures::future::poll_fn(move || {
74-
loop {
75-
match rx.poll().expect("unbounded receivers do not error; qed") {
76-
Async::Ready(None) => return Ok(Async::Ready(())),
77-
Async::Ready(Some((to, packet))) => {
78-
// send to peers.
79-
net.send_message(to.clone(), GossipMessage::<B>::from(packet.clone()).encode());
80-
81-
// rebroadcasting network.
82-
delay.reset(rebroadcast_instant());
83-
last = Some((to, packet));
84-
}
85-
Async::NotReady => break,
86-
}
87-
}
54+
/// NeighborPacketWorker is listening on a channel for new neighbor packets being produced by
55+
/// components within `finality-grandpa` and forwards those packets to the underlying
56+
/// `NetworkEngine` through the `NetworkBridge` that it is being polled by (see `Stream`
57+
/// implementation). Periodically it sends out the last packet in cases where no new ones arrive.
58+
pub(super) struct NeighborPacketWorker<B: BlockT> {
59+
last: Option<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
60+
delay: Delay,
61+
rx: mpsc::UnboundedReceiver<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
62+
}
63+
64+
impl<B: BlockT> Unpin for NeighborPacketWorker<B> {}
65+
66+
impl<B: BlockT> NeighborPacketWorker<B> {
67+
pub(super) fn new() -> (Self, NeighborPacketSender<B>){
68+
let (tx, rx) = mpsc::unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>();
69+
let delay = Delay::new(REBROADCAST_AFTER);
70+
71+
(NeighborPacketWorker {
72+
last: None,
73+
delay,
74+
rx,
75+
}, NeighborPacketSender(tx))
76+
}
77+
}
8878

89-
// has to be done in a loop because it needs to be polled after
90-
// re-scheduling.
91-
loop {
92-
match (&mut delay).unit_error().compat().poll() {
93-
Err(e) => {
94-
warn!(target: "afg", "Could not rebroadcast neighbor packets: {:?}", e);
95-
delay.reset(rebroadcast_instant());
96-
}
97-
Ok(Async::Ready(())) => {
98-
delay.reset(rebroadcast_instant());
99-
100-
if let Some((ref to, ref packet)) = last {
101-
// send to peers.
102-
net.send_message(to.clone(), GossipMessage::<B>::from(packet.clone()).encode());
103-
}
104-
}
105-
Ok(Async::NotReady) => return Ok(Async::NotReady),
79+
impl <B: BlockT> Stream for NeighborPacketWorker<B> {
80+
type Item = (Vec<PeerId>, GossipMessage<B>);
81+
82+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>
83+
{
84+
let this = &mut *self;
85+
match this.rx.poll_next_unpin(cx) {
86+
Poll::Ready(None) => return Poll::Ready(None),
87+
Poll::Ready(Some((to, packet))) => {
88+
this.delay.reset(rebroadcast_instant());
89+
this.last = Some((to.clone(), packet.clone()));
90+
91+
return Poll::Ready(Some((to, GossipMessage::<B>::from(packet.clone()))));
10692
}
93+
// Don't return yet, maybe the timer fired.
94+
Poll::Pending => {},
95+
};
96+
97+
ready!(this.delay.poll_unpin(cx));
98+
99+
// Getting this far here implies that the timer fired.
100+
101+
this.delay.reset(rebroadcast_instant());
102+
103+
// Make sure the underlying task is scheduled for wake-up.
104+
//
105+
// Note: In case poll_unpin is called after the resetted delay fires again, this
106+
// will drop one tick. Deemed as very unlikely and also not critical.
107+
while let Poll::Ready(()) = this.delay.poll_unpin(cx) {};
108+
109+
if let Some((ref to, ref packet)) = this.last {
110+
return Poll::Ready(Some((to.clone(), GossipMessage::<B>::from(packet.clone()))));
107111
}
108-
});
109112

110-
(work, NeighborPacketSender(tx))
113+
return Poll::Pending;
114+
}
111115
}

client/finality-grandpa/src/lib.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -650,12 +650,13 @@ struct VoterWork<B, E, Block: BlockT, N: NetworkT<Block>, RA, SC, VR> {
650650
voter: Box<dyn Future<Item = (), Error = CommandOrError<Block::Hash, NumberFor<Block>>> + Send>,
651651
env: Arc<Environment<B, E, Block, N, RA, SC, VR>>,
652652
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
653+
network: futures03::compat::Compat<NetworkBridge<Block, N>>,
653654
}
654655

655656
impl<B, E, Block, N, RA, SC, VR> VoterWork<B, E, Block, N, RA, SC, VR>
656657
where
657658
Block: BlockT,
658-
N: NetworkT<Block> + Sync,
659+
N: NetworkT<Block> + Sync,
659660
NumberFor<Block>: BlockNumberOps,
660661
RA: 'static + Send + Sync,
661662
E: CallExecutor<Block> + Send + Sync + 'static,
@@ -681,7 +682,7 @@ where
681682
voting_rule,
682683
voters: Arc::new(voters),
683684
config,
684-
network,
685+
network: network.clone(),
685686
set_id: persistent_data.authority_set.set_id(),
686687
authority_set: persistent_data.authority_set.clone(),
687688
consensus_changes: persistent_data.consensus_changes.clone(),
@@ -694,6 +695,7 @@ where
694695
voter: Box::new(futures::empty()) as Box<_>,
695696
env,
696697
voter_commands_rx,
698+
network: futures03::future::TryFutureExt::compat(network),
697699
};
698700
work.rebuild_voter();
699701
work
@@ -831,7 +833,7 @@ where
831833
impl<B, E, Block, N, RA, SC, VR> Future for VoterWork<B, E, Block, N, RA, SC, VR>
832834
where
833835
Block: BlockT,
834-
N: NetworkT<Block> + Sync,
836+
N: NetworkT<Block> + Sync,
835837
NumberFor<Block>: BlockNumberOps,
836838
RA: 'static + Send + Sync,
837839
E: CallExecutor<Block> + Send + Sync + 'static,
@@ -878,6 +880,15 @@ where
878880
}
879881
}
880882

883+
match self.network.poll() {
884+
Ok(Async::NotReady) => {},
885+
Ok(Async::Ready(())) => {
886+
// the network bridge future should never conclude.
887+
return Ok(Async::Ready(()))
888+
}
889+
e @ Err(_) => return e,
890+
};
891+
881892
Ok(Async::NotReady)
882893
}
883894
}

0 commit comments

Comments
 (0)