diff --git a/console_backend/src/bin/fileio.rs b/console_backend/src/bin/fileio.rs index b9324d37b..e79d0fa7c 100644 --- a/console_backend/src/bin/fileio.rs +++ b/console_backend/src/bin/fileio.rs @@ -79,13 +79,13 @@ fn main() -> Result<()> { let file = fs::File::open(source)?; let size = file.metadata()?.len() as usize; let mut bytes_written = 0; - eprint!("\rWriting 0.0%..."); + print!("\rWriting 0.0%..."); fileio.overwrite_with_progress(dest, file, |n| { bytes_written += n; let progress = (bytes_written as f64) / (size as f64) * 100.0; - eprint!("\rWriting {:.2}%...", progress); + print!("\rWriting {:.2}%...", progress); })?; - eprintln!("\nFile written successfully."); + println!("\nFile written successfully ({} bytes).", bytes_written); done_tx.send(true).unwrap(); Result::Ok(()) }) @@ -101,7 +101,7 @@ fn main() -> Result<()> { scope(|s| { s.spawn(|_| run(rdr)); let mut fileio = Fileio::new(link, sender); - let dest: Box = match dest { + let dest: Box = match dest { Some(path) => Box::new(fs::File::create(path)?), None => Box::new(io::stdout()), }; diff --git a/console_backend/src/errors.rs b/console_backend/src/errors.rs index 9775ff591..682e948c4 100644 --- a/console_backend/src/errors.rs +++ b/console_backend/src/errors.rs @@ -18,3 +18,4 @@ pub(crate) const FILEIO_CHANNEL_SEND_FAILURE: &str = pub(crate) const SOLUTION_POSITION_UNIT_SELECTION_NOT_AVAILABLE: &str = "solution position unit selection not available"; pub(crate) const PROCESS_MESSAGES_FAILURE: &str = "process_messages thread panicked"; +pub(crate) const THREAD_START_FAILURE: &str = "failed to start a new thread"; diff --git a/console_backend/src/fileio.rs b/console_backend/src/fileio.rs index 3efbb940f..e7542e91c 100644 --- a/console_backend/src/fileio.rs +++ b/console_backend/src/fileio.rs @@ -1,44 +1,53 @@ use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, io::{BufRead, BufReader, Read, Write}, time::{Duration, Instant}, }; use anyhow::{anyhow, bail}; -use crossbeam::{atomic::AtomicCell, channel, scope, select, utils::Backoff}; +use crossbeam::{ + channel::{self, Receiver, Sender}, + scope, select, + sync::Parker, +}; +use log::{debug, trace}; +use parking_lot::Mutex; use rand::Rng; -use sbp::link::Link; -use sbp::messages::file_io::{ - MsgFileioConfigReq, MsgFileioConfigResp, MsgFileioReadDirReq, MsgFileioReadDirResp, - MsgFileioReadReq, MsgFileioReadResp, MsgFileioRemove, MsgFileioWriteReq, MsgFileioWriteResp, +use sbp::{ + link::Link, + messages::file_io::{ + MsgFileioConfigReq, MsgFileioConfigResp, MsgFileioReadDirReq, MsgFileioReadDirResp, + MsgFileioReadReq, MsgFileioReadResp, MsgFileioRemove, MsgFileioWriteReq, + MsgFileioWriteResp, + }, }; -use crate::errors::FILEIO_CHANNEL_SEND_FAILURE; -use crate::types::{MsgSender, Result}; +use crate::{ + errors::{CROSSBEAM_SCOPE_UNWRAP_FAILURE, FILEIO_CHANNEL_SEND_FAILURE, THREAD_START_FAILURE}, + types::{MsgSender, Result}, +}; const MAX_RETRIES: usize = 20; const READDIR_TIMEOUT: Duration = Duration::from_secs(5); const CONFIG_REQ_RETRY: Duration = Duration::from_millis(100); const CONFIG_REQ_TIMEOUT: Duration = Duration::from_secs(10); -const CHECK_INTERVAL: Duration = Duration::from_millis(100); const FILE_IO_TIMEOUT: Duration = Duration::from_secs(3); -const MAX_PAYLOAD_SIZE: usize = 255; -const READ_CHUNK_SIZE: usize = MAX_PAYLOAD_SIZE - 4; +const READ_CHUNK_SIZE: usize = sbp::MAX_PAYLOAD_LEN - 4; const SEQUENCE_LEN: usize = 4; const OFFSET_LEN: usize = 4; const NULL_SEP_LEN: usize = 1; const WRITE_REQ_OVERHEAD_LEN: usize = SEQUENCE_LEN + OFFSET_LEN + NULL_SEP_LEN; -pub struct Fileio<'a> { - link: Link<'a, ()>, +pub struct Fileio { + link: Link<'static, ()>, sender: MsgSender, config: Option, } -impl<'a> Fileio<'a> { - pub fn new(link: Link<'a, ()>, sender: MsgSender) -> Self { +impl Fileio { + pub fn new(link: Link<'static, ()>, sender: MsgSender) -> Self { Self { link, sender, @@ -46,12 +55,38 @@ impl<'a> Fileio<'a> { } } - pub fn read(&mut self, path: String, mut dest: impl Write) -> Result<()> { - let config = self.config_to_default(); - - let sender = self.sender.clone(); - let send_msg = move |sequence, offset| { - sender.send( + pub fn read(&mut self, path: String, mut dest: impl Write + Send) -> Result<()> { + let mut sequence = new_sequence(); + let mut offset = 0; + let (tx, rx) = channel::unbounded(); + let key = self.link.register(move |msg: MsgFileioReadResp| { + let _ = tx.send(msg); + }); + if let Err(err) = self.sender.send( + MsgFileioReadReq { + sender_id: None, + filename: path.clone().into(), + chunk_size: READ_CHUNK_SIZE as u8, + sequence, + offset, + } + .into(), + ) { + self.link.unregister(key); + return Err(err); + }; + for msg in rx.iter() { + if let Err(err) = dest.write_all(&msg.contents) { + self.link.unregister(key); + return Err(err.into()); + } + let bytes_read = msg.contents.len(); + if bytes_read != READ_CHUNK_SIZE { + break; + } + sequence += 1; + offset += READ_CHUNK_SIZE as u32; + if let Err(err) = self.sender.send( MsgFileioReadReq { sender_id: None, filename: path.clone().into(), @@ -60,238 +95,116 @@ impl<'a> Fileio<'a> { offset, } .into(), - ) - }; - - let (stop_req_tx, stop_req_rx) = channel::bounded(0); - let (req_tx, req_rx) = channel::unbounded(); - - let (res_tx, res_rx) = channel::unbounded(); - - let open_requests = AtomicCell::new(0u32); - - let mut sequence = new_sequence(); - // sequence number of the request we need to write to `dest` next - let mut current_sequence = sequence; - // holds data while we wait for out of order requests - let mut data: HashMap> = HashMap::new(); - let mut pending: HashMap = HashMap::new(); - let mut last_sent = false; - - scope(|s| { - s.spawn(|_| { - let mut offset = 0; - let backoff = Backoff::new(); - - while stop_req_rx.try_recv().is_err() { - while open_requests.load() >= config.window_size { - backoff.snooze(); - } - send_msg(sequence, offset)?; - req_tx - .send((sequence, ReadReq::new(offset))) - .expect(FILEIO_CHANNEL_SEND_FAILURE); - offset += READ_CHUNK_SIZE as u32; - sequence += 1; - open_requests.fetch_add(1); - } - - Result::Ok(()) - }); - - let key = self.link.register(move |msg: MsgFileioReadResp| { - res_tx.send(msg).expect(FILEIO_CHANNEL_SEND_FAILURE); - }); - - loop { - select! { - recv(req_rx) -> msg => { - let (sequence, request) = msg?; - pending.insert(sequence, request); - }, - recv(res_rx) -> msg => { - let msg = msg?; - let req = match pending.remove(&msg.sequence) { - Some(req) => req, - None => continue, - }; - let bytes_read = msg.contents.len(); - if msg.sequence == current_sequence { - dest.write_all(&msg.contents)?; - current_sequence += 1; - while let Some(d) = data.remove(¤t_sequence) { - dest.write_all(&d)?; - current_sequence += 1; - } - } else { - data.insert(req.offset, msg.contents); - } - open_requests.fetch_sub(1); - if !last_sent && bytes_read != READ_CHUNK_SIZE as usize { - last_sent = true; - stop_req_tx.send(true).expect(FILEIO_CHANNEL_SEND_FAILURE); - } - if last_sent && open_requests.load() == 0 { - break - } - }, - recv(channel::tick(CHECK_INTERVAL)) -> _ => { - for (seq, req) in pending.iter_mut() { - if req.expired() { - req.track_retry()?; - send_msg(*seq, req.offset)?; - } - } - } - } - } - - self.link.unregister(key); - - Ok(()) - }) - .unwrap() + ) { + self.link.unregister(key); + return Err(err); + }; + } + self.link.unregister(key); + Ok(()) } /// Deletes `filename` on the remote device (if it exists) and writes the contents of `data` to the file. /// This operation is NOT atomic. If the write fails and `filename` existed, it is gone forever. /// For more context see: https://github.com/swift-nav/swift-toolbox/pull/72#discussion_r654751414 - pub fn overwrite(&mut self, filename: String, data: impl Read) -> Result<()> { + pub fn overwrite(&mut self, filename: String, data: impl Read + Send) -> Result<()> { self.overwrite_with_progress(filename, data, |_| ()) } /// Deletes `filename` on the remote device (if it exists) and writes the contents of `data` to the file. /// This operation is NOT atomic. If the write fails and `filename` existed, it is gone forever. /// For more context see: https://github.com/swift-nav/swift-toolbox/pull/72#discussion_r654751414 - pub fn overwrite_with_progress<'b, F>( + pub fn overwrite_with_progress( &mut self, filename: String, - data: impl Read, + data: impl Read + Send, mut on_progress: F, ) -> Result<()> where - F: FnMut(usize) + 'b, + F: FnMut(usize) + Send, { self.remove(filename.clone())?; - - let mut data = BufReader::new(data); - let mut state = WriteState::new(filename); - - loop { - let buf = data.fill_buf()?; - let bytes_read = buf.len(); - if bytes_read == 0 { - break; - } - state = self.write_slice(state, buf, &mut on_progress)?; - data.consume(bytes_read); - } - - Ok(()) - } - - fn write_slice<'b, F>( - &mut self, - mut state: WriteState, - data: &[u8], - on_progress: &mut F, - ) -> Result - where - F: FnMut(usize) + 'b, - { let config = self.fetch_config(); - - let (req_tx, req_rx) = channel::unbounded(); - let (res_tx, res_rx) = channel::unbounded(); - - let open_requests = AtomicCell::new(0u32); - - let sender = self.sender.clone(); - let send_msg = |state: &WriteState, req: &WriteReq| { - sender.send( - MsgFileioWriteReq { - sender_id: None, - sequence: state.sequence, - offset: state.offset as u32, - filename: state.filename().into(), - data: data[req.offset..req.end_offset].to_vec(), - } - .into(), + let (tx, rx) = channel::bounded(config.batch_size); + let chunk_sizes = Mutex::new(HashMap::with_capacity(config.window_size)); + let mut data = BufReader::new(data); + let mut offset = 0; + let (chunk_size, filename) = if filename.ends_with('\x00') { + ( + sbp::MAX_PAYLOAD_LEN - WRITE_REQ_OVERHEAD_LEN - filename.len() - 1, + filename, + ) + } else { + ( + sbp::MAX_PAYLOAD_LEN - WRITE_REQ_OVERHEAD_LEN - filename.len(), + filename + "\x00", ) }; - - let data_len = data.len(); - - scope(|s| { - let key = self.link.register(move |msg: MsgFileioWriteResp| { - res_tx.send(msg).expect(FILEIO_CHANNEL_SEND_FAILURE); - }); - - s.spawn(|_| { - let backoff = Backoff::new(); + scope(move |scope| { + let chunk_sizes = &chunk_sizes; + scope.spawn(move |_| loop { + let buf = match data.fill_buf() { + Ok(buf) => buf, + Err(e) => { + let _ = tx.send(Err(e)); + return; + } + }; + let data_len = buf.len(); + if data_len == 0 { + return; + } let mut slice_offset = 0; - while slice_offset < data_len { - while open_requests.load() >= config.window_size { - backoff.snooze(); - } - let end_offset = std::cmp::min(slice_offset + state.chunk_size, data_len); - let chunk_len = std::cmp::min(state.chunk_size, data_len - slice_offset); - let req = WriteReq::new(slice_offset, end_offset, chunk_len < state.chunk_size); - send_msg(&state, &req)?; - req_tx - .send((state.clone(), req)) - .expect(FILEIO_CHANNEL_SEND_FAILURE); - state.update(chunk_len); + let end_offset = std::cmp::min(slice_offset + chunk_size, data_len); + let chunk_len = std::cmp::min(chunk_size, data_len - slice_offset); + let req = MsgFileioWriteReq { + sender_id: None, + sequence: 0, + offset, + filename: filename.clone().into(), + data: buf[slice_offset..end_offset].to_vec(), + }; + offset += chunk_len as u32; slice_offset += chunk_len; - open_requests.fetch_add(1); + if tx.send(Ok(req)).is_err() { + return; + }; } - - Result::Ok(()) + data.consume(data_len); }); - - let mut pending: HashMap = HashMap::new(); - let mut last_sent = false; - - loop { - select! { - recv(req_rx) -> msg => { - let (req_state, req) = msg?; - if req.is_last { - last_sent = true; - } - pending.insert(req_state.sequence, (req_state, req)); - }, - recv(res_rx) -> msg => { - let msg = msg?; - let req = match pending.remove(&msg.sequence) { - Some((_, req)) => req, - _ => continue, - }; - on_progress(req.end_offset - req.offset); - open_requests.fetch_sub(1); - if last_sent && open_requests.load() == 0 { - break; - } - }, - recv(channel::tick(CHECK_INTERVAL)) -> _ => { - for (req_state, req) in pending.values_mut() { - if req.expired() { - req.track_retry()?; - send_msg(req_state, req)?; - } + with_repeater( + &self.sender, + &self.link, + config, + move |sequence| { + let req = match rx.recv() { + Ok(req) => req, + Err(_) => return Ok(None), + }; + match req { + Ok(mut msg) => { + msg.sequence = sequence; + chunk_sizes.lock().insert(msg.sequence, msg.data.len()); + Ok(Some(msg)) } + Err(e) => Err(e.into()), } - } - } - - self.link.unregister(key); - - Result::Ok(()) + }, + move |msg: MsgFileioWriteResp| { + let chunk_size = match chunk_sizes.lock().remove(&msg.sequence) { + Some(chunk_size) => chunk_size, + None => { + debug!("unexpected message {:?}", msg); + return; + } + }; + on_progress(chunk_size); + }, + ) }) - .unwrap()?; - - Ok(state) + .expect(CROSSBEAM_SCOPE_UNWRAP_FAILURE)?; + Ok(()) } pub fn readdir(&mut self, path: String) -> Result> { @@ -363,22 +276,17 @@ impl<'a> Fileio<'a> { Ok(()) } - fn config_to_default(&mut self) -> FileioConfig { - self.config = Some(Default::default()); - self.config.as_ref().unwrap().clone() - } - fn fetch_config(&mut self) -> FileioConfig { - if let Some(ref config) = self.config { - return config.clone(); + if let Some(config) = self.config { + return config; } let sequence = new_sequence(); - let (stop_tx, stop_rx) = channel::bounded(1); - let (tx, rx) = channel::bounded(1); + let (stop_tx, stop_rx) = channel::unbounded(); + let (tx, rx) = channel::unbounded(); let stop_tx_clone = stop_tx.clone(); let key = self.link.register(move |msg: MsgFileioConfigResp| { - tx.send(FileioConfig::new(msg)) + tx.send(FileioConfig::from(msg)) .expect(FILEIO_CHANNEL_SEND_FAILURE); stop_tx_clone.send(true).expect(FILEIO_CHANNEL_SEND_FAILURE); }); @@ -405,164 +313,284 @@ impl<'a> Fileio<'a> { stop_tx.send(true).expect(FILEIO_CHANNEL_SEND_FAILURE); res }) - .unwrap(); + .expect(CROSSBEAM_SCOPE_UNWRAP_FAILURE); self.link.unregister(key); self.config = Some(config); - self.config.clone().unwrap() + self.config.unwrap() } } -/// State that spans an entire call to `write` (i.e. potentially multiple `write_slice` calls) -#[derive(Debug, Clone)] -struct WriteState { - sequence: u32, - /// Offset into the file (not the current slice of data) - offset: usize, - filename: String, - chunk_size: usize, -} - -impl WriteState { - fn new(filename: String) -> Self { - let (chunk_size, filename) = if filename.ends_with('\x00') { - ( - MAX_PAYLOAD_SIZE - WRITE_REQ_OVERHEAD_LEN - filename.len() - 1, - filename, - ) - } else { - ( - MAX_PAYLOAD_SIZE - WRITE_REQ_OVERHEAD_LEN - filename.len(), - filename + "\x00", - ) - }; - Self { - sequence: new_sequence(), - offset: 0, - filename, - chunk_size, - } - } - - fn filename(&self) -> String { - self.filename.to_string() - } - - fn update(&mut self, chunk_len: usize) { - self.offset += chunk_len; - self.sequence += 1; - } +pub fn new_sequence() -> u32 { + rand::thread_rng().gen_range(0..0xfffffff) } -#[derive(Debug)] -struct FileioRequest { - sent_at: Instant, - retries: usize, +#[derive(Debug, Clone, Copy)] +struct FileioConfig { + window_size: usize, + batch_size: usize, } -impl FileioRequest { - fn new() -> Self { +impl From for FileioConfig { + fn from(msg: MsgFileioConfigResp) -> Self { Self { - sent_at: Instant::now(), - retries: 0, - } - } - - fn expired(&self) -> bool { - self.sent_at.elapsed() >= FILE_IO_TIMEOUT - } - - fn track_retry(&mut self) -> Result<()> { - self.retries += 1; - self.sent_at = Instant::now(); - - if self.retries >= MAX_RETRIES { - Err(anyhow!("fileio send message timeout")) - } else { - Ok(()) + window_size: msg.window_size as usize, + batch_size: msg.batch_size as usize, } } } -impl Default for FileioRequest { +impl Default for FileioConfig { fn default() -> Self { - Self::new() + FileioConfig { + window_size: 100, + batch_size: 1, + } } } -struct ReadReq { - offset: u32, - req: FileioRequest, +fn with_repeater( + sender: &MsgSender, + link: &Link<'static, ()>, + config: FileioConfig, + req_gen: F, + mut cb: G, +) -> Result<()> +where + F: FnMut(u32) -> Result> + Send, + G: FnMut(MsgFileioWriteResp) + Send, +{ + // maps sequences -> pending requests + let pending_map = Mutex::new(BTreeMap::new()); + // each sequence gets sent to this channel. the timeout thread pulls from + // it and checks the timeouts in order. On a retry the sequence gets pushed + // back onto the queue + let (pending_queue_tx, pending_queue_rx) = channel::unbounded(); + // workers send errors via this channel to propagate failure + let (err_tx, err_rx) = channel::unbounded(); + // forwards messages from the link to the main select loop + let (res_tx, res_rx) = channel::unbounded(); + // the request generating thread sends requests to the request making thread over this channel + let (req_tx, req_rx) = channel::bounded(config.batch_size); + let key = link.register(move |msg: MsgFileioWriteResp| { + let _ = res_tx.send(msg); + }); + // when the window is full the request thread goes to sleep. when we get a response + // we can wake up the thread with this + let send_parker = Parker::new(); + let send_unparker = send_parker.unparker().clone(); + let result = scope(|scope| { + let err_tx = &err_tx; + let pending_map = &pending_map; + let pending_queue_rx = &pending_queue_rx; + let pending_queue_tx = &pending_queue_tx; + let res_rx = &res_rx; + scope + .builder() + .name("request-generator".into()) + .spawn(move |_| { + req_generator_thd(req_gen, req_tx); + }) + .expect(THREAD_START_FAILURE); + scope + .builder() + .name("request-maker".into()) + .spawn(move |_| { + req_maker_thd( + config, + sender, + pending_map, + pending_queue_tx, + err_tx, + req_rx, + send_parker, + ); + }) + .expect(THREAD_START_FAILURE); + scope + .builder() + .name("timeout".into()) + .spawn(move |_| { + timeout_thd( + pending_queue_rx, + pending_map, + err_tx, + sender, + pending_queue_tx, + ); + }) + .expect(THREAD_START_FAILURE); + loop { + select! { + recv(res_rx) -> msg => { + let res = msg.expect("response channel closed"); + if pending_map.lock().remove(&res.sequence).is_none() { + trace!("duplicate response {}", res.sequence); + continue; + } + trace!("got response {}", res.sequence); + send_unparker.unpark(); + cb(res); + if pending_map.lock().is_empty() { + let _ = pending_queue_tx.send(None); + return Ok(()); + } + } + recv(err_rx) -> msg => { + let err = msg.expect("error channel closed"); + return Err(err); + } + } + } + }) + .expect(CROSSBEAM_SCOPE_UNWRAP_FAILURE); + link.unregister(key); + result } -impl ReadReq { - fn new(offset: u32) -> Self { - Self { - offset, - req: FileioRequest::new(), +fn req_generator_thd Result> + Send>( + mut req_gen: F, + req_tx: Sender>>, +) { + let sequence = new_sequence(); + for seq in sequence.. { + let msg = req_gen(seq); + let exit = msg.as_ref().map_or(true, Option::is_none); + if req_tx.send(msg).is_err() || exit { + break; } } - - fn expired(&self) -> bool { - self.req.expired() + debug!("request-generator thread finished"); +} +fn req_maker_thd( + config: FileioConfig, + sender: &MsgSender, + pending_map: &Mutex>, + pending_queue_tx: &Sender>, + err_tx: &Sender, + req_rx: Receiver>>, + send_parker: Parker, +) { + let mut batch = Vec::with_capacity(config.batch_size); + let send_batch = move |batch: &mut Vec| -> bool { + for req in batch.drain(..config.batch_size.min(batch.len())) { + match sender.send(req.clone().into()) { + Ok(_) => { + let req = PendingReq::new(req); + let seq = req.message.sequence; + pending_map.lock().insert(seq, req); + let _ = pending_queue_tx.send(Some(seq)); + } + Err(err) => { + let _ = err_tx.send(err); + return false; + } + } + } + true + }; + for req in req_rx { + let req = match req { + Ok(Some(req)) => req, + Err(err) => { + let _ = err_tx.send(err); + break; + } + _ => break, + }; + batch.push(req); + if batch.len() < config.batch_size { + continue; + } + while !window_available(pending_map, config) { + send_parker.park(); + } + if !send_batch(&mut batch) { + break; + } + trace!("batch sent"); } - - fn track_retry(&mut self) -> Result<()> { - self.req.track_retry() + debug!("flushing remaining messages"); + while !batch.is_empty() { + while !window_available(pending_map, config) { + send_parker.park(); + } + if !send_batch(&mut batch) { + break; + } } + debug!("request-maker thread finished"); } -#[derive(Debug)] -struct WriteReq { - /// Offset start into current slice of data - offset: usize, - /// Offset end into current slice of data - end_offset: usize, - /// Is this the last request for this chunk of data - is_last: bool, - req: FileioRequest, -} - -impl WriteReq { - fn new(offset: usize, end_offset: usize, is_last: bool) -> Self { - Self { - offset, - end_offset, - is_last, - req: FileioRequest::new(), +fn timeout_thd( + pending_queue_rx: &Receiver>, + pending_map: &Mutex>, + err_tx: &Sender, + sender: &MsgSender, + pending_queue_tx: &Sender>, +) { + for seq in pending_queue_rx.iter() { + let seq = match seq { + Some(seq) => seq, + None => break, + }; + let elapsed = match pending_map.lock().get(&seq) { + Some(req) => req.sent_at.elapsed(), + None => continue, + }; + if elapsed < FILE_IO_TIMEOUT { + std::thread::sleep(FILE_IO_TIMEOUT - elapsed); + } + let mut guard = pending_map.lock(); + if let Some(req) = guard.get_mut(&seq) { + req.retries += 1; + trace!("retry {} times {}", req.message.sequence, req.retries); + if req.retries >= MAX_RETRIES { + let _ = err_tx.send(anyhow!("fileio timeout")); + } + req.sent_at = Instant::now(); + let msg = req.message.clone(); + let seq = msg.sequence; + drop(guard); + if let Err(err) = sender.send(msg.into()) { + let _ = err_tx.send(err); + break; + }; + let _ = pending_queue_tx.send(Some(seq)); } } + debug!("timeout thread finished"); +} - fn expired(&self) -> bool { - self.req.expired() - } - - fn track_retry(&mut self) -> Result<()> { - self.req.track_retry() +fn window_available(pending_map: &Mutex>, config: FileioConfig) -> bool { + let range = { + let guard = pending_map.lock(); + let mut seqs = guard.keys(); + seqs.next().copied().zip(seqs.next_back().copied()) + }; + match range { + Some((oldest, newest)) => { + // will a new batch fit in the window size + (newest - oldest) + config.batch_size as u32 <= config.window_size as u32 + } + None => true, } } #[derive(Debug, Clone)] -struct FileioConfig { - window_size: u32, - // batch_size -> TODO Investigate slow upgrades: https://swift-nav.atlassian.net/browse/CPP-472 +struct PendingReq { + message: MsgFileioWriteReq, + retries: usize, + sent_at: Instant, } -impl FileioConfig { - fn new(msg: MsgFileioConfigResp) -> Self { +impl PendingReq { + fn new(message: MsgFileioWriteReq) -> Self { Self { - window_size: msg.window_size, + message, + retries: 0, + sent_at: Instant::now(), } } } - -impl Default for FileioConfig { - fn default() -> Self { - FileioConfig { window_size: 100 } - } -} - -pub fn new_sequence() -> u32 { - rand::thread_rng().gen_range(0..0xfffffff) -} diff --git a/console_backend/src/update_tab.rs b/console_backend/src/update_tab.rs index a6fba90e8..c25ef791d 100644 --- a/console_backend/src/update_tab.rs +++ b/console_backend/src/update_tab.rs @@ -153,7 +153,7 @@ pub fn update_tab_thread( update_tab_context: UpdateTabContext, shared_state: SharedState, client_sender: BoxedClientSender, - link: Link<'_, ()>, + link: Link<'static, ()>, msg_sender: MsgSender, ) { let is_running = ArcBool::new_with(true);