Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/google/adk/events/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import Optional
import uuid

from google.adk import runtime
from google.genai import types
from pydantic import alias_generators
from pydantic import ConfigDict
Expand Down Expand Up @@ -70,7 +71,7 @@ class Event(LlmResponse):
# Do not assign the ID. It will be assigned by the session.
id: str = ''
"""The unique identifier of the event."""
timestamp: float = Field(default_factory=lambda: datetime.now().timestamp())
timestamp: float = Field(default_factory=lambda: runtime.get_time())
"""The timestamp of the event."""

def model_post_init(self, __context):
Expand Down Expand Up @@ -125,4 +126,4 @@ def has_trailing_code_execution_result(

@staticmethod
def new_id():
return str(uuid.uuid4())
return runtime.new_uuid()
Empty file.
155 changes: 155 additions & 0 deletions src/google/adk/integrations/temporal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Temporal integration helpers for ADK."""

import functools
import inspect
from typing import Any, AsyncGenerator, Callable, Optional, List

from temporalio import workflow, activity
from google.adk.models import BaseLlm, LlmRequest, LlmResponse, LLMRegistry
from google.genai import types


def activity_as_tool(
activity_def: Callable,
**activity_options: Any
) -> Callable:
"""Wraps a Temporal Activity Definition into an ADK-compatible tool.

Args:
activity_def: The Temporal activity definition (decorated with @activity.defn).
**activity_options: Options to pass to workflow.execute_activity
(e.g. start_to_close_timeout, retry_policy).

Returns:
A callable tool that executes the activity when invoked.
"""

# We create a wrapper that delegates to workflow.execute_activity
async def tool_wrapper(*args, **kwargs) -> Any:
# Bind arguments to the activity signature to ensure correct order/mapping.
try:
sig = inspect.signature(activity_def)
bound_args = sig.bind(*args, **kwargs)
bound_args.apply_defaults()
activity_args = [bound_args.arguments[p] for p in sig.parameters]
except (TypeError, ValueError):
# Fallback for built-ins or other complex callables where binding may fail
# or if arguments don't match signature (e.g. simpler invocation).
# Temporal Python SDK typically invokes activities with `args=[...]`.
activity_args = list(args) + list(kwargs.values()) if kwargs else list(args)

return await workflow.execute_activity(
activity_def,
args=activity_args,
**activity_options
)

# Copy metadata so ADK can inspect the tool (name, docstring, annotations)
# ADK uses this to generate the tool schema for the LLM.
tool_wrapper.__doc__ = activity_def.__doc__
tool_wrapper.__name__ = getattr(activity_def, "name", activity_def.__name__)

# Attempt to copy annotations if they exist
if hasattr(activity_def, "__annotations__"):
tool_wrapper.__annotations__ = activity_def.__annotations__

# CRITICAL: Copy signature so FunctionTool can generate correct parameters schema
try:
tool_wrapper.__signature__ = inspect.signature(activity_def)
except Exception:
pass # Fallback if signature copy fails (e.g. builtins)

return tool_wrapper


@activity.defn
async def generate_content_activity(request: LlmRequest) -> List[LlmResponse]:
"""Generic activity to invoke an LLM via ADK's LLMRegistry.

The model name is expected to be in `request.model`.
"""
if not request.model:
raise ValueError("LlmRequest.model must be set when using generate_content_activity.")

llm = LLMRegistry.new_llm(request.model)
return [response async for response in llm.generate_content_async(request)]


class TemporalModel(BaseLlm):
"""An ADK ModelWrapper that executes content generation as a Temporal Activity.

This effectively delegates the 'generate_content' call to an external Activity,
ensuring that the network I/O to Vertex/Gemini is recorded in Temporal history.
"""

activity_def: Callable
activity_options: dict[str, Any]

def __init__(
self,
model_name: str,
activity_def: Callable = generate_content_activity,
**activity_options: Any
):
"""Initializes the TemporalModel.

Args:
model_name: The name of the model to report to ADK.
activity_def: The Temporal activity definition to invoke.
Defaults to `generate_content_activity`.
**activity_options: Options for workflow.execute_activity.
"""
super().__init__(
model=model_name,
activity_def=activity_def,
activity_options=activity_options
)

async def generate_content_async(
self,
llm_request: LlmRequest,
stream: bool = False
) -> AsyncGenerator[LlmResponse, None]:
"""Generates content by calling the configured Temporal Activity."""

# Ensure model name is carried in the request for the generic activity
if not llm_request.model:
llm_request.model = self.model

# Note: Temporal Activities are not typically streaming in the Python SDK
# in the way python async generators work (streaming back to workflow is complex).
# Standard approach is to return the full response.
# We will assume non-streaming activity execution for now.

# Execute the activity
responses: List[LlmResponse] = await workflow.execute_activity(
self.activity_def,
args=[llm_request],
**self.activity_options
)

# Yield the responses
for response in responses:
yield response

@classmethod
def default_activities(cls) -> List[Callable]:
"""Returns the default activities used by this model wrapper.

Useful for registering activities with the Temporal Worker.
"""
return [generate_content_activity]
53 changes: 53 additions & 0 deletions src/google/adk/runtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runtime module for abstracting system primitives like time and UUIDs."""

import time
import uuid
from typing import Callable

_time_provider: Callable[[], float] = time.time
_id_provider: Callable[[], str] = lambda: str(uuid.uuid4())


def set_time_provider(provider: Callable[[], float]) -> None:
"""Sets the provider for the current time.

Args:
provider: A callable that returns the current time in seconds since the
epoch.
"""
global _time_provider
_time_provider = provider


def set_id_provider(provider: Callable[[], str]) -> None:
"""Sets the provider for generating unique IDs.

Args:
provider: A callable that returns a unique ID string.
"""
global _id_provider
_id_provider = provider


def get_time() -> float:
"""Returns the current time in seconds since the epoch."""
return _time_provider()


def new_uuid() -> str:
"""Returns a new unique ID."""
return _id_provider()
5 changes: 3 additions & 2 deletions src/google/adk/sessions/in_memory_session_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from typing_extensions import override

from google.adk import runtime
from . import _session_util
from ..errors.already_exists_error import AlreadyExistsError
from ..events.event import Event
Expand Down Expand Up @@ -108,14 +109,14 @@ def _create_session_impl(
session_id = (
session_id.strip()
if session_id and session_id.strip()
else str(uuid.uuid4())
else runtime.new_uuid()
)
session = Session(
app_name=app_name,
user_id=user_id,
id=session_id,
state=session_state or {},
last_update_time=time.time(),
last_update_time=runtime.get_time(),
)

if app_name not in self.sessions:
Expand Down
23 changes: 23 additions & 0 deletions tests/integration/README_TEMPORAL.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Temporal Integration Tests

The file `manual_test_temporal_integration.py` contains integration tests for ADK's Temporal support.
It is named `manual_test_...` to be excluded from standard CI/test runs because it requires:

1. **Local Temporal Server**: You must have a Temporal server running locally (e.g., via `temporal server start-dev`).
2. **GCP Credentials**: Environment variables `GOOGLE_CLOUD_PROJECT` and `GOOGLE_CLOUD_LOCATION` must be set.
3. **Local Environment**: It assumes `localhost:7233`.

## How to Run

1. Start Temporal Server:
```bash
temporal server start-dev
```

2. Run the test directly:
```bash
export GOOGLE_CLOUD_PROJECT="your-project"
export GOOGLE_CLOUD_LOCATION="us-central1"

uv run pytest tests/integration/manual_test_temporal_integration.py
```
Loading