diff --git a/agentex/src/adapters/streams/adapter_redis.py b/agentex/src/adapters/streams/adapter_redis.py index d283dc8..7af40d5 100644 --- a/agentex/src/adapters/streams/adapter_redis.py +++ b/agentex/src/adapters/streams/adapter_redis.py @@ -54,11 +54,13 @@ async def send_data(self, topic: str, data: dict[str, Any]) -> str: logger.info(f"Publishing data to stream {topic}, data: {data_json}") - # Add to Redis stream with a reasonable max length + # Add to Redis stream with maxlen to prevent unbounded growth await self.send_redis_connection_metrics() message_id = await self.redis.xadd( name=topic, fields={"data": data_json}, + maxlen=self.environment_variables.REDIS_STREAM_MAXLEN, + approximate=True, # Use ~ for better performance (O(1) vs O(N)) ) return message_id except Exception as e: diff --git a/agentex/src/config/environment_variables.py b/agentex/src/config/environment_variables.py index 1222b61..f82743b 100644 --- a/agentex/src/config/environment_variables.py +++ b/agentex/src/config/environment_variables.py @@ -40,6 +40,7 @@ class EnvVarKeys(str, Enum): REDIS_MAX_CONNECTIONS = "REDIS_MAX_CONNECTIONS" REDIS_CONNECTION_TIMEOUT = "REDIS_CONNECTION_TIMEOUT" REDIS_SOCKET_TIMEOUT = "REDIS_SOCKET_TIMEOUT" + REDIS_STREAM_MAXLEN = "REDIS_STREAM_MAXLEN" IMAGE_PULL_SECRET_NAME = "IMAGE_PULL_SECRET_NAME" AGENTEX_AUTH_URL = "AGENTEX_AUTH_URL" ALLOWED_ORIGINS = "ALLOWED_ORIGINS" @@ -88,6 +89,9 @@ class EnvironmentVariables(BaseModel): REDIS_MAX_CONNECTIONS: int = 50 # Increased for SSE streaming REDIS_CONNECTION_TIMEOUT: int = 60 # Connection timeout in seconds REDIS_SOCKET_TIMEOUT: int = 30 # Socket timeout in seconds + REDIS_STREAM_MAXLEN: int = ( + 10000 # Max entries per Redis stream to prevent unbounded growth + ) IMAGE_PULL_SECRET_NAME: str | None = None AGENTEX_AUTH_URL: str | None = None ALLOWED_ORIGINS: str | None = None @@ -146,6 +150,9 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: REDIS_SOCKET_TIMEOUT=int( os.environ.get(EnvVarKeys.REDIS_SOCKET_TIMEOUT, "30") ), + REDIS_STREAM_MAXLEN=int( + os.environ.get(EnvVarKeys.REDIS_STREAM_MAXLEN, "10000") + ), IMAGE_PULL_SECRET_NAME=os.environ.get(EnvVarKeys.IMAGE_PULL_SECRET_NAME), AGENTEX_AUTH_URL=os.environ.get(EnvVarKeys.AGENTEX_AUTH_URL), ALLOWED_ORIGINS=os.environ.get(EnvVarKeys.ALLOWED_ORIGINS, "*"),