diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 6e8be5c2b67..c50439c6d30 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -43,7 +43,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use types::{ChainSpec, EnrForkId, EthSpec, Hash256, SignedBeaconBlock, Slot, SubnetId}; +use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, SignedBeaconBlock, Slot, SubnetId}; mod gossipsub_scoring_parameters; mod handler; @@ -136,11 +136,6 @@ pub struct Behaviour { score_settings: PeerScoreSettings, - spec: ChainSpec, - - /// The genesis root for the eth2 network - genesis_validators_root: Hash256, - /// The interval for updating gossipsub scores update_gossipsub_scores: tokio::time::Interval, } @@ -152,7 +147,7 @@ impl Behaviour { net_conf: &NetworkConfig, network_globals: Arc>, log: &slog::Logger, - genesis_validators_root: Hash256, + fork_context: Arc, chain_spec: &ChainSpec, ) -> error::Result { let behaviour_log = log.new(o!()); @@ -225,7 +220,7 @@ impl Behaviour { .expect("Valid score params and thresholds"); Ok(Behaviour { - eth2_rpc: RPC::new(log.clone()), + eth2_rpc: RPC::new(fork_context, log.clone()), gossipsub, identify, peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log) @@ -238,9 +233,7 @@ impl Behaviour { network_dir: net_conf.network_dir.clone(), log: behaviour_log, score_settings, - spec: chain_spec.clone(), update_gossipsub_scores, - genesis_validators_root, }) } diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/base.rs b/beacon_node/eth2_libp2p/src/rpc/codec/base.rs index 0e97157fd34..edfeea726ae 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/base.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/base.rs @@ -177,16 +177,18 @@ where mod tests { use super::super::ssz_snappy::*; use super::*; - use crate::rpc::methods::StatusMessage; use crate::rpc::protocol::*; - use snap::write::FrameEncoder; - use ssz::Encode; - use std::io::Write; - use types::{Epoch, Hash256, Slot}; + + use std::sync::Arc; + use types::{ForkContext, Hash256}; use unsigned_varint::codec::Uvi; type Spec = types::MainnetEthSpec; + fn fork_context() -> ForkContext { + ForkContext::new(Hash256::zero(), &Spec::default_spec()) + } + #[test] fn test_decode_status_message() { let message = hex::decode("0054ff060000734e615070590032000006e71e7b54989925efd6c9cbcb8ceb9b5f71216f5137282bf6a1e3b50f64e42d6c7fb347abe07eb0db8200000005029e2800").unwrap(); @@ -196,8 +198,9 @@ mod tests { let snappy_protocol_id = ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); + let fork_context = Arc::new(fork_context()); let mut snappy_outbound_codec = - SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576); + SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576, fork_context); // remove response code let mut snappy_buf = buf.clone(); @@ -229,8 +232,10 @@ mod tests { let snappy_protocol_id = ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); + + let fork_context = Arc::new(fork_context()); let mut snappy_outbound_codec = - SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576); + SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576, fork_context); let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst).unwrap_err(); @@ -256,80 +261,34 @@ mod tests { // Response limits let limit = protocol_id.rpc_response_limits::(); let mut max = encode_len(limit.max + 1); - let mut codec = SSZSnappyOutboundCodec::::new(protocol_id.clone(), 1_048_576); + let fork_context = Arc::new(fork_context()); + let mut codec = SSZSnappyOutboundCodec::::new( + protocol_id.clone(), + 1_048_576, + fork_context.clone(), + ); assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData); let mut min = encode_len(limit.min - 1); - let mut codec = SSZSnappyOutboundCodec::::new(protocol_id.clone(), 1_048_576); + let mut codec = SSZSnappyOutboundCodec::::new( + protocol_id.clone(), + 1_048_576, + fork_context.clone(), + ); assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData); // Request limits let limit = protocol_id.rpc_request_limits(); let mut max = encode_len(limit.max + 1); - let mut codec = SSZSnappyOutboundCodec::::new(protocol_id.clone(), 1_048_576); + let mut codec = SSZSnappyOutboundCodec::::new( + protocol_id.clone(), + 1_048_576, + fork_context.clone(), + ); assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData); let mut min = encode_len(limit.min - 1); - let mut codec = SSZSnappyOutboundCodec::::new(protocol_id, 1_048_576); + let mut codec = SSZSnappyOutboundCodec::::new(protocol_id, 1_048_576, fork_context); assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData); } - - #[test] - fn test_decode_malicious_status_message() { - // 10 byte snappy stream identifier - let stream_identifier: &'static [u8] = b"\xFF\x06\x00\x00sNaPpY"; - - assert_eq!(stream_identifier.len(), 10); - - // byte 0(0xFE) is padding chunk type identifier for snappy messages - // byte 1,2,3 are chunk length (little endian) - let malicious_padding: &'static [u8] = b"\xFE\x00\x00\x00"; - - // Status message is 84 bytes uncompressed. `max_compressed_len` is 32 + 84 + 84/6 = 130. - let status_message_bytes = StatusMessage { - fork_digest: [0; 4], - finalized_root: Hash256::from_low_u64_be(0), - finalized_epoch: Epoch::new(1), - head_root: Hash256::from_low_u64_be(0), - head_slot: Slot::new(1), - } - .as_ssz_bytes(); - - assert_eq!(status_message_bytes.len(), 84); - assert_eq!(snap::raw::max_compress_len(status_message_bytes.len()), 130); - - let mut uvi_codec: Uvi = Uvi::default(); - let mut dst = BytesMut::with_capacity(1024); - - // Insert length-prefix - uvi_codec - .encode(status_message_bytes.len(), &mut dst) - .unwrap(); - - // Insert snappy stream identifier - dst.extend_from_slice(stream_identifier); - - // Insert malicious padding of 80 bytes. - for _ in 0..20 { - dst.extend_from_slice(malicious_padding); - } - - // Insert payload (42 bytes compressed) - let mut writer = FrameEncoder::new(Vec::new()); - writer.write_all(&status_message_bytes).unwrap(); - writer.flush().unwrap(); - assert_eq!(writer.get_ref().len(), 42); - dst.extend_from_slice(writer.get_ref()); - - // 10 (for stream identifier) + 80 + 42 = 132 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. - - let snappy_protocol_id = - ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); - - let mut snappy_outbound_codec = - SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576); - - let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst).unwrap_err(); - assert_eq!(snappy_decoded_message, RPCError::InvalidData); - } } diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs index 5e0c39250da..621cd62012b 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs @@ -13,8 +13,12 @@ use std::io::Cursor; use std::io::ErrorKind; use std::io::{Read, Write}; use std::marker::PhantomData; +use std::sync::Arc; use tokio_util::codec::{Decoder, Encoder}; -use types::{EthSpec, SignedBeaconBlock, SignedBeaconBlockBase}; +use types::{ + EthSpec, ForkContext, ForkName, SignedBeaconBlock, SignedBeaconBlockAltair, + SignedBeaconBlockBase, +}; use unsigned_varint::codec::Uvi; /* Inbound Codec */ @@ -25,11 +29,16 @@ pub struct SSZSnappyInboundCodec { len: Option, /// Maximum bytes that can be sent in one req/resp chunked responses. max_packet_size: usize, + fork_context: Arc, phantom: PhantomData, } impl SSZSnappyInboundCodec { - pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self { + pub fn new( + protocol: ProtocolId, + max_packet_size: usize, + fork_context: Arc, + ) -> Self { let uvi_codec = Uvi::default(); // this encoding only applies to ssz_snappy. debug_assert_eq!(protocol.encoding, Encoding::SSZSnappy); @@ -39,6 +48,7 @@ impl SSZSnappyInboundCodec { protocol, len: None, phantom: PhantomData, + fork_context, max_packet_size, } } @@ -53,8 +63,8 @@ impl Encoder> for SSZSnappyInboundCodec< item: RPCCodedResponse, dst: &mut BytesMut, ) -> Result<(), Self::Error> { - let bytes = match item { - RPCCodedResponse::Success(resp) => match resp { + let bytes = match &item { + RPCCodedResponse::Success(resp) => match &resp { RPCResponse::Status(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), @@ -72,6 +82,38 @@ impl Encoder> for SSZSnappyInboundCodec< "attempting to encode data > max_packet_size", )); } + + // Add the context bytes if required + if self.protocol.has_context_bytes() { + if let RPCCodedResponse::Success(RPCResponse::BlocksByRange(ref res)) = item { + if let SignedBeaconBlock::Altair { .. } = **res { + // Altair context being `None` implies that "altair never happened". + // This code should be unreachable if altair is disabled since only Version::V1 would be valid in that case. + if let Some(ref altair_context) = + self.fork_context.to_context_bytes(ForkName::Altair) + { + dst.extend_from_slice(altair_context); + } + } else if let SignedBeaconBlock::Base { .. } = **res { + dst.extend_from_slice(&self.fork_context.genesis_context_bytes()); + } + } + + if let RPCCodedResponse::Success(RPCResponse::BlocksByRoot(res)) = item { + if let SignedBeaconBlock::Altair { .. } = *res { + // Altair context being `None` implies that "altair never happened". + // This code should be unreachable if altair is disabled since only Version::V1 would be valid in that case. + if let Some(ref altair_context) = + self.fork_context.to_context_bytes(ForkName::Altair) + { + dst.extend_from_slice(altair_context); + } + } else if let SignedBeaconBlock::Base { .. } = *res { + dst.extend_from_slice(&self.fork_context.genesis_context_bytes()); + } + } + } + // Inserts the length prefix of the uncompressed bytes into dst // encoded as a unsigned varint self.inner @@ -131,35 +173,26 @@ impl Decoder for SSZSnappyInboundCodec { // We need not check that decoded_buffer.len() is within bounds here // since we have already checked `length` above. - match self.protocol.message_name { - Protocol::Status => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( - &decoded_buffer, - )?))), - }, - Protocol::Goodbye => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::Goodbye( + match self.protocol.version { + Version::V1 => match self.protocol.message_name { + Protocol::Status => Ok(Some(RPCRequest::Status( + StatusMessage::from_ssz_bytes(&decoded_buffer)?, + ))), + Protocol::Goodbye => Ok(Some(RPCRequest::Goodbye( GoodbyeReason::from_ssz_bytes(&decoded_buffer)?, ))), - }, - Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::BlocksByRange( + Protocol::BlocksByRange => Ok(Some(RPCRequest::BlocksByRange( BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?, ))), - }, - Protocol::BlocksByRoot => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { - block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?, - }))), - }, - Protocol::Ping => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::Ping(Ping { + Protocol::BlocksByRoot => { + Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { + block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?, + }))) + } + Protocol::Ping => Ok(Some(RPCRequest::Ping(Ping { data: u64::from_ssz_bytes(&decoded_buffer)?, }))), - }, - // This case should be unreachable as `MetaData` requests are handled separately in the `InboundUpgrade` - Protocol::MetaData => match self.protocol.version { - Version::V1 => { + Protocol::MetaData => { if !decoded_buffer.is_empty() { Err(RPCError::InvalidData) } else { @@ -167,6 +200,27 @@ impl Decoder for SSZSnappyInboundCodec { } } }, + // Receiving a Rpc request for protocol version 2 for range and root requests + Version::V2 => { + match self.protocol.message_name { + // Request type doesn't change, only response type + Protocol::BlocksByRange => Ok(Some(RPCRequest::BlocksByRange( + BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?, + ))), + Protocol::BlocksByRoot => { + Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { + block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?, + }))) + } + _ => Err(RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + format!( + "{} does not support version 2", + self.protocol.message_name + ), + )), + } + } } } Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len), @@ -181,11 +235,18 @@ pub struct SSZSnappyOutboundCodec { protocol: ProtocolId, /// Maximum bytes that can be sent in one req/resp chunked responses. max_packet_size: usize, + /// The fork name corresponding to the received context bytes. + fork_name: Option, + fork_context: Arc, phantom: PhantomData, } impl SSZSnappyOutboundCodec { - pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self { + pub fn new( + protocol: ProtocolId, + max_packet_size: usize, + fork_context: Arc, + ) -> Self { let uvi_codec = Uvi::default(); // this encoding only applies to ssz_snappy. debug_assert_eq!(protocol.encoding, Encoding::SSZSnappy); @@ -195,6 +256,8 @@ impl SSZSnappyOutboundCodec { protocol, max_packet_size, len: None, + fork_name: None, + fork_context, phantom: PhantomData, } } @@ -246,6 +309,20 @@ impl Decoder for SSZSnappyOutboundCodec { type Error = RPCError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + // Read the context bytes if required + if self.protocol.has_context_bytes() && self.fork_name.is_none() { + if src.len() >= 4 { + let context_bytes = src.split_to(4); + let mut result = [0; 4]; + result.copy_from_slice(&context_bytes.as_ref()); + self.fork_name = Some(context_bytes_to_fork_name( + result, + self.fork_context.clone(), + )?); + } else { + return Ok(None); + } + } let length = if let Some(length) = self.len { length } else { @@ -283,39 +360,78 @@ impl Decoder for SSZSnappyOutboundCodec { // We need not check that decoded_buffer.len() is within bounds here // since we have already checked `length` above. - match self.protocol.message_name { - Protocol::Status => match self.protocol.version { - Version::V1 => Ok(Some(RPCResponse::Status( + match self.protocol.version { + Version::V1 => match self.protocol.message_name { + Protocol::Status => Ok(Some(RPCResponse::Status( StatusMessage::from_ssz_bytes(&decoded_buffer)?, ))), - }, - // This case should be unreachable as `Goodbye` has no response. - Protocol::Goodbye => Err(RPCError::InvalidData), - Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => Ok(Some(RPCResponse::BlocksByRange(Box::new( - // FIXME(altair): support Altair blocks + // This case should be unreachable as `Goodbye` has no response. + Protocol::Goodbye => Err(RPCError::InvalidData), + Protocol::BlocksByRange => Ok(Some(RPCResponse::BlocksByRange(Box::new( SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes( &decoded_buffer, )?), )))), - }, - Protocol::BlocksByRoot => match self.protocol.version { - // FIXME(altair): support Altair blocks - Version::V1 => Ok(Some(RPCResponse::BlocksByRoot(Box::new( + Protocol::BlocksByRoot => Ok(Some(RPCResponse::BlocksByRoot(Box::new( SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes( &decoded_buffer, )?), )))), - }, - Protocol::Ping => match self.protocol.version { - Version::V1 => Ok(Some(RPCResponse::Pong(Ping { + Protocol::Ping => Ok(Some(RPCResponse::Pong(Ping { data: u64::from_ssz_bytes(&decoded_buffer)?, }))), + Protocol::MetaData => Ok(Some(RPCResponse::MetaData( + MetaData::from_ssz_bytes(&decoded_buffer)?, + ))), }, - Protocol::MetaData => match self.protocol.version { - Version::V1 => Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes( - &decoded_buffer, - )?))), + Version::V2 => match self.protocol.message_name { + Protocol::BlocksByRange => { + match self.fork_name.take().ok_or_else(|| { + RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + format!( + "No context bytes provided for {} response", + self.protocol.message_name + ), + ) + })? { + ForkName::Altair => Ok(Some(RPCResponse::BlocksByRange(Box::new( + SignedBeaconBlock::Altair( + SignedBeaconBlockAltair::from_ssz_bytes(&decoded_buffer)?, + ), + )))), + + ForkName::Base => Ok(Some(RPCResponse::BlocksByRange(Box::new( + SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes( + &decoded_buffer, + )?), + )))), + } + } + Protocol::BlocksByRoot => match self.fork_name.take().ok_or_else(|| { + RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + format!( + "No context bytes provided for {} response", + self.protocol.message_name + ), + ) + })? { + ForkName::Altair => Ok(Some(RPCResponse::BlocksByRoot(Box::new( + SignedBeaconBlock::Altair(SignedBeaconBlockAltair::from_ssz_bytes( + &decoded_buffer, + )?), + )))), + ForkName::Base => Ok(Some(RPCResponse::BlocksByRoot(Box::new( + SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes( + &decoded_buffer, + )?), + )))), + }, + _ => Err(RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + "Invalid v2 request".to_string(), + )), }, } } @@ -393,3 +509,485 @@ fn handle_error( _ => Err(err).map_err(RPCError::from), } } + +/// Takes the context bytes and a fork_context and returns the corresponding fork_name. +fn context_bytes_to_fork_name( + context_bytes: [u8; 4], + fork_context: Arc, +) -> Result { + fork_context + .from_context_bytes(context_bytes) + .cloned() + .ok_or_else(|| { + RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + "Context bytes does not correspond to a valid fork".to_string(), + ) + }) +} +#[cfg(test)] +mod tests { + + use super::*; + use crate::rpc::{protocol::*, MetaData}; + use crate::{ + rpc::{methods::StatusMessage, Ping, RPCResponseErrorCode}, + types::EnrBitfield, + }; + use std::sync::Arc; + use types::{ + BeaconBlock, BeaconBlockAltair, BeaconBlockBase, Epoch, ForkContext, Hash256, Signature, + SignedBeaconBlock, Slot, + }; + + use snap::write::FrameEncoder; + use ssz::Encode; + use std::io::Write; + + type Spec = types::MainnetEthSpec; + + fn fork_context() -> ForkContext { + ForkContext::new(Hash256::zero(), &Spec::default_spec()) + } + + fn base_block() -> SignedBeaconBlock { + let full_block = BeaconBlock::Base(BeaconBlockBase::::full(&Spec::default_spec())); + SignedBeaconBlock::from_block(full_block, Signature::empty()) + } + + fn altair_block() -> SignedBeaconBlock { + let full_block = + BeaconBlock::Altair(BeaconBlockAltair::::full(&Spec::default_spec())); + SignedBeaconBlock::from_block(full_block, Signature::empty()) + } + + fn status_message() -> StatusMessage { + StatusMessage { + fork_digest: [0; 4], + finalized_root: Hash256::from_low_u64_be(0), + finalized_epoch: Epoch::new(1), + head_root: Hash256::from_low_u64_be(0), + head_slot: Slot::new(1), + } + } + + fn ping_message() -> Ping { + Ping { data: 1 } + } + + fn metadata() -> MetaData { + MetaData { + seq_number: 1, + attnets: EnrBitfield::::default(), + } + } + + /// Encodes the given protocol response as bytes. + fn encode( + protocol: Protocol, + version: Version, + message: RPCCodedResponse, + ) -> Result { + let max_packet_size = 1_048_576; + let snappy_protocol_id = ProtocolId::new(protocol, version, Encoding::SSZSnappy); + let fork_context = Arc::new(fork_context()); + + let mut buf = BytesMut::new(); + let mut snappy_inbound_codec = + SSZSnappyInboundCodec::::new(snappy_protocol_id, max_packet_size, fork_context); + + snappy_inbound_codec.encode(message, &mut buf)?; + Ok(buf) + } + + /// Attempts to decode the given protocol bytes as an rpc response + fn decode( + protocol: Protocol, + version: Version, + message: &mut BytesMut, + ) -> Result>, RPCError> { + let max_packet_size = 1_048_576; + let snappy_protocol_id = ProtocolId::new(protocol, version, Encoding::SSZSnappy); + let fork_context = Arc::new(fork_context()); + let mut snappy_outbound_codec = + SSZSnappyOutboundCodec::::new(snappy_protocol_id, max_packet_size, fork_context); + // decode message just as snappy message + snappy_outbound_codec.decode(message) + } + + /// Encodes the provided protocol message as bytes and tries to decode the encoding bytes. + fn encode_then_decode( + protocol: Protocol, + version: Version, + message: RPCCodedResponse, + ) -> Result>, RPCError> { + let mut encoded = encode(protocol, version.clone(), message)?; + decode(protocol, version, &mut encoded) + } + + // Test RPCResponse encoding/decoding for V1 messages + #[test] + fn test_encode_then_decode_v1() { + assert_eq!( + encode_then_decode( + Protocol::Status, + Version::V1, + RPCCodedResponse::Success(RPCResponse::Status(status_message())) + ), + Ok(Some(RPCResponse::Status(status_message()))) + ); + + assert_eq!( + encode_then_decode( + Protocol::Ping, + Version::V1, + RPCCodedResponse::Success(RPCResponse::Pong(ping_message())) + ), + Ok(Some(RPCResponse::Pong(ping_message()))) + ); + + assert_eq!( + encode_then_decode( + Protocol::BlocksByRange, + Version::V1, + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))) + ), + Ok(Some(RPCResponse::BlocksByRange(Box::new(base_block())))) + ); + + assert!( + matches!( + encode_then_decode( + Protocol::BlocksByRange, + Version::V1, + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block()))), + ) + .unwrap_err(), + RPCError::SSZDecodeError(_) + ), + "altair block cannot be decoded with blocks by range V1 version" + ); + + assert_eq!( + encode_then_decode( + Protocol::BlocksByRoot, + Version::V1, + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))) + ), + Ok(Some(RPCResponse::BlocksByRoot(Box::new(base_block())))) + ); + + assert!( + matches!( + encode_then_decode( + Protocol::BlocksByRoot, + Version::V1, + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))), + ) + .unwrap_err(), + RPCError::SSZDecodeError(_) + ), + "altair block cannot be decoded with blocks by range V1 version" + ); + + assert_eq!( + encode_then_decode( + Protocol::MetaData, + Version::V1, + RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), + ), + Ok(Some(RPCResponse::MetaData(metadata()))), + ); + + // TODO: add metadataV2 response failure case + } + + #[test] + fn test_encode_then_decode_v2() { + assert!( + matches!( + encode_then_decode( + Protocol::Status, + Version::V2, + RPCCodedResponse::Success(RPCResponse::Status(status_message())), + ) + .unwrap_err(), + RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), + ), + "status does not have V2 message" + ); + + assert!( + matches!( + encode_then_decode( + Protocol::Ping, + Version::V2, + RPCCodedResponse::Success(RPCResponse::Pong(ping_message())), + ) + .unwrap_err(), + RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), + ), + "ping does not have V2 message" + ); + + assert_eq!( + encode_then_decode( + Protocol::BlocksByRange, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))) + ), + Ok(Some(RPCResponse::BlocksByRange(Box::new(base_block())))) + ); + + assert_eq!( + encode_then_decode( + Protocol::BlocksByRange, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block()))) + ), + Ok(Some(RPCResponse::BlocksByRange(Box::new(altair_block())))) + ); + + assert_eq!( + encode_then_decode( + Protocol::BlocksByRoot, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))) + ), + Ok(Some(RPCResponse::BlocksByRoot(Box::new(base_block())))) + ); + + assert_eq!( + encode_then_decode( + Protocol::BlocksByRoot, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))) + ), + Ok(Some(RPCResponse::BlocksByRoot(Box::new(altair_block())))) + ); + } + + #[test] + fn test_context_bytes_v2() { + let fork_context = fork_context(); + + // Removing context bytes for v2 messages should error + let mut encoded_bytes = encode( + Protocol::BlocksByRange, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))), + ) + .unwrap(); + + let _ = encoded_bytes.split_to(4); + + assert!(matches!( + decode(Protocol::BlocksByRange, Version::V2, &mut encoded_bytes).unwrap_err(), + RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), + )); + + let mut encoded_bytes = encode( + Protocol::BlocksByRoot, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ) + .unwrap(); + + let _ = encoded_bytes.split_to(4); + + assert!(matches!( + decode(Protocol::BlocksByRange, Version::V2, &mut encoded_bytes).unwrap_err(), + RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), + )); + + // Trying to decode a base block with altair context bytes should give ssz decoding error + let mut encoded_bytes = encode( + Protocol::BlocksByRange, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))), + ) + .unwrap(); + + let mut wrong_fork_bytes = BytesMut::new(); + wrong_fork_bytes + .extend_from_slice(&fork_context.to_context_bytes(ForkName::Altair).unwrap()); + wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4)); + + assert!(matches!( + decode(Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes).unwrap_err(), + RPCError::SSZDecodeError(_), + )); + + // Trying to decode an altair block with base context bytes should give ssz decoding error + let mut encoded_bytes = encode( + Protocol::BlocksByRoot, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))), + ) + .unwrap(); + + let mut wrong_fork_bytes = BytesMut::new(); + wrong_fork_bytes.extend_from_slice(&fork_context.to_context_bytes(ForkName::Base).unwrap()); + wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4)); + + assert!(matches!( + decode(Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes).unwrap_err(), + RPCError::SSZDecodeError(_), + )); + + // Adding context bytes to Protocols that don't require it should return an error + let mut encoded_bytes = BytesMut::new(); + encoded_bytes.extend_from_slice(&fork_context.to_context_bytes(ForkName::Altair).unwrap()); + encoded_bytes.extend_from_slice( + &encode( + Protocol::MetaData, + Version::V2, + RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), + ) + .unwrap(), + ); + + assert!(decode(Protocol::MetaData, Version::V2, &mut encoded_bytes).is_err()); + + // Sending context bytes which do not correspond to any fork should return an error + let mut encoded_bytes = encode( + Protocol::BlocksByRoot, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ) + .unwrap(); + + let mut wrong_fork_bytes = BytesMut::new(); + wrong_fork_bytes.extend_from_slice(&[42, 42, 42, 42]); + wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4)); + + assert!(matches!( + decode(Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes).unwrap_err(), + RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), + )); + + // Sending bytes less than context bytes length should wait for more bytes by returning `Ok(None)` + let mut encoded_bytes = encode( + Protocol::BlocksByRoot, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ) + .unwrap(); + + let mut part = encoded_bytes.split_to(3); + + assert_eq!( + decode(Protocol::BlocksByRange, Version::V2, &mut part), + Ok(None) + ) + } + + #[test] + fn test_decode_malicious_status_message() { + // 10 byte snappy stream identifier + let stream_identifier: &'static [u8] = b"\xFF\x06\x00\x00sNaPpY"; + + assert_eq!(stream_identifier.len(), 10); + + // byte 0(0xFE) is padding chunk type identifier for snappy messages + // byte 1,2,3 are chunk length (little endian) + let malicious_padding: &'static [u8] = b"\xFE\x00\x00\x00"; + + // Status message is 84 bytes uncompressed. `max_compressed_len` is 32 + 84 + 84/6 = 130. + let status_message_bytes = StatusMessage { + fork_digest: [0; 4], + finalized_root: Hash256::from_low_u64_be(0), + finalized_epoch: Epoch::new(1), + head_root: Hash256::from_low_u64_be(0), + head_slot: Slot::new(1), + } + .as_ssz_bytes(); + + assert_eq!(status_message_bytes.len(), 84); + assert_eq!(snap::raw::max_compress_len(status_message_bytes.len()), 130); + + let mut uvi_codec: Uvi = Uvi::default(); + let mut dst = BytesMut::with_capacity(1024); + + // Insert length-prefix + uvi_codec + .encode(status_message_bytes.len(), &mut dst) + .unwrap(); + + // Insert snappy stream identifier + dst.extend_from_slice(stream_identifier); + + // Insert malicious padding of 80 bytes. + for _ in 0..20 { + dst.extend_from_slice(malicious_padding); + } + + // Insert payload (42 bytes compressed) + let mut writer = FrameEncoder::new(Vec::new()); + writer.write_all(&status_message_bytes).unwrap(); + writer.flush().unwrap(); + assert_eq!(writer.get_ref().len(), 42); + dst.extend_from_slice(writer.get_ref()); + + // 10 (for stream identifier) + 80 + 42 = 132 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. + assert_eq!( + decode(Protocol::Status, Version::V1, &mut dst).unwrap_err(), + RPCError::InvalidData + ); + } + + #[test] + fn test_decode_malicious_v2_message() { + let fork_context = Arc::new(fork_context()); + + // 10 byte snappy stream identifier + let stream_identifier: &'static [u8] = b"\xFF\x06\x00\x00sNaPpY"; + + assert_eq!(stream_identifier.len(), 10); + + // byte 0(0xFE) is padding chunk type identifier for snappy messages + // byte 1,2,3 are chunk length (little endian) + let malicious_padding: &'static [u8] = b"\xFE\x00\x00\x00"; + + // Full altair block is 157980 bytes uncompressed. `max_compressed_len` is 32 + 157980 + 157980/6 = 184342. + let block_message_bytes = altair_block().as_ssz_bytes(); + + assert_eq!(block_message_bytes.len(), 157980); + assert_eq!( + snap::raw::max_compress_len(block_message_bytes.len()), + 184342 + ); + + let mut uvi_codec: Uvi = Uvi::default(); + let mut dst = BytesMut::with_capacity(1024); + + // Insert context bytes + dst.extend_from_slice(&fork_context.to_context_bytes(ForkName::Altair).unwrap()); + + // Insert length-prefix + uvi_codec + .encode(block_message_bytes.len(), &mut dst) + .unwrap(); + + // Insert snappy stream identifier + dst.extend_from_slice(stream_identifier); + + // Insert malicious padding of 176240 bytes. + for _ in 0..44060 { + dst.extend_from_slice(malicious_padding); + } + + // Insert payload (8106 bytes compressed) + let mut writer = FrameEncoder::new(Vec::new()); + writer.write_all(&block_message_bytes).unwrap(); + writer.flush().unwrap(); + assert_eq!(writer.get_ref().len(), 8106); + dst.extend_from_slice(writer.get_ref()); + + // 10 (for stream identifier) + 176240 + 8106 = 184356 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. + assert_eq!( + decode(Protocol::BlocksByRange, Version::V2, &mut dst).unwrap_err(), + RPCError::InvalidData + ); + } +} diff --git a/beacon_node/eth2_libp2p/src/rpc/handler.rs b/beacon_node/eth2_libp2p/src/rpc/handler.rs index 6f7aff78b70..d3ab98d1108 100644 --- a/beacon_node/eth2_libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2_libp2p/src/rpc/handler.rs @@ -1,8 +1,11 @@ #![allow(clippy::type_complexity)] #![allow(clippy::cognitive_complexity)] -use super::methods::{RPCCodedResponse, RPCResponseErrorCode, RequestId, ResponseTermination}; -use super::protocol::{Protocol, RPCError, RPCProtocol, RPCRequest}; +use super::protocol::{Protocol, RPCError, RPCProtocol, RpcRequestContainer}; +use super::{ + methods::{RPCCodedResponse, RPCResponseErrorCode, RequestId, ResponseTermination}, + RPCRequest, +}; use super::{RPCReceived, RPCSend}; use crate::rpc::protocol::{InboundFramed, OutboundFramed}; use fnv::FnvHashMap; @@ -20,12 +23,13 @@ use smallvec::SmallVec; use std::{ collections::hash_map::Entry, pin::Pin, + sync::Arc, task::{Context, Poll}, time::Duration, }; use tokio::time::{sleep_until, Instant as TInstant, Sleep}; use tokio_util::time::{delay_queue, DelayQueue}; -use types::EthSpec; +use types::{EthSpec, ForkContext}; /// The time (in seconds) before a substream that is awaiting a response from the user times out. pub const RESPONSE_TIMEOUT: u64 = 10; @@ -123,6 +127,9 @@ where /// This keeps track of the number of attempts. outbound_io_error_retries: u8, + /// Fork specific info. + fork_context: Arc, + /// Logger for handling RPC streams log: slog::Logger, } @@ -200,6 +207,7 @@ where { pub fn new( listen_protocol: SubstreamProtocol, ()>, + fork_context: Arc, log: &slog::Logger, ) -> Self { RPCHandler { @@ -216,6 +224,7 @@ where state: HandlerState::Active, max_dial_negotiated: 8, outbound_io_error_retries: 0, + fork_context, log: log.clone(), } } @@ -303,7 +312,7 @@ where type OutEvent = HandlerEvent; type Error = RPCError; type InboundProtocol = RPCProtocol; - type OutboundProtocol = RPCRequest; + type OutboundProtocol = RpcRequestContainer; type OutboundOpenInfo = (RequestId, RPCRequest); // Keep track of the id and the request type InboundOpenInfo = (); @@ -861,7 +870,14 @@ where let (id, req) = self.dial_queue.remove(0); self.dial_queue.shrink_to_fit(); return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(req.clone(), ()).map_info(|()| (id, req)), + protocol: SubstreamProtocol::new( + RpcRequestContainer { + req: req.clone(), + fork_context: self.fork_context.clone(), + }, + (), + ) + .map_info(|()| (id, req)), }); } Poll::Pending diff --git a/beacon_node/eth2_libp2p/src/rpc/mod.rs b/beacon_node/eth2_libp2p/src/rpc/mod.rs index a1f4fac0336..8b70a970f77 100644 --- a/beacon_node/eth2_libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2_libp2p/src/rpc/mod.rs @@ -15,9 +15,10 @@ use libp2p::{Multiaddr, PeerId}; use rate_limiter::{RPCRateLimiter as RateLimiter, RPCRateLimiterBuilder, RateLimitedErr}; use slog::{crit, debug, o}; use std::marker::PhantomData; +use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; -use types::EthSpec; +use types::{EthSpec, ForkContext}; pub(crate) use handler::HandlerErr; pub(crate) use methods::{MetaData, Ping, RPCCodedResponse, RPCResponse}; @@ -96,12 +97,13 @@ pub struct RPC { limiter: RateLimiter, /// Queue of events to be processed. events: Vec, RPCMessage>>, + fork_context: Arc, /// Slog logger for RPC behaviour. log: slog::Logger, } impl RPC { - pub fn new(log: slog::Logger) -> Self { + pub fn new(fork_context: Arc, log: slog::Logger) -> Self { let log = log.new(o!("service" => "libp2p_rpc")); let limiter = RPCRateLimiterBuilder::new() .n_every(Protocol::MetaData, 2, Duration::from_secs(5)) @@ -123,6 +125,7 @@ impl RPC { RPC { limiter, events: Vec::new(), + fork_context, log, } } @@ -171,10 +174,12 @@ where RPCHandler::new( SubstreamProtocol::new( RPCProtocol { + fork_context: self.fork_context.clone(), phantom: PhantomData, }, (), ), + self.fork_context.clone(), &self.log, ) } diff --git a/beacon_node/eth2_libp2p/src/rpc/protocol.rs b/beacon_node/eth2_libp2p/src/rpc/protocol.rs index 7dc41247ad2..21e7c751e88 100644 --- a/beacon_node/eth2_libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2_libp2p/src/rpc/protocol.rs @@ -16,6 +16,7 @@ use ssz::Encode; use ssz_types::VariableList; use std::io; use std::marker::PhantomData; +use std::sync::Arc; use std::time::Duration; use strum::{AsStaticRef, AsStaticStr}; use tokio_io_timeout::TimeoutStream; @@ -23,19 +24,35 @@ use tokio_util::{ codec::Framed, compat::{Compat, FuturesAsyncReadCompatExt}, }; -use types::{BeaconBlock, EthSpec, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock}; +use types::{ + BeaconBlock, BeaconBlockAltair, BeaconBlockBase, EthSpec, ForkContext, Hash256, MainnetEthSpec, + Signature, SignedBeaconBlock, +}; lazy_static! { // Note: Hardcoding the `EthSpec` type for `SignedBeaconBlock` as min/max values is // same across different `EthSpec` implementations. - pub static ref SIGNED_BEACON_BLOCK_MIN: usize = SignedBeaconBlock::::from_block( - BeaconBlock::::empty(&MainnetEthSpec::default_spec()), + pub static ref SIGNED_BEACON_BLOCK_BASE_MIN: usize = SignedBeaconBlock::::from_block( + BeaconBlock::Base(BeaconBlockBase::::empty(&MainnetEthSpec::default_spec())), + Signature::empty(), + ) + .as_ssz_bytes() + .len(); + pub static ref SIGNED_BEACON_BLOCK_BASE_MAX: usize = SignedBeaconBlock::::from_block( + BeaconBlock::Base(BeaconBlockBase::full(&MainnetEthSpec::default_spec())), Signature::empty(), ) .as_ssz_bytes() .len(); - pub static ref SIGNED_BEACON_BLOCK_MAX: usize = SignedBeaconBlock::::from_block( - BeaconBlock::full(&MainnetEthSpec::default_spec()), + + pub static ref SIGNED_BEACON_BLOCK_ALTAIR_MIN: usize = SignedBeaconBlock::::from_block( + BeaconBlock::Altair(BeaconBlockAltair::::empty(&MainnetEthSpec::default_spec())), + Signature::empty(), + ) + .as_ssz_bytes() + .len(); + pub static ref SIGNED_BEACON_BLOCK_ALTAIR_MAX: usize = SignedBeaconBlock::::from_block( + BeaconBlock::Altair(BeaconBlockAltair::full(&MainnetEthSpec::default_spec())), Signature::empty(), ) .as_ssz_bytes() @@ -99,6 +116,8 @@ pub enum Protocol { pub enum Version { /// Version 1 of RPC V1, + /// Version 2 of RPC + V2, } /// RPC Encondings supported. @@ -134,6 +153,7 @@ impl std::fmt::Display for Version { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let repr = match self { Version::V1 => "1", + Version::V2 => "2", }; f.write_str(repr) } @@ -141,6 +161,7 @@ impl std::fmt::Display for Version { #[derive(Debug, Clone)] pub struct RPCProtocol { + pub fork_context: Arc, pub phantom: PhantomData, } @@ -153,7 +174,10 @@ impl UpgradeInfo for RPCProtocol { vec![ ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy), + // V2 variants have higher preference then V1 + ProtocolId::new(Protocol::BlocksByRange, Version::V2, Encoding::SSZSnappy), ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy), + ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy), ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZSnappy), @@ -230,12 +254,27 @@ impl ProtocolId { ::ssz_fixed_len(), ), Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response - Protocol::BlocksByRange => { - RpcLimits::new(*SIGNED_BEACON_BLOCK_MIN, *SIGNED_BEACON_BLOCK_MAX) - } - Protocol::BlocksByRoot => { - RpcLimits::new(*SIGNED_BEACON_BLOCK_MIN, *SIGNED_BEACON_BLOCK_MAX) - } + Protocol::BlocksByRange => RpcLimits::new( + std::cmp::min( + *SIGNED_BEACON_BLOCK_ALTAIR_MIN, + *SIGNED_BEACON_BLOCK_BASE_MIN, + ), + std::cmp::max( + *SIGNED_BEACON_BLOCK_ALTAIR_MAX, + *SIGNED_BEACON_BLOCK_BASE_MAX, + ), + ), + Protocol::BlocksByRoot => RpcLimits::new( + std::cmp::min( + *SIGNED_BEACON_BLOCK_ALTAIR_MIN, + *SIGNED_BEACON_BLOCK_BASE_MIN, + ), + std::cmp::max( + *SIGNED_BEACON_BLOCK_ALTAIR_MAX, + *SIGNED_BEACON_BLOCK_BASE_MAX, + ), + ), + Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -246,6 +285,18 @@ impl ProtocolId { ), } } + + /// Returns `true` if the given `ProtocolId` should expect `context_bytes` in the + /// beginning of the stream, else returns `false`. + pub fn has_context_bytes(&self) -> bool { + if self.version == Version::V2 { + match self.message_name { + Protocol::BlocksByRange | Protocol::BlocksByRoot => return true, + _ => return false, + } + } + false + } } /// An RPC protocol ID. @@ -296,8 +347,11 @@ where let socket = socket.compat(); let codec = match protocol.encoding { Encoding::SSZSnappy => { - let ssz_snappy_codec = - BaseInboundCodec::new(SSZSnappyInboundCodec::new(protocol, MAX_RPC_SIZE)); + let ssz_snappy_codec = BaseInboundCodec::new(SSZSnappyInboundCodec::new( + protocol, + MAX_RPC_SIZE, + self.fork_context.clone(), + )); InboundCodec::SSZSnappy(ssz_snappy_codec) } }; @@ -333,6 +387,12 @@ where // Combines all the RPC requests into a single enum to implement `UpgradeInfo` and // `OutboundUpgrade` +#[derive(Debug, Clone)] +pub struct RpcRequestContainer { + pub req: RPCRequest, + pub fork_context: Arc, +} + #[derive(Debug, Clone, PartialEq)] pub enum RPCRequest { Status(StatusMessage), @@ -343,13 +403,13 @@ pub enum RPCRequest { MetaData(PhantomData), } -impl UpgradeInfo for RPCRequest { +impl UpgradeInfo for RpcRequestContainer { type Info = ProtocolId; type InfoIter = Vec; // add further protocols as we support more encodings/versions fn protocol_info(&self) -> Self::InfoIter { - self.supported_protocols() + self.req.supported_protocols() } } @@ -368,16 +428,16 @@ impl RPCRequest { Version::V1, Encoding::SSZSnappy, )], - RPCRequest::BlocksByRange(_) => vec![ProtocolId::new( - Protocol::BlocksByRange, - Version::V1, - Encoding::SSZSnappy, - )], - RPCRequest::BlocksByRoot(_) => vec![ProtocolId::new( - Protocol::BlocksByRoot, - Version::V1, - Encoding::SSZSnappy, - )], + RPCRequest::BlocksByRange(_) => vec![ + // V2 has higher preference when negotiating a stream + ProtocolId::new(Protocol::BlocksByRange, Version::V2, Encoding::SSZSnappy), + ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy), + ], + RPCRequest::BlocksByRoot(_) => vec![ + // V2 has higher preference when negotiating a stream + ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy), + ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy), + ], RPCRequest::Ping(_) => vec![ProtocolId::new( Protocol::Ping, Version::V1, @@ -439,7 +499,7 @@ impl RPCRequest { pub type OutboundFramed = Framed, OutboundCodec>; -impl OutboundUpgrade for RPCRequest +impl OutboundUpgrade for RpcRequestContainer where TSpec: EthSpec + Send + 'static, TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static, @@ -453,8 +513,11 @@ where let socket = socket.compat(); let codec = match protocol.encoding { Encoding::SSZSnappy => { - let ssz_snappy_codec = - BaseOutboundCodec::new(SSZSnappyOutboundCodec::new(protocol, MAX_RPC_SIZE)); + let ssz_snappy_codec = BaseOutboundCodec::new(SSZSnappyOutboundCodec::new( + protocol, + MAX_RPC_SIZE, + self.fork_context.clone(), + )); OutboundCodec::SSZSnappy(ssz_snappy_codec) } }; @@ -462,7 +525,7 @@ where let mut socket = Framed::new(socket, codec); async { - socket.send(self).await?; + socket.send(self.req).await?; socket.close().await?; Ok(socket) } diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index 1c58878ab9f..3f3e1ab6ab1 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -25,7 +25,7 @@ use std::io::prelude::*; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use types::{ChainSpec, EnrForkId, EthSpec, Hash256}; +use types::{ChainSpec, EnrForkId, EthSpec, ForkContext}; pub const NETWORK_KEY_FILENAME: &str = "key"; /// The maximum simultaneous libp2p connections per peer. @@ -64,7 +64,7 @@ impl Service { config: &NetworkConfig, enr_fork_id: EnrForkId, log: &Logger, - genesis_validators_root: Hash256, + fork_context: Arc, chain_spec: &ChainSpec, ) -> error::Result<(Arc>, Self)> { let log = log.new(o!("service"=> "libp2p")); @@ -114,7 +114,7 @@ impl Service { config, network_globals.clone(), &log, - genesis_validators_root, + fork_context, chain_spec, ) .await?; diff --git a/beacon_node/eth2_libp2p/tests/common/mod.rs b/beacon_node/eth2_libp2p/tests/common/mod.rs index e0aba71f655..5a8246526a8 100644 --- a/beacon_node/eth2_libp2p/tests/common/mod.rs +++ b/beacon_node/eth2_libp2p/tests/common/mod.rs @@ -7,14 +7,20 @@ use eth2_libp2p::{Libp2pEvent, NetworkConfig}; use libp2p::gossipsub::GossipsubConfigBuilder; use slog::{debug, error, o, Drain}; use std::net::{TcpListener, UdpSocket}; +use std::sync::Arc; use std::sync::Weak; use std::time::Duration; use tokio::runtime::Runtime; -use types::{ChainSpec, EnrForkId, MinimalEthSpec}; +use types::{ChainSpec, EnrForkId, ForkContext, Hash256, MinimalEthSpec}; type E = MinimalEthSpec; use tempfile::Builder as TempBuilder; +/// Returns a dummy fork context +fn fork_context() -> ForkContext { + ForkContext::new(Hash256::zero(), &ChainSpec::minimal()) +} + pub struct Libp2pInstance(LibP2PService, exit_future::Signal); impl std::ops::Deref for Libp2pInstance { @@ -109,12 +115,14 @@ pub async fn build_libp2p_instance( let (signal, exit) = exit_future::signal(); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let executor = task_executor::TaskExecutor::new(rt, exit, log.clone(), shutdown_tx); + let fork_context = Arc::new(fork_context()); Libp2pInstance( LibP2PService::new( executor, &config, EnrForkId::default(), &log, + fork_context, &ChainSpec::minimal(), ) .await diff --git a/beacon_node/eth2_libp2p/tests/rpc_tests.rs b/beacon_node/eth2_libp2p/tests/rpc_tests.rs index 1b565a4655e..0a1f7aed04f 100644 --- a/beacon_node/eth2_libp2p/tests/rpc_tests.rs +++ b/beacon_node/eth2_libp2p/tests/rpc_tests.rs @@ -8,7 +8,8 @@ use std::time::Duration; use tokio::runtime::Runtime; use tokio::time::sleep; use types::{ - BeaconBlock, Epoch, EthSpec, Hash256, MinimalEthSpec, Signature, SignedBeaconBlock, Slot, + BeaconBlock, BeaconBlockAltair, BeaconBlockBase, Epoch, EthSpec, Hash256, MinimalEthSpec, + Signature, SignedBeaconBlock, Slot, }; mod common; @@ -500,9 +501,13 @@ fn test_blocks_by_root_chunked_rpc() { }); // BlocksByRoot Response - let full_block = BeaconBlock::full(&spec); + let full_block = BeaconBlock::Base(BeaconBlockBase::::full(&spec)); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); - let rpc_response = Response::BlocksByRoot(Some(Box::new(signed_full_block))); + let rpc_response_base = Response::BlocksByRoot(Some(Box::new(signed_full_block))); + + let full_block = BeaconBlock::Altair(BeaconBlockAltair::::full(&spec)); + let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); + let rpc_response_altair = Response::BlocksByRoot(Some(Box::new(signed_full_block))); // keep count of the number of messages received let mut messages_received = 0; @@ -525,7 +530,11 @@ fn test_blocks_by_root_chunked_rpc() { response, }) => match response { Response::BlocksByRoot(Some(_)) => { - assert_eq!(response, rpc_response.clone()); + if messages_received < 5 { + assert_eq!(response, rpc_response_base.clone()); + } else { + assert_eq!(response, rpc_response_altair.clone()); + } messages_received += 1; debug!(log, "Chunk received"); } @@ -555,12 +564,17 @@ fn test_blocks_by_root_chunked_rpc() { // send the response debug!(log, "Receiver got request"); - for _ in 1..=messages_to_send { - receiver.swarm.send_successful_response( - peer_id, - id, - rpc_response.clone(), - ); + for i in 0..messages_to_send { + // Send first half of responses as base blocks and + // second half as altair blocks. + let rpc_response = if i < 5 { + rpc_response_base.clone() + } else { + rpc_response_altair.clone() + }; + receiver + .swarm + .send_successful_response(peer_id, id, rpc_response); debug!(log, "Sending message"); } // send the stream termination @@ -621,7 +635,7 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() { }); // BlocksByRoot Response - let full_block = BeaconBlock::full(&spec); + let full_block = BeaconBlock::Base(BeaconBlockBase::::full(&spec)); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response = Response::BlocksByRoot(Some(Box::new(signed_full_block))); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 0a33b3856da..9ae6dde568e 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -18,7 +18,7 @@ use std::{net::SocketAddr, sync::Arc, time::Duration}; use store::HotColdDB; use tokio::sync::mpsc; use tokio::time::Sleep; -use types::{EthSpec, RelativeEpoch, SubnetId, Unsigned, ValidatorSubscription}; +use types::{EthSpec, ForkContext, RelativeEpoch, SubnetId, Unsigned, ValidatorSubscription}; mod tests; @@ -159,13 +159,19 @@ impl NetworkService { // keep track of when our fork_id needs to be updated let next_fork_update = next_fork_delay(&beacon_chain); + // Create a fork context for the given config and genesis validators root + let fork_context = Arc::new(ForkContext::new( + beacon_chain.genesis_validators_root, + &beacon_chain.spec, + )); + // launch libp2p service let (network_globals, mut libp2p) = LibP2PService::new( executor.clone(), config, enr_fork_id, &network_log, - beacon_chain.genesis_validators_root, + fork_context, &beacon_chain.spec, ) .await?; diff --git a/consensus/types/src/beacon_block.rs b/consensus/types/src/beacon_block.rs index c7305696ad2..c65ffcae8a7 100644 --- a/consensus/types/src/beacon_block.rs +++ b/consensus/types/src/beacon_block.rs @@ -65,101 +65,6 @@ impl BeaconBlock { } } - /// Return a block where the block has maximum size. - pub fn full(spec: &ChainSpec) -> BeaconBlock { - let header = BeaconBlockHeader { - slot: Slot::new(1), - proposer_index: 0, - parent_root: Hash256::zero(), - state_root: Hash256::zero(), - body_root: Hash256::zero(), - }; - - let signed_header = SignedBeaconBlockHeader { - message: header, - signature: Signature::empty(), - }; - let indexed_attestation: IndexedAttestation = IndexedAttestation { - attesting_indices: VariableList::new(vec![ - 0_u64; - T::MaxValidatorsPerCommittee::to_usize() - ]) - .unwrap(), - data: AttestationData::default(), - signature: AggregateSignature::empty(), - }; - - let deposit_data = DepositData { - pubkey: PublicKeyBytes::empty(), - withdrawal_credentials: Hash256::zero(), - amount: 0, - signature: SignatureBytes::empty(), - }; - let proposer_slashing = ProposerSlashing { - signed_header_1: signed_header.clone(), - signed_header_2: signed_header, - }; - - let attester_slashing = AttesterSlashing { - attestation_1: indexed_attestation.clone(), - attestation_2: indexed_attestation, - }; - - let attestation: Attestation = Attestation { - aggregation_bits: BitList::with_capacity(T::MaxValidatorsPerCommittee::to_usize()) - .unwrap(), - data: AttestationData::default(), - signature: AggregateSignature::empty(), - }; - - let deposit = Deposit { - proof: FixedVector::from_elem(Hash256::zero()), - data: deposit_data, - }; - - let voluntary_exit = VoluntaryExit { - epoch: Epoch::new(1), - validator_index: 1, - }; - - let signed_voluntary_exit = SignedVoluntaryExit { - message: voluntary_exit, - signature: Signature::empty(), - }; - - // FIXME(altair): use an Altair block (they're bigger) - let mut block = BeaconBlockBase::::empty(spec); - for _ in 0..T::MaxProposerSlashings::to_usize() { - block - .body - .proposer_slashings - .push(proposer_slashing.clone()) - .unwrap(); - } - for _ in 0..T::MaxDeposits::to_usize() { - block.body.deposits.push(deposit.clone()).unwrap(); - } - for _ in 0..T::MaxVoluntaryExits::to_usize() { - block - .body - .voluntary_exits - .push(signed_voluntary_exit.clone()) - .unwrap(); - } - for _ in 0..T::MaxAttesterSlashings::to_usize() { - block - .body - .attester_slashings - .push(attester_slashing.clone()) - .unwrap(); - } - - for _ in 0..T::MaxAttestations::to_usize() { - block.body.attestations.push(attestation.clone()).unwrap(); - } - BeaconBlock::Base(block) - } - /// Custom SSZ decoder that takes a `ChainSpec` as context. pub fn from_ssz_bytes(bytes: &[u8], spec: &ChainSpec) -> Result { let slot_len = ::ssz_fixed_len(); @@ -313,10 +218,104 @@ impl BeaconBlockBase { }, } } + + /// Return a block where the block has maximum size. + pub fn full(spec: &ChainSpec) -> Self { + let header = BeaconBlockHeader { + slot: Slot::new(1), + proposer_index: 0, + parent_root: Hash256::zero(), + state_root: Hash256::zero(), + body_root: Hash256::zero(), + }; + + let signed_header = SignedBeaconBlockHeader { + message: header, + signature: Signature::empty(), + }; + let indexed_attestation: IndexedAttestation = IndexedAttestation { + attesting_indices: VariableList::new(vec![ + 0_u64; + T::MaxValidatorsPerCommittee::to_usize() + ]) + .unwrap(), + data: AttestationData::default(), + signature: AggregateSignature::empty(), + }; + + let deposit_data = DepositData { + pubkey: PublicKeyBytes::empty(), + withdrawal_credentials: Hash256::zero(), + amount: 0, + signature: SignatureBytes::empty(), + }; + let proposer_slashing = ProposerSlashing { + signed_header_1: signed_header.clone(), + signed_header_2: signed_header, + }; + + let attester_slashing = AttesterSlashing { + attestation_1: indexed_attestation.clone(), + attestation_2: indexed_attestation, + }; + + let attestation: Attestation = Attestation { + aggregation_bits: BitList::with_capacity(T::MaxValidatorsPerCommittee::to_usize()) + .unwrap(), + data: AttestationData::default(), + signature: AggregateSignature::empty(), + }; + + let deposit = Deposit { + proof: FixedVector::from_elem(Hash256::zero()), + data: deposit_data, + }; + + let voluntary_exit = VoluntaryExit { + epoch: Epoch::new(1), + validator_index: 1, + }; + + let signed_voluntary_exit = SignedVoluntaryExit { + message: voluntary_exit, + signature: Signature::empty(), + }; + + let mut block = BeaconBlockBase::::empty(spec); + for _ in 0..T::MaxProposerSlashings::to_usize() { + block + .body + .proposer_slashings + .push(proposer_slashing.clone()) + .unwrap(); + } + for _ in 0..T::MaxDeposits::to_usize() { + block.body.deposits.push(deposit.clone()).unwrap(); + } + for _ in 0..T::MaxVoluntaryExits::to_usize() { + block + .body + .voluntary_exits + .push(signed_voluntary_exit.clone()) + .unwrap(); + } + for _ in 0..T::MaxAttesterSlashings::to_usize() { + block + .body + .attester_slashings + .push(attester_slashing.clone()) + .unwrap(); + } + + for _ in 0..T::MaxAttestations::to_usize() { + block.body.attestations.push(attestation.clone()).unwrap(); + } + block + } } impl BeaconBlockAltair { - /// Returns an empty block to be used during genesis. + /// Returns an empty Altair block to be used during genesis. pub fn empty(spec: &ChainSpec) -> Self { BeaconBlockAltair { slot: spec.genesis_slot, @@ -340,6 +339,36 @@ impl BeaconBlockAltair { }, } } + + /// Return an Altair block where the block has maximum size. + pub fn full(spec: &ChainSpec) -> Self { + let base_block = BeaconBlockBase::full(spec); + let sync_aggregate = SyncAggregate { + sync_committee_signature: AggregateSignature::empty(), + sync_committee_bits: BitVector::default(), + }; + BeaconBlockAltair { + slot: spec.genesis_slot, + proposer_index: 0, + parent_root: Hash256::zero(), + state_root: Hash256::zero(), + body: BeaconBlockBodyAltair { + proposer_slashings: base_block.body.proposer_slashings, + attester_slashings: base_block.body.attester_slashings, + attestations: base_block.body.attestations, + deposits: base_block.body.deposits, + voluntary_exits: base_block.body.voluntary_exits, + sync_aggregate, + randao_reveal: Signature::empty(), + eth1_data: Eth1Data { + deposit_root: Hash256::zero(), + block_hash: Hash256::zero(), + deposit_count: 0, + }, + graffiti: Graffiti::default(), + }, + } + } } #[cfg(test)] diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index cc1e67b1207..2a43ff86008 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -167,23 +167,20 @@ impl ChainSpec { /// Returns the `ForkDigest` for the given slot. /// - /// Add additional if else branches with additional forks. + /// If `self.altair_fork_slot == None`, then this function returns the genesis fork digest + /// otherwise, returns the fork digest based on the slot. pub fn fork_digest(&self, slot: Slot, genesis_validators_root: Hash256) -> [u8; 4] { - if slot >= self.altair_fork_slot { - Self::compute_fork_digest(self.altair_fork_version, genesis_validators_root) + if let Some(altair_fork_slot) = self.altair_fork_slot { + if slot >= altair_fork_slot { + Self::compute_fork_digest(self.altair_fork_version, genesis_validators_root) + } else { + Self::compute_fork_digest(self.genesis_fork_version, genesis_validators_root) + } } else { Self::compute_fork_digest(self.genesis_fork_version, genesis_validators_root) } } - pub fn genesis_fork_digest(&self, genesis_validators_root: Hash256) -> [u8; 4] { - Self::compute_fork_digest(self.genesis_fork_version, genesis_validators_root) - } - - pub fn altair_fork_digest(&self, genesis_validators_root: Hash256) -> [u8; 4] { - Self::compute_fork_digest(self.altair_fork_version, genesis_validators_root) - } - /// Returns the epoch of the next scheduled change in the `fork.current_version`. /// /// There are no future forks scheduled so this function always returns `None`. This may not diff --git a/consensus/types/src/fork_context.rs b/consensus/types/src/fork_context.rs new file mode 100644 index 00000000000..6242cd032d4 --- /dev/null +++ b/consensus/types/src/fork_context.rs @@ -0,0 +1,63 @@ +use crate::{ChainSpec, ForkName, Hash256}; +use std::collections::HashMap; + +/// Provides fork specific info like the fork digest corresponding to a fork. +#[derive(Debug, Clone)] +pub struct ForkContext { + fork_to_digest: HashMap, + digest_to_fork: HashMap<[u8; 4], ForkName>, +} + +impl ForkContext { + /// Creates a new `ForkContext` object by enumerating all enabled forks and computing their + /// fork digest. + /// + /// A fork is disabled in the `ChainSpec` if the activation slot corresponding to that fork is `None`. + pub fn new(genesis_validators_root: Hash256, spec: &ChainSpec) -> Self { + let mut fork_to_digest = vec![( + ForkName::Base, + ChainSpec::compute_fork_digest(spec.genesis_fork_version, genesis_validators_root), + )]; + + // Only add Altair to list of forks if it's enabled (i.e. spec.altair_fork_slot != None) + if spec.altair_fork_slot.is_some() { + fork_to_digest.push(( + ForkName::Altair, + ChainSpec::compute_fork_digest(spec.altair_fork_version, genesis_validators_root), + )) + } + + let fork_to_digest: HashMap = fork_to_digest.into_iter().collect(); + + let digest_to_fork = fork_to_digest + .clone() + .into_iter() + .map(|(k, v)| (v, k)) + .collect(); + + Self { + fork_to_digest, + digest_to_fork, + } + } + + /// Returns the context bytes/fork_digest corresponding to the genesis fork version. + pub fn genesis_context_bytes(&self) -> [u8; 4] { + *self + .fork_to_digest + .get(&ForkName::Base) + .expect("ForkContext must contain genesis context bytes") + } + + /// Returns the fork type given the context bytes/fork_digest. + /// Returns `None` if context bytes doesn't correspond to any valid `ForkName`. + pub fn from_context_bytes(&self, context: [u8; 4]) -> Option<&ForkName> { + self.digest_to_fork.get(&context) + } + + /// Returns the context bytes/fork_digest corresponding to a fork name. + /// Returns `None` if the `ForkName` has not been initialized. + pub fn to_context_bytes(&self, fork_name: ForkName) -> Option<[u8; 4]> { + self.fork_to_digest.get(&fork_name).cloned() + } +} diff --git a/consensus/types/src/fork_name.rs b/consensus/types/src/fork_name.rs index 8d34a8254ec..bd772a037c1 100644 --- a/consensus/types/src/fork_name.rs +++ b/consensus/types/src/fork_name.rs @@ -1,4 +1,4 @@ -use crate::ChainSpec; +use crate::{ChainSpec, Slot}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum ForkName { @@ -25,4 +25,17 @@ impl ForkName { } } } + + /// Returns the `ForkName` given the slot and depending if Altair is enabled in the `ChainSpec`. + pub fn from_slot(slot: Slot, spec: &ChainSpec) -> Self { + if let Some(altair_fork_slot) = spec.altair_fork_slot { + if slot >= altair_fork_slot { + ForkName::Altair + } else { + ForkName::Base + } + } else { + ForkName::Base + } + } } diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 4f2fd7ed8fd..32c8ab78fdd 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -58,6 +58,7 @@ pub mod validator_subscription; pub mod voluntary_exit; #[macro_use] pub mod slot_epoch_macros; +pub mod fork_context; pub mod participation_flags; pub mod slot_epoch; pub mod subnet_id; @@ -94,6 +95,7 @@ pub use crate::enr_fork_id::EnrForkId; pub use crate::eth1_data::Eth1Data; pub use crate::eth_spec::EthSpecId; pub use crate::fork::Fork; +pub use crate::fork_context::ForkContext; pub use crate::fork_data::ForkData; pub use crate::fork_name::ForkName; pub use crate::free_attestation::FreeAttestation;