Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion agentex/src/adapters/streams/adapter_redis.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import json
import time
from collections.abc import AsyncIterator
from typing import Annotated, Any

Expand All @@ -12,6 +13,9 @@

logger = make_logger(__name__)

# Debounce interval for Redis metrics collection (seconds)
_METRICS_DEBOUNCE_INTERVAL = 30


class RedisStreamRepository(StreamRepository):
def __init__(
Expand All @@ -31,6 +35,7 @@ def __init__(
environment_variables.REDIS_URL, decode_responses=False
)
self.environment_variables = environment_variables
self._last_metrics_time: float = 0.0 # For debouncing metrics collection

async def send_data(self, topic: str, data: dict[str, Any]) -> str:
"""
Expand All @@ -55,14 +60,23 @@ async def send_data(self, topic: str, data: dict[str, Any]) -> str:
name=topic,
fields={"data": data_json},
)
await self.send_redis_connection_metrics()
return message_id
except Exception as e:
logger.error(f"Error publishing data to Redis stream {topic}: {e}")
raise

async def send_redis_connection_metrics(self):
"""
Send Redis connection metrics to Datadog with debouncing.
Only collects metrics if at least _METRICS_DEBOUNCE_INTERVAL seconds
have passed since the last collection to avoid overhead on hot paths.
"""
now = time.monotonic()
if now - self._last_metrics_time < _METRICS_DEBOUNCE_INTERVAL:
return # Skip if we recently sent metrics

try:
self._last_metrics_time = now
info = await self.redis.info()
env_value = self.environment_variables.ENVIRONMENT
tags = [f"env:{env_value}"]
Expand Down
Loading