From 93a9c8bf29225bb134e0a4e2dc0df52d35af43ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 23 Aug 2018 11:14:46 +0200 Subject: [PATCH 1/5] Allow replacing transactions. --- polkadot/transaction-pool/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/polkadot/transaction-pool/src/lib.rs b/polkadot/transaction-pool/src/lib.rs index d76eae57cbd74..5fc15087330cd 100644 --- a/polkadot/transaction-pool/src/lib.rs +++ b/polkadot/transaction-pool/src/lib.rs @@ -148,8 +148,8 @@ impl txpool::Scoring for Scoring { if old.is_fully_verified() { assert!(new.is_fully_verified(), "Scoring::choose called with transactions from different senders"); if old.index() == new.index() { - // TODO [ToDr] Do we allow replacement? If yes then it should be Choice::ReplaceOld - return Choice::RejectNew; + // Allow replacing transactions so that we never fail to import to the pool. + return Choice::ReplaceOld; } } From b30c6e5ca25879e28a74a194d4a9da1f3ef1e9d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 23 Aug 2018 13:08:08 +0200 Subject: [PATCH 2/5] Clear old transactions and ban them temporarily. --- polkadot/transaction-pool/src/error.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/polkadot/transaction-pool/src/error.rs b/polkadot/transaction-pool/src/error.rs index ef6cdf6b4166c..09a2324316ae7 100644 --- a/polkadot/transaction-pool/src/error.rs +++ b/polkadot/transaction-pool/src/error.rs @@ -55,6 +55,11 @@ error_chain! { description("Unrecognised address in extrinsic"), display("Unrecognised address in extrinsic: {}", who), } + /// Temporarily banned + TemporarilyBanned { + description("Extrinsic is temporarily banned"), + display("Extrinsic is temporarily banned"), + } } } From c8faf8739e5514ea2cf9617f7ef721cd96fde859 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 23 Aug 2018 15:04:31 +0200 Subject: [PATCH 3/5] Move to a separate module and add some tests. --- polkadot/transaction-pool/src/lib.rs | 109 +++++++++++++-- polkadot/transaction-pool/src/rotator.rs | 165 +++++++++++++++++++++++ 2 files changed, 265 insertions(+), 9 deletions(-) create mode 100644 polkadot/transaction-pool/src/rotator.rs diff --git a/polkadot/transaction-pool/src/lib.rs b/polkadot/transaction-pool/src/lib.rs index 5fc15087330cd..e57a23b3bd994 100644 --- a/polkadot/transaction-pool/src/lib.rs +++ b/polkadot/transaction-pool/src/lib.rs @@ -41,6 +41,7 @@ use std::{ collections::HashMap, ops::Deref, sync::Arc, + time::{Duration, Instant}, }; use codec::{Decode, Encode}; @@ -51,6 +52,7 @@ use extrinsic_pool::{ Pool, Listener, }; +use parking_lot::RwLock; use polkadot_api::PolkadotApi; use primitives::{AccountId, BlockId, Hash, Index, UncheckedExtrinsic as FutureProofUncheckedExtrinsic}; use runtime::{Address, UncheckedExtrinsic}; @@ -59,6 +61,11 @@ use substrate_runtime_primitives::traits::{Bounded, Checkable, Hash as HashT, Bl pub use extrinsic_pool::txpool::{Options, Status, LightStatus, VerifiedTransaction as VerifiedTransactionOps}; pub use error::{Error, ErrorKind, Result}; +/// Maximum time the transaction will be kept in the pool. +/// +/// Transactions that don't get included within the limit are removed from the pool. +const POOL_TIME: Duration = Duration::from_secs(60 * 5); + /// Type alias for convenience. pub type CheckedExtrinsic = std::result::Result>>::Checked; @@ -70,6 +77,7 @@ pub struct VerifiedTransaction { sender: Option, hash: Hash, encoded_size: usize, + valid_till: Instant, } impl VerifiedTransaction { @@ -187,17 +195,21 @@ impl txpool::Scoring for Scoring { pub struct Ready<'a, A: 'a + PolkadotApi> { at_block: BlockId, api: &'a A, + rotator: &'a PoolRotator, known_nonces: HashMap, + now: Instant, } impl<'a, A: 'a + PolkadotApi> Ready<'a, A> { /// Create a new readiness evaluator at the given block. Requires that /// the ID has already been checked for local corresponding and available state. - fn create(at: BlockId, api: &'a A) -> Self { + fn create(at: BlockId, api: &'a A, rotator: &'a PoolRotator) -> Self { Ready { at_block: at, api, - known_nonces: HashMap::new(), + rotator, + known_nonces: Default::default(), + now: Instant::now(), } } } @@ -207,7 +219,9 @@ impl<'a, T: 'a + PolkadotApi> Clone for Ready<'a, T> { Ready { at_block: self.at_block.clone(), api: self.api, + rotator: self.rotator, known_nonces: self.known_nonces.clone(), + now: self.now.clone(), } } } @@ -230,6 +244,11 @@ impl<'a, A: 'a + PolkadotApi> txpool::Ready for Ready<'a, A trace!(target: "transaction-pool", "Next index for sender is {}; xt index is {}", next_index, xt.original.extrinsic.index); + if self.rotator.ban_if_stale(&self.now, xt) { + debug!(target: "transaction-pool", "[{}] Banning as stale.", xt.hash); + return Readiness::Stale; + } + let result = match xt.original.extrinsic.index.cmp(&next_index) { // TODO: this won't work perfectly since accounts can now be killed, returning the nonce // to zero. @@ -251,11 +270,12 @@ impl<'a, A: 'a + PolkadotApi> txpool::Ready for Ready<'a, A pub struct Verifier<'a, A: 'a> { api: &'a A, + rotator: &'a PoolRotator, at_block: BlockId, } impl<'a, A> Verifier<'a, A> where - A: 'a + PolkadotApi, +A: 'a + PolkadotApi, { const NO_ACCOUNT: &'static str = "Account not found."; @@ -289,6 +309,11 @@ impl<'a, A> txpool::Verifier for Verifier<'a, A> where debug!(target: "transaction-pool", "Transaction submitted: {}", ::substrate_primitives::hexdisplay::HexDisplay::from(&encoded)); + if self.rotator.is_banned(&hash) { + debug!(target: "transaction-pool", "[{}] Transaction is temporarily banned", hash); + bail!(ErrorKind::TemporarilyBanned); + } + let inner = match uxt.clone().check_with(|a| self.lookup(a)) { Ok(xt) => Some(xt), // keep the transaction around in the future pool and attempt to promote it later. @@ -298,9 +323,9 @@ impl<'a, A> txpool::Verifier for Verifier<'a, A> where let sender = inner.as_ref().map(|x| x.signed.clone()); if encoded_size < 1024 { - debug!(target: "transaction-pool", "Transaction verified: {} => {:?}", hash, uxt); + debug!(target: "transaction-pool", "[{}] Transaction verified: {:?}", hash, uxt); } else { - debug!(target: "transaction-pool", "Transaction verified: {} ({} bytes is too large to display)", hash, encoded_size); + debug!(target: "transaction-pool", "[{}] Transaction verified: ({} bytes is too large to display)", hash, encoded_size); } Ok(VerifiedTransaction { @@ -308,27 +333,87 @@ impl<'a, A> txpool::Verifier for Verifier<'a, A> where inner, sender, hash, - encoded_size + encoded_size, + valid_till: Instant::now() + POOL_TIME, }) } } +/// Pool rotator is responsible to only keep fresh transactions in the pool. +/// +/// Transactions that occupy the pool for too long are culled and temporarily banned from entering +/// the pool again. +struct PoolRotator { + /// How long the transaction is banned for. + ban_time: Duration, + /// Currently banned transactions. + banned_until: RwLock>, +} + +impl Default for PoolRotator { + fn default() -> Self { + PoolRotator { + ban_time: Duration::from_secs(60 * 30), + banned_until: Default::default(), + } + } +} + +impl PoolRotator { + /// Returns `true` if transaction hash is currently banned. + pub fn is_banned(&self, hash: &Hash) -> bool { + self.banned_until.read().contains_key(hash) + } + + /// Bans transaction if it's stale. + /// + /// Returns `true` if transaction is stale and got banned. + pub fn ban_if_stale(&self, now: &Instant, tx: &VerifiedTransaction) -> bool { + if &tx.valid_till > now { + return false; + } + + self.banned_until.write().insert(*tx.hash(), *now + self.ban_time); + true + } + + /// Removes timed bans. + pub fn clear_timeouts(&self, now: &Instant) { + let to_remove = { + self.banned_until.read() + .iter() + .filter_map(|(k, v)| if v < now { + Some(*k) + } else { + None + }).collect::>() + }; + + let mut banned = self.banned_until.write(); + for k in to_remove { + banned.remove(&k); + } + } +} + /// The polkadot transaction pool. /// /// Wraps a `extrinsic_pool::Pool`. pub struct TransactionPool { inner: Pool, api: Arc, + rotator: PoolRotator, } impl TransactionPool where - A: PolkadotApi, +A: PolkadotApi, { /// Create a new transaction pool. pub fn new(options: Options, api: Arc) -> Self { TransactionPool { inner: Pool::new(options, Scoring), api, + rotator: Default::default(), } } @@ -337,6 +422,7 @@ impl TransactionPool where let verifier = Verifier { api: &*self.api, at_block: block, + rotator: &self.rotator, }; self.inner.submit(verifier, vec![uxt]).map(|mut v| v.swap_remove(0)) } @@ -347,6 +433,7 @@ impl TransactionPool where let verifier = Verifier { api: &*self.api, at_block: block, + rotator: &self.rotator, }; self.inner.submit(verifier, to_reverify.into_iter().map(|tx| tx.original.clone()))?; @@ -372,7 +459,9 @@ impl TransactionPool where /// Cull old transactions from the queue. pub fn cull(&self, block: BlockId) -> Result { - let ready = Ready::create(block, &*self.api); + self.rotator.clear_timeouts(&Instant::now()); + + let ready = Ready::create(block, &*self.api, &self.rotator); Ok(self.inner.cull(None, ready)) } @@ -380,7 +469,8 @@ impl TransactionPool where pub fn cull_and_get_pending(&self, block: BlockId, f: F) -> Result where F: FnOnce(txpool::PendingIterator, Scoring, Listener>) -> T, { - let ready = Ready::create(block, &*self.api); + self.rotator.clear_timeouts(&Instant::now()); + let ready = Ready::create(block, &*self.api, &self.rotator); self.inner.cull(None, ready.clone()); Ok(self.inner.pending(ready, f)) } @@ -424,6 +514,7 @@ impl ExtrinsicPool for Transact let verifier = Verifier { api: &*self.api, + rotator: &self.rotator, at_block: block, }; diff --git a/polkadot/transaction-pool/src/rotator.rs b/polkadot/transaction-pool/src/rotator.rs new file mode 100644 index 0000000000000..f1c324fd84fff --- /dev/null +++ b/polkadot/transaction-pool/src/rotator.rs @@ -0,0 +1,165 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Rotate extrinsic inside the pool. +//! +//! Keeps only recent extrinsic and discard the ones kept for a significant amount of time. +//! Discarded extrinsics are banned so that they don't get re-imported again. + +use std::{ + collections::HashMap, + time::{Duration, Instant}, +}; +use parking_lot::RwLock; +use primitives::Hash; +use VerifiedTransaction; + +/// Pool rotator is responsible to only keep fresh extrinsics in the pool. +/// +/// Extrinsics that occupy the pool for too long are culled and temporarily banned from entering +/// the pool again. +pub struct PoolRotator { + /// How long the extrinsic is banned for. + ban_time: Duration, + /// Currently banned extrinsics. + banned_until: RwLock>, +} + +impl Default for PoolRotator { + fn default() -> Self { + PoolRotator { + ban_time: Duration::from_secs(60 * 30), + banned_until: Default::default(), + } + } +} + +impl PoolRotator { + /// Returns `true` if extrinsic hash is currently banned. + pub fn is_banned(&self, hash: &Hash) -> bool { + self.banned_until.read().contains_key(hash) + } + + /// Bans extrinsic if it's stale. + /// + /// Returns `true` if extrinsic is stale and got banned. + pub fn ban_if_stale(&self, now: &Instant, tx: &VerifiedTransaction) -> bool { + if &tx.valid_till > now { + return false; + } + + self.banned_until.write().insert(*tx.hash(), *now + self.ban_time); + true + } + + /// Removes timed bans. + pub fn clear_timeouts(&self, now: &Instant) { + let to_remove = { + self.banned_until.read() + .iter() + .filter_map(|(k, v)| if v < now { + Some(*k) + } else { + None + }).collect::>() + }; + + let mut banned = self.banned_until.write(); + for k in to_remove { + banned.remove(&k); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use runtime::{Extrinsic, Call, TimestampCall, UncheckedExtrinsic}; + + fn rotator() -> PoolRotator { + PoolRotator { + ban_time: Duration::from_millis(10), + ..Default::default() + } + } + + fn tx() -> (Hash, VerifiedTransaction) { + let hash: Hash = 5.into(); + let tx = VerifiedTransaction { + original: UncheckedExtrinsic::new( + Extrinsic { + function: Call::Timestamp(TimestampCall::set(100_000_000)), + signed: Default::default(), + index: Default::default(), + }, + Default::default(), + ), + inner: None, + sender: None, + hash: hash.clone(), + encoded_size: 1024, + valid_till: Instant::now(), + }; + + (hash, tx) + } + + #[test] + fn should_not_ban_if_not_stale() { + // given + let (hash, tx) = tx(); + let rotator = rotator(); + assert!(!rotator.is_banned(&hash)); + let past = Instant::now() - Duration::from_millis(1000); + + // when + assert!(!rotator.ban_if_stale(&past, &tx)); + + // then + assert!(!rotator.is_banned(&hash)); + } + + #[test] + fn should_ban_stale_extrinsic() { + // given + let (hash, tx) = tx(); + let rotator = rotator(); + assert!(!rotator.is_banned(&hash)); + + // when + assert!(rotator.ban_if_stale(&Instant::now(), &tx)); + + // then + assert!(rotator.is_banned(&hash)); + } + + + #[test] + fn should_clear_banned() { + // given + let (hash, tx) = tx(); + let rotator = rotator(); + assert!(rotator.ban_if_stale(&Instant::now(), &tx)); + assert!(rotator.is_banned(&hash)); + + // when + let future = Instant::now() + rotator.ban_time + rotator.ban_time; + rotator.clear_timeouts(&future); + + // then + assert!(!rotator.is_banned(&hash)); + } +} From 030c68ccf142df929b47a74e24eec4fd8ce6c67d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 23 Aug 2018 16:06:39 +0200 Subject: [PATCH 4/5] Add bound to banned transactions. --- polkadot/transaction-pool/src/rotator.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/polkadot/transaction-pool/src/rotator.rs b/polkadot/transaction-pool/src/rotator.rs index f1c324fd84fff..52ee1ed64925a 100644 --- a/polkadot/transaction-pool/src/rotator.rs +++ b/polkadot/transaction-pool/src/rotator.rs @@ -27,6 +27,9 @@ use parking_lot::RwLock; use primitives::Hash; use VerifiedTransaction; +/// Expected size of the banned extrinsics cache. +const EXPECTED_SIZE: usize = 2048; + /// Pool rotator is responsible to only keep fresh extrinsics in the pool. /// /// Extrinsics that occupy the pool for too long are culled and temporarily banned from entering @@ -61,7 +64,16 @@ impl PoolRotator { return false; } - self.banned_until.write().insert(*tx.hash(), *now + self.ban_time); + let mut banned = self.banned_until.write(); + banned.insert(*tx.hash(), *now + self.ban_time); + if banned.len() > 2 * EXPECTED_SIZE { + while banned.len() > EXPECTED_SIZE { + if let Ok(key) = banned.keys().next().cloned() { + banned.remove(&key); + } + } + } + true } From 6ef592c1d5b3e84e19f999a4919af3ef41056c74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 24 Aug 2018 14:49:28 +0200 Subject: [PATCH 5/5] Remove unnecessary block and double PoolRotator. --- polkadot/transaction-pool/src/lib.rs | 60 +----------------------- polkadot/transaction-pool/src/rotator.rs | 21 ++++----- 2 files changed, 12 insertions(+), 69 deletions(-) diff --git a/polkadot/transaction-pool/src/lib.rs b/polkadot/transaction-pool/src/lib.rs index e57a23b3bd994..5903c96388a2c 100644 --- a/polkadot/transaction-pool/src/lib.rs +++ b/polkadot/transaction-pool/src/lib.rs @@ -35,6 +35,7 @@ extern crate error_chain; extern crate log; mod error; +mod rotator; use std::{ cmp::Ordering, @@ -52,9 +53,9 @@ use extrinsic_pool::{ Pool, Listener, }; -use parking_lot::RwLock; use polkadot_api::PolkadotApi; use primitives::{AccountId, BlockId, Hash, Index, UncheckedExtrinsic as FutureProofUncheckedExtrinsic}; +use rotator::PoolRotator; use runtime::{Address, UncheckedExtrinsic}; use substrate_runtime_primitives::traits::{Bounded, Checkable, Hash as HashT, BlakeTwo256}; @@ -339,63 +340,6 @@ impl<'a, A> txpool::Verifier for Verifier<'a, A> where } } -/// Pool rotator is responsible to only keep fresh transactions in the pool. -/// -/// Transactions that occupy the pool for too long are culled and temporarily banned from entering -/// the pool again. -struct PoolRotator { - /// How long the transaction is banned for. - ban_time: Duration, - /// Currently banned transactions. - banned_until: RwLock>, -} - -impl Default for PoolRotator { - fn default() -> Self { - PoolRotator { - ban_time: Duration::from_secs(60 * 30), - banned_until: Default::default(), - } - } -} - -impl PoolRotator { - /// Returns `true` if transaction hash is currently banned. - pub fn is_banned(&self, hash: &Hash) -> bool { - self.banned_until.read().contains_key(hash) - } - - /// Bans transaction if it's stale. - /// - /// Returns `true` if transaction is stale and got banned. - pub fn ban_if_stale(&self, now: &Instant, tx: &VerifiedTransaction) -> bool { - if &tx.valid_till > now { - return false; - } - - self.banned_until.write().insert(*tx.hash(), *now + self.ban_time); - true - } - - /// Removes timed bans. - pub fn clear_timeouts(&self, now: &Instant) { - let to_remove = { - self.banned_until.read() - .iter() - .filter_map(|(k, v)| if v < now { - Some(*k) - } else { - None - }).collect::>() - }; - - let mut banned = self.banned_until.write(); - for k in to_remove { - banned.remove(&k); - } - } -} - /// The polkadot transaction pool. /// /// Wraps a `extrinsic_pool::Pool`. diff --git a/polkadot/transaction-pool/src/rotator.rs b/polkadot/transaction-pool/src/rotator.rs index 52ee1ed64925a..06847443c6f6e 100644 --- a/polkadot/transaction-pool/src/rotator.rs +++ b/polkadot/transaction-pool/src/rotator.rs @@ -68,7 +68,7 @@ impl PoolRotator { banned.insert(*tx.hash(), *now + self.ban_time); if banned.len() > 2 * EXPECTED_SIZE { while banned.len() > EXPECTED_SIZE { - if let Ok(key) = banned.keys().next().cloned() { + if let Some(key) = banned.keys().next().cloned() { banned.remove(&key); } } @@ -79,17 +79,16 @@ impl PoolRotator { /// Removes timed bans. pub fn clear_timeouts(&self, now: &Instant) { - let to_remove = { - self.banned_until.read() - .iter() - .filter_map(|(k, v)| if v < now { - Some(*k) - } else { - None - }).collect::>() - }; - let mut banned = self.banned_until.write(); + + let to_remove = banned + .iter() + .filter_map(|(k, v)| if v < now { + Some(*k) + } else { + None + }).collect::>(); + for k in to_remove { banned.remove(&k); }