Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 95df01d

Browse files
svyatonikgavofyork
authored andcommitted
limit retry count in OnDemand (#513)
1 parent 48519fd commit 95df01d

File tree

5 files changed

+128
-14
lines changed

5 files changed

+128
-14
lines changed

substrate/client/src/light/backend.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ impl<Block, S, F, H, C> StateBackend<H, C> for OnDemandState<Block, S, F>
196196
block: self.block,
197197
header: header.expect("if block above guarantees that header is_some(); qed"),
198198
key: key.to_vec(),
199+
retry_count: None,
199200
})
200201
.into_future().wait()
201202
}

substrate/client/src/light/blockchain.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ impl<S, F, Block> BlockchainHeaderBackend<Block> for Blockchain<S, F> where Bloc
101101
.remote_header(RemoteHeaderRequest {
102102
cht_root: self.storage.cht_root(cht::SIZE, number)?,
103103
block: number,
104+
retry_count: None,
104105
})
105106
.into_future().wait()
106107
.map(Some)

substrate/client/src/light/call_executor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ impl<B, F, Block> CallExecutor<Block, KeccakHasher, RlpCodec> for RemoteCallExec
7373
header: block_header,
7474
method: method.into(),
7575
call_data: call_data.to_vec(),
76+
retry_count: None,
7677
}).into_future().wait()
7778
}
7879

@@ -168,6 +169,7 @@ mod tests {
168169
},
169170
method: "authorities".into(),
170171
call_data: vec![],
172+
retry_count: None,
171173
}, remote_execution_proof).unwrap();
172174
}
173175
}

substrate/client/src/light/fetcher.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ pub struct RemoteCallRequest<Header: HeaderT> {
4343
pub method: String,
4444
/// Call data.
4545
pub call_data: Vec<u8>,
46+
/// Number of times to retry request. None means that default RETRY_COUNT is used.
47+
pub retry_count: Option<usize>,
4648
}
4749

4850
/// Remote canonical header request.
@@ -52,6 +54,8 @@ pub struct RemoteHeaderRequest<Header: HeaderT> {
5254
pub cht_root: Header::Hash,
5355
/// Number of the header to query.
5456
pub block: Header::Number,
57+
/// Number of times to retry request. None means that default RETRY_COUNT is used.
58+
pub retry_count: Option<usize>,
5559
}
5660

5761
/// Remote storage read request.
@@ -63,6 +67,8 @@ pub struct RemoteReadRequest<Header: HeaderT> {
6367
pub header: Header,
6468
/// Storage key to read.
6569
pub key: Vec<u8>,
70+
/// Number of times to retry request. None means that default RETRY_COUNT is used.
71+
pub retry_count: Option<usize>,
6672
}
6773

6874
/// Light client data fetcher. Implementations of this trait must check if remote data
@@ -264,6 +270,7 @@ pub mod tests {
264270
block: remote_block_header.hash(),
265271
header: remote_block_header,
266272
key: b":auth:len".to_vec(),
273+
retry_count: None,
267274
}, remote_read_proof).unwrap().unwrap()[0], authorities_len as u8);
268275
}
269276

@@ -273,6 +280,7 @@ pub mod tests {
273280
assert_eq!((&local_checker as &FetchChecker<Block>).check_header_proof(&RemoteHeaderRequest::<Header> {
274281
cht_root: local_cht_root,
275282
block: 1,
283+
retry_count: None,
276284
}, Some(remote_block_header.clone()), remote_header_proof).unwrap(), remote_block_header);
277285
}
278286

@@ -283,6 +291,7 @@ pub mod tests {
283291
assert!((&local_checker as &FetchChecker<Block>).check_header_proof(&RemoteHeaderRequest::<Header> {
284292
cht_root: Default::default(),
285293
block: 1,
294+
retry_count: None,
286295
}, Some(remote_block_header.clone()), remote_header_proof).is_err());
287296
}
288297

@@ -293,6 +302,7 @@ pub mod tests {
293302
assert!((&local_checker as &FetchChecker<Block>).check_header_proof(&RemoteHeaderRequest::<Header> {
294303
cht_root: local_cht_root,
295304
block: 1,
305+
retry_count: None,
296306
}, Some(remote_block_header.clone()), remote_header_proof).is_err());
297307
}
298308
}

substrate/network/src/on_demand.rs

Lines changed: 114 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
3535

3636
/// Remote request timeout.
3737
const REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
38+
/// Default request retry count.
39+
const RETRY_COUNT: usize = 1;
3840

3941
/// On-demand service API.
4042
pub trait OnDemandService<Block: BlockT>: Send + Sync {
@@ -85,6 +87,7 @@ struct OnDemandCore<B: BlockT, E: service::ExecuteInContext<B>> {
8587
struct Request<Block: BlockT> {
8688
id: u64,
8789
timestamp: Instant,
90+
retry_count: usize,
8891
data: RequestData<Block>,
8992
}
9093

@@ -139,9 +142,9 @@ impl<B: BlockT, E> OnDemand<B, E> where
139142
}
140143

141144
/// Schedule && dispatch all scheduled requests.
142-
fn schedule_request<R>(&self, data: RequestData<B>, result: R) -> R {
145+
fn schedule_request<R>(&self, retry_count: Option<usize>, data: RequestData<B>, result: R) -> R {
143146
let mut core = self.core.lock();
144-
core.insert(data);
147+
core.insert(retry_count.unwrap_or(RETRY_COUNT), data);
145148
core.dispatch();
146149
result
147150
}
@@ -158,21 +161,31 @@ impl<B: BlockT, E> OnDemand<B, E> where
158161
},
159162
};
160163

161-
let retry_request_data = match try_accept(request) {
162-
Accept::Ok => None,
164+
let retry_count = request.retry_count;
165+
let (retry_count, retry_request_data) = match try_accept(request) {
166+
Accept::Ok => (retry_count, None),
163167
Accept::CheckFailed(error, retry_request_data) => {
164168
io.report_peer(peer, Severity::Bad(&format!("Failed to check remote {} response from peer: {}", rtype, error)));
165169
core.remove_peer(peer);
166-
Some(retry_request_data)
170+
171+
if retry_count > 0 {
172+
(retry_count - 1, Some(retry_request_data))
173+
} else {
174+
trace!(target: "sync", "Failed to get remote {} response for given number of retries", rtype);
175+
retry_request_data.fail(client::error::ErrorKind::RemoteFetchFailed.into());
176+
(0, None)
177+
}
167178
},
168179
Accept::Unexpected(retry_request_data) => {
169-
trace!(target: "sync", "Unexpected response to remote {} from peer {}", rtype, peer);
170-
Some(retry_request_data)
180+
io.report_peer(peer, Severity::Bad(&format!("Unexpected response to remote {} from peer", rtype)));
181+
core.remove_peer(peer);
182+
183+
(retry_count, Some(retry_request_data))
171184
},
172185
};
173186

174187
if let Some(request_data) = retry_request_data {
175-
core.insert(request_data);
188+
core.insert(retry_count, request_data);
176189
}
177190

178191
core.dispatch();
@@ -262,19 +275,19 @@ impl<B, E> Fetcher<B> for OnDemand<B, E> where
262275

263276
fn remote_header(&self, request: RemoteHeaderRequest<B::Header>) -> Self::RemoteHeaderResult {
264277
let (sender, receiver) = channel();
265-
self.schedule_request(RequestData::RemoteHeader(request, sender),
278+
self.schedule_request(request.retry_count.clone(), RequestData::RemoteHeader(request, sender),
266279
RemoteResponse { receiver })
267280
}
268281

269282
fn remote_read(&self, request: RemoteReadRequest<B::Header>) -> Self::RemoteReadResult {
270283
let (sender, receiver) = channel();
271-
self.schedule_request(RequestData::RemoteRead(request, sender),
284+
self.schedule_request(request.retry_count.clone(), RequestData::RemoteRead(request, sender),
272285
RemoteResponse { receiver })
273286
}
274287

275288
fn remote_call(&self, request: RemoteCallRequest<B::Header>) -> Self::RemoteCallResult {
276289
let (sender, receiver) = channel();
277-
self.schedule_request(RequestData::RemoteCall(request, sender),
290+
self.schedule_request(request.retry_count.clone(), RequestData::RemoteCall(request, sender),
278291
RemoteResponse { receiver })
279292
}
280293
}
@@ -314,13 +327,14 @@ impl<B, E> OnDemandCore<B, E> where
314327
}
315328
}
316329

317-
pub fn insert(&mut self, data: RequestData<B>) {
330+
pub fn insert(&mut self, retry_count: usize, data: RequestData<B>) {
318331
let request_id = self.next_request_id;
319332
self.next_request_id += 1;
320333

321334
self.pending_requests.push_back(Request {
322335
id: request_id,
323336
timestamp: Instant::now(),
337+
retry_count,
324338
data,
325339
});
326340
}
@@ -385,6 +399,17 @@ impl<Block: BlockT> Request<Block> {
385399
}
386400
}
387401

402+
impl<Block: BlockT> RequestData<Block> {
403+
pub fn fail(self, error: client::error::Error) {
404+
// don't care if anyone is listening
405+
match self {
406+
RequestData::RemoteHeader(_, sender) => { let _ = sender.send(Err(error)); },
407+
RequestData::RemoteCall(_, sender) => { let _ = sender.send(Err(error)); },
408+
RequestData::RemoteRead(_, sender) => { let _ = sender.send(Err(error)); },
409+
}
410+
}
411+
}
412+
388413
#[cfg(test)]
389414
pub mod tests {
390415
use std::collections::VecDeque;
@@ -503,6 +528,7 @@ pub mod tests {
503528
header: dummy_header(),
504529
method: "test".into(),
505530
call_data: vec![],
531+
retry_count: None,
506532
});
507533
assert_eq!(vec![1], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
508534
assert_eq!(vec![0], on_demand.core.lock().active_peers.keys().cloned().collect::<Vec<_>>());
@@ -526,6 +552,7 @@ pub mod tests {
526552
header: dummy_header(),
527553
method: "test".into(),
528554
call_data: vec![],
555+
retry_count: None,
529556
});
530557
receive_call_response(&*on_demand, &mut network, 0, 1);
531558
assert!(network.to_disconnect.contains(&0));
@@ -542,6 +569,7 @@ pub mod tests {
542569
header: dummy_header(),
543570
method: "test".into(),
544571
call_data: vec![],
572+
retry_count: Some(1),
545573
});
546574

547575
on_demand.on_connect(0, Roles::FULL);
@@ -561,6 +589,72 @@ pub mod tests {
561589
assert!(network.to_disconnect.contains(&0));
562590
}
563591

592+
#[test]
593+
fn disconnects_from_peer_on_wrong_response_type() {
594+
let (_x, on_demand) = dummy(false);
595+
let queue = RwLock::new(VecDeque::new());
596+
let mut network = TestIo::new(&queue, None);
597+
on_demand.on_connect(0, Roles::FULL);
598+
599+
on_demand.remote_call(RemoteCallRequest {
600+
block: Default::default(),
601+
header: dummy_header(),
602+
method: "test".into(),
603+
call_data: vec![],
604+
retry_count: Some(1),
605+
});
606+
607+
on_demand.on_remote_read_response(&mut network, 0, message::RemoteReadResponse {
608+
id: 0,
609+
proof: vec![vec![2]],
610+
});
611+
assert!(network.to_disconnect.contains(&0));
612+
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
613+
}
614+
615+
#[test]
616+
fn receives_remote_failure_after_retry_count_failures() {
617+
use parking_lot::{Condvar, Mutex};
618+
619+
let retry_count = 2;
620+
let (_x, on_demand) = dummy(false);
621+
let queue = RwLock::new(VecDeque::new());
622+
let mut network = TestIo::new(&queue, None);
623+
for i in 0..retry_count+1 {
624+
on_demand.on_connect(i, Roles::FULL);
625+
}
626+
627+
let sync = Arc::new((Mutex::new(0), Mutex::new(0), Condvar::new()));
628+
let thread_sync = sync.clone();
629+
630+
let response = on_demand.remote_call(RemoteCallRequest {
631+
block: Default::default(),
632+
header: dummy_header(),
633+
method: "test".into(),
634+
call_data: vec![],
635+
retry_count: Some(retry_count)
636+
});
637+
let thread = ::std::thread::spawn(move || {
638+
let &(ref current, ref finished_at, ref finished) = &*thread_sync;
639+
let _ = response.wait().unwrap_err();
640+
*finished_at.lock() = *current.lock();
641+
finished.notify_one();
642+
});
643+
644+
let &(ref current, ref finished_at, ref finished) = &*sync;
645+
for i in 0..retry_count+1 {
646+
let mut current = current.lock();
647+
*current = *current + 1;
648+
receive_call_response(&*on_demand, &mut network, i, i as u64);
649+
}
650+
651+
let mut finished_at = finished_at.lock();
652+
assert!(!finished.wait_for(&mut finished_at, ::std::time::Duration::from_millis(1000)).timed_out());
653+
assert_eq!(*finished_at, retry_count + 1);
654+
655+
thread.join().unwrap();
656+
}
657+
564658
#[test]
565659
fn receives_remote_call_response() {
566660
let (_x, on_demand) = dummy(true);
@@ -573,6 +667,7 @@ pub mod tests {
573667
header: dummy_header(),
574668
method: "test".into(),
575669
call_data: vec![],
670+
retry_count: None,
576671
});
577672
let thread = ::std::thread::spawn(move || {
578673
let result = response.wait().unwrap();
@@ -593,7 +688,8 @@ pub mod tests {
593688
let response = on_demand.remote_read(RemoteReadRequest {
594689
header: dummy_header(),
595690
block: Default::default(),
596-
key: b":key".to_vec()
691+
key: b":key".to_vec(),
692+
retry_count: None,
597693
});
598694
let thread = ::std::thread::spawn(move || {
599695
let result = response.wait().unwrap();
@@ -614,7 +710,11 @@ pub mod tests {
614710
let mut network = TestIo::new(&queue, None);
615711
on_demand.on_connect(0, Roles::FULL);
616712

617-
let response = on_demand.remote_header(RemoteHeaderRequest { cht_root: Default::default(), block: 1 });
713+
let response = on_demand.remote_header(RemoteHeaderRequest {
714+
cht_root: Default::default(),
715+
block: 1,
716+
retry_count: None,
717+
});
618718
let thread = ::std::thread::spawn(move || {
619719
let result = response.wait().unwrap();
620720
assert_eq!(result.hash(), "80729accb7bb10ff9c637a10e8bb59f21c52571aa7b46544c5885ca89ed190f4".into());

0 commit comments

Comments
 (0)