diff --git a/Tokenization/backend/wrapper/src/client/connectionManager/CentralConnection.ts b/Tokenization/backend/wrapper/src/client/connectionManager/CentralConnection.ts index 2d9907de6..88f6bfb15 100644 --- a/Tokenization/backend/wrapper/src/client/connectionManager/CentralConnection.ts +++ b/Tokenization/backend/wrapper/src/client/connectionManager/CentralConnection.ts @@ -16,6 +16,7 @@ import type * as grpc from '@grpc/grpc-js'; import { LogManager } from '@aliceo2/web-ui'; import type { CentralCommandDispatcher } from './eventManagement/CentralCommandDispatcher'; import type { DuplexMessageModel } from '../../models/message.model'; +import { ReconnectionScheduler } from '../../utils/connection/reconnectionScheduler'; /** * This class manages the duplex stream with the CentralSystem gRPC service. @@ -24,6 +25,11 @@ import type { DuplexMessageModel } from '../../models/message.model'; export class CentralConnection { private _logger = LogManager.getLogger('CentralConnection'); private _stream?: grpc.ClientDuplexStream; + private _reconnectionScheduler: ReconnectionScheduler = new ReconnectionScheduler( + () => this.connect(), + { initialDelay: 1000, maxDelay: 30000 }, + this._logger + ); /** * Constructor for the CentralConnection class. @@ -44,32 +50,23 @@ export class CentralConnection { this._stream?.on('data', (payload: DuplexMessageModel) => { this._logger.debugMessage(`Received payload: ${JSON.stringify(payload)}`); + this._reconnectionScheduler.reset(); this._dispatcher.dispatch(payload); }); this._stream?.on('end', () => { this._logger.infoMessage(`Stream ended, attempting to reconnect...`); this._stream = undefined; - this.scheduleReconnect(); + this._reconnectionScheduler.schedule(); }); this._stream?.on('error', (err: any) => { this._logger.infoMessage('Stream error:', err, ' attempting to reconnect...'); this._stream = undefined; - this.scheduleReconnect(); + this._reconnectionScheduler.schedule(); }); } - /** - * Schedules a reconnect with exponential backoff. - */ - private scheduleReconnect() { - setTimeout(() => { - this._logger.infoMessage(`Trying to reconnect...`); - this.connect(); - }, 2000); - } - /** * Starts the connection to the central system. */ diff --git a/Tokenization/backend/wrapper/src/test/utils/connection/reconnectionScheduler.test.ts b/Tokenization/backend/wrapper/src/test/utils/connection/reconnectionScheduler.test.ts new file mode 100644 index 000000000..8107267cd --- /dev/null +++ b/Tokenization/backend/wrapper/src/test/utils/connection/reconnectionScheduler.test.ts @@ -0,0 +1,123 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +import { ReconnectionScheduler } from '../../../utils/connection/reconnectionScheduler'; + +describe('ReconnectionScheduler', () => { + let reconnectCallback: jest.Mock; + let logger: { infoMessage: jest.Mock; errorMessage?: jest.Mock }; + let scheduler: ReconnectionScheduler; + + beforeEach(() => { + jest.useFakeTimers(); + reconnectCallback = jest.fn(); + logger = { + infoMessage: jest.fn(), + errorMessage: jest.fn(), + }; + + scheduler = new ReconnectionScheduler( + reconnectCallback, + { + initialDelay: 1000, + maxDelay: 8000, + }, + logger as any + ); + }); + + afterEach(() => { + jest.clearAllTimers(); + jest.useRealTimers(); + }); + + test("schedule's first attempt should schedule and call reconnectCallback", () => { + scheduler.schedule(); + + expect(logger.infoMessage).toHaveBeenCalledWith('Recconection attempt #1: Sleep for 2000 ms.'); + + expect(reconnectCallback).not.toHaveBeenCalled(); + jest.advanceTimersByTime(1999); + expect(reconnectCallback).not.toHaveBeenCalled(); + jest.advanceTimersByTime(1); + expect(reconnectCallback).toHaveBeenCalledTimes(1); + }); + + test('Schedule attempts should be exponential', () => { + scheduler.schedule(); + jest.advanceTimersByTime(2000); + + scheduler.schedule(); + expect(logger.infoMessage).toHaveBeenLastCalledWith('Recconection attempt #2: Sleep for 4000 ms.'); + jest.advanceTimersByTime(4000); + expect(reconnectCallback).toHaveBeenCalledTimes(2); + }); + + test("schedule's delay should be limited by maxDelay", () => { + scheduler = new ReconnectionScheduler( + reconnectCallback, + { + initialDelay: 1000, + maxDelay: 3000, + }, + logger as any + ); + + scheduler.schedule(); + expect(logger.infoMessage).toHaveBeenLastCalledWith('Recconection attempt #1: Sleep for 2000 ms.'); + jest.advanceTimersByTime(2000); + + scheduler.schedule(); + expect(logger.infoMessage).toHaveBeenLastCalledWith('Recconection attempt #2: Sleep for 3000 ms.'); + jest.advanceTimersByTime(3000); + + scheduler.schedule(); + expect(logger.infoMessage).toHaveBeenLastCalledWith('Recconection attempt #3: Sleep for 3000 ms.'); + }); + + test('schedule() should not schedule again if it is scheduled', () => { + scheduler.schedule(); + scheduler.schedule(); + + expect(logger.infoMessage).toHaveBeenCalledTimes(1); + + jest.advanceTimersByTime(100000); + expect(reconnectCallback).toHaveBeenCalledTimes(1); + }); + + test('reset() should clear timer, reset attemptCount and currentDelay', () => { + scheduler.schedule(); + + jest.advanceTimersByTime(500); + expect(reconnectCallback).not.toHaveBeenCalled(); + + scheduler.reset(); + + jest.advanceTimersByTime(100000); + expect(reconnectCallback).not.toHaveBeenCalled(); + + scheduler.schedule(); + expect(logger.infoMessage).toHaveBeenLastCalledWith('Recconection attempt #1: Sleep for 2000 ms.'); + }); + + test('reset() should ignore another reset due to isResseting variable', () => { + scheduler.schedule(); + scheduler.reset(); + scheduler.reset(); + scheduler.schedule(); + + jest.advanceTimersByTime(2000); + expect(reconnectCallback).toHaveBeenCalledTimes(1); + }); +}); diff --git a/Tokenization/backend/wrapper/src/utils/connection/reconnectionScheduler.ts b/Tokenization/backend/wrapper/src/utils/connection/reconnectionScheduler.ts new file mode 100644 index 000000000..4b03d434d --- /dev/null +++ b/Tokenization/backend/wrapper/src/utils/connection/reconnectionScheduler.ts @@ -0,0 +1,88 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +export interface ReconnectionOptions { + initialDelay?: number; // Initial delay in ms + maxDelay?: number; // Maximum delay in ms +} + +/** + * A scheduler that manages reconnection attempts with an exponential backoff. + */ +export class ReconnectionScheduler { + private reconnectCallback: any; + private initialDelay: number; + private maxDelay: number; + private currentDelay: number; + private attemptCount: number; + private timeoutId: any; + private logger: Logger; + + private isResetting: boolean = false; + private isScheduling: boolean = false; + + /** + * Creates a new instance of the ReconnectionScheduler. + * @param {any} reconnectCallback - The callback to be called when a reconnection attempt is scheduled. + * @param {ReconnectionOptions} [options] - Options for the reconnection schedule. + * @param {Logger} logger - The logger instance to be used for logging messages. + */ + constructor(reconnectCallback: any, options: ReconnectionOptions = {}, logger: Logger) { + this.reconnectCallback = reconnectCallback; + this.initialDelay = options.initialDelay ?? 1000; + this.maxDelay = options.maxDelay ?? 30000; + + this.currentDelay = this.initialDelay; + this.attemptCount = 0; + this.timeoutId = null; + + this.logger = logger; + } + + /** + * Schedules the next reconnection attempt using exponential backoff. + */ + schedule() { + if (this.isScheduling) return; + this.isScheduling = true; + this.isResetting = false; + this.attemptCount++; + + // Exponential backoff calculation + const delay = this.initialDelay * Math.pow(2, this.attemptCount); + + this.currentDelay = Math.min(this.maxDelay, delay); + + this.logger.infoMessage(`Recconection attempt #${this.attemptCount}: Sleep for ${this.currentDelay.toFixed(0)} ms.`); + + // Plan the reconnection attempt + this.timeoutId = setTimeout(() => { + this.isScheduling = false; + this.reconnectCallback(); + }, this.currentDelay); + } + + /** + * Resets the scheduler to its initial state. + */ + reset() { + if (this.isResetting) return; + this.isScheduling = false; + this.isResetting = true; + + clearTimeout(this.timeoutId); + this.attemptCount = 0; + this.currentDelay = this.initialDelay; + } +} diff --git a/Tokenization/backend/wrapper/src/utils/types/webui.d.ts b/Tokenization/backend/wrapper/src/utils/types/webui.d.ts index 9dfad4fb3..63b026090 100644 --- a/Tokenization/backend/wrapper/src/utils/types/webui.d.ts +++ b/Tokenization/backend/wrapper/src/utils/types/webui.d.ts @@ -14,11 +14,13 @@ declare module '@aliceo2/web-ui' { export const LogManager: { - getLogger: (name: string) => { - infoMessage: (...args: any[]) => void; - errorMessage: (...args: any[]) => void; - warnMessage: (...args: any[]) => void; - debugMessage: (...args: any[]) => void; - }; + getLogger: (name: string) => Logger; }; } + +declare interface Logger { + infoMessage: (...args: any[]) => void; + errorMessage: (...args: any[]) => void; + warnMessage: (...args: any[]) => void; + debugMessage: (...args: any[]) => void; +}