From a5d2e42f2db1b2325a8a124b050a3c18d4f1fc70 Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Wed, 24 Dec 2025 12:36:13 -0800 Subject: [PATCH] Add debouncing to Redis metrics collection - Add 30-second debounce interval for send_redis_connection_metrics() - Remove duplicate metrics call after xadd (was calling twice per send) - Use time.monotonic() for reliable time tracking Before: redis.info() called on every stream operation (expensive) After: redis.info() called at most once per 30 seconds per instance --- agentex/src/adapters/streams/adapter_redis.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/agentex/src/adapters/streams/adapter_redis.py b/agentex/src/adapters/streams/adapter_redis.py index 1ef13d4..d283dc8 100644 --- a/agentex/src/adapters/streams/adapter_redis.py +++ b/agentex/src/adapters/streams/adapter_redis.py @@ -1,5 +1,6 @@ import asyncio import json +import time from collections.abc import AsyncIterator from typing import Annotated, Any @@ -12,6 +13,9 @@ logger = make_logger(__name__) +# Debounce interval for Redis metrics collection (seconds) +_METRICS_DEBOUNCE_INTERVAL = 30 + class RedisStreamRepository(StreamRepository): def __init__( @@ -31,6 +35,7 @@ def __init__( environment_variables.REDIS_URL, decode_responses=False ) self.environment_variables = environment_variables + self._last_metrics_time: float = 0.0 # For debouncing metrics collection async def send_data(self, topic: str, data: dict[str, Any]) -> str: """ @@ -55,14 +60,23 @@ async def send_data(self, topic: str, data: dict[str, Any]) -> str: name=topic, fields={"data": data_json}, ) - await self.send_redis_connection_metrics() return message_id except Exception as e: logger.error(f"Error publishing data to Redis stream {topic}: {e}") raise async def send_redis_connection_metrics(self): + """ + Send Redis connection metrics to Datadog with debouncing. + Only collects metrics if at least _METRICS_DEBOUNCE_INTERVAL seconds + have passed since the last collection to avoid overhead on hot paths. + """ + now = time.monotonic() + if now - self._last_metrics_time < _METRICS_DEBOUNCE_INTERVAL: + return # Skip if we recently sent metrics + try: + self._last_metrics_time = now info = await self.redis.info() env_value = self.environment_variables.ENVIRONMENT tags = [f"env:{env_value}"]