Skip to content

Commit 8dee024

Browse files
feywindgoogle-labs-jules[bot]gcf-owl-bot[bot]
authored
feat: add timeout option and graceful shutdown to Subscription.close() (#2068)
* feat: Add timeout option to Subscription.close() Implements a new `timeout` option (using `Duration`) for the `Subscription.close()` method. This provides more control over the shutdown process: - If `timeout` is zero, the subscription closes as quickly as possible without nacking buffered messages. - If `timeout` is positive, the subscription attempts to nack any buffered messages (in the lease manager) and waits up to the specified duration for pending acknowledgements and nacks to be sent to the server. - If no timeout is provided, the behavior remains as before (waits indefinitely for pending acks/modacks, no nacking). The core logic is implemented in `Subscriber.close()`. `PubSub.close()` documentation is updated to clarify its scope and recommend using `Subscription.close()` directly for this feature. Includes: - Unit tests for the new timeout behavior in `Subscriber.close()`. - A TypeScript sample (`samples/closeSubscriptionWithTimeout.ts`) demonstrating usage. - Updated JSDoc documentation for relevant methods. * docs: revert README change so release-please can do it * feat: jules' vibin' is too lo-fi, fix some bad assumptions * samples: typeless a JS sample for close with timeout * feat: add awaitWithTimeout and test * tests: also test error results without timeout * feat: update for the current spec, test updates coming * tests: misc fixes before further additions * feat: update Temporal shims to match latest standards * chore: linter fix * feat: update to latest spec doc info, finish unit tests * feat: also move the options from close() parameters to subscriber options * chore: fix linter errors * samples: update to latest API changes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: no need to check isEmpty on remove * chore: remove unneeded promise skip code * fix: substantially clarify the awaitWithTimeout interface * chore: hoist timeout logic into its own method * fix: tests were leaking EventEmitter handlers * chore: change constant to CONSTANT_CASE --------- Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com> Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent dce83c7 commit 8dee024

21 files changed

+1043
-83
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree
123123

124124
| Sample | Source Code | Try it |
125125
| --------------------------- | --------------------------------- | ------ |
126+
| Close Subscription with Timeout | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/closeSubscriptionWithTimeout.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/closeSubscriptionWithTimeout.js,samples/README.md) |
126127
| Commit an Avro-Based Schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/commitAvroSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/commitAvroSchema.js,samples/README.md) |
127128
| Commit an Proto-Based Schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/commitProtoSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/commitProtoSchema.js,samples/README.md) |
128129
| Create an Avro based Schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createAvroSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createAvroSchema.js,samples/README.md) |

samples/README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ guides.
2020

2121
* [Before you begin](#before-you-begin)
2222
* [Samples](#samples)
23+
* [Close Subscription with Timeout](#close-subscription-with-timeout)
2324
* [Commit an Avro-Based Schema](#commit-an-avro-based-schema)
2425
* [Commit an Proto-Based Schema](#commit-an-proto-based-schema)
2526
* [Create an Avro based Schema](#create-an-avro-based-schema)
@@ -108,6 +109,25 @@ Before running the samples, make sure you've followed the steps outlined in
108109

109110

110111

112+
### Close Subscription with Timeout
113+
114+
Demonstrates closing a subscription with a specified timeout for graceful shutdown.
115+
116+
View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/closeSubscriptionWithTimeout.js).
117+
118+
[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/closeSubscriptionWithTimeout.js,samples/README.md)
119+
120+
__Usage:__
121+
122+
123+
`node closeSubscriptionWithTimeout.js <topic-name> <subscription-name>`
124+
125+
126+
-----
127+
128+
129+
130+
111131
### Commit an Avro-Based Schema
112132

113133
Commits a new schema definition revision on a project, using Avro
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// This is a generated sample, using the typeless sample bot. Please
16+
// look for the source TypeScript sample (.ts) for modifications.
17+
'use strict';
18+
19+
/**
20+
* This sample demonstrates how to use the `timeout` option when closing a Pub/Sub
21+
* subscription using the Node.js client library. The timeout allows for graceful
22+
* shutdown, attempting to nack any buffered messages before closing.
23+
*
24+
* For more information, see the README.md under /pubsub and the documentation
25+
* at https://cloud.google.com/pubsub/docs.
26+
*/
27+
28+
// sample-metadata:
29+
// title: Close Subscription with Timeout
30+
// description: Demonstrates closing a subscription with a specified timeout for graceful shutdown.
31+
// usage: node closeSubscriptionWithTimeout.js <topic-name> <subscription-name>
32+
33+
// This sample is currently speculative.
34+
// -START pubsub_close_subscription_with_timeout]
35+
36+
// Imports the Google Cloud client library
37+
const {
38+
PubSub,
39+
Duration,
40+
SubscriptionCloseBehaviors,
41+
} = require('@google-cloud/pubsub');
42+
43+
// Creates a client; cache this for further use
44+
const pubsub = new PubSub();
45+
46+
async function closeSubscriptionWithTimeout(
47+
topicNameOrId,
48+
subscriptionNameOrId,
49+
) {
50+
const topic = pubsub.topic(topicNameOrId);
51+
52+
// Closes the subscription immediately, not waiting for anything.
53+
let subscription = topic.subscription(subscriptionNameOrId, {
54+
closeOptions: {
55+
timeout: Duration.from({seconds: 0}),
56+
},
57+
});
58+
await subscription.close();
59+
60+
// Shuts down the gRPC connection, and waits for just before the timeout
61+
// to send nacks for buffered messages. If `timeout` were missing, this
62+
// would wait for the maximum leasing timeout.
63+
subscription = topic.subscription(subscriptionNameOrId, {
64+
closeOptions: {
65+
behavior: SubscriptionCloseBehaviors.WaitForProcessing,
66+
timeout: Duration.from({seconds: 10}),
67+
},
68+
});
69+
await subscription.close();
70+
71+
// Shuts down the gRPC connection, sends nacks for buffered messages, and waits
72+
// through the timeout for nacks to send.
73+
subscription = topic.subscription(subscriptionNameOrId, {
74+
closeOptions: {
75+
behavior: SubscriptionCloseBehaviors.NackImmediately,
76+
timeout: Duration.from({seconds: 10}),
77+
},
78+
});
79+
await subscription.close();
80+
}
81+
// -END pubsub_close_subscription_with_timeout]
82+
83+
// Presumes topic and subscription have been created prior to running the sample.
84+
// If you uncomment the cleanup code above, the sample will delete them afterwards.
85+
function main(
86+
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
87+
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID',
88+
) {
89+
closeSubscriptionWithTimeout(topicNameOrId, subscriptionNameOrId).catch(
90+
err => {
91+
console.error(err.message);
92+
process.exitCode = 1;
93+
},
94+
);
95+
}
96+
97+
main(...process.argv.slice(2));
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* This sample demonstrates how to use the `timeout` option when closing a Pub/Sub
17+
* subscription using the Node.js client library. The timeout allows for graceful
18+
* shutdown, attempting to nack any buffered messages before closing.
19+
*
20+
* For more information, see the README.md under /pubsub and the documentation
21+
* at https://cloud.google.com/pubsub/docs.
22+
*/
23+
24+
// sample-metadata:
25+
// title: Close Subscription with Timeout
26+
// description: Demonstrates closing a subscription with a specified timeout for graceful shutdown.
27+
// usage: node closeSubscriptionWithTimeout.js <topic-name> <subscription-name>
28+
29+
// This sample is currently speculative.
30+
// -START pubsub_close_subscription_with_timeout]
31+
32+
// Imports the Google Cloud client library
33+
import {
34+
PubSub,
35+
Duration,
36+
SubscriptionCloseBehaviors,
37+
} from '@google-cloud/pubsub';
38+
39+
// Creates a client; cache this for further use
40+
const pubsub = new PubSub();
41+
42+
async function closeSubscriptionWithTimeout(
43+
topicNameOrId: string,
44+
subscriptionNameOrId: string,
45+
) {
46+
const topic = pubsub.topic(topicNameOrId);
47+
48+
// Closes the subscription immediately, not waiting for anything.
49+
let subscription = topic.subscription(subscriptionNameOrId, {
50+
closeOptions: {
51+
timeout: Duration.from({seconds: 0}),
52+
},
53+
});
54+
await subscription.close();
55+
56+
// Shuts down the gRPC connection, and waits for just before the timeout
57+
// to send nacks for buffered messages. If `timeout` were missing, this
58+
// would wait for the maximum leasing timeout.
59+
subscription = topic.subscription(subscriptionNameOrId, {
60+
closeOptions: {
61+
behavior: SubscriptionCloseBehaviors.WaitForProcessing,
62+
timeout: Duration.from({seconds: 10}),
63+
},
64+
});
65+
await subscription.close();
66+
67+
// Shuts down the gRPC connection, sends nacks for buffered messages, and waits
68+
// through the timeout for nacks to send.
69+
subscription = topic.subscription(subscriptionNameOrId, {
70+
closeOptions: {
71+
behavior: SubscriptionCloseBehaviors.NackImmediately,
72+
timeout: Duration.from({seconds: 10}),
73+
},
74+
});
75+
await subscription.close();
76+
}
77+
// -END pubsub_close_subscription_with_timeout]
78+
79+
// Presumes topic and subscription have been created prior to running the sample.
80+
// If you uncomment the cleanup code above, the sample will delete them afterwards.
81+
function main(
82+
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
83+
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID',
84+
) {
85+
closeSubscriptionWithTimeout(topicNameOrId, subscriptionNameOrId).catch(
86+
err => {
87+
console.error(err.message);
88+
process.exitCode = 1;
89+
},
90+
);
91+
}
92+
93+
main(...process.argv.slice(2));

src/exponential-retry.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ export class ExponentialRetry<T> {
7777
private _timer?: NodeJS.Timeout;
7878

7979
constructor(backoff: Duration, maxBackoff: Duration) {
80-
this._backoffMs = backoff.totalOf('millisecond');
81-
this._maxBackoffMs = maxBackoff.totalOf('millisecond');
80+
this._backoffMs = backoff.milliseconds;
81+
this._maxBackoffMs = maxBackoff.milliseconds;
8282
}
8383

8484
/**
@@ -170,7 +170,7 @@ export class ExponentialRetry<T> {
170170

171171
next.retryInfo!.callback(
172172
next as unknown as T,
173-
Duration.from({millis: now - next.retryInfo!.firstRetry}),
173+
Duration.from({milliseconds: now - next.retryInfo!.firstRetry}),
174174
);
175175
} else {
176176
break;

src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ export {
143143
SubscriptionMetadata,
144144
SubscriptionOptions,
145145
SubscriptionCloseCallback,
146+
SubscriptionCloseOptions,
147+
SubscriptionCloseBehaviors,
146148
CreateSubscriptionOptions,
147149
CreateSubscriptionCallback,
148150
CreateSubscriptionResponse,

src/lease-manager.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ export class LeaseManager extends EventEmitter {
145145
*/
146146
clear(): Message[] {
147147
const wasFull = this.isFull();
148+
const wasEmpty = this.isEmpty();
148149

149150
if (this.pending > 0) {
150151
logs.subscriberFlowControl.info(
@@ -161,11 +162,15 @@ export class LeaseManager extends EventEmitter {
161162
if (wasFull) {
162163
process.nextTick(() => this.emit('free'));
163164
}
165+
if (!wasEmpty && this.isEmpty()) {
166+
process.nextTick(() => this.emit('empty'));
167+
}
164168

165169
this._cancelExtension();
166170

167171
return remaining;
168172
}
173+
169174
/**
170175
* Indicates if we're at or over capacity.
171176
*
@@ -176,6 +181,17 @@ export class LeaseManager extends EventEmitter {
176181
const {maxBytes, maxMessages} = this._options;
177182
return this.size >= maxMessages! || this.bytes >= maxBytes!;
178183
}
184+
185+
/**
186+
* True if we have no messages in leasing.
187+
*
188+
* @returns {boolean}
189+
* @private
190+
*/
191+
isEmpty(): boolean {
192+
return this._messages.size === 0;
193+
}
194+
179195
/**
180196
* Removes a message from the inventory. Stopping the deadline extender if no
181197
* messages are left over.
@@ -216,6 +232,10 @@ export class LeaseManager extends EventEmitter {
216232
this._dispense(this._pending.shift()!);
217233
}
218234

235+
if (this.isEmpty()) {
236+
this.emit('empty');
237+
}
238+
219239
if (this.size === 0 && this._isLeasing) {
220240
this._cancelExtension();
221241
}
@@ -265,6 +285,7 @@ export class LeaseManager extends EventEmitter {
265285
if (this._subscriber.isOpen) {
266286
message.subSpans.flowEnd();
267287
process.nextTick(() => {
288+
message.dispatched();
268289
logs.callbackDelivery.info(
269290
'message (ID %s, ackID %s) delivery to user callbacks',
270291
message.id,

src/message-stream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ const DEFAULT_OPTIONS: MessageStreamOptions = {
7373
highWaterMark: 0,
7474
maxStreams: defaultOptions.subscription.maxStreams,
7575
timeout: 300000,
76-
retryMinBackoff: Duration.from({millis: 100}),
76+
retryMinBackoff: Duration.from({milliseconds: 100}),
7777
retryMaxBackoff: Duration.from({seconds: 60}),
7878
};
7979

src/pubsub.ts

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -352,15 +352,34 @@ export class PubSub {
352352
}
353353

354354
/**
355-
* Closes out this object, releasing any server connections. Note that once
356-
* you close a PubSub object, it may not be used again. Any pending operations
357-
* (e.g. queued publish messages) will fail. If you have topic or subscription
358-
* objects that may have pending operations, you should call close() on those
359-
* first if you want any pending messages to be delivered correctly. The
355+
* Closes the PubSub client, releasing any underlying gRPC connections.
356+
*
357+
* Note that once you close a PubSub object, it may not be used again. Any pending
358+
* operations (e.g. queued publish messages) will fail. If you have topic or
359+
* subscription objects that may have pending operations, you should call close()
360+
* on those first if you want any pending messages to be delivered correctly. The
360361
* PubSub class doesn't track those.
362+
363+
* Note that this method primarily closes the gRPC clients (Publisher and Subscriber)
364+
* used for API requests. It does **not** automatically handle the graceful shutdown
365+
* of active subscriptions.
366+
*
367+
* For graceful shutdown of subscriptions with specific timeout behavior (e.g.,
368+
* ensuring buffered messages are nacked before closing), please refer to the
369+
* {@link Subscription#close} method. It is recommended to call
370+
* `Subscription.close({timeout: ...})` directly on your active `Subscription`
371+
* objects *before* calling `PubSub.close()` if you require that specific
372+
* shutdown behavior.
373+
*
374+
* Calling `PubSub.close()` without first closing active subscriptions might
375+
* result in abrupt termination of message processing for those subscriptions.
376+
* Any pending operations on associated Topic or Subscription objects (e.g.,
377+
* queued publish messages or unacked subscriber messages) may fail after
378+
* `PubSub.close()` is called.
361379
*
362380
* @callback EmptyCallback
363-
* @returns {Promise<void>}
381+
* @param {Error} [err] Request error, if any.
382+
* @returns {Promise<void>} Resolves when the clients are closed.
364383
*/
365384
close(): Promise<void>;
366385
close(callback: EmptyCallback): void;

0 commit comments

Comments
 (0)