@@ -20,27 +20,53 @@ import (
2020 "github.com/ava-labs/avalanchego/utils/wrappers"
2121)
2222
23+ var (
24+ _ Gossiper = (* ValidatorGossiper )(nil )
25+ _ Gossiper = (* PullGossiper [testTx , * testTx ])(nil )
26+ )
27+
28+ // Gossiper gossips Gossipables to other nodes
29+ type Gossiper interface {
30+ // Gossip runs a cycle of gossip. Returns an error if we failed to gossip.
31+ Gossip (ctx context.Context ) error
32+ }
33+
2334// GossipableAny exists to help create non-nil pointers to a concrete Gossipable
2435// ref: https://stackoverflow.com/questions/69573113/how-can-i-instantiate-a-non-nil-pointer-of-type-argument-with-generic-go
2536type GossipableAny [T any ] interface {
2637 * T
2738 Gossipable
2839}
2940
41+ // ValidatorGossiper only calls [Gossip] if the given node is a validator
42+ type ValidatorGossiper struct {
43+ Gossiper
44+
45+ NodeID ids.NodeID
46+ Validators p2p.ValidatorSet
47+ }
48+
49+ func (v ValidatorGossiper ) Gossip (ctx context.Context ) error {
50+ if ! v .Validators .Has (ctx , v .NodeID ) {
51+ return nil
52+ }
53+
54+ return v .Gossiper .Gossip (ctx )
55+ }
56+
3057type Config struct {
3158 Namespace string
32- Frequency time.Duration
3359 PollSize int
3460}
3561
36- func NewGossiper [T any , U GossipableAny [T ]](
62+ func NewPullGossiper [T any , U GossipableAny [T ]](
3763 config Config ,
3864 log logging.Logger ,
3965 set Set [U ],
4066 client * p2p.Client ,
4167 metrics prometheus.Registerer ,
42- ) (* Gossiper [T , U ], error ) {
43- g := & Gossiper [T , U ]{
68+ ) (* PullGossiper [T , U ], error ) {
69+ p := & PullGossiper [T , U ]{
4470 config : config ,
4571 log : log ,
4672 set : set ,
@@ -59,14 +85,14 @@ func NewGossiper[T any, U GossipableAny[T]](
5985
6086 errs := wrappers.Errs {}
6187 errs .Add (
62- metrics .Register (g .receivedN ),
63- metrics .Register (g .receivedBytes ),
88+ metrics .Register (p .receivedN ),
89+ metrics .Register (p .receivedBytes ),
6490 )
6591
66- return g , errs .Err
92+ return p , errs .Err
6793}
6894
69- type Gossiper [T any , U GossipableAny [T ]] struct {
95+ type PullGossiper [T any , U GossipableAny [T ]] struct {
7096 config Config
7197 log logging.Logger
7298 set Set [U ]
@@ -75,25 +101,8 @@ type Gossiper[T any, U GossipableAny[T]] struct {
75101 receivedBytes prometheus.Counter
76102}
77103
78- func (g * Gossiper [_ , _ ]) Gossip (ctx context.Context ) {
79- gossipTicker := time .NewTicker (g .config .Frequency )
80- defer gossipTicker .Stop ()
81-
82- for {
83- select {
84- case <- gossipTicker .C :
85- if err := g .gossip (ctx ); err != nil {
86- g .log .Warn ("failed to gossip" , zap .Error (err ))
87- }
88- case <- ctx .Done ():
89- g .log .Debug ("shutting down gossip" )
90- return
91- }
92- }
93- }
94-
95- func (g * Gossiper [_ , _ ]) gossip (ctx context.Context ) error {
96- bloom , salt , err := g .set .GetFilter ()
104+ func (p * PullGossiper [_ , _ ]) Gossip (ctx context.Context ) error {
105+ bloom , salt , err := p .set .GetFilter ()
97106 if err != nil {
98107 return err
99108 }
@@ -107,23 +116,23 @@ func (g *Gossiper[_, _]) gossip(ctx context.Context) error {
107116 return err
108117 }
109118
110- for i := 0 ; i < g .config .PollSize ; i ++ {
111- if err := g .client .AppRequestAny (ctx , msgBytes , g .handleResponse ); err != nil {
119+ for i := 0 ; i < p .config .PollSize ; i ++ {
120+ if err := p .client .AppRequestAny (ctx , msgBytes , p .handleResponse ); err != nil {
112121 return err
113122 }
114123 }
115124
116125 return nil
117126}
118127
119- func (g * Gossiper [T , U ]) handleResponse (
128+ func (p * PullGossiper [T , U ]) handleResponse (
120129 _ context.Context ,
121130 nodeID ids.NodeID ,
122131 responseBytes []byte ,
123132 err error ,
124133) {
125134 if err != nil {
126- g .log .Debug (
135+ p .log .Debug (
127136 "failed gossip request" ,
128137 zap .Stringer ("nodeID" , nodeID ),
129138 zap .Error (err ),
@@ -133,7 +142,7 @@ func (g *Gossiper[T, U]) handleResponse(
133142
134143 response := & sdk.PullGossipResponse {}
135144 if err := proto .Unmarshal (responseBytes , response ); err != nil {
136- g .log .Debug ("failed to unmarshal gossip response" , zap .Error (err ))
145+ p .log .Debug ("failed to unmarshal gossip response" , zap .Error (err ))
137146 return
138147 }
139148
@@ -143,7 +152,7 @@ func (g *Gossiper[T, U]) handleResponse(
143152
144153 gossipable := U (new (T ))
145154 if err := gossipable .Unmarshal (bytes ); err != nil {
146- g .log .Debug (
155+ p .log .Debug (
147156 "failed to unmarshal gossip" ,
148157 zap .Stringer ("nodeID" , nodeID ),
149158 zap .Error (err ),
@@ -152,13 +161,13 @@ func (g *Gossiper[T, U]) handleResponse(
152161 }
153162
154163 hash := gossipable .GetID ()
155- g .log .Debug (
164+ p .log .Debug (
156165 "received gossip" ,
157166 zap .Stringer ("nodeID" , nodeID ),
158167 zap .Stringer ("id" , hash ),
159168 )
160- if err := g .set .Add (gossipable ); err != nil {
161- g .log .Debug (
169+ if err := p .set .Add (gossipable ); err != nil {
170+ p .log .Debug (
162171 "failed to add gossip to the known set" ,
163172 zap .Stringer ("nodeID" , nodeID ),
164173 zap .Stringer ("id" , hash ),
@@ -168,6 +177,24 @@ func (g *Gossiper[T, U]) handleResponse(
168177 }
169178 }
170179
171- g .receivedN .Add (float64 (len (response .Gossip )))
172- g .receivedBytes .Add (float64 (receivedBytes ))
180+ p .receivedN .Add (float64 (len (response .Gossip )))
181+ p .receivedBytes .Add (float64 (receivedBytes ))
182+ }
183+
184+ // Every calls [Gossip] every [frequency] amount of time.
185+ func Every (ctx context.Context , log logging.Logger , gossiper Gossiper , frequency time.Duration ) {
186+ ticker := time .NewTicker (frequency )
187+ defer ticker .Stop ()
188+
189+ for {
190+ select {
191+ case <- ticker .C :
192+ if err := gossiper .Gossip (ctx ); err != nil {
193+ log .Warn ("failed to gossip" , zap .Error (err ))
194+ }
195+ case <- ctx .Done ():
196+ log .Debug ("shutting down gossip" )
197+ return
198+ }
199+ }
173200}
0 commit comments