diff --git a/src/google/adk/events/event.py b/src/google/adk/events/event.py index cca086430b..faeefc802f 100644 --- a/src/google/adk/events/event.py +++ b/src/google/adk/events/event.py @@ -18,6 +18,7 @@ from typing import Optional import uuid +from google.adk import runtime from google.genai import types from pydantic import alias_generators from pydantic import ConfigDict @@ -70,7 +71,7 @@ class Event(LlmResponse): # Do not assign the ID. It will be assigned by the session. id: str = '' """The unique identifier of the event.""" - timestamp: float = Field(default_factory=lambda: datetime.now().timestamp()) + timestamp: float = Field(default_factory=lambda: runtime.get_time()) """The timestamp of the event.""" def model_post_init(self, __context): @@ -125,4 +126,4 @@ def has_trailing_code_execution_result( @staticmethod def new_id(): - return str(uuid.uuid4()) + return runtime.new_uuid() diff --git a/src/google/adk/integrations/__init__.py b/src/google/adk/integrations/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/google/adk/integrations/temporal.py b/src/google/adk/integrations/temporal.py new file mode 100644 index 0000000000..d05145b5ac --- /dev/null +++ b/src/google/adk/integrations/temporal.py @@ -0,0 +1,155 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Temporal integration helpers for ADK.""" + +import functools +import inspect +from typing import Any, AsyncGenerator, Callable, Optional, List + +from temporalio import workflow, activity +from google.adk.models import BaseLlm, LlmRequest, LlmResponse, LLMRegistry +from google.genai import types + + +def activity_as_tool( + activity_def: Callable, + **activity_options: Any +) -> Callable: + """Wraps a Temporal Activity Definition into an ADK-compatible tool. + + Args: + activity_def: The Temporal activity definition (decorated with @activity.defn). + **activity_options: Options to pass to workflow.execute_activity + (e.g. start_to_close_timeout, retry_policy). + + Returns: + A callable tool that executes the activity when invoked. + """ + + # We create a wrapper that delegates to workflow.execute_activity + async def tool_wrapper(*args, **kwargs) -> Any: + # Bind arguments to the activity signature to ensure correct order/mapping. + try: + sig = inspect.signature(activity_def) + bound_args = sig.bind(*args, **kwargs) + bound_args.apply_defaults() + activity_args = [bound_args.arguments[p] for p in sig.parameters] + except (TypeError, ValueError): + # Fallback for built-ins or other complex callables where binding may fail + # or if arguments don't match signature (e.g. simpler invocation). + # Temporal Python SDK typically invokes activities with `args=[...]`. + activity_args = list(args) + list(kwargs.values()) if kwargs else list(args) + + return await workflow.execute_activity( + activity_def, + args=activity_args, + **activity_options + ) + + # Copy metadata so ADK can inspect the tool (name, docstring, annotations) + # ADK uses this to generate the tool schema for the LLM. + tool_wrapper.__doc__ = activity_def.__doc__ + tool_wrapper.__name__ = getattr(activity_def, "name", activity_def.__name__) + + # Attempt to copy annotations if they exist + if hasattr(activity_def, "__annotations__"): + tool_wrapper.__annotations__ = activity_def.__annotations__ + + # CRITICAL: Copy signature so FunctionTool can generate correct parameters schema + try: + tool_wrapper.__signature__ = inspect.signature(activity_def) + except Exception: + pass # Fallback if signature copy fails (e.g. builtins) + + return tool_wrapper + + +@activity.defn +async def generate_content_activity(request: LlmRequest) -> List[LlmResponse]: + """Generic activity to invoke an LLM via ADK's LLMRegistry. + + The model name is expected to be in `request.model`. + """ + if not request.model: + raise ValueError("LlmRequest.model must be set when using generate_content_activity.") + + llm = LLMRegistry.new_llm(request.model) + return [response async for response in llm.generate_content_async(request)] + + +class TemporalModel(BaseLlm): + """An ADK ModelWrapper that executes content generation as a Temporal Activity. + + This effectively delegates the 'generate_content' call to an external Activity, + ensuring that the network I/O to Vertex/Gemini is recorded in Temporal history. + """ + + activity_def: Callable + activity_options: dict[str, Any] + + def __init__( + self, + model_name: str, + activity_def: Callable = generate_content_activity, + **activity_options: Any + ): + """Initializes the TemporalModel. + + Args: + model_name: The name of the model to report to ADK. + activity_def: The Temporal activity definition to invoke. + Defaults to `generate_content_activity`. + **activity_options: Options for workflow.execute_activity. + """ + super().__init__( + model=model_name, + activity_def=activity_def, + activity_options=activity_options + ) + + async def generate_content_async( + self, + llm_request: LlmRequest, + stream: bool = False + ) -> AsyncGenerator[LlmResponse, None]: + """Generates content by calling the configured Temporal Activity.""" + + # Ensure model name is carried in the request for the generic activity + if not llm_request.model: + llm_request.model = self.model + + # Note: Temporal Activities are not typically streaming in the Python SDK + # in the way python async generators work (streaming back to workflow is complex). + # Standard approach is to return the full response. + # We will assume non-streaming activity execution for now. + + # Execute the activity + responses: List[LlmResponse] = await workflow.execute_activity( + self.activity_def, + args=[llm_request], + **self.activity_options + ) + + # Yield the responses + for response in responses: + yield response + + @classmethod + def default_activities(cls) -> List[Callable]: + """Returns the default activities used by this model wrapper. + + Useful for registering activities with the Temporal Worker. + """ + return [generate_content_activity] diff --git a/src/google/adk/runtime.py b/src/google/adk/runtime.py new file mode 100644 index 0000000000..edd6be9045 --- /dev/null +++ b/src/google/adk/runtime.py @@ -0,0 +1,53 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Runtime module for abstracting system primitives like time and UUIDs.""" + +import time +import uuid +from typing import Callable + +_time_provider: Callable[[], float] = time.time +_id_provider: Callable[[], str] = lambda: str(uuid.uuid4()) + + +def set_time_provider(provider: Callable[[], float]) -> None: + """Sets the provider for the current time. + + Args: + provider: A callable that returns the current time in seconds since the + epoch. + """ + global _time_provider + _time_provider = provider + + +def set_id_provider(provider: Callable[[], str]) -> None: + """Sets the provider for generating unique IDs. + + Args: + provider: A callable that returns a unique ID string. + """ + global _id_provider + _id_provider = provider + + +def get_time() -> float: + """Returns the current time in seconds since the epoch.""" + return _time_provider() + + +def new_uuid() -> str: + """Returns a new unique ID.""" + return _id_provider() diff --git a/src/google/adk/sessions/in_memory_session_service.py b/src/google/adk/sessions/in_memory_session_service.py index 6ba7f0bb01..9df7ca17c9 100644 --- a/src/google/adk/sessions/in_memory_session_service.py +++ b/src/google/adk/sessions/in_memory_session_service.py @@ -22,6 +22,7 @@ from typing_extensions import override +from google.adk import runtime from . import _session_util from ..errors.already_exists_error import AlreadyExistsError from ..events.event import Event @@ -108,14 +109,14 @@ def _create_session_impl( session_id = ( session_id.strip() if session_id and session_id.strip() - else str(uuid.uuid4()) + else runtime.new_uuid() ) session = Session( app_name=app_name, user_id=user_id, id=session_id, state=session_state or {}, - last_update_time=time.time(), + last_update_time=runtime.get_time(), ) if app_name not in self.sessions: diff --git a/tests/integration/README_TEMPORAL.md b/tests/integration/README_TEMPORAL.md new file mode 100644 index 0000000000..31a9c3855e --- /dev/null +++ b/tests/integration/README_TEMPORAL.md @@ -0,0 +1,23 @@ +# Temporal Integration Tests + +The file `manual_test_temporal_integration.py` contains integration tests for ADK's Temporal support. +It is named `manual_test_...` to be excluded from standard CI/test runs because it requires: + +1. **Local Temporal Server**: You must have a Temporal server running locally (e.g., via `temporal server start-dev`). +2. **GCP Credentials**: Environment variables `GOOGLE_CLOUD_PROJECT` and `GOOGLE_CLOUD_LOCATION` must be set. +3. **Local Environment**: It assumes `localhost:7233`. + +## How to Run + +1. Start Temporal Server: + ```bash + temporal server start-dev + ``` + +2. Run the test directly: + ```bash + export GOOGLE_CLOUD_PROJECT="your-project" + export GOOGLE_CLOUD_LOCATION="us-central1" + + uv run pytest tests/integration/manual_test_temporal_integration.py + ``` diff --git a/tests/integration/manual_test_temporal_integration.py b/tests/integration/manual_test_temporal_integration.py new file mode 100644 index 0000000000..7bc4e3f782 --- /dev/null +++ b/tests/integration/manual_test_temporal_integration.py @@ -0,0 +1,319 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Integration tests for ADK Temporal support.""" + +import dataclasses +import logging +import uuid +import os +from datetime import timedelta +from typing import AsyncGenerator, List + +import pytest +from temporalio import workflow, activity +from temporalio.client import Client +from temporalio.contrib.pydantic import pydantic_data_converter, PydanticPayloadConverter +from temporalio.converter import DataConverter, DefaultPayloadConverter +from temporalio.plugin import SimplePlugin +from temporalio.worker import Worker, WorkflowRunner +from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner, SandboxRestrictions + + +from google.genai import types +from google.adk import Agent, Runner, runtime +from google.adk.agents import LlmAgent +from google.adk.tools import AgentTool +from google.adk.models import LlmRequest, LlmResponse, LLMRegistry +from google.adk.sessions import InMemorySessionService +from google.adk.utils.context_utils import Aclosing +from google.adk.events import Event +from google.adk.integrations.temporal import activity_as_tool, TemporalModel, generate_content_activity + +# Required Environment Variables for this test: +# - GOOGLE_CLOUD_PROJECT +# - GOOGLE_CLOUD_LOCATION +# - GOOGLE_GENAI_USE_VERTEXAI (optional, defaults to 1 for this test if needed, or set externally) +# - Temporal Server running at localhost:7233 + +logger = logging.getLogger(__name__) + + +@activity.defn +async def get_weather(city: str) -> str: + """Activity that gets weather.""" + return "Warm and sunny. 17 degrees." + +# --- Customized LLM Activities for Better Trace Visibility --- + +@activity.defn(name="coordinator_think") +async def coordinator_think(req: LlmRequest) -> List[LlmResponse]: + """Activity for the Coordinator agent.""" + return await generate_content_activity(req) + +@activity.defn(name="tool_agent_think") +async def tool_agent_think(req: LlmRequest) -> List[LlmResponse]: + """Activity for the Tool Agent.""" + return await generate_content_activity(req) + +@activity.defn(name="specialist_think") +async def specialist_think(req: LlmRequest) -> List[LlmResponse]: + """Activity for the Specialist/Handoff Agent.""" + return await generate_content_activity(req) + + +@workflow.defn +class WeatherAgent: + @workflow.run + async def run(self, prompt: str) -> Event | None: + logger.info("Workflow started.") + + # 1. Configure ADK Runtime to use Temporal Determinism + runtime.set_time_provider(lambda: workflow.now().timestamp()) + runtime.set_id_provider(lambda: str(workflow.uuid4())) + + # 2. Define Agent using Temporal Helpers + # Uses generic 'generate_content_activity' by default + agent_model = TemporalModel( + model_name="gemini-2.5-pro", + start_to_close_timeout=timedelta(minutes=2) + ) + + # Wraps 'get_weather' activity as a Tool + weather_tool = activity_as_tool( + get_weather, + start_to_close_timeout=timedelta(seconds=60) + ) + + agent = Agent( + name='test_agent', + model=agent_model, + tools=[weather_tool] + ) + + # 3. Create Session (uses runtime.new_uuid() -> workflow.uuid4()) + session_service = InMemorySessionService() + logger.info("Create session.") + session = await session_service.create_session(app_name="test_app", user_id="test") + + logger.info(f"Session created with ID: {session.id}") + + # 4. Run Agent + runner = Runner( + agent=agent, + app_name='test_app', + session_service=session_service, + ) + + logger.info("Starting runner.") + last_event = None + async with Aclosing(runner.run_async( + user_id="test", + session_id=session.id, + new_message=types.Content( + role='user', parts=[types.Part(text=prompt)] + ), + )) as agen: + async for event in agen: + logger.info(f"Event: {event}") + last_event = event + + return last_event + +@workflow.defn +class MultiAgentWorkflow: + @workflow.run + async def run(self, prompt: str) -> str: + # 1. Runtime Setup + runtime.set_time_provider(lambda: workflow.now().timestamp()) + runtime.set_id_provider(lambda: str(workflow.uuid4())) + + # 2. Define Distinct Models for Visualization + # We use a separate activity for each agent so they show up distinctly in the Temporal UI. + + coordinator_model = TemporalModel( + model_name="gemini-2.5-pro", + activity_def=coordinator_think, + start_to_close_timeout=timedelta(minutes=2) + ) + + tool_agent_model = TemporalModel( + model_name="gemini-2.5-pro", + activity_def=tool_agent_think, + start_to_close_timeout=timedelta(minutes=2) + ) + + specialist_model = TemporalModel( + model_name="gemini-2.5-pro", + activity_def=specialist_think, + start_to_close_timeout=timedelta(minutes=2) + ) + + # 3. Define Sub-Agents + + # Agent to be used as a Tool + tool_agent = LlmAgent( + name="ToolAgent", + model=tool_agent_model, + instruction="You are a tool agent. You help with specific sub-tasks. Always include 'From ToolAgent:' in your response." + ) + agent_tool = AgentTool(tool_agent) + + # Agent to be transferred to (Handoff) + handoff_agent = LlmAgent( + name="HandoffAgent", + model=specialist_model, + instruction="You are a Specialist Agent. You handle specialized requests. Always include 'From HandoffAgent:' in your response." + ) + + # 4. Define Parent Agent + parent_agent = LlmAgent( + name="Coordinator", + model=coordinator_model, + # Instructions to guide the LLM when to use which + instruction=( + "You are a Coordinator. " + "CRITICAL INSTRUCTION: You MUST NOT answer user queries directly if they related to specific tasks. " + "1. If the user asks for 'help' or 'subtask', you MUST use the 'ToolAgent' tool (AgentTool). " + "2. If the user asks to 'switch' or 'specialist', you MUST transfer to the HandoffAgent using 'transfer_to_agent'. " + "Do not apologize. Do not say you will do it. Just call the function." + ), + tools=[agent_tool], + sub_agents=[handoff_agent] + ) + + # 5. Execute + session_service = InMemorySessionService() + session = await session_service.create_session(app_name="multi_agent_app", user_id="user_MULTI") + + runner = Runner( + agent=parent_agent, + app_name='multi_agent_app', + session_service=session_service, + ) + + # We will run a multi-turn conversation to test both paths + # Turn 1: Trigger Tool + logger.info("--- Turn 1: Trigger Tool ---") + tool_response_text = "" + async with Aclosing(runner.run_async( + user_id=session.user_id, + session_id=session.id, + new_message=types.Content(role='user', parts=[types.Part(text="I need help with a subtask.")]) + )) as agen: + async for event in agen: + logger.info(f"Event Author: {event.author} | Actions: {event.actions}") + if event.content and event.content.parts: + for part in event.content.parts: + if part.text: tool_response_text += part.text + + # Turn 2: Trigger Handoff + logger.info("--- Turn 2: Trigger Handoff ---") + handoff_response_text = "" + async with Aclosing(runner.run_async( + user_id=session.user_id, + session_id=session.id, + new_message=types.Content(role='user', parts=[types.Part(text="Please switch me to the specialist.")]) + )) as agen: + async for event in agen: + logger.info(f"Event Author: {event.author} | Actions: {event.actions}") + if event.content and event.content.parts: + for part in event.content.parts: + if part.text: handoff_response_text += part.text + + logger.info(f"Tool Response: {tool_response_text}") + logger.info(f"Handoff Response: {handoff_response_text}") + + return f"Tool: {tool_response_text} | Handoff: {handoff_response_text}" + + +class ADKPlugin(SimplePlugin): + def __init__(self): + super().__init__( + name="ADKPlugin", + data_converter=_data_converter, + workflow_runner=workflow_runner, + ) + +def workflow_runner(runner: WorkflowRunner | None) -> WorkflowRunner: + if not runner: + raise ValueError("No WorkflowRunner provided to the ADK plugin.") + + # If in sandbox, add additional passthrough + if isinstance(runner, SandboxedWorkflowRunner): + return dataclasses.replace( + runner, + restrictions=runner.restrictions.with_passthrough_modules("google.adk", "google.genai"), + ) + return runner + +def _data_converter(converter: DataConverter | None) -> DataConverter: + if converter is None: + return pydantic_data_converter + elif converter.payload_converter_class is DefaultPayloadConverter: + return dataclasses.replace( + converter, payload_converter_class=PydanticPayloadConverter + ) + elif not isinstance(converter.payload_converter, PydanticPayloadConverter): + raise ValueError( + "The payload converter must be of type PydanticPayloadConverter." + ) + return converter + +@pytest.mark.asyncio +async def test_temporalio_integration(): + """Run full integration test with Temporal Server.""" + + # Normally this should only run if local Temporal server is available + # For now, we assume it is, as per user context. + + # Start client/worker + if "GOOGLE_CLOUD_PROJECT" not in os.environ: + pytest.skip("GOOGLE_CLOUD_PROJECT not set. Skipping integration test.") + + try: + client = await Client.connect("localhost:7233", plugins=[ADKPlugin()]) + except RuntimeError: + pytest.skip("Could not connect to Temporal server. Is it running?") + + async with Worker( + client, + workflows=[WeatherAgent, MultiAgentWorkflow], + activities=TemporalModel.default_activities() + [ + get_weather, + coordinator_think, + tool_agent_think, + specialist_think + ], + task_queue="hello_world_queue", + max_cached_workflows=0, + ) as worker: + print("Worker started.") + # Run Weather Agent + result_weather = await client.execute_workflow( + WeatherAgent.run, + "What is the weather in Tokyo?", + id=str(uuid.uuid4()), + task_queue="hello_world_queue", + ) + print(f"Weather Agent Result: {result_weather}") + + # Run Multi-Agent Workflow + result_multi = await client.execute_workflow( + MultiAgentWorkflow.run, + "start", # Argument ignored in run logic (hardcoded prompts) + id=str(uuid.uuid4()), + task_queue="hello_world_queue", + ) + print(f"Multi-Agent Result: {result_multi}") diff --git a/tests/unittests/integrations/test_temporal.py b/tests/unittests/integrations/test_temporal.py new file mode 100644 index 0000000000..ec7fed9e6a --- /dev/null +++ b/tests/unittests/integrations/test_temporal.py @@ -0,0 +1,143 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for Temporal integration helpers.""" + +import unittest +from unittest.mock import MagicMock, patch, AsyncMock +import sys +import asyncio +from typing import Any + +from google.genai import types + +# Configure Mocks globally +# We create fresh mocks here. +mock_workflow = MagicMock() +mock_activity = MagicMock() +mock_worker = MagicMock() +mock_client = MagicMock() + +# Important: execute_activity must be awaitable +mock_workflow.execute_activity = AsyncMock(return_value="mock_result") + +# Mock the parent package +mock_temporalio = MagicMock() +mock_temporalio.workflow = mock_workflow +mock_temporalio.activity = mock_activity +mock_temporalio.worker = mock_worker +mock_temporalio.client = mock_client + +# Mock sys.modules +with patch.dict(sys.modules, { + "temporalio": mock_temporalio, + "temporalio.workflow": mock_workflow, + "temporalio.activity": mock_activity, + "temporalio.worker": mock_worker, + "temporalio.client": mock_client, +}): + from google.adk.integrations import temporal + from google.adk.models import LlmRequest, LlmResponse + + +class TestTemporalIntegration(unittest.TestCase): + + def test_activity_as_tool_wrapper(self): + # Reset mocks + mock_workflow.reset_mock() + mock_workflow.execute_activity = AsyncMock(return_value="mock_result") + + # Verify mock setup + # If this fails, then 'temporal.workflow' is NOT our 'mock_workflow' + assert temporal.workflow.execute_activity is mock_workflow.execute_activity + + # Define a fake activity + async def fake_activity(arg: str) -> str: + """My Docstring.""" + return f"Hello {arg}" + + fake_activity.name = "fake_activity_name" + + # Create tool + tool = temporal.activity_as_tool( + fake_activity, + start_to_close_timeout=100 + ) + + # Check metadata + self.assertEqual(tool.__name__, "fake_activity_name") + self.assertEqual(tool.__doc__, "My Docstring.") + + # Run tool (wrapper) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + result = loop.run_until_complete(tool("World")) + finally: + loop.close() + + # Verify call + mock_workflow.execute_activity.assert_called_once() + args, kwargs = mock_workflow.execute_activity.call_args + self.assertEqual(kwargs['args'], ['World']) + self.assertEqual(kwargs['start_to_close_timeout'], 100) + + def test_temporal_model_generate_content(self): + # Reset mocks + mock_workflow.reset_mock() + + # Prepare valid LlmResponse with content + response_content = types.Content(parts=[types.Part(text="test_resp")]) + llm_response = LlmResponse(content=response_content) + + # generate_content_async expects execute_activity to return response list (iterator) + mock_workflow.execute_activity = AsyncMock(return_value=[llm_response]) + + # Mock an activity def + mock_activity_def = MagicMock() + + # Create model + model = temporal.TemporalModel( + model_name="test-model", + activity_def=mock_activity_def, + schedule_to_close_timeout=50 + ) + + # Create request + req = LlmRequest(model="test-model", prompt="hi") + + # Run generate_content_async (it is an async generator) + async def run_gen(): + results = [] + async for r in model.generate_content_async(req): + results.append(r) + return results + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + results = loop.run_until_complete(run_gen()) + finally: + loop.close() + + # Verify execute_activity called + mock_workflow.execute_activity.assert_called_once() + args, kwargs = mock_workflow.execute_activity.call_args + self.assertEqual(kwargs['args'], [req]) + self.assertEqual(kwargs['schedule_to_close_timeout'], 50) + self.assertEqual(len(results), 1) + self.assertEqual(results[0].content.parts[0].text, "test_resp") + diff --git a/tests/unittests/test_runtime.py b/tests/unittests/test_runtime.py new file mode 100644 index 0000000000..cde013631b --- /dev/null +++ b/tests/unittests/test_runtime.py @@ -0,0 +1,56 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the runtime module.""" + +import time +import uuid +import unittest +from unittest.mock import MagicMock, patch + +from google.adk import runtime + + +class TestRuntime(unittest.TestCase): + + def tearDown(self): + # Reset providers to default after each test + runtime.set_time_provider(time.time) + runtime.set_id_provider(lambda: str(uuid.uuid4())) + + def test_default_time_provider(self): + # Verify it returns a float that is close to now + now = time.time() + rt_time = runtime.get_time() + self.assertIsInstance(rt_time, float) + self.assertAlmostEqual(rt_time, now, delta=1.0) + + def test_default_id_provider(self): + # Verify it returns a string uuid + uid = runtime.new_uuid() + self.assertIsInstance(uid, str) + # Should be parseable as uuid + uuid.UUID(uid) + + def test_custom_time_provider(self): + # Test override + mock_time = 123456789.0 + runtime.set_time_provider(lambda: mock_time) + self.assertEqual(runtime.get_time(), mock_time) + + def test_custom_id_provider(self): + # Test override + mock_id = "test-id-123" + runtime.set_id_provider(lambda: mock_id) + self.assertEqual(runtime.new_uuid(), mock_id)