Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 43 additions & 14 deletions agentex/src/domain/services/task_message_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from datetime import UTC, datetime, timedelta
from typing import Annotated, Any, Literal

Expand Down Expand Up @@ -199,16 +200,33 @@ async def update_messages(
Returns:
List of updated TaskMessageEntity objects
"""
updated_messages = []
for message_id, message in updates.items():
# Get the existing task message
task_message = await self.repository.get(id=message_id)
if not updates:
return []

# Fetch all messages in parallel
message_ids = list(updates.keys())
fetch_results = await asyncio.gather(
*[self.repository.get(id=message_id) for message_id in message_ids],
return_exceptions=True,
)

# Prepare updates for valid messages (exist and belong to task)
messages_to_update = []
for message_id, task_message in zip(message_ids, fetch_results, strict=True):
if isinstance(task_message, Exception):
continue
if task_message and task_message.task_id == task_id:
# Update the message field but preserve other fields
task_message.content = message
updated_messages.append(await self.repository.update(task_message))
task_message.content = updates[message_id]
messages_to_update.append(task_message)

return updated_messages
if not messages_to_update:
return []

# Update all valid messages in parallel
updated_messages = await asyncio.gather(
*[self.repository.update(msg) for msg in messages_to_update]
)
return list(updated_messages)

async def delete_message(self, task_id: str, message_id: str) -> bool:
"""
Expand Down Expand Up @@ -239,12 +257,23 @@ async def delete_messages(self, task_id: str, message_ids: list[str]) -> int:
Returns:
Number of messages deleted
"""
# First check which messages exist and belong to the task
valid_ids = []
for message_id in message_ids:
task_message = await self.repository.get(id=message_id)
if task_message and task_message.task_id == task_id:
valid_ids.append(message_id)
if not message_ids:
return 0

# Fetch all messages in parallel to validate ownership
fetch_results = await asyncio.gather(
*[self.repository.get(id=message_id) for message_id in message_ids],
return_exceptions=True,
)

# Filter to valid IDs (exist and belong to task)
valid_ids = [
message_id
for message_id, task_message in zip(message_ids, fetch_results, strict=True)
if not isinstance(task_message, Exception)
and task_message
and task_message.task_id == task_id
]

if valid_ids:
await self.repository.batch_delete(ids=valid_ids)
Expand Down
9 changes: 6 additions & 3 deletions agentex/src/domain/use_cases/agents_use_case.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from datetime import UTC, datetime
from typing import Annotated, Any

Expand Down Expand Up @@ -118,9 +119,11 @@ async def register_agent(
logger.info(
f"Agent {name} was likely created in parallel, skipping creation"
)
await self.maybe_update_agent_deployment_history(agent)

await self.ensure_healthcheck_workflow(agent)
# Run deployment history and healthcheck in parallel - both are independent operations
await asyncio.gather(
self.maybe_update_agent_deployment_history(agent),
self.ensure_healthcheck_workflow(agent),
)
return agent

async def maybe_update_agent_deployment_history(self, agent: AgentEntity) -> None:
Expand Down
Loading