@@ -165,62 +165,44 @@ where
165165 }
166166
167167 pub fn write ( & mut self , filename : String , data : impl Read ) -> Result < ( ) > {
168+ self . remove ( filename. clone ( ) ) ?;
169+
168170 let mut data = BufReader :: new ( data) ;
169- let mut sequence: Option < u32 > = None ;
170- let mut offset: Option < usize > = None ;
171+ let mut state = WriteState :: new ( filename) ;
171172
172173 loop {
173174 let buf = data. fill_buf ( ) ?;
174175 let bytes_read = buf. len ( ) ;
175176 if bytes_read == 0 {
176177 break ;
177178 }
178- let res = self . write_slice ( filename. clone ( ) , buf, sequence, offset) ?;
179- sequence = Some ( res. 0 ) ;
180- offset = Some ( res. 1 ) ;
179+ state = self . write_slice ( state, buf) ?;
181180 data. consume ( bytes_read) ;
182181 }
183182
184183 Ok ( ( ) )
185184 }
186185
187- pub fn write_slice (
188- & mut self ,
189- mut filename : String ,
190- data : & [ u8 ] ,
191- sequence : Option < u32 > ,
192- offset : Option < usize > ,
193- ) -> Result < ( u32 , usize ) > {
186+ fn write_slice ( & mut self , mut state : WriteState , data : & [ u8 ] ) -> Result < WriteState > {
194187 let config = self . fetch_config ( ) ;
195188
196- self . remove ( filename. clone ( ) ) ?;
197-
198- let filename_len = filename. len ( ) ;
199- let data_len = data. len ( ) ;
200- filename. push ( b'\x00' as char ) ;
201-
202- let chunk_size = MAX_PAYLOAD_SIZE - WRITE_REQ_OVERHEAD_LEN - filename_len;
203-
204189 let ( req_tx, req_rx) = channel:: unbounded ( ) ;
205190 let ( res_tx, res_rx) = channel:: unbounded ( ) ;
206191
207192 let open_requests = AtomicCell :: new ( 0u32 ) ;
208193
209194 let sender = self . sender . clone ( ) ;
210- let send_msg = |sequence, offset, end_offset| {
211- let data = data[ offset..end_offset] . to_vec ( ) ;
212- let msg = SBP :: from ( MsgFileioWriteReq {
195+ let send_msg = |state : & WriteState , req : & WriteReq | {
196+ sender. send ( SBP :: from ( MsgFileioWriteReq {
213197 sender_id : Some ( 42 ) ,
214- sequence,
215- offset : offset as u32 ,
216- filename : filename. clone ( ) . into ( ) ,
217- data,
218- } ) ;
219- sender. send ( msg)
198+ sequence : state. sequence ,
199+ offset : state. offset as u32 ,
200+ filename : state. filename ( ) ,
201+ data : data[ req. offset ..req. end_offset ] . to_vec ( ) ,
202+ } ) )
220203 } ;
221204
222- let mut sequence = sequence. unwrap_or_else ( || new_sequence ( ) ) ;
223- let mut file_offset = offset. unwrap_or ( 0 ) ;
205+ let data_len = data. len ( ) ;
224206
225207 scope ( |s| {
226208 s. spawn ( |_| {
@@ -231,16 +213,15 @@ where
231213 while open_requests. load ( ) >= config. window_size {
232214 backoff. snooze ( ) ;
233215 }
234- let end_offset = std:: cmp:: min ( slice_offset + chunk_size, data_len) ;
235- let chunk_len = std:: cmp:: min ( chunk_size, data_len - slice_offset) ;
236- let is_last = chunk_len < chunk_size;
237- send_msg ( sequence , slice_offset, end_offset) ? ;
238- req_tx
239- . send ( ( sequence , WriteReq :: new ( slice_offset , end_offset ) , is_last) )
240- . unwrap ( ) ;
241- file_offset += chunk_len;
216+ let end_offset = std:: cmp:: min ( slice_offset + state . chunk_size , data_len) ;
217+ let chunk_len = std:: cmp:: min ( state . chunk_size , data_len - slice_offset) ;
218+ let is_last = chunk_len < state . chunk_size ;
219+ let req = WriteReq :: new ( slice_offset, end_offset) ;
220+ send_msg ( & state , & req ) ? ;
221+ req_tx . send ( ( state . clone ( ) , req , is_last) ) . unwrap ( ) ;
222+ state . sequence += 1 ;
223+ state . offset += chunk_len;
242224 slice_offset += chunk_len;
243- sequence += 1 ;
244225 open_requests. fetch_add ( 1 ) ;
245226 }
246227
@@ -254,17 +235,17 @@ where
254235 }
255236 } ) ;
256237
257- let mut pending: HashMap < u32 , WriteReq > = HashMap :: new ( ) ;
238+ let mut pending: HashMap < u32 , ( WriteState , WriteReq ) > = HashMap :: new ( ) ;
258239 let mut last_sent = false ;
259240
260241 loop {
261242 select ! {
262243 recv( req_rx) -> msg => {
263- let ( sequence , req, is_last) = msg?;
244+ let ( req_state , req, is_last) = msg?;
264245 if !last_sent && is_last {
265246 last_sent = true ;
266247 }
267- pending. insert( sequence, req) ;
248+ pending. insert( req_state . sequence, ( req_state , req) ) ;
268249 } ,
269250 recv( res_rx) -> msg => {
270251 let msg = msg?;
@@ -277,10 +258,10 @@ where
277258 }
278259 } ,
279260 recv( channel:: tick( CHECK_INTERVAL ) ) -> _ => {
280- for ( seq , req) in pending. iter_mut ( ) {
261+ for ( req_state , req) in pending. values_mut ( ) {
281262 if req. expired( ) {
282263 req. track_retry( ) ?;
283- send_msg( * seq , req. offset , req . end_offset ) ?;
264+ send_msg( req_state , req) ?;
284265 }
285266 }
286267 }
@@ -293,7 +274,7 @@ where
293274 } )
294275 . unwrap ( ) ?;
295276
296- Ok ( ( sequence , file_offset ) )
277+ Ok ( state )
297278 }
298279
299280 pub fn readdir ( & mut self , path : String ) -> Result < Vec < String > > {
@@ -378,6 +359,40 @@ where
378359 }
379360}
380361
362+ #[ derive( Debug , Clone ) ]
363+ struct WriteState {
364+ sequence : u32 ,
365+ offset : usize ,
366+ filename : String ,
367+ chunk_size : usize ,
368+ }
369+
370+ impl WriteState {
371+ fn new ( filename : String ) -> Self {
372+ let ( chunk_size, filename) = if filename. ends_with ( "\x00 " ) {
373+ (
374+ MAX_PAYLOAD_SIZE - WRITE_REQ_OVERHEAD_LEN - filename. len ( ) - 1 ,
375+ filename,
376+ )
377+ } else {
378+ (
379+ MAX_PAYLOAD_SIZE - WRITE_REQ_OVERHEAD_LEN - filename. len ( ) ,
380+ filename + "\x00 " ,
381+ )
382+ } ;
383+ Self {
384+ sequence : new_sequence ( ) ,
385+ offset : 0 ,
386+ filename,
387+ chunk_size,
388+ }
389+ }
390+
391+ fn filename ( & self ) -> sbp:: SbpString {
392+ self . filename . clone ( ) . into ( )
393+ }
394+ }
395+
381396struct ReadReq {
382397 offset : u32 ,
383398 sent_at : Instant ,
0 commit comments