1- // Copyright 2020 The go-ethereum Authors
1+ // Copyright 2019 The go-ethereum Authors
22// This file is part of the go-ethereum library.
33//
44// The go-ethereum library is free software: you can redistribute it and/or modify
@@ -20,6 +20,7 @@ import (
2020 "bytes"
2121 "errors"
2222 "fmt"
23+ "math"
2324 mrand "math/rand"
2425 "sort"
2526 "time"
@@ -103,6 +104,14 @@ var (
103104type txAnnounce struct {
104105 origin string // Identifier of the peer originating the notification
105106 hashes []common.Hash // Batch of transaction hashes being announced
107+ metas []* txMetadata // Batch of metadatas associated with the hashes (nil before eth/68)
108+ }
109+
110+ // txMetadata is a set of extra data transmitted along the announcement for better
111+ // fetch scheduling.
112+ type txMetadata struct {
113+ kind byte // Transaction consensus type
114+ size uint32 // Transaction size in bytes
106115}
107116
108117// txRequest represents an in-flight transaction retrieval request destined to
@@ -118,10 +127,11 @@ type txRequest struct {
118127type txDelivery struct {
119128 origin string // Identifier of the peer originating the notification
120129 hashes []common.Hash // Batch of transaction hashes having been delivered
130+ metas []txMetadata // Batch of metadatas associated with the delivered hashes
121131 direct bool // Whether this is a direct reply or a broadcast
122132}
123133
124- // txDrop is the notiication that a peer has disconnected.
134+ // txDrop is the notification that a peer has disconnected.
125135type txDrop struct {
126136 peer string
127137}
@@ -153,14 +163,14 @@ type TxFetcher struct {
153163
154164 // Stage 1: Waiting lists for newly discovered transactions that might be
155165 // broadcast without needing explicit request/reply round trips.
156- waitlist map [common.Hash ]map [string ]struct {} // Transactions waiting for an potential broadcast
157- waittime map [common.Hash ]mclock.AbsTime // Timestamps when transactions were added to the waitlist
158- waitslots map [string ]map [common.Hash ]struct {} // Waiting announcement sgroupped by peer (DoS protection)
166+ waitlist map [common.Hash ]map [string ]struct {} // Transactions waiting for an potential broadcast
167+ waittime map [common.Hash ]mclock.AbsTime // Timestamps when transactions were added to the waitlist
168+ waitslots map [string ]map [common.Hash ]* txMetadata // Waiting announcement sgroupped by peer (DoS protection)
159169
160170 // Stage 2: Queue of transactions that waiting to be allocated to some peer
161171 // to be retrieved directly.
162- announces map [string ]map [common.Hash ]struct {} // Set of announced transactions, grouped by origin peer
163- announced map [common.Hash ]map [string ]struct {} // Set of download locations, grouped by transaction hash
172+ announces map [string ]map [common.Hash ]* txMetadata // Set of announced transactions, grouped by origin peer
173+ announced map [common.Hash ]map [string ]struct {} // Set of download locations, grouped by transaction hash
164174
165175 // Stage 3: Set of transactions currently being retrieved, some which may be
166176 // fulfilled and some rescheduled. Note, this step shares 'announces' from the
@@ -173,6 +183,7 @@ type TxFetcher struct {
173183 hasTx func (common.Hash ) bool // Retrieves a tx from the local txpool
174184 addTxs func ([]* types.Transaction ) []error // Insert a batch of transactions into local txpool
175185 fetchTxs func (string , []common.Hash ) error // Retrieves a set of txs from a remote peer
186+ dropPeer func (string ) // Drops a peer in case of announcement violation
176187
177188 step chan struct {} // Notification channel when the fetcher loop iterates
178189 clock mclock.Clock // Time wrapper to simulate in tests
@@ -181,14 +192,14 @@ type TxFetcher struct {
181192
182193// NewTxFetcher creates a transaction fetcher to retrieve transaction
183194// based on hash announcements.
184- func NewTxFetcher (hasTx func (common.Hash ) bool , addTxs func ([]* types.Transaction ) []error , fetchTxs func (string , []common.Hash ) error ) * TxFetcher {
185- return NewTxFetcherForTests (hasTx , addTxs , fetchTxs , mclock.System {}, nil )
195+ func NewTxFetcher (hasTx func (common.Hash ) bool , addTxs func ([]* types.Transaction ) []error , fetchTxs func (string , []common.Hash ) error , dropPeer func ( string ) ) * TxFetcher {
196+ return NewTxFetcherForTests (hasTx , addTxs , fetchTxs , dropPeer , mclock.System {}, nil )
186197}
187198
188199// NewTxFetcherForTests is a testing method to mock out the realtime clock with
189200// a simulated version and the internal randomness with a deterministic one.
190201func NewTxFetcherForTests (
191- hasTx func (common.Hash ) bool , addTxs func ([]* types.Transaction ) []error , fetchTxs func (string , []common.Hash ) error ,
202+ hasTx func (common.Hash ) bool , addTxs func ([]* types.Transaction ) []error , fetchTxs func (string , []common.Hash ) error , dropPeer func ( string ),
192203 clock mclock.Clock , rand * mrand.Rand ) * TxFetcher {
193204 return & TxFetcher {
194205 notify : make (chan * txAnnounce ),
@@ -197,8 +208,8 @@ func NewTxFetcherForTests(
197208 quit : make (chan struct {}),
198209 waitlist : make (map [common.Hash ]map [string ]struct {}),
199210 waittime : make (map [common.Hash ]mclock.AbsTime ),
200- waitslots : make (map [string ]map [common.Hash ]struct {} ),
201- announces : make (map [string ]map [common.Hash ]struct {} ),
211+ waitslots : make (map [string ]map [common.Hash ]* txMetadata ),
212+ announces : make (map [string ]map [common.Hash ]* txMetadata ),
202213 announced : make (map [common.Hash ]map [string ]struct {}),
203214 fetching : make (map [common.Hash ]string ),
204215 requests : make (map [string ]* txRequest ),
@@ -207,27 +218,31 @@ func NewTxFetcherForTests(
207218 hasTx : hasTx ,
208219 addTxs : addTxs ,
209220 fetchTxs : fetchTxs ,
221+ dropPeer : dropPeer ,
210222 clock : clock ,
211223 rand : rand ,
212224 }
213225}
214226
215227// Notify announces the fetcher of the potential availability of a new batch of
216228// transactions in the network.
217- func (f * TxFetcher ) Notify (peer string , hashes []common.Hash ) error {
229+ func (f * TxFetcher ) Notify (peer string , types [] byte , sizes [] uint32 , hashes []common.Hash ) error {
218230 // Keep track of all the announced transactions
219231 txAnnounceInMeter .Mark (int64 (len (hashes )))
220232
221233 // Skip any transaction announcements that we already know of, or that we've
222- // previously marked as cheap and discarded. This check is of course racey ,
234+ // previously marked as cheap and discarded. This check is of course racy ,
223235 // because multiple concurrent notifies will still manage to pass it, but it's
224236 // still valuable to check here because it runs concurrent to the internal
225237 // loop, so anything caught here is time saved internally.
226238 var (
227- unknowns = make ([]common.Hash , 0 , len (hashes ))
228- duplicate , underpriced int64
239+ unknownHashes = make ([]common.Hash , 0 , len (hashes ))
240+ unknownMetas = make ([]* txMetadata , 0 , len (hashes ))
241+
242+ duplicate int64
243+ underpriced int64
229244 )
230- for _ , hash := range hashes {
245+ for i , hash := range hashes {
231246 switch {
232247 case f .hasTx (hash ):
233248 duplicate ++
@@ -236,20 +251,22 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
236251 underpriced ++
237252
238253 default :
239- unknowns = append (unknowns , hash )
254+ unknownHashes = append (unknownHashes , hash )
255+ if types == nil {
256+ unknownMetas = append (unknownMetas , nil )
257+ } else {
258+ unknownMetas = append (unknownMetas , & txMetadata {kind : types [i ], size : sizes [i ]})
259+ }
240260 }
241261 }
242262 txAnnounceKnownMeter .Mark (duplicate )
243263 txAnnounceUnderpricedMeter .Mark (underpriced )
244264
245265 // If anything's left to announce, push it into the internal loop
246- if len (unknowns ) == 0 {
266+ if len (unknownHashes ) == 0 {
247267 return nil
248268 }
249- announce := & txAnnounce {
250- origin : peer ,
251- hashes : unknowns ,
252- }
269+ announce := & txAnnounce {origin : peer , hashes : unknownHashes , metas : unknownMetas }
253270 select {
254271 case f .notify <- announce :
255272 return nil
@@ -261,7 +278,7 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
261278// Enqueue imports a batch of received transaction into the transaction pool
262279// and the fetcher. This method may be called by both transaction broadcasts and
263280// direct request replies. The differentiation is important so the fetcher can
264- // re-shedule missing transactions as soon as possible.
281+ // re-schedule missing transactions as soon as possible.
265282func (f * TxFetcher ) Enqueue (peer string , txs []* types.Transaction , direct bool ) error {
266283 // Keep track of all the propagated transactions
267284 if direct {
@@ -273,6 +290,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
273290 // re-requesting them and dropping the peer in case of malicious transfers.
274291 var (
275292 added = make ([]common.Hash , 0 , len (txs ))
293+ metas = make ([]txMetadata , 0 , len (txs ))
276294 duplicate int64
277295 underpriced int64
278296 otherreject int64
@@ -302,6 +320,10 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
302320 otherreject ++
303321 }
304322 added = append (added , txs [i ].Hash ())
323+ metas = append (metas , txMetadata {
324+ kind : txs [i ].Type (),
325+ size : uint32 (txs [i ].Size ()),
326+ })
305327 }
306328 if direct {
307329 txReplyKnownMeter .Mark (duplicate )
@@ -313,7 +335,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
313335 txBroadcastOtherRejectMeter .Mark (otherreject )
314336 }
315337 select {
316- case f .cleanup <- & txDelivery {origin : peer , hashes : added , direct : direct }:
338+ case f .cleanup <- & txDelivery {origin : peer , hashes : added , metas : metas , direct : direct }:
317339 return nil
318340 case <- f .quit :
319341 return errTerminated
@@ -370,13 +392,15 @@ func (f *TxFetcher) loop() {
370392 want := used + len (ann .hashes )
371393 if want > maxTxAnnounces {
372394 txAnnounceDOSMeter .Mark (int64 (want - maxTxAnnounces ))
395+
373396 ann .hashes = ann .hashes [:want - maxTxAnnounces ]
397+ ann .metas = ann .metas [:want - maxTxAnnounces ]
374398 }
375399 // All is well, schedule the remainder of the transactions
376400 idleWait := len (f .waittime ) == 0
377401 _ , oldPeer := f .announces [ann .origin ]
378402
379- for _ , hash := range ann .hashes {
403+ for i , hash := range ann .hashes {
380404 // If the transaction is already downloading, add it to the list
381405 // of possible alternates (in case the current retrieval fails) and
382406 // also account it for the peer.
@@ -385,9 +409,9 @@ func (f *TxFetcher) loop() {
385409
386410 // Stage 2 and 3 share the set of origins per tx
387411 if announces := f .announces [ann .origin ]; announces != nil {
388- announces [hash ] = struct {}{}
412+ announces [hash ] = ann . metas [ i ]
389413 } else {
390- f .announces [ann .origin ] = map [common.Hash ]struct {}{ hash : {} }
414+ f .announces [ann .origin ] = map [common.Hash ]* txMetadata { hash : ann . metas [ i ] }
391415 }
392416 continue
393417 }
@@ -398,22 +422,28 @@ func (f *TxFetcher) loop() {
398422
399423 // Stage 2 and 3 share the set of origins per tx
400424 if announces := f .announces [ann .origin ]; announces != nil {
401- announces [hash ] = struct {}{}
425+ announces [hash ] = ann . metas [ i ]
402426 } else {
403- f .announces [ann .origin ] = map [common.Hash ]struct {}{ hash : {} }
427+ f .announces [ann .origin ] = map [common.Hash ]* txMetadata { hash : ann . metas [ i ] }
404428 }
405429 continue
406430 }
407431 // If the transaction is already known to the fetcher, but not
408432 // yet downloading, add the peer as an alternate origin in the
409433 // waiting list.
410434 if f .waitlist [hash ] != nil {
435+ // Ignore double announcements from the same peer. This is
436+ // especially important if metadata is also passed along to
437+ // prevent malicious peers flip-flopping good/bad values.
438+ if _ , ok := f.waitlist [hash ][ann.origin ]; ok {
439+ continue
440+ }
411441 f.waitlist [hash ][ann.origin ] = struct {}{}
412442
413443 if waitslots := f .waitslots [ann .origin ]; waitslots != nil {
414- waitslots [hash ] = struct {}{}
444+ waitslots [hash ] = ann . metas [ i ]
415445 } else {
416- f .waitslots [ann .origin ] = map [common.Hash ]struct {}{ hash : {} }
446+ f .waitslots [ann .origin ] = map [common.Hash ]* txMetadata { hash : ann . metas [ i ] }
417447 }
418448 continue
419449 }
@@ -422,9 +452,9 @@ func (f *TxFetcher) loop() {
422452 f .waittime [hash ] = f .clock .Now ()
423453
424454 if waitslots := f .waitslots [ann .origin ]; waitslots != nil {
425- waitslots [hash ] = struct {}{}
455+ waitslots [hash ] = ann . metas [ i ]
426456 } else {
427- f .waitslots [ann .origin ] = map [common.Hash ]struct {}{ hash : {} }
457+ f .waitslots [ann .origin ] = map [common.Hash ]* txMetadata { hash : ann . metas [ i ] }
428458 }
429459 }
430460 // If a new item was added to the waitlist, schedule it into the fetcher
@@ -450,9 +480,9 @@ func (f *TxFetcher) loop() {
450480 f .announced [hash ] = f .waitlist [hash ]
451481 for peer := range f .waitlist [hash ] {
452482 if announces := f .announces [peer ]; announces != nil {
453- announces [hash ] = struct {}{}
483+ announces [hash ] = f. waitslots [ peer ][ hash ]
454484 } else {
455- f .announces [peer ] = map [common.Hash ]struct {}{ hash : {} }
485+ f .announces [peer ] = map [common.Hash ]* txMetadata { hash : f. waitslots [ peer ][ hash ] }
456486 }
457487 delete (f .waitslots [peer ], hash )
458488 if len (f .waitslots [peer ]) == 0 {
@@ -521,10 +551,27 @@ func (f *TxFetcher) loop() {
521551
522552 case delivery := <- f .cleanup :
523553 // Independent if the delivery was direct or broadcast, remove all
524- // traces of the hash from internal trackers
525- for _ , hash := range delivery .hashes {
554+ // traces of the hash from internal trackers. That said, compare any
555+ // advertised metadata with the real ones and drop bad peers.
556+ for i , hash := range delivery .hashes {
526557 if _ , ok := f .waitlist [hash ]; ok {
527558 for peer , txset := range f .waitslots {
559+ if meta := txset [hash ]; meta != nil {
560+ if delivery .metas [i ].kind != meta .kind {
561+ log .Warn ("Announced transaction type mismatch" , "peer" , peer , "tx" , hash , "type" , delivery .metas [i ].kind , "ann" , meta .kind )
562+ f .dropPeer (peer )
563+ } else if delivery .metas [i ].size != meta .size {
564+ log .Warn ("Announced transaction size mismatch" , "peer" , peer , "tx" , hash , "size" , delivery .metas [i ].size , "ann" , meta .size )
565+ if math .Abs (float64 (delivery .metas [i ].size )- float64 (meta .size )) > 8 {
566+ // Normally we should drop a peer considering this is a protocol violation.
567+ // However, due to the RLP vs consensus format messyness, allow a few bytes
568+ // wiggle-room where we only warn, but don't drop.
569+ //
570+ // TODO(karalabe): Get rid of this relaxation when clients are proven stable.
571+ f .dropPeer (peer )
572+ }
573+ }
574+ }
528575 delete (txset , hash )
529576 if len (txset ) == 0 {
530577 delete (f .waitslots , peer )
@@ -534,6 +581,22 @@ func (f *TxFetcher) loop() {
534581 delete (f .waittime , hash )
535582 } else {
536583 for peer , txset := range f .announces {
584+ if meta := txset [hash ]; meta != nil {
585+ if delivery .metas [i ].kind != meta .kind {
586+ log .Warn ("Announced transaction type mismatch" , "peer" , peer , "tx" , hash , "type" , delivery .metas [i ].kind , "ann" , meta .kind )
587+ f .dropPeer (peer )
588+ } else if delivery .metas [i ].size != meta .size {
589+ log .Warn ("Announced transaction size mismatch" , "peer" , peer , "tx" , hash , "size" , delivery .metas [i ].size , "ann" , meta .size )
590+ if math .Abs (float64 (delivery .metas [i ].size )- float64 (meta .size )) > 8 {
591+ // Normally we should drop a peer considering this is a protocol violation.
592+ // However, due to the RLP vs consensus format messyness, allow a few bytes
593+ // wiggle-room where we only warn, but don't drop.
594+ //
595+ // TODO(karalabe): Get rid of this relaxation when clients are proven stable.
596+ f .dropPeer (peer )
597+ }
598+ }
599+ }
537600 delete (txset , hash )
538601 if len (txset ) == 0 {
539602 delete (f .announces , peer )
@@ -559,7 +622,7 @@ func (f *TxFetcher) loop() {
559622 // In case of a direct delivery, also reschedule anything missing
560623 // from the original query
561624 if delivery .direct {
562- // Mark the reqesting successful (independent of individual status)
625+ // Mark the requesting successful (independent of individual status)
563626 txRequestDoneMeter .Mark (int64 (len (delivery .hashes )))
564627
565628 // Make sure something was pending, nuke it
@@ -608,7 +671,7 @@ func (f *TxFetcher) loop() {
608671 delete (f .alternates , hash )
609672 delete (f .fetching , hash )
610673 }
611- // Something was delivered, try to rechedule requests
674+ // Something was delivered, try to reschedule requests
612675 f .scheduleFetches (timeoutTimer , timeoutTrigger , nil ) // Partial delivery may enable others to deliver too
613676 }
614677
@@ -720,7 +783,7 @@ func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) {
720783// should be rescheduled if some request is pending. In practice, a timeout will
721784// cause the timer to be rescheduled every 5 secs (until the peer comes through or
722785// disconnects). This is a limitation of the fetcher code because we don't trac
723- // pending requests and timed out requests separatey . Without double tracking, if
786+ // pending requests and timed out requests separately . Without double tracking, if
724787// we simply didn't reschedule the timer on all-timeout then the timer would never
725788// be set again since len(request) > 0 => something's running.
726789func (f * TxFetcher ) rescheduleTimeout (timer * mclock.Timer , trigger chan struct {}) {
@@ -835,7 +898,7 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))
835898
836899// forEachHash does a range loop over a map of hashes in production, but during
837900// testing it does a deterministic sorted random to allow reproducing issues.
838- func (f * TxFetcher ) forEachHash (hashes map [common.Hash ]struct {} , do func (hash common.Hash ) bool ) {
901+ func (f * TxFetcher ) forEachHash (hashes map [common.Hash ]* txMetadata , do func (hash common.Hash ) bool ) {
839902 // If we're running production, use whatever Go's map gives us
840903 if f .rand == nil {
841904 for hash := range hashes {
0 commit comments