Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 48 additions & 11 deletions crates/rpc/rpc/src/eth/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::{
time::{Duration, Instant},
};
use tokio::{
sync::{mpsc::Receiver, Mutex},
sync::{mpsc::Receiver, oneshot, Mutex},
time::MissedTickBehavior,
};
use tracing::{error, trace};
Expand All @@ -51,7 +51,7 @@ where
limits: QueryLimits,
) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
trace!(target: "rpc::eth", "Serving eth_getLogs");
self.inner.logs_for_filter(filter, limits).map_err(|e| e.into())
self.logs_for_filter(filter, limits).map_err(|e| e.into())
}
}

Expand Down Expand Up @@ -169,7 +169,7 @@ where

impl<Eth> EthFilter<Eth>
where
Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt,
Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt + 'static,
{
/// Access the underlying provider.
fn provider(&self) -> &Eth::Provider {
Expand Down Expand Up @@ -244,8 +244,9 @@ where
};
let logs = self
.inner
.clone()
.get_logs_in_block_range(
&filter,
*filter,
from_block_number,
to_block_number,
self.inner.query_limits,
Expand Down Expand Up @@ -274,7 +275,16 @@ where
}
};

self.inner.logs_for_filter(filter, self.inner.query_limits).await
self.logs_for_filter(filter, self.inner.query_limits).await
}

/// Returns logs matching given filter object.
async fn logs_for_filter(
&self,
filter: Filter,
limits: QueryLimits,
) -> Result<Vec<Log>, EthFilterError> {
self.inner.clone().logs_for_filter(filter, limits).await
}
}

Expand Down Expand Up @@ -364,7 +374,7 @@ where
/// Handler for `eth_getLogs`
async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
trace!(target: "rpc::eth", "Serving eth_getLogs");
Ok(self.inner.logs_for_filter(filter, self.inner.query_limits).await?)
Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
}
}

Expand Down Expand Up @@ -398,7 +408,7 @@ struct EthFilterInner<Eth: EthApiTypes> {

impl<Eth> EthFilterInner<Eth>
where
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes,
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
{
/// Access the underlying provider.
fn provider(&self) -> &Eth::Provider {
Expand All @@ -414,7 +424,7 @@ where

/// Returns logs matching given filter object.
async fn logs_for_filter(
&self,
self: Arc<Self>,
filter: Filter,
limits: QueryLimits,
) -> Result<Vec<Log>, EthFilterError> {
Expand Down Expand Up @@ -468,7 +478,7 @@ where
.flatten();
let (from_block_number, to_block_number) =
logs_utils::get_filter_block_range(from, to, start_block, info);
self.get_logs_in_block_range(&filter, from_block_number, to_block_number, limits)
self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
.await
}
}
Expand Down Expand Up @@ -504,14 +514,15 @@ where
/// - underlying database error
/// - amount of matches exceeds configured limit
async fn get_logs_in_block_range(
&self,
filter: &Filter,
self: Arc<Self>,
filter: Filter,
from_block: u64,
to_block: u64,
limits: QueryLimits,
) -> Result<Vec<Log>, EthFilterError> {
trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");

// perform boundary checks first
if to_block < from_block {
return Err(EthFilterError::InvalidBlockRangeParams)
}
Expand All @@ -522,6 +533,32 @@ where
return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
}

let (tx, rx) = oneshot::channel();
let this = self.clone();
self.task_spawner.spawn_blocking(Box::pin(async move {
let res =
this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
let _ = tx.send(res);
}));

rx.await.map_err(|_| EthFilterError::InternalError)?
}

/// Returns all logs in the given _inclusive_ range that match the filter
///
/// Note: This function uses a mix of blocking db operations for fetching indices and header
/// ranges and utilizes the rpc cache for optimistically fetching receipts and blocks.
/// This function is considered blocking and should thus be spawned on a blocking task.
///
/// Returns an error if:
/// - underlying database error
async fn get_logs_in_block_range_inner(
&self,
filter: &Filter,
from_block: u64,
to_block: u64,
limits: QueryLimits,
) -> Result<Vec<Log>, EthFilterError> {
let mut all_logs = Vec::new();

// loop over the range of new blocks and check logs if the filter matches the log's bloom
Expand Down
Loading