From 67633cf7d63016ad3b2ba5339b7defa715835ebb Mon Sep 17 00:00:00 2001 From: linning Date: Sat, 26 Jul 2025 04:02:29 +0800 Subject: [PATCH 1/3] Export items that is required by the fork chain command Signed-off-by: linning --- crates/subspace-node/src/commands/run.rs | 6 ++-- .../src/commands/run/consensus.rs | 28 +++++++-------- .../subspace-node/src/commands/run/domain.rs | 36 +++++++++---------- crates/subspace-service/src/lib.rs | 7 ++++ .../client/block-builder/src/custom_api.rs | 21 +++++------ domains/client/block-builder/src/lib.rs | 4 +-- 6 files changed, 53 insertions(+), 49 deletions(-) diff --git a/crates/subspace-node/src/commands/run.rs b/crates/subspace-node/src/commands/run.rs index a7ad4b802d2..b31494ffbe9 100644 --- a/crates/subspace-node/src/commands/run.rs +++ b/crates/subspace-node/src/commands/run.rs @@ -1,5 +1,5 @@ -mod consensus; -mod domain; +pub mod consensus; +pub mod domain; mod shared; use crate::commands::run::consensus::{ @@ -57,7 +57,7 @@ pub struct RunOptions { domain_args: Vec, } -fn raise_fd_limit() { +pub fn raise_fd_limit() { match fdlimit::raise_fd_limit() { Ok(fdlimit::Outcome::LimitRaised { from, to }) => { debug!( diff --git a/crates/subspace-node/src/commands/run/consensus.rs b/crates/subspace-node/src/commands/run/consensus.rs index 0089789db32..c5cf87ba301 100644 --- a/crates/subspace-node/src/commands/run/consensus.rs +++ b/crates/subspace-node/src/commands/run/consensus.rs @@ -365,12 +365,12 @@ impl fmt::Display for CreateObjectMappingConfig { /// Options for running a node #[derive(Debug, Parser)] -pub(super) struct ConsensusChainOptions { +pub(crate) struct ConsensusChainOptions { /// Base path to store node files. /// /// Required unless --dev mode is used. #[arg(long)] - base_path: Option, + pub base_path: Option, /// The chain specification. /// @@ -491,24 +491,24 @@ pub(super) struct ConsensusChainOptions { pub trie_cache_params: TrieCacheParams, } -pub(super) struct PrometheusConfiguration { - pub(super) listen_on: SocketAddr, - pub(super) prometheus_registry: Registry, - pub(super) substrate_registry: substrate_prometheus_endpoint::Registry, +pub(crate) struct PrometheusConfiguration { + pub(crate) listen_on: SocketAddr, + pub(crate) prometheus_registry: Registry, + pub(crate) substrate_registry: substrate_prometheus_endpoint::Registry, } -pub(super) struct ConsensusChainConfiguration { - pub(super) maybe_tmp_dir: Option, - pub(super) subspace_configuration: SubspaceConfiguration, - pub(super) dev: bool, +pub(crate) struct ConsensusChainConfiguration { + pub(crate) maybe_tmp_dir: Option, + pub(crate) subspace_configuration: SubspaceConfiguration, + pub(crate) dev: bool, /// External entropy, used initially when PoT chain starts to derive the first seed - pub(super) pot_external_entropy: Vec, - pub(super) storage_monitor: StorageMonitorParams, - pub(super) prometheus_configuration: Option, + pub(crate) pot_external_entropy: Vec, + pub(crate) storage_monitor: StorageMonitorParams, + pub(crate) prometheus_configuration: Option, } #[expect(clippy::result_large_err, reason = "Comes from Substrate")] -pub(super) fn create_consensus_chain_configuration( +pub(crate) fn create_consensus_chain_configuration( consensus_node_options: ConsensusChainOptions, domains_enabled: bool, ) -> Result { diff --git a/crates/subspace-node/src/commands/run/domain.rs b/crates/subspace-node/src/commands/run/domain.rs index 045d431114a..3ff1efbb25a 100644 --- a/crates/subspace-node/src/commands/run/domain.rs +++ b/crates/subspace-node/src/commands/run/domain.rs @@ -91,7 +91,7 @@ struct SubstrateNetworkOptions { /// Options for running a domain #[derive(Debug, Parser)] -pub(super) struct DomainOptions { +pub(crate) struct DomainOptions { /// ID of the domain to run #[clap(long)] domain_id: Option, @@ -181,15 +181,15 @@ struct PruningOptions { } #[derive(Debug)] -pub(super) struct DomainConfiguration { - pub(super) domain_config: Configuration, - pub(super) domain_id: DomainId, - pub(super) operator_id: Option, - pub(super) domain_type_args: Vec, +pub(crate) struct DomainConfiguration { + pub(crate) domain_config: Configuration, + pub(crate) domain_id: DomainId, + pub(crate) operator_id: Option, + pub(crate) domain_type_args: Vec, } #[expect(clippy::result_large_err, reason = "Comes from Substrate")] -pub(super) fn create_domain_configuration( +pub(crate) fn create_domain_configuration( consensus_chain_configuration: &Configuration, dev: bool, domain_options: DomainOptions, @@ -442,21 +442,21 @@ pub(super) fn create_domain_configuration( }) } -pub(super) struct DomainStartOptions { - pub(super) consensus_client: Arc>, - pub(super) consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory, - pub(super) consensus_network: Arc, - pub(super) block_importing_notification_stream: +pub(crate) struct DomainStartOptions { + pub(crate) consensus_client: Arc>, + pub(crate) consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory, + pub(crate) consensus_network: Arc, + pub(crate) block_importing_notification_stream: SubspaceNotificationStream>, - pub(super) pot_slot_info_stream: Receiver, - pub(super) consensus_network_sync_oracle: Arc>, - pub(super) domain_message_receiver: + pub(crate) pot_slot_info_stream: Receiver, + pub(crate) consensus_network_sync_oracle: Arc>, + pub(crate) domain_message_receiver: TracingUnboundedReceiver, - pub(super) gossip_message_sink: TracingUnboundedSender, - pub(super) domain_backend: Arc>, + pub(crate) gossip_message_sink: TracingUnboundedSender, + pub(crate) domain_backend: Arc>, } -pub(super) async fn run_domain( +pub(crate) async fn run_domain( bootstrap_result: BootstrapResult, domain_configuration: DomainConfiguration, domain_start_options: DomainStartOptions, diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index 05c22ff160b..e5e1042bc30 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -451,6 +451,8 @@ where pub sync_target_block_number: Arc, /// Telemetry pub telemetry: Option, + /// Executor + pub executor: Arc, } type PartialComponents = sc_service::PartialComponents< @@ -658,6 +660,7 @@ where pot_verifier, sync_target_block_number, telemetry, + executor: Arc::new(executor), }; Ok(PartialComponents { @@ -724,6 +727,8 @@ where pub network_starter: NetworkStarter, /// Transaction pool. pub transaction_pool: Arc>, + /// Executor + pub executor: Arc, } type FullNode = NewFull>; @@ -772,6 +777,7 @@ where pot_verifier, sync_target_block_number, mut telemetry, + executor, } = other; let offchain_indexing_enabled = config.base.offchain_worker.indexing_enabled; @@ -1360,6 +1366,7 @@ where archived_segment_notification_stream, network_starter, transaction_pool, + executor, }) } diff --git a/domains/client/block-builder/src/custom_api.rs b/domains/client/block-builder/src/custom_api.rs index 011d338402c..fb4617ec5a4 100644 --- a/domains/client/block-builder/src/custom_api.rs +++ b/domains/client/block-builder/src/custom_api.rs @@ -22,7 +22,7 @@ use subspace_runtime_primitives::ExtrinsicFor; type TrieBackendStorageFor = >>::TrieBackendStorage; -pub(crate) type TrieDeltaBackendFor<'a, State, Block> = TrieBackend< +pub type TrieDeltaBackendFor<'a, State, Block> = TrieBackend< DeltaBackend<'a, TrieBackendStorageFor, HashingFor>, HashingFor, >; @@ -201,7 +201,7 @@ where } } -pub(crate) struct TrieBackendApi, Exec> { +pub struct TrieBackendApi, Exec> { parent_hash: Block::Hash, parent_number: NumberFor, client: Arc, @@ -229,7 +229,7 @@ where Client: ExecutorProvider, Exec: CodeExecutor, { - pub(crate) fn new( + pub fn new( parent_hash: Block::Hash, parent_number: NumberFor, client: Arc, @@ -316,7 +316,7 @@ where self.maybe_storage_changes = Some(changes) } - pub(crate) fn initialize_block( + pub fn initialize_block( &self, header: Block::Header, backend: &TrieDeltaBackendFor, @@ -331,7 +331,7 @@ where ) } - pub(crate) fn apply_extrinsic( + pub fn apply_extrinsic( &self, extrinsic: ExtrinsicFor, backend: &TrieDeltaBackendFor, @@ -346,7 +346,7 @@ where ) } - pub(crate) fn finalize_block( + pub fn finalize_block( &self, backend: &TrieDeltaBackendFor, overlayed_changes: &mut OverlayedChanges>, @@ -359,7 +359,7 @@ where ) } - pub(crate) fn inherent_extrinsics( + pub fn inherent_extrinsics( &self, inherent: InherentData, backend: &TrieDeltaBackendFor, @@ -377,7 +377,7 @@ where /// Collect storage changes returns the storage changes and intermediate roots collected so far. /// The changes are reset after this call. /// Could return None if there were no execution done. - pub(crate) fn collect_storage_changes( + pub fn collect_storage_changes( &mut self, ) -> Option>> { let mut intermediate_roots = self.intermediate_roots.drain(..).collect::>(); @@ -395,10 +395,7 @@ where } } - pub(crate) fn execute_in_transaction( - &mut self, - call: F, - ) -> Result + pub fn execute_in_transaction(&mut self, call: F) -> Result where F: FnOnce( &Self, diff --git a/domains/client/block-builder/src/lib.rs b/domains/client/block-builder/src/lib.rs index 626e5d8e583..8311a1cb94b 100644 --- a/domains/client/block-builder/src/lib.rs +++ b/domains/client/block-builder/src/lib.rs @@ -24,12 +24,12 @@ //! The block builder utility is used in the node as an abstraction over the runtime api to //! initialize a block, to push extrinsics and to finalize a block. -#![warn(missing_docs)] +#![allow(missing_docs)] mod custom_api; mod genesis_block_builder; -use crate::custom_api::{TrieBackendApi, TrieDeltaBackendFor}; +pub use crate::custom_api::{TrieBackendApi, TrieDeltaBackendFor}; pub use custom_api::{CollectedStorageChanges, DeltaBackend, create_delta_backend}; pub use genesis_block_builder::CustomGenesisBlockBuilder; use parity_scale_codec::Encode; From 027e039180846e8e67e3aa47425608d0fe4aef38 Mon Sep 17 00:00:00 2001 From: linning Date: Sat, 26 Jul 2025 04:06:56 +0800 Subject: [PATCH 2/3] Init the fork chain command Mostly port from the run command Signed-off-by: linning --- crates/subspace-node/src/cli.rs | 5 +- crates/subspace-node/src/commands.rs | 2 + crates/subspace-node/src/commands/fork.rs | 433 ++++++++++++++++++++++ crates/subspace-node/src/main.rs | 3 + 4 files changed, 442 insertions(+), 1 deletion(-) create mode 100644 crates/subspace-node/src/commands/fork.rs diff --git a/crates/subspace-node/src/cli.rs b/crates/subspace-node/src/cli.rs index 7463f3ad428..55e50a81f2c 100644 --- a/crates/subspace-node/src/cli.rs +++ b/crates/subspace-node/src/cli.rs @@ -1,5 +1,5 @@ use crate::chain_spec; -use crate::commands::{RunOptions, WipeOptions}; +use crate::commands::{ForkOptions, RunOptions, WipeOptions}; use clap::Parser; use sc_chain_spec::GenericChainSpec; use sc_cli::SubstrateCli; @@ -13,6 +13,9 @@ pub enum Cli { /// Run blockchain node Run(RunOptions), + /// Fork existing blockchain + Fork(ForkOptions), + /// Build a chain specification. BuildSpec(sc_cli::BuildSpecCmd), diff --git a/crates/subspace-node/src/commands.rs b/crates/subspace-node/src/commands.rs index a0f4c68925d..09fda73fe0b 100644 --- a/crates/subspace-node/src/commands.rs +++ b/crates/subspace-node/src/commands.rs @@ -1,4 +1,5 @@ mod domain_key; +mod fork; mod run; mod shared; mod wipe; @@ -6,6 +7,7 @@ mod wipe; pub use domain_key::{ CreateDomainKeyOptions, InsertDomainKeyOptions, create_domain_key, insert_domain_key, }; +pub use fork::{ForkOptions, fork}; pub use run::{RunOptions, run}; pub(crate) use shared::set_exit_on_panic; pub use wipe::{WipeOptions, wipe}; diff --git a/crates/subspace-node/src/commands/fork.rs b/crates/subspace-node/src/commands/fork.rs new file mode 100644 index 00000000000..eafa4586ef7 --- /dev/null +++ b/crates/subspace-node/src/commands/fork.rs @@ -0,0 +1,433 @@ +use crate::commands::run::consensus::{ + ConsensusChainConfiguration, ConsensusChainOptions, create_consensus_chain_configuration, +}; +use crate::commands::run::domain::{ + DomainOptions, DomainStartOptions, create_domain_configuration, run_domain, +}; +use crate::commands::run::{ensure_block_and_state_pruning_params, raise_fd_limit}; +use crate::{Error, PosTable, set_default_ss58_version}; +use clap::Parser; +use cross_domain_message_gossip::GossipWorkerBuilder; +use domain_block_builder::{BlockBuilderApi, TrieBackendApi, TrieDeltaBackendFor}; +use domain_client_operator::fetch_domain_bootstrap_info; +use domain_client_operator::snap_sync::{ + BlockImportingAcknowledgement, ConsensusChainSyncParams, SnapSyncOrchestrator, +}; +use domain_runtime_primitives::opaque::Block as DomainBlock; +use frame_support::{Blake2_128Concat, StorageHasher}; +use frame_system::AccountInfo; +use futures::FutureExt; +use futures::stream::StreamExt; +use pallet_balances::AccountData; +use parity_scale_codec::Encode; +use sc_cli::Signals; +use sc_client_api::{BlockBackend, ExecutorProvider, HeaderBackend}; +use sc_consensus::block_import::{BlockImportParams, ForkChoiceStrategy}; +use sc_consensus::{BlockImport, StateAction, StorageChanges}; +use sc_consensus_slots::SlotProportion; +use sc_domains::domain_block_er::receipt_receiver::DomainBlockERReceiver; +use sc_storage_monitor::StorageMonitorService; +use sc_transaction_pool_api::OffchainTransactionPoolFactory; +use sc_utils::mpsc::tracing_unbounded; +use sp_api::{ApiError, CallApiAt, ProvideRuntimeApi}; +use sp_blockchain::ApplyExtrinsicFailed; +use sp_consensus::BlockOrigin; +use sp_consensus_subspace::SubspaceApi; +use sp_core::traits::{CodeExecutor, SpawnEssentialNamed}; +use sp_keyring::Sr25519Keyring; +use sp_messenger::messages::ChainId; +use sp_runtime::TransactionOutcome; +use sp_runtime::traits::{Block as BlockT, HashingFor, Header, One}; +use sp_state_machine::OverlayedChanges; +use std::path::Path; +use std::sync::Arc; +use std::{env, fs, io}; +use subspace_core_primitives::PublicKey; +use subspace_logging::init_logger; +use subspace_metrics::{RegistryAdapter, start_prometheus_metrics_server}; +use subspace_runtime::{Block, RuntimeApi}; +use subspace_runtime_primitives::{AI3, Nonce}; +use subspace_service::config::{ChainSyncMode, SubspaceNetworking}; +use tracing::{debug, error, info, info_span}; + +/// Options for running a node +#[derive(Debug, Parser)] +pub struct RunOptions { + /// Consensus chain options + #[clap(flatten)] + consensus: ConsensusChainOptions, + + /// Domain arguments + /// + /// The command-line arguments provided first will be passed to the embedded consensus node, + /// while the arguments provided after `--` will be passed to the domain node. + /// + /// subspace-node [consensus-chain-args] -- [domain-args] + #[arg(raw = true)] + domain_args: Vec, +} + +/// Default run command for node +#[tokio::main] +#[expect(clippy::result_large_err, reason = "Comes from Substrate")] +pub async fn run(run_options: RunOptions) -> Result<(), Error> { + init_logger(); + raise_fd_limit(); + let signals = Signals::capture()?; + + let RunOptions { + consensus, + domain_args, + } = run_options; + + let domain_options = (!domain_args.is_empty()) + .then(|| DomainOptions::parse_from(env::args().take(1).chain(domain_args))); + + let ConsensusChainConfiguration { + maybe_tmp_dir: _maybe_tmp_dir, + mut subspace_configuration, + dev, + pot_external_entropy, + storage_monitor, + mut prometheus_configuration, + } = create_consensus_chain_configuration(consensus, domain_options.is_some())?; + + let maybe_domain_configuration = domain_options + .map(|domain_options| { + create_domain_configuration(&subspace_configuration, dev, domain_options) + }) + .transpose()?; + + set_default_ss58_version(subspace_configuration.chain_spec.as_ref()); + + let base_path = subspace_configuration.base_path.path().to_path_buf(); + + info!("Subspace"); + info!("✌️ version {}", env!("SUBSTRATE_CLI_IMPL_VERSION")); + info!("❤️ by {}", env!("CARGO_PKG_AUTHORS")); + info!( + "📋 Chain specification: {}", + subspace_configuration.chain_spec.name() + ); + info!("🏷 Node name: {}", subspace_configuration.network.node_name); + info!("💾 Node path: {}", base_path.display()); + + let fork_id = subspace_configuration + .base + .chain_spec + .fork_id() + .map(String::from); + let snap_sync_orchestrator = if maybe_domain_configuration.is_some() + && subspace_configuration.sync == ChainSyncMode::Snap + { + Some(Arc::new(SnapSyncOrchestrator::new())) + } else { + None + }; + + if maybe_domain_configuration.is_some() { + // TODO: currently, we set consensus block and state pruning to challenge period + // when the node is running a domain node. + // But is there a situation when challenge period is not enough? + // If we do such a scenario, we would rather keep the consensus block and state pruning + // to archive-canonical + // we supress warning here since the consensus has different defaults when running without + // domain node, even if user do not override them, the default vaules are low enough to + // trigger a warning and confusing user. Instead supress them + ensure_block_and_state_pruning_params(&mut subspace_configuration.base, true) + } + + let mut task_manager = { + let subspace_link; + let consensus_chain_node = { + let span = info_span!("Consensus"); + let _enter = span.enter(); + + let partial_components = subspace_service::new_partial::( + &subspace_configuration, + match subspace_configuration.sync { + ChainSyncMode::Full => false, + ChainSyncMode::Snap => true, + }, + &pot_external_entropy, + ) + .map_err(|error| { + sc_service::Error::Other(format!( + "Failed to build a full subspace node 1: {error:?}" + )) + })?; + + subspace_link = partial_components.other.subspace_link.clone(); + + let full_node_fut = subspace_service::new_full::( + subspace_configuration, + partial_components, + prometheus_configuration + .as_mut() + .map(|prometheus_configuration| { + &mut prometheus_configuration.prometheus_registry + }), + true, + SlotProportion::new(3f32 / 4f32), + snap_sync_orchestrator + .as_ref() + .map(|orchestrator| orchestrator.consensus_snap_sync_target_block_receiver()), + ); + + full_node_fut.await.map_err(|error| { + sc_service::Error::Other(format!( + "Failed to build a full subspace node 3: {error:?}" + )) + })? + }; + + StorageMonitorService::try_spawn( + storage_monitor, + base_path, + &consensus_chain_node.task_manager.spawn_essential_handle(), + ) + .map_err(|error| { + sc_service::Error::Other(format!("Failed to start storage monitor: {error:?}")) + })?; + + // Run a domain + if let Some(mut domain_configuration) = maybe_domain_configuration { + ensure_block_and_state_pruning_params(&mut domain_configuration.domain_config, false); + let mut xdm_gossip_worker_builder = GossipWorkerBuilder::new(); + let gossip_message_sink = xdm_gossip_worker_builder.gossip_msg_sink(); + let (domain_message_sink, domain_message_receiver) = + tracing_unbounded("domain_message_channel", 100); + let (consensus_msg_sink, consensus_msg_receiver) = + tracing_unbounded("consensus_message_channel", 100); + + // Start XDM related workers for consensus chain + { + let span = info_span!("Consensus"); + let _enter = span.enter(); + + // Start cross domain message listener for Consensus chain to receive messages from domains in the network + let domain_code_executor: sc_domains::RuntimeExecutor = + sc_service::new_wasm_executor(&domain_configuration.domain_config.executor); + consensus_chain_node + .task_manager + .spawn_essential_handle() + .spawn_essential_blocking( + "consensus-message-listener", + None, + Box::pin( + cross_domain_message_gossip::start_cross_chain_message_listener::< + _, + _, + _, + _, + _, + _, + _, + >( + ChainId::Consensus, + consensus_chain_node.client.clone(), + consensus_chain_node.client.clone(), + consensus_chain_node.transaction_pool.clone(), + consensus_chain_node.network_service.clone(), + consensus_msg_receiver, + domain_code_executor.into(), + consensus_chain_node.sync_service.clone(), + ), + ), + ); + + consensus_chain_node + .task_manager + .spawn_essential_handle() + .spawn_essential_blocking( + "consensus-chain-channel-update-worker", + None, + Box::pin( + domain_client_message_relayer::worker::gossip_channel_updates::< + _, + _, + Block, + _, + >( + ChainId::Consensus, + consensus_chain_node.client.clone(), + consensus_chain_node.sync_service.clone(), + xdm_gossip_worker_builder.gossip_msg_sink(), + ), + ), + ); + + xdm_gossip_worker_builder.push_chain_sink(ChainId::Consensus, consensus_msg_sink); + xdm_gossip_worker_builder.push_chain_sink( + ChainId::Domain(domain_configuration.domain_id), + domain_message_sink, + ); + + let cross_domain_message_gossip_worker = xdm_gossip_worker_builder + .build::( + consensus_chain_node.network_service.clone(), + consensus_chain_node.xdm_gossip_notification_service, + consensus_chain_node.sync_service.clone(), + ); + + consensus_chain_node + .task_manager + .spawn_essential_handle() + .spawn_essential_blocking( + "cross-domain-gossip-message-worker", + None, + Box::pin(cross_domain_message_gossip_worker.run()), + ); + }; + + let domain_backend = { + let consensus_best_hash = consensus_chain_node.client.info().best_hash; + let chain_constants = consensus_chain_node + .client + .runtime_api() + .chain_constants(consensus_best_hash) + .map_err(|err| Error::Other(err.to_string()))?; + Arc::new( + sc_client_db::Backend::new( + domain_configuration.domain_config.db_config(), + chain_constants.confirmation_depth_k().into(), + ) + .map_err(|error| { + Error::Other(format!("Failed to create domain backend: {error:?}")) + })?, + ) + }; + + let domain_start_options = DomainStartOptions { + consensus_client: consensus_chain_node.client.clone(), + consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory::new( + consensus_chain_node.transaction_pool, + ), + consensus_network: consensus_chain_node.network_service.clone(), + block_importing_notification_stream: consensus_chain_node + .block_importing_notification_stream, + pot_slot_info_stream: consensus_chain_node.pot_slot_info_stream, + consensus_network_sync_oracle: consensus_chain_node.sync_service.clone(), + domain_message_receiver, + gossip_message_sink, + domain_backend, + }; + + consensus_chain_node + .task_manager + .spawn_essential_handle() + .spawn_essential_blocking("domain", Some("domains"), { + let consensus_chain_network_service = + consensus_chain_node.network_service.clone(); + let consensus_chain_sync_service = consensus_chain_node.sync_service.clone(); + Box::pin(async move { + let span = info_span!("Domain"); + let _enter = span.enter(); + + let maybe_snap_sync_params = + if let Some(snap_sync_orchestrator) = snap_sync_orchestrator { + let domain_block_er_receiver = DomainBlockERReceiver::new( + domain_configuration.domain_id, + fork_id, + domain_start_options.consensus_client.clone(), + consensus_chain_network_service, + consensus_chain_sync_service, + ); + + let maybe_last_confirmed_er = domain_block_er_receiver + .get_last_confirmed_domain_block_receipt() + .await; + + let Some(last_confirmed_er) = maybe_last_confirmed_er else { + error!("Failed to get last confirmed domain block ER"); + return; + }; + + // unblock consensus snap sync + snap_sync_orchestrator.unblock_consensus_snap_sync( + *last_confirmed_er.consensus_block_number(), + ); + + Some((snap_sync_orchestrator, last_confirmed_er)) + } else { + None + }; + + let bootstrap_result_fut = + fetch_domain_bootstrap_info::( + &*domain_start_options.consensus_client, + &*domain_start_options.domain_backend, + domain_configuration.domain_id, + ); + + let bootstrap_result = match bootstrap_result_fut.await { + Ok(bootstrap_result) => bootstrap_result, + Err(error) => { + error!(%error, "Domain bootstrapper exited with an error"); + return; + } + }; + + let consensus_chain_sync_params = maybe_snap_sync_params.map( + |(snap_sync_orchestrator, last_confirmed_er)| { + ConsensusChainSyncParams { + snap_sync_orchestrator, + last_domain_block_er: last_confirmed_er, + block_importing_notification_stream: Box::new( + subspace_link + .block_importing_notification_stream() + .subscribe() + .map(|block| BlockImportingAcknowledgement { + block_number: block.block_number, + acknowledgement_sender: block + .acknowledgement_sender, + }), + ), + } + }, + ); + + let start_domain = run_domain( + bootstrap_result, + domain_configuration, + domain_start_options, + consensus_chain_sync_params, + ); + + if let Err(error) = start_domain.await { + error!(%error, "Domain starter exited with an error"); + } + }) + }); + }; + + consensus_chain_node.network_starter.start_network(); + + if let Some(prometheus_configuration) = prometheus_configuration.take() { + let metrics_server = start_prometheus_metrics_server( + vec![prometheus_configuration.listen_on], + RegistryAdapter::Both( + prometheus_configuration.prometheus_registry, + prometheus_configuration.substrate_registry, + ), + ) + .map_err(|error| Error::SubspaceService(error.into()))? + .map(|error| { + debug!(?error, "Metrics server error."); + }); + + consensus_chain_node.task_manager.spawn_handle().spawn( + "metrics-server", + None, + metrics_server, + ); + }; + + consensus_chain_node.task_manager + }; + + signals + .run_until_signal(task_manager.future().fuse()) + .await + .map_err(Into::into) +} diff --git a/crates/subspace-node/src/main.rs b/crates/subspace-node/src/main.rs index bb6b6d59e47..5befbd725a1 100644 --- a/crates/subspace-node/src/main.rs +++ b/crates/subspace-node/src/main.rs @@ -124,6 +124,9 @@ fn main() -> Result<(), Error> { Cli::Run(run_options) => { commands::run(run_options)?; } + Cli::Fork(fork_options) => { + commands::fork(fork_options)?; + } Cli::BuildSpec(cmd) => { let runner = SubspaceCliPlaceholder.create_runner(&cmd)?; runner.sync_run(|config| cmd.run(config.chain_spec, config.network))? From 9944b150dd1f6713ff36f0626c8c14ec18dce965 Mon Sep 17 00:00:00 2001 From: linning Date: Sat, 26 Jul 2025 04:31:08 +0800 Subject: [PATCH 3/3] Impl the fork chain command Signed-off-by: linning --- Cargo.lock | 7 + crates/subspace-node/Cargo.toml | 7 + crates/subspace-node/src/commands/fork.rs | 278 +++++++++++++++++++++- 3 files changed, 288 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 543390cdb51..d832f45fbfd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13743,6 +13743,7 @@ dependencies = [ "bip39", "clap", "cross-domain-message-gossip", + "domain-block-builder", "domain-client-message-relayer", "domain-client-operator", "domain-eth-service", @@ -13754,16 +13755,19 @@ dependencies = [ "frame-benchmarking", "frame-benchmarking-cli", "frame-support", + "frame-system", "futures", "hex", "hex-literal", "mimalloc", + "pallet-balances", "parity-scale-codec", "prometheus-client", "sc-chain-spec", "sc-cli", "sc-client-api", "sc-client-db", + "sc-consensus", "sc-consensus-slots", "sc-consensus-subspace", "sc-domains", @@ -13782,13 +13786,16 @@ dependencies = [ "serde_json", "sp-api", "sp-blockchain", + "sp-consensus", "sp-consensus-subspace", "sp-core", "sp-domain-digests", "sp-domains", + "sp-keyring", "sp-keystore", "sp-messenger", "sp-runtime", + "sp-state-machine", "subspace-core-primitives", "subspace-logging", "subspace-metrics", diff --git a/crates/subspace-node/Cargo.toml b/crates/subspace-node/Cargo.toml index faf89f33911..8c00fad6a96 100644 --- a/crates/subspace-node/Cargo.toml +++ b/crates/subspace-node/Cargo.toml @@ -27,6 +27,7 @@ domain-client-message-relayer.workspace = true domain-client-operator.workspace = true domain-eth-service.workspace = true domain-service.workspace = true +domain-block-builder.workspace = true domain-runtime-primitives.workspace = true evm-domain-runtime.workspace = true fdlimit.workspace = true @@ -34,16 +35,19 @@ fp-evm.workspace = true frame-benchmarking = { workspace = true, optional = true } frame-benchmarking-cli = { workspace = true, optional = true } frame-support.workspace = true +frame-system.workspace = true futures.workspace = true hex.workspace = true hex-literal.workspace = true mimalloc.workspace = true +pallet-balances.workspace = true parity-scale-codec.workspace = true prometheus-client.workspace = true sc-chain-spec.workspace = true sc-cli.workspace = true sc-client-api.workspace = true sc-client-db.workspace = true +sc-consensus.workspace = true sc-consensus-slots.workspace = true sc-consensus-subspace.workspace = true sc-domains.workspace = true @@ -62,13 +66,16 @@ serde = { workspace = true, features = ["derive"] } serde_json.workspace = true sp-api.workspace = true sp-blockchain.workspace = true +sp-consensus.workspace = true sp-consensus-subspace.workspace = true sp-core.workspace = true sp-domains.workspace = true sp-domain-digests.workspace = true sp-keystore.workspace = true +sp-keyring.workspace = true sp-messenger.workspace = true sp-runtime.workspace = true +sp-state-machine.workspace = true subspace-core-primitives.workspace = true subspace-logging.workspace = true subspace-metrics.workspace = true diff --git a/crates/subspace-node/src/commands/fork.rs b/crates/subspace-node/src/commands/fork.rs index eafa4586ef7..238ee1417a1 100644 --- a/crates/subspace-node/src/commands/fork.rs +++ b/crates/subspace-node/src/commands/fork.rs @@ -52,7 +52,11 @@ use tracing::{debug, error, info, info_span}; /// Options for running a node #[derive(Debug, Parser)] -pub struct RunOptions { +pub struct ForkOptions { + /// ID of the fork chain + #[arg(long, default_value_t = 0)] + fork_chain_id: usize, + /// Consensus chain options #[clap(flatten)] consensus: ConsensusChainOptions, @@ -70,19 +74,24 @@ pub struct RunOptions { /// Default run command for node #[tokio::main] #[expect(clippy::result_large_err, reason = "Comes from Substrate")] -pub async fn run(run_options: RunOptions) -> Result<(), Error> { +pub async fn fork(run_options: ForkOptions) -> Result<(), Error> { init_logger(); raise_fd_limit(); let signals = Signals::capture()?; - let RunOptions { - consensus, + let ForkOptions { + fork_chain_id, + mut consensus, domain_args, } = run_options; let domain_options = (!domain_args.is_empty()) .then(|| DomainOptions::parse_from(env::args().take(1).chain(domain_args))); + // Init the fork chain storage + let need_init_fork_chain = init_fork_storage(&mut consensus, fork_chain_id) + .map_err(|error| Error::Other(format!("Failed to create fork dir: {error:?}")))?; + let ConsensusChainConfiguration { maybe_tmp_dir: _maybe_tmp_dir, mut subspace_configuration, @@ -92,6 +101,16 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> { mut prometheus_configuration, } = create_consensus_chain_configuration(consensus, domain_options.is_some())?; + // Remove peer so the node won't sync from other peers in the existing network + subspace_configuration.base.network.boot_nodes = vec![]; + match subspace_configuration.subspace_networking { + SubspaceNetworking::Create { ref mut config } => config.bootstrap_nodes = vec![], + SubspaceNetworking::Reuse { + ref mut bootstrap_nodes, + .. + } => *bootstrap_nodes = vec![], + } + let maybe_domain_configuration = domain_options .map(|domain_options| { create_domain_configuration(&subspace_configuration, dev, domain_options) @@ -423,6 +442,17 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> { ); }; + if need_init_fork_chain { + init_fork_chain::<_, _, _, _>( + consensus_chain_node.client.clone(), + consensus_chain_node.backend.clone(), + consensus_chain_node.executor.clone(), + fork_chain_id, + ) + .await + .map_err(|error| Error::Other(format!("Failed to import mock block: {error:?}")))?; + } + consensus_chain_node.task_manager }; @@ -431,3 +461,243 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> { .await .map_err(Into::into) } + +/// Initialize the fork chain storage by cloning the data directory of the local existing chain +/// +/// TODO: support using snap sync to download state from existing network to initialize the fork storage +pub fn init_fork_storage( + config: &mut ConsensusChainOptions, + fork_chain_id: usize, +) -> Result> { + let base_path = match &config.base_path { + Some(path) => path.clone(), + None => return Err("--base-path is required".to_string().into()), + }; + let fork_path = base_path.join(format!("fork-{fork_chain_id}")); + let tmp_fork_path = base_path.join(format!("tmp_fork-{fork_chain_id}")); + + // The fork directory exist means the storage is already initialized from the + // previous run + if fs::exists(&fork_path)? { + config.base_path = Some(fork_path); + info!( + ?fork_chain_id, + "Fork chain already initialized, reset `base_path` config to `fork-{fork_chain_id}` and skip initializing fork storage" + ); + return Ok(false); + } + + // The `fork` directory not exist but the `tmp_fork` directory does means the + // previous storage initialization is not completed, in this case, clean up all + // partial storage and retry again. + if fs::exists(&tmp_fork_path)? { + fs::remove_dir_all(&tmp_fork_path)?; + } + + // Initialize storage for the fork chain + fs::create_dir_all(&tmp_fork_path)?; + for dir in ["db", "domain"] { + let base_data_dir = base_path.join(dir); + let fork_data_dir = tmp_fork_path.join(dir); + if !fs::exists(&base_data_dir)? { + continue; + } + info!( + from = ?base_data_dir, + to = ?fork_data_dir, + "Cloning data for the fork..." + ); + copy_dir_all(&base_data_dir, &fork_data_dir)?; + } + + // Rename the `tmp_fork` dir to `fork` to mark the storage initialization completed. + info!( + from = ?tmp_fork_path, + to = ?fork_path, + "Rename the data base path of the fork" + ); + fs::rename(&tmp_fork_path, &fork_path)?; + + config.base_path = Some(fork_path); + + Ok(true) +} + +fn copy_dir_all(src: impl AsRef, dst: impl AsRef) -> io::Result<()> { + if !fs::exists(&dst)? { + fs::create_dir_all(&dst)?; + } + for entry in fs::read_dir(&src)? { + let entry = entry?; + let ty = entry.file_type()?; + if ty.is_dir() { + copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?; + } else { + fs::copy(entry.path(), dst.as_ref().join(entry.file_name()))?; + } + } + Ok(()) +} + +/// Initialize the fork chain +/// +/// This is done by modifying the best block of the existing chain with changes: +/// +/// - Reset sudo key to `Alice` and set initial balance for `Alice` (needed for tx fee), so +/// we can do all kind of test/experiment on the fork chain with the sudo key +/// +/// - Reset solution range and enable `AllowAuthoringByAnyone`, so local farmer can win slot +/// and produce block regardless of the pledged storage of the existing chain +/// +/// and then force import this block and make it the new best block, any new block produced by +/// the local node will extend this block. +async fn init_fork_chain( + client: Arc, + backend: Arc, + executor: Arc, + fork_chain_id: usize, +) -> Result<(), Box> +where + Block: BlockT, + Client: ProvideRuntimeApi + + ExecutorProvider + + BlockImport + + BlockBackend + + HeaderBackend + + CallApiAt, + Client::Api: BlockBuilderApi + SubspaceApi, + Backend: sc_client_api::backend::Backend, + Exec: CodeExecutor, +{ + let chain_info = client.info(); + let (best_hash, best_number) = (chain_info.best_hash, chain_info.best_number); + + let best_header = client + .header(best_hash)? + .ok_or_else(|| ApiError::UnknownBlock(format!("Block header {best_hash} not found")))?; + + let digest = best_header.digest().clone(); + let block_body = client + .block_body(best_hash)? + .ok_or_else(|| ApiError::UnknownBlock(format!("Block body {best_hash} not found")))?; + + let (parent_hash, parent_number) = (best_header.parent_hash(), best_number - One::one()); + + info!( + ?fork_chain_id, + "Fork chain from parent block {parent_number}@{parent_hash}, previous best block {best_number}@{best_hash}" + ); + + let mut api = TrieBackendApi::new( + *parent_hash, + parent_number, + client.clone(), + backend.clone(), + executor.clone(), + )?; + + api.execute_in_transaction( + |api: &TrieBackendApi, + backend: &TrieDeltaBackendFor, + overlayed_changes: &mut OverlayedChanges>| { + // TODO: maybe make it configurable + let sudo_account = Sr25519Keyring::Alice.to_account_id(); + + // Reset sudo key + let sudo_storage_key = + frame_support::storage::storage_prefix("Sudo".as_bytes(), "Key".as_bytes()) + .to_vec(); + overlayed_changes.set_storage(sudo_storage_key, Some(sudo_account.encode())); + + // Set initial balance for the sudo account (needed for tx fee) + let account_info: AccountInfo = { + let account_data = AccountData { + free: 1000 * AI3, + ..Default::default() + }; + AccountInfo { + providers: 1, + data: account_data, + ..Default::default() + } + }; + let account_storage_key = { + let mut storage_key = frame_support::storage::storage_prefix( + "System".as_bytes(), + "Account".as_bytes(), + ) + .to_vec(); + storage_key.extend_from_slice(&sudo_account.using_encoded(Blake2_128Concat::hash)); + storage_key + }; + overlayed_changes.set_storage(account_storage_key, Some(account_info.encode())); + + // Reset solution range + let solution_range_storage_key = frame_support::storage::storage_prefix( + "Subspace".as_bytes(), + "SolutionRanges".as_bytes(), + ) + .to_vec(); + overlayed_changes.set_storage(solution_range_storage_key, None); + + // Reset `AllowAuthoringByAnyone` + let allow_authoring_by_storage_key = frame_support::storage::storage_prefix( + "Subspace".as_bytes(), + "AllowAuthoringByAnyone".as_bytes(), + ) + .to_vec(); + overlayed_changes.set_storage(allow_authoring_by_storage_key, Some(true.encode())); + + let header = ::Header::new( + parent_number + One::one(), + Default::default(), + Default::default(), + *parent_hash, + digest, + ); + match api.initialize_block(header, backend, overlayed_changes) { + Ok(_) => TransactionOutcome::Commit(Ok(())), + Err(e) => TransactionOutcome::Rollback(Err(e)), + } + }, + )?; + + for tx in &block_body { + api.execute_in_transaction( + |api: &TrieBackendApi, + backend: &TrieDeltaBackendFor, + overlayed_changes: &mut OverlayedChanges>| { + match api.apply_extrinsic(tx.clone(), backend, overlayed_changes) { + Ok(Ok(_)) => TransactionOutcome::Commit(Ok(())), + Ok(Err(tx_validity)) => TransactionOutcome::Rollback(Err( + ApplyExtrinsicFailed::Validity(tx_validity).into(), + )), + Err(e) => TransactionOutcome::Rollback(Err(e)), + } + }, + )?; + } + let header = api.execute_in_transaction( + |api: &TrieBackendApi, + backend: &TrieDeltaBackendFor, + overlayed_changes: &mut OverlayedChanges>| { + match api.finalize_block(backend, overlayed_changes) { + Ok(header) => TransactionOutcome::Commit(Ok(header)), + Err(e) => TransactionOutcome::Rollback(Err(e)), + } + }, + )?; + let state_changes = api.collect_storage_changes().unwrap().storage_changes; + + let block_import_params = { + let mut import_block = BlockImportParams::new(BlockOrigin::Own, header); + import_block.body = Some(block_body); + import_block.state_action = + StateAction::ApplyChanges(StorageChanges::Changes(state_changes)); + import_block.fork_choice = Some(ForkChoiceStrategy::Custom(true)); + import_block + }; + client.import_block(block_import_params).await?; + + Ok(()) +}