Skip to content

Commit 72c3bed

Browse files
committed
Add latency percentiles calculation
1 parent 3816d53 commit 72c3bed

File tree

3 files changed

+109
-5
lines changed

3 files changed

+109
-5
lines changed

ci/tests/run_perf_test.js

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,12 @@ async function main() {
9090

9191
if (concurrentRun) {
9292
console.log(`Running ${modeLabel} Producer/Consumer test (concurrently)...`);
93-
const INITIAL_DELAY_MS = 2000;
93+
const INITIAL_DELAY_MS = 10000;
9494
const TERMINATE_TIMEOUT_MS = process.env.TERMINATE_TIMEOUT_MS ? +process.env.TERMINATE_TIMEOUT_MS : 600000;
9595
// Wait INITIAL_DELAY_MS more to see if all lag is caught up, start earlier than the producer to check
9696
// E2E latencies more accurately.
97-
const TERMINATE_TIMEOUT_MS_CONSUMERS = TERMINATE_TIMEOUT_MS + INITIAL_DELAY_MS * 2;
97+
const TERMINATE_TIMEOUT_MS_CONSUMERS = TERMINATE_TIMEOUT_MS + INITIAL_DELAY_MS + 2000;
98+
const TERMINATE_TIMEOUT_MS_LAG_MONITORING = TERMINATE_TIMEOUT_MS + 1000;
9899

99100
await runCommand(`MODE=${mode} node performance-consolidated.js --create-topics`);
100101
const allPromises = [];
@@ -106,10 +107,10 @@ async function main() {
106107
allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} INITIAL_DELAY_MS=0 TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_BATCH=${groupIdEachBatch} node performance-consolidated.js --consumer-each-batch ${produceToSecondTopicParam}`));
107108
}
108109
if (consumerModeAll || consumerModeEachMessage) {
109-
allPromises.push(runCommand(`MODE=${mode} INITIAL_DELAY_MS=0 TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MONITOR=${groupIdEachMessage} node performance-consolidated.js --monitor-lag`));
110+
allPromises.push(runCommand(`MODE=${mode} INITIAL_DELAY_MS=${INITIAL_DELAY_MS} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_LAG_MONITORING} GROUPID_MONITOR=${groupIdEachMessage} node performance-consolidated.js --monitor-lag`));
110111
}
111112
if (consumerModeAll || consumerModeEachBatch) {
112-
allPromises.push(runCommand(`MODE=${mode} INITIAL_DELAY_MS=0 TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MONITOR=${groupIdEachBatch} node performance-consolidated.js --monitor-lag`));
113+
allPromises.push(runCommand(`MODE=${mode} INITIAL_DELAY_MS=${INITIAL_DELAY_MS} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_LAG_MONITORING} GROUPID_MONITOR=${groupIdEachBatch} node performance-consolidated.js --monitor-lag`));
113114
}
114115
const results = await Promise.allSettled(allPromises);
115116
return results.map(r => r.status === 'fulfilled' ? r.value : '').join('\n');

examples/performance/performance-consolidated.js

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ function logParameters(parameters) {
5353
}
5454
}
5555

56+
/**
 * Logs one formatted line per percentile entry.
 *
 * @param {Array<{percentile: number, value: number, count: number, total: number}>} percentiles
 *        Percentile entries as produced by LatencyCountSketch post-processing.
 * @param {string} type Label describing which latency series is printed,
 *        e.g. 'T0-T1 (eachMessage)'.
 */
function printPercentiles(percentiles, type) {
  percentiles.forEach((entry) => {
    // Right-align the percentile label (e.g. "   P50", " P99.9") for readable columns.
    const label = `P${entry.percentile}`.padStart(6, ' ');
    const latencyMs = entry.value.toFixed(2);
    console.log(`=== Consumer ${label} E2E latency ${type}: ${latencyMs} ms (${entry.count}/${entry.total})`);
  });
}
62+
5663
(async function () {
5764
const producer = process.argv.includes('--producer');
5865
const consumer = process.argv.includes('--consumer');
@@ -169,10 +176,11 @@ function logParameters(parameters) {
169176
endTrackingMemory('consumer-each-message', `consumer-memory-message-${mode}.json`);
170177
console.log("=== Consumer Rate MB/s (eachMessage): ", consumerRate);
171178
console.log("=== Consumer Rate msg/s (eachMessage): ", stats.messageRate);
172-
console.log("=== Consumer average E2E latency T0-T1 (eachMessage): ", stats.avgLatencyT0T1);
179+
printPercentiles(stats.percentilesTOT1, 'T0-T1 (eachMessage)');
173180
console.log("=== Consumer max E2E latency T0-T1 (eachMessage): ", stats.maxLatencyT0T1);
174181
if (produceToSecondTopic) {
175182
console.log("=== Consumer average E2E latency T0-T2 (eachMessage): ", stats.avgLatencyT0T2);
183+
printPercentiles(stats.percentilesTOT2, 'T0-T2 (eachMessage)');
176184
console.log("=== Consumer max E2E latency T0-T2 (eachMessage): ", stats.maxLatencyT0T2);
177185
}
178186
console.log("=== Consumption time (eachMessage): ", stats.durationSeconds);
@@ -197,9 +205,11 @@ function logParameters(parameters) {
197205
console.log("=== Max eachBatch lag: ", stats.maxOffsetLag);
198206
console.log("=== Average eachBatch size: ", stats.averageBatchSize);
199207
console.log("=== Consumer average E2E latency T0-T1 (eachBatch): ", stats.avgLatencyT0T1);
208+
printPercentiles(stats.percentilesTOT1, 'T0-T1 (eachBatch)');
200209
console.log("=== Consumer max E2E latency T0-T1 (eachBatch): ", stats.maxLatencyT0T1);
201210
if (produceToSecondTopic) {
202211
console.log("=== Consumer average E2E latency T0-T2 (eachBatch): ", stats.avgLatencyT0T2);
212+
printPercentiles(stats.percentilesTOT2, 'T0-T2 (eachBatch)');
203213
console.log("=== Consumer max E2E latency T0-T2 (eachBatch): ", stats.maxLatencyT0T2);
204214
}
205215
console.log("=== Consumption time (eachBatch): ", stats.durationSeconds);

examples/performance/performance-primitives-common.js

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const { hrtime } = require('process');
22
const { randomBytes } = require('crypto');
3+
const PERCENTILES = [50, 75, 90, 95, 99, 99.9, 99.99, 100];
34

45
const TERMINATE_TIMEOUT_MS = process.env.TERMINATE_TIMEOUT_MS ? +process.env.TERMINATE_TIMEOUT_MS : 600000;
56
const AUTO_COMMIT = process.env.AUTO_COMMIT || 'false';
@@ -58,8 +59,86 @@ function genericProduceToTopic(producer, topic, messages) {
5859
});
5960
}
6061

62+
63+
// We use a simple count-sketch for latency percentiles to avoid storing all latencies in memory.
64+
// because we're also measuring the memory usage of the consumer as part of the performance tests.
65+
/**
 * Approximate latency-percentile tracker using logarithmically-spaced buckets
 * (a DDSketch-style relative-error quantile sketch).
 *
 * Each interior bucket i covers the latency range [#buckets[i-1], #buckets[i]),
 * with geometrically growing bounds, so any reported percentile overestimates
 * the true value by at most the configured relative `error`, while memory stays
 * O(log(maxValue / minValue) / log(1 + error)) regardless of sample count.
 *
 * Fixes vs. the original:
 * - `percentiles()` no longer computes an unused running `sum` over every
 *   bucket on each call (dead O(n) work removed).
 * - `var` replaced with `let`.
 * - The options object now defaults to `{}`, so `new LatencyCountSketch()`
 *   works as well as `new LatencyCountSketch({})` (backward-compatible).
 */
class LatencyCountSketch {
  #numBuckets;
  #minValue;            // effective minimum: lowest geometric bound (see constructor)
  #maxValue;
  #buckets;             // ascending bounds; [0] = 0 and [last] = +Infinity are sentinels
  #counts;              // per-bucket sample counts, same indexing as #buckets
  #changeBaseLogarithm; // ln(base); divides ln(x) to get log_base(x) in add()
  #totalCount = 0;
  #base;

  /**
   * @param {object} [options]
   * @param {number} [options.error=0.01] Relative error per bucket (1%).
   * @param {number} [options.minValue=0.01] Smallest tracked latency in ms (10μs).
   * @param {number} [options.maxValue=60000] Largest tracked latency in ms (60s).
   */
  constructor({
    error = 0.01, // 1% error
    minValue = 0.01, // min 10μs latency
    maxValue = 60000, // max 60s latency
  } = {}) {
    // Each bucket represents [x, x * (1 + error)).
    this.#base = 1 + error;
    // Change base from natural log to log base this.#base.
    this.#changeBaseLogarithm = Math.log(this.#base);
    this.#numBuckets = Math.ceil(Math.log(maxValue / minValue) / this.#changeBaseLogarithm);
    this.#maxValue = maxValue;

    // Two extra slots: index 0 is the underflow sentinel (0) and the last
    // index is the overflow sentinel (+Infinity). Interior bounds descend
    // geometrically from maxValue.
    this.#buckets = new Array(this.#numBuckets + 2).fill(0);
    this.#buckets[this.#numBuckets + 1] = Number.POSITIVE_INFINITY;
    this.#buckets[this.#numBuckets] = this.#maxValue;
    this.#buckets[0] = 0;
    let currentValue = maxValue;
    for (let i = this.#numBuckets - 1; i >= 1; i--) {
      currentValue = currentValue / this.#base;
      this.#buckets[i] = currentValue;
    }
    // May differ slightly from the requested minValue because #numBuckets
    // was rounded up by Math.ceil above.
    this.#minValue = this.#buckets[1];
    this.#counts = new Array(this.#numBuckets + 2).fill(0);
  }

  /**
   * Records one latency sample (in ms). Non-positive values fall into the
   * underflow bucket; values past maxValue are clamped into the last real bucket.
   * @param {number} latency
   */
  add(latency) {
    let idx = 0;
    if (latency > 0) {
      idx = Math.ceil(Math.log(latency / this.#minValue) / this.#changeBaseLogarithm);
    }
    // Clamp into [0, length - 2]: the final slot is the +Infinity sentinel bound.
    idx = Math.min(Math.max(idx, 0), this.#buckets.length - 2);

    this.#counts[idx]++;
    this.#totalCount++;
  }

  /**
   * Resolves each requested percentile to a triple:
   * [bucketUpperBound, cumulativeCountAtThatBucket, totalSampleCount].
   *
   * The single cursor `j` never rewinds, so `percentilesArray` must be in
   * ascending order (as the PERCENTILES constant is).
   *
   * @param {number[]} percentilesArray Ascending percentiles, e.g. [50, 99, 100].
   * @returns {Array<[number, number, number]>}
   */
  percentiles(percentilesArray) {
    // Target rank (1-based cumulative sample count) for each percentile.
    const percentileCounts = percentilesArray.map(p => Math.ceil(this.#totalCount * p / 100));
    const percentileResults = new Array(percentilesArray.length);
    let totalCountSoFar = 0;
    let j = 0;
    for (let i = 0; i < percentileCounts.length; i++) {
      // Walk buckets forward until the cumulative count reaches this rank.
      while ((totalCountSoFar < percentileCounts[i]) && (j < this.#counts.length - 1)) {
        totalCountSoFar += this.#counts[j];
        j++;
      }
      const bucketIndex = (j < this.#counts.length - 1) ? j : this.#counts.length - 2;
      percentileResults[i] = [this.#buckets[bucketIndex], totalCountSoFar, this.#totalCount];
    }
    return percentileResults;
  }
}
135+
61136
async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages) {
62137
const handlers = installHandlers(totalMessageCnt === -1);
138+
if (stats) {
139+
stats.percentilesTOT1 = new LatencyCountSketch({});
140+
stats.percentilesTOT2 = new LatencyCountSketch({});
141+
}
63142
while (true) {
64143
try {
65144
await consumer.connect();
@@ -106,6 +185,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
106185
stats.maxLatencyT0T1 = Math.max(stats.maxLatencyT0T1, latency);
107186
stats.avgLatencyT0T1 = ((stats.avgLatencyT0T1 * (numMessages - 1)) + latency) / numMessages;
108187
}
188+
stats.percentilesTOT1.add(latency);
109189
} else {
110190
if (!stats.maxLatencyT0T2) {
111191
stats.maxLatencyT0T2 = latency;
@@ -114,6 +194,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
114194
stats.maxLatencyT0T2 = Math.max(stats.maxLatencyT0T2, latency);
115195
stats.avgLatencyT0T2 = ((stats.avgLatencyT0T2 * (numMessages - 1)) + latency) / numMessages;
116196
}
197+
stats.percentilesTOT2.add(latency);
117198
}
118199
};
119200

@@ -257,6 +338,18 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
257338
stats.messageRate = durationSeconds > 0 ?
258339
(messagesMeasured / durationSeconds) : Infinity;
259340
stats.durationSeconds = durationSeconds;
341+
stats.percentilesTOT1 = stats.percentilesTOT1.percentiles(PERCENTILES).map((value, index) => ({
342+
percentile: PERCENTILES[index],
343+
value: value[0],
344+
count: value[1],
345+
total: value[2],
346+
}));
347+
stats.percentilesTOT2 = stats.percentilesTOT2.percentiles(PERCENTILES).map((value, index) => ({
348+
percentile: PERCENTILES[index],
349+
value: value[0],
350+
count: value[1],
351+
total: value[2],
352+
}));
260353
}
261354
removeHandlers(handlers);
262355
return rate;

0 commit comments

Comments
 (0)