@@ -433,34 +433,36 @@ async function runProducer(producer, topic, batchSize, warmupMessages, totalMess
   while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) {
     const modifiedMessages = [];
     const batchStart = messagesDispatched % messageCnt;
-    for (const msg of messages.slice(batchStart, batchStart + batchSize)) {
+    for (let i = 0; i < batchSize; i++) {
+      const msg = messages[(batchStart + i) % messages.length];
       modifiedMessages.push({
         key: msg.key,
         value: msg.value,
         headers: {
           'timestamp': Date.now().toString(),
         }
       });
+      if (messagesNotAwaited + i + 1 >= maxToAwait)
+        break;
     }
+    const toProduce = modifiedMessages.length;
     promises.push(producer.send({
       topic,
       messages: modifiedMessages,
     }, compression).then(() => {
-      totalMessagesSent += batchSize;
-      totalBytesSent += batchSize * msgSize;
+      totalMessagesSent += toProduce;
+      totalBytesSent += toProduce * msgSize;
     }).catch((err) => {
       if (producer.isQueueFullError(err)) {
         /* do nothing, just send them again */
-        messagesDispatched -= batchSize;
-        totalMessagesSent -= batchSize;
-        totalBytesSent -= batchSize * msgSize;
+        messagesDispatched -= toProduce;
       } else {
         console.error(err);
         throw err;
       }
     }));
-    messagesDispatched += batchSize;
-    messagesNotAwaited += batchSize;
+    messagesDispatched += toProduce;
+    messagesNotAwaited += toProduce;
     if (handlers.terminationRequested || messagesNotAwaited >= maxToAwait)
       break;
   }
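For context (not part of the commit), a minimal standalone sketch of the wrap-around indexing the new loop introduces, using hypothetical values: the old `messages.slice(batchStart, batchStart + batchSize)` returned a short batch whenever `batchStart` fell near the end of the pre-generated message array, whereas indexing with `% messages.length` lets a batch start near the end and continue from the beginning, so each batch keeps its full size unless the `maxToAwait` cap ends it early.

// Standalone sketch of the wrap-around batch selection (assumed example values, not from the PR).
const messages = ['m0', 'm1', 'm2', 'm3', 'm4'];
const batchSize = 3;
const batchStart = 4; // a batch starting at the last element

const batch = [];
for (let i = 0; i < batchSize; i++) {
  // Modulo keeps the index inside the array, wrapping back to the start.
  batch.push(messages[(batchStart + i) % messages.length]);
}
console.log(batch); // ['m4', 'm0', 'm1'] -- the old slice() would have produced only ['m4']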