Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 203720a

Browse files
committed
change propagation
1 parent 35e3930 commit 203720a

File tree

4 files changed

+110
-59
lines changed

4 files changed

+110
-59
lines changed

client/network/src/config.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@ pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multia
2828
#[doc(hidden)]
2929
pub use crate::protocol::ProtocolConfig;
3030

31-
use crate::{ExHashT, ReportHandle};
31+
use crate::ExHashT;
3232

3333
use core::{fmt, iter};
34+
use futures::future;
3435
use libp2p::identity::{ed25519, Keypair};
3536
use libp2p::wasm_ext;
3637
use libp2p::{multiaddr, Multiaddr, PeerId};
3738
use prometheus_endpoint::Registry;
38-
use sc_peerset::ReputationChange;
3939
use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue};
4040
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
4141
use std::{borrow::Cow, convert::TryFrom, future::Future, pin::Pin, str::FromStr};
@@ -167,6 +167,22 @@ impl<B: BlockT> FinalityProofRequestBuilder<B> for DummyFinalityProofRequestBuil
167167
/// Shared finality proof request builder struct used by the queue.
168168
pub type BoxFinalityProofRequestBuilder<B> = Box<dyn FinalityProofRequestBuilder<B> + Send + Sync>;
169169

170+
/// Result of the transaction import.
171+
#[derive(Clone, Copy, Debug)]
172+
pub enum TransactionImport {
173+
/// Transaction is good but already known by the transaction pool.
174+
KnownGood,
175+
/// Transaction is good and not yet known.
176+
NewGood,
177+
/// Transaction is invalid.
178+
Bad,
179+
/// Transaction import was not performed.
180+
None,
181+
}
182+
183+
/// Fuure resolving to transaction import result.
184+
pub type TransactionImportFuture = Pin<Box<dyn Future<Output=TransactionImport> + Send>>;
185+
170186
/// Transaction pool interface
171187
pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
172188
/// Get transactions from the pool that are ready to be propagated.
@@ -175,15 +191,11 @@ pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
175191
fn hash_of(&self, transaction: &B::Extrinsic) -> H;
176192
/// Import a transaction into the pool.
177193
///
178-
/// Peer reputation is changed by reputation_change if transaction is accepted by the pool.
194+
/// This will return future.
179195
fn import(
180196
&self,
181-
report_handle: ReportHandle,
182-
who: PeerId,
183-
reputation_change_good: ReputationChange,
184-
reputation_change_bad: ReputationChange,
185197
transaction: B::Extrinsic,
186-
);
198+
) -> TransactionImportFuture;
187199
/// Notify the pool about transactions broadcast.
188200
fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>);
189201
/// Get transaction by hash.
@@ -209,12 +221,10 @@ impl<H: ExHashT + Default, B: BlockT> TransactionPool<H, B> for EmptyTransaction
209221

210222
fn import(
211223
&self,
212-
_report_handle: ReportHandle,
213-
_who: PeerId,
214-
_rep_change_good: ReputationChange,
215-
_rep_change_bad: ReputationChange,
216224
_transaction: B::Extrinsic
217-
) {}
225+
) -> TransactionImportFuture {
226+
Box::pin(future::ready(TransactionImport::KnownGood))
227+
}
218228

219229
fn on_broadcasted(&self, _: HashMap<H, Vec<String>>) {}
220230

client/network/src/protocol.rs

Lines changed: 52 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717
use crate::{
1818
ExHashT,
1919
chain::{Client, FinalityProofProvider},
20-
config::{BoxFinalityProofRequestBuilder, ProtocolId, TransactionPool},
20+
config::{BoxFinalityProofRequestBuilder, ProtocolId, TransactionPool, TransactionImportFuture, TransactionImport},
2121
error,
2222
utils::interval
2323
};
2424

2525
use bytes::{Bytes, BytesMut};
26-
use futures::prelude::*;
26+
use futures::{prelude::*, stream::FuturesUnordered};
2727
use generic_proto::{GenericProto, GenericProtoOut};
2828
use libp2p::{Multiaddr, PeerId};
2929
use libp2p::core::{ConnectedPoint, connection::{ConnectionId, ListenerId}};
@@ -101,6 +101,13 @@ mod rep {
101101
pub const UNEXPECTED_STATUS: Rep = Rep::new(-(1 << 20), "Unexpected status message");
102102
/// Reputation change when we are a light client and a peer is behind us.
103103
pub const PEER_BEHIND_US_LIGHT: Rep = Rep::new(-(1 << 8), "Useless for a light peer");
104+
/// Reputation change when a peer sends us any extrinsic.
105+
///
106+
/// This forces node to verify it, thus the negative value here. Once extrinsic is verified,
107+
/// reputation change should be refunded with `ANY_EXTRINSIC_REFUND`
108+
pub const ANY_EXTRINSIC: Rep = Rep::new(-(1 << 4), "Any extrinsic");
109+
/// Reputation change when a peer sends us any extrinsic that is not invalid.
110+
pub const ANY_EXTRINSIC_REFUND: Rep = Rep::new(1 << 4, "Any extrinsic (refund)");
104111
/// Reputation change when a peer sends us an extrinsic that we didn't know about.
105112
pub const GOOD_EXTRINSIC: Rep = Rep::new(1 << 7, "Good extrinsic");
106113
/// Reputation change when a peer sends us a bad extrinsic.
@@ -182,6 +189,24 @@ impl Metrics {
182189
}
183190
}
184191

192+
struct PendingTransaction {
193+
validation: TransactionImportFuture,
194+
peer_id: PeerId,
195+
}
196+
197+
impl Future for PendingTransaction {
198+
type Output = (PeerId, TransactionImport);
199+
200+
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
201+
let this = Pin::into_inner(self);
202+
if let Poll::Ready(import_result) = this.validation.poll_unpin(cx) {
203+
return Poll::Ready((this.peer_id.clone(), import_result));
204+
}
205+
206+
Poll::Pending
207+
}
208+
}
209+
185210
// Lock must always be taken in order declared here.
186211
pub struct Protocol<B: BlockT, H: ExHashT> {
187212
/// Interval at which we call `tick`.
@@ -190,6 +215,8 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
190215
propagate_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
191216
/// Pending list of messages to return from `poll` as a priority.
192217
pending_messages: VecDeque<CustomMessageOutcome<B>>,
218+
/// Pending extrinsic verification tasks.
219+
pending_transactions: FuturesUnordered<PendingTransaction>,
193220
config: ProtocolConfig,
194221
genesis_hash: B::Hash,
195222
sync: ChainSync<B>,
@@ -394,6 +421,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
394421
tick_timeout: Box::pin(interval(TICK_TIMEOUT)),
395422
propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)),
396423
pending_messages: VecDeque::new(),
424+
pending_transactions: FuturesUnordered::new(),
397425
config,
398426
context_data: ContextData {
399427
peers: HashMap::new(),
@@ -1121,17 +1149,25 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
11211149
let hash = self.transaction_pool.hash_of(&t);
11221150
peer.known_extrinsics.insert(hash);
11231151

1124-
self.transaction_pool.import(
1125-
self.peerset_handle.clone().into(),
1126-
who.clone(),
1127-
rep::GOOD_EXTRINSIC,
1128-
rep::BAD_EXTRINSIC,
1129-
t,
1130-
);
1152+
self.peerset_handle.report_peer(who.clone(), rep::ANY_EXTRINSIC);
1153+
1154+
self.pending_transactions.push(PendingTransaction {
1155+
peer_id: who.clone(),
1156+
validation: self.transaction_pool.import(t),
1157+
});
11311158
}
11321159
}
11331160
}
11341161

1162+
fn on_handle_extrinsic_import(&mut self, who: PeerId, import: TransactionImport) {
1163+
match import {
1164+
TransactionImport::KnownGood => self.peerset_handle.report_peer(who, rep::ANY_EXTRINSIC_REFUND),
1165+
TransactionImport::NewGood => self.peerset_handle.report_peer(who, rep::GOOD_EXTRINSIC),
1166+
TransactionImport::Bad => self.peerset_handle.report_peer(who, rep::BAD_EXTRINSIC),
1167+
TransactionImport::None => {},
1168+
}
1169+
}
1170+
11351171
/// Propagate one extrinsic.
11361172
pub fn propagate_extrinsic(
11371173
&mut self,
@@ -1953,7 +1989,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
19531989
&mut self.context_data.stats,
19541990
&mut self.context_data.peers,
19551991
&id,
1956-
GenericMessage::BlockRequest(r)
1992+
GenericMessage::BlockRequest(r),
19571993
)
19581994
}
19591995
}
@@ -1970,7 +2006,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
19702006
&mut self.context_data.stats,
19712007
&mut self.context_data.peers,
19722008
&id,
1973-
GenericMessage::BlockRequest(r)
2009+
GenericMessage::BlockRequest(r),
19742010
)
19752011
}
19762012
}
@@ -1988,9 +2024,13 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
19882024
&mut self.context_data.stats,
19892025
&mut self.context_data.peers,
19902026
&id,
1991-
GenericMessage::FinalityProofRequest(r))
2027+
GenericMessage::FinalityProofRequest(r),
2028+
)
19922029
}
19932030
}
2031+
if let Poll::Ready(Some((peer_id, result))) = self.pending_transactions.poll_next_unpin(cx) {
2032+
self.on_handle_extrinsic_import(peer_id, result);
2033+
}
19942034
if let Some(message) = self.pending_messages.pop_front() {
19952035
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
19962036
}

client/service/src/builder.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -883,7 +883,6 @@ ServiceBuilder<
883883
imports_external_transactions: !matches!(config.role, Role::Light),
884884
pool: transaction_pool.clone(),
885885
client: client.clone(),
886-
executor: task_manager.spawn_handle(),
887886
});
888887

889888
let protocol_id = {

client/service/src/lib.rs

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use futures::{
5050
sink::SinkExt,
5151
task::{Spawn, FutureObj, SpawnError},
5252
};
53-
use sc_network::{NetworkService, network_state::NetworkState, PeerId, ReportHandle};
53+
use sc_network::{NetworkService, network_state::NetworkState, PeerId};
5454
use log::{log, warn, debug, error, Level};
5555
use codec::{Encode, Decode};
5656
use sp_runtime::generic::BlockId;
@@ -76,7 +76,10 @@ pub use sc_executor::NativeExecutionDispatch;
7676
#[doc(hidden)]
7777
pub use std::{ops::Deref, result::Result, sync::Arc};
7878
#[doc(hidden)]
79-
pub use sc_network::config::{FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder};
79+
pub use sc_network::config::{
80+
FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder, TransactionImport,
81+
TransactionImportFuture,
82+
};
8083
pub use sc_tracing::TracingReceiver;
8184
pub use task_manager::SpawnTaskHandle;
8285
use task_manager::TaskManager;
@@ -616,7 +619,6 @@ pub struct TransactionPoolAdapter<C, P> {
616619
imports_external_transactions: bool,
617620
pool: Arc<P>,
618621
client: Arc<C>,
619-
executor: SpawnTaskHandle,
620622
}
621623

622624
/// Get transactions for propagation.
@@ -659,42 +661,42 @@ where
659661

660662
fn import(
661663
&self,
662-
report_handle: ReportHandle,
663-
who: PeerId,
664-
reputation_change_good: sc_network::ReputationChange,
665-
reputation_change_bad: sc_network::ReputationChange,
666-
transaction: B::Extrinsic
667-
) {
664+
transaction: B::Extrinsic,
665+
) -> TransactionImportFuture {
668666
if !self.imports_external_transactions {
669667
debug!("Transaction rejected");
670-
return;
668+
Box::pin(futures::future::ready(TransactionImport::None));
671669
}
672670

673671
let encoded = transaction.encode();
674-
match Decode::decode(&mut &encoded[..]) {
675-
Ok(uxt) => {
676-
let best_block_id = BlockId::hash(self.client.info().best_hash);
677-
let source = sp_transaction_pool::TransactionSource::External;
678-
let import_future = self.pool.submit_one(&best_block_id, source, uxt);
679-
let import_future = import_future
680-
.map(move |import_result| {
681-
match import_result {
682-
Ok(_) => report_handle.report_peer(who, reputation_change_good),
683-
Err(e) => match e.into_pool_error() {
684-
Ok(sp_transaction_pool::error::Error::AlreadyImported(_)) => (),
685-
Ok(e) => {
686-
report_handle.report_peer(who, reputation_change_bad);
687-
debug!("Error adding transaction to the pool: {:?}", e)
688-
}
689-
Err(e) => debug!("Error converting pool error: {:?}", e),
690-
}
691-
}
692-
});
693-
694-
self.executor.spawn("extrinsic-import", import_future);
672+
let uxt = match Decode::decode(&mut &encoded[..]) {
673+
Ok(uxt) => uxt,
674+
Err(e) => {
675+
debug!("Transaction invalid: {:?}", e);
676+
return Box::pin(futures::future::ready(TransactionImport::Bad));
695677
}
696-
Err(e) => debug!("Error decoding transaction {}", e),
697-
}
678+
};
679+
680+
let best_block_id = BlockId::hash(self.client.info().best_hash);
681+
682+
let import_future = self.pool.submit_one(&best_block_id, sp_transaction_pool::TransactionSource::External, uxt);
683+
Box::pin(async move {
684+
match import_future.await {
685+
Ok(_) => TransactionImport::NewGood,
686+
Err(e) => match e.into_pool_error() {
687+
Ok(sp_transaction_pool::error::Error::AlreadyImported(_)) => TransactionImport::KnownGood,
688+
Ok(e) => {
689+
debug!("Error adding transaction to the pool: {:?}", e);
690+
TransactionImport::Bad
691+
}
692+
Err(e) => {
693+
debug!("Error converting pool error: {:?}", e);
694+
// it is not bad at least, just some internal node logic error, so peer is innocent.
695+
TransactionImport::KnownGood
696+
}
697+
}
698+
}
699+
})
698700
}
699701

700702
fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>) {

0 commit comments

Comments
 (0)