Skip to content

Commit 5945563

Browse files
feywinddermasmid
andauthored
fix: set MaxBytes for AckQueue (#1963)
* fix: #1864 * feat: update maxBytes changes to use a fixed max size * chore: remove "discount polyfill" for allSettled * tests: add unit tests for maxBytes handling * fix: do the maxMessages and maxBytes checks _before_ adding * fix: roll back nise package to avoid semver breakage * chore: move the nise version fix to devDependencies (oops) * chore: pin rimraf in samples package to avoid typescript breakage in lru-cache * chore: also pin lru-cache, remove unneeded rimraf types --------- Co-authored-by: Cheskel Twersky <[email protected]>
1 parent c2b75bb commit 5945563

File tree

4 files changed

+53
-43
lines changed

4 files changed

+53
-43
lines changed

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,9 @@
8989
"mocha": "^9.2.2",
9090
"mv": "^2.1.1",
9191
"ncp": "^2.0.0",
92+
"nise": "6.0.0",
9293
"null-loader": "^4.0.0",
94+
"path-to-regexp": "6.2.2",
9395
"protobufjs": "^7.0.0",
9496
"proxyquire": "^2.0.0",
9597
"sinon": "^18.0.0",

samples/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@
3636
"devDependencies": {
3737
"@google-cloud/bigquery": "^7.0.0",
3838
"@types/chai": "^4.2.16",
39-
"@types/rimraf": "^4.0.0",
4039
"chai": "^4.2.0",
4140
"gts": "^5.0.0",
41+
"lru-cache": "9.1.2",
4242
"mocha": "^9.2.2",
43-
"rimraf": "^5.0.0",
43+
"rimraf": "5.0.9",
4444
"typescript": "^5.1.6",
4545
"uuid": "^9.0.0"
4646
}

src/message-queues.ts

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ export interface BatchOptions {
6161
maxMilliseconds?: number;
6262
}
6363

64+
// This is the maximum number of bytes we will send for a batch of
65+
// ack/modack messages. The server itself has a maximum of 512KiB, so
66+
// we just pull back a little from that in case of unknown fenceposts.
67+
export const MAX_BATCH_BYTES = 510 * 1024 * 1024;
68+
6469
/**
6570
* Error class used to signal a batch failure.
6671
*
@@ -113,6 +118,7 @@ export abstract class MessageQueue {
113118
numPendingRequests: number;
114119
numInFlightRequests: number;
115120
numInRetryRequests: number;
121+
bytes: number;
116122
protected _onFlush?: defer.DeferredPromise<void>;
117123
protected _onDrain?: defer.DeferredPromise<void>;
118124
protected _options!: BatchOptions;
@@ -127,6 +133,7 @@ export abstract class MessageQueue {
127133
this.numPendingRequests = 0;
128134
this.numInFlightRequests = 0;
129135
this.numInRetryRequests = 0;
136+
this.bytes = 0;
130137
this._requests = [];
131138
this._subscriber = sub;
132139
this._retrier = new ExponentialRetry<QueuedMessage>(
@@ -195,7 +202,18 @@ export abstract class MessageQueue {
195202
}
196203

197204
const {maxMessages, maxMilliseconds} = this._options;
205+
const size = Buffer.byteLength(message.ackId, 'utf8');
198206

207+
// If we will go over maxMessages or MAX_BATCH_BYTES by adding this
208+
// message, flush first. (maxMilliseconds is handled by timers.)
209+
if (
210+
this._requests.length + 1 >= maxMessages! ||
211+
this.bytes + size >= MAX_BATCH_BYTES
212+
) {
213+
this.flush();
214+
}
215+
216+
// Add the message to the current batch.
199217
const responsePromise = defer<void>();
200218
this._requests.push({
201219
message: {
@@ -208,10 +226,10 @@ export abstract class MessageQueue {
208226
});
209227
this.numPendingRequests++;
210228
this.numInFlightRequests++;
229+
this.bytes += size;
211230

212-
if (this._requests.length >= maxMessages!) {
213-
this.flush();
214-
} else if (!this._timer) {
231+
// Ensure that we are counting toward maxMilliseconds by timer.
232+
if (!this._timer) {
215233
this._timer = setTimeout(() => this.flush(), maxMilliseconds!);
216234
}
217235

@@ -273,6 +291,7 @@ export abstract class MessageQueue {
273291
const deferred = this._onFlush;
274292

275293
this._requests = [];
294+
this.bytes = 0;
276295
this.numPendingRequests -= batchSize;
277296
delete this._onFlush;
278297

@@ -339,7 +358,10 @@ export abstract class MessageQueue {
339358
* @private
340359
*/
341360
setOptions(options: BatchOptions): void {
342-
const defaults: BatchOptions = {maxMessages: 3000, maxMilliseconds: 100};
361+
const defaults: BatchOptions = {
362+
maxMessages: 3000,
363+
maxMilliseconds: 100,
364+
};
343365

344366
this._options = Object.assign(defaults, options);
345367
}

test/message-queues.ts

Lines changed: 23 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import defer = require('p-defer');
2424

2525
import * as messageTypes from '../src/message-queues';
2626
import {BatchError} from '../src/message-queues';
27-
import {AckError, Message, Subscriber} from '../src/subscriber';
27+
import {Message, Subscriber} from '../src/subscriber';
2828
import {DebugMessage} from '../src/debug';
2929

3030
class FakeClient {
@@ -99,36 +99,6 @@ class ModAckQueue extends messageTypes.ModAckQueue {
9999
}
100100
}
101101

102-
// This discount polyfill for Promise.allSettled can be removed after we drop Node 12.
103-
type AllSettledResult<T, U> = {
104-
status: 'fulfilled' | 'rejected';
105-
value?: T;
106-
reason?: U;
107-
};
108-
function allSettled<T, U>(
109-
proms: Promise<T>[]
110-
): Promise<AllSettledResult<T, U>[]> {
111-
const checkedProms = proms.map((r: Promise<T>) =>
112-
r
113-
.then(
114-
(value: T) =>
115-
({
116-
status: 'fulfilled',
117-
value,
118-
}) as AllSettledResult<T, U>
119-
)
120-
.catch(
121-
(error: U) =>
122-
({
123-
status: 'rejected',
124-
reason: error,
125-
}) as AllSettledResult<T, U>
126-
)
127-
);
128-
129-
return Promise.all(checkedProms);
130-
}
131-
132102
describe('MessageQueues', () => {
133103
const sandbox = sinon.createSandbox();
134104

@@ -190,6 +160,15 @@ describe('MessageQueues', () => {
190160
assert.strictEqual(stub.callCount, 1);
191161
});
192162

163+
it('should flush the queue if at byte capacity', () => {
164+
const stub = sandbox.stub(messageQueue, 'flush');
165+
166+
messageQueue.bytes = messageTypes.MAX_BATCH_BYTES - 10;
167+
messageQueue.add(new FakeMessage() as Message);
168+
169+
assert.strictEqual(stub.callCount, 1);
170+
});
171+
193172
it('should schedule a flush if needed', () => {
194173
const clock = sandbox.useFakeTimers();
195174
const stub = sandbox.stub(messageQueue, 'flush');
@@ -244,6 +223,13 @@ describe('MessageQueues', () => {
244223
assert.strictEqual(messageQueue.numPendingRequests, 0);
245224
});
246225

226+
it('should remove the bytes of messages from the queue', () => {
227+
messageQueue.add(new FakeMessage() as Message);
228+
messageQueue.flush();
229+
230+
assert.strictEqual(messageQueue.bytes, 0);
231+
});
232+
247233
it('should send the batch', () => {
248234
const message = new FakeMessage();
249235
const deadline = 10;
@@ -498,7 +484,7 @@ describe('MessageQueues', () => {
498484
(r: messageTypes.QueuedMessage) => r.responsePromise!.promise
499485
);
500486
await ackQueue.flush();
501-
const results = await allSettled(proms);
487+
const results = await Promise.allSettled(proms);
502488
const oneSuccess = {status: 'fulfilled', value: undefined};
503489
assert.deepStrictEqual(results, [oneSuccess, oneSuccess, oneSuccess]);
504490
});
@@ -522,7 +508,7 @@ describe('MessageQueues', () => {
522508
proms.shift();
523509
await ackQueue.flush();
524510

525-
const results = await allSettled<void, AckError>(proms);
511+
const results = await Promise.allSettled<void>(proms);
526512
assert.strictEqual(results[0].status, 'rejected');
527513
assert.strictEqual(results[0].reason?.errorCode, 'OTHER');
528514
assert.strictEqual(results[1].status, 'rejected');
@@ -552,7 +538,7 @@ describe('MessageQueues', () => {
552538
];
553539
await ackQueue.flush();
554540

555-
const results = await allSettled<void, AckError>(proms);
541+
const results = await Promise.allSettled<void>(proms);
556542
assert.strictEqual(results[0].status, 'rejected');
557543
assert.strictEqual(results[0].reason?.errorCode, 'INVALID');
558544

@@ -789,7 +775,7 @@ describe('MessageQueues', () => {
789775
(r: messageTypes.QueuedMessage) => r.responsePromise!.promise
790776
);
791777
await modAckQueue.flush();
792-
const results = await allSettled(proms);
778+
const results = await Promise.allSettled(proms);
793779
const oneSuccess = {status: 'fulfilled', value: undefined};
794780
assert.deepStrictEqual(results, [oneSuccess, oneSuccess, oneSuccess]);
795781
});
@@ -815,7 +801,7 @@ describe('MessageQueues', () => {
815801
proms.shift();
816802
await modAckQueue.flush();
817803

818-
const results = await allSettled<void, AckError>(proms);
804+
const results = await Promise.allSettled<void>(proms);
819805
assert.strictEqual(results[0].status, 'rejected');
820806
assert.strictEqual(results[0].reason?.errorCode, 'OTHER');
821807
assert.strictEqual(results[1].status, 'rejected');
@@ -847,7 +833,7 @@ describe('MessageQueues', () => {
847833
];
848834
await modAckQueue.flush();
849835

850-
const results = await allSettled<void, AckError>(proms);
836+
const results = await Promise.allSettled<void>(proms);
851837
assert.strictEqual(results[0].status, 'rejected');
852838
assert.strictEqual(results[0].reason?.errorCode, 'INVALID');
853839

0 commit comments

Comments
 (0)