@@ -262,57 +262,79 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
262262// direct request replies. The differentiation is important so the fetcher can
263263// re-schedule missing transactions as soon as possible.
264264func (f * TxFetcher ) Enqueue (peer string , txs []* types.Transaction , direct bool ) error {
265- // Keep track of all the propagated transactions
266- if direct {
267- txReplyInMeter .Mark (int64 (len (txs )))
268- } else {
269- txBroadcastInMeter .Mark (int64 (len (txs )))
265+ var (
266+ inMeter = txReplyInMeter
267+ knownMeter = txReplyKnownMeter
268+ underpricedMeter = txReplyUnderpricedMeter
269+ otherRejectMeter = txReplyOtherRejectMeter
270+ )
271+ if ! direct {
272+ inMeter = txBroadcastInMeter
273+ knownMeter = txBroadcastKnownMeter
274+ underpricedMeter = txBroadcastUnderpricedMeter
275+ otherRejectMeter = txBroadcastOtherRejectMeter
270276 }
277+ // Keep track of all the propagated transactions
278+ inMeter .Mark (int64 (len (txs )))
279+
271280 // Push all the transactions into the pool, tracking underpriced ones to avoid
272281 // re-requesting them and dropping the peer in case of malicious transfers.
273282 var (
274- added = make ([]common.Hash , 0 , len (txs ))
275- duplicate int64
276- underpriced int64
277- otherreject int64
283+ added = make ([]common.Hash , 0 , len (txs ))
284+ delay time.Duration
278285 )
279- errs := f .addTxs (txs )
280- for i , err := range errs {
281- // Track the transaction hash if the price is too low for us.
282- // Avoid re-request this transaction when we receive another
283- // announcement.
284- if errors .Is (err , core .ErrUnderpriced ) || errors .Is (err , core .ErrReplaceUnderpriced ) {
285- for f .underpriced .Cardinality () >= maxTxUnderpricedSetSize {
286- f .underpriced .Pop ()
287- }
288- f .underpriced .Add (txs [i ].Hash ())
286+ // proceed in batches
287+ for i := 0 ; i < len (txs ); i += 128 {
288+ end := i + 128
289+ if end > len (txs ) {
290+ end = len (txs )
289291 }
290- // Track a few interesting failure types
291- switch {
292- case err == nil : // Noop, but need to handle to not count these
292+ var (
293+ duplicate int64
294+ underpriced int64
295+ otherreject int64
296+ )
297+ batch := txs [i :end ]
298+ for j , err := range f .addTxs (batch ) {
299+ // Track the transaction hash if the price is too low for us.
300+ // Avoid re-request this transaction when we receive another
301+ // announcement.
302+ if errors .Is (err , core .ErrUnderpriced ) || errors .Is (err , core .ErrReplaceUnderpriced ) {
303+ for f .underpriced .Cardinality () >= maxTxUnderpricedSetSize {
304+ f .underpriced .Pop ()
305+ }
306+ f .underpriced .Add (batch [j ].Hash ())
307+ }
308+ // Track a few interesting failure types
309+ switch {
310+ case err == nil : // Noop, but need to handle to not count these
293311
294- case errors .Is (err , core .ErrAlreadyKnown ):
295- duplicate ++
312+ case errors .Is (err , core .ErrAlreadyKnown ):
313+ duplicate ++
296314
297- case errors .Is (err , core .ErrUnderpriced ) || errors .Is (err , core .ErrReplaceUnderpriced ):
298- underpriced ++
315+ case errors .Is (err , core .ErrUnderpriced ) || errors .Is (err , core .ErrReplaceUnderpriced ):
316+ underpriced ++
299317
300- default :
301- otherreject ++
318+ default :
319+ otherreject ++
320+ }
321+ added = append (added , batch [j ].Hash ())
322+ }
323+ knownMeter .Mark (duplicate )
324+ underpricedMeter .Mark (underpriced )
325+ otherRejectMeter .Mark (otherreject )
326+
327+ // If 'other reject' is >25% of the deliveries in any batch, abort. Either we are
328+ // out of sync with the chain or the peer is griefing us.
329+ if otherreject > 128 / 4 {
330+ delay = 200 * time .Millisecond
331+ log .Warn ("Peer delivering useless transactions" , "peer" , peer , "ignored" , len (txs )- end )
332+ break
302333 }
303- added = append (added , txs [i ].Hash ())
304- }
305- if direct {
306- txReplyKnownMeter .Mark (duplicate )
307- txReplyUnderpricedMeter .Mark (underpriced )
308- txReplyOtherRejectMeter .Mark (otherreject )
309- } else {
310- txBroadcastKnownMeter .Mark (duplicate )
311- txBroadcastUnderpricedMeter .Mark (underpriced )
312- txBroadcastOtherRejectMeter .Mark (otherreject )
313334 }
314335 select {
315336 case f .cleanup <- & txDelivery {origin : peer , hashes : added , direct : direct }:
337+ time .Sleep (delay )
316338 return nil
317339 case <- f .quit :
318340 return errTerminated
0 commit comments