Skip to content

Commit 5a2017c

Browse files
committed
repair: coarse peer selection
1 parent 719a33d commit 5a2017c

File tree

6 files changed

+102
-33
lines changed

6 files changed

+102
-33
lines changed

src/app/firedancer-dev/commands/repair.c

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,7 @@ static fd_location_info_t * location_table;
442442
static fd_pubkey_t peers_copy[ FD_ACTIVE_KEY_MAX ];
443443

444444
static ulong
445-
sort_peers_by_latency( fd_policy_peer_t * active_table, fd_peer_dlist_t * peers_dlist, fd_peer_t * peers_arr ) {
445+
sort_peers_by_latency( fd_policy_peer_t * active_table, fd_peer_dlist_t * peers_dlist, fd_peer_dlist_t * peers_wlist, fd_peer_t * peers_arr ) {
446446
ulong i = 0;
447447
fd_peer_dlist_iter_t iter = fd_peer_dlist_iter_fwd_init( peers_dlist, peers_arr );
448448
while( !fd_peer_dlist_iter_done( iter, peers_dlist, peers_arr ) ) {
@@ -452,6 +452,16 @@ sort_peers_by_latency( fd_policy_peer_t * active_table, fd_peer_dlist_t * peers_
452452
if( FD_UNLIKELY( i >= FD_ACTIVE_KEY_MAX ) ) break;
453453
iter = fd_peer_dlist_iter_fwd_next( iter, peers_dlist, peers_arr );
454454
}
455+
FD_LOG_NOTICE(( "best peers cnt: %lu", i ));
456+
iter = fd_peer_dlist_iter_fwd_init( peers_wlist, peers_arr );
457+
while( !fd_peer_dlist_iter_done( iter, peers_wlist, peers_arr ) ) {
458+
fd_peer_t * peer = fd_peer_dlist_iter_ele( iter, peers_wlist, peers_arr );
459+
if( FD_UNLIKELY( !peer ) ) break;
460+
peers_copy[ i++ ] = peer->identity;
461+
if( FD_UNLIKELY( i >= FD_ACTIVE_KEY_MAX ) ) break;
462+
iter = fd_peer_dlist_iter_fwd_next( iter, peers_wlist, peers_arr );
463+
}
464+
FD_LOG_NOTICE(( "worst peers cnt: %lu", i ));
455465

456466
ulong peer_cnt = i;
457467
for( uint i = 0; i < peer_cnt - 1; i++ ) {
@@ -485,12 +495,14 @@ print_peer_location_latency( fd_wksp_t * repair_tile_wksp, ctx_t * tile_ctx ) {
485495
fd_policy_t * policy = fd_wksp_laddr ( repair_tile_wksp, policy_gaddr );
486496
ulong peermap_gaddr = fd_wksp_gaddr_fast( tile_ctx->wksp, policy->peers.map );
487497
ulong peerarr_gaddr = fd_wksp_gaddr_fast( tile_ctx->wksp, policy->peers.pool );
488-
ulong peerlst_gaddr = fd_wksp_gaddr_fast( tile_ctx->wksp, policy->peers.dlist );
498+
ulong peerlst_gaddr = fd_wksp_gaddr_fast( tile_ctx->wksp, policy->peers.best );
499+
ulong peerwst_gaddr = fd_wksp_gaddr_fast( tile_ctx->wksp, policy->peers.worst );
489500
fd_policy_peer_t * peers_map = (fd_policy_peer_t *)fd_wksp_laddr( repair_tile_wksp, peermap_gaddr );
490501
fd_peer_dlist_t * peers_dlist = (fd_peer_dlist_t *)fd_wksp_laddr( repair_tile_wksp, peerlst_gaddr );
502+
fd_peer_dlist_t * peers_wlist = (fd_peer_dlist_t *)fd_wksp_laddr( repair_tile_wksp, peerwst_gaddr );
491503
fd_peer_t * peers_arr = (fd_peer_t *)fd_wksp_laddr( repair_tile_wksp, peerarr_gaddr );
492504

493-
ulong peer_cnt = sort_peers_by_latency( peers_map, peers_dlist, peers_arr );
505+
ulong peer_cnt = sort_peers_by_latency( peers_map, peers_dlist, peers_wlist, peers_arr );
494506
printf("\nPeer Location/Latency Information\n");
495507
printf( "| %-46s | %-7s | %-8s | %-8s | %-7s | %12s | %s\n", "Pubkey", "Req Cnt", "Req B/s", "Rx B/s", "Rx Rate", "Avg Latency", "Location Info" );
496508
for( uint i = 0; i < peer_cnt; i++ ) {

src/discof/forest/fd_forest.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ struct __attribute__((aligned(128UL))) fd_forest_blk {
7979

8080
fd_forest_blk_idxs_t code[fd_forest_blk_idxs_word_cnt]; /* code shred idxs */
8181
long first_shred_ts; /* timestamp of first shred rcved in slot != complete_idx */
82-
long first_req_ts; /* timestamp of first request received in slot != complete_idx */
82+
long first_req_ts; /* tick of first request received in slot != complete_idx */
8383
uint turbine_cnt; /* number of shreds received from turbine */
8484
uint repair_cnt; /* number of data shreds received from repair */
8585
uint recovered_cnt; /* number of shreds recovered from reedsol recovery */

src/discof/repair/fd_policy.c

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,17 @@ fd_policy_new( void * shmem, ulong dedup_max, ulong peer_max, ulong seed ) {
3030
void * dedup_lru = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_lru_align(), fd_policy_dedup_lru_footprint ( ) );
3131
void * peers = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_map_align(), fd_policy_peer_map_footprint ( lg_peer_max ) );
3232
void * peers_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_pool_align(), fd_peer_pool_footprint ( peer_max ) );
33-
void * peers_dlist = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_dlist_align(), fd_peer_dlist_footprint ( ) );
33+
void * peers_best = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_dlist_align(), fd_peer_dlist_footprint ( ) );
34+
void * peers_worst = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_dlist_align(), fd_peer_dlist_footprint ( ) );
3435
FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_policy_align() ) == (ulong)shmem + footprint );
3536

3637
policy->dedup.map = fd_policy_dedup_map_new ( dedup_map, dedup_max, seed );
3738
policy->dedup.pool = fd_policy_dedup_pool_new( dedup_pool, dedup_max );
3839
policy->dedup.lru = fd_policy_dedup_lru_new ( dedup_lru );
3940
policy->peers.map = fd_policy_peer_map_new ( peers, lg_peer_max );
4041
policy->peers.pool = fd_peer_pool_new ( peers_pool, peer_max );
41-
policy->peers.dlist = fd_peer_dlist_new ( peers_dlist );
42+
policy->peers.best = fd_peer_dlist_new ( peers_best );
43+
policy->peers.worst = fd_peer_dlist_new ( peers_worst );
4244
policy->iterf.ele_idx = ULONG_MAX;
4345
policy->turbine_slot0 = ULONG_MAX;
4446
policy->tsreset = 0;
@@ -72,8 +74,11 @@ fd_policy_join( void * shpolicy ) {
7274
policy->dedup.lru = fd_policy_dedup_lru_join ( policy->dedup.lru );
7375
policy->peers.map = fd_policy_peer_map_join ( policy->peers.map );
7476
policy->peers.pool = fd_peer_pool_join ( policy->peers.pool );
75-
policy->peers.dlist = fd_peer_dlist_join ( policy->peers.dlist );
76-
policy->peers.iter = fd_peer_dlist_iter_fwd_init( policy->peers.dlist, policy->peers.pool );
77+
policy->peers.best = fd_peer_dlist_join ( policy->peers.best );
78+
policy->peers.worst = fd_peer_dlist_join ( policy->peers.worst );
79+
80+
policy->peers.select.iter = fd_peer_dlist_iter_fwd_init( policy->peers.worst, policy->peers.pool );
81+
policy->peers.select.stage = 0;
7782

7883
return policy;
7984
}
@@ -127,7 +132,7 @@ dedup_next( fd_policy_t * policy, ulong key, long now ) {
127132
fd_policy_dedup_map_ele_insert ( dedup->map, ele, dedup->pool );
128133
fd_policy_dedup_lru_ele_push_tail( dedup->lru, ele, dedup->pool );
129134
}
130-
if( FD_LIKELY( now < ele->req_ts + (long)80e6 ) ) {
135+
if( FD_LIKELY( now < ele->req_ts + (long)FD_POLICY_DEDUP_TIMEOUT ) ) {
131136
return 1;
132137
}
133138
ele->req_ts = now;
@@ -156,13 +161,19 @@ passes_throttle_threshold( fd_policy_t * policy, fd_forest_blk_t * ele ) {
156161

157162
/* fd_policy_peer_select returns a pointer to the identity of the next
   repair peer and advances the round-robin cursor kept in
   policy->peers.select.  The cursor walks the list named by the
   current entry of bucket_stages; when it reaches the end of that list
   the stage index advances (wrapping around) and the cursor restarts
   at the head of the list named by the new stage.

   NOTE(review): the while loop only exits once it lands on a list
   whose freshly initialized iterator is not done, so it looks like it
   spins forever when both the best and worst lists are empty --
   confirm callers only invoke this with at least one peer inserted. */
fd_pubkey_t const *
158163
fd_policy_peer_select( fd_policy_t * policy ) {
159-
fd_peer_dlist_t * dlist = policy->peers.dlist;
160-
fd_peer_t * pool = policy->peers.pool;
161-
if( FD_UNLIKELY( fd_peer_dlist_iter_done( policy->peers.iter, dlist, pool ) ) ) {
162-
policy->peers.iter = fd_peer_dlist_iter_fwd_init( dlist, pool );
164+
fd_peer_dlist_t * best_dlist = policy->peers.best;
165+
fd_peer_dlist_t * worst_dlist = policy->peers.worst;
166+
fd_peer_t * pool = policy->peers.pool;
167+
168+
fd_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_BEST ? best_dlist : worst_dlist; /* list the cursor is currently walking */
169+
170+
while( FD_UNLIKELY( fd_peer_dlist_iter_done( policy->peers.select.iter, dlist, pool ) ) ) { /* end of current list: move to the next stage */
171+
policy->peers.select.stage = (policy->peers.select.stage + 1) % (sizeof(bucket_stages) / sizeof(uint)); /* modulus is the element count of bucket_stages */
172+
dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_BEST ? best_dlist : worst_dlist;
173+
policy->peers.select.iter = fd_peer_dlist_iter_fwd_init( dlist, pool ); /* restart at the head of the new stage's list */
163174
}
164-
fd_peer_t * select = fd_peer_dlist_iter_ele( policy->peers.iter, dlist, pool );
165-
policy->peers.iter = fd_peer_dlist_iter_fwd_next( policy->peers.iter, dlist, pool );
175+
fd_peer_t * select = fd_peer_dlist_iter_ele( policy->peers.select.iter, dlist, pool );
176+
policy->peers.select.iter = fd_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, pool ); /* leave the cursor on the peer after the one returned */
166177
return &select->identity;
167178
}
168179

@@ -294,7 +305,7 @@ fd_policy_peer_insert( fd_policy_t * policy, fd_pubkey_t const * key, fd_ip4_por
294305
fd_peer_t * peer_ele = fd_peer_pool_ele_acquire( policy->peers.pool );
295306
peer->pool_idx = fd_peer_pool_idx( policy->peers.pool, peer_ele );
296307
peer_ele->identity = *key;
297-
fd_peer_dlist_ele_push_tail( policy->peers.dlist, peer_ele, policy->peers.pool );
308+
fd_peer_dlist_ele_push_tail( policy->peers.worst, peer_ele, policy->peers.pool );
298309
return peer;
299310
}
300311
return NULL;
@@ -312,11 +323,14 @@ fd_policy_peer_remove( fd_policy_t * policy, fd_pubkey_t const * key ) {
312323
fd_peer_t * peer_ele = fd_peer_pool_ele( policy->peers.pool, peer->pool_idx );
313324
fd_policy_peer_map_remove( policy->peers.map, peer );
314325

315-
if( FD_UNLIKELY( policy->peers.iter == fd_peer_pool_idx( policy->peers.pool, peer_ele ) ) ) {
326+
if( FD_UNLIKELY( policy->peers.select.iter == fd_peer_pool_idx( policy->peers.pool, peer_ele ) ) ) {
316327
/* In general removal during iteration is safe, except when the iterator is on the peer to be removed. */
317-
policy->peers.iter = fd_peer_dlist_iter_fwd_next( policy->peers.iter, policy->peers.dlist, policy->peers.pool );
328+
fd_peer_dlist_t * dlist = policy->peers.select.stage == FD_POLICY_LATENCY_BEST ? policy->peers.best : policy->peers.worst;
329+
policy->peers.select.iter = fd_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, policy->peers.pool );
318330
}
319-
fd_peer_dlist_ele_remove( policy->peers.dlist, peer_ele, policy->peers.pool );
331+
332+
fd_peer_dlist_t * bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt );
333+
fd_peer_dlist_ele_remove( bucket, peer_ele, policy->peers.pool );
320334
fd_peer_pool_ele_release( policy->peers.pool, peer_ele );
321335
return 1;
322336
}
@@ -332,14 +346,22 @@ fd_policy_peer_request_update( fd_policy_t * policy, fd_pubkey_t const * to ) {
332346
}
333347

334348
/* fd_policy_peer_response_update records one response from peer `to`
   with round trip time rtt.  If the peer is known it increments
   res_cnt, stamps first_resp_ts (only while it is still 0) and
   last_resp_ts with fd_tickcount(), and adds rtt to total_lat.  The
   peer's latency bucket is computed before and after the update; when
   the two differ, the peer's pool element is unlinked from the old
   list and appended to the tail of the new one.  Unknown peers are
   ignored.

   NOTE(review): unlike fd_policy_peer_remove, nothing here checks
   whether policy->peers.select.iter currently refers to this peer's
   pool element before it is moved to the other list -- confirm the
   round-robin cursor cannot be left walking the wrong list. */
void
335-
fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, long rtt ) {
349+
fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, long rtt /* ns */ ) {
336350
fd_policy_peer_t * peer = fd_policy_peer_query( policy, to );
337351
if( FD_LIKELY( peer ) ) {
338352
long now = fd_tickcount();
353+
fd_peer_dlist_t * prev_bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt ); /* bucket before this response is counted */
339354
peer->res_cnt++;
340355
if( FD_UNLIKELY( peer->first_resp_ts == 0 ) ) peer->first_resp_ts = now;
341356
peer->last_resp_ts = now;
342357
peer->total_lat += rtt;
358+
fd_peer_dlist_t * new_bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt ); /* bucket after this response is counted */
359+
360+
if( prev_bucket != new_bucket ) { /* average latency crossed the threshold: move the peer between lists */
361+
fd_peer_t * peer_ele = fd_peer_pool_ele( policy->peers.pool, peer->pool_idx );
362+
fd_peer_dlist_ele_remove ( prev_bucket, peer_ele, policy->peers.pool );
363+
fd_peer_dlist_ele_push_tail( new_bucket, peer_ele, policy->peers.pool );
364+
}
343365
}
344366
}
345367

src/discof/repair/fd_policy.h

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -136,25 +136,50 @@ typedef struct fd_peer fd_peer_t;
136136
#define DLIST_PREV prev
137137
#include "../../util/tmpl/fd_dlist.c"
138138

139+
139140
/* fd_policy_peers implements the data structures and bookkeeping for
140141
selecting repair peers via round-robin over two latency buckets
(best and worst). */
141142

142143
struct fd_policy_peers {
143-
fd_peer_dlist_t * dlist; /* doubly-linked list of repair peer pubkeys in insertion order */
144-
fd_peer_t * pool; /* memory pool of repair peer pubkeys */
145-
fd_policy_peer_t * map; /* map of pubkey->peer */
146-
fd_peer_dlist_iter_t iter; /* round-robin index of next peer */
144+
fd_peer_t * pool; /* memory pool of repair peer pubkeys; elements of both the best and worst lists live here */
145+
fd_peer_dlist_t * best; /* peers whose average latency is at most FD_POLICY_LATENCY_THRESH (FD_POLICY_LATENCY_BEST) */
146+
fd_peer_dlist_t * worst; /* peers with no responses yet, or average latency above FD_POLICY_LATENCY_THRESH (FD_POLICY_LATENCY_WORST) */
147+
fd_policy_peer_t * map; /* dynamic map of pubkey->peer */
148+
struct {
149+
uint stage; /* index into bucket_stages; always < the element count of bucket_stages */
150+
fd_peer_dlist_iter_t iter; /* round-robin cursor: next peer in the list named by the current stage */
151+
} select;
147152
};
148153
typedef struct fd_policy_peers fd_policy_peers_t;
149154

155+
#define FD_POLICY_LATENCY_BEST 1 /* bucket_stages value that selects the best list */
156+
#define FD_POLICY_LATENCY_WORST 3 /* bucket_stages value that selects the worst list */
157+
158+
#define FD_POLICY_LATENCY_THRESH 30e6L /* ns (30ms); average rtt at or below this is a BEST peer, above it a WORST peer.  Note the L suffix makes this a long double constant. */
159+
#define FD_POLICY_DEDUP_TIMEOUT 50e6L /* how long to wait before requesting the same shred again */
160+
161+
/* bucket_stages is the repeating schedule of buckets visited by peer
162+
selection: one full pass over ALL the worst peers, then six full
163+
passes over ALL the best peers, then back to the worst peers.  All
164+
peers are initially added to the worst bucket, and are moved once
round trip times have been recorded. */
165+
166+
static const uint bucket_stages[7] = {
167+
FD_POLICY_LATENCY_WORST, /* 1 of every 7 stages cycles through the worst peers, to see if any have improved */
168+
FD_POLICY_LATENCY_BEST,
169+
FD_POLICY_LATENCY_BEST,
170+
FD_POLICY_LATENCY_BEST,
171+
FD_POLICY_LATENCY_BEST,
172+
FD_POLICY_LATENCY_BEST,
173+
FD_POLICY_LATENCY_BEST,
174+
};
150175
struct fd_policy {
151-
fd_policy_dedup_t dedup; /* dedup cache of already sent requests */
152-
fd_policy_peers_t peers; /* round-robin strategy for selecting repair peers */
153-
long tsmax; /* maximum time for an iteration before resetting the DFS to root */
154-
long tsref; /* reference timestamp for resetting DFS */
176+
fd_policy_dedup_t dedup; /* dedup cache of already sent requests */
177+
fd_policy_peers_t peers; /* round-robin strategy for selecting repair peers */
178+
long tsmax; /* maximum time for an iteration before resetting the DFS to root */
179+
long tsref; /* reference timestamp for resetting DFS */
155180

156-
fd_forest_iter_t iterf; /* forest iterator */
157-
ulong tsreset; /* ms timestamp of last reset of iterf */
181+
fd_forest_iter_t iterf; /* forest iterator */
182+
ulong tsreset; /* ms timestamp of last reset of iterf */
158183

159184
ulong turbine_slot0;
160185
uint nonce;
@@ -183,6 +208,7 @@ fd_policy_footprint( ulong dedup_max, ulong peer_max ) {
183208
FD_LAYOUT_APPEND(
184209
FD_LAYOUT_APPEND(
185210
FD_LAYOUT_APPEND(
211+
FD_LAYOUT_APPEND(
186212
FD_LAYOUT_INIT,
187213
alignof(fd_policy_t), sizeof(fd_policy_t) ),
188214
fd_policy_dedup_map_align(), fd_policy_dedup_map_footprint ( dedup_max ) ),
@@ -191,6 +217,7 @@ fd_policy_footprint( ulong dedup_max, ulong peer_max ) {
191217
fd_policy_peer_map_align(), fd_policy_peer_map_footprint ( lg_peer_max ) ),
192218
fd_peer_pool_align(), fd_peer_pool_footprint ( peer_max ) ),
193219
fd_peer_dlist_align(), fd_peer_dlist_footprint ( ) ),
220+
fd_peer_dlist_align(), fd_peer_dlist_footprint ( ) ),
194221
fd_policy_align() );
195222
}
196223

@@ -246,6 +273,12 @@ fd_policy_peer_select( fd_policy_t * policy );
246273
void
247274
fd_policy_peer_request_update( fd_policy_t * policy, fd_pubkey_t const * to );
248275

276+
/* fd_policy_peer_latency_bucket returns the list a peer with the given
   cumulative round trip time and response count belongs in.  A peer
   with no responses (res_cnt==0), or whose average rtt
   (total_rtt/res_cnt, integer division) is strictly greater than
   FD_POLICY_LATENCY_THRESH, maps to policy->peers.worst; every other
   peer maps to policy->peers.best.  An average exactly equal to the
   threshold therefore counts as best. */
static inline fd_peer_dlist_t *
277+
fd_policy_peer_latency_bucket( fd_policy_t * policy, long total_rtt /* ns */, ulong res_cnt ) {
278+
if( res_cnt == 0 || (long)(total_rtt / (long)res_cnt) > FD_POLICY_LATENCY_THRESH ) return policy->peers.worst; /* res_cnt==0 is tested first, so the division never sees a zero divisor */
279+
return policy->peers.best;
280+
}
281+
249282
void
250283
fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, long rtt /* ns */ );
251284

src/discof/repair/fd_repair_metrics.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ print_catchup_stats( fd_repair_metrics_t * repair_metrics ) {
7878

7979
/* incremental slot completion time */
8080
if( cur_slot <= repair_metrics->turbine_slot0 && slot_cmpl_ts - prev_slot_cmpl_ts > 0 ) {
81+
/* TODO: drop any samples above 100ms (not done yet: every sample is added to the total below) */
82+
//FD_LOG_NOTICE(( "incremental slot completion time: %.2fms", (double)fd_metrics_convert_ticks_to_nanoseconds((ulong)(slot_cmpl_ts - prev_slot_cmpl_ts)) / 1e6 ));
8183
incr_slot_cmpl_total += (slot_cmpl_ts - prev_slot_cmpl_ts);
8284
}
8385
prev_slot_cmpl_ts = slot_cmpl_ts;
@@ -86,7 +88,7 @@ print_catchup_stats( fd_repair_metrics_t * repair_metrics ) {
8688

8789
if( FD_LIKELY( turbine_ts > 0 ) ) { /* still have turbine slot0 in the catchup metrics */
8890
double pipelined_time = (double)(turbine_ts - min_ts);
89-
FD_LOG_NOTICE(( "took %.3fs to reach first turbine.", pipelined_time / 1e9 ));
91+
FD_LOG_NOTICE(( "took %.3fs to reach first turbine.", fd_metrics_convert_ticks_to_seconds((ulong)pipelined_time) ));
9092

9193
/* Compute pipeline factor */
9294
double non_pipelined_time = (double)slot_cmpl_time_total;

src/discof/repair/fd_repair_metrics.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@
1111
/* fd_slot_metrics is the element type of the slots[] array in
   fd_repair_metrics_t. */
struct fd_slot_metrics {
1212
ulong slot;
1313
long first_ts; /* NOTE(review): units not annotated -- presumably the same clock as slot_complete_ts; confirm */
14-
long slot_complete_ts;
14+
long slot_complete_ts; /* tick */
1515

1616
uint repair_cnt; /* NOTE(review): presumably shreds obtained via repair for this slot -- confirm */
1717
uint turbine_cnt; /* NOTE(review): presumably shreds received from turbine for this slot -- confirm */
1818
};
1919
typedef struct fd_slot_metrics fd_slot_metrics_t;
2020

21-
#define FD_CATCHUP_METRICS_MAX 256
21+
#define FD_CATCHUP_METRICS_MAX 16384
2222

2323
struct fd_repair_metrics_t {
2424
fd_slot_metrics_t slots[ FD_CATCHUP_METRICS_MAX ];

0 commit comments

Comments
 (0)