Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/ethereum/node/tests/e2e/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ async fn test_reorg_through_backfill() -> eyre::Result<()> {
let head = first_provider.get_block_by_number(20.into()).await?.unwrap();
second_node.sync_to(head.header.hash).await?;

// Produce an unfinalized fork chain with 5 blocks
// Produce an unfinalized fork chain with 30 blocks
second_node.payload.timestamp = head.header.timestamp;
advance_with_random_transactions(&mut second_node, 10, &mut rng, false).await?;
advance_with_random_transactions(&mut second_node, 30, &mut rng, false).await?;

// Now reorg second node to the finalized canonical head
let head = first_provider.get_block_by_number(100.into()).await?.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ reth-testing-utils.workspace = true
test-utils = [
"reth-consensus/test-utils",
"reth-network-p2p/test-utils",
"reth-primitives-traits/test-utils",
"reth-provider/test-utils",
"reth-stages-types/test-utils",
"reth-primitives-traits/test-utils",
]
2 changes: 2 additions & 0 deletions crates/stages/api/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ impl<Provider> PipelineBuilder<Provider> {
progress: Default::default(),
metrics_tx,
fail_on_unwind,
last_detached_head_unwind_target: None,
detached_head_attempts: 0,
}
}
}
Expand Down
217 changes: 123 additions & 94 deletions crates/stages/api/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ pub use event::*;
use futures_util::Future;
use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, ChainStateBlockReader,
ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
StageCheckpointWriter,
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader,
ChainStateBlockReader, ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory,
StageCheckpointReader, StageCheckpointWriter,
};
use reth_prune::PrunerBuilder;
use reth_static_file::StaticFileProducer;
Expand Down Expand Up @@ -83,6 +83,12 @@ pub struct Pipeline<N: ProviderNodeTypes> {
/// Whether an unwind should fail the syncing process. Should only be set when downloading
/// blocks from trusted sources and expecting them to be valid.
fail_on_unwind: bool,
/// Block that was chosen as a target of the last unwind triggered by
/// [`StageError::DetachedHead`] error.
last_detached_head_unwind_target: Option<B256>,
/// Number of consecutive unwind attempts due to [`StageError::DetachedHead`] for the current
/// fork.
detached_head_attempts: u64,
}

impl<N: ProviderNodeTypes> Pipeline<N> {
Expand Down Expand Up @@ -110,6 +116,14 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
/// Returns a new listener for [`PipelineEvent`]s emitted by this pipeline.
///
/// Each call creates an independent [`EventStream`] subscribed to the
/// pipeline's internal event sender; events notified after this call will be
/// delivered to the returned stream.
pub fn events(&self) -> EventStream<PipelineEvent> {
self.event_sender.new_listener()
}

/// Get a mutable reference to a stage by index.
///
/// The stage is returned as a trait object borrowed out of the pipeline's
/// internal stage list.
///
/// # Panics
///
/// Panics if `idx` is out of bounds for the pipeline's stage list
/// (plain slice indexing is used — no bounds check beyond the panic).
pub fn stage(
&mut self,
idx: usize,
) -> &mut dyn Stage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW> {
&mut self.stages[idx]
}
}

impl<N: ProviderNodeTypes> Pipeline<N> {
Expand Down Expand Up @@ -383,8 +397,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
) -> Result<ControlFlow, PipelineError> {
let total_stages = self.stages.len();

let stage = &mut self.stages[stage_index];
let stage_id = stage.id();
let stage_id = self.stage(stage_index).id();
let mut made_progress = false;
let target = self.max_block.or(previous_stage);

Expand Down Expand Up @@ -422,10 +435,9 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
target,
});

if let Err(err) = stage.execute_ready(exec_input).await {
if let Err(err) = self.stage(stage_index).execute_ready(exec_input).await {
self.event_sender.notify(PipelineEvent::Error { stage_id });

match on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)? {
match self.on_stage_error(stage_id, prev_checkpoint, err)? {
Some(ctrl) => return Ok(ctrl),
None => continue,
};
Expand All @@ -443,7 +455,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
target,
});

match stage.execute(&provider_rw, exec_input) {
match self.stage(stage_index).execute(&provider_rw, exec_input) {
Ok(out @ ExecOutput { checkpoint, done }) => {
made_progress |=
checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number;
Expand All @@ -468,7 +480,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {

UnifiedStorageWriter::commit(provider_rw)?;

stage.post_execute_commit()?;
self.stage(stage_index).post_execute_commit()?;

if done {
let block_number = checkpoint.block_number;
Expand All @@ -483,101 +495,118 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
drop(provider_rw);
self.event_sender.notify(PipelineEvent::Error { stage_id });

if let Some(ctrl) =
on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)?
{
if let Some(ctrl) = self.on_stage_error(stage_id, prev_checkpoint, err)? {
return Ok(ctrl)
}
}
}
}
}
}

fn on_stage_error<N: ProviderNodeTypes>(
factory: &ProviderFactory<N>,
stage_id: StageId,
prev_checkpoint: Option<StageCheckpoint>,
err: StageError,
) -> Result<Option<ControlFlow>, PipelineError> {
if let StageError::DetachedHead { local_head, header, error } = err {
warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");

// We unwind because of a detached head.
let unwind_to =
local_head.block.number.saturating_sub(BEACON_CONSENSUS_REORG_UNWIND_DEPTH).max(1);
Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
} else if let StageError::Block { block, error } = err {
match error {
BlockErrorKind::Validation(validation_error) => {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.block.number,
"Stage encountered a validation error: {validation_error}"
);

// FIXME: When handling errors, we do not commit the database transaction. This
// leads to the Merkle stage not clearing its checkpoint, and restarting from an
// invalid place.
let provider_rw = factory.database_provider_rw()?;
provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
provider_rw.save_stage_checkpoint(
StageId::MerkleExecute,
prev_checkpoint.unwrap_or_default(),
)?;

UnifiedStorageWriter::commit(provider_rw)?;

// We unwind because of a validation error. If the unwind itself
// fails, we bail entirely,
// otherwise we restart the execution loop from the
// beginning.
Ok(Some(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
}))
fn on_stage_error(
&mut self,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a lot nicer

stage_id: StageId,
prev_checkpoint: Option<StageCheckpoint>,
err: StageError,
) -> Result<Option<ControlFlow>, PipelineError> {
if let StageError::DetachedHead { local_head, header, error } = err {
warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");

if let Some(last_detached_head_unwind_target) = self.last_detached_head_unwind_target {
if local_head.block.hash == last_detached_head_unwind_target &&
header.block.number == local_head.block.number + 1
{
self.detached_head_attempts += 1;
} else {
self.detached_head_attempts = 1;
}
} else {
self.detached_head_attempts = 1;
}
BlockErrorKind::Execution(execution_error) => {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.block.number,
"Stage encountered an execution error: {execution_error}"
);

// We unwind because of an execution error. If the unwind itself
// fails, we bail entirely,
// otherwise we restart
// the execution loop from the beginning.
Ok(Some(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
}))
// We unwind because of a detached head.
let unwind_to = local_head
.block
.number
.saturating_sub(
BEACON_CONSENSUS_REORG_UNWIND_DEPTH.saturating_mul(self.detached_head_attempts),
)
.max(1);

self.last_detached_head_unwind_target = self.provider_factory.block_hash(unwind_to)?;
Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
} else if let StageError::Block { block, error } = err {
match error {
BlockErrorKind::Validation(validation_error) => {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.block.number,
"Stage encountered a validation error: {validation_error}"
);

// FIXME: When handling errors, we do not commit the database transaction. This
// leads to the Merkle stage not clearing its checkpoint, and restarting from an
// invalid place.
let provider_rw = self.provider_factory.database_provider_rw()?;
provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
provider_rw.save_stage_checkpoint(
StageId::MerkleExecute,
prev_checkpoint.unwrap_or_default(),
)?;

UnifiedStorageWriter::commit(provider_rw)?;

// We unwind because of a validation error. If the unwind itself
// fails, we bail entirely,
// otherwise we restart the execution loop from the
// beginning.
Ok(Some(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
}))
}
BlockErrorKind::Execution(execution_error) => {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.block.number,
"Stage encountered an execution error: {execution_error}"
);

// We unwind because of an execution error. If the unwind itself
// fails, we bail entirely,
// otherwise we restart
// the execution loop from the beginning.
Ok(Some(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
}))
}
}
}
} else if let StageError::MissingStaticFileData { block, segment } = err {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.block.number,
segment = %segment,
"Stage is missing static file data."
);
} else if let StageError::MissingStaticFileData { block, segment } = err {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.block.number,
segment = %segment,
"Stage is missing static file data."
);

Ok(Some(ControlFlow::Unwind { target: block.block.number - 1, bad_block: block }))
} else if err.is_fatal() {
error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
Err(err.into())
} else {
// On other errors we assume they are recoverable if we discard the
// transaction and run the stage again.
warn!(
target: "sync::pipeline",
stage = %stage_id,
"Stage encountered a non-fatal error: {err}. Retrying..."
);
Ok(None)
Ok(Some(ControlFlow::Unwind { target: block.block.number - 1, bad_block: block }))
} else if err.is_fatal() {
error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
Err(err.into())
} else {
// On other errors we assume they are recoverable if we discard the
// transaction and run the stage again.
warn!(
target: "sync::pipeline",
stage = %stage_id,
"Stage encountered a non-fatal error: {err}. Retrying..."
);
Ok(None)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/stages/api/src/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,4 +271,4 @@ pub trait StageExt<Provider>: Stage<Provider> {
}
}

impl<Provider, S: Stage<Provider>> StageExt<Provider> for S {}
impl<Provider, S: Stage<Provider> + ?Sized> StageExt<Provider> for S {}
Loading