-
Notifications
You must be signed in to change notification settings - Fork 2.7k
feat(runner): add metadata parameter to Runner.run_async() #3985
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -400,6 +400,7 @@ async def run_async( | |||||
| new_message: Optional[types.Content] = None, | ||||||
| state_delta: Optional[dict[str, Any]] = None, | ||||||
| run_config: Optional[RunConfig] = None, | ||||||
| metadata: Optional[dict[str, Any]] = None, | ||||||
| ) -> AsyncGenerator[Event, None]: | ||||||
| """Main entry method to run the agent in this runner. | ||||||
|
|
||||||
|
|
@@ -417,6 +418,13 @@ async def run_async( | |||||
| new_message: A new message to append to the session. | ||||||
| state_delta: Optional state changes to apply to the session. | ||||||
| run_config: The run config for the agent. | ||||||
| metadata: Optional per-request metadata that will be passed to callbacks. | ||||||
| This allows passing request-specific context such as user_id, trace_id, | ||||||
| or memory context keys to before_model_callback and other callbacks. | ||||||
| Note: A shallow copy is made of this dictionary, so top-level changes | ||||||
| within callbacks won't affect the original. However, modifications to | ||||||
| nested mutable objects (e.g., nested dicts or lists) will affect the | ||||||
| original. | ||||||
|
|
||||||
| Yields: | ||||||
| The events generated by the agent. | ||||||
|
|
@@ -426,13 +434,16 @@ async def run_async( | |||||
| new_message are None. | ||||||
| """ | ||||||
| run_config = run_config or RunConfig() | ||||||
| # Create a shallow copy to isolate from caller's modifications | ||||||
| metadata = metadata.copy() if metadata is not None else None | ||||||
|
|
||||||
| if new_message and not new_message.role: | ||||||
| new_message.role = 'user' | ||||||
|
|
||||||
| async def _run_with_trace( | ||||||
| new_message: Optional[types.Content] = None, | ||||||
| invocation_id: Optional[str] = None, | ||||||
| metadata: Optional[dict[str, Any]] = None, | ||||||
| ) -> AsyncGenerator[Event, None]: | ||||||
| with tracer.start_as_current_span('invocation'): | ||||||
| session = await self.session_service.get_session( | ||||||
|
|
@@ -463,6 +474,7 @@ async def _run_with_trace( | |||||
| invocation_id=invocation_id, | ||||||
| run_config=run_config, | ||||||
| state_delta=state_delta, | ||||||
| metadata=metadata, | ||||||
| ) | ||||||
| if invocation_context.end_of_agents.get( | ||||||
| invocation_context.agent.name | ||||||
|
|
@@ -476,6 +488,7 @@ async def _run_with_trace( | |||||
| new_message=new_message, # new_message is not None. | ||||||
| run_config=run_config, | ||||||
| state_delta=state_delta, | ||||||
| metadata=metadata, | ||||||
| ) | ||||||
|
|
||||||
| async def execute(ctx: InvocationContext) -> AsyncGenerator[Event]: | ||||||
|
|
@@ -502,7 +515,9 @@ async def execute(ctx: InvocationContext) -> AsyncGenerator[Event]: | |||||
| self.app, session, self.session_service | ||||||
| ) | ||||||
|
|
||||||
| async with Aclosing(_run_with_trace(new_message, invocation_id)) as agen: | ||||||
| async with Aclosing( | ||||||
| _run_with_trace(new_message, invocation_id, metadata) | ||||||
| ) as agen: | ||||||
| async for event in agen: | ||||||
| yield event | ||||||
|
|
||||||
|
|
@@ -1186,6 +1201,7 @@ async def _setup_context_for_new_invocation( | |||||
| new_message: types.Content, | ||||||
| run_config: RunConfig, | ||||||
| state_delta: Optional[dict[str, Any]], | ||||||
| metadata: Optional[dict[str, Any]] = None, | ||||||
| ) -> InvocationContext: | ||||||
| """Sets up the context for a new invocation. | ||||||
|
|
||||||
|
|
@@ -1194,6 +1210,7 @@ async def _setup_context_for_new_invocation( | |||||
| new_message: The new message to process and append to the session. | ||||||
| run_config: The run config of the agent. | ||||||
| state_delta: Optional state changes to apply to the session. | ||||||
| metadata: Optional per-request metadata to pass to callbacks. | ||||||
|
|
||||||
| Returns: | ||||||
| The invocation context for the new invocation. | ||||||
|
|
@@ -1203,6 +1220,7 @@ async def _setup_context_for_new_invocation( | |||||
| session, | ||||||
| new_message=new_message, | ||||||
| run_config=run_config, | ||||||
| metadata=metadata, | ||||||
| ) | ||||||
| # Step 2: Handle new message, by running callbacks and appending to | ||||||
| # session. | ||||||
|
|
@@ -1225,6 +1243,7 @@ async def _setup_context_for_resumed_invocation( | |||||
| invocation_id: Optional[str], | ||||||
| run_config: RunConfig, | ||||||
| state_delta: Optional[dict[str, Any]], | ||||||
| metadata: Optional[dict[str, Any]] = None, | ||||||
| ) -> InvocationContext: | ||||||
| """Sets up the context for a resumed invocation. | ||||||
|
|
||||||
|
|
@@ -1234,6 +1253,7 @@ async def _setup_context_for_resumed_invocation( | |||||
| invocation_id: The invocation id to resume. | ||||||
| run_config: The run config of the agent. | ||||||
| state_delta: Optional state changes to apply to the session. | ||||||
| metadata: Optional per-request metadata to pass to callbacks. | ||||||
|
|
||||||
| Returns: | ||||||
| The invocation context for the resumed invocation. | ||||||
|
|
@@ -1259,6 +1279,7 @@ async def _setup_context_for_resumed_invocation( | |||||
| new_message=user_message, | ||||||
| run_config=run_config, | ||||||
| invocation_id=invocation_id, | ||||||
| metadata=metadata, | ||||||
| ) | ||||||
| # Step 3: Maybe handle new message. | ||||||
| if new_message: | ||||||
|
|
@@ -1303,6 +1324,7 @@ def _new_invocation_context( | |||||
| new_message: Optional[types.Content] = None, | ||||||
| live_request_queue: Optional[LiveRequestQueue] = None, | ||||||
| run_config: Optional[RunConfig] = None, | ||||||
| metadata: Optional[dict[str, Any]] = None, | ||||||
| ) -> InvocationContext: | ||||||
| """Creates a new invocation context. | ||||||
|
|
||||||
|
|
@@ -1312,6 +1334,7 @@ def _new_invocation_context( | |||||
| new_message: The new message for the context. | ||||||
| live_request_queue: The live request queue for the context. | ||||||
| run_config: The run config for the context. | ||||||
| metadata: Optional per-request metadata for the context. | ||||||
|
|
||||||
| Returns: | ||||||
| The new invocation context. | ||||||
|
|
@@ -1343,6 +1366,7 @@ def _new_invocation_context( | |||||
| live_request_queue=live_request_queue, | ||||||
| run_config=run_config, | ||||||
| resumability_config=self.resumability_config, | ||||||
| metadata=metadata, | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To prevent accidental modification of the original
Suggested change
|
||||||
| ) | ||||||
|
|
||||||
| def _new_invocation_context_for_live( | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To prevent potential subtle bugs, it's a good practice to clarify the copy behavior of the
metadatadictionary in the docstring. Since a shallow copy is performed, modifications to nested mutable objects within a callback will affect the original object passed by the caller. Please add a note about this to help users of the API understand this behavior and avoid unexpected side effects. For example, you could add:Note: A shallow copy is made of this dictionary, so changes to nested mutable objects will affect the original object.