Skip to content

Commit 2ae7861

Browse files
committed
Add Subnet enum to include SyncCommittee subnet_ids
1 parent 40685d7 commit 2ae7861

File tree

10 files changed

+123
-83
lines changed

10 files changed

+123
-83
lines changed

beacon_node/eth2_libp2p/src/behaviour/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::behaviour::gossipsub_scoring_parameters::PeerScoreSettings;
21
use crate::peer_manager::{
32
score::{PeerAction, ReportSource},
43
ConnectionDirection, PeerManager, PeerManagerEvent,
@@ -10,6 +9,7 @@ use crate::types::{
109
SubnetDiscovery,
1110
};
1211
use crate::Eth2Enr;
12+
use crate::{behaviour::gossipsub_scoring_parameters::PeerScoreSettings, Subnet};
1313
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
1414
use futures::prelude::*;
1515
use handler::{BehaviourHandler, BehaviourHandlerIn, DelegateIn, DelegateOut};
@@ -327,7 +327,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
327327
}
328328

329329
/// Subscribes to a specific subnet id;
330-
pub fn subscribe_to_subnet(&mut self, subnet_id: SubnetId) -> bool {
330+
pub fn subscribe_to_subnet(&mut self, subnet_id: Subnet) -> bool {
331331
let topic = GossipTopic::new(
332332
subnet_id.into(),
333333
GossipEncoding::default(),
@@ -337,7 +337,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
337337
}
338338

339339
/// Un-Subscribes from a specific subnet id;
340-
pub fn unsubscribe_from_subnet(&mut self, subnet_id: SubnetId) -> bool {
340+
pub fn unsubscribe_from_subnet(&mut self, subnet_id: Subnet) -> bool {
341341
let topic = GossipTopic::new(
342342
subnet_id.into(),
343343
GossipEncoding::default(),

beacon_node/eth2_libp2p/src/discovery/mod.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,15 @@ pub use enr_ext::{peer_id_to_node_id, CombinedKeyExt, EnrExt};
1111
pub use libp2p::core::identity::{Keypair, PublicKey};
1212

1313
use crate::{config, metrics};
14-
use crate::{error, Enr, NetworkConfig, NetworkGlobals, SubnetDiscovery};
14+
use crate::{error, Enr, NetworkConfig, NetworkGlobals, Subnet, SubnetDiscovery};
1515
use discv5::{enr::NodeId, Discv5, Discv5Event};
1616
use enr::{ATTESTATION_BITFIELD_ENR_KEY, ETH2_ENR_KEY, SYNC_COMMITTEE_BITFIELD_ENR_KEY};
1717
use futures::prelude::*;
1818
use futures::stream::FuturesUnordered;
1919
use libp2p::core::PeerId;
2020
use lru::LruCache;
2121
use slog::{crit, debug, error, info, warn};
22-
use ssz::{Decode, Encode};
23-
use ssz_types::BitVector;
22+
use ssz::Encode;
2423
use std::{
2524
collections::{HashMap, VecDeque},
2625
net::{IpAddr, SocketAddr},
@@ -67,7 +66,7 @@ pub enum DiscoveryEvent {
6766

6867
#[derive(Debug, Clone, PartialEq)]
6968
struct SubnetQuery {
70-
subnet_id: SubnetId,
69+
subnet_id: Subnet,
7170
min_ttl: Option<Instant>,
7271
retries: usize,
7372
}
@@ -576,7 +575,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
576575

577576
/// Adds a subnet query if one doesn't exist. If a subnet query already exists, this
578577
/// updates the min_ttl field.
579-
fn add_subnet_query(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>, retries: usize) {
578+
fn add_subnet_query(&mut self, subnet_id: Subnet, min_ttl: Option<Instant>, retries: usize) {
580579
// remove the entry and complete the query if greater than the maximum search count
581580
if retries > MAX_DISCOVERY_RETRY {
582581
debug!(
@@ -611,7 +610,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
611610
retries,
612611
});
613612
// update the metrics and insert into the queue.
614-
debug!(self.log, "Queuing subnet query"; "subnet" => *subnet_id, "retries" => retries);
613+
debug!(self.log, "Queuing subnet query"; "subnet" => ?subnet_id, "retries" => retries);
615614
self.queued_queries.push_back(query);
616615
metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64);
617616
}
@@ -690,7 +689,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
690689

691690
/// Runs a discovery request for a given group of subnets.
692691
fn start_subnet_query(&mut self, subnet_queries: Vec<SubnetQuery>) {
693-
let mut filtered_subnet_ids: Vec<SubnetId> = Vec::new();
692+
let mut filtered_subnet_ids: Vec<Subnet> = Vec::new();
694693

695694
// find subnet queries that are still necessary
696695
let filtered_subnet_queries: Vec<SubnetQuery> = subnet_queries
@@ -715,7 +714,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
715714

716715
let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet;
717716
debug!(self.log, "Discovery query started for subnet";
718-
"subnet_id" => *subnet_query.subnet_id,
717+
"subnet_id" => ?subnet_query.subnet_id,
719718
"connected_peers_on_subnet" => peers_on_subnet,
720719
"target_subnet_peers" => TARGET_SUBNET_PEERS,
721720
"peers_to_find" => target_peers,
@@ -823,7 +822,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
823822
}
824823
}
825824
GroupedQueryType::Subnet(queries) => {
826-
let subnets_searched_for: Vec<SubnetId> =
825+
let subnets_searched_for: Vec<Subnet> =
827826
queries.iter().map(|query| query.subnet_id).collect();
828827
match query_result.1 {
829828
Ok(r) if r.is_empty() => {
@@ -1001,7 +1000,7 @@ mod tests {
10011000
use enr::EnrBuilder;
10021001
use slog::{o, Drain};
10031002
use std::net::UdpSocket;
1004-
use types::MinimalEthSpec;
1003+
use types::{BitVector, MinimalEthSpec};
10051004

10061005
type E = MinimalEthSpec;
10071006

@@ -1053,7 +1052,7 @@ mod tests {
10531052
let mut discovery = build_discovery().await;
10541053
let now = Instant::now();
10551054
let mut subnet_query = SubnetQuery {
1056-
subnet_id: SubnetId::new(1),
1055+
subnet_id: Subnet::Attestation(SubnetId::new(1)),
10571056
min_ttl: Some(now),
10581057
retries: 0,
10591058
};
@@ -1100,7 +1099,7 @@ mod tests {
11001099

11011100
let now = Instant::now();
11021101
let subnet_query = SubnetQuery {
1103-
subnet_id: SubnetId::new(1),
1102+
subnet_id: Subnet::Attestation(SubnetId::new(1)),
11041103
min_ttl: Some(now + Duration::from_secs(10)),
11051104
retries: 0,
11061105
};
@@ -1147,12 +1146,12 @@ mod tests {
11471146

11481147
let query = GroupedQueryType::Subnet(vec![
11491148
SubnetQuery {
1150-
subnet_id: SubnetId::new(1),
1149+
subnet_id: Subnet::Attestation(SubnetId::new(1)),
11511150
min_ttl: instant1,
11521151
retries: 0,
11531152
},
11541153
SubnetQuery {
1155-
subnet_id: SubnetId::new(2),
1154+
subnet_id: Subnet::Attestation(SubnetId::new(2)),
11561155
min_ttl: instant2,
11571156
retries: 0,
11581157
},
Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
///! The subnet predicate used for searching for a particular subnet.
22
use super::*;
3+
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
34
use slog::trace;
45
use std::ops::Deref;
56

67
/// Returns the predicate for a given subnet.
78
pub fn subnet_predicate<TSpec>(
8-
subnet_ids: Vec<SubnetId>,
9+
subnet_ids: Vec<Subnet>,
910
log: &slog::Logger,
1011
) -> impl Fn(&Enr) -> bool + Send
1112
where
@@ -14,39 +15,33 @@ where
1415
let log_clone = log.clone();
1516

1617
move |enr: &Enr| {
17-
if let Some(bitfield_bytes) = enr.get(ATTESTATION_BITFIELD_ENR_KEY) {
18-
let bitfield = match BitVector::<TSpec::SubnetBitfieldLength>::from_ssz_bytes(
19-
bitfield_bytes,
20-
) {
21-
Ok(v) => v,
22-
Err(e) => {
23-
warn!(log_clone, "Could not decode ENR bitfield for peer"; "peer_id" => format!("{}", enr.peer_id()), "error" => format!("{:?}", e));
24-
return false;
25-
}
18+
let attestation_bitfield: EnrAttestationBitfield<TSpec> =
19+
match enr.attestation_bitfield::<TSpec>() {
20+
Ok(b) => b,
21+
Err(_e) => return false,
2622
};
2723

28-
let matches: Vec<&SubnetId> = subnet_ids
29-
.iter()
30-
.filter(|id| bitfield.get(**id.deref() as usize).unwrap_or(false))
31-
.collect();
24+
// Pre-fork/fork-boundary enrs may not contain a syncnets field.
25+
// Don't return early here
26+
let sync_committee_bitfield: Result<EnrSyncCommitteeBitfield<TSpec>, _> =
27+
enr.sync_committee_bitfield::<TSpec>();
3228

33-
if matches.is_empty() {
34-
trace!(
35-
log_clone,
36-
"Peer found but not on any of the desired subnets";
37-
"peer_id" => %enr.peer_id()
38-
);
39-
return false;
40-
} else {
41-
trace!(
42-
log_clone,
43-
"Peer found on desired subnet(s)";
44-
"peer_id" => %enr.peer_id(),
45-
"subnets" => ?matches.as_slice()
46-
);
47-
return true;
48-
}
29+
let predicate = subnet_ids.iter().any(|subnet| match subnet {
30+
Subnet::Attestation(s) => attestation_bitfield
31+
.get(*s.deref() as usize)
32+
.unwrap_or(false),
33+
Subnet::SyncCommittee(s) => sync_committee_bitfield
34+
.as_ref()
35+
.map_or(false, |b| b.get(*s.deref() as usize).unwrap_or(false)),
36+
});
37+
38+
if !predicate {
39+
trace!(
40+
log_clone,
41+
"Peer found but not on any of the desired subnets";
42+
"peer_id" => %enr.peer_id()
43+
);
4944
}
50-
false
45+
return predicate;
5146
}
5247
}

beacon_node/eth2_libp2p/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ impl<'de> Deserialize<'de> for PeerIdSerialized {
6060
}
6161
}
6262

63-
pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage, SubnetDiscovery};
63+
pub use crate::types::{
64+
error, Enr, GossipTopic, NetworkGlobals, PubsubMessage, Subnet, SubnetDiscovery,
65+
};
6466
pub use behaviour::{BehaviourEvent, Gossipsub, PeerRequestId, Request, Response};
6567
pub use config::Config as NetworkConfig;
6668
pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr};

beacon_node/eth2_libp2p/src/peer_manager/mod.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
//! Implementation of a Lighthouse's peer management system.
22
33
pub use self::peerdb::*;
4-
use crate::discovery::{subnet_predicate, Discovery, DiscoveryEvent, TARGET_SUBNET_PEERS};
54
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
65
use crate::types::SyncState;
6+
use crate::{
7+
discovery::{subnet_predicate, Discovery, DiscoveryEvent, TARGET_SUBNET_PEERS},
8+
Subnet,
9+
};
710
use crate::{error, metrics, Gossipsub};
811
use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId, SubnetDiscovery};
912
use futures::prelude::*;
@@ -20,7 +23,7 @@ use std::{
2023
task::{Context, Poll},
2124
time::{Duration, Instant},
2225
};
23-
use types::{EthSpec, SubnetId};
26+
use types::EthSpec;
2427

2528
pub use libp2p::core::{identity::Keypair, Multiaddr};
2629

@@ -241,14 +244,14 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
241244
self.network_globals
242245
.peers
243246
.write()
244-
.extend_peers_on_subnet(s.subnet_id, min_ttl);
247+
.extend_peers_on_subnet(&s.subnet_id, min_ttl);
245248
}
246249
// Already have target number of peers, no need for subnet discovery
247250
let peers_on_subnet = self
248251
.network_globals
249252
.peers
250253
.read()
251-
.good_peers_on_subnet(s.subnet_id)
254+
.good_peers_on_subnet(s.subnet_id.clone())
252255
.count();
253256
if peers_on_subnet >= TARGET_SUBNET_PEERS {
254257
trace!(
@@ -282,14 +285,14 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
282285
}
283286

284287
/// Adds a gossipsub subscription to a peer in the peerdb.
285-
pub fn add_subscription(&self, peer_id: &PeerId, subnet_id: SubnetId) {
288+
pub fn add_subscription(&self, peer_id: &PeerId, subnet_id: Subnet) {
286289
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
287290
info.subnets.insert(subnet_id);
288291
}
289292
}
290293

291294
/// Removes a gossipsub subscription to a peer in the peerdb.
292-
pub fn remove_subscription(&self, peer_id: &PeerId, subnet_id: SubnetId) {
295+
pub fn remove_subscription(&self, peer_id: &PeerId, subnet_id: Subnet) {
293296
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
294297
info.subnets.remove(&subnet_id);
295298
}
@@ -658,7 +661,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
658661

659662
/// Dial cached enrs in discovery service that are in the given `subnet_id` and aren't
660663
/// in Connected, Dialing or Banned state.
661-
fn dial_cached_enrs_in_subnet(&mut self, subnet_id: SubnetId) {
664+
fn dial_cached_enrs_in_subnet(&mut self, subnet_id: Subnet) {
662665
let predicate = subnet_predicate::<TSpec>(vec![subnet_id], &self.log);
663666
let peers_to_dial: Vec<PeerId> = self
664667
.discovery()

beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use super::client::Client;
22
use super::score::{PeerAction, Score, ScoreState};
33
use super::PeerSyncStatus;
4-
use crate::rpc::MetaData;
54
use crate::Multiaddr;
5+
use crate::{rpc::MetaData, types::Subnet};
66
use discv5::Enr;
77
use serde::{
88
ser::{SerializeStruct, Serializer},
@@ -12,7 +12,7 @@ use std::collections::HashSet;
1212
use std::net::{IpAddr, SocketAddr};
1313
use std::time::Instant;
1414
use strum::AsRefStr;
15-
use types::{EthSpec, SubnetId};
15+
use types::EthSpec;
1616
use PeerConnectionStatus::*;
1717

1818
/// Information about a given connected peer.
@@ -40,7 +40,7 @@ pub struct PeerInfo<T: EthSpec> {
4040
/// connection.
4141
pub meta_data: Option<MetaData<T>>,
4242
/// Subnets the peer is connected to.
43-
pub subnets: HashSet<SubnetId>,
43+
pub subnets: HashSet<Subnet>,
4444
/// The time we would like to retain this peer. After this time, the peer is no longer
4545
/// necessary.
4646
#[serde(skip)]
@@ -85,15 +85,21 @@ impl<T: EthSpec> PeerInfo<T> {
8585
}
8686

8787
/// Returns if the peer is subscribed to a given `SubnetId` from the metadata attnets field.
88-
pub fn on_subnet_metadata(&self, subnet_id: SubnetId) -> bool {
88+
pub fn on_subnet_metadata(&self, subnet_id: &Subnet) -> bool {
8989
if let Some(meta_data) = &self.meta_data {
90-
return meta_data.attnets.get(*subnet_id as usize).unwrap_or(false);
90+
match subnet_id {
91+
Subnet::Attestation(id) => {
92+
return meta_data.attnets.get(**id as usize).unwrap_or(false)
93+
}
94+
// TODO(pawan): add syncnets to metadata
95+
Subnet::SyncCommittee(_id) => unimplemented!(),
96+
}
9197
}
9298
false
9399
}
94100

95101
/// Returns if the peer is subscribed to a given `SubnetId` from the gossipsub subscriptions.
96-
pub fn on_subnet_gossipsub(&self, subnet_id: SubnetId) -> bool {
102+
pub fn on_subnet_gossipsub(&self, subnet_id: &Subnet) -> bool {
97103
self.subnets.contains(&subnet_id)
98104
}
99105

0 commit comments

Comments
 (0)