Skip to content

Commit 88e0227

Browse files
committed
Network altair rpc (#2301)
* Add v2 messages to rpc decoder * Ugly hack * Pass chainspec and genesis_root to Rpc * Add context bytes while encoding * Add a ForkContext struct * Pass ForkContext to rpc * crate compiles * Extract ForkContext into separate file; add a current_fork field * Fix encoding/decoding * Fix tests * Remove fork_schedule from rebase * Fix ForkContext * Fix tests * Remove fork_schedule again * Add altair empty and full block limits * Fix panic in snappy decoding * Fix limits * Move wrapping of RPCRequests to handler * RpcRequestContainer only used in OutboundUpgrade * Add altair blocks in rpc end to end tests * same rpc limits for V1 and V2 * V2 response decoding happens only for valid protocols * Add snappy response decoding tests * Add more snappy tests * Minor fixes * Appease clippy * to_context_bytes returns an Option * Add padding snappy message test for v2 * Minor fixes; remove accidentally added file * lint
1 parent f2b0e39 commit 88e0227

File tree

17 files changed

+1070
-297
lines changed

17 files changed

+1070
-297
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3069,14 +3069,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
30693069
// therefore use the genesis slot.
30703070
let slot = self.slot().unwrap_or(self.spec.genesis_slot);
30713071

3072-
self.spec.enr_fork_id(slot, self.genesis_validators_root)
3072+
self.spec
3073+
.enr_fork_id::<T::EthSpec>(slot, self.genesis_validators_root)
30733074
}
30743075

3075-
/// Calculates the `Duration` to the next fork, if one exists.
3076-
pub fn duration_to_next_fork(&self) -> Option<Duration> {
3077-
let epoch = self.spec.next_fork_epoch()?;
3076+
/// Calculates the `Duration` to the next fork if it exists and returns it
3077+
/// with it's corresponding `ForkName`.
3078+
pub fn duration_to_next_fork(&self) -> Option<(ForkName, Duration)> {
3079+
// If we are unable to read the slot clock we assume that it is prior to genesis and
3080+
// therefore use the genesis slot.
3081+
let slot = self.slot().unwrap_or(self.spec.genesis_slot);
3082+
3083+
let (fork_name, epoch) = self.spec.next_fork_epoch::<T::EthSpec>(slot)?;
30783084
self.slot_clock
30793085
.duration_to_slot(epoch.start_slot(T::EthSpec::slots_per_epoch()))
3086+
.map(|duration| (fork_name, duration))
30803087
}
30813088

30823089
pub fn dump_as_dot<W: Write>(&self, output: &mut W) {

beacon_node/eth2_libp2p/src/behaviour/mod.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use std::{
4343
sync::Arc,
4444
task::{Context, Poll},
4545
};
46-
use types::{ChainSpec, EnrForkId, EthSpec, Hash256, SignedBeaconBlock, Slot, SubnetId};
46+
use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, SignedBeaconBlock, Slot, SubnetId};
4747

4848
mod gossipsub_scoring_parameters;
4949
mod handler;
@@ -136,11 +136,6 @@ pub struct Behaviour<TSpec: EthSpec> {
136136

137137
score_settings: PeerScoreSettings<TSpec>,
138138

139-
spec: ChainSpec,
140-
141-
/// The genesis root for the eth2 network
142-
genesis_validators_root: Hash256,
143-
144139
/// The interval for updating gossipsub scores
145140
update_gossipsub_scores: tokio::time::Interval,
146141
}
@@ -152,7 +147,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
152147
net_conf: &NetworkConfig,
153148
network_globals: Arc<NetworkGlobals<TSpec>>,
154149
log: &slog::Logger,
155-
genesis_validators_root: Hash256,
150+
fork_context: Arc<ForkContext>,
156151
chain_spec: &ChainSpec,
157152
) -> error::Result<Self> {
158153
let behaviour_log = log.new(o!());
@@ -225,7 +220,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
225220
.expect("Valid score params and thresholds");
226221

227222
Ok(Behaviour {
228-
eth2_rpc: RPC::new(log.clone()),
223+
eth2_rpc: RPC::new(fork_context, log.clone()),
229224
gossipsub,
230225
identify,
231226
peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)
@@ -238,9 +233,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
238233
network_dir: net_conf.network_dir.clone(),
239234
log: behaviour_log,
240235
score_settings,
241-
spec: chain_spec.clone(),
242236
update_gossipsub_scores,
243-
genesis_validators_root,
244237
})
245238
}
246239

beacon_node/eth2_libp2p/src/rpc/codec/base.rs

Lines changed: 29 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -181,16 +181,18 @@ where
181181
mod tests {
182182
use super::super::ssz_snappy::*;
183183
use super::*;
184-
use crate::rpc::methods::StatusMessage;
185184
use crate::rpc::protocol::*;
186-
use snap::write::FrameEncoder;
187-
use ssz::Encode;
188-
use std::io::Write;
189-
use types::{Epoch, Hash256, Slot};
185+
186+
use std::sync::Arc;
187+
use types::{ForkContext, Hash256};
190188
use unsigned_varint::codec::Uvi;
191189

192190
type Spec = types::MainnetEthSpec;
193191

192+
fn fork_context() -> ForkContext {
193+
ForkContext::new(Hash256::zero(), &Spec::default_spec())
194+
}
195+
194196
#[test]
195197
fn test_decode_status_message() {
196198
let message = hex::decode("0054ff060000734e615070590032000006e71e7b54989925efd6c9cbcb8ceb9b5f71216f5137282bf6a1e3b50f64e42d6c7fb347abe07eb0db8200000005029e2800").unwrap();
@@ -200,8 +202,9 @@ mod tests {
200202
let snappy_protocol_id =
201203
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy);
202204

205+
let fork_context = Arc::new(fork_context());
203206
let mut snappy_outbound_codec =
204-
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, 1_048_576);
207+
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, 1_048_576, fork_context);
205208

206209
// remove response code
207210
let mut snappy_buf = buf.clone();
@@ -233,8 +236,10 @@ mod tests {
233236

234237
let snappy_protocol_id =
235238
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy);
239+
240+
let fork_context = Arc::new(fork_context());
236241
let mut snappy_outbound_codec =
237-
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, 1_048_576);
242+
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, 1_048_576, fork_context);
238243

239244
let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst).unwrap_err();
240245

@@ -260,80 +265,34 @@ mod tests {
260265
// Response limits
261266
let limit = protocol_id.rpc_response_limits::<Spec>();
262267
let mut max = encode_len(limit.max + 1);
263-
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(protocol_id.clone(), 1_048_576);
268+
let fork_context = Arc::new(fork_context());
269+
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
270+
protocol_id.clone(),
271+
1_048_576,
272+
fork_context.clone(),
273+
);
264274
assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData);
265275

266276
let mut min = encode_len(limit.min - 1);
267-
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(protocol_id.clone(), 1_048_576);
277+
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
278+
protocol_id.clone(),
279+
1_048_576,
280+
fork_context.clone(),
281+
);
268282
assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData);
269283

270284
// Request limits
271285
let limit = protocol_id.rpc_request_limits();
272286
let mut max = encode_len(limit.max + 1);
273-
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(protocol_id.clone(), 1_048_576);
287+
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
288+
protocol_id.clone(),
289+
1_048_576,
290+
fork_context.clone(),
291+
);
274292
assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData);
275293

276294
let mut min = encode_len(limit.min - 1);
277-
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(protocol_id, 1_048_576);
295+
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(protocol_id, 1_048_576, fork_context);
278296
assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData);
279297
}
280-
281-
#[test]
282-
fn test_decode_malicious_status_message() {
283-
// 10 byte snappy stream identifier
284-
let stream_identifier: &'static [u8] = b"\xFF\x06\x00\x00sNaPpY";
285-
286-
assert_eq!(stream_identifier.len(), 10);
287-
288-
// byte 0(0xFE) is padding chunk type identifier for snappy messages
289-
// byte 1,2,3 are chunk length (little endian)
290-
let malicious_padding: &'static [u8] = b"\xFE\x00\x00\x00";
291-
292-
// Status message is 84 bytes uncompressed. `max_compressed_len` is 32 + 84 + 84/6 = 130.
293-
let status_message_bytes = StatusMessage {
294-
fork_digest: [0; 4],
295-
finalized_root: Hash256::from_low_u64_be(0),
296-
finalized_epoch: Epoch::new(1),
297-
head_root: Hash256::from_low_u64_be(0),
298-
head_slot: Slot::new(1),
299-
}
300-
.as_ssz_bytes();
301-
302-
assert_eq!(status_message_bytes.len(), 84);
303-
assert_eq!(snap::raw::max_compress_len(status_message_bytes.len()), 130);
304-
305-
let mut uvi_codec: Uvi<usize> = Uvi::default();
306-
let mut dst = BytesMut::with_capacity(1024);
307-
308-
// Insert length-prefix
309-
uvi_codec
310-
.encode(status_message_bytes.len(), &mut dst)
311-
.unwrap();
312-
313-
// Insert snappy stream identifier
314-
dst.extend_from_slice(stream_identifier);
315-
316-
// Insert malicious padding of 80 bytes.
317-
for _ in 0..20 {
318-
dst.extend_from_slice(malicious_padding);
319-
}
320-
321-
// Insert payload (42 bytes compressed)
322-
let mut writer = FrameEncoder::new(Vec::new());
323-
writer.write_all(&status_message_bytes).unwrap();
324-
writer.flush().unwrap();
325-
assert_eq!(writer.get_ref().len(), 42);
326-
dst.extend_from_slice(writer.get_ref());
327-
328-
// 10 (for stream identifier) + 80 + 42 = 132 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`.
329-
330-
let snappy_protocol_id =
331-
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy);
332-
333-
let mut snappy_outbound_codec =
334-
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, 1_048_576);
335-
336-
let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst).unwrap_err();
337-
assert_eq!(snappy_decoded_message, RPCError::InvalidData);
338-
}
339298
}

0 commit comments

Comments
 (0)