Skip to content

Commit aa67999

Browse files
use token bucket and apply exponential backoff in retry loop
1 parent 3850431 commit aa67999

File tree

2 files changed

+116
-52
lines changed

2 files changed

+116
-52
lines changed

src/operations/execute_operation.ts

Lines changed: 113 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { setTimeout } from 'node:timers/promises';
2+
13
import { MIN_SUPPORTED_SNAPSHOT_READS_WIRE_VERSION } from '../cmap/wire_protocol/constants';
24
import {
35
isRetryableReadError,
@@ -10,6 +12,7 @@ import {
1012
MongoInvalidArgumentError,
1113
MongoNetworkError,
1214
MongoNotConnectedError,
15+
MongoOperationTimeoutError,
1316
MongoRuntimeError,
1417
MongoServerError,
1518
MongoTransactionError,
@@ -26,6 +29,7 @@ import {
2629
import type { Topology } from '../sdam/topology';
2730
import type { ClientSession } from '../sessions';
2831
import { TimeoutContext } from '../timeout';
32+
import { RETRY_COST, TOKEN_REFRESH_RATE } from '../token_bucket';
2933
import { abortable, maxWireVersion, supportsRetryableWrites } from '../utils';
3034
import { AggregateOperation } from './aggregate';
3135
import { AbstractOperation, Aspect } from './operation';
@@ -233,71 +237,138 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
233237
session.incrementTransactionNumber();
234238
}
235239

236-
const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
240+
// The maximum number of retry attempts using regular retryable reads/writes logic (not including
241+
// SystemOverLoad error retries).
242+
const maxNonOverloadRetryAttempts = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
237243
let previousOperationError: MongoError | undefined;
238244
const deprioritizedServers = new DeprioritizedServers();
245+
const nonOverloadRetryAttempt = 0;
246+
247+
let systemOverloadRetryAttempt = 0;
248+
const maxSystemOverloadRetryAttempts = 5;
239249

240-
for (let tries = 0; tries < maxTries; tries++) {
250+
while (true) {
241251
if (previousOperationError) {
242-
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
243-
throw new MongoServerError({
244-
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
245-
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
246-
originalError: previousOperationError
252+
if (previousOperationError.hasErrorLabel(MongoErrorLabel.SystemOverloadError)) {
253+
systemOverloadRetryAttempt += 1;
254+
255+
// if the SystemOverloadError is not retryable, throw.
256+
if (!previousOperationError.hasErrorLabel(MongoErrorLabel.RetryableError)) {
257+
throw previousOperationError;
258+
}
259+
260+
// if we have exhausted overload retry attempts, throw.
261+
if (systemOverloadRetryAttempt > maxSystemOverloadRetryAttempts) {
262+
throw previousOperationError;
263+
}
264+
265+
const delayMS =
266+
Math.random() *
267+
Math.min(
268+
10_000, // MAX_BACKOFF,
269+
100 * 2 ** systemOverloadRetryAttempt
270+
);
271+
272+
// if the delay would exhaust the CSOT timeout, short-circuit.
273+
if (timeoutContext.csotEnabled() && delayMS > timeoutContext.remainingTimeMS) {
274+
// TODO: is this the right error to throw?
275+
throw new MongoOperationTimeoutError(
276+
`MongoDB SystemOverload exponential backoff would exceed timeoutMS deadline: remaining CSOT deadline=${timeoutContext.remainingTimeMS}, backoff delayMS=${delayMS}`,
277+
{
278+
cause: previousOperationError
279+
}
280+
);
281+
}
282+
283+
await setTimeout(delayMS);
284+
285+
if (!topology.tokenBucket.consume(RETRY_COST)) {
286+
throw previousOperationError;
287+
}
288+
289+
server = await topology.selectServer(selector, {
290+
session,
291+
operationName: operation.commandName,
292+
deprioritizedServers,
293+
signal: operation.options.signal
294+
});
295+
} else {
296+
// we have no more retry attempts, throw.
297+
if (nonOverloadRetryAttempt >= maxNonOverloadRetryAttempts) {
298+
throw previousOperationError;
299+
}
300+
301+
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
302+
throw new MongoServerError({
303+
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
304+
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
305+
originalError: previousOperationError
306+
});
307+
}
308+
309+
if (
310+
(operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) ||
311+
(hasWriteAspect && !isRetryableWriteError(previousOperationError)) ||
312+
(hasReadAspect && !isRetryableReadError(previousOperationError))
313+
) {
314+
throw previousOperationError;
315+
}
316+
317+
if (
318+
previousOperationError instanceof MongoNetworkError &&
319+
operation.hasAspect(Aspect.CURSOR_CREATING) &&
320+
session != null &&
321+
session.isPinned &&
322+
!session.inTransaction()
323+
) {
324+
session.unpin({ force: true, forceClear: true });
325+
}
326+
327+
server = await topology.selectServer(selector, {
328+
session,
329+
operationName: operation.commandName,
330+
deprioritizedServers,
331+
signal: operation.options.signal
247332
});
248-
}
249-
250-
if (operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) {
251-
throw previousOperationError;
252-
}
253-
254-
if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
255-
throw previousOperationError;
256-
257-
if (hasReadAspect && !isRetryableReadError(previousOperationError)) {
258-
throw previousOperationError;
259-
}
260-
261-
if (
262-
previousOperationError instanceof MongoNetworkError &&
263-
operation.hasAspect(Aspect.CURSOR_CREATING) &&
264-
session != null &&
265-
session.isPinned &&
266-
!session.inTransaction()
267-
) {
268-
session.unpin({ force: true, forceClear: true });
269-
}
270-
271-
server = await topology.selectServer(selector, {
272-
session,
273-
operationName: operation.commandName,
274-
deprioritizedServers,
275-
signal: operation.options.signal
276-
});
277333

278-
if (hasWriteAspect && !supportsRetryableWrites(server)) {
279-
throw new MongoUnexpectedServerResponseError(
280-
'Selected server does not support retryable writes'
281-
);
334+
if (hasWriteAspect && !supportsRetryableWrites(server)) {
335+
throw new MongoUnexpectedServerResponseError(
336+
'Selected server does not support retryable writes'
337+
);
338+
}
282339
}
283340
}
284341

285342
operation.server = server;
286343

287344
try {
288-
// If tries > 0 and we are command batching we need to reset the batch.
289-
if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
345+
// If attempt > 0 and we are command batching we need to reset the batch.
346+
if (nonOverloadRetryAttempt > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
290347
operation.resetBatch();
291348
}
292349

293350
try {
294351
const result = await server.command(operation, timeoutContext);
352+
const isRetry = nonOverloadRetryAttempt > 0 || systemOverloadRetryAttempt > 0;
353+
topology.tokenBucket.deposit(
354+
isRetry
355+
? // on successful retry, deposit the retry cost + the refresh rate.
356+
TOKEN_REFRESH_RATE + RETRY_COST
357+
: // otherwise, just deposit the refresh rate.
358+
TOKEN_REFRESH_RATE
359+
);
295360
return operation.handleOk(result);
296361
} catch (error) {
297362
return operation.handleError(error);
298363
}
299364
} catch (operationError) {
300365
if (!(operationError instanceof MongoError)) throw operationError;
366+
367+
if (!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadError)) {
368+
// if an operation fails with an error that does not contain the SystemOverloadError, deposit 1 token.
369+
topology.tokenBucket.deposit(RETRY_COST);
370+
}
371+
301372
if (
302373
previousOperationError != null &&
303374
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
@@ -311,9 +382,4 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
311382
timeoutContext.clear();
312383
}
313384
}
314-
315-
throw (
316-
previousOperationError ??
317-
new MongoRuntimeError('Tried to propagate retryability error, but no error was found.')
318-
);
319385
}

src/sdam/topology.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import { type Abortable, TypedEventEmitter } from '../mongo_types';
3535
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3636
import type { ClientSession } from '../sessions';
3737
import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
38+
import { TokenBucket } from '../token_bucket';
3839
import type { Transaction } from '../transactions';
3940
import {
4041
addAbortListener,
@@ -207,18 +208,15 @@ export type TopologyEvents = {
207208
* @internal
208209
*/
209210
export class Topology extends TypedEventEmitter<TopologyEvents> {
210-
/** @internal */
211211
s: TopologyPrivate;
212-
/** @internal */
213212
waitQueue: List<ServerSelectionRequest>;
214-
/** @internal */
215213
hello?: Document;
216-
/** @internal */
217214
_type?: string;
218215

216+
tokenBucket = new TokenBucket(1000);
217+
219218
client!: MongoClient;
220219

221-
/** @internal */
222220
private connectionLock?: Promise<Topology>;
223221

224222
/** @event */

0 commit comments

Comments
 (0)