-
Notifications
You must be signed in to change notification settings - Fork 1.8k
WIP feat(DRIVERS-3239): add exponential backoff in operation retry loop #4806
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| import { setTimeout } from 'node:timers/promises'; | ||
|
|
||
| import { MIN_SUPPORTED_SNAPSHOT_READS_WIRE_VERSION } from '../cmap/wire_protocol/constants'; | ||
| import { | ||
| isRetryableReadError, | ||
|
|
@@ -10,6 +12,7 @@ import { | |
| MongoInvalidArgumentError, | ||
| MongoNetworkError, | ||
| MongoNotConnectedError, | ||
| MongoOperationTimeoutError, | ||
| MongoRuntimeError, | ||
| MongoServerError, | ||
| MongoTransactionError, | ||
|
|
@@ -26,7 +29,13 @@ import { | |
| import type { Topology } from '../sdam/topology'; | ||
| import type { ClientSession } from '../sessions'; | ||
| import { TimeoutContext } from '../timeout'; | ||
| import { abortable, maxWireVersion, supportsRetryableWrites } from '../utils'; | ||
| import { RETRY_COST, TOKEN_REFRESH_RATE } from '../token_bucket'; | ||
| import { | ||
| abortable, | ||
| ExponentialBackoffProvider, | ||
| maxWireVersion, | ||
| supportsRetryableWrites | ||
| } from '../utils'; | ||
| import { AggregateOperation } from './aggregate'; | ||
| import { AbstractOperation, Aspect } from './operation'; | ||
|
|
||
|
|
@@ -50,7 +59,7 @@ type ResultTypeFromOperation<TOperation extends AbstractOperation> = ReturnType< | |
| * The expectation is that this function: | ||
| * - Connects the MongoClient if it has not already been connected, see {@link autoConnect} | ||
| * - Creates a session if none is provided and cleans up the session it creates | ||
| * - Tries an operation and retries under certain conditions, see {@link tryOperation} | ||
| * - Tries an operation and retries under certain conditions, see {@link executeOperationWithRetries} | ||
| * | ||
| * @typeParam T - The operation's type | ||
| * @typeParam TResult - The type of the operation's result, calculated from T | ||
|
|
@@ -120,7 +129,7 @@ export async function executeOperation< | |
| }); | ||
|
|
||
| try { | ||
| return await tryOperation(operation, { | ||
| return await executeOperationWithRetries(operation, { | ||
| topology, | ||
| timeoutContext, | ||
| session, | ||
|
|
@@ -184,7 +193,10 @@ type RetryOptions = { | |
| * | ||
| * @param operation - The operation to execute | ||
| * */ | ||
| async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFromOperation<T>>( | ||
| async function executeOperationWithRetries< | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: `executeOperationWithRetries` is a better name than `tryOperation`, in my opinion. |
||
| T extends AbstractOperation, | ||
| TResult = ResultTypeFromOperation<T> | ||
| >( | ||
| operation: T, | ||
| { topology, timeoutContext, session, readPreference }: RetryOptions | ||
| ): Promise<TResult> { | ||
|
|
@@ -233,11 +245,27 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro | |
| session.incrementTransactionNumber(); | ||
| } | ||
|
|
||
| const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1; | ||
| let previousOperationError: MongoError | undefined; | ||
| const deprioritizedServers = new DeprioritizedServers(); | ||
|
|
||
| for (let tries = 0; tries < maxTries; tries++) { | ||
| const backoffDelayProvider = new ExponentialBackoffProvider( | ||
| 10_000, // MAX_BACKOFF | ||
| 100, // base backoff | ||
| 2 // backoff rate | ||
| ); | ||
|
|
||
| let maxAttempts = | ||
| (operation.maxAttempts ?? willRetry) ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1; | ||
|
|
||
| for ( | ||
| let attempt = 0; | ||
| attempt < maxAttempts; | ||
| attempt++, | ||
| maxAttempts = | ||
| willRetry && previousOperationError?.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) | ||
| ? 6 | ||
| : maxAttempts | ||
| ) { | ||
| if (previousOperationError) { | ||
| if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) { | ||
| throw new MongoServerError({ | ||
|
|
@@ -247,15 +275,39 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro | |
| }); | ||
| } | ||
|
|
||
| if (operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) { | ||
| const isRetryable = | ||
| // bulk write commands are retryable if all operations in the batch are retryable | ||
| (operation.hasAspect(Aspect.COMMAND_BATCHING) && operation.canRetryWrite) || | ||
| // if we have a retryable read or write operation, we can retry | ||
| (hasWriteAspect && willRetryWrite && isRetryableWriteError(previousOperationError)) || | ||
| (hasReadAspect && willRetryRead && isRetryableReadError(previousOperationError)) || | ||
| // if we have a retryable, system overloaded error, we can retry | ||
| (previousOperationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) && | ||
| previousOperationError.hasErrorLabel(MongoErrorLabel.RetryableError)); | ||
|
|
||
| if (!isRetryable) { | ||
| throw previousOperationError; | ||
| } | ||
|
|
||
| if (hasWriteAspect && !isRetryableWriteError(previousOperationError)) | ||
| throw previousOperationError; | ||
|
|
||
| if (hasReadAspect && !isRetryableReadError(previousOperationError)) { | ||
| throw previousOperationError; | ||
| if (previousOperationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) { | ||
| const delayMS = backoffDelayProvider.getNextBackoffDuration(); | ||
|
|
||
| // if the delay would exhaust the CSOT timeout, short-circuit. | ||
| if (timeoutContext.csotEnabled() && delayMS > timeoutContext.remainingTimeMS) { | ||
| // TODO: is this the right error to throw? | ||
| throw new MongoOperationTimeoutError( | ||
| `MongoDB SystemOverload exponential backoff would exceed timeoutMS deadline: remaining CSOT deadline=${timeoutContext.remainingTimeMS}, backoff delayMS=${delayMS}`, | ||
| { | ||
| cause: previousOperationError | ||
| } | ||
| ); | ||
| } | ||
|
|
||
| if (!topology.tokenBucket.consume(RETRY_COST)) { | ||
| throw previousOperationError; | ||
| } | ||
|
|
||
| await setTimeout(delayMS); | ||
| } | ||
|
|
||
| if ( | ||
|
|
@@ -285,19 +337,34 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro | |
| operation.server = server; | ||
|
|
||
| try { | ||
| // If tries > 0 and we are command batching we need to reset the batch. | ||
| if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) { | ||
| const isRetry = attempt > 0; | ||
|
|
||
| // If attempt > 0 and we are command batching we need to reset the batch. | ||
| if (isRetry && operation.hasAspect(Aspect.COMMAND_BATCHING)) { | ||
| operation.resetBatch(); | ||
| } | ||
|
|
||
| try { | ||
| const result = await server.command(operation, timeoutContext); | ||
| topology.tokenBucket.deposit( | ||
| isRetry | ||
| ? // on successful retry, deposit the retry cost + the refresh rate. | ||
| TOKEN_REFRESH_RATE + RETRY_COST | ||
| : // otherwise, just deposit the refresh rate. | ||
| TOKEN_REFRESH_RATE | ||
| ); | ||
| return operation.handleOk(result); | ||
| } catch (error) { | ||
| return operation.handleError(error); | ||
| } | ||
| } catch (operationError) { | ||
| if (!(operationError instanceof MongoError)) throw operationError; | ||
|
|
||
| if (!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) { | ||
| // if an operation fails with an error that does not carry the SystemOverloadedError label, deposit RETRY_COST back into the token bucket. | ||
| topology.tokenBucket.deposit(RETRY_COST); | ||
| } | ||
|
|
||
| if ( | ||
| previousOperationError != null && | ||
| operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed) | ||
|
|
@@ -312,8 +379,5 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro | |
| } | ||
| } | ||
|
|
||
| throw ( | ||
| previousOperationError ?? | ||
| new MongoRuntimeError('Tried to propagate retryability error, but no error was found.') | ||
| ); | ||
| throw previousOperationError ?? new MongoRuntimeError('ahh'); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| import type { MongoClient, MongoOptions } from './mongo_client'; | ||
| import { TypedEventEmitter } from './mongo_types'; | ||
| import { executeOperation } from './operations/execute_operation'; | ||
| import { RetryAttemptContext } from './operations/operation'; | ||
|
Check failure on line 28 in src/sessions.ts
|
||
| import { RunCommandOperation } from './operations/run_command'; | ||
| import { ReadConcernLevel } from './read_concern'; | ||
| import { ReadPreference } from './read_preference'; | ||
|
|
@@ -468,7 +469,11 @@ | |
| } else { | ||
| const wcKeys = Object.keys(wc); | ||
| if (wcKeys.length > 2 || (!wcKeys.includes('wtimeoutMS') && !wcKeys.includes('wTimeoutMS'))) | ||
| // if the write concern was specified with wTimeoutMS, then we set both wtimeoutMS and wTimeoutMS, guaranteeing at least two keys, so if we have more than two keys, then we can automatically assume that we should add the write concern to the command. If it has 2 or fewer keys, we need to check that those keys aren't the wtimeoutMS or wTimeoutMS options before we add the write concern to the command | ||
| // if the write concern was specified with wTimeoutMS, then we set both wtimeoutMS | ||
| // and wTimeoutMS, guaranteeing at least two keys, so if we have more than two keys, | ||
| // then we can automatically assume that we should add the write concern to the command. | ||
| // If it has 2 or fewer keys, we need to check that those keys aren't the wtimeoutMS | ||
| // or wTimeoutMS options before we add the write concern to the command | ||
| WriteConcern.apply(command, { ...wc, wtimeoutMS: undefined }); | ||
| } | ||
| } | ||
|
|
@@ -489,11 +494,14 @@ | |
| command.recoveryToken = this.transaction.recoveryToken; | ||
| } | ||
|
|
||
| const retryContext = new RetryAttemptContext(5); | ||
|
|
||
| const operation = new RunCommandOperation(new MongoDBNamespace('admin'), command, { | ||
| session: this, | ||
| readPreference: ReadPreference.primary, | ||
| bypassPinningCheck: true | ||
| }); | ||
| operation.attempts = retryContext; | ||
|
Check failure on line 504 in src/sessions.ts
|
||
|
|
||
| const timeoutContext = | ||
| this.timeoutContext ?? | ||
|
|
@@ -518,15 +526,13 @@ | |
| this.unpin({ force: true }); | ||
|
|
||
| try { | ||
| await executeOperation( | ||
| this.client, | ||
| new RunCommandOperation(new MongoDBNamespace('admin'), command, { | ||
| session: this, | ||
| readPreference: ReadPreference.primary, | ||
| bypassPinningCheck: true | ||
| }), | ||
| timeoutContext | ||
| ); | ||
| const op = new RunCommandOperation(new MongoDBNamespace('admin'), command, { | ||
| session: this, | ||
| readPreference: ReadPreference.primary, | ||
| bypassPinningCheck: true | ||
| }); | ||
| op.attempts = retryContext; | ||
|
Check failure on line 534 in src/sessions.ts
|
||
| await executeOperation(this.client, op, timeoutContext); | ||
| return; | ||
| } catch (retryCommitError) { | ||
| // If the retry failed, we process that error instead of the original | ||
|
|
@@ -1013,6 +1019,11 @@ | |
| id: ServerSessionId; | ||
| lastUse: number; | ||
| txnNumber: number; | ||
|
|
||
| /* | ||
| * Indicates that a network error has been encountered while using this session. | ||
| * Once a session is marked as dirty, it is always dirty. | ||
| */ | ||
| isDirty: boolean; | ||
|
|
||
| /** @internal */ | ||
|
|
@@ -1106,16 +1117,15 @@ | |
| * @param session - The session to release to the pool | ||
| */ | ||
| release(session: ServerSession): void { | ||
| const sessionTimeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10; | ||
| if (this.client.topology?.loadBalanced) { | ||
| if (session.isDirty) return; | ||
|
|
||
| if (this.client.topology?.loadBalanced && !sessionTimeoutMinutes) { | ||
| this.sessions.unshift(session); | ||
| } | ||
|
|
||
| if (!sessionTimeoutMinutes) { | ||
| return; | ||
| } | ||
|
|
||
| const sessionTimeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10; | ||
|
|
||
| this.sessions.prune(session => session.hasTimedOut(sessionTimeoutMinutes)); | ||
|
|
||
| if (!session.hasTimedOut(sessionTimeoutMinutes)) { | ||
|
|
@@ -1203,9 +1213,9 @@ | |
| command.autocommit = false; | ||
|
|
||
| if (session.transaction.state === TxnState.STARTING_TRANSACTION) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From the transactions spec:
Technically, we weren't spec compliant: we transitioned a transaction to "in progress" here (in [function name lost in page extraction]). With client backpressure, we now need to omit [remainder of comment lost in page extraction] |
||
| session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS); | ||
| command.startTransaction = true; | ||
|
|
||
| // TODO: read concern only applied if it is not the same as the server's default | ||
| const readConcern = | ||
| session.transaction.options.readConcern || session?.clientOptions?.readConcern; | ||
| if (readConcern) { | ||
|
|
@@ -1241,4 +1251,17 @@ | |
| session.snapshotTime = atClusterTime; | ||
| } | ||
| } | ||
|
|
||
| if (session.transaction.state === TxnState.STARTING_TRANSACTION) { | ||
| if (document.ok === 1) { | ||
| session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS); | ||
| } else { | ||
| const error = new MongoServerError(document.toObject()); | ||
| const isBackpressureError = error.hasErrorLabel(MongoErrorLabel.RetryableError); | ||
|
|
||
| if (!isBackpressureError) { | ||
| session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS); | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Edge case: if we encounter a network error (such as a failCommand with closeConnection=true), we never get a server response to update the session with, but we still need to update the session's transaction state if the session is in a transaction.