@@ -262,54 +262,72 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
262262// direct request replies. The differentiation is important so the fetcher can
263263// re-schedule missing transactions as soon as possible.
264264func (f * TxFetcher ) Enqueue (peer string , txs []* types.Transaction , direct bool ) error {
265- // Keep track of all the propagated transactions
266- if direct {
267- txReplyInMeter .Mark (int64 (len (txs )))
268- } else {
269- txBroadcastInMeter .Mark (int64 (len (txs )))
265+ var (
266+ inMeter = txReplyInMeter
267+ knownMeter = txReplyKnownMeter
268+ underpricedMeter = txReplyUnderpricedMeter
269+ otherRejectMeter = txReplyOtherRejectMeter
270+ )
271+ if ! direct {
272+ inMeter = txBroadcastInMeter
273+ knownMeter = txBroadcastKnownMeter
274+ underpricedMeter = txBroadcastUnderpricedMeter
275+ otherRejectMeter = txBroadcastOtherRejectMeter
270276 }
277+ // Keep track of all the propagated transactions
278+ inMeter .Mark (int64 (len (txs )))
279+
271280 // Push all the transactions into the pool, tracking underpriced ones to avoid
272281 // re-requesting them and dropping the peer in case of malicious transfers.
273282 var (
274- added = make ([]common.Hash , 0 , len (txs ))
275- duplicate int64
276- underpriced int64
277- otherreject int64
283+ added = make ([]common.Hash , 0 , len (txs ))
278284 )
279- errs := f .addTxs (txs )
280- for i , err := range errs {
281- // Track the transaction hash if the price is too low for us.
282- // Avoid re-request this transaction when we receive another
283- // announcement.
284- if errors .Is (err , core .ErrUnderpriced ) || errors .Is (err , core .ErrReplaceUnderpriced ) {
285- for f .underpriced .Cardinality () >= maxTxUnderpricedSetSize {
286- f .underpriced .Pop ()
287- }
288- f .underpriced .Add (txs [i ].Hash ())
285+ // proceed in batches
286+ for i := 0 ; i < len (txs ); i += 128 {
287+ end := i + 128
288+ if end > len (txs ) {
289+ end = len (txs )
289290 }
290- // Track a few interesting failure types
291- switch {
292- case err == nil : // Noop, but need to handle to not count these
291+ var (
292+ duplicate int64
293+ underpriced int64
294+ otherreject int64
295+ )
296+ batch := txs [i :end ]
297+ for j , err := range f .addTxs (batch ) {
298+ // Track the transaction hash if the price is too low for us.
299+ // Avoid re-request this transaction when we receive another
300+ // announcement.
301+ if errors .Is (err , core .ErrUnderpriced ) || errors .Is (err , core .ErrReplaceUnderpriced ) {
302+ for f .underpriced .Cardinality () >= maxTxUnderpricedSetSize {
303+ f .underpriced .Pop ()
304+ }
305+ f .underpriced .Add (batch [j ].Hash ())
306+ }
307+ // Track a few interesting failure types
308+ switch {
309+ case err == nil : // Noop, but need to handle to not count these
293310
294- case errors .Is (err , core .ErrAlreadyKnown ):
295- duplicate ++
311+ case errors .Is (err , core .ErrAlreadyKnown ):
312+ duplicate ++
296313
297- case errors .Is (err , core .ErrUnderpriced ) || errors .Is (err , core .ErrReplaceUnderpriced ):
298- underpriced ++
314+ case errors .Is (err , core .ErrUnderpriced ) || errors .Is (err , core .ErrReplaceUnderpriced ):
315+ underpriced ++
299316
300- default :
301- otherreject ++
317+ default :
318+ otherreject ++
319+ }
320+ added = append (added , batch [j ].Hash ())
321+ }
322+ knownMeter .Mark (duplicate )
323+ underpricedMeter .Mark (underpriced )
324+ otherRejectMeter .Mark (otherreject )
325+
326+ // If 'other reject' is >25% of the deliveries in any batch, sleep a bit.
327+ if otherreject > 128 / 4 {
328+ time .Sleep (200 * time .Millisecond )
329+ log .Warn ("Peer delivering stale transactions" , "peer" , peer , "rejected" , otherreject )
302330 }
303- added = append (added , txs [i ].Hash ())
304- }
305- if direct {
306- txReplyKnownMeter .Mark (duplicate )
307- txReplyUnderpricedMeter .Mark (underpriced )
308- txReplyOtherRejectMeter .Mark (otherreject )
309- } else {
310- txBroadcastKnownMeter .Mark (duplicate )
311- txBroadcastUnderpricedMeter .Mark (underpriced )
312- txBroadcastOtherRejectMeter .Mark (otherreject )
313331 }
314332 select {
315333 case f .cleanup <- & txDelivery {origin : peer , hashes : added , direct : direct }:
0 commit comments