Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions polkadot/transaction-pool/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ error_chain! {
description("Unrecognised address in extrinsic"),
display("Unrecognised address in extrinsic: {}", who),
}
/// Temporarily banned
TemporarilyBanned {
description("Extrinsic is temporarily banned"),
display("Extrinsic is temporarily banned"),
}
}
}

Expand Down
57 changes: 46 additions & 11 deletions polkadot/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ extern crate error_chain;
extern crate log;

mod error;
mod rotator;

use std::{
cmp::Ordering,
collections::HashMap,
ops::Deref,
sync::Arc,
time::{Duration, Instant},
};

use codec::{Decode, Encode};
Expand All @@ -53,12 +55,18 @@ use extrinsic_pool::{
};
use polkadot_api::PolkadotApi;
use primitives::{AccountId, BlockId, Hash, Index, UncheckedExtrinsic as FutureProofUncheckedExtrinsic};
use rotator::PoolRotator;
use runtime::{Address, UncheckedExtrinsic};
use substrate_runtime_primitives::traits::{Bounded, Checkable, Hash as HashT, BlakeTwo256};

pub use extrinsic_pool::txpool::{Options, Status, LightStatus, VerifiedTransaction as VerifiedTransactionOps};
pub use error::{Error, ErrorKind, Result};

/// Maximum time the transaction will be kept in the pool.
///
/// Transactions that don't get included within the limit are removed from the pool.
const POOL_TIME: Duration = Duration::from_secs(60 * 5);

/// Type alias for convenience.
pub type CheckedExtrinsic = <UncheckedExtrinsic as Checkable<fn(Address) -> std::result::Result<AccountId, &'static str>>>::Checked;

Expand All @@ -70,6 +78,7 @@ pub struct VerifiedTransaction {
sender: Option<AccountId>,
hash: Hash,
encoded_size: usize,
valid_till: Instant,
}

impl VerifiedTransaction {
Expand Down Expand Up @@ -148,8 +157,8 @@ impl txpool::Scoring<VerifiedTransaction> for Scoring {
if old.is_fully_verified() {
assert!(new.is_fully_verified(), "Scoring::choose called with transactions from different senders");
if old.index() == new.index() {
// TODO [ToDr] Do we allow replacement? If yes then it should be Choice::ReplaceOld
return Choice::RejectNew;
// Allow replacing transactions so that we never fail to import to the pool.
return Choice::ReplaceOld;
}
}

Expand Down Expand Up @@ -187,17 +196,21 @@ impl txpool::Scoring<VerifiedTransaction> for Scoring {
pub struct Ready<'a, A: 'a + PolkadotApi> {
at_block: BlockId,
api: &'a A,
rotator: &'a PoolRotator,
known_nonces: HashMap<AccountId, ::primitives::Index>,
now: Instant,
}

impl<'a, A: 'a + PolkadotApi> Ready<'a, A> {
/// Create a new readiness evaluator at the given block. Requires that
/// the ID has already been checked for local corresponding and available state.
fn create(at: BlockId, api: &'a A) -> Self {
fn create(at: BlockId, api: &'a A, rotator: &'a PoolRotator) -> Self {
Ready {
at_block: at,
api,
known_nonces: HashMap::new(),
rotator,
known_nonces: Default::default(),
now: Instant::now(),
}
}
}
Expand All @@ -207,7 +220,9 @@ impl<'a, T: 'a + PolkadotApi> Clone for Ready<'a, T> {
Ready {
at_block: self.at_block.clone(),
api: self.api,
rotator: self.rotator,
known_nonces: self.known_nonces.clone(),
now: self.now.clone(),
}
}
}
Expand All @@ -230,6 +245,11 @@ impl<'a, A: 'a + PolkadotApi> txpool::Ready<VerifiedTransaction> for Ready<'a, A

trace!(target: "transaction-pool", "Next index for sender is {}; xt index is {}", next_index, xt.original.extrinsic.index);

if self.rotator.ban_if_stale(&self.now, xt) {
debug!(target: "transaction-pool", "[{}] Banning as stale.", xt.hash);
return Readiness::Stale;
}

let result = match xt.original.extrinsic.index.cmp(&next_index) {
// TODO: this won't work perfectly since accounts can now be killed, returning the nonce
// to zero.
Expand All @@ -251,11 +271,12 @@ impl<'a, A: 'a + PolkadotApi> txpool::Ready<VerifiedTransaction> for Ready<'a, A

pub struct Verifier<'a, A: 'a> {
api: &'a A,
rotator: &'a PoolRotator,
at_block: BlockId,
}

impl<'a, A> Verifier<'a, A> where
A: 'a + PolkadotApi,
A: 'a + PolkadotApi,
{
const NO_ACCOUNT: &'static str = "Account not found.";

Expand Down Expand Up @@ -289,6 +310,11 @@ impl<'a, A> txpool::Verifier<UncheckedExtrinsic> for Verifier<'a, A> where

debug!(target: "transaction-pool", "Transaction submitted: {}", ::substrate_primitives::hexdisplay::HexDisplay::from(&encoded));

if self.rotator.is_banned(&hash) {
debug!(target: "transaction-pool", "[{}] Transaction is temporarily banned", hash);
bail!(ErrorKind::TemporarilyBanned);
}

let inner = match uxt.clone().check_with(|a| self.lookup(a)) {
Ok(xt) => Some(xt),
// keep the transaction around in the future pool and attempt to promote it later.
Expand All @@ -298,17 +324,18 @@ impl<'a, A> txpool::Verifier<UncheckedExtrinsic> for Verifier<'a, A> where
let sender = inner.as_ref().map(|x| x.signed.clone());

if encoded_size < 1024 {
debug!(target: "transaction-pool", "Transaction verified: {} => {:?}", hash, uxt);
debug!(target: "transaction-pool", "[{}] Transaction verified: {:?}", hash, uxt);
} else {
debug!(target: "transaction-pool", "Transaction verified: {} ({} bytes is too large to display)", hash, encoded_size);
debug!(target: "transaction-pool", "[{}] Transaction verified: ({} bytes is too large to display)", hash, encoded_size);
}

Ok(VerifiedTransaction {
original: uxt,
inner,
sender,
hash,
encoded_size
encoded_size,
valid_till: Instant::now() + POOL_TIME,
})
}
}
Expand All @@ -319,16 +346,18 @@ impl<'a, A> txpool::Verifier<UncheckedExtrinsic> for Verifier<'a, A> where
pub struct TransactionPool<A> {
inner: Pool<Hash, VerifiedTransaction, Scoring, Error>,
api: Arc<A>,
rotator: PoolRotator,
}

impl<A> TransactionPool<A> where
A: PolkadotApi,
A: PolkadotApi,
{
/// Create a new transaction pool.
pub fn new(options: Options, api: Arc<A>) -> Self {
TransactionPool {
inner: Pool::new(options, Scoring),
api,
rotator: Default::default(),
}
}

Expand All @@ -337,6 +366,7 @@ impl<A> TransactionPool<A> where
let verifier = Verifier {
api: &*self.api,
at_block: block,
rotator: &self.rotator,
};
self.inner.submit(verifier, vec![uxt]).map(|mut v| v.swap_remove(0))
}
Expand All @@ -347,6 +377,7 @@ impl<A> TransactionPool<A> where
let verifier = Verifier {
api: &*self.api,
at_block: block,
rotator: &self.rotator,
};

self.inner.submit(verifier, to_reverify.into_iter().map(|tx| tx.original.clone()))?;
Expand All @@ -372,15 +403,18 @@ impl<A> TransactionPool<A> where

/// Cull old transactions from the queue.
pub fn cull(&self, block: BlockId) -> Result<usize> {
let ready = Ready::create(block, &*self.api);
self.rotator.clear_timeouts(&Instant::now());

let ready = Ready::create(block, &*self.api, &self.rotator);
Ok(self.inner.cull(None, ready))
}

/// Cull transactions from the queue and then compute the pending set.
pub fn cull_and_get_pending<F, T>(&self, block: BlockId, f: F) -> Result<T> where
F: FnOnce(txpool::PendingIterator<VerifiedTransaction, Ready<A>, Scoring, Listener<Hash>>) -> T,
{
let ready = Ready::create(block, &*self.api);
self.rotator.clear_timeouts(&Instant::now());
let ready = Ready::create(block, &*self.api, &self.rotator);
self.inner.cull(None, ready.clone());
Ok(self.inner.pending(ready, f))
}
Expand Down Expand Up @@ -424,6 +458,7 @@ impl<A> ExtrinsicPool<FutureProofUncheckedExtrinsic, BlockId, Hash> for Transact

let verifier = Verifier {
api: &*self.api,
rotator: &self.rotator,
at_block: block,
};

Expand Down
Loading