diff --git a/agentex/src/adapters/crud_store/adapter_postgres.py b/agentex/src/adapters/crud_store/adapter_postgres.py index 8a794ce..027ea16 100644 --- a/agentex/src/adapters/crud_store/adapter_postgres.py +++ b/agentex/src/adapters/crud_store/adapter_postgres.py @@ -21,6 +21,7 @@ select, update, ) +from sqlalchemy.dialects.postgresql import insert from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm.interfaces import LoaderOption from sqlalchemy.sql import Select @@ -167,15 +168,25 @@ async def start_async_db_session( yield session async def create(self, item: T) -> T: + """Create an item using INSERT ... RETURNING for single query efficiency. + + Uses RETURNING with explicit columns to avoid lazy-loading relationship + attributes on the returned ORM object, which would fail outside async context. + """ async with ( self.start_async_db_session(True) as session, async_sql_exception_handler(), ): - orm = self.orm(**item.to_dict()) - session.add(orm) + # Exclude None values to allow server defaults (e.g., created_at) to apply + values = {k: v for k, v in item.to_dict().items() if v is not None} + # Return only columns (not full ORM) to avoid lazy-loading relationships + stmt = ( + insert(self.orm).values(**values).returning(*self.orm.__table__.columns) + ) + result = await session.execute(stmt) + row = result.one() await session.commit() - await session.refresh(orm) - return self.entity.model_validate(orm) + return self.entity.model_validate(dict(row._mapping)) async def batch_create(self, items: list[T]) -> list[T]: async with ( diff --git a/agentex/src/domain/repositories/event_repository.py b/agentex/src/domain/repositories/event_repository.py index 28715f1..1aa750c 100644 --- a/agentex/src/domain/repositories/event_repository.py +++ b/agentex/src/domain/repositories/event_repository.py @@ -2,6 +2,7 @@ from fastapi import Depends from sqlalchemy import and_ +from sqlalchemy.dialects.postgresql import insert from sqlalchemy.future import select from src.adapters.crud_store.adapter_postgres import ( PostgresCRUDRepository, @@ -32,7 +33,6 @@ def __init__( EventEntity, ) - # Have to do this because the sequence_id is automatically generated by the ORM async def create( self, id: str, @@ -40,20 +40,25 @@ async def create( agent_id: str, content: TaskMessageContentEntity | None = None, ) -> EventEntity: + """Create an event using INSERT ... RETURNING to get sequence_id in one query.""" async with ( self.start_async_db_session(True) as session, async_sql_exception_handler(), ): - orm = EventORM( - id=id, - task_id=task_id, - agent_id=agent_id, - content=content.model_dump(mode="json") if content else None, + stmt = ( + insert(EventORM) + .values( + id=id, + task_id=task_id, + agent_id=agent_id, + content=content.model_dump(mode="json") if content else None, + ) + .returning(EventORM) ) - session.add(orm) + + result = await session.execute(stmt) + orm = result.scalar_one() await session.commit() - # Refresh the ORM object to get the auto-generated sequence_id - await session.refresh(orm) return self.entity.model_validate(orm) async def list_events_after_last_processed(