Skip to content

Commit a3a7f39

Browse files
[Altair] Sync committee pools (#2321)
Add pools supporting sync committees: - naive sync aggregation pool - observed sync contributions pool - observed sync contributors pool - observed sync aggregators pool Add SSZ types and tests related to sync committee signatures. Co-authored-by: Michael Sproul <[email protected]> Co-authored-by: realbigsean <[email protected]>
1 parent 8fa6e46 commit a3a7f39

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+5262
-918
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/beacon_chain/src/attestation_verification.rs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,9 @@
2727
//! ```
2828
2929
use crate::{
30-
beacon_chain::{
31-
HEAD_LOCK_TIMEOUT, MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT,
32-
},
30+
beacon_chain::{MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT},
3331
metrics,
34-
observed_attestations::ObserveOutcome,
32+
observed_aggregates::ObserveOutcome,
3533
observed_attesters::Error as ObservedAttestersError,
3634
BeaconChain, BeaconChainError, BeaconChainTypes,
3735
};
@@ -430,7 +428,7 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
430428
match chain
431429
.observed_aggregators
432430
.read()
433-
.validator_has_been_observed(attestation, aggregator_index as usize)
431+
.validator_has_been_observed(attestation.data.target.epoch, aggregator_index as usize)
434432
{
435433
Ok(true) => Err(Error::AggregatorAlreadyKnown(aggregator_index)),
436434
Ok(false) => Ok(()),
@@ -485,7 +483,7 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
485483
if let ObserveOutcome::AlreadyKnown = chain
486484
.observed_attestations
487485
.write()
488-
.observe_attestation(attestation, Some(attestation_root))
486+
.observe_item(attestation, Some(attestation_root))
489487
.map_err(|e| Error::BeaconChainError(e.into()))?
490488
{
491489
return Err(Error::AttestationAlreadyKnown(attestation_root));
@@ -498,7 +496,7 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
498496
if chain
499497
.observed_aggregators
500498
.write()
501-
.observe_validator(&attestation, aggregator_index as usize)
499+
.observe_validator(attestation.data.target.epoch, aggregator_index as usize)
502500
.map_err(BeaconChainError::from)?
503501
{
504502
return Err(Error::PriorAttestationKnown {
@@ -689,7 +687,7 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<T> {
689687
if chain
690688
.observed_attesters
691689
.read()
692-
.validator_has_been_observed(&attestation, validator_index as usize)
690+
.validator_has_been_observed(attestation.data.target.epoch, validator_index as usize)
693691
.map_err(BeaconChainError::from)?
694692
{
695693
return Err(Error::PriorAttestationKnown {
@@ -716,7 +714,7 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<T> {
716714
if chain
717715
.observed_attesters
718716
.write()
719-
.observe_validator(&attestation, validator_index as usize)
717+
.observe_validator(attestation.data.target.epoch, validator_index as usize)
720718
.map_err(BeaconChainError::from)?
721719
{
722720
return Err(Error::PriorAttestationKnown {
@@ -923,10 +921,8 @@ pub fn verify_attestation_signature<T: BeaconChainTypes>(
923921
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;
924922

925923
let fork = chain
926-
.canonical_head
927-
.try_read_for(HEAD_LOCK_TIMEOUT)
928-
.ok_or(BeaconChainError::CanonicalHeadLockTimeout)
929-
.map(|head| head.beacon_state.fork())?;
924+
.spec
925+
.fork_at_epoch(indexed_attestation.data.target.epoch);
930926

931927
let signature_set = indexed_attestation_signature_set_from_pubkeys(
932928
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
@@ -1029,10 +1025,8 @@ pub fn verify_signed_aggregate_signatures<T: BeaconChainTypes>(
10291025
}
10301026

10311027
let fork = chain
1032-
.canonical_head
1033-
.try_read_for(HEAD_LOCK_TIMEOUT)
1034-
.ok_or(BeaconChainError::CanonicalHeadLockTimeout)
1035-
.map(|head| head.beacon_state.fork())?;
1028+
.spec
1029+
.fork_at_epoch(indexed_attestation.data.target.epoch);
10361030

10371031
let signature_sets = vec![
10381032
signed_aggregate_selection_proof_signature_set(

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 219 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,25 @@ use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
1414
use crate::events::ServerSentEventHandler;
1515
use crate::head_tracker::HeadTracker;
1616
use crate::migrate::BackgroundMigrator;
17-
use crate::naive_aggregation_pool::{Error as NaiveAggregationError, NaiveAggregationPool};
18-
use crate::observed_attestations::{Error as AttestationObservationError, ObservedAttestations};
19-
use crate::observed_attesters::{ObservedAggregators, ObservedAttesters};
17+
use crate::naive_aggregation_pool::{
18+
AggregatedAttestationMap, Error as NaiveAggregationError, NaiveAggregationPool,
19+
SyncContributionAggregateMap,
20+
};
21+
use crate::observed_aggregates::{
22+
Error as AttestationObservationError, ObservedAggregateAttestations, ObservedSyncContributions,
23+
};
24+
use crate::observed_attesters::{
25+
ObservedAggregators, ObservedAttesters, ObservedSyncAggregators, ObservedSyncContributors,
26+
};
2027
use crate::observed_block_producers::ObservedBlockProducers;
2128
use crate::observed_operations::{ObservationOutcome, ObservedOperations};
2229
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
2330
use crate::persisted_fork_choice::PersistedForkChoice;
2431
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
2532
use crate::snapshot_cache::SnapshotCache;
33+
use crate::sync_committee_verification::{
34+
Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution,
35+
};
2636
use crate::timeout_rw_lock::TimeoutRwLock;
2737
use crate::validator_monitor::{
2838
get_block_delay_ms, get_slot_delay_ms, timestamp_now, ValidatorMonitor,
@@ -39,6 +49,7 @@ use itertools::process_results;
3949
use itertools::Itertools;
4050
use operation_pool::{OperationPool, PersistedOperationPool};
4151
use parking_lot::{Mutex, RwLock};
52+
use safe_arith::SafeArith;
4253
use slasher::Slasher;
4354
use slog::{crit, debug, error, info, trace, warn, Logger};
4455
use slot_clock::SlotClock;
@@ -221,14 +232,28 @@ pub struct BeaconChain<T: BeaconChainTypes> {
221232
///
222233
/// This pool accepts `Attestation` objects that only have one aggregation bit set and provides
223234
/// a method to get an aggregated `Attestation` for some `AttestationData`.
224-
pub naive_aggregation_pool: RwLock<NaiveAggregationPool<T::EthSpec>>,
235+
pub naive_aggregation_pool: RwLock<NaiveAggregationPool<AggregatedAttestationMap<T::EthSpec>>>,
236+
/// A pool of `SyncCommitteeContribution` dedicated to the "naive aggregation strategy" defined in the eth2
237+
/// specs.
238+
///
239+
/// This pool accepts `SyncCommitteeContribution` objects that only have one aggregation bit set and provides
240+
/// a method to get an aggregated `SyncCommitteeContribution` for some `SyncCommitteeContributionData`.
241+
pub naive_sync_aggregation_pool:
242+
RwLock<NaiveAggregationPool<SyncContributionAggregateMap<T::EthSpec>>>,
225243
/// Contains a store of attestations which have been observed by the beacon chain.
226-
pub(crate) observed_attestations: RwLock<ObservedAttestations<T::EthSpec>>,
244+
pub(crate) observed_attestations: RwLock<ObservedAggregateAttestations<T::EthSpec>>,
245+
/// Contains a store of sync contributions which have been observed by the beacon chain.
246+
pub(crate) observed_sync_contributions: RwLock<ObservedSyncContributions<T::EthSpec>>,
227247
/// Maintains a record of which validators have been seen to attest in recent epochs.
228248
pub(crate) observed_attesters: RwLock<ObservedAttesters<T::EthSpec>>,
249+
/// Maintains a record of which validators have been seen sending sync messages in recent epochs.
250+
pub(crate) observed_sync_contributors: RwLock<ObservedSyncContributors<T::EthSpec>>,
229251
/// Maintains a record of which validators have been seen to create `SignedAggregateAndProofs`
230252
/// in recent epochs.
231253
pub(crate) observed_aggregators: RwLock<ObservedAggregators<T::EthSpec>>,
254+
/// Maintains a record of which validators have been seen to create `SignedContributionAndProofs`
255+
/// in recent epochs.
256+
pub(crate) observed_sync_aggregators: RwLock<ObservedSyncAggregators<T::EthSpec>>,
232257
/// Maintains a record of which validators have proposed blocks for each slot.
233258
pub(crate) observed_block_producers: RwLock<ObservedBlockProducers<T::EthSpec>>,
234259
/// Maintains a record of which validators have submitted voluntary exits.
@@ -823,6 +848,80 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
823848
})
824849
}
825850

851+
/// Return the sync committee at `slot + 1` from the canonical chain.
852+
///
853+
/// This is useful when dealing with sync committee messages, because messages are signed
854+
/// and broadcast one slot prior to the slot of the sync committee (which is relevant at
855+
/// sync committee period boundaries).
856+
pub fn sync_committee_at_next_slot(
857+
&self,
858+
slot: Slot,
859+
) -> Result<Arc<SyncCommittee<T::EthSpec>>, Error> {
860+
let epoch = slot.safe_add(1)?.epoch(T::EthSpec::slots_per_epoch());
861+
self.sync_committee_at_epoch(epoch)
862+
}
863+
864+
/// Return the sync committee at `epoch` from the canonical chain.
865+
pub fn sync_committee_at_epoch(
866+
&self,
867+
epoch: Epoch,
868+
) -> Result<Arc<SyncCommittee<T::EthSpec>>, Error> {
869+
// Try to read a committee from the head. This will work most of the time, but will fail
870+
// for faraway committees, or if there are skipped slots at the transition to Altair.
871+
let spec = &self.spec;
872+
let committee_from_head =
873+
self.with_head(
874+
|head| match head.beacon_state.get_built_sync_committee(epoch, spec) {
875+
Ok(committee) => Ok(Some(committee.clone())),
876+
Err(BeaconStateError::SyncCommitteeNotKnown { .. })
877+
| Err(BeaconStateError::IncorrectStateVariant) => Ok(None),
878+
Err(e) => Err(Error::from(e)),
879+
},
880+
)?;
881+
882+
if let Some(committee) = committee_from_head {
883+
Ok(committee)
884+
} else {
885+
// Slow path: load a state (or advance the head).
886+
let sync_committee_period = epoch.sync_committee_period(spec)?;
887+
let committee = self
888+
.state_for_sync_committee_period(sync_committee_period)?
889+
.get_built_sync_committee(epoch, spec)?
890+
.clone();
891+
Ok(committee)
892+
}
893+
}
894+
895+
/// Load a state suitable for determining the sync committee for the given period.
896+
///
897+
/// Specifically, the state at the start of the *previous* sync committee period.
898+
///
899+
/// This is sufficient for historical duties, and efficient in the case where the head
900+
/// is lagging the current period and we need duties for the next period (because we only
901+
/// have to transition the head to start of the current period).
902+
///
903+
/// We also need to ensure that the load slot is after the Altair fork.
904+
///
905+
/// **WARNING**: the state returned will have dummy state roots. It should only be used
906+
/// for its sync committees (determining duties, etc).
907+
pub fn state_for_sync_committee_period(
908+
&self,
909+
sync_committee_period: u64,
910+
) -> Result<BeaconState<T::EthSpec>, Error> {
911+
let altair_fork_epoch = self
912+
.spec
913+
.altair_fork_epoch
914+
.ok_or(Error::AltairForkDisabled)?;
915+
916+
let load_slot = std::cmp::max(
917+
self.spec.epochs_per_sync_committee_period * sync_committee_period.saturating_sub(1),
918+
altair_fork_epoch,
919+
)
920+
.start_slot(T::EthSpec::slots_per_epoch());
921+
922+
self.state_at_slot(load_slot, StateSkipConfig::WithoutStateRoots)
923+
}
924+
826925
/// Returns info representing the head block and state.
827926
///
828927
/// A summarized version of `Self::head` that involves less cloning.
@@ -1270,6 +1369,36 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
12701369
})
12711370
}
12721371

1372+
/// Accepts some `SyncCommitteeMessage` from the network and attempts to verify it, returning `Ok(_)` if
1373+
/// it is valid to be (re)broadcast on the gossip network.
1374+
pub fn verify_sync_committee_message_for_gossip(
1375+
&self,
1376+
sync_message: SyncCommitteeMessage,
1377+
subnet_id: SyncSubnetId,
1378+
) -> Result<VerifiedSyncCommitteeMessage, SyncCommitteeError> {
1379+
metrics::inc_counter(&metrics::SYNC_MESSAGE_PROCESSING_REQUESTS);
1380+
let _timer = metrics::start_timer(&metrics::SYNC_MESSAGE_GOSSIP_VERIFICATION_TIMES);
1381+
1382+
VerifiedSyncCommitteeMessage::verify(sync_message, subnet_id, self).map(|v| {
1383+
metrics::inc_counter(&metrics::SYNC_MESSAGE_PROCESSING_SUCCESSES);
1384+
v
1385+
})
1386+
}
1387+
1388+
/// Accepts some `SignedContributionAndProof` from the network and attempts to verify it,
1389+
/// returning `Ok(_)` if it is valid to be (re)broadcast on the gossip network.
1390+
pub fn verify_sync_contribution_for_gossip(
1391+
&self,
1392+
sync_contribution: SignedContributionAndProof<T::EthSpec>,
1393+
) -> Result<VerifiedSyncContribution<T>, SyncCommitteeError> {
1394+
metrics::inc_counter(&metrics::SYNC_CONTRIBUTION_PROCESSING_REQUESTS);
1395+
let _timer = metrics::start_timer(&metrics::SYNC_CONTRIBUTION_GOSSIP_VERIFICATION_TIMES);
1396+
VerifiedSyncContribution::verify(sync_contribution, self).map(|v| {
1397+
metrics::inc_counter(&metrics::SYNC_CONTRIBUTION_PROCESSING_SUCCESSES);
1398+
v
1399+
})
1400+
}
1401+
12731402
/// Accepts some attestation-type object and attempts to verify it in the context of fork
12741403
/// choice. If it is valid it is applied to `self.fork_choice`.
12751404
///
@@ -1339,6 +1468,70 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
13391468
Ok(unaggregated_attestation)
13401469
}
13411470

1471+
/// Accepts a `VerifiedSyncCommitteeMessage` and attempts to apply it to the "naive
1472+
/// aggregation pool".
1473+
///
1474+
/// The naive aggregation pool is used by local validators to produce
1475+
/// `SignedContributionAndProof`.
1476+
///
1477+
/// If the sync message is too old (low slot) to be included in the pool it is simply dropped
1478+
/// and no error is returned.
1479+
pub fn add_to_naive_sync_aggregation_pool(
1480+
&self,
1481+
verified_sync_committee_message: VerifiedSyncCommitteeMessage,
1482+
) -> Result<VerifiedSyncCommitteeMessage, SyncCommitteeError> {
1483+
let sync_message = verified_sync_committee_message.sync_message();
1484+
let positions_by_subnet_id: &HashMap<SyncSubnetId, Vec<usize>> =
1485+
verified_sync_committee_message.subnet_positions();
1486+
for (subnet_id, positions) in positions_by_subnet_id.iter() {
1487+
for position in positions {
1488+
let _timer =
1489+
metrics::start_timer(&metrics::SYNC_CONTRIBUTION_PROCESSING_APPLY_TO_AGG_POOL);
1490+
let contribution = SyncCommitteeContribution::from_message(
1491+
sync_message,
1492+
subnet_id.into(),
1493+
*position,
1494+
)?;
1495+
1496+
match self
1497+
.naive_sync_aggregation_pool
1498+
.write()
1499+
.insert(&contribution)
1500+
{
1501+
Ok(outcome) => trace!(
1502+
self.log,
1503+
"Stored unaggregated sync committee message";
1504+
"outcome" => ?outcome,
1505+
"index" => sync_message.validator_index,
1506+
"slot" => sync_message.slot.as_u64(),
1507+
),
1508+
Err(NaiveAggregationError::SlotTooLow {
1509+
slot,
1510+
lowest_permissible_slot,
1511+
}) => {
1512+
trace!(
1513+
self.log,
1514+
"Refused to store unaggregated sync committee message";
1515+
"lowest_permissible_slot" => lowest_permissible_slot.as_u64(),
1516+
"slot" => slot.as_u64(),
1517+
);
1518+
}
1519+
Err(e) => {
1520+
error!(
1521+
self.log,
1522+
"Failed to store unaggregated sync committee message";
1523+
"error" => ?e,
1524+
"index" => sync_message.validator_index,
1525+
"slot" => sync_message.slot.as_u64(),
1526+
);
1527+
return Err(Error::from(e).into());
1528+
}
1529+
};
1530+
}
1531+
}
1532+
Ok(verified_sync_committee_message)
1533+
}
1534+
13421535
/// Accepts a `VerifiedAggregatedAttestation` and attempts to apply it to `self.op_pool`.
13431536
///
13441537
/// The op pool is used by local block producers to pack blocks with operations.
@@ -1368,6 +1561,26 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
13681561
Ok(signed_aggregate)
13691562
}
13701563

1564+
/// Accepts a `VerifiedSyncContribution` and attempts to apply it to `self.op_pool`.
1565+
///
1566+
/// The op pool is used by local block producers to pack blocks with operations.
1567+
pub fn add_contribution_to_block_inclusion_pool(
1568+
&self,
1569+
contribution: VerifiedSyncContribution<T>,
1570+
) -> Result<(), SyncCommitteeError> {
1571+
let _timer = metrics::start_timer(&metrics::SYNC_CONTRIBUTION_PROCESSING_APPLY_TO_OP_POOL);
1572+
1573+
// If there's no eth1 chain then it's impossible to produce blocks and therefore
1574+
// useless to put things in the op pool.
1575+
if self.eth1_chain.is_some() {
1576+
self.op_pool
1577+
.insert_sync_contribution(contribution.contribution())
1578+
.map_err(Error::from)?;
1579+
}
1580+
1581+
Ok(())
1582+
}
1583+
13711584
/// Filter an attestation from the op pool for shuffling compatibility.
13721585
///
13731586
/// Use the provided `filter_cache` map to memoize results.
@@ -1818,11 +2031,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
18182031
// Iterate through the attestations in the block and register them as an "observed
18192032
// attestation". This will stop us from propagating them on the gossip network.
18202033
for a in signed_block.message().body().attestations() {
1821-
match self
1822-
.observed_attestations
1823-
.write()
1824-
.observe_attestation(a, None)
1825-
{
2034+
match self.observed_attestations.write().observe_item(a, None) {
18262035
// If the observation was successful or if the slot for the attestation was too
18272036
// low, continue.
18282037
//

0 commit comments

Comments
 (0)