Skip to content

Commit 5b6fe3d

Browse files
committed
Fix metadata encoding; fmt
1 parent 60167f0 commit 5b6fe3d

File tree

2 files changed

+116
-37
lines changed

2 files changed

+116
-37
lines changed

beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs

Lines changed: 90 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use crate::rpc::methods::*;
21
use crate::rpc::{
32
codec::base::OutboundCodec,
43
protocol::{Encoding, Protocol, ProtocolId, RPCError, Version, ERROR_TYPE_MAX, ERROR_TYPE_MIN},
54
};
65
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse};
6+
use crate::{rpc::methods::*, EnrSyncCommitteeBitfield};
77
use libp2p::bytes::BytesMut;
88
use snap::read::FrameDecoder;
99
use snap::write::FrameEncoder;
@@ -78,7 +78,21 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZSnappyInboundCodec<
7878
attnets: res.attnets().clone(),
7979
})
8080
.as_ssz_bytes(),
81-
Version::V2 => res.as_ssz_bytes(),
81+
Version::V2 => {
82+
// `res` is of type MetaDataV2, return the ssz bytes
83+
if res.syncnets().is_ok() {
84+
res.as_ssz_bytes()
85+
} else {
86+
// `res` is of type MetaDataV1, create a MetaDataV2 by adding a default syncnets field
87+
// Note: This code path is redundant as `res` would be always of type MetaDataV2
88+
MetaData::<TSpec>::V2(MetaDataV2 {
89+
seq_number: *res.seq_number(),
90+
attnets: res.attnets().clone(),
91+
syncnets: EnrSyncCommitteeBitfield::<TSpec>::default(),
92+
})
93+
.as_ssz_bytes()
94+
}
95+
}
8296
}
8397
}
8498
},
@@ -175,6 +189,8 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
175189
let mut reader = FrameDecoder::new(limit_reader);
176190
let mut decoded_buffer = vec![0; length];
177191

192+
dbg!(&self.protocol);
193+
178194
match reader.read_exact(&mut decoded_buffer) {
179195
Ok(()) => {
180196
// `n` is how many bytes the reader read in the compressed stream
@@ -203,6 +219,9 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
203219
Protocol::Ping => Ok(Some(RPCRequest::Ping(Ping {
204220
data: u64::from_ssz_bytes(&decoded_buffer)?,
205221
}))),
222+
223+
// MetaData requests return early from InboundUpgrade and do not reach the decoder.
224+
// Handle this case just for completeness.
206225
Protocol::MetaData => {
207226
if !decoded_buffer.is_empty() {
208227
Err(RPCError::InvalidData)
@@ -212,26 +231,30 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
212231
}
213232
},
214233
// Receiving a Rpc request for protocol version 2 for range and root requests
215-
Version::V2 => {
216-
match self.protocol.message_name {
217-
// Request type doesn't change, only response type
218-
Protocol::BlocksByRange => Ok(Some(RPCRequest::BlocksByRange(
219-
BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?,
220-
))),
221-
Protocol::BlocksByRoot => {
222-
Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest {
223-
block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?,
224-
})))
234+
Version::V2 => match self.protocol.message_name {
235+
// Request type doesn't change, only response type
236+
Protocol::BlocksByRange => Ok(Some(RPCRequest::BlocksByRange(
237+
BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?,
238+
))),
239+
Protocol::BlocksByRoot => {
240+
Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest {
241+
block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?,
242+
})))
243+
}
244+
// MetaData requests return early from InboundUpgrade and do not reach the decoder.
245+
// Handle this case just for completeness.
246+
Protocol::MetaData => {
247+
if !decoded_buffer.is_empty() {
248+
Err(RPCError::InvalidData)
249+
} else {
250+
Ok(Some(RPCRequest::MetaData(PhantomData)))
225251
}
226-
_ => Err(RPCError::ErrorResponse(
227-
RPCResponseErrorCode::InvalidRequest,
228-
format!(
229-
"{} does not support version 2",
230-
self.protocol.message_name
231-
),
232-
)),
233252
}
234-
}
253+
_ => Err(RPCError::ErrorResponse(
254+
RPCResponseErrorCode::InvalidRequest,
255+
format!("{} does not support version 2", self.protocol.message_name),
256+
)),
257+
},
235258
}
236259
}
237260
Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len),
@@ -547,7 +570,7 @@ mod tests {
547570
use crate::rpc::{protocol::*, MetaData};
548571
use crate::{
549572
rpc::{methods::StatusMessage, Ping, RPCResponseErrorCode},
550-
types::EnrAttestationBitfield,
573+
types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield},
551574
};
552575
use std::sync::Arc;
553576
use types::{
@@ -597,6 +620,14 @@ mod tests {
597620
})
598621
}
599622

623+
fn metadata_v2() -> MetaData<Spec> {
624+
MetaData::V2(MetaDataV2 {
625+
seq_number: 1,
626+
attnets: EnrAttestationBitfield::<Spec>::default(),
627+
syncnets: EnrSyncCommitteeBitfield::<Spec>::default(),
628+
})
629+
}
630+
600631
/// Encodes the given protocol response as bytes.
601632
fn encode(
602633
protocol: Protocol,
@@ -714,9 +745,27 @@ mod tests {
714745
Ok(Some(RPCResponse::MetaData(metadata()))),
715746
);
716747

717-
// TODO: add metadataV2 response failure case
748+
assert_eq!(
749+
encode_then_decode(
750+
Protocol::MetaData,
751+
Version::V1,
752+
RPCCodedResponse::Success(RPCResponse::MetaData(metadata())),
753+
),
754+
Ok(Some(RPCResponse::MetaData(metadata()))),
755+
);
756+
757+
// A MetaDataV2 still encodes as a MetaDataV1 since version is Version::V1
758+
assert_eq!(
759+
encode_then_decode(
760+
Protocol::MetaData,
761+
Version::V1,
762+
RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())),
763+
),
764+
Ok(Some(RPCResponse::MetaData(metadata()))),
765+
);
718766
}
719767

768+
// Test RPCResponse encoding/decoding for V1 messages
720769
#[test]
721770
fn test_encode_then_decode_v2() {
722771
assert!(
@@ -780,6 +829,25 @@ mod tests {
780829
),
781830
Ok(Some(RPCResponse::BlocksByRoot(Box::new(altair_block()))))
782831
);
832+
833+
// A MetaDataV1 still encodes as a MetaDataV2 since version is Version::V2
834+
assert_eq!(
835+
encode_then_decode(
836+
Protocol::MetaData,
837+
Version::V2,
838+
RPCCodedResponse::Success(RPCResponse::MetaData(metadata()))
839+
),
840+
Ok(Some(RPCResponse::MetaData(metadata_v2())))
841+
);
842+
843+
assert_eq!(
844+
encode_then_decode(
845+
Protocol::MetaData,
846+
Version::V2,
847+
RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2()))
848+
),
849+
Ok(Some(RPCResponse::MetaData(metadata_v2())))
850+
);
783851
}
784852

785853
#[test]

beacon_node/network/src/service.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,24 @@ use crate::{
66
};
77
use crate::{error, metrics};
88
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
9-
use eth2_libp2p::{Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request, Response, Subnet, rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId}};
10-
use eth2_libp2p::{types::{GossipEncoding, GossipTopic}, BehaviourEvent, MessageId, NetworkGlobals, PeerId};
9+
use eth2_libp2p::{
10+
rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId},
11+
Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request, Response, Subnet,
12+
};
13+
use eth2_libp2p::{
14+
types::{GossipEncoding, GossipTopic},
15+
BehaviourEvent, MessageId, NetworkGlobals, PeerId,
16+
};
1117
use eth2_libp2p::{MessageAcceptance, Service as LibP2PService};
1218
use futures::prelude::*;
13-
use slog::{debug, error, info, o, trace, warn, crit};
19+
use slog::{crit, debug, error, info, o, trace, warn};
1420
use std::{net::SocketAddr, sync::Arc, time::Duration};
1521
use store::HotColdDB;
1622
use tokio::sync::mpsc;
1723
use tokio::time::Sleep;
18-
use types::{EthSpec, ForkContext, RelativeEpoch, SubnetId, Unsigned, ValidatorSubscription, ChainSpec};
24+
use types::{
25+
ChainSpec, EthSpec, ForkContext, RelativeEpoch, SubnetId, Unsigned, ValidatorSubscription,
26+
};
1927

2028
mod tests;
2129

@@ -243,9 +251,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
243251
/// digests since we should be subscribed to post fork topics before the fork.
244252
/// TODO(pawan): use ForkContext to get required gossip fork digests instead of using slots
245253
pub fn required_gossip_fork_digests(&self) -> Vec<[u8; 4]> {
246-
let current_slot = match self
247-
.beacon_chain
248-
.slot() {
254+
let current_slot = match self.beacon_chain.slot() {
249255
Ok(current_slot) => current_slot,
250256
Err(e) => {
251257
crit!(self.log, "Failed to get current slot, returning all fork_digests"; "error" => ?e);
@@ -258,13 +264,18 @@ impl<T: BeaconChainTypes> NetworkService<T> {
258264
if current_slot < altair_fork_slot {
259265
// Return both pre-altair and post-altair fork digests if altair hasn't happened yet
260266
return self.fork_context.all_fork_digests();
261-
}
262-
else {
267+
} else {
263268
// Return only altair if altair_fork_slot is passed
264-
return vec![ChainSpec::compute_fork_digest(spec.altair_fork_version, genesis_validators_root)]
269+
return vec![ChainSpec::compute_fork_digest(
270+
spec.altair_fork_version,
271+
genesis_validators_root,
272+
)];
265273
}
266274
}
267-
vec![ChainSpec::compute_fork_digest(spec.genesis_fork_version, genesis_validators_root)]
275+
vec![ChainSpec::compute_fork_digest(
276+
spec.genesis_fork_version,
277+
genesis_validators_root,
278+
)]
268279
}
269280
}
270281

@@ -423,7 +434,6 @@ fn spawn_service<T: BeaconChainTypes>(
423434
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic_kind);
424435
}
425436
}
426-
427437
}
428438

429439
// if we are to subscribe to all subnets we do it here
@@ -628,15 +638,16 @@ fn next_fork_delay<T: BeaconChainTypes>(
628638

629639
/// Returns a `Sleep` that triggers `UNSUBSCRIBE_DELAY` epochs after change in the beacon chain fork version.
630640
/// If there is no scheduled fork, `None` is returned.
631-
fn topic_unsubscribe_delay<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) -> Option<tokio::time::Sleep> {
641+
fn topic_unsubscribe_delay<T: BeaconChainTypes>(
642+
beacon_chain: &BeaconChain<T>,
643+
) -> Option<tokio::time::Sleep> {
632644
beacon_chain.duration_to_next_fork().map(|until_fork| {
633645
let epoch_duration = beacon_chain.spec.seconds_per_slot * T::EthSpec::slots_per_epoch();
634-
let delay = Duration::from_secs( UNSUBSCRIBE_DELAY * epoch_duration) ;
646+
let delay = Duration::from_secs(UNSUBSCRIBE_DELAY * epoch_duration);
635647
tokio::time::sleep_until(tokio::time::Instant::now() + until_fork + delay)
636648
})
637649
}
638650

639-
640651
impl<T: BeaconChainTypes> Drop for NetworkService<T> {
641652
fn drop(&mut self) {
642653
// network thread is terminating

0 commit comments

Comments
 (0)