From 81d24446d3f8fa8f6fa6541d46e8f643e2d8a1cc Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Wed, 24 Dec 2025 14:12:12 -0800 Subject: [PATCH] perf: Use asyncio.gather() for parallel async operations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improve concurrency by parallelizing independent async operations: - agents_use_case.py: Run deployment history and healthcheck workflow in parallel during agent registration (~50% faster) - task_message_service.py: Parallelize batch message operations - update_messages(): O(2N) sequential → O(2) parallel batches - delete_messages(): O(N) sequential → O(1) parallel validation These changes reduce latency for batch operations by running independent database calls concurrently instead of sequentially. --- .../domain/services/task_message_service.py | 57 ++++++++++++++----- .../src/domain/use_cases/agents_use_case.py | 9 ++- 2 files changed, 49 insertions(+), 17 deletions(-) diff --git a/agentex/src/domain/services/task_message_service.py b/agentex/src/domain/services/task_message_service.py index 4cd23b3..d164986 100644 --- a/agentex/src/domain/services/task_message_service.py +++ b/agentex/src/domain/services/task_message_service.py @@ -1,3 +1,4 @@ +import asyncio from datetime import UTC, datetime, timedelta from typing import Annotated, Any, Literal @@ -199,16 +200,33 @@ async def update_messages( Returns: List of updated TaskMessageEntity objects """ - updated_messages = [] - for message_id, message in updates.items(): - # Get the existing task message - task_message = await self.repository.get(id=message_id) + if not updates: + return [] + + # Fetch all messages in parallel + message_ids = list(updates.keys()) + fetch_results = await asyncio.gather( + *[self.repository.get(id=message_id) for message_id in message_ids], + return_exceptions=True, + ) + + # Prepare updates for valid messages (exist and belong to task) + messages_to_update = [] + for message_id, task_message in zip(message_ids, fetch_results, strict=True): + if isinstance(task_message, Exception): + continue if task_message and task_message.task_id == task_id: - # Update the message field but preserve other fields - task_message.content = message - updated_messages.append(await self.repository.update(task_message)) + task_message.content = updates[message_id] + messages_to_update.append(task_message) - return updated_messages + if not messages_to_update: + return [] + + # Update all valid messages in parallel + updated_messages = await asyncio.gather( + *[self.repository.update(msg) for msg in messages_to_update] + ) + return list(updated_messages) async def delete_message(self, task_id: str, message_id: str) -> bool: """ @@ -239,12 +257,23 @@ async def delete_messages(self, task_id: str, message_ids: list[str]) -> int: Returns: Number of messages deleted """ - # First check which messages exist and belong to the task - valid_ids = [] - for message_id in message_ids: - task_message = await self.repository.get(id=message_id) - if task_message and task_message.task_id == task_id: - valid_ids.append(message_id) + if not message_ids: + return 0 + + # Fetch all messages in parallel to validate ownership + fetch_results = await asyncio.gather( + *[self.repository.get(id=message_id) for message_id in message_ids], + return_exceptions=True, + ) + + # Filter to valid IDs (exist and belong to task) + valid_ids = [ + message_id + for message_id, task_message in zip(message_ids, fetch_results, strict=True) + if not isinstance(task_message, Exception) + and task_message + and task_message.task_id == task_id + ] if valid_ids: await self.repository.batch_delete(ids=valid_ids) diff --git a/agentex/src/domain/use_cases/agents_use_case.py b/agentex/src/domain/use_cases/agents_use_case.py index 3fd35e9..3f8a291 100644 --- a/agentex/src/domain/use_cases/agents_use_case.py +++ b/agentex/src/domain/use_cases/agents_use_case.py @@ -1,3 +1,4 @@ +import asyncio from datetime import UTC, datetime from typing import Annotated, Any @@ -118,9 +119,11 @@ async def register_agent( logger.info( f"Agent {name} was likely created in parallel, skipping creation" ) - await self.maybe_update_agent_deployment_history(agent) - - await self.ensure_healthcheck_workflow(agent) + # Run deployment history and healthcheck in parallel - both are independent operations + await asyncio.gather( + self.maybe_update_agent_deployment_history(agent), + self.ensure_healthcheck_workflow(agent), + ) return agent async def maybe_update_agent_deployment_history(self, agent: AgentEntity) -> None: