diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 59cdd7ab9ff..c748bd0e5f9 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -778,6 +778,11 @@ impl VerifiedUnaggregatedAttestation { &self.attestation } + /// Returns the wrapped `indexed_attestation`. + pub fn indexed_attestation(&self) -> &IndexedAttestation { + &self.indexed_attestation + } + /// Returns a mutable reference to the underlying attestation. /// /// Only use during testing since modifying the `IndexedAttestation` can cause the attestation diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 38aa66737a3..030991ed6cd 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -23,6 +23,9 @@ use crate::persisted_fork_choice::PersistedForkChoice; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::snapshot_cache::SnapshotCache; use crate::timeout_rw_lock::TimeoutRwLock; +use crate::validator_monitor::{ + ValidatorMonitor, HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS, +}; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::BeaconForkChoiceStore; use crate::BeaconSnapshot; @@ -242,6 +245,8 @@ pub struct BeaconChain { pub(crate) graffiti: Graffiti, /// Optional slasher. pub slasher: Option>>, + /// Provides monitoring of a set of explicitly defined validators. + pub validator_monitor: RwLock>, } type BeaconBlockAndState = (BeaconBlock, BeaconState); @@ -1624,6 +1629,12 @@ impl BeaconChain { .map_err(|e| BlockError::BeaconChainError(e.into()))?; } + // Allow the validator monitor to learn about a new valid state. + self.validator_monitor + .write() + .process_valid_state(current_slot.epoch(T::EthSpec::slots_per_epoch()), &state); + let validator_monitor = self.validator_monitor.read(); + // Register each attestation in the block with the fork choice service. for attestation in &block.body.attestations[..] { let _fork_choice_attestation_timer = @@ -1641,8 +1652,35 @@ impl BeaconChain { Err(ForkChoiceError::InvalidAttestation(_)) => Ok(()), Err(e) => Err(BlockError::BeaconChainError(e.into())), }?; + + // Only register this with the validator monitor when the block is sufficiently close to + // the current slot. + if VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64 * T::EthSpec::slots_per_epoch() + + block.slot.as_u64() + >= current_slot.as_u64() + { + validator_monitor.register_attestation_in_block( + &indexed_attestation, + &block, + &self.spec, + ); + } } + for exit in &block.body.voluntary_exits { + validator_monitor.register_block_voluntary_exit(&exit.message) + } + + for slashing in &block.body.attester_slashings { + validator_monitor.register_block_attester_slashing(slashing) + } + + for slashing in &block.body.proposer_slashings { + validator_monitor.register_block_proposer_slashing(slashing) + } + + drop(validator_monitor); + metrics::observe( &metrics::OPERATIONS_PER_BLOCK_ATTESTATION, block.body.attestations.len() as f64, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index a03192a52e4..8b188549045 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -6,6 +6,7 @@ use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::shuffling_cache::ShufflingCache; use crate::snapshot_cache::{SnapshotCache, DEFAULT_SNAPSHOT_CACHE_SIZE}; use crate::timeout_rw_lock::TimeoutRwLock; +use crate::validator_monitor::ValidatorMonitor; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::ChainConfig; use crate::{ @@ -26,8 +27,8 @@ use std::sync::Arc; use std::time::Duration; use store::{HotColdDB, ItemStore}; use types::{ - BeaconBlock, BeaconState, ChainSpec, EthSpec, Graffiti, Hash256, Signature, SignedBeaconBlock, - Slot, + BeaconBlock, BeaconState, ChainSpec, EthSpec, Graffiti, Hash256, PublicKeyBytes, Signature, + SignedBeaconBlock, Slot, }; pub const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz"; @@ -88,6 +89,7 @@ pub struct BeaconChainBuilder { log: Option, graffiti: Graffiti, slasher: Option>>, + validator_monitor: Option>, } impl @@ -126,6 +128,7 @@ where log: None, graffiti: Graffiti::default(), slasher: None, + validator_monitor: None, } } @@ -170,8 +173,8 @@ where /// Sets the logger. /// /// Should generally be called early in the build chain. - pub fn logger(mut self, logger: Logger) -> Self { - self.log = Some(logger); + pub fn logger(mut self, log: Logger) -> Self { + self.log = Some(log); self } @@ -391,6 +394,23 @@ where self } + /// Register some validators for additional monitoring. + /// + /// `validators` is a comma-separated string of 0x-formatted BLS pubkeys. + pub fn monitor_validators( + mut self, + auto_register: bool, + validators: Vec, + log: Logger, + ) -> Self { + self.validator_monitor = Some(ValidatorMonitor::new( + validators, + auto_register, + log.clone(), + )); + self + } + /// Consumes `self`, returning a `BeaconChain` if all required parameters have been supplied. /// /// An error will be returned at runtime if all required parameters have not been configured. @@ -418,6 +438,9 @@ where let genesis_state_root = self .genesis_state_root .ok_or("Cannot build without a genesis state root")?; + let mut validator_monitor = self + .validator_monitor + .ok_or("Cannot build without a validator monitor")?; let current_slot = if slot_clock .is_prior_to_genesis() @@ -496,6 +519,13 @@ where log.clone(), ); + if let Some(slot) = slot_clock.now() { + validator_monitor.process_valid_state( + slot.epoch(TEthSpec::slots_per_epoch()), + &canonical_head.beacon_state, + ); + } + let beacon_chain = BeaconChain { spec: self.spec, config: self.chain_config, @@ -538,6 +568,7 @@ where log: log.clone(), graffiti: self.graffiti, slasher: self.slasher.clone(), + validator_monitor: RwLock::new(validator_monitor), }; let head = beacon_chain @@ -706,6 +737,7 @@ mod test { .testing_slot_clock(Duration::from_secs(1)) .expect("should configure testing slot clock") .shutdown_sender(shutdown_tx) + .monitor_validators(true, vec![], log.clone()) .build() .expect("should build"); diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index b4a3ab0403e..bdeee8e9743 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -23,6 +23,7 @@ mod shuffling_cache; mod snapshot_cache; pub mod test_utils; mod timeout_rw_lock; +pub mod validator_monitor; mod validator_pubkey_cache; pub use self::beacon_chain::{ diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index ae085ed585b..029c27da20c 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -355,6 +355,223 @@ lazy_static! { ); } +// Third lazy-static block is used to account for macro recursion limit. +lazy_static! { + /* + * Validator Monitor Metrics (balances, etc) + */ + pub static ref VALIDATOR_MONITOR_BALANCE_GWEI: Result = + try_create_int_gauge_vec( + "validator_monitor_balance_gwei", + "The validator's balance in gwei.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_EFFECTIVE_BALANCE_GWEI: Result = + try_create_int_gauge_vec( + "validator_monitor_effective_balance_gwei", + "The validator's effective balance in gwei.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_SLASHED: Result = + try_create_int_gauge_vec( + "validator_monitor_slashed", + "Set to 1 if the validator is slashed.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_ACTIVE: Result = + try_create_int_gauge_vec( + "validator_monitor_active", + "Set to 1 if the validator is active.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_EXITED: Result = + try_create_int_gauge_vec( + "validator_monitor_exited", + "Set to 1 if the validator is exited.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_WITHDRAWABLE: Result = + try_create_int_gauge_vec( + "validator_monitor_withdrawable", + "Set to 1 if the validator is withdrawable.", + &["validator"] + ); + pub static ref VALIDATOR_ACTIVATION_ELIGIBILITY_EPOCH: Result = + try_create_int_gauge_vec( + "validator_activation_eligibility_epoch", + "Set to the epoch where the validator will be eligible for activation.", + &["validator"] + ); + pub static ref VALIDATOR_ACTIVATION_EPOCH: Result = + try_create_int_gauge_vec( + "validator_activation_epoch", + "Set to the epoch where the validator will activate.", + &["validator"] + ); + pub static ref VALIDATOR_EXIT_EPOCH: Result = + try_create_int_gauge_vec( + "validator_exit_epoch", + "Set to the epoch where the validator will exit.", + &["validator"] + ); + pub static ref VALIDATOR_WITHDRAWABLE_EPOCH: Result = + try_create_int_gauge_vec( + "validator_withdrawable_epoch", + "Set to the epoch where the validator will be withdrawable.", + &["validator"] + ); + + /* + * Validator Monitor Metrics (per-epoch summaries) + */ + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATIONS_TOTAL: Result = + try_create_int_gauge_vec( + "validator_monitor_prev_epoch_attestations_total", + "The number of unagg. attestations seen in the previous epoch.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATIONS_MIN_DELAY_SECONDS: Result = + try_create_histogram_vec( + "validator_monitor_prev_epoch_attestations_min_delay_seconds", + "The min delay between when the validator should send the attestation and when it was received.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_AGGREGATE_INCLUSIONS: Result = + try_create_int_gauge_vec( + "validator_monitor_prev_epoch_attestation_aggregate_inclusions", + "The count of times an attestation was seen inside an aggregate.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_BLOCK_INCLUSIONS: Result = + try_create_int_gauge_vec( + "validator_monitor_prev_epoch_attestation_block_inclusions", + "The count of times an attestation was seen inside a block.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_BLOCK_MIN_INCLUSION_DISTANCE: Result = + try_create_int_gauge_vec( + "validator_monitor_prev_epoch_attestation_block_min_inclusion_distance", + "The minimum inclusion distance observed for the inclusion of an attestation in a block.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_BEACON_BLOCKS_TOTAL: Result = + try_create_int_gauge_vec( + "validator_monitor_prev_epoch_beacon_blocks_total", + "The number of beacon_blocks seen in the previous epoch.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_BEACON_BLOCKS_MIN_DELAY_SECONDS: Result = + try_create_histogram_vec( + "validator_monitor_prev_epoch_beacon_blocks_min_delay_seconds", + "The min delay between when the validator should send the block and when it was received.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_AGGREGATES_TOTAL: Result = + try_create_int_gauge_vec( + "validator_monitor_prev_epoch_aggregates_total", + "The number of aggregates seen in the previous epoch.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_AGGREGATES_MIN_DELAY_SECONDS: Result = + try_create_histogram_vec( + "validator_monitor_prev_epoch_aggregates_min_delay_seconds", + "The min delay between when the validator should send the aggregate and when it was received.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_EXITS_TOTAL: Result = + try_create_int_gauge_vec( + "validator_monitor_prev_epoch_exits_total", + "The number of exits seen in the previous epoch.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_PROPOSER_SLASHINGS_TOTAL: Result = + try_create_int_gauge_vec( + "validator_monitor_prev_epoch_proposer_slashings_total", + "The number of proposer slashings seen in the previous epoch.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ATTESTER_SLASHINGS_TOTAL: Result = + try_create_int_gauge_vec( + "validator_monitor_prev_epoch_attester_slashings_total", + "The number of attester slashings seen in the previous epoch.", + &["validator"] + ); + + /* + * Validator Monitor Metrics (real-time) + */ + pub static ref VALIDATOR_MONITOR_VALIDATORS_TOTAL: Result = try_create_int_gauge( + "validator_monitor_validators_total", + "Count of validators that are specifically monitored by this beacon node" + ); + pub static ref VALIDATOR_MONITOR_UNAGGREGATED_ATTESTATION_TOTAL: Result = try_create_int_counter_vec( + "validator_monitor_unaggregated_attestation_total", + "Number of unaggregated attestations seen", + &["src", "validator"] + ); + pub static ref VALIDATOR_MONITOR_UNAGGREGATED_ATTESTATION_DELAY_SECONDS: Result = try_create_histogram_vec( + "validator_monitor_unaggregated_attestation_delay_seconds", + "The delay between when the validator should send the attestation and when it was received.", + &["src", "validator"] + ); + pub static ref VALIDATOR_MONITOR_AGGREGATED_ATTESTATION_TOTAL: Result = try_create_int_counter_vec( + "validator_monitor_aggregated_attestation_total", + "Number of aggregated attestations seen", + &["src", "validator"] + ); + pub static ref VALIDATOR_MONITOR_AGGREGATED_ATTESTATION_DELAY_SECONDS: Result = try_create_histogram_vec( + "validator_monitor_aggregated_attestation_delay_seconds", + "The delay between then the validator should send the aggregate and when it was received.", + &["src", "validator"] + ); + pub static ref VALIDATOR_MONITOR_ATTESTATION_IN_AGGREGATE_TOTAL: Result = try_create_int_counter_vec( + "validator_monitor_attestation_in_aggregate_total", + "Number of times an attestation has been seen in an aggregate", + &["src", "validator"] + ); + pub static ref VALIDATOR_MONITOR_ATTESTATION_IN_AGGREGATE_DELAY_SECONDS: Result = try_create_histogram_vec( + "validator_monitor_attestation_in_aggregate_delay_seconds", + "The delay between when the validator should send the aggregate and when it was received.", + &["src", "validator"] + ); + pub static ref VALIDATOR_MONITOR_ATTESTATION_IN_BLOCK_TOTAL: Result = try_create_int_counter_vec( + "validator_monitor_attestation_in_block_total", + "Number of times an attestation has been seen in a block", + &["src", "validator"] + ); + pub static ref VALIDATOR_MONITOR_ATTESTATION_IN_BLOCK_DELAY_SLOTS: Result = try_create_int_gauge_vec( + "validator_monitor_attestation_in_block_delay_slots", + "The excess slots (beyond the minimum delay) between the attestation slot and the block slot.", + &["src", "validator"] + ); + pub static ref VALIDATOR_MONITOR_BEACON_BLOCK_TOTAL: Result = try_create_int_counter_vec( + "validator_monitor_beacon_block_total", + "Number of beacon blocks seen", + &["src", "validator"] + ); + pub static ref VALIDATOR_MONITOR_BEACON_BLOCK_DELAY_SECONDS: Result = try_create_histogram_vec( + "validator_monitor_beacon_block_delay_seconds", + "The delay between when the validator should send the block and when it was received.", + &["src", "validator"] + ); + pub static ref VALIDATOR_MONITOR_EXIT_TOTAL: Result = try_create_int_counter_vec( + "validator_monitor_exit_total", + "Number of beacon exits seen", + &["src", "validator"] + ); + pub static ref VALIDATOR_MONITOR_PROPOSER_SLASHING_TOTAL: Result = try_create_int_counter_vec( + "validator_monitor_proposer_slashing_total", + "Number of proposer slashings seen", + &["src", "validator"] + ); + pub static ref VALIDATOR_MONITOR_ATTESTER_SLASHING_TOTAL: Result = try_create_int_counter_vec( + "validator_monitor_attester_slashing_total", + "Number of attester slashings seen", + &["src", "validator"] + ); + +} + /// Scrape the `beacon_chain` for metrics that are not constantly updated (e.g., the present slot, /// head state info, etc) and update the Prometheus `DEFAULT_REGISTRY`. pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { @@ -382,6 +599,11 @@ pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { &OP_POOL_NUM_VOLUNTARY_EXITS, beacon_chain.op_pool.num_voluntary_exits(), ); + + beacon_chain + .validator_monitor + .read() + .scrape_metrics(&beacon_chain.slot_clock, &beacon_chain.spec); } /// Scrape the given `state` assuming it's the head state, updating the `DEFAULT_REGISTRY`. diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index a2794335b86..753422238fe 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -198,7 +198,11 @@ impl BeaconChainHarness> { .expect("should configure testing slot clock") .shutdown_sender(shutdown_tx) .chain_config(chain_config) - .event_handler(Some(ServerSentEventHandler::new_with_capacity(log, 1))) + .event_handler(Some(ServerSentEventHandler::new_with_capacity( + log.clone(), + 1, + ))) + .monitor_validators(true, vec![], log) .build() .expect("should build"); @@ -243,6 +247,7 @@ impl BeaconChainHarness> { .testing_slot_clock(HARNESS_SLOT_TIME) .expect("should configure testing slot clock") .shutdown_sender(shutdown_tx) + .monitor_validators(true, vec![], log) .build() .expect("should build"); @@ -284,6 +289,7 @@ impl BeaconChainHarness> { .testing_slot_clock(Duration::from_secs(1)) .expect("should configure testing slot clock") .shutdown_sender(shutdown_tx) + .monitor_validators(true, vec![], log) .build() .expect("should build"); diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs new file mode 100644 index 00000000000..5cb37fa881c --- /dev/null +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -0,0 +1,953 @@ +//! Provides detailed logging and metrics for a set of registered validators. +//! +//! This component should not affect consensus. + +use crate::metrics; +use parking_lot::RwLock; +use slog::{crit, info, Logger}; +use slot_clock::SlotClock; +use std::collections::{HashMap, HashSet}; +use std::convert::TryFrom; +use std::io; +use std::marker::PhantomData; +use std::str::Utf8Error; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use types::{ + AttestationData, AttesterSlashing, BeaconBlock, BeaconState, ChainSpec, Epoch, EthSpec, + Hash256, IndexedAttestation, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof, Slot, + VoluntaryExit, +}; + +/// The validator monitor collects per-epoch data about each monitored validator. Historical data +/// will be kept around for `HISTORIC_EPOCHS` before it is pruned. +pub const HISTORIC_EPOCHS: usize = 4; + +#[derive(Debug)] +pub enum Error { + InvalidPubkey(String), + FileError(io::Error), + InvalidUtf8(Utf8Error), +} + +/// Contains data pertaining to one validator for one epoch. +#[derive(Default)] +struct EpochSummary { + /* + * Attestations with a target in the current epoch. + */ + /// The number of attestations seen. + pub attestations: usize, + /// The delay between when the attestation should have been produced and when it was observed. + pub attestation_min_delay: Option, + /// The number of times a validators attestation was seen in an aggregate. + pub attestation_aggregate_incusions: usize, + /// The number of times a validators attestation was seen in a block. + pub attestation_block_inclusions: usize, + /// The minimum observed inclusion distance for an attestation for this epoch.. + pub attestation_min_block_inclusion_distance: Option, + /* + * Blocks with a slot in the current epoch. + */ + /// The number of blocks observed. + pub blocks: usize, + /// The delay between when the block should have been produced and when it was observed. + pub block_min_delay: Option, + /* + * Aggregates with a target in the current epoch + */ + /// The number of signed aggregate and proofs observed. + pub aggregates: usize, + /// The delay between when the aggregate should have been produced and when it was observed. + pub aggregate_min_delay: Option, + /* + * Others pertaining to this epoch. + */ + /// The number of voluntary exists observed. + pub exits: usize, + /// The number of proposer slashings observed. + pub proposer_slashings: usize, + /// The number of attester slashings observed. + pub attester_slashings: usize, +} + +impl EpochSummary { + /// Update `current` if: + /// + /// - It is `None`. + /// - `new` is greater than its current value. + fn update_if_lt(current: &mut Option, new: T) { + if let Some(ref mut current) = current { + if new < *current { + *current = new + } + } else { + *current = Some(new) + } + } + + pub fn register_unaggregated_attestation(&mut self, delay: Duration) { + self.attestations += 1; + Self::update_if_lt(&mut self.attestation_min_delay, delay); + } + + pub fn register_aggregated_attestation(&mut self, delay: Duration) { + self.aggregates += 1; + Self::update_if_lt(&mut self.aggregate_min_delay, delay); + } + + pub fn register_aggregate_attestation_inclusion(&mut self) { + self.attestation_aggregate_incusions += 1; + } + + pub fn register_attestation_block_inclusion(&mut self, delay: Slot) { + self.attestation_block_inclusions += 1; + Self::update_if_lt(&mut self.attestation_min_block_inclusion_distance, delay); + } + + pub fn register_exit(&mut self) { + self.exits += 1; + } + + pub fn register_proposer_slashing(&mut self) { + self.proposer_slashings += 1; + } + + pub fn register_attester_slashing(&mut self) { + self.attester_slashings += 1; + } +} + +type SummaryMap = HashMap; + +/// A validator that is being monitored by the `ValidatorMonitor`. +struct MonitoredValidator { + /// A human-readable identifier for the validator. + pub id: String, + /// The validator voting pubkey. + pub pubkey: PublicKeyBytes, + /// The validator index in the state. + pub index: Option, + /// A history of the validator over time. + pub summaries: RwLock, +} + +impl MonitoredValidator { + fn new(pubkey: PublicKeyBytes, index: Option) -> Self { + Self { + id: index + .map(|i| i.to_string()) + .unwrap_or_else(|| pubkey.to_string()), + pubkey, + index, + summaries: <_>::default(), + } + } + + fn set_index(&mut self, validator_index: u64) { + if self.index.is_none() { + self.index = Some(validator_index); + self.id = validator_index.to_string(); + } + } + + /// Maps `func` across the `self.summaries`. + /// + /// ## Warning + /// + /// It is possible to deadlock this function by trying to obtain a lock on + /// `self.summary` inside `func`. + /// + /// ## Notes + /// + /// - If `epoch` doesn't exist in `self.summaries`, it is created. + /// - `self.summaries` may be pruned after `func` is run. + fn with_epoch_summary(&self, epoch: Epoch, func: F) + where + F: Fn(&mut EpochSummary), + { + let mut summaries = self.summaries.write(); + + func(summaries.entry(epoch).or_default()); + + // Prune + while summaries.len() > HISTORIC_EPOCHS { + if let Some(key) = summaries.iter().map(|(epoch, _)| *epoch).min() { + summaries.remove(&key); + } + } + } +} + +/// Holds a collection of `MonitoredValidator` and is notified about a variety of events on the P2P +/// network, HTTP API and `BeaconChain`. +/// +/// If any of the events pertain to a `MonitoredValidator`, additional logging and metrics will be +/// performed. +/// +/// The intention of this struct is to provide users with more logging and Prometheus metrics around +/// validators that they are interested in. +pub struct ValidatorMonitor { + /// The validators that require additional monitoring. + validators: HashMap, + /// A map of validator index (state.validators) to a validator public key. + indices: HashMap, + /// If true, allow the automatic registration of validators. + auto_register: bool, + log: Logger, + _phantom: PhantomData, +} + +impl ValidatorMonitor { + pub fn new(pubkeys: Vec, auto_register: bool, log: Logger) -> Self { + let mut s = Self { + validators: <_>::default(), + indices: <_>::default(), + auto_register, + log, + _phantom: PhantomData, + }; + for pubkey in pubkeys { + s.add_validator_pubkey(pubkey) + } + s + } + + /// Add some validators to `self` for additional monitoring. + fn add_validator_pubkey(&mut self, pubkey: PublicKeyBytes) { + let index_opt = self + .indices + .iter() + .find(|(_, candidate_pk)| **candidate_pk == pubkey) + .map(|(index, _)| *index); + + let log = self.log.clone(); + self.validators.entry(pubkey).or_insert_with(|| { + info!( + log, + "Started monitoring validator"; + "pubkey" => %pubkey, + ); + MonitoredValidator::new(pubkey, index_opt) + }); + } + + /// Reads information from the given `state`. The `state` *must* be valid (i.e, able to be + /// imported). + pub fn process_valid_state(&mut self, current_epoch: Epoch, state: &BeaconState) { + // Add any new validator indices. + state + .validators + .iter() + .enumerate() + .skip(self.indices.len()) + .for_each(|(i, validator)| { + let i = i as u64; + if let Some(validator) = self.validators.get_mut(&validator.pubkey) { + validator.set_index(i) + } + self.indices.insert(i, validator.pubkey); + }); + + // Update metrics for individual validators. + for monitored_validator in self.validators.values() { + if let Some(i) = monitored_validator.index { + let i = i as usize; + let id = &monitored_validator.id; + + if let Some(balance) = state.balances.get(i) { + metrics::set_int_gauge( + &metrics::VALIDATOR_MONITOR_BALANCE_GWEI, + &[id], + *balance as i64, + ); + } + + if let Some(validator) = state.validators.get(i) { + metrics::set_int_gauge( + &metrics::VALIDATOR_MONITOR_EFFECTIVE_BALANCE_GWEI, + &[id], + u64_to_i64(validator.effective_balance), + ); + metrics::set_int_gauge( + &metrics::VALIDATOR_MONITOR_SLASHED, + &[id], + if validator.slashed { 1 } else { 0 }, + ); + metrics::set_int_gauge( + &metrics::VALIDATOR_MONITOR_ACTIVE, + &[id], + if validator.is_active_at(current_epoch) { + 1 + } else { + 0 + }, + ); + metrics::set_int_gauge( + &metrics::VALIDATOR_MONITOR_EXITED, + &[id], + if validator.is_exited_at(current_epoch) { + 1 + } else { + 0 + }, + ); + metrics::set_int_gauge( + &metrics::VALIDATOR_MONITOR_WITHDRAWABLE, + &[id], + if validator.is_withdrawable_at(current_epoch) { + 1 + } else { + 0 + }, + ); + metrics::set_int_gauge( + &metrics::VALIDATOR_ACTIVATION_ELIGIBILITY_EPOCH, + &[id], + u64_to_i64(validator.activation_eligibility_epoch), + ); + metrics::set_int_gauge( + &metrics::VALIDATOR_ACTIVATION_EPOCH, + &[id], + u64_to_i64(validator.activation_epoch), + ); + metrics::set_int_gauge( + &metrics::VALIDATOR_EXIT_EPOCH, + &[id], + u64_to_i64(validator.exit_epoch), + ); + metrics::set_int_gauge( + &metrics::VALIDATOR_WITHDRAWABLE_EPOCH, + &[id], + u64_to_i64(validator.withdrawable_epoch), + ); + } + } + } + } + + fn get_validator_id(&self, validator_index: u64) -> Option<&str> { + self.indices + .get(&validator_index) + .and_then(|pubkey| self.validators.get(pubkey)) + .map(|validator| validator.id.as_str()) + } + + fn get_validator(&self, validator_index: u64) -> Option<&MonitoredValidator> { + self.indices + .get(&validator_index) + .and_then(|pubkey| self.validators.get(pubkey)) + } + + /// Returns the number of validators monitored by `self`. + pub fn num_validators(&self) -> usize { + self.validators.len() + } + + /// If `self.auto_register == true`, add the `validator_index` to `self.monitored_validators`. + /// Otherwise, do nothing. + pub fn auto_register_local_validator(&mut self, validator_index: u64) { + if !self.auto_register { + return; + } + + if let Some(pubkey) = self.indices.get(&validator_index) { + if !self.validators.contains_key(pubkey) { + info!( + self.log, + "Started monitoring validator"; + "pubkey" => %pubkey, + "validator" => %validator_index, + ); + + self.validators.insert( + *pubkey, + MonitoredValidator::new(*pubkey, Some(validator_index)), + ); + } + } + } + + /// Returns the delay between the start of `block.slot` and `seen_timestamp`. + fn get_block_delay_ms( + seen_timestamp: Duration, + block: &BeaconBlock, + slot_clock: &S, + ) -> Duration { + slot_clock + .start_of(block.slot) + .and_then(|slot_start| seen_timestamp.checked_sub(slot_start)) + .unwrap_or_else(|| Duration::from_secs(0)) + } + + /// Process a block received on gossip. + pub fn register_gossip_block( + &self, + seen_timestamp: Duration, + block: &BeaconBlock, + block_root: Hash256, + slot_clock: &S, + ) { + self.register_beacon_block("gossip", seen_timestamp, block, block_root, slot_clock) + } + + /// Process a block received on the HTTP API from a local validator. + pub fn register_api_block( + &self, + seen_timestamp: Duration, + block: &BeaconBlock, + block_root: Hash256, + slot_clock: &S, + ) { + self.register_beacon_block("api", seen_timestamp, block, block_root, slot_clock) + } + + fn register_beacon_block( + &self, + src: &str, + seen_timestamp: Duration, + block: &BeaconBlock, + block_root: Hash256, + slot_clock: &S, + ) { + if let Some(id) = self.get_validator_id(block.proposer_index) { + let delay = Self::get_block_delay_ms(seen_timestamp, block, slot_clock); + + metrics::inc_counter_vec(&metrics::VALIDATOR_MONITOR_BEACON_BLOCK_TOTAL, &[src, id]); + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_BEACON_BLOCK_DELAY_SECONDS, + &[src, id], + delay, + ); + + info!( + self.log, + "Block from API"; + "root" => ?block_root, + "delay" => %delay.as_millis(), + "slot" => %block.slot, + "src" => src, + "validator" => %id, + ); + } + } + + /// Returns the duration between when the attestation `data` could be produced (1/3rd through + /// the slot) and `seen_timestamp`. + fn get_unaggregated_attestation_delay_ms( + seen_timestamp: Duration, + data: &AttestationData, + slot_clock: &S, + ) -> Duration { + slot_clock + .start_of(data.slot) + .and_then(|slot_start| seen_timestamp.checked_sub(slot_start)) + .and_then(|gross_delay| { + let production_delay = slot_clock.slot_duration() / 3; + gross_delay.checked_sub(production_delay) + }) + .unwrap_or_else(|| Duration::from_secs(0)) + } + + /// Register an attestation seen on the gossip network. + pub fn register_gossip_unaggregated_attestation( + &self, + seen_timestamp: Duration, + indexed_attestation: &IndexedAttestation, + slot_clock: &S, + ) { + self.register_unaggregated_attestation( + "gossip", + seen_timestamp, + indexed_attestation, + slot_clock, + ) + } + + /// Register an attestation seen on the HTTP API. + pub fn register_api_unaggregated_attestation( + &self, + seen_timestamp: Duration, + indexed_attestation: &IndexedAttestation, + slot_clock: &S, + ) { + self.register_unaggregated_attestation( + "api", + seen_timestamp, + indexed_attestation, + slot_clock, + ) + } + + fn register_unaggregated_attestation( + &self, + src: &str, + seen_timestamp: Duration, + indexed_attestation: &IndexedAttestation, + slot_clock: &S, + ) { + let data = &indexed_attestation.data; + let epoch = data.slot.epoch(T::slots_per_epoch()); + let delay = Self::get_unaggregated_attestation_delay_ms(seen_timestamp, data, slot_clock); + + indexed_attestation.attesting_indices.iter().for_each(|i| { + if let Some(validator) = self.get_validator(*i) { + let id = &validator.id; + + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_UNAGGREGATED_ATTESTATION_TOTAL, + &[src, id], + ); + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_UNAGGREGATED_ATTESTATION_DELAY_SECONDS, + &[src, id], + delay, + ); + + info!( + self.log, + "Unaggregated attestation"; + "head" => ?data.beacon_block_root, + "index" => %data.index, + "delay_ms" => %delay.as_millis(), + "epoch" => %epoch, + "slot" => %data.slot, + "src" => src, + "validator" => %id, + ); + + validator.with_epoch_summary(epoch, |summary| { + summary.register_unaggregated_attestation(delay) + }); + } + }) + } + + /// Returns the duration between when a `AggregateAndproof` with `data` could be produced (2/3rd + /// through the slot) and `seen_timestamp`. + fn get_aggregated_attestation_delay_ms( + seen_timestamp: Duration, + data: &AttestationData, + slot_clock: &S, + ) -> Duration { + slot_clock + .start_of(data.slot) + .and_then(|slot_start| seen_timestamp.checked_sub(slot_start)) + .and_then(|gross_delay| { + let production_delay = slot_clock.slot_duration() / 2; + gross_delay.checked_sub(production_delay) + }) + .unwrap_or_else(|| Duration::from_secs(0)) + } + + /// Register a `signed_aggregate_and_proof` seen on the gossip network. + pub fn register_gossip_aggregated_attestation( + &self, + seen_timestamp: Duration, + signed_aggregate_and_proof: &SignedAggregateAndProof, + indexed_attestation: &IndexedAttestation, + slot_clock: &S, + ) { + self.register_aggregated_attestation( + "gossip", + seen_timestamp, + signed_aggregate_and_proof, + indexed_attestation, + slot_clock, + ) + } + + /// Register a `signed_aggregate_and_proof` seen on the HTTP API. + pub fn register_api_aggregated_attestation( + &self, + seen_timestamp: Duration, + signed_aggregate_and_proof: &SignedAggregateAndProof, + indexed_attestation: &IndexedAttestation, + slot_clock: &S, + ) { + self.register_aggregated_attestation( + "api", + seen_timestamp, + signed_aggregate_and_proof, + indexed_attestation, + slot_clock, + ) + } + + fn register_aggregated_attestation( + &self, + src: &str, + seen_timestamp: Duration, + signed_aggregate_and_proof: &SignedAggregateAndProof, + indexed_attestation: &IndexedAttestation, + slot_clock: &S, + ) { + let data = &indexed_attestation.data; + let epoch = data.slot.epoch(T::slots_per_epoch()); + let delay = Self::get_aggregated_attestation_delay_ms(seen_timestamp, data, slot_clock); + + let aggregator_index = signed_aggregate_and_proof.message.aggregator_index; + if let Some(validator) = self.get_validator(aggregator_index) { + let id = &validator.id; + + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_AGGREGATED_ATTESTATION_TOTAL, + &[src, id], + ); + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_AGGREGATED_ATTESTATION_DELAY_SECONDS, + &[src, id], + delay, + ); + + info!( + self.log, + "Aggregated attestation"; + "head" => ?data.beacon_block_root, + "index" => %data.index, + "delay_ms" => %delay.as_millis(), + "epoch" => %epoch, + "slot" => %data.slot, + "src" => src, + "validator" => %id, + ); + + validator.with_epoch_summary(epoch, |summary| { + summary.register_aggregated_attestation(delay) + }); + } + + indexed_attestation.attesting_indices.iter().for_each(|i| { + if let Some(validator) = self.get_validator(*i) { + let id = &validator.id; + + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_ATTESTATION_IN_AGGREGATE_TOTAL, + &[src, id], + ); + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_ATTESTATION_IN_AGGREGATE_DELAY_SECONDS, + &[src, id], + delay, + ); + + info!( + self.log, + "Attestation included in aggregate"; + "head" => ?data.beacon_block_root, + "index" => %data.index, + "delay_ms" => %delay.as_millis(), + "epoch" => %epoch, + "slot" => %data.slot, + "src" => src, + "validator" => %id, + ); + + validator.with_epoch_summary(epoch, |summary| { + summary.register_aggregate_attestation_inclusion() + }); + } + }) + } + + /// Register that the `indexed_attestation` was included in a *valid* `BeaconBlock`. + pub fn register_attestation_in_block( + &self, + indexed_attestation: &IndexedAttestation, + block: &BeaconBlock, + spec: &ChainSpec, + ) { + let data = &indexed_attestation.data; + let delay = (block.slot - data.slot) - spec.min_attestation_inclusion_delay; + let epoch = data.slot.epoch(T::slots_per_epoch()); + + indexed_attestation.attesting_indices.iter().for_each(|i| { + if let Some(validator) = self.get_validator(*i) { + let id = &validator.id; + + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_ATTESTATION_IN_BLOCK_TOTAL, + &["block", id], + ); + metrics::set_int_gauge( + &metrics::VALIDATOR_MONITOR_ATTESTATION_IN_BLOCK_DELAY_SLOTS, + &["block", id], + delay.as_u64() as i64, + ); + + info!( + self.log, + "Attestation included in block"; + "head" => ?data.beacon_block_root, + "index" => %data.index, + "inclusion_lag" => format!("{} slot(s)", delay), + "epoch" => %epoch, + "slot" => %data.slot, + "validator" => %id, + ); + + validator.with_epoch_summary(epoch, |summary| { + summary.register_attestation_block_inclusion(delay) + }); + } + }) + } + + /// Register an exit from the gossip network. + pub fn register_gossip_voluntary_exit(&self, exit: &VoluntaryExit) { + self.register_voluntary_exit("gossip", exit) + } + + /// Register an exit from the HTTP API. + pub fn register_api_voluntary_exit(&self, exit: &VoluntaryExit) { + self.register_voluntary_exit("api", exit) + } + + /// Register an exit included in a *valid* beacon block. + pub fn register_block_voluntary_exit(&self, exit: &VoluntaryExit) { + self.register_voluntary_exit("block", exit) + } + + fn register_voluntary_exit(&self, src: &str, exit: &VoluntaryExit) { + if let Some(validator) = self.get_validator(exit.validator_index) { + let id = &validator.id; + let epoch = exit.epoch; + + metrics::inc_counter_vec(&metrics::VALIDATOR_MONITOR_EXIT_TOTAL, &[src, id]); + + info!( + self.log, + "Voluntary exit"; + "epoch" => %epoch, + "validator" => %id, + "src" => src, + ); + + validator.with_epoch_summary(epoch, |summary| summary.register_exit()); + } + } + + /// Register a proposer slashing from the gossip network. + pub fn register_gossip_proposer_slashing(&self, slashing: &ProposerSlashing) { + self.register_proposer_slashing("gossip", slashing) + } + + /// Register a proposer slashing from the HTTP API. + pub fn register_api_proposer_slashing(&self, slashing: &ProposerSlashing) { + self.register_proposer_slashing("api", slashing) + } + + /// Register a proposer slashing included in a *valid* `BeaconBlock`. + pub fn register_block_proposer_slashing(&self, slashing: &ProposerSlashing) { + self.register_proposer_slashing("block", slashing) + } + + fn register_proposer_slashing(&self, src: &str, slashing: &ProposerSlashing) { + let proposer = slashing.signed_header_1.message.proposer_index; + let slot = slashing.signed_header_1.message.slot; + let epoch = slot.epoch(T::slots_per_epoch()); + let root_1 = slashing.signed_header_1.message.canonical_root(); + let root_2 = slashing.signed_header_2.message.canonical_root(); + + if let Some(validator) = self.get_validator(proposer) { + let id = &validator.id; + + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PROPOSER_SLASHING_TOTAL, + &[src, id], + ); + + crit!( + self.log, + "Proposer slashing"; + "root_2" => %root_2, + "root_1" => %root_1, + "slot" => %slot, + "validator" => %id, + "src" => src, + ); + + validator.with_epoch_summary(epoch, |summary| summary.register_proposer_slashing()); + } + } + + /// Register an attester slashing from the gossip network. + pub fn register_gossip_attester_slashing(&self, slashing: &AttesterSlashing) { + self.register_attester_slashing("gossip", slashing) + } + + /// Register an attester slashing from the HTTP API. + pub fn register_api_attester_slashing(&self, slashing: &AttesterSlashing) { + self.register_attester_slashing("api", slashing) + } + + /// Register an attester slashing included in a *valid* `BeaconBlock`. + pub fn register_block_attester_slashing(&self, slashing: &AttesterSlashing) { + self.register_attester_slashing("block", slashing) + } + + fn register_attester_slashing(&self, src: &str, slashing: &AttesterSlashing) { + let data = &slashing.attestation_1.data; + let attestation_1_indices: HashSet = slashing + .attestation_1 + .attesting_indices + .iter() + .copied() + .collect(); + + slashing + .attestation_2 + .attesting_indices + .iter() + .filter(|index| attestation_1_indices.contains(index)) + .filter_map(|index| self.get_validator(*index)) + .for_each(|validator| { + let id = &validator.id; + let epoch = data.slot.epoch(T::slots_per_epoch()); + + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_ATTESTER_SLASHING_TOTAL, + &[src, id], + ); + + crit!( + self.log, + "Attester slashing"; + "epoch" => %epoch, + "slot" => %data.slot, + "validator" => %id, + "src" => src, + ); + + validator.with_epoch_summary(epoch, |summary| summary.register_attester_slashing()); + }) + } + + /// Scrape `self` for metrics. + /// + /// Should be called whenever Prometheus is scraping Lighthouse. + pub fn scrape_metrics(&self, slot_clock: &S, spec: &ChainSpec) { + metrics::set_gauge( + &metrics::VALIDATOR_MONITOR_VALIDATORS_TOTAL, + self.num_validators() as i64, + ); + + if let Some(slot) = slot_clock.now() { + let epoch = slot.epoch(T::slots_per_epoch()); + let slot_in_epoch = slot % T::slots_per_epoch(); + + // Only start to report on the current epoch once we've progressed past the point where + // all attestation should be included in a block. + // + // This allows us to set alarms on Grafana to detect when an attestation has been + // missed. If we didn't delay beyond the attestation inclusion period then we could + // expect some occasional false-positives on attestation misses. + // + // I have chosen 3 as an arbitrary number where we *probably* shouldn't see that many + // skip slots on mainnet. + let previous_epoch = if slot_in_epoch > spec.min_attestation_inclusion_delay + 3 { + epoch - 1 + } else { + epoch - 2 + }; + + for (_, validator) in self.validators.iter() { + let id = &validator.id; + let summaries = validator.summaries.read(); + + if let Some(summary) = summaries.get(&previous_epoch) { + /* + * Attestations + */ + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATIONS_TOTAL, + &[id], + summary.attestations as i64, + ); + if let Some(delay) = summary.attestation_min_delay { + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATIONS_MIN_DELAY_SECONDS, + &[id], + delay, + ); + } + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_AGGREGATE_INCLUSIONS, + &[id], + summary.attestation_aggregate_incusions as i64, + ); + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_BLOCK_INCLUSIONS, + &[id], + summary.attestation_block_inclusions as i64, + ); + if let Some(distance) = summary.attestation_min_block_inclusion_distance { + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_BLOCK_MIN_INCLUSION_DISTANCE, + &[id], + distance.as_u64() as i64, + ); + } + /* + * Blocks + */ + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_BEACON_BLOCKS_TOTAL, + &[id], + summary.blocks as i64, + ); + if let Some(delay) = summary.block_min_delay { + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_BEACON_BLOCKS_MIN_DELAY_SECONDS, + &[id], + delay, + ); + } + /* + * Aggregates + */ + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_AGGREGATES_TOTAL, + &[id], + summary.aggregates as i64, + ); + if let Some(delay) = summary.aggregate_min_delay { + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_AGGREGATES_MIN_DELAY_SECONDS, + &[id], + delay, + ); + } + /* + * Other + */ + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_EXITS_TOTAL, + &[id], + summary.exits as i64, + ); + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_PROPOSER_SLASHINGS_TOTAL, + &[id], + summary.proposer_slashings as i64, + ); + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTER_SLASHINGS_TOTAL, + &[id], + summary.attester_slashings as i64, + ); + } + } + } + } +} + +/// Returns the duration since the unix epoch. +pub fn timestamp_now() -> Duration { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_else(|_| Duration::from_secs(0)) +} + +fn u64_to_i64(n: impl Into) -> i64 { + i64::try_from(n.into()).unwrap_or(i64::max_value()) +} diff --git a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs index 48ec5d1b2bb..03977b24105 100644 --- a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs +++ b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs @@ -93,7 +93,7 @@ impl ValidatorPubkeyCache { .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?, ); - self.indices.insert(v.pubkey.clone(), i); + self.indices.insert(v.pubkey, i); } Ok(()) diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 85320ba11db..c47e74cebc6 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -126,9 +126,9 @@ where let graffiti = config.graffiti; let store = store.ok_or("beacon_chain_start_method requires a store")?; - let context = runtime_context - .ok_or("beacon_chain_start_method requires a runtime context")? - .service_context("beacon".into()); + let runtime_context = + runtime_context.ok_or("beacon_chain_start_method requires a runtime context")?; + let context = runtime_context.service_context("beacon".into()); let spec = chain_spec.ok_or("beacon_chain_start_method requires a chain spec")?; let event_handler = if self.http_api_config.enabled { Some(ServerSentEventHandler::new(context.log().clone())) @@ -144,7 +144,15 @@ where .chain_config(chain_config) .disabled_forks(disabled_forks) .graffiti(graffiti) - .event_handler(event_handler); + .event_handler(event_handler) + .monitor_validators( + config.validator_monitor_auto, + config.validator_monitor_pubkeys.clone(), + runtime_context + .service_context("val_mon".to_string()) + .log() + .clone(), + ); let builder = if let Some(slasher) = self.slasher.clone() { builder.slasher(slasher) diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index c4516e642e2..865724f3ac7 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -3,7 +3,7 @@ use network::NetworkConfig; use serde_derive::{Deserialize, Serialize}; use std::fs; use std::path::PathBuf; -use types::Graffiti; +use types::{Graffiti, PublicKeyBytes}; /// Default directory name for the freezer database under the top-level data dir. const DEFAULT_FREEZER_DB_DIR: &str = "freezer_db"; @@ -52,6 +52,10 @@ pub struct Config { pub disabled_forks: Vec, /// Graffiti to be inserted everytime we create a block. pub graffiti: Graffiti, + /// When true, automatically monitor validators using the HTTP API. + pub validator_monitor_auto: bool, + /// A list of validator pubkeys to monitor. + pub validator_monitor_pubkeys: Vec, #[serde(skip)] /// The `genesis` field is not serialized or deserialized by `serde` to ensure it is defined /// via the CLI at runtime, instead of from a configuration file saved to disk. @@ -84,6 +88,8 @@ impl Default for Config { http_api: <_>::default(), http_metrics: <_>::default(), slasher: None, + validator_monitor_auto: false, + validator_monitor_pubkeys: vec![], } } } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index a62276315be..70e83068733 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -12,8 +12,9 @@ mod state_id; mod validator_inclusion; use beacon_chain::{ - observed_operations::ObservationOutcome, AttestationError as AttnError, BeaconChain, - BeaconChainError, BeaconChainTypes, + attestation_verification::SignatureVerifiedAttestation, + observed_operations::ObservationOutcome, validator_monitor::timestamp_now, + AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, }; use beacon_proposer_cache::BeaconProposerCache; use block_id::BlockId; @@ -816,6 +817,8 @@ pub fn serve( network_tx: UnboundedSender>, log: Logger| { blocking_json_task(move || { + let seen_timestamp = timestamp_now(); + // Send the block, regardless of whether or not it is valid. The API // specification is very clear that this is the desired behaviour. publish_pubsub_message( @@ -831,6 +834,14 @@ pub fn serve( "root" => format!("{}", root) ); + // Notify the validator monitor. + chain.validator_monitor.read().register_api_block( + seen_timestamp, + &block.message, + root, + &chain.slot_clock, + ); + // Update the head since it's likely this block will become the new // head. chain @@ -921,6 +932,7 @@ pub fn serve( network_tx: UnboundedSender>, log: Logger| { blocking_json_task(move || { + let seen_timestamp = timestamp_now(); let mut failures = Vec::new(); for (index, attestation) in attestations.as_slice().iter().enumerate() { @@ -945,6 +957,16 @@ pub fn serve( } }; + // Notify the validator monitor. + chain + .validator_monitor + .read() + .register_api_unaggregated_attestation( + seen_timestamp, + attestation.indexed_attestation(), + &chain.slot_clock, + ); + publish_pubsub_message( &network_tx, PubsubMessage::Attestation(Box::new(( @@ -1049,6 +1071,12 @@ pub fn serve( )) })?; + // Notify the validator monitor. + chain + .validator_monitor + .read() + .register_api_attester_slashing(&slashing); + if let ObservationOutcome::New(slashing) = outcome { publish_pubsub_message( &network_tx, @@ -1100,6 +1128,12 @@ pub fn serve( )) })?; + // Notify the validator monitor. + chain + .validator_monitor + .read() + .register_api_proposer_slashing(&slashing); + if let ObservationOutcome::New(slashing) = outcome { publish_pubsub_message( &network_tx, @@ -1149,6 +1183,12 @@ pub fn serve( )) })?; + // Notify the validator monitor. + chain + .validator_monitor + .read() + .register_api_voluntary_exit(&exit.message); + if let ObservationOutcome::New(exit) = outcome { publish_pubsub_message( &network_tx, @@ -1970,6 +2010,7 @@ pub fn serve( aggregates: Vec>, network_tx: UnboundedSender>, log: Logger| { blocking_json_task(move || { + let seen_timestamp = timestamp_now(); let mut verified_aggregates = Vec::with_capacity(aggregates.len()); let mut messages = Vec::with_capacity(aggregates.len()); let mut failures = Vec::new(); @@ -1981,6 +2022,18 @@ pub fn serve( messages.push(PubsubMessage::AggregateAndProofAttestation(Box::new( verified_aggregate.aggregate().clone(), ))); + + // Notify the validator monitor. + chain + .validator_monitor + .read() + .register_api_aggregated_attestation( + seen_timestamp, + verified_aggregate.aggregate(), + verified_aggregate.indexed_attestation(), + &chain.slot_clock, + ); + verified_aggregates.push((index, verified_aggregate)); } // If we already know the attestation, don't broadcast it or attempt to @@ -2050,11 +2103,18 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(network_tx_filter) + .and(chain_filter.clone()) .and_then( |subscriptions: Vec, - network_tx: UnboundedSender>| { + network_tx: UnboundedSender>, + chain: Arc>| { blocking_json_task(move || { for subscription in &subscriptions { + chain + .validator_monitor + .write() + .auto_register_local_validator(subscription.validator_index); + let subscription = api_types::ValidatorSubscription { validator_index: subscription.validator_index, attestation_committee_index: subscription.committee_index, diff --git a/beacon_node/network/src/attestation_service/tests/mod.rs b/beacon_node/network/src/attestation_service/tests/mod.rs index e4173cbdb68..b9587a66015 100644 --- a/beacon_node/network/src/attestation_service/tests/mod.rs +++ b/beacon_node/network/src/attestation_service/tests/mod.rs @@ -65,6 +65,7 @@ mod tests { Duration::from_millis(SLOT_DURATION_MILLIS), )) .shutdown_sender(shutdown_tx) + .monitor_validators(true, vec![], log) .build() .expect("should build"), ); diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 90c7769b846..e42cab146e5 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -224,6 +224,7 @@ impl WorkEvent { attestation: Attestation, subnet_id: SubnetId, should_import: bool, + seen_timestamp: Duration, ) -> Self { Self { drop_during_sync: true, @@ -233,6 +234,7 @@ impl WorkEvent { attestation: Box::new(attestation), subnet_id, should_import, + seen_timestamp, }, } } @@ -242,6 +244,7 @@ impl WorkEvent { message_id: MessageId, peer_id: PeerId, aggregate: SignedAggregateAndProof, + seen_timestamp: Duration, ) -> Self { Self { drop_during_sync: true, @@ -249,6 +252,7 @@ impl WorkEvent { message_id, peer_id, aggregate: Box::new(aggregate), + seen_timestamp, }, } } @@ -258,6 +262,7 @@ impl WorkEvent { message_id: MessageId, peer_id: PeerId, block: Box>, + seen_timestamp: Duration, ) -> Self { Self { drop_during_sync: false, @@ -265,6 +270,7 @@ impl WorkEvent { message_id, peer_id, block, + seen_timestamp, }, } } @@ -391,16 +397,19 @@ pub enum Work { attestation: Box>, subnet_id: SubnetId, should_import: bool, + seen_timestamp: Duration, }, GossipAggregate { message_id: MessageId, peer_id: PeerId, aggregate: Box>, + seen_timestamp: Duration, }, GossipBlock { message_id: MessageId, peer_id: PeerId, block: Box>, + seen_timestamp: Duration, }, GossipVoluntaryExit { message_id: MessageId, @@ -833,12 +842,14 @@ impl BeaconProcessor { attestation, subnet_id, should_import, + seen_timestamp, } => worker.process_gossip_attestation( message_id, peer_id, *attestation, subnet_id, should_import, + seen_timestamp, ), /* * Aggregated attestation verification. @@ -847,7 +858,13 @@ impl BeaconProcessor { message_id, peer_id, aggregate, - } => worker.process_gossip_aggregate(message_id, peer_id, *aggregate), + seen_timestamp, + } => worker.process_gossip_aggregate( + message_id, + peer_id, + *aggregate, + seen_timestamp, + ), /* * Verification for beacon blocks received on gossip. */ @@ -855,7 +872,8 @@ impl BeaconProcessor { message_id, peer_id, block, - } => worker.process_gossip_block(message_id, peer_id, *block), + seen_timestamp, + } => worker.process_gossip_block(message_id, peer_id, *block, seen_timestamp), /* * Voluntary exits received on gossip. */ diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index d9aec89d8e8..ee226a9f6a0 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -1,12 +1,14 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::{ - attestation_verification::Error as AttnError, observed_operations::ObservationOutcome, + attestation_verification::{Error as AttnError, SignatureVerifiedAttestation}, + observed_operations::ObservationOutcome, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, }; use eth2_libp2p::{MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use slog::{debug, error, info, trace, warn}; use ssz::Encode; +use std::time::Duration; use types::{ Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SubnetId, @@ -61,6 +63,7 @@ impl Worker { attestation: Attestation, subnet_id: SubnetId, should_import: bool, + seen_timestamp: Duration, ) { let beacon_block_root = attestation.data.beacon_block_root; @@ -81,6 +84,16 @@ impl Worker { } }; + // Register the attestation with any monitored validators. + self.chain + .validator_monitor + .read() + .register_gossip_unaggregated_attestation( + seen_timestamp, + attestation.indexed_attestation(), + &self.chain.slot_clock, + ); + // Indicate to the `Network` service that this message is valid and can be // propagated on the gossip network. self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); @@ -137,6 +150,7 @@ impl Worker { message_id: MessageId, peer_id: PeerId, aggregate: SignedAggregateAndProof, + seen_timestamp: Duration, ) { let beacon_block_root = aggregate.message.aggregate.data.beacon_block_root; @@ -162,6 +176,17 @@ impl Worker { // propagated on the gossip network. self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + // Register the attestation with any monitored validators. + self.chain + .validator_monitor + .read() + .register_gossip_aggregated_attestation( + seen_timestamp, + aggregate.aggregate(), + aggregate.indexed_attestation(), + &self.chain.slot_clock, + ); + metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL); if let Err(e) = self.chain.apply_attestation_to_fork_choice(&aggregate) { @@ -210,6 +235,7 @@ impl Worker { message_id: MessageId, peer_id: PeerId, block: SignedBeaconBlock, + seen_duration: Duration, ) { let verified_block = match self.chain.verify_block_for_gossip(block) { Ok(verified_block) => { @@ -262,7 +288,19 @@ impl Worker { metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL); + // Register the block with any monitored validators. + // + // Run this event *prior* to importing the block, where the block is only partially + // verified. + self.chain.validator_monitor.read().register_gossip_block( + seen_duration, + &verified_block.block.message, + verified_block.block_root, + &self.chain.slot_clock, + ); + let block = Box::new(verified_block.block.clone()); + match self.chain.process_block(verified_block) { Ok(_block_root) => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); @@ -359,6 +397,12 @@ impl Worker { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + // Register the exit with any monitored validators. + self.chain + .validator_monitor + .read() + .register_gossip_voluntary_exit(&exit.as_inner().message); + self.chain.import_voluntary_exit(exit); debug!(self.log, "Successfully imported voluntary exit"); @@ -412,6 +456,12 @@ impl Worker { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + // Register the slashing with any monitored validators. + self.chain + .validator_monitor + .read() + .register_gossip_proposer_slashing(slashing.as_inner()); + self.chain.import_proposer_slashing(slashing); debug!(self.log, "Successfully imported proposer slashing"); @@ -457,6 +507,12 @@ impl Worker { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + // Register the slashing with any monitored validators. + self.chain + .validator_monitor + .read() + .register_gossip_attester_slashing(slashing.as_inner()); + if let Err(e) = self.chain.import_attester_slashing(slashing) { debug!(self.log, "Error importing attester slashing"; "error" => ?e); metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_ERROR_TOTAL); diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 677e8d03618..d7026306017 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -9,6 +9,7 @@ use eth2_libp2p::{MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Res use slog::{debug, error, o, trace, warn}; use std::cmp; use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use types::{ Attestation, AttesterSlashing, ChainSpec, EthSpec, ProposerSlashing, SignedAggregateAndProof, @@ -230,7 +231,10 @@ impl Processor { block: Box>, ) { self.send_beacon_processor_work(BeaconWorkEvent::gossip_beacon_block( - message_id, peer_id, block, + message_id, + peer_id, + block, + timestamp_now(), )) } @@ -248,6 +252,7 @@ impl Processor { unaggregated_attestation, subnet_id, should_process, + timestamp_now(), )) } @@ -258,7 +263,10 @@ impl Processor { aggregate: SignedAggregateAndProof, ) { self.send_beacon_processor_work(BeaconWorkEvent::aggregated_attestation( - message_id, peer_id, aggregate, + message_id, + peer_id, + aggregate, + timestamp_now(), )) } @@ -390,3 +398,9 @@ impl HandlerNetworkContext { }) } } + +fn timestamp_now() -> Duration { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_else(|_| Duration::from_secs(0)) +} diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 629d8e831e0..9016f69035d 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -451,4 +451,29 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .value_name("WSS_CHECKPOINT") .takes_value(true) ) + .arg( + Arg::with_name("validator-monitor-auto") + .long("validator-monitor-auto") + .help("Enables the automatic detection and monitoring of validators connected to the \ + HTTP API and using the subnet subscription endpoint. This generally has the \ + effect of providing additional logging and metrics for locally controlled \ + validators.") + ) + .arg( + Arg::with_name("validator-monitor-pubkeys") + .long("validator-monitor-pubkeys") + .help("A comma-separated list of 0x-prefixed validator public keys. \ + These validators will receive special monitoring and additional \ + logging.") + .value_name("PUBKEYS") + .takes_value(true) + ) + .arg( + Arg::with_name("validator-monitor-file") + .long("validator-monitor-file") + .help("As per --validator-monitor-pubkeys, but the comma-separated list is \ + contained within a file at the given path.") + .value_name("PATH") + .takes_value(true) + ) } diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 38df71c471e..24e02f40120 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -12,7 +12,8 @@ use std::fs; use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs}; use std::net::{TcpListener, UdpSocket}; use std::path::PathBuf; -use types::{ChainSpec, Checkpoint, Epoch, EthSpec, Hash256, GRAFFITI_BYTES_LEN}; +use std::str::FromStr; +use types::{ChainSpec, Checkpoint, Epoch, EthSpec, Hash256, PublicKeyBytes, GRAFFITI_BYTES_LEN}; /// Gets the fully-initialized global client. /// @@ -386,6 +387,39 @@ pub fn get_config( client_config.slasher = Some(slasher_config); } + if cli_args.is_present("validator-monitor-auto") { + client_config.validator_monitor_auto = true; + } + + if let Some(pubkeys) = cli_args.value_of("validator-monitor-pubkeys") { + let pubkeys = pubkeys + .split(',') + .map(PublicKeyBytes::from_str) + .collect::, _>>() + .map_err(|e| format!("Invalid --validator-monitor-pubkeys value: {:?}", e))?; + client_config + .validator_monitor_pubkeys + .extend_from_slice(&pubkeys); + } + + if let Some(path) = cli_args.value_of("validator-monitor-file") { + let string = fs::read(path) + .map_err(|e| format!("Unable to read --validator-monitor-file: {}", e)) + .and_then(|bytes| { + String::from_utf8(bytes) + .map_err(|e| format!("--validator-monitor-file is not utf8: {}", e)) + })?; + let pubkeys = string + .trim_end() // Remove trailing white space + .split(',') + .map(PublicKeyBytes::from_str) + .collect::, _>>() + .map_err(|e| format!("Invalid --validator-monitor-file contents: {:?}", e))?; + client_config + .validator_monitor_pubkeys + .extend_from_slice(&pubkeys); + } + Ok(client_config) } diff --git a/book/src/SUMMARY.md b/book/src/SUMMARY.md index 57501236000..5c1b89c49d1 100644 --- a/book/src/SUMMARY.md +++ b/book/src/SUMMARY.md @@ -18,6 +18,7 @@ * [Importing from the Eth2 Launchpad](./validator-import-launchpad.md) * [Slashing Protection](./slashing-protection.md) * [Voluntary Exits](./voluntary-exit.md) + * [Validator Monitoring](./validator-monitoring.md) * [APIs](./api.md) * [Beacon Node API](./api-bn.md) * [/lighthouse](./api-lighthouse.md) diff --git a/book/src/faq.md b/book/src/faq.md index eb4c41db744..d020ac218e5 100644 --- a/book/src/faq.md +++ b/book/src/faq.md @@ -7,7 +7,8 @@ - [How do I update lighthouse?](#how-do-i-update-lighthouse) - [I can't compile lighthouse](#i-cant-compile-lighthouse) - [What is "Syncing eth1 block cache"](#what-is-syncing-eth1-block-cache) - +- [Can I use redundancy in my staking setup?](#can-i-use-redundancy-in-my-staking-setup) +- [How can I monitor my validators](#how-can-i-monitor-my-validators) ### Why does it take so long for a validator to be activated? @@ -182,3 +183,9 @@ duplicate your JSON keystores and don't run `lighthouse vc` twice). This will le However, there are some components which can be configured with redundancy. See the [Redundancy](./redundancy.md) guide for more information. + +### How can I monitor my validators? + +Apart from using block explorers, you may use the "Validator Monitor" built into Lighthouse which +provides logging and Prometheus/Grafana metrics for individual validators. See [Validator +Monitoring](./validator-monitoring.md) for more information. diff --git a/book/src/validator-monitoring.md b/book/src/validator-monitoring.md new file mode 100644 index 00000000000..dfe13d754f4 --- /dev/null +++ b/book/src/validator-monitoring.md @@ -0,0 +1,95 @@ +# Validator Monitoring + +Lighthouse allows for fine-grained monitoring of specific validators using the "validator monitor". +Generally users will want to use this function to track their own validators, however, it can be +used for any validator, regardless of who controls it. + +## Monitoring is in the Beacon Node + +Lighthouse performs validator monitoring in the Beacon Node (BN) instead of the Validator Client +(VC). This is contrary to what some users may expect, but it has several benefits: + +1. It keeps the VC simple. The VC handles cryptographic signing and the developers believe it should + be doing as little additional work as possible. +1. The BN has a better knowledge of the chain and network. Communicating all this information to + the VC is impractical, we can provide more information when monitoring with the BN. +1. It is more flexible: + - Users can use a local BN to observe some validators running in a remote location. + - Users can monitor validators that are not their own. + + +## How to Enable Monitoring + +The validator monitor is always enabled in Lighthouse, but it might not have any enrolled +validators. There are two methods for a validator to be enrolled for additional monitoring; +automatic and manual. + +### Automatic + +When the `--validator-monitor-auto` flag is supplied, any validator which uses the +[`beacon_committee_subscriptions`](https://ethereum.github.io/eth2.0-APIs/#/Validator/prepareBeaconCommitteeSubnet) +API endpoint will be enrolled for additional monitoring. All active validators will use this +endpoint each epoch, so you can expect it to detect all local and active validators within several +minutes after start up. + +#### Example + +``` +lighthouse bn --staking --validator-monitor-auto +``` + +### Manual + +The `--validator-monitor-pubkeys` flag can be used to specify validator public keys for monitoring. +This is useful when monitoring validators that are not directly attached to this BN. + +> Note: when monitoring validators that aren't connected to this BN, supply the +> `--subscribe-all-subnets --import-all-attestations` flags to ensure the BN has a full view of the +> network. This is not strictly necessary, though. + +#### Example + +Monitor the mainnet validators at indices `0` and `1`: + +``` +lighthouse bn --validator-monitor-pubkeys 0x933ad9491b62059dd065b560d256d8957a8c402cc6e8d8ee7290ae11e8f7329267a8811c397529dac52ae1342ba58c95,0xa1d1ad0714035353258038e964ae9675dc0252ee22cea896825c01458e1807bfad2f9969338798548d9858a571f7425c +``` + +## Observing Monitoring + +Enrolling a validator for additional monitoring results in: + +- Additional logs to be printed during BN operation. +- Additional [Prometheus metrics](./advanced_metrics.md) from the BN. + +### Logging + +Lighthouse will create logs for the following events for each monitored validator: + +- A block from the validator is observed. +- An unaggregated attestation from the validator is observed. +- An unaggregated attestation from the validator is included in an aggregate. +- An unaggregated attestation from the validator is included in a block. +- An aggregated attestation from the validator is observed. +- An exit for the validator is observed. +- A slashing (proposer or attester) is observed which implicates that validator. + +#### Example + +``` +Jan 18 11:50:03.896 INFO Unaggregated attestation validator: 0, src: gossip, slot: 342248, epoch: 10695, delay_ms: 891, index: 12, head: 0x5f9d603c04b5489bf2de3708569226fd9428eb40a89c75945e344d06c7f4f86a, service: beacon +``` + +``` +Jan 18 11:32:55.196 INFO Attestation included in aggregate validator: 0, src: gossip, slot: 342162, epoch: 10692, delay_ms: 2193, index: 10, head: 0x9be04ecd04bf82952dad5d12c62e532fd13a8d42afb2e6ee98edaf05fc7f9f30, service: beacon +``` + +``` +Jan 18 11:21:09.808 INFO Attestation included in block validator: 1, slot: 342102, epoch: 10690, inclusion_lag: 0 slot(s), index: 7, head: 0x422bcd14839e389f797fd38b01e31995f91bcaea3d5d56457fc6aac76909ebac, service: beacon +``` + +### Metrics + +The +[`ValidatorMonitor`](https://github.com/sigp/lighthouse-metrics/blob/master/dashboards/ValidatorMonitor.json) +dashboard contains all/most of the metrics exposed via the validator monitor. diff --git a/common/lighthouse_metrics/src/lib.rs b/common/lighthouse_metrics/src/lib.rs index f848fb29ccf..7f5bcf5e9c8 100644 --- a/common/lighthouse_metrics/src/lib.rs +++ b/common/lighthouse_metrics/src/lib.rs @@ -282,6 +282,12 @@ pub fn inc_counter_by(counter: &Result, value: u64) { } } +pub fn set_gauge_vec(int_gauge_vec: &Result, name: &[&str], value: i64) { + if let Some(gauge) = get_int_gauge(int_gauge_vec, name) { + gauge.set(value); + } +} + pub fn set_gauge(gauge: &Result, value: i64) { if let Ok(gauge) = gauge { gauge.set(value); diff --git a/common/slot_clock/src/lib.rs b/common/slot_clock/src/lib.rs index 0fe1bedfeda..36b4fcfcd25 100644 --- a/common/slot_clock/src/lib.rs +++ b/common/slot_clock/src/lib.rs @@ -59,6 +59,9 @@ pub trait SlotClock: Send + Sync + Sized { /// Returns the duration until the first slot of the next epoch. fn duration_to_next_epoch(&self, slots_per_epoch: u64) -> Option; + /// Returns the start time of the slot, as a duration since `UNIX_EPOCH`. + fn start_of(&self, slot: Slot) -> Option; + /// Returns the first slot to be returned at the genesis time. fn genesis_slot(&self) -> Slot; diff --git a/common/slot_clock/src/manual_slot_clock.rs b/common/slot_clock/src/manual_slot_clock.rs index 04235b6ca52..ef45e07c19f 100644 --- a/common/slot_clock/src/manual_slot_clock.rs +++ b/common/slot_clock/src/manual_slot_clock.rs @@ -45,18 +45,6 @@ impl ManualSlotClock { &self.genesis_duration } - /// Returns the duration between UNIX epoch and the start of `slot`. - pub fn start_of(&self, slot: Slot) -> Option { - let slot = slot - .as_u64() - .checked_sub(self.genesis_slot.as_u64())? - .try_into() - .ok()?; - let unadjusted_slot_duration = self.slot_duration.checked_mul(slot)?; - - self.genesis_duration.checked_add(unadjusted_slot_duration) - } - /// Returns the duration from `now` until the start of `slot`. /// /// Will return `None` if `now` is later than the start of `slot`. @@ -147,6 +135,18 @@ impl SlotClock for ManualSlotClock { self.duration_to_slot(slot, *self.current_time.read()) } + /// Returns the duration between UNIX epoch and the start of `slot`. + fn start_of(&self, slot: Slot) -> Option { + let slot = slot + .as_u64() + .checked_sub(self.genesis_slot.as_u64())? + .try_into() + .ok()?; + let unadjusted_slot_duration = self.slot_duration.checked_mul(slot)?; + + self.genesis_duration.checked_add(unadjusted_slot_duration) + } + fn genesis_slot(&self) -> Slot { self.genesis_slot } diff --git a/common/slot_clock/src/system_time_slot_clock.rs b/common/slot_clock/src/system_time_slot_clock.rs index 2ed917a91a0..c5d6dedc9bd 100644 --- a/common/slot_clock/src/system_time_slot_clock.rs +++ b/common/slot_clock/src/system_time_slot_clock.rs @@ -54,6 +54,10 @@ impl SlotClock for SystemTimeSlotClock { self.clock.duration_to_slot(slot, now) } + fn start_of(&self, slot: Slot) -> Option { + self.clock.start_of(slot) + } + fn genesis_slot(&self) -> Slot { self.clock.genesis_slot() } diff --git a/consensus/state_processing/src/per_block_processing.rs b/consensus/state_processing/src/per_block_processing.rs index e6c57beb24e..d1db65d26f0 100644 --- a/consensus/state_processing/src/per_block_processing.rs +++ b/consensus/state_processing/src/per_block_processing.rs @@ -465,7 +465,7 @@ pub fn process_deposit( // Create a new validator. let validator = Validator { - pubkey: deposit.data.pubkey.clone(), + pubkey: deposit.data.pubkey, withdrawal_credentials: deposit.data.withdrawal_credentials, activation_eligibility_epoch: spec.far_future_epoch, activation_epoch: spec.far_future_epoch, diff --git a/consensus/state_processing/src/verify_operation.rs b/consensus/state_processing/src/verify_operation.rs index 6cc66aa814b..25c2839edd3 100644 --- a/consensus/state_processing/src/verify_operation.rs +++ b/consensus/state_processing/src/verify_operation.rs @@ -20,6 +20,10 @@ impl SigVerifiedOp { pub fn into_inner(self) -> T { self.0 } + + pub fn as_inner(&self) -> &T { + &self.0 + } } /// Trait for operations that can be verified and transformed into a `SigVerifiedOp`. diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index f2642111efb..2eec1f0b8e9 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -1013,7 +1013,7 @@ impl BeaconState { .enumerate() .skip(self.pubkey_cache.len()) { - let success = self.pubkey_cache.insert(validator.pubkey.clone(), i); + let success = self.pubkey_cache.insert(validator.pubkey, i); if !success { return Err(Error::PubkeyCacheInconsistent); } diff --git a/consensus/types/src/deposit_data.rs b/consensus/types/src/deposit_data.rs index 8e2050a0b83..ac2bdd13d7f 100644 --- a/consensus/types/src/deposit_data.rs +++ b/consensus/types/src/deposit_data.rs @@ -26,7 +26,7 @@ impl DepositData { /// Spec v0.12.1 pub fn as_deposit_message(&self) -> DepositMessage { DepositMessage { - pubkey: self.pubkey.clone(), + pubkey: self.pubkey, withdrawal_credentials: self.withdrawal_credentials, amount: self.amount, } diff --git a/crypto/bls/src/generic_public_key_bytes.rs b/crypto/bls/src/generic_public_key_bytes.rs index 3fc16dbfb63..ea7ed30d8d4 100644 --- a/crypto/bls/src/generic_public_key_bytes.rs +++ b/crypto/bls/src/generic_public_key_bytes.rs @@ -18,12 +18,22 @@ use tree_hash::TreeHash; /// /// - Lazily verifying a serialized public key. /// - Storing some bytes that are actually invalid (required in the case of a `Deposit` message). -#[derive(Clone)] pub struct GenericPublicKeyBytes { bytes: [u8; PUBLIC_KEY_BYTES_LEN], _phantom: PhantomData, } +impl Copy for GenericPublicKeyBytes {} + +impl Clone for GenericPublicKeyBytes { + fn clone(&self) -> Self { + Self { + bytes: self.bytes, + _phantom: PhantomData, + } + } +} + impl GenericPublicKeyBytes where Pub: TPublicKey,