77 runLagMonitoring : runLagMonitoringCommon ,
88 genericProduceToTopic,
99 getAutoCommit,
10+ PerformanceLogger,
1011 } = require ( './performance-primitives-common' ) ;
1112
1213module . exports = {
@@ -22,6 +23,9 @@ module.exports = {
2223
2324const CONSUMER_MAX_BATCH_SIZE = process . env . CONSUMER_MAX_BATCH_SIZE ? + process . env . CONSUMER_MAX_BATCH_SIZE : null ;
2425const IS_HIGHER_LATENCY_CLUSTER = process . env . IS_HIGHER_LATENCY_CLUSTER === 'true' ;
26+ const DEBUG = process . env . DEBUG ;
27+ const STATISTICS_INTERVAL_MS = process . env . STATISTICS_INTERVAL_MS ? + process . env . STATISTICS_INTERVAL_MS : null ;
28+ const ENABLE_LOGGING = DEBUG !== undefined || STATISTICS_INTERVAL_MS !== null ;
2529
2630function baseConfiguration ( parameters ) {
2731 let ret = {
@@ -39,11 +43,30 @@ function baseConfiguration(parameters) {
3943 'sasl.password' : parameters . saslPassword ,
4044 } ;
4145 }
46+ if ( DEBUG ) {
47+ ret [ 'debug' ] = DEBUG ;
48+ }
49+ if ( parameters . logToFile ) {
50+ ret . kafkaJS = {
51+ 'logger' : new PerformanceLogger ( parameters . logToFile ) ,
52+ } ;
53+ }
54+ if ( STATISTICS_INTERVAL_MS !== null ) {
55+ ret [ 'statistics.interval.ms' ] = STATISTICS_INTERVAL_MS ;
56+ ret [ 'stats_cb' ] = function ( event ) {
57+ this . logger ( ) . info ( event . message ) ;
58+ } ;
59+ }
4260 return ret ;
4361}
4462
4563async function runCreateTopics ( parameters , topic , topic2 , numPartitions ) {
46- const kafka = new Kafka ( baseConfiguration ( parameters ) ) ;
64+ const adminParameters = {
65+ ...parameters ,
66+ } ;
67+ if ( ENABLE_LOGGING )
68+ adminParameters . logToFile = './confluent-admin.log' ;
69+ const kafka = new Kafka ( baseConfiguration ( adminParameters ) ) ;
4770
4871 const admin = kafka . admin ( ) ;
4972 await admin . connect ( ) ;
@@ -116,6 +139,9 @@ function newCompatibleProducer(parameters, compression) {
116139}
117140
/**
 * Run the shared producer benchmark against `topic` with a
 * confluent producer built from `parameters`.
 *
 * When ENABLE_LOGGING is on (i.e. the DEBUG or STATISTICS_INTERVAL_MS
 * environment variables are set) and the caller did not request a log file,
 * the producer logs to './confluent-producer.log'.
 *
 * @param {object} parameters - Client configuration; NOT mutated (a copy is
 *   taken before the default logToFile is applied).
 * @param {string} topic - Topic to produce to.
 * @param {number} batchSize - Messages per produce batch.
 * @param {number} warmupMessages - Messages sent before measurement starts.
 * @param {number} totalMessageCnt - Total messages to produce.
 * @param {number} msgSize - Size of each message in bytes.
 * @param {*} compression - Compression codec forwarded to the producer.
 * @param {*} randomness - Payload randomness setting (forwarded as-is).
 * @param {*} limitRPS - Rate limit (forwarded as-is).
 * @returns {Promise<*>} Whatever runProducerCommon resolves with —
 *   presumably the collected benchmark stats; defined elsewhere.
 */
async function runProducer(parameters, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness, limitRPS) {
    // Copy before applying the default log file so the caller's object is not
    // mutated — consistent with runCreateTopics/runConsumer, which also
    // spread-copy parameters before setting logToFile.
    const producerParameters = {
        ...parameters,
    };
    if (ENABLE_LOGGING && !producerParameters.logToFile) {
        producerParameters.logToFile = './confluent-producer.log';
    }
    return runProducerCommon(
        newCompatibleProducer(producerParameters, compression),
        topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness, limitRPS);
}
121147
@@ -151,7 +177,8 @@ class CompatibleConsumer {
151177 }
152178}
153179
154- function newCompatibleConsumer ( parameters , eachBatch ) {
180+ function newCompatibleConsumer ( parameters , eachBatch , messageSize , limitRPS ) {
181+ const minFetchBytes = messageSize * limitRPS ;
155182 const kafka = new Kafka ( baseConfiguration ( parameters ) ) ;
156183 const autoCommit = getAutoCommit ( ) ;
157184 const autoCommitOpts = autoCommit > 0 ?
@@ -173,6 +200,7 @@ function newCompatibleConsumer(parameters, eachBatch) {
173200 'group.id' : groupId ,
174201 'auto.offset.reset' : 'earliest' ,
175202 'fetch.queue.backoff.ms' : '100' ,
203+ 'fetch.min.bytes' : minFetchBytes . toString ( ) ,
176204 ...autoCommitOpts ,
177205 ...jsOpts ,
178206 ...higherLatencyClusterOpts ,
@@ -181,16 +209,30 @@ function newCompatibleConsumer(parameters, eachBatch) {
181209}
182210
183211
184- async function runConsumer ( parameters , topic , warmupMessages , totalMessageCnt , eachBatch , partitionsConsumedConcurrently , stats , produceToTopic , produceCompression , useCKJSProducerEverywhere ) {
212+ async function runConsumer ( parameters , topic , warmupMessages , totalMessageCnt , eachBatch , partitionsConsumedConcurrently , stats , produceToTopic , produceCompression , useCKJSProducerEverywhere ,
213+ messageSize , limitRPS ) {
185214 let actionOnMessages = null ;
186215 let producer ;
187216 if ( produceToTopic ) {
188- producer = newCompatibleProducer ( parameters , produceCompression ) ;
217+ const producerParameters = {
218+ ...parameters ,
219+ } ;
220+ if ( ENABLE_LOGGING )
221+ producerParameters . logToFile = './confluent-consumer-producer.log' ;
222+ producer = newCompatibleProducer ( producerParameters , produceCompression ) ;
189223 await producer . connect ( ) ;
190224 actionOnMessages = ( messages ) =>
191225 genericProduceToTopic ( producer , produceToTopic , messages ) ;
192226 }
193- const ret = await runConsumerCommon ( newCompatibleConsumer ( parameters , eachBatch ) , topic , warmupMessages , totalMessageCnt , eachBatch , partitionsConsumedConcurrently , stats , actionOnMessages ) ;
227+ const consumerParameters = {
228+ ...parameters ,
229+ } ;
230+ if ( ENABLE_LOGGING )
231+ consumerParameters . logToFile = eachBatch ? './confluent-consumer-batch.log' :
232+ './confluent-consumer-message.log' ;
233+ const ret = await runConsumerCommon (
234+ newCompatibleConsumer ( consumerParameters , eachBatch , messageSize , limitRPS ) ,
235+ topic , warmupMessages , totalMessageCnt , eachBatch , partitionsConsumedConcurrently , stats , actionOnMessages ) ;
194236 if ( producer ) {
195237 await producer . disconnect ( ) ;
196238 }
0 commit comments