@@ -286,7 +286,7 @@ impl ManagerService for Arc<Manager> {
286286
287287 info_with_replica ! ( self . replica_id, "Finished quorum for rank {}" , rank) ;
288288
289- let reply = compute_quorum_results ( & self . replica_id , rank, & quorum) ?;
289+ let reply = compute_quorum_results ( & self . replica_id , rank, & quorum, req . init_sync ) ?;
290290
291291 Ok ( Response :: new ( reply) )
292292 }
@@ -382,6 +382,7 @@ fn compute_quorum_results(
382382 replica_id : & str ,
383383 rank : i64 ,
384384 quorum : & Quorum ,
385+ init_sync : bool ,
385386) -> Result < ManagerQuorumResponse , Status > {
386387 let mut participants = quorum. participants . clone ( ) ;
387388 participants. sort_by ( |a, b| a. replica_id . cmp ( & b. replica_id ) ) ;
@@ -424,20 +425,23 @@ fn compute_quorum_results(
424425
425426 // Compute recovery assignments
426427
427- // Nodes are recovering if:
428- // 1. not at the max step
428+ let force_recover = init_sync && max_step == 0 ;
429+
430+ // Nodes are recovering if
431+ // 1. not at the max step
429432 // 2. init_sync is set, max_step == 0 and not the primary replica
430433 let all_recover_dst_ranks: Vec < usize > = participants
431434 . iter ( )
432435 . enumerate ( )
433436 . filter_map ( |( i, p) | {
434- if p. step != max_step || max_step == 0 && primary. replica_id != p. replica_id {
437+ if p. step != max_step || force_recover && primary. replica_id != p. replica_id {
435438 Some ( i)
436439 } else {
437440 None
438441 }
439442 } )
440443 . collect ( ) ;
444+
441445 let all_recover_dst_ranks_set = all_recover_dst_ranks. iter ( ) . collect :: < HashSet < _ > > ( ) ;
442446 let up_to_date_ranks: Vec < usize > = participants
443447 . iter ( )
@@ -605,6 +609,7 @@ mod tests {
605609 step : 123 ,
606610 checkpoint_metadata : "addr" . to_string ( ) ,
607611 shrink_only : false ,
612+ init_sync : true ,
608613 } ) ;
609614 request. set_timeout ( Duration :: from_secs ( 10 ) ) ;
610615 let resp = client. quorum ( request) . await ?. into_inner ( ) ;
@@ -664,6 +669,7 @@ mod tests {
664669 step : 0 ,
665670 checkpoint_metadata : "addr" . to_string ( ) ,
666671 shrink_only : false ,
672+ init_sync : true ,
667673 } ) ;
668674 request. set_timeout ( Duration :: from_secs ( 10 ) ) ;
669675
@@ -771,21 +777,21 @@ mod tests {
771777
772778 // rank 0
773779
774- let results = compute_quorum_results ( "replica_0" , 0 , & quorum) ?;
780+ let results = compute_quorum_results ( "replica_0" , 0 , & quorum, true ) ?;
775781 assert ! ( !results. heal) ;
776782 assert_eq ! ( results. replica_rank, 0 ) ;
777783 assert_eq ! ( results. recover_src_rank, None ) ;
778784 assert_eq ! ( results. recover_dst_ranks, vec![ 1 ] ) ;
779785
780- let results = compute_quorum_results ( "replica_1" , 0 , & quorum) ?;
786+ let results = compute_quorum_results ( "replica_1" , 0 , & quorum, true ) ?;
781787 assert ! ( results. heal) ;
782788 assert_eq ! ( results. replica_rank, 1 ) ;
783789 assert_eq ! ( results. recover_src_rank, Some ( 0 ) ) ;
784790 assert_eq ! ( results. recover_dst_ranks, Vec :: <i64 >:: new( ) ) ;
785791
786792 // rank 1 assignments should be offset from rank 0 above and the primary
787793
788- let results = compute_quorum_results ( "replica_1" , 1 , & quorum) ?;
794+ let results = compute_quorum_results ( "replica_1" , 1 , & quorum, true ) ?;
789795 assert ! ( !results. heal) ;
790796 assert_eq ! ( results. replica_rank, 1 ) ;
791797 assert_eq ! ( results. recover_src_rank, None ) ;
@@ -850,34 +856,80 @@ mod tests {
850856
851857 // rank 0
852858
853- let results = compute_quorum_results ( "replica_0" , 0 , & quorum) ?;
859+ let results = compute_quorum_results ( "replica_0" , 0 , & quorum, true ) ?;
854860 assert ! ( results. heal) ;
855861 assert_eq ! ( results. recover_src_manager_address, "addr_1" . to_string( ) ) ;
856862 assert_eq ! ( results. replica_rank, 0 ) ;
857863 assert_eq ! ( results. recover_src_rank, Some ( 1 ) ) ;
858864 assert ! ( results. recover_dst_ranks. is_empty( ) ) ;
859865
860- let results = compute_quorum_results ( "replica_1" , 0 , & quorum) ?;
866+ let results = compute_quorum_results ( "replica_1" , 0 , & quorum, true ) ?;
861867 assert ! ( !results. heal) ;
862868 assert_eq ! ( results. recover_src_manager_address, "" . to_string( ) ) ;
863869 assert_eq ! ( results. replica_rank, 1 ) ;
864870 assert_eq ! ( results. recover_src_rank, None ) ;
865871 assert_eq ! ( results. recover_dst_ranks, vec![ 0 , 4 ] ) ;
866872
867- let results = compute_quorum_results ( "replica_3" , 0 , & quorum) ?;
873+ let results = compute_quorum_results ( "replica_3" , 0 , & quorum, true ) ?;
868874 assert ! ( !results. heal) ;
869875 assert_eq ! ( results. replica_rank, 3 ) ;
870876 assert_eq ! ( results. recover_src_rank, None ) ;
871877 assert_eq ! ( results. recover_dst_ranks, vec![ 2 ] ) ;
872878
873879 // rank 1 assignments should be offset from rank 0 above
874880
875- let results = compute_quorum_results ( "replica_1" , 1 , & quorum) ?;
881+ let results = compute_quorum_results ( "replica_1" , 1 , & quorum, true ) ?;
876882 assert ! ( !results. heal) ;
877883 assert_eq ! ( results. replica_rank, 1 ) ;
878884 assert_eq ! ( results. recover_src_rank, None ) ;
879885 assert_eq ! ( results. recover_dst_ranks, vec![ 2 ] ) ;
880886
881887 Ok ( ( ) )
882888 }
889+
890+ #[ tokio:: test]
891+ async fn test_compute_quorum_results_skip_init_sync ( ) -> Result < ( ) > {
892+ let mut quorum = Quorum {
893+ quorum_id : 1 ,
894+ participants : vec ! [
895+ QuorumMember {
896+ replica_id: "replica_0" . to_string( ) ,
897+ address: "addr_0" . to_string( ) ,
898+ store_address: "store_addr_0" . to_string( ) ,
899+ step: 0 ,
900+ world_size: 1 ,
901+ shrink_only: false ,
902+ data: String :: new( ) ,
903+ } ,
904+ QuorumMember {
905+ replica_id: "replica_1" . to_string( ) ,
906+ address: "addr_1" . to_string( ) ,
907+ store_address: "store_addr_1" . to_string( ) ,
908+ step: 0 ,
909+ world_size: 1 ,
910+ shrink_only: false ,
911+ data: String :: new( ) ,
912+ } ,
913+ ] ,
914+ created : None ,
915+ } ;
916+
917+ // baseline w/ init_sync=true
918+ let results = compute_quorum_results ( "replica_0" , 0 , & quorum, true ) ?;
919+ assert ! ( !results. heal) ;
920+
921+ let results = compute_quorum_results ( "replica_1" , 0 , & quorum, true ) ?;
922+ assert ! ( results. heal) ;
923+
924+ // init_sync=false
925+ let results = compute_quorum_results ( "replica_1" , 0 , & quorum, false ) ?;
926+ assert ! ( !results. heal) ;
927+
928+ // init_sync=false, step=1
929+ quorum. participants [ 0 ] . step = 1 ;
930+ let results = compute_quorum_results ( "replica_1" , 0 , & quorum, false ) ?;
931+ assert ! ( results. heal) ;
932+
933+ Ok ( ( ) )
934+ }
883935}
0 commit comments