Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 42 additions & 43 deletions crates/optimism/txpool/src/maintain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ use crate::{
interop::{is_stale_interop, is_valid_interop, MaybeInteropTransaction},
supervisor::SupervisorClient,
};
use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader, Transaction};
use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader};
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
use metrics::Gauge;
use reth_chain_state::CanonStateNotification;
use reth_metrics::{metrics::Counter, Metrics};
use reth_primitives_traits::NodePrimitives;
use reth_transaction_pool::{error::PoolTransactionError, PoolTransaction, TransactionPool};
use std::sync::Arc;
use tracing::warn;

/// Transaction pool maintenance metrics
#[derive(Metrics)]
Expand Down Expand Up @@ -153,66 +153,65 @@ pub async fn maintain_transaction_pool_interop<N, Pool, St>(
St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
{
let metrics = MaintainPoolInteropMetrics::default();
let supervisor_client = Arc::new(supervisor_client);

loop {
let Some(event) = events.next().await else { break };
if let CanonStateNotification::Commit { new } = event {
let timestamp = new.tip().timestamp();
let mut to_remove = Vec::new();
let mut to_revalidate = Vec::new();
let mut to_revalidate: Vec<<Pool as TransactionPool>::Transaction> = Vec::new();
let mut interop_count = 0;
for tx in &pool.pooled_transactions() {
// Only interop txs have this field set
if let Some(interop) = tx.transaction.interop_deadline() {

for tx_arc_wrapper in pool.pooled_transactions() {
if let Some(interop_deadline_val) = tx_arc_wrapper.transaction.interop_deadline() {
interop_count += 1;
if !is_valid_interop(interop, timestamp) {
// That means tx didn't revalidated during [`OFFSET_TIME`] time
// We could assume that it won't be validated at all and remove it
to_remove.push(*tx.hash());
} else if is_stale_interop(interop, timestamp, OFFSET_TIME) {
// If tx has less then [`OFFSET_TIME`] of valid time we revalidate it
to_revalidate.push(tx.clone())
if !is_valid_interop(interop_deadline_val, timestamp) {
to_remove.push(*tx_arc_wrapper.transaction.hash());
} else if is_stale_interop(interop_deadline_val, timestamp, OFFSET_TIME) {
to_revalidate.push(tx_arc_wrapper.transaction.clone());
}
}
}

metrics.set_interop_txs_in_pool(interop_count);

if !to_revalidate.is_empty() {
metrics.inc_stale_tx_interop(to_revalidate.len());
let checks_stream =
futures_util::stream::iter(to_revalidate.into_iter().map(|tx| {
let supervisor_client = supervisor_client.clone();
async move {
let check = supervisor_client
.is_valid_cross_tx(
tx.transaction.access_list(),
tx.transaction.hash(),
timestamp,
Some(TRANSACTION_VALIDITY_WINDOW),
// We could assume that interop is enabled, because
// tx.transaction.interop() would be set only in
// this case
true,
)
.await;
(tx.clone(), check)

let revalidation_stream = supervisor_client.revalidate_interop_txs_stream(
to_revalidate,
timestamp,
TRANSACTION_VALIDITY_WINDOW,
MAX_SUPERVISOR_QUERIES,
);

futures_util::pin_mut!(revalidation_stream);

while let Some((tx_item_from_stream, validation_result)) =
revalidation_stream.next().await
{
match validation_result {
Some(Ok(())) => {
tx_item_from_stream
.set_interop_deadline(timestamp + TRANSACTION_VALIDITY_WINDOW);
}
}))
.buffered(MAX_SUPERVISOR_QUERIES);
futures_util::pin_mut!(checks_stream);
while let Some((tx, check)) = checks_stream.next().await {
if let Some(Err(err)) = check {
// We remove only bad transaction. If error caused by supervisor instability
// or other fixable issues transaction would be validated on next state
// change, so we ignore it
if err.is_bad_transaction() {
to_remove.push(*tx.transaction.hash());
Some(Err(err)) => {
if err.is_bad_transaction() {
to_remove.push(*tx_item_from_stream.hash());
}
}
None => {
warn!(
target: "txpool",
hash = %tx_item_from_stream.hash(),
"Interop transaction no longer considered cross-chain during revalidation; removing."
);
to_remove.push(*tx_item_from_stream.hash());
}
} else {
tx.transaction.set_interop_deadline(timestamp + TRANSACTION_VALIDITY_WINDOW)
}
}
}

if !to_remove.is_empty() {
let removed = pool.remove_transactions(to_remove);
metrics.inc_removed_tx_interop(removed.len());
Expand Down
52 changes: 51 additions & 1 deletion crates/optimism/txpool/src/supervisor/client.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
//! This is our custom implementation of validator struct

use crate::{
interop::MaybeInteropTransaction,
supervisor::{
metrics::SupervisorMetrics, parse_access_list_items_to_inbox_entries, ExecutingDescriptor,
InteropTxValidatorError,
},
InvalidCrossTx,
};
use alloy_consensus::Transaction;
use alloy_eips::eip2930::AccessList;
use alloy_primitives::{TxHash, B256};
use alloy_rpc_client::ReqwestClient;
use futures_util::future::BoxFuture;
use futures_util::{
future::BoxFuture,
stream::{self, StreamExt},
Stream,
};
use op_alloy_consensus::interop::SafetyLevel;
use reth_transaction_pool::PoolTransaction;
use std::{
borrow::Cow,
future::IntoFuture,
Expand Down Expand Up @@ -111,6 +118,49 @@ impl SupervisorClient {
}
Some(Ok(()))
}

/// Creates a stream that revalidates interop transactions against the supervisor.
/// Returns
/// An implementation of `Stream` that is `Send`-able and tied to the lifetime `'a` of `self`.
/// Each item yielded by the stream is a tuple `(TItem, Option<Result<(), InvalidCrossTx>>)`.
/// - The first element is the original `TItem` that was revalidated.
/// - The second element is the `Option<Result<(), InvalidCrossTx>>` describes the outcome
/// - `None`: Transaction was not identified as a cross-chain candidate by initial checks.
/// - `Some(Ok(()))`: Supervisor confirmed the transaction is valid.
/// - `Some(Err(InvalidCrossTx))`: Supervisor indicated the transaction is invalid.
pub fn revalidate_interop_txs_stream<'a, TItem, InputIter>(
&'a self,
txs_to_revalidate: InputIter,
current_timestamp: u64,
revalidation_window: u64,
max_concurrent_queries: usize,
) -> impl Stream<Item = (TItem, Option<Result<(), InvalidCrossTx>>)> + Send + 'a
where
InputIter: IntoIterator<Item = TItem> + Send + 'a,
InputIter::IntoIter: Send + 'a,
TItem:
MaybeInteropTransaction + PoolTransaction + Transaction + Clone + Send + Sync + 'static,
{
stream::iter(txs_to_revalidate.into_iter().map(move |tx_item| {
let client_for_async_task = self.clone();

async move {
let validation_result = client_for_async_task
.is_valid_cross_tx(
tx_item.access_list(),
tx_item.hash(),
current_timestamp,
Some(revalidation_window),
true,
)
.await;

// return the original transaction paired with its validation result.
(tx_item, validation_result)
}
}))
.buffered(max_concurrent_queries)
}
}

/// Holds supervisor data. Inner type of [`SupervisorClient`].
Expand Down
Loading