From 63fe3e1121810f74b3ba405b99391338fafcc147 Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Wed, 24 Dec 2025 16:09:34 -0800 Subject: [PATCH 1/2] perf: Use INSERT RETURNING for event creation Replace session.add() + session.refresh() with insert().returning() to get the auto-generated sequence_id in a single database round-trip. Before: 2 queries per event (INSERT + SELECT) After: 1 query per event (INSERT ... RETURNING *) Expected ~33% reduction in database queries for event writes. --- .../domain/repositories/event_repository.py | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/agentex/src/domain/repositories/event_repository.py b/agentex/src/domain/repositories/event_repository.py index 28715f1..1aa750c 100644 --- a/agentex/src/domain/repositories/event_repository.py +++ b/agentex/src/domain/repositories/event_repository.py @@ -2,6 +2,7 @@ from fastapi import Depends from sqlalchemy import and_ +from sqlalchemy.dialects.postgresql import insert from sqlalchemy.future import select from src.adapters.crud_store.adapter_postgres import ( PostgresCRUDRepository, @@ -32,7 +33,6 @@ def __init__( EventEntity, ) - # Have to do this because the sequence_id is automatically generated by the ORM async def create( self, id: str, @@ -40,20 +40,25 @@ async def create( agent_id: str, content: TaskMessageContentEntity | None = None, ) -> EventEntity: + """Create an event using INSERT ... RETURNING to get sequence_id in one query.""" async with ( self.start_async_db_session(True) as session, async_sql_exception_handler(), ): - orm = EventORM( - id=id, - task_id=task_id, - agent_id=agent_id, - content=content.model_dump(mode="json") if content else None, + stmt = ( + insert(EventORM) + .values( + id=id, + task_id=task_id, + agent_id=agent_id, + content=content.model_dump(mode="json") if content else None, + ) + .returning(EventORM) ) - session.add(orm) + + result = await session.execute(stmt) + orm = result.scalar_one() await session.commit() - # Refresh the ORM object to get the auto-generated sequence_id - await session.refresh(orm) return self.entity.model_validate(orm) async def list_events_after_last_processed( From e1c83864bced5a2ffd1f606de8ec8f682920a561 Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Wed, 24 Dec 2025 16:32:19 -0800 Subject: [PATCH 2/2] Apply INSERT RETURNING optimization to base PostgresCRUDRepository Extends the single-query INSERT RETURNING pattern to all PostgreSQL repositories by optimizing the base class create() method. Key changes: - Use insert().values().returning() instead of add() + refresh() - Return explicit columns to avoid lazy-loading relationships - Exclude None values to preserve server defaults (e.g., created_at) This reduces database round-trips from 2 to 1 for all PostgreSQL create operations across agents, spans, tasks, and other entities. --- .../adapters/crud_store/adapter_postgres.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/agentex/src/adapters/crud_store/adapter_postgres.py b/agentex/src/adapters/crud_store/adapter_postgres.py index 8a794ce..027ea16 100644 --- a/agentex/src/adapters/crud_store/adapter_postgres.py +++ b/agentex/src/adapters/crud_store/adapter_postgres.py @@ -21,6 +21,7 @@ select, update, ) +from sqlalchemy.dialects.postgresql import insert from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm.interfaces import LoaderOption from sqlalchemy.sql import Select @@ -167,15 +168,25 @@ async def start_async_db_session( yield session async def create(self, item: T) -> T: + """Create an item using INSERT ... RETURNING for single query efficiency. + + Uses RETURNING with explicit columns to avoid lazy-loading relationship + attributes on the returned ORM object, which would fail outside async context. + """ async with ( self.start_async_db_session(True) as session, async_sql_exception_handler(), ): - orm = self.orm(**item.to_dict()) - session.add(orm) + # Exclude None values to allow server defaults (e.g., created_at) to apply + values = {k: v for k, v in item.to_dict().items() if v is not None} + # Return only columns (not full ORM) to avoid lazy-loading relationships + stmt = ( + insert(self.orm).values(**values).returning(*self.orm.__table__.columns) + ) + result = await session.execute(stmt) + row = result.one() await session.commit() - await session.refresh(orm) - return self.entity.model_validate(orm) + return self.entity.model_validate(dict(row._mapping)) async def batch_create(self, items: list[T]) -> list[T]: async with (