Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions crates/net/eth-wire-types/src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ use alloy_primitives::{
use alloy_rlp::{
Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper,
};
use core::mem;
use core::{fmt::Debug, mem};
use derive_more::{Constructor, Deref, DerefMut, From, IntoIterator};
use reth_codecs_derive::{add_arbitrary_tests, generate_tests};
use reth_ethereum_primitives::TransactionSigned;
use reth_primitives_traits::SignedTransaction;
use reth_primitives_traits::{Block, SignedTransaction};

/// This informs peers of new blocks that have appeared on the network.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
Expand Down Expand Up @@ -64,6 +64,17 @@ impl From<NewBlockHashes> for Vec<BlockHashNumber> {
}
}

/// A trait for block payloads transmitted through p2p.
pub trait NewBlockPayload:
Encodable + Decodable + Clone + Eq + Debug + Send + Sync + Unpin + 'static
{
/// The block type.
type Block: Block;

/// Returns a reference to the block.
fn block(&self) -> &Self::Block;
}

/// A new block with the current total difficulty, which includes the difficulty of the returned
/// block.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Default)]
Expand All @@ -76,6 +87,14 @@ pub struct NewBlock<B = reth_ethereum_primitives::Block> {
pub td: U128,
}

impl<B: Block + 'static> NewBlockPayload for NewBlock<B> {
type Block = B;

fn block(&self) -> &Self::Block {
&self.block
}
}

generate_tests!(#[rlp, 25] NewBlock<reth_ethereum_primitives::Block>, EthNewBlockTests);

/// This informs peers of transactions that have appeared on the network and are not yet included
Expand Down
10 changes: 5 additions & 5 deletions crates/net/eth-wire-types/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use super::{
broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewPooledTransactionHashes66,
GetNodeData, GetPooledTransactions, GetReceipts, NewPooledTransactionHashes66,
NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, StatusEth69,
Transactions,
};
Expand Down Expand Up @@ -78,7 +78,7 @@ impl<N: NetworkPrimitives> ProtocolMessage<N> {
if version.is_eth69() {
return Err(MessageError::Invalid(version, EthMessageID::NewBlock));
}
EthMessage::NewBlock(Box::new(NewBlock::decode(buf)?))
EthMessage::NewBlock(Box::new(N::NewBlockPayload::decode(buf)?))
}
EthMessageID::Transactions => EthMessage::Transactions(Transactions::decode(buf)?),
EthMessageID::NewPooledTransactionHashes => {
Expand Down Expand Up @@ -218,9 +218,9 @@ pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Represents a `NewBlock` message broadcast to the network.
#[cfg_attr(
feature = "serde",
serde(bound = "N::Block: serde::Serialize + serde::de::DeserializeOwned")
serde(bound = "N::NewBlockPayload: serde::Serialize + serde::de::DeserializeOwned")
)]
NewBlock(Box<NewBlock<N::Block>>),
NewBlock(Box<N::NewBlockPayload>),
/// Represents a Transactions message broadcast to the network.
#[cfg_attr(
feature = "serde",
Expand Down Expand Up @@ -394,7 +394,7 @@ impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EthBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Represents a new block broadcast message.
NewBlock(Arc<NewBlock<N::Block>>),
NewBlock(Arc<N::NewBlockPayload>),
/// Represents a transactions broadcast message.
Transactions(SharedTransactions<N::BroadcastedTransaction>),
}
Expand Down
16 changes: 13 additions & 3 deletions crates/net/eth-wire-types/src/primitives.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
//! Abstraction over primitive types in network messages.

use crate::NewBlockPayload;
use alloy_consensus::{RlpDecodableReceipt, RlpEncodableReceipt, TxReceipt};
use alloy_rlp::{Decodable, Encodable};
use core::fmt::Debug;
use reth_ethereum_primitives::{EthPrimitives, PooledTransactionVariant};
use reth_primitives_traits::{Block, BlockBody, BlockHeader, NodePrimitives, SignedTransaction};
use reth_primitives_traits::{
Block, BlockBody, BlockHeader, BlockTy, NodePrimitives, SignedTransaction,
};

/// Abstraction over primitive types which might appear in network messages. See
/// [`crate::EthMessage`] for more context.
Expand Down Expand Up @@ -37,6 +40,9 @@ pub trait NetworkPrimitives: Send + Sync + Unpin + Clone + Debug + 'static {
+ Decodable
+ Unpin
+ 'static;

/// The payload type for the `NewBlock` message.
type NewBlockPayload: NewBlockPayload<Block = Self::Block>;
}

/// This is a helper trait for use in bounds, where some of the [`NetworkPrimitives`] associated
Expand Down Expand Up @@ -66,19 +72,23 @@ where
/// Basic implementation of [`NetworkPrimitives`] combining [`NodePrimitives`] and a pooled
/// transaction.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BasicNetworkPrimitives<N, Pooled>(core::marker::PhantomData<(N, Pooled)>);
pub struct BasicNetworkPrimitives<N: NodePrimitives, Pooled, NewBlock = crate::NewBlock<BlockTy<N>>>(
core::marker::PhantomData<(N, Pooled, NewBlock)>,
);

impl<N, Pooled> NetworkPrimitives for BasicNetworkPrimitives<N, Pooled>
impl<N, Pooled, NewBlock> NetworkPrimitives for BasicNetworkPrimitives<N, Pooled, NewBlock>
where
N: NodePrimitives,
Pooled: SignedTransaction + TryFrom<N::SignedTx> + 'static,
NewBlock: NewBlockPayload<Block = N::Block>,
{
type BlockHeader = N::BlockHeader;
type BlockBody = N::BlockBody;
type Block = N::Block;
type BroadcastedTransaction = N::SignedTx;
type PooledTransaction = Pooled;
type Receipt = N::Receipt;
type NewBlockPayload = NewBlock;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this use NewBlock<N::Block>?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NewBlock here is a generic

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah gotcha

}

/// Network primitive types used by Ethereum networks.
Expand Down
6 changes: 3 additions & 3 deletions crates/net/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub struct NetworkConfig<C, N: NetworkPrimitives = EthNetworkPrimitives> {
/// first hardfork, `Frontier` for mainnet.
pub fork_filter: ForkFilter,
/// The block importer type.
pub block_import: Box<dyn BlockImport<N::Block>>,
pub block_import: Box<dyn BlockImport<N::NewBlockPayload>>,
/// The default mode of the network.
pub network_mode: NetworkMode,
/// The executor to use for spawning tasks.
Expand Down Expand Up @@ -209,7 +209,7 @@ pub struct NetworkConfigBuilder<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Whether tx gossip is disabled
tx_gossip_disabled: bool,
/// The block importer type
block_import: Option<Box<dyn BlockImport<N::Block>>>,
block_import: Option<Box<dyn BlockImport<N::NewBlockPayload>>>,
/// How to instantiate transactions manager.
transactions_manager_config: TransactionsManagerConfig,
/// The NAT resolver for external IP
Expand Down Expand Up @@ -536,7 +536,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
}

/// Sets the block import type.
pub fn block_import(mut self, block_import: Box<dyn BlockImport<N::Block>>) -> Self {
pub fn block_import(mut self, block_import: Box<dyn BlockImport<N::NewBlockPayload>>) -> Self {
self.block_import = Some(block_import);
self
}
Expand Down
5 changes: 3 additions & 2 deletions crates/net/network/src/import.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! This module provides an abstraction over block import in the form of the `BlockImport` trait.

use crate::message::NewBlockMessage;
use reth_eth_wire::NewBlock;
use reth_eth_wire_types::broadcast::NewBlockHashes;
use reth_network_peers::PeerId;
use std::{
Expand All @@ -9,7 +10,7 @@ use std::{
};

/// Abstraction over block import.
pub trait BlockImport<B = reth_ethereum_primitives::Block>: std::fmt::Debug + Send + Sync {
pub trait BlockImport<B = NewBlock>: std::fmt::Debug + Send + Sync {
/// Invoked for a received block announcement from the peer.
///
/// For a `NewBlock` message:
Expand All @@ -27,7 +28,7 @@ pub trait BlockImport<B = reth_ethereum_primitives::Block>: std::fmt::Debug + Se

/// Represents different types of block announcement events from the network.
#[derive(Debug, Clone)]
pub enum NewBlockEvent<B = reth_ethereum_primitives::Block> {
pub enum NewBlockEvent<B = NewBlock> {
/// A new full block announcement
Block(NewBlockMessage<B>),
/// Only the hashes of new blocks
Expand Down
4 changes: 2 additions & 2 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Receiver half of the command channel set up between this type and the [`NetworkHandle`]
from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
/// Handles block imports according to the `eth` protocol.
block_import: Box<dyn BlockImport<N::Block>>,
block_import: Box<dyn BlockImport<N::NewBlockPayload>>,
/// Sender for high level network events.
event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
/// Sender half to send events to the
Expand Down Expand Up @@ -523,7 +523,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
}

/// Invoked after a `NewBlock` message from the peer was validated
fn on_block_import_result(&mut self, event: BlockImportEvent<N::Block>) {
fn on_block_import_result(&mut self, event: BlockImportEvent<N::NewBlockPayload>) {
match event {
BlockImportEvent::Announcement(validation) => match validation {
BlockValidation::ValidHeader { block } => {
Expand Down
15 changes: 8 additions & 7 deletions crates/net/network/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ use futures::FutureExt;
use reth_eth_wire::{
message::RequestPair, BlockBodies, BlockHeaders, BlockRangeUpdate, EthMessage,
EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives, NewBlock,
NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts,
SharedTransactions, Transactions,
NewBlockHashes, NewBlockPayload, NewPooledTransactionHashes, NodeData, PooledTransactions,
Receipts, SharedTransactions, Transactions,
};
use reth_eth_wire_types::RawCapabilityMessage;
use reth_network_api::PeerRequest;
use reth_network_p2p::error::{RequestError, RequestResult};
use reth_primitives_traits::Block;
use std::{
sync::Arc,
task::{ready, Context, Poll},
Expand All @@ -24,19 +25,19 @@ use tokio::sync::oneshot;

/// Internal form of a `NewBlock` message
#[derive(Debug, Clone)]
pub struct NewBlockMessage<B = reth_ethereum_primitives::Block> {
pub struct NewBlockMessage<P = NewBlock<reth_ethereum_primitives::Block>> {
/// Hash of the block
pub hash: B256,
/// Raw received message
pub block: Arc<NewBlock<B>>,
pub block: Arc<P>,
}

// === impl NewBlockMessage ===

impl<B: reth_primitives_traits::Block> NewBlockMessage<B> {
impl<P: NewBlockPayload> NewBlockMessage<P> {
/// Returns the block number of the block
pub fn number(&self) -> u64 {
self.block.block.header().number()
self.block.block().header().number()
}
}

Expand All @@ -47,7 +48,7 @@ pub enum PeerMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Announce new block hashes
NewBlockHashes(NewBlockHashes),
/// Broadcast new block.
NewBlock(NewBlockMessage<N::Block>),
NewBlock(NewBlockMessage<N::NewBlockPayload>),
/// Received transactions _from_ the peer
ReceivedTransaction(Transactions<N::BroadcastedTransaction>),
/// Broadcast transactions _from_ local _to_ a peer.
Expand Down
6 changes: 3 additions & 3 deletions crates/net/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use parking_lot::Mutex;
use reth_discv4::{Discv4, NatResolver};
use reth_discv5::Discv5;
use reth_eth_wire::{
BlockRangeUpdate, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives, NewBlock,
BlockRangeUpdate, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives,
NewPooledTransactionHashes, SharedTransactions,
};
use reth_ethereum_forks::Head;
Expand Down Expand Up @@ -116,7 +116,7 @@ impl<N: NetworkPrimitives> NetworkHandle<N> {
/// Caution: in `PoS` this is a noop because new blocks are no longer announced over devp2p.
/// Instead they are sent to the node by CL and can be requested over devp2p.
/// Broadcasting new blocks is considered a protocol violation.
pub fn announce_block(&self, block: NewBlock<N::Block>, hash: B256) {
pub fn announce_block(&self, block: N::NewBlockPayload, hash: B256) {
self.send_message(NetworkHandleMessage::AnnounceBlock(block, hash))
}

Expand Down Expand Up @@ -484,7 +484,7 @@ pub(crate) enum NetworkHandleMessage<N: NetworkPrimitives = EthNetworkPrimitives
/// Disconnects a connection to a peer if it exists, optionally providing a disconnect reason.
DisconnectPeer(PeerId, Option<DisconnectReason>),
/// Broadcasts an event to announce a new block to all nodes.
AnnounceBlock(NewBlock<N::Block>, B256),
AnnounceBlock(N::NewBlockPayload, B256),
/// Sends a list of transactions to the given peer.
SendTransaction {
/// The ID of the peer to which the transactions are sent.
Expand Down
8 changes: 5 additions & 3 deletions crates/net/network/src/session/active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use metrics::Gauge;
use reth_eth_wire::{
errors::{EthHandshakeError, EthStreamError},
message::{EthBroadcastMessage, RequestPair},
Capabilities, DisconnectP2P, DisconnectReason, EthMessage, NetworkPrimitives,
Capabilities, DisconnectP2P, DisconnectReason, EthMessage, NetworkPrimitives, NewBlockPayload,
};
use reth_eth_wire_types::RawCapabilityMessage;
use reth_metrics::common::mpsc::MeteredPollSender;
Expand Down Expand Up @@ -201,8 +201,10 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
self.try_emit_broadcast(PeerMessage::NewBlockHashes(msg)).into()
}
EthMessage::NewBlock(msg) => {
let block =
NewBlockMessage { hash: msg.block.header().hash_slow(), block: Arc::new(*msg) };
let block = NewBlockMessage {
hash: msg.block().header().hash_slow(),
block: Arc::new(*msg),
};
self.try_emit_broadcast(PeerMessage::NewBlock(block)).into()
}
EthMessage::Transactions(msg) => {
Expand Down
12 changes: 6 additions & 6 deletions crates/net/network/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use alloy_primitives::B256;
use rand::seq::SliceRandom;
use reth_eth_wire::{
BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives,
NewBlockHashes, UnifiedStatus,
NewBlockHashes, NewBlockPayload, UnifiedStatus,
};
use reth_ethereum_forks::ForkId;
use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender};
Expand Down Expand Up @@ -185,12 +185,12 @@ impl<N: NetworkPrimitives> NetworkState<N> {
/// > the total number of peers) using the `NewBlock` message.
///
/// See also <https://github.com/ethereum/devp2p/blob/master/caps/eth.md>
pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage<N::Block>) {
pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
// send a `NewBlock` message to a fraction of the connected peers (square root of the total
// number of peers)
let num_propagate = (self.active_peers.len() as f64).sqrt() as u64 + 1;

let number = msg.block.block.header().number();
let number = msg.block.block().header().number();
let mut count = 0;

// Shuffle to propagate to a random sample of peers on every block announcement
Expand Down Expand Up @@ -227,8 +227,8 @@ impl<N: NetworkPrimitives> NetworkState<N> {

/// Completes the block propagation process started in [`NetworkState::announce_new_block()`]
/// but sending `NewBlockHash` broadcast to all peers that haven't seen it yet.
pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage<N::Block>) {
let number = msg.block.block.header().number();
pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
let number = msg.block.block().header().number();
let hashes = NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]);
for (peer_id, peer) in &mut self.active_peers {
if peer.blocks.contains(&msg.hash) {
Expand Down Expand Up @@ -524,7 +524,7 @@ pub(crate) enum StateAction<N: NetworkPrimitives> {
/// Target of the message
peer_id: PeerId,
/// The `NewBlock` message
block: NewBlockMessage<N::Block>,
block: NewBlockMessage<N::NewBlockPayload>,
},
NewBlockHashes {
/// Target of the message
Expand Down
9 changes: 7 additions & 2 deletions examples/bsc-p2p/src/block_import/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![allow(unused)]
use handle::ImportHandle;
use reth_engine_primitives::EngineTypes;
use reth_eth_wire::NewBlock;
use reth_network::import::{BlockImport, BlockImportOutcome, NewBlockEvent};
use reth_network_peers::PeerId;
use reth_payload_primitives::{BuiltPayload, PayloadTypes};
Expand All @@ -25,8 +26,12 @@ impl<T: PayloadTypes> BscBlockImport<T> {
}
}

impl<T: PayloadTypes> BlockImport<BscBlock<T>> for BscBlockImport<T> {
fn on_new_block(&mut self, peer_id: PeerId, incoming_block: NewBlockEvent<BscBlock<T>>) {
impl<T: PayloadTypes> BlockImport<NewBlock<BscBlock<T>>> for BscBlockImport<T> {
fn on_new_block(
&mut self,
peer_id: PeerId,
incoming_block: NewBlockEvent<NewBlock<BscBlock<T>>>,
) {
if let NewBlockEvent::Block(block) = incoming_block {
let _ = self.handle.send_block(block, peer_id);
}
Expand Down
Loading
Loading