From 716fcdd3f3305dba647d42e5f25bf2f583fa5748 Mon Sep 17 00:00:00 2001 From: 7suyash7 Date: Sat, 10 May 2025 17:06:24 +0530 Subject: [PATCH 1/2] refactor(optimism_txpool): Move interop revalidation logic to SupervisorClient stream Signed-off-by: 7suyash7 --- crates/optimism/txpool/src/maintain.rs | 85 +++++++++---------- .../optimism/txpool/src/supervisor/client.rs | 50 ++++++++++- 2 files changed, 91 insertions(+), 44 deletions(-) diff --git a/crates/optimism/txpool/src/maintain.rs b/crates/optimism/txpool/src/maintain.rs index 571b1ab7b32..dc66fafa356 100644 --- a/crates/optimism/txpool/src/maintain.rs +++ b/crates/optimism/txpool/src/maintain.rs @@ -12,14 +12,14 @@ use crate::{ interop::{is_stale_interop, is_valid_interop, MaybeInteropTransaction}, supervisor::SupervisorClient, }; -use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader, Transaction}; +use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader}; use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt}; use metrics::Gauge; use reth_chain_state::CanonStateNotification; use reth_metrics::{metrics::Counter, Metrics}; use reth_primitives_traits::NodePrimitives; use reth_transaction_pool::{error::PoolTransactionError, PoolTransaction, TransactionPool}; -use std::sync::Arc; +use tracing::warn; /// Transaction pool maintenance metrics #[derive(Metrics)] @@ -153,66 +153,65 @@ pub async fn maintain_transaction_pool_interop( St: Stream> + Send + Unpin + 'static, { let metrics = MaintainPoolInteropMetrics::default(); - let supervisor_client = Arc::new(supervisor_client); + loop { let Some(event) = events.next().await else { break }; if let CanonStateNotification::Commit { new } = event { let timestamp = new.tip().timestamp(); let mut to_remove = Vec::new(); - let mut to_revalidate = Vec::new(); + let mut to_revalidate: Vec<::Transaction> = Vec::new(); let mut interop_count = 0; - for tx in &pool.pooled_transactions() { - // Only interop txs have this field set - if let Some(interop) = tx.transaction.interop_deadline() { + + for tx_arc_wrapper in pool.pooled_transactions() { + if let Some(interop_deadline_val) = tx_arc_wrapper.transaction.interop_deadline() { interop_count += 1; - if !is_valid_interop(interop, timestamp) { - // That means tx didn't revalidated during [`OFFSET_TIME`] time - // We could assume that it won't be validated at all and remove it - to_remove.push(*tx.hash()); - } else if is_stale_interop(interop, timestamp, OFFSET_TIME) { - // If tx has less then [`OFFSET_TIME`] of valid time we revalidate it - to_revalidate.push(tx.clone()) + if !is_valid_interop(interop_deadline_val, timestamp) { + to_remove.push(*tx_arc_wrapper.transaction.hash()); + } else if is_stale_interop(interop_deadline_val, timestamp, OFFSET_TIME) { + to_revalidate.push(tx_arc_wrapper.transaction.clone()); } } } metrics.set_interop_txs_in_pool(interop_count); + if !to_revalidate.is_empty() { metrics.inc_stale_tx_interop(to_revalidate.len()); - let checks_stream = - futures_util::stream::iter(to_revalidate.into_iter().map(|tx| { - let supervisor_client = supervisor_client.clone(); - async move { - let check = supervisor_client - .is_valid_cross_tx( - tx.transaction.access_list(), - tx.transaction.hash(), - timestamp, - Some(TRANSACTION_VALIDITY_WINDOW), - // We could assume that interop is enabled, because - // tx.transaction.interop() would be set only in - // this case - true, - ) - .await; - (tx.clone(), check) + + let revalidation_stream = supervisor_client.revalidate_interop_txs_stream( + to_revalidate, + timestamp, + TRANSACTION_VALIDITY_WINDOW, + MAX_SUPERVISOR_QUERIES, + ); + + futures_util::pin_mut!(revalidation_stream); + + while let Some((tx_item_from_stream, validation_result)) = + revalidation_stream.next().await + { + match validation_result { + Some(Ok(())) => { + tx_item_from_stream + .set_interop_deadline(timestamp + TRANSACTION_VALIDITY_WINDOW); } - })) - .buffered(MAX_SUPERVISOR_QUERIES); - futures_util::pin_mut!(checks_stream); - while let Some((tx, check)) = checks_stream.next().await { - if let Some(Err(err)) = check { - // We remove only bad transaction. If error caused by supervisor instability - // or other fixable issues transaction would be validated on next state - // change, so we ignore it - if err.is_bad_transaction() { - to_remove.push(*tx.transaction.hash()); + Some(Err(err)) => { + if err.is_bad_transaction() { + to_remove.push(*tx_item_from_stream.hash()); + } + } + None => { + warn!( + target: "txpool", + hash = %tx_item_from_stream.hash(), + "Interop transaction no longer considered cross-chain during revalidation; removing." + ); + to_remove.push(*tx_item_from_stream.hash()); } - } else { - tx.transaction.set_interop_deadline(timestamp + TRANSACTION_VALIDITY_WINDOW) } } } + if !to_remove.is_empty() { let removed = pool.remove_transactions(to_remove); metrics.inc_removed_tx_interop(removed.len()); diff --git a/crates/optimism/txpool/src/supervisor/client.rs b/crates/optimism/txpool/src/supervisor/client.rs index 075a5c92bbb..178bcd5227b 100644 --- a/crates/optimism/txpool/src/supervisor/client.rs +++ b/crates/optimism/txpool/src/supervisor/client.rs @@ -1,17 +1,24 @@ //! This is our custom implementation of validator struct use crate::{ + interop::MaybeInteropTransaction, supervisor::{ metrics::SupervisorMetrics, parse_access_list_items_to_inbox_entries, ExecutingDescriptor, InteropTxValidatorError, }, InvalidCrossTx, }; +use alloy_consensus::Transaction; use alloy_eips::eip2930::AccessList; use alloy_primitives::{TxHash, B256}; use alloy_rpc_client::ReqwestClient; -use futures_util::future::BoxFuture; +use futures_util::{ + future::BoxFuture, + stream::{self, StreamExt}, + Stream, +}; use op_alloy_consensus::interop::SafetyLevel; +use reth_transaction_pool::PoolTransaction; use std::{ borrow::Cow, future::IntoFuture, @@ -111,6 +118,47 @@ impl SupervisorClient { } Some(Ok(())) } + + /// Creates a stream that revalidates interop transactions against the supervisor. + /// Returns + /// An implementation of `Stream` that is `Send`-able and tied to the lifetime `'a` of `self`. + /// Each item yielded by the stream is a tuple `(TItem, Option>)`. + /// - The first element is the original `TItem` that was revalidated. + /// - The second element is the `Option>` returned directly by the + /// call to [`Self::is_valid_cross_tx`] for that `TItem`. + pub fn revalidate_interop_txs_stream<'a, TItem, InputIter>( + &'a self, // self is &'a SupervisorClient + txs_to_revalidate: InputIter, + current_timestamp: u64, + revalidation_window: u64, + max_concurrent_queries: usize, + ) -> impl Stream>)> + Send + 'a + where + InputIter: IntoIterator + Send + 'a, + InputIter::IntoIter: Send + 'a, + TItem: + MaybeInteropTransaction + PoolTransaction + Transaction + Clone + Send + Sync + 'static, + { + stream::iter(txs_to_revalidate.into_iter().map(move |tx_item| { + let client_for_async_task = self.clone(); + + async move { + let validation_result = client_for_async_task + .is_valid_cross_tx( + tx_item.access_list(), + tx_item.hash(), + current_timestamp, + Some(revalidation_window), + true, + ) + .await; + + // return the original transaction paired with its validation result. + (tx_item, validation_result) + } + })) + .buffered(max_concurrent_queries) + } } /// Holds supervisor data. Inner type of [`SupervisorClient`]. From b4ccfdd66448f671d0804765430c213ed751d610 Mon Sep 17 00:00:00 2001 From: 7suyash7 Date: Sat, 10 May 2025 17:22:37 +0530 Subject: [PATCH 2/2] refactor(optimism_txpool): Move interop revalidation logic to SupervisorClient stream Signed-off-by: 7suyash7 --- crates/optimism/txpool/src/supervisor/client.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/optimism/txpool/src/supervisor/client.rs b/crates/optimism/txpool/src/supervisor/client.rs index 178bcd5227b..5b6c65eeb28 100644 --- a/crates/optimism/txpool/src/supervisor/client.rs +++ b/crates/optimism/txpool/src/supervisor/client.rs @@ -124,10 +124,12 @@ impl SupervisorClient { /// An implementation of `Stream` that is `Send`-able and tied to the lifetime `'a` of `self`. /// Each item yielded by the stream is a tuple `(TItem, Option>)`. /// - The first element is the original `TItem` that was revalidated. - /// - The second element is the `Option>` returned directly by the - /// call to [`Self::is_valid_cross_tx`] for that `TItem`. + /// - The second element is the `Option>` describes the outcome + /// - `None`: Transaction was not identified as a cross-chain candidate by initial checks. + /// - `Some(Ok(()))`: Supervisor confirmed the transaction is valid. + /// - `Some(Err(InvalidCrossTx))`: Supervisor indicated the transaction is invalid. pub fn revalidate_interop_txs_stream<'a, TItem, InputIter>( - &'a self, // self is &'a SupervisorClient + &'a self, txs_to_revalidate: InputIter, current_timestamp: u64, revalidation_window: u64,