Skip to content
Open
7 changes: 7 additions & 0 deletions src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ export class LeaseManager extends EventEmitter {
* Adds a message to the inventory, kicking off the deadline extender if it
* isn't already running.
*
* @fires LeaseManager#full
*
* @param {Message} message The message.
* @private
*/
Expand Down Expand Up @@ -141,6 +143,10 @@ export class LeaseManager extends EventEmitter {
}
/**
* Removes ALL messages from inventory, and returns the ones removed.
*
* @fires LeaseManager#free
* @fires LeaseManager#empty
*
* @private
*/
clear(): Message[] {
Expand Down Expand Up @@ -197,6 +203,7 @@ export class LeaseManager extends EventEmitter {
* messages are left over.
*
* @fires LeaseManager#free
* @fires LeaseManager#empty
*
* @param {Message} message The message to remove.
* @private
Expand Down
9 changes: 5 additions & 4 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ export class Message implements tracing.MessageWithAttributes {
* close() function.
* @property {SubscriberCloseBehavior} [options.behavior] The behavior of the close operation.
* - NackImmediately: Sends nacks for all messages held by the client library, and
* wait for them to send.
* wait for them to send. (default to match old behavior)
* - WaitForProcessing: Continues normal ack/nack and leasing processes until close
* to the timeout, then switches to NackImmediately behavior to close down.
* Use {@link SubscriberCloseBehaviors} for enum values.
Expand Down Expand Up @@ -970,9 +970,10 @@ export class Subscriber extends EventEmitter {

const options = this._options.closeOptions;

// If no behavior is specified, default to Wait.
// If no behavior is specified, default to Nack. This most closely matches
// the old behavior.
const behavior =
options?.behavior ?? SubscriberCloseBehaviors.WaitForProcessing;
options?.behavior ?? SubscriberCloseBehaviors.NackImmediately;

// The timeout can't realistically be longer than the longest time we're willing
// to lease messages.
Expand Down Expand Up @@ -1000,7 +1001,7 @@ export class Subscriber extends EventEmitter {
const shutdownStart = Date.now();
if (
behavior === SubscriberCloseBehaviors.WaitForProcessing &&
!this._inventory.isEmpty
!this._inventory.isEmpty()
) {
const waitTimeout = timeout.subtract(FINAL_NACK_TIMEOUT);

Expand Down
8 changes: 4 additions & 4 deletions system-test/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,8 @@ describe('pubsub', () => {
const SUB_NAMES = [generateSubName(), generateSubName()];
const SUB_DETACH_NAME = generateSubForDetach();

const thirty = Duration.from({minutes: 30});
const sixty = Duration.from({minutes: 60});
const thirty = Duration.from({seconds: 30});
const sixty = Duration.from({seconds: 60});
const SUBSCRIPTIONS = [
topic.subscription(SUB_NAMES[0], {minAckDeadline: thirty, maxAckDeadline: thirty}),
topic.subscription(SUB_NAMES[1], {minAckDeadline: sixty, maxAckDeadline: sixty}),
Expand Down Expand Up @@ -659,9 +659,9 @@ describe('pubsub', () => {

subscription.on('error', done);

// eslint-disable-next-line @typescript-eslint/no-explicit-any
subscription.on('message', (message: {data: any}) => {
subscription.on('message', message => {
assert.deepStrictEqual(message.data, Buffer.from('hello'));
message.ack();

if (++messageCount === 10) {
subscription.close(done);
Expand Down
2 changes: 1 addition & 1 deletion test/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class FakeLeaseManager extends EventEmitter {
remove(message: s.Message): void {}

_isEmpty = true;
get isEmpty() {
isEmpty(): boolean {
return this._isEmpty;
}
}
Expand Down
Loading