diff --git a/src/app/firedancer-dev/commands/repair.c b/src/app/firedancer-dev/commands/repair.c index 4a0b983a473..80c7e50788b 100644 --- a/src/app/firedancer-dev/commands/repair.c +++ b/src/app/firedancer-dev/commands/repair.c @@ -442,7 +442,7 @@ static fd_location_info_t * location_table; static fd_pubkey_t peers_copy[ FD_ACTIVE_KEY_MAX ]; static ulong -sort_peers_by_latency( fd_policy_peer_t * active_table, fd_peer_dlist_t * peers_dlist, fd_peer_t * peers_arr ) { +sort_peers_by_latency( fd_policy_peer_t * active_table, fd_peer_dlist_t * peers_dlist, fd_peer_dlist_t * peers_wlist, fd_peer_t * peers_arr ) { ulong i = 0; fd_peer_dlist_iter_t iter = fd_peer_dlist_iter_fwd_init( peers_dlist, peers_arr ); while( !fd_peer_dlist_iter_done( iter, peers_dlist, peers_arr ) ) { @@ -452,6 +452,15 @@ sort_peers_by_latency( fd_policy_peer_t * active_table, fd_peer_dlist_t * peers_ if( FD_UNLIKELY( i >= FD_ACTIVE_KEY_MAX ) ) break; iter = fd_peer_dlist_iter_fwd_next( iter, peers_dlist, peers_arr ); } + FD_LOG_NOTICE(( "Fast peers cnt: %lu. 
Remainder is slow.", i )); + iter = fd_peer_dlist_iter_fwd_init( peers_wlist, peers_arr ); + while( i < FD_ACTIVE_KEY_MAX && !fd_peer_dlist_iter_done( iter, peers_wlist, peers_arr ) ) { /* i may already equal FD_ACTIVE_KEY_MAX when the fast-list loop above hit its cap; checking here keeps the slow-list copy from writing past the end of peers_copy */ + fd_peer_t * peer = fd_peer_dlist_iter_ele( iter, peers_wlist, peers_arr ); + if( FD_UNLIKELY( !peer ) ) break; + peers_copy[ i++ ] = peer->identity; + if( FD_UNLIKELY( i >= FD_ACTIVE_KEY_MAX ) ) break; + iter = fd_peer_dlist_iter_fwd_next( iter, peers_wlist, peers_arr ); + } ulong peer_cnt = i; for( uint i = 0; i < peer_cnt - 1; i++ ) { @@ -485,12 +494,14 @@ print_peer_location_latency( fd_wksp_t * repair_tile_wksp, ctx_t * tile_ctx ) { fd_policy_t * policy = fd_wksp_laddr ( repair_tile_wksp, policy_gaddr ); ulong peermap_gaddr = fd_wksp_gaddr_fast( tile_ctx->wksp, policy->peers.map ); ulong peerarr_gaddr = fd_wksp_gaddr_fast( tile_ctx->wksp, policy->peers.pool ); - ulong peerlst_gaddr = fd_wksp_gaddr_fast( tile_ctx->wksp, policy->peers.dlist ); + ulong peerlst_gaddr = fd_wksp_gaddr_fast( tile_ctx->wksp, policy->peers.fast ); + ulong peerwst_gaddr = fd_wksp_gaddr_fast( tile_ctx->wksp, policy->peers.slow ); fd_policy_peer_t * peers_map = (fd_policy_peer_t *)fd_wksp_laddr( repair_tile_wksp, peermap_gaddr ); fd_peer_dlist_t * peers_dlist = (fd_peer_dlist_t *)fd_wksp_laddr( repair_tile_wksp, peerlst_gaddr ); + fd_peer_dlist_t * peers_wlist = (fd_peer_dlist_t *)fd_wksp_laddr( repair_tile_wksp, peerwst_gaddr ); fd_peer_t * peers_arr = (fd_peer_t *)fd_wksp_laddr( repair_tile_wksp, peerarr_gaddr ); - ulong peer_cnt = sort_peers_by_latency( peers_map, peers_dlist, peers_arr ); + ulong peer_cnt = sort_peers_by_latency( peers_map, peers_dlist, peers_wlist, peers_arr ); printf("\nPeer Location/Latency Information\n"); printf( "| %-46s | %-7s | %-8s | %-8s | %-7s | %12s | %s\n", "Pubkey", "Req Cnt", "Req B/s", "Rx B/s", "Rx Rate", "Avg Latency", "Location Info" ); for( uint i = 0; i < peer_cnt; i++ ) { diff --git a/src/discof/forest/fd_forest.h b/src/discof/forest/fd_forest.h index 0838dfd7a5c..74cb78b8e88 100644 --- 
a/src/discof/forest/fd_forest.h +++ b/src/discof/forest/fd_forest.h @@ -79,7 +79,7 @@ struct __attribute__((aligned(128UL))) fd_forest_blk { fd_forest_blk_idxs_t code[fd_forest_blk_idxs_word_cnt]; /* code shred idxs */ long first_shred_ts; /* timestamp of first shred rcved in slot != complete_idx */ - long first_req_ts; /* timestamp of first request received in slot != complete_idx */ + long first_req_ts; /* tick of first request received in slot != complete_idx */ uint turbine_cnt; /* number of shreds received from turbine */ uint repair_cnt; /* number of data shreds received from repair */ uint recovered_cnt; /* number of shreds recovered from reedsol recovery */ diff --git a/src/discof/repair/fd_policy.c b/src/discof/repair/fd_policy.c index d1ef3f3a41d..04b2a4c8ddf 100644 --- a/src/discof/repair/fd_policy.c +++ b/src/discof/repair/fd_policy.c @@ -24,13 +24,14 @@ fd_policy_new( void * shmem, ulong dedup_max, ulong peer_max, ulong seed ) { int lg_peer_max = fd_ulong_find_msb( fd_ulong_pow2_up( peer_max ) ); FD_SCRATCH_ALLOC_INIT( l, shmem ); - fd_policy_t * policy = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_align(), sizeof(fd_policy_t) ); - void * dedup_map = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_map_align(), fd_policy_dedup_map_footprint ( dedup_max ) ); - void * dedup_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_pool_align(), fd_policy_dedup_pool_footprint( dedup_max ) ); - void * dedup_lru = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_lru_align(), fd_policy_dedup_lru_footprint ( ) ); - void * peers = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_map_align(), fd_policy_peer_map_footprint ( lg_peer_max ) ); - void * peers_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_pool_align(), fd_peer_pool_footprint ( peer_max ) ); - void * peers_dlist = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_dlist_align(), fd_peer_dlist_footprint ( ) ); + fd_policy_t * policy = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_align(), sizeof(fd_policy_t) ); + void * dedup_map = FD_SCRATCH_ALLOC_APPEND( 
l, fd_policy_dedup_map_align(), fd_policy_dedup_map_footprint ( dedup_max ) ); + void * dedup_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_pool_align(), fd_policy_dedup_pool_footprint( dedup_max ) ); + void * dedup_lru = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_lru_align(), fd_policy_dedup_lru_footprint() ); + void * peers = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_map_align(), fd_policy_peer_map_footprint ( lg_peer_max ) ); + void * peers_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_pool_align(), fd_peer_pool_footprint ( peer_max ) ); + void * peers_fast = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_dlist_align(), fd_peer_dlist_footprint() ); + void * peers_slow = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_dlist_align(), fd_peer_dlist_footprint() ); FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_policy_align() ) == (ulong)shmem + footprint ); policy->dedup.map = fd_policy_dedup_map_new ( dedup_map, dedup_max, seed ); @@ -38,7 +39,8 @@ fd_policy_new( void * shmem, ulong dedup_max, ulong peer_max, ulong seed ) { policy->dedup.lru = fd_policy_dedup_lru_new ( dedup_lru ); policy->peers.map = fd_policy_peer_map_new ( peers, lg_peer_max ); policy->peers.pool = fd_peer_pool_new ( peers_pool, peer_max ); - policy->peers.dlist = fd_peer_dlist_new ( peers_dlist ); + policy->peers.fast = fd_peer_dlist_new ( peers_fast ); + policy->peers.slow = fd_peer_dlist_new ( peers_slow ); policy->iterf.ele_idx = ULONG_MAX; policy->turbine_slot0 = ULONG_MAX; policy->tsreset = 0; @@ -67,13 +69,16 @@ fd_policy_join( void * shpolicy ) { return NULL; } - policy->dedup.map = fd_policy_dedup_map_join ( policy->dedup.map ); - policy->dedup.pool = fd_policy_dedup_pool_join( policy->dedup.pool ); - policy->dedup.lru = fd_policy_dedup_lru_join ( policy->dedup.lru ); - policy->peers.map = fd_policy_peer_map_join ( policy->peers.map ); - policy->peers.pool = fd_peer_pool_join ( policy->peers.pool ); - policy->peers.dlist = fd_peer_dlist_join ( policy->peers.dlist ); - policy->peers.iter = 
fd_peer_dlist_iter_fwd_init( policy->peers.dlist, policy->peers.pool ); + policy->dedup.map = fd_policy_dedup_map_join ( policy->dedup.map ); + policy->dedup.pool = fd_policy_dedup_pool_join( policy->dedup.pool ); + policy->dedup.lru = fd_policy_dedup_lru_join ( policy->dedup.lru ); + policy->peers.map = fd_policy_peer_map_join ( policy->peers.map ); + policy->peers.pool = fd_peer_pool_join ( policy->peers.pool ); + policy->peers.fast = fd_peer_dlist_join ( policy->peers.fast ); + policy->peers.slow = fd_peer_dlist_join ( policy->peers.slow ); + + policy->peers.select.iter = fd_peer_dlist_iter_fwd_init( policy->peers.slow, policy->peers.pool ); + policy->peers.select.stage = 0; return policy; } @@ -127,7 +132,7 @@ dedup_next( fd_policy_t * policy, ulong key, long now ) { fd_policy_dedup_map_ele_insert ( dedup->map, ele, dedup->pool ); fd_policy_dedup_lru_ele_push_tail( dedup->lru, ele, dedup->pool ); } - if( FD_LIKELY( now < ele->req_ts + (long)80e6 ) ) { + if( FD_LIKELY( now < ele->req_ts + (long)FD_POLICY_DEDUP_TIMEOUT ) ) { return 1; } ele->req_ts = now; @@ -156,13 +161,19 @@ passes_throttle_threshold( fd_policy_t * policy, fd_forest_blk_t * ele ) { fd_pubkey_t const * fd_policy_peer_select( fd_policy_t * policy ) { - fd_peer_dlist_t * dlist = policy->peers.dlist; - fd_peer_t * pool = policy->peers.pool; - if( FD_UNLIKELY( fd_peer_dlist_iter_done( policy->peers.iter, dlist, pool ) ) ) { - policy->peers.iter = fd_peer_dlist_iter_fwd_init( dlist, pool ); + fd_peer_dlist_t * best_dlist = policy->peers.fast; + fd_peer_dlist_t * worst_dlist = policy->peers.slow; + fd_peer_t * pool = policy->peers.pool; + + fd_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? 
best_dlist : worst_dlist; + + while( FD_UNLIKELY( fd_peer_dlist_iter_done( policy->peers.select.iter, dlist, pool ) ) ) { /* NOTE(review): never terminates if both the fast and slow lists are empty; presumably callers only select when at least one peer exists - confirm */ + policy->peers.select.stage = (policy->peers.select.stage + 1) % (sizeof(bucket_stages) / sizeof(uint)); + dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? best_dlist : worst_dlist; + policy->peers.select.iter = fd_peer_dlist_iter_fwd_init( dlist, pool ); } - fd_peer_t * select = fd_peer_dlist_iter_ele( policy->peers.iter, dlist, pool ); - policy->peers.iter = fd_peer_dlist_iter_fwd_next( policy->peers.iter, dlist, pool ); + fd_peer_t * select = fd_peer_dlist_iter_ele( policy->peers.select.iter, dlist, pool ); + policy->peers.select.iter = fd_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, pool ); return &select->identity; } @@ -294,7 +305,7 @@ fd_policy_peer_insert( fd_policy_t * policy, fd_pubkey_t const * key, fd_ip4_por fd_peer_t * peer_ele = fd_peer_pool_ele_acquire( policy->peers.pool ); peer->pool_idx = fd_peer_pool_idx( policy->peers.pool, peer_ele ); peer_ele->identity = *key; - fd_peer_dlist_ele_push_tail( policy->peers.dlist, peer_ele, policy->peers.pool ); + fd_peer_dlist_ele_push_tail( policy->peers.slow, peer_ele, policy->peers.pool ); return peer; } return NULL; @@ -312,11 +323,14 @@ fd_policy_peer_remove( fd_policy_t * policy, fd_pubkey_t const * key ) { fd_peer_t * peer_ele = fd_peer_pool_ele( policy->peers.pool, peer->pool_idx ); + fd_peer_dlist_t * bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt ); /* read the latency stats before the map entry is removed; the entry is not assumed to stay valid afterwards */ fd_policy_peer_map_remove( policy->peers.map, peer ); - if( FD_UNLIKELY( policy->peers.iter == fd_peer_pool_idx( policy->peers.pool, peer_ele ) ) ) { + if( FD_UNLIKELY( policy->peers.select.iter == fd_peer_pool_idx( policy->peers.pool, peer_ele ) ) ) { /* In general removal during iteration is safe, except when the iterator is on the peer to be removed. */ - policy->peers.iter = fd_peer_dlist_iter_fwd_next( policy->peers.iter, policy->peers.dlist, policy->peers.pool ); + fd_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? 
policy->peers.fast : policy->peers.slow; /* stage is an index into bucket_stages, not itself a bucket id, so it must be mapped before comparing */ + policy->peers.select.iter = fd_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, policy->peers.pool ); } - fd_peer_dlist_ele_remove( policy->peers.dlist, peer_ele, policy->peers.pool ); + + fd_peer_dlist_ele_remove( bucket, peer_ele, policy->peers.pool ); fd_peer_pool_ele_release( policy->peers.pool, peer_ele ); return 1; } @@ -332,14 +346,23 @@ fd_policy_peer_request_update( fd_policy_t * policy, fd_pubkey_t const * to ) { } void -fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, long rtt ) { +fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, long rtt /* ns */ ) { fd_policy_peer_t * peer = fd_policy_peer_query( policy, to ); if( FD_LIKELY( peer ) ) { long now = fd_tickcount(); + fd_peer_dlist_t * prev_bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt ); peer->res_cnt++; if( FD_UNLIKELY( peer->first_resp_ts == 0 ) ) peer->first_resp_ts = now; peer->last_resp_ts = now; peer->total_lat += rtt; + fd_peer_dlist_t * new_bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt ); + + if( prev_bucket != new_bucket ) { + fd_peer_t * peer_ele = fd_peer_pool_ele( policy->peers.pool, peer->pool_idx ); + if( FD_UNLIKELY( policy->peers.select.iter == fd_peer_pool_idx( policy->peers.pool, peer_ele ) ) ) policy->peers.select.iter = fd_peer_dlist_iter_fwd_next( policy->peers.select.iter, prev_bucket, policy->peers.pool ); /* same hazard as removal: step the round-robin iterator off this peer before it changes lists, so the current pass continues along the old list */ + fd_peer_dlist_ele_remove ( prev_bucket, peer_ele, policy->peers.pool ); + fd_peer_dlist_ele_push_tail( new_bucket, peer_ele, policy->peers.pool ); + } } } diff --git a/src/discof/repair/fd_policy.h b/src/discof/repair/fd_policy.h index 3ac9d302a1c..a443b8d8962 100644 --- a/src/discof/repair/fd_policy.h +++ b/src/discof/repair/fd_policy.h @@ -140,16 +140,43 @@ typedef struct fd_peer fd_peer_t; selecting repair peers via round-robin. 
*/ struct fd_policy_peers { - fd_peer_dlist_t * dlist; /* doubly-linked list of repair peer pubkeys in insertion order */ - fd_peer_t * pool; /* memory pool of repair peer pubkeys */ - fd_policy_peer_t * map; /* map of pubkey->peer */ - fd_peer_dlist_iter_t iter; /* round-robin index of next peer */ + fd_peer_t * pool; /* memory pool of repair peer pubkeys, contains entries of both dlists */ + fd_peer_dlist_t * fast; /* avg latency in [0, FD_POLICY_LATENCY_THRESH] ns: latency group FD_POLICY_LATENCY_FAST */ + fd_peer_dlist_t * slow; /* avg latency in (FD_POLICY_LATENCY_THRESH, inf) ns, or no responses yet: latency group FD_POLICY_LATENCY_SLOW */ + fd_policy_peer_t * map; /* dynamic map of pubkey->peer data */ + struct { + uint stage; /* index into bucket_stages, < number of elements in bucket_stages */ + fd_peer_dlist_iter_t iter; /* round-robin index of next peer */ + } select; }; typedef struct fd_policy_peers fd_policy_peers_t; +#define FD_POLICY_LATENCY_FAST 1 +#define FD_POLICY_LATENCY_SLOW 3 + +/* Policy parameters start */ +#define FD_POLICY_LATENCY_THRESH ((long)30e6) /* avg round trip time in ns; at or below this is a FAST (best) peer, otherwise a SLOW (worst) peer */ +#define FD_POLICY_DEDUP_TIMEOUT ((long)50e6) /* how long to wait before re-requesting the same shred */ + +/* Round robins through ALL the worst peers once, then round robins + through ALL the best peers once, then round robins through ALL the + best peers again, etc. All peers are initially added to the worst + bucket, and moved once round trip times have been recorded. 
*/ + +static const uint bucket_stages[7] = { + FD_POLICY_LATENCY_SLOW, /* 1 of every 7 passes cycles through the slow peers to see if any have improved */ + FD_POLICY_LATENCY_FAST, + FD_POLICY_LATENCY_FAST, + FD_POLICY_LATENCY_FAST, + FD_POLICY_LATENCY_FAST, + FD_POLICY_LATENCY_FAST, + FD_POLICY_LATENCY_FAST, +}; +/* Policy parameters end */ + struct fd_policy { fd_policy_dedup_t dedup; /* dedup cache of already sent requests */ - fd_policy_peers_t peers; /* round-robin strategy for selecting repair peers */ + fd_policy_peers_t peers; /* repair peers (strategy & data) */ long tsmax; /* maximum time for an iteration before resetting the DFS to root */ long tsref; /* reference timestamp for resetting DFS */ @@ -183,14 +210,16 @@ fd_policy_footprint( ulong dedup_max, ulong peer_max ) { FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( + FD_LAYOUT_APPEND( FD_LAYOUT_INIT, alignof(fd_policy_t), sizeof(fd_policy_t) ), fd_policy_dedup_map_align(), fd_policy_dedup_map_footprint ( dedup_max ) ), fd_policy_dedup_pool_align(), fd_policy_dedup_pool_footprint( dedup_max ) ), - fd_policy_dedup_lru_align(), fd_policy_dedup_lru_footprint ( ) ), + fd_policy_dedup_lru_align(), fd_policy_dedup_lru_footprint() ), fd_policy_peer_map_align(), fd_policy_peer_map_footprint ( lg_peer_max ) ), fd_peer_pool_align(), fd_peer_pool_footprint ( peer_max ) ), - fd_peer_dlist_align(), fd_peer_dlist_footprint ( ) ), + fd_peer_dlist_align(), fd_peer_dlist_footprint() ), + fd_peer_dlist_align(), fd_peer_dlist_footprint() ), fd_policy_align() ); } @@ -246,6 +275,12 @@ fd_policy_peer_select( fd_policy_t * policy ); void fd_policy_peer_request_update( fd_policy_t * policy, fd_pubkey_t const * to ); +static inline fd_peer_dlist_t * +fd_policy_peer_latency_bucket( fd_policy_t * policy, long total_rtt /* ns */, ulong res_cnt ) { /* returns the dlist (fast or slow) a peer belongs to, given its accumulated round trip time and response count */ + if( res_cnt == 0 || (long)(total_rtt / (long)res_cnt) > FD_POLICY_LATENCY_THRESH ) return policy->peers.slow; /* no responses yet, or average RTT above the threshold */ + return policy->peers.fast; +} + void 
fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, long rtt ); diff --git a/src/discof/repair/fd_repair_metrics.c b/src/discof/repair/fd_repair_metrics.c index ad477479176..0413c0ba1a6 100644 --- a/src/discof/repair/fd_repair_metrics.c +++ b/src/discof/repair/fd_repair_metrics.c @@ -86,7 +86,7 @@ print_catchup_stats( fd_repair_metrics_t * repair_metrics ) { if( FD_LIKELY( turbine_ts > 0 ) ) { /* still have turbine slot0 in the catchup metrics */ double pipelined_time = (double)(turbine_ts - min_ts); - FD_LOG_NOTICE(( "took %.3fs to reach first turbine.", pipelined_time / 1e9 )); + FD_LOG_NOTICE(( "took %.3fs to reach first turbine.", fd_metrics_convert_ticks_to_seconds((ulong)pipelined_time) )); /* Compute pipeline factor */ double non_pipelined_time = (double)slot_cmpl_time_total; diff --git a/src/discof/repair/fd_repair_metrics.h b/src/discof/repair/fd_repair_metrics.h index 67b08413624..5fd75772a23 100644 --- a/src/discof/repair/fd_repair_metrics.h +++ b/src/discof/repair/fd_repair_metrics.h @@ -11,7 +11,7 @@ struct fd_slot_metrics { ulong slot; long first_ts; - long slot_complete_ts; + long slot_complete_ts; /* tick */ uint repair_cnt; uint turbine_cnt;