diff --git a/client/finality-grandpa/src/until_imported.rs b/client/finality-grandpa/src/until_imported.rs index 737c8c6a774e8..6f76ce3fa86c8 100644 --- a/client/finality-grandpa/src/until_imported.rs +++ b/client/finality-grandpa/src/until_imported.rs @@ -31,7 +31,7 @@ use super::{ use log::{debug, warn}; use sp_utils::mpsc::TracingUnboundedReceiver; use futures::prelude::*; -use futures::stream::Fuse; +use futures::stream::{Fuse, StreamExt}; use futures_timer::Delay; use finality_grandpa::voter; use parking_lot::Mutex; @@ -137,14 +137,16 @@ impl Drop for Metrics { } } -/// Buffering imported messages until blocks with given hashes are imported. -#[pin_project::pin_project] -pub(crate) struct UntilImported> { +/// Buffering incoming messages until blocks with given hashes are imported. +pub(crate) struct UntilImported where + Block: BlockT, + I: Stream + Unpin, + M: BlockUntilImported, +{ import_notifications: Fuse>>, block_sync_requester: BlockSyncRequester, status_check: BlockStatus, - #[pin] - inner: Fuse, + incoming_messages: Fuse, ready: VecDeque, /// Interval at which to check status of each awaited block. check_pending: Pin> + Send + Sync>>, @@ -159,11 +161,17 @@ pub(crate) struct UntilImported, } +impl Unpin for UntilImported where + Block: BlockT, + I: Stream + Unpin, + M: BlockUntilImported, +{} + impl UntilImported where Block: BlockT, BlockStatus: BlockStatusT, BlockSyncRequester: BlockSyncRequesterT, - I: Stream, + I: Stream + Unpin, M: BlockUntilImported, { /// Create a new `UntilImported` wrapper. @@ -171,7 +179,7 @@ impl UntilImported, block_sync_requester: BlockSyncRequester, status_check: BlockStatus, - stream: I, + incoming_messages: I, identifier: &'static str, metrics: Option, ) -> Self { @@ -192,7 +200,7 @@ impl UntilImported Stream for UntilImported, BSyncRequester: BlockSyncRequesterT, - I: Stream, + I: Stream + Unpin, M: BlockUntilImported, { type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - // We are using a `this` variable in order to allow multiple simultaneous mutable borrow - // to `self`. - let mut this = self.project(); + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + // We are using a `this` variable in order to allow multiple simultaneous mutable borrow to + // `self`. + let this = &mut *self; loop { - match Stream::poll_next(Pin::new(&mut this.inner), cx) { + match StreamExt::poll_next_unpin(&mut this.incoming_messages, cx) { Poll::Ready(None) => return Poll::Ready(None), Poll::Ready(Some(input)) => { // new input: schedule wait of any parts which require // blocks to be known. - match M::needs_waiting(input, this.status_check)? { + match M::needs_waiting(input, &this.status_check)? { DiscardWaitOrReady::Discard => {}, DiscardWaitOrReady::Wait(items) => { for (target_hash, target_number, wait) in items { @@ -245,7 +253,7 @@ impl Stream for UntilImported return Poll::Ready(None), Poll::Ready(Some(notification)) => { // new block imported. queue up all messages tied to that hash. @@ -315,7 +323,7 @@ impl Stream for UntilImported