diff --git a/agentex/src/adapters/streams/adapter_redis.py b/agentex/src/adapters/streams/adapter_redis.py index 1ef13d4..d283dc8 100644 --- a/agentex/src/adapters/streams/adapter_redis.py +++ b/agentex/src/adapters/streams/adapter_redis.py @@ -1,5 +1,6 @@ import asyncio import json +import time from collections.abc import AsyncIterator from typing import Annotated, Any @@ -12,6 +13,9 @@ logger = make_logger(__name__) +# Debounce interval for Redis metrics collection (seconds) +_METRICS_DEBOUNCE_INTERVAL = 30 + class RedisStreamRepository(StreamRepository): def __init__( @@ -31,6 +35,7 @@ def __init__( environment_variables.REDIS_URL, decode_responses=False ) self.environment_variables = environment_variables + self._last_metrics_time: float = 0.0 # For debouncing metrics collection async def send_data(self, topic: str, data: dict[str, Any]) -> str: """ @@ -55,14 +60,23 @@ async def send_data(self, topic: str, data: dict[str, Any]) -> str: name=topic, fields={"data": data_json}, ) - await self.send_redis_connection_metrics() return message_id except Exception as e: logger.error(f"Error publishing data to Redis stream {topic}: {e}") raise async def send_redis_connection_metrics(self): + """ + Send Redis connection metrics to Datadog with debouncing. + Only collects metrics if at least _METRICS_DEBOUNCE_INTERVAL seconds + have passed since the last collection to avoid overhead on hot paths. + """ + now = time.monotonic() + if now - self._last_metrics_time < _METRICS_DEBOUNCE_INTERVAL: + return # Skip if we recently sent metrics + try: + self._last_metrics_time = now info = await self.redis.info() env_value = self.environment_variables.ENVIRONMENT tags = [f"env:{env_value}"]