From 32b1a8c178d8d6814df0493f03c738134a781414 Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Wed, 29 Jan 2025 09:59:25 -0600 Subject: [PATCH 1/7] Add configurable retry behavior with reasonable defaults for count, wait, backoff, and jitter This makes retry behavior consistent between sync and async clients, and in its default configuration will get users past network interruptions. --- indico/config/config.py | 9 +++ indico/http/client.py | 19 ++++- indico/http/retry.py | 147 +++++++++++++++++++--------------- tests/unit/http/test_retry.py | 73 +++++++++++++++++ 4 files changed, 180 insertions(+), 68 deletions(-) create mode 100644 tests/unit/http/test_retry.py diff --git a/indico/config/config.py b/indico/config/config.py index 04713351..54f34a9b 100644 --- a/indico/config/config.py +++ b/indico/config/config.py @@ -22,6 +22,10 @@ class IndicoConfig: api_token= (str, optional): The actual text of the API Token. Takes precedence over api_token_path verify_ssl= (bool, optional): Whether to verify the host's SSL certificate. Default=True requests_params= (dict, optional): Dictionary of requests. Session parameters to set + retry_count= (int, optional): Retry API calls this many times. + retry_wait= (float, optional): Wait this many seconds after the first error before retrying. + retry_backoff= (float, optional): Multiply the wait time by this amount for each additional error. + retry_jitter= (float, optional): Add a random amount of time (up to this percent as a decimal) to the wait time to prevent simultaneous retries. 
Returns: IndicoConfig object @@ -42,6 +46,11 @@ def __init__(self, **kwargs: "Any"): self.requests_params: "Optional[AnyDict]" = None self._disable_cookie_domain: bool = False + self.retry_count: int = int(os.getenv("INDICO_retry_count", "4")) + self.retry_wait: float = float(os.getenv("INDICO_retry_wait", "1")) + self.retry_backoff: float = float(os.getenv("INDICO_retry_backoff", "4")) + self.retry_jitter: float = float(os.getenv("INDICO_retry_jitter", "1")) + for key, value in kwargs.items(): if hasattr(self, key): setattr(self, key, value) diff --git a/indico/http/client.py b/indico/http/client.py index a4ab3513..98ce1cec 100644 --- a/indico/http/client.py +++ b/indico/http/client.py @@ -17,7 +17,7 @@ ) from indico.http.serialization import aio_deserialize, deserialize -from .retry import aioretry +from .retry import retry if TYPE_CHECKING: # pragma: no cover from http.cookiejar import Cookie @@ -50,6 +50,14 @@ class HTTPClient: def __init__(self, config: "Optional[IndicoConfig]" = None): self.config = config or IndicoConfig() self.base_url = f"{self.config.protocol}://{self.config.host}" + self._decorate_with_retry = retry( + requests.RequestException, + count=self.config.retry_count, + wait=self.config.retry_wait, + backoff=self.config.retry_backoff, + jitter=self.config.retry_jitter, + ) + self._make_request = self._decorate_with_retry(self._make_request) # type: ignore[method-assign] self.request_session = requests.Session() if isinstance(self.config.requests_params, dict): @@ -232,6 +240,14 @@ def __init__(self, config: "Optional[IndicoConfig]" = None): """ self.config = config or IndicoConfig() self.base_url = f"{self.config.protocol}://{self.config.host}" + self._decorate_with_retry = retry( + aiohttp.ClientConnectionError, + count=self.config.retry_count, + wait=self.config.retry_wait, + backoff=self.config.retry_backoff, + jitter=self.config.retry_jitter, + ) + self._make_request = self._decorate_with_retry(self._make_request) # type: 
ignore[method-assign] self.request_session = aiohttp.ClientSession() if isinstance(self.config.requests_params, dict): @@ -316,7 +332,6 @@ def _handle_files( for f in files: f.close() - @aioretry(aiohttp.ClientConnectionError, aiohttp.ServerDisconnectedError) async def _make_request( self, method: str, diff --git a/indico/http/retry.py b/indico/http/retry.py index 85ed1850..11c29055 100644 --- a/indico/http/retry.py +++ b/indico/http/retry.py @@ -1,85 +1,100 @@ import asyncio import time from functools import wraps -from random import randint -from typing import TYPE_CHECKING +from inspect import iscoroutinefunction +from random import random +from typing import TYPE_CHECKING, overload -if TYPE_CHECKING: # pragma: no cover - from typing import Awaitable, Callable, Optional, Tuple, Type, TypeVar, Union +if TYPE_CHECKING: + import sys + from collections.abc import Awaitable, Callable + from typing import Type - from typing_extensions import ParamSpec + if sys.version_info >= (3, 10): + from typing import ParamSpec, TypeVar + else: + from typing_extensions import ParamSpec, TypeVar - P = ParamSpec("P") - T = TypeVar("T") + ArgumentsType = ParamSpec("ArgumentsType") + OuterReturnType = TypeVar("OuterReturnType") + InnerReturnType = TypeVar("InnerReturnType") def retry( - *ExceptionTypes: "Type[Exception]", tries: int = 3, delay: int = 1, backoff: int = 2 -) -> "Callable[[Callable[P, T]], Callable[P, T]]": + *errors: "Type[Exception]", + count: int, + wait: float, + backoff: float, + jitter: float, +) -> "Callable[[Callable[ArgumentsType, OuterReturnType]], Callable[ArgumentsType, OuterReturnType]]": # noqa: E501 """ - Retry with exponential backoff + Decorate a function or coroutine to retry when it raises specified errors, + apply exponential backoff and jitter to the wait time, + and raise the last error if it retries too many times. 
- Original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry + Arguments: + errors: Retry the function when it raises one of these errors. + count: Retry the function this many times. + wait: Wait this many seconds after the first error before retrying. + backoff: Multiply the wait time by this amount for each additional error. + jitter: Add a random amount of time (up to this percent as a decimal) + to the wait time to prevent simultaneous retries. """ - def retry_decorator(f: "Callable[P, T]") -> "Callable[P, T]": - @wraps(f) - def retry_fn(*args: "P.args", **kwargs: "P.kwargs") -> "T": - n_tries, n_delay = tries, delay - while n_tries > 1: - try: - return f(*args, **kwargs) - except ExceptionTypes: - time.sleep(n_delay) - n_tries -= 1 - n_delay *= backoff - return f(*args, **kwargs) - - return retry_fn + def wait_time(times_retried: int) -> float: + """ + Calculate the sleep time based on number of times retried. + """ + return wait * backoff**times_retried * (1 + jitter * random()) - return retry_decorator + @overload + def retry_decorator( + decorated: "Callable[ArgumentsType, Awaitable[InnerReturnType]]", + ) -> "Callable[ArgumentsType, Awaitable[InnerReturnType]]": ... + @overload + def retry_decorator( + decorated: "Callable[ArgumentsType, InnerReturnType]", + ) -> "Callable[ArgumentsType, InnerReturnType]": ... -def aioretry( - *ExceptionTypes: "Type[Exception]", - tries: int = 3, - delay: "Union[int, Tuple[int, int]]" = 1, - backoff: int = 2, - condition: "Optional[Callable[[Exception], bool]]" = None, -) -> "Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]]": - """ - Retry with exponential backoff - - Original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry - Options: - condition: Callable to evaluate if an exception of a given type - is retryable for additional handling - delay: an initial time to wait (seconds). If a tuple, choose a random number - in that range to start. 
This can helps prevent retries at the exact - same time across multiple concurrent function calls - """ + def retry_decorator( + decorated: "Callable[ArgumentsType, InnerReturnType]", + ) -> "Callable[ArgumentsType, Awaitable[InnerReturnType]] | Callable[ArgumentsType, InnerReturnType]": # noqa: E501 + """ + Decorate either a function or coroutine as appropriate. + """ + if iscoroutinefunction(decorated): + + @wraps(decorated) + async def retrying_coroutine( # type: ignore[return] + *args: "ArgumentsType.args", **kwargs: "ArgumentsType.kwargs" + ) -> "InnerReturnType": + for times_retried in range(count + 1): + try: + return await decorated(*args, **kwargs) # type: ignore[no-any-return] + except errors: + if times_retried >= count: + raise + + await asyncio.sleep(wait_time(times_retried)) + + return retrying_coroutine + + else: + + @wraps(decorated) + def retrying_function( # type: ignore[return] + *args: "ArgumentsType.args", **kwargs: "ArgumentsType.kwargs" + ) -> "InnerReturnType": + for times_retried in range(count + 1): + try: + return decorated(*args, **kwargs) + except errors: + if times_retried >= count: + raise + + time.sleep(wait_time(times_retried)) - def retry_decorator(f: "Callable[P, Awaitable[T]]") -> "Callable[P, Awaitable[T]]": - @wraps(f) - async def retry_fn(*args: "P.args", **kwargs: "P.kwargs") -> "T": - n_tries = tries - if isinstance(delay, tuple): - # pick a random number to sleep - n_delay = randint(*delay) - else: - n_delay = delay - while True: - try: - return await f(*args, **kwargs) - except ExceptionTypes as e: - if condition and not condition(e): - raise - await asyncio.sleep(n_delay) - n_tries -= 1 - n_delay *= backoff - if n_tries <= 0: - raise - - return retry_fn + return retrying_function return retry_decorator diff --git a/tests/unit/http/test_retry.py b/tests/unit/http/test_retry.py new file mode 100644 index 00000000..618e9ebd --- /dev/null +++ b/tests/unit/http/test_retry.py @@ -0,0 +1,73 @@ +import pytest + +from 
indico.http.retry import retry + + +def test_no_errors() -> None: + @retry(Exception, count=0, wait=0, backoff=0, jitter=0) + def no_errors() -> bool: + return True + + assert no_errors() + + +def test_raises_errors() -> None: + calls = 0 + + @retry(RuntimeError, count=4, wait=0, backoff=0, jitter=0) + def raises_errors() -> None: + nonlocal calls + calls += 1 + raise RuntimeError() + + with pytest.raises(RuntimeError): + raises_errors() + + assert calls == 5 + + +def test_raises_other_errors() -> None: + calls = 0 + + @retry(RuntimeError, count=4, wait=0, backoff=0, jitter=0) + def raises_errors() -> None: + nonlocal calls + calls += 1 + raise ValueError() + + with pytest.raises(ValueError): + raises_errors() + + assert calls == 1 + + +@pytest.mark.asyncio +async def test_raises_errors_async() -> None: + calls = 0 + + @retry(RuntimeError, count=4, wait=0, backoff=0, jitter=0) + async def raises_errors() -> None: + nonlocal calls + calls += 1 + raise RuntimeError() + + with pytest.raises(RuntimeError): + await raises_errors() + + assert calls == 5 + + +@pytest.mark.asyncio +async def test_raises_other_errors_async() -> None: + calls = 0 + + @retry(RuntimeError, count=4, wait=0, backoff=0, jitter=0) + async def raises_errors() -> None: + nonlocal calls + calls += 1 + raise ValueError() + + with pytest.raises(ValueError): + await raises_errors() + + assert calls == 1 From 1f9ebc114da45663a4545eec72e30b3246e0a123 Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Wed, 29 Jan 2025 10:16:54 -0600 Subject: [PATCH 2/7] Make socket connect and read timeouts consistent between sync and async clients --- indico/http/client.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/indico/http/client.py b/indico/http/client.py index 98ce1cec..63ac820f 100644 --- a/indico/http/client.py +++ b/indico/http/client.py @@ -177,6 +177,7 @@ def _make_request( f"{self.base_url}{path}", headers=headers, stream=True, + timeout=(4, 64), verify=False if not self.config.verify_ssl or not 
self.request_session.verify else True, @@ -361,6 +362,7 @@ async def _make_request( async with getattr(self.request_session, method)( f"{self.base_url}{path}", headers=headers, + timeout=aiohttp.ClientTimeout(sock_connect=4, sock_read=64), verify_ssl=self.config.verify_ssl, **request_kwargs, ) as response: From ea96ef6c0239a630cbcc4185565b983aeb3f8b9c Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Mon, 20 Oct 2025 15:02:15 -0500 Subject: [PATCH 3/7] Fix environment variable casing for retry arguments --- indico/config/config.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/indico/config/config.py b/indico/config/config.py index 54f34a9b..f346c7f3 100644 --- a/indico/config/config.py +++ b/indico/config/config.py @@ -46,10 +46,10 @@ def __init__(self, **kwargs: "Any"): self.requests_params: "Optional[AnyDict]" = None self._disable_cookie_domain: bool = False - self.retry_count: int = int(os.getenv("INDICO_retry_count", "4")) - self.retry_wait: float = float(os.getenv("INDICO_retry_wait", "1")) - self.retry_backoff: float = float(os.getenv("INDICO_retry_backoff", "4")) - self.retry_jitter: float = float(os.getenv("INDICO_retry_jitter", "1")) + self.retry_count: int = int(os.getenv("INDICO_RETRY_COUNT", "4")) + self.retry_wait: float = float(os.getenv("INDICO_RETRY_WAIT", "1")) + self.retry_backoff: float = float(os.getenv("INDICO_RETRY_BACKOFF", "4")) + self.retry_jitter: float = float(os.getenv("INDICO_RETRY_JITTER", "1")) for key, value in kwargs.items(): if hasattr(self, key): From 0fbfc54f1df9bff8a3dd04d28a94fddd2320d161 Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Tue, 18 Nov 2025 07:59:29 -0600 Subject: [PATCH 4/7] Simplify `_handle_files`'s `verify` conditional in `_make_request` --- indico/http/client.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/indico/http/client.py b/indico/http/client.py index 63ac820f..9cf0e417 100644 --- a/indico/http/client.py +++ b/indico/http/client.py @@ -178,9 
+178,7 @@ def _make_request( headers=headers, stream=True, timeout=(4, 64), - verify=False - if not self.config.verify_ssl or not self.request_session.verify - else True, + verify=(self.config.verify_ssl and self.request_session.verify), **new_kwargs, ) From c58b697a7a10aca3e79daaa3a554d9e86f396d17 Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Tue, 18 Nov 2025 08:08:33 -0600 Subject: [PATCH 5/7] Replace method assignment with dynamic wrapper method --- indico/http/client.py | 64 ++++++++++++++++++++++++++++++++----------- 1 file changed, 48 insertions(+), 16 deletions(-) diff --git a/indico/http/client.py b/indico/http/client.py index 9cf0e417..27fe39b1 100644 --- a/indico/http/client.py +++ b/indico/http/client.py @@ -50,14 +50,6 @@ class HTTPClient: def __init__(self, config: "Optional[IndicoConfig]" = None): self.config = config or IndicoConfig() self.base_url = f"{self.config.protocol}://{self.config.host}" - self._decorate_with_retry = retry( - requests.RequestException, - count=self.config.retry_count, - wait=self.config.retry_wait, - backoff=self.config.retry_backoff, - jitter=self.config.retry_jitter, - ) - self._make_request = self._decorate_with_retry(self._make_request) # type: ignore[method-assign] self.request_session = requests.Session() if isinstance(self.config.requests_params, dict): @@ -168,6 +160,30 @@ def _make_request( headers: "Optional[Dict[str, str]]" = None, _refreshed: bool = False, **request_kwargs: "Any", + ) -> "Any": + return retry( + requests.RequestException, + count=self.config.retry_count, + wait=self.config.retry_wait, + backoff=self.config.retry_backoff, + jitter=self.config.retry_jitter, + )( + self._make_request_once, + )( + method=method, + path=path, + headers=headers, + _refreshed=_refreshed, + **request_kwargs, + ) + + def _make_request_once( + self, + method: str, + path: str, + headers: "Optional[Dict[str, str]]" = None, + _refreshed: bool = False, + **request_kwargs: "Any", ) -> "Any": logger.debug( f"[{method}] 
{path}\n\t Headers: {headers}\n\tRequest Args:{request_kwargs}" @@ -239,14 +255,6 @@ def __init__(self, config: "Optional[IndicoConfig]" = None): """ self.config = config or IndicoConfig() self.base_url = f"{self.config.protocol}://{self.config.host}" - self._decorate_with_retry = retry( - aiohttp.ClientConnectionError, - count=self.config.retry_count, - wait=self.config.retry_wait, - backoff=self.config.retry_backoff, - jitter=self.config.retry_jitter, - ) - self._make_request = self._decorate_with_retry(self._make_request) # type: ignore[method-assign] self.request_session = aiohttp.ClientSession() if isinstance(self.config.requests_params, dict): @@ -338,6 +346,30 @@ async def _make_request( headers: "Optional[Dict[str, str]]" = None, _refreshed: bool = False, **request_kwargs: "Any", + ) -> "Any": + return await retry( + aiohttp.ClientConnectionError, + count=self.config.retry_count, + wait=self.config.retry_wait, + backoff=self.config.retry_backoff, + jitter=self.config.retry_jitter, + )( + self._make_request_once, + )( + method=method, + path=path, + headers=headers, + _refreshed=_refreshed, + **request_kwargs, + ) + + async def _make_request_once( + self, + method: str, + path: str, + headers: "Optional[Dict[str, str]]" = None, + _refreshed: bool = False, + **request_kwargs: "Any", ) -> "Any": logger.debug( f"[{method}] {path}\n\t Headers: {headers}\n\tRequest Args:{request_kwargs}" From d6ec2b0bf9c2d747aac94f1e5be6c57b0828ae6e Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Tue, 18 Nov 2025 10:34:06 -0600 Subject: [PATCH 6/7] Make socket connect and read timeouts configurable --- indico/config/config.py | 9 +++++++++ indico/http/client.py | 10 ++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/indico/config/config.py b/indico/config/config.py index f346c7f3..58136adf 100644 --- a/indico/config/config.py +++ b/indico/config/config.py @@ -26,6 +26,8 @@ class IndicoConfig: retry_wait= (float, optional): Wait this many seconds after 
the first error before retrying. retry_backoff= (float, optional): Multiply the wait time by this amount for each additional error. retry_jitter= (float, optional): Add a random amount of time (up to this percent as a decimal) to the wait time to prevent simultaneous retries. + socket_connect_timeout= (float, optional): Number of seconds to wait for a connection to the server. + socket_read_timeout= (float, optional): Number of seconds to wait for the server to send a response. Returns: IndicoConfig object @@ -51,6 +53,13 @@ def __init__(self, **kwargs: "Any"): self.retry_backoff: float = float(os.getenv("INDICO_RETRY_BACKOFF", "4")) self.retry_jitter: float = float(os.getenv("INDICO_RETRY_JITTER", "1")) + self.socket_connect_timeout: float = float( + os.getenv("INDICO_SOCKET_CONNECT_TIMEOUT", "4") + ) + self.socket_read_timeout: float = float( + os.getenv("INDICO_SOCKET_READ_TIMEOUT", "64") + ) + for key, value in kwargs.items(): if hasattr(self, key): setattr(self, key, value) diff --git a/indico/http/client.py b/indico/http/client.py index 27fe39b1..43b2bd51 100644 --- a/indico/http/client.py +++ b/indico/http/client.py @@ -193,7 +193,10 @@ def _make_request_once( f"{self.base_url}{path}", headers=headers, stream=True, - timeout=(4, 64), + timeout=( + self.config.socket_connect_timeout, + self.config.socket_read_timeout, + ), verify=(self.config.verify_ssl and self.request_session.verify), **new_kwargs, ) @@ -392,7 +395,10 @@ async def _make_request_once( async with getattr(self.request_session, method)( f"{self.base_url}{path}", headers=headers, - timeout=aiohttp.ClientTimeout(sock_connect=4, sock_read=64), + timeout=aiohttp.ClientTimeout( + sock_connect=self.config.socket_connect_timeout, + sock_read=self.config.socket_read_timeout, + ), verify_ssl=self.config.verify_ssl, **request_kwargs, ) as response: From f45cdda447caac1711c45375cfca5f3051e4526c Mon Sep 17 00:00:00 2001 From: Michael Welborn Date: Fri, 21 Nov 2025 08:56:57 -0600 Subject: [PATCH 7/7] 
Deduplicate retry return type `TypeVar` --- indico/http/retry.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/indico/http/retry.py b/indico/http/retry.py index 11c29055..04444b43 100644 --- a/indico/http/retry.py +++ b/indico/http/retry.py @@ -16,8 +16,7 @@ from typing_extensions import ParamSpec, TypeVar ArgumentsType = ParamSpec("ArgumentsType") - OuterReturnType = TypeVar("OuterReturnType") - InnerReturnType = TypeVar("InnerReturnType") + ReturnType = TypeVar("ReturnType") def retry( @@ -26,7 +25,7 @@ def retry( wait: float, backoff: float, jitter: float, -) -> "Callable[[Callable[ArgumentsType, OuterReturnType]], Callable[ArgumentsType, OuterReturnType]]": # noqa: E501 +) -> "Callable[[Callable[ArgumentsType, ReturnType]], Callable[ArgumentsType, ReturnType]]": # noqa: E501 """ Decorate a function or coroutine to retry when it raises specified errors, apply exponential backoff and jitter to the wait time, @@ -49,17 +48,17 @@ def wait_time(times_retried: int) -> float: @overload def retry_decorator( - decorated: "Callable[ArgumentsType, Awaitable[InnerReturnType]]", - ) -> "Callable[ArgumentsType, Awaitable[InnerReturnType]]": ... + decorated: "Callable[ArgumentsType, Awaitable[ReturnType]]", + ) -> "Callable[ArgumentsType, Awaitable[ReturnType]]": ... @overload def retry_decorator( - decorated: "Callable[ArgumentsType, InnerReturnType]", - ) -> "Callable[ArgumentsType, InnerReturnType]": ... + decorated: "Callable[ArgumentsType, ReturnType]", + ) -> "Callable[ArgumentsType, ReturnType]": ... def retry_decorator( - decorated: "Callable[ArgumentsType, InnerReturnType]", - ) -> "Callable[ArgumentsType, Awaitable[InnerReturnType]] | Callable[ArgumentsType, InnerReturnType]": # noqa: E501 + decorated: "Callable[ArgumentsType, ReturnType]", + ) -> "Callable[ArgumentsType, Awaitable[ReturnType]] | Callable[ArgumentsType, ReturnType]": # noqa: E501 """ Decorate either a function or coroutine as appropriate. 
""" @@ -68,7 +67,7 @@ def retry_decorator( @wraps(decorated) async def retrying_coroutine( # type: ignore[return] *args: "ArgumentsType.args", **kwargs: "ArgumentsType.kwargs" - ) -> "InnerReturnType": + ) -> "ReturnType": for times_retried in range(count + 1): try: return await decorated(*args, **kwargs) # type: ignore[no-any-return] @@ -85,7 +84,7 @@ async def retrying_coroutine( # type: ignore[return] @wraps(decorated) def retrying_function( # type: ignore[return] *args: "ArgumentsType.args", **kwargs: "ArgumentsType.kwargs" - ) -> "InnerReturnType": + ) -> "ReturnType": for times_retried in range(count + 1): try: return decorated(*args, **kwargs)