diff --git a/pyproject.toml b/pyproject.toml
index 9e7b3eae..0af0c1f7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,13 +1,14 @@
 [project]
 name = "uipath-langchain"
-version = "0.1.44"
+version = "0.2.0"
 description = "Python SDK that enables developers to build and deploy LangGraph agents to the UiPath Cloud Platform"
 readme = { file = "README.md", content-type = "text/markdown" }
 requires-python = ">=3.11"
 dependencies = [
-    "uipath>=2.2.44, <2.3.0",
+    "uipath>=2.3.0, <2.4.0",
+    "uipath-runtime>=0.3.2, <0.4.0",
     "langgraph>=1.0.0, <2.0.0",
-    "langchain-core>=1.0.0, <2.0.0",
+    "langchain-core>=1.2.5, <2.0.0",
     "aiosqlite==0.21.0",
     "langgraph-checkpoint-sqlite>=3.0.0, <4.0.0",
     "langchain-openai>=1.0.0, <2.0.0",
diff --git a/src/uipath_langchain/runtime/factory.py b/src/uipath_langchain/runtime/factory.py
index 3da42214..8a872ec0 100644
--- a/src/uipath_langchain/runtime/factory.py
+++ b/src/uipath_langchain/runtime/factory.py
@@ -275,6 +275,7 @@ async def _create_runtime_instance(
         delegate=base_runtime,
         storage=storage,
         trigger_manager=trigger_manager,
+        runtime_id=runtime_id,
     )
 
 async def new_runtime(
diff --git a/src/uipath_langchain/runtime/runtime.py b/src/uipath_langchain/runtime/runtime.py
index 10c41114..c38b8086 100644
--- a/src/uipath_langchain/runtime/runtime.py
+++ b/src/uipath_langchain/runtime/runtime.py
@@ -293,29 +293,7 @@ def _extract_graph_result(self, final_chunk: Any) -> Any:
 
     def _is_interrupted(self, state: StateSnapshot) -> bool:
         """Check if execution was interrupted (static or dynamic)."""
-        # Check for static interrupts (interrupt_before/after)
-        if hasattr(state, "next") and state.next:
-            return True
-
-        # Check for dynamic interrupts (interrupt() inside node)
-        if hasattr(state, "tasks"):
-            for task in state.tasks:
-                if hasattr(task, "interrupts") and task.interrupts:
-                    return True
-
-        return False
-
-    def _get_dynamic_interrupt(self, state: StateSnapshot) -> Interrupt | None:
-        """Get the first dynamic interrupt if any."""
-        if not hasattr(state, "tasks"):
-            return None
-
-        for task in state.tasks:
-            if hasattr(task, "interrupts") and task.interrupts:
-                for interrupt in task.interrupts:
-                    if isinstance(interrupt, Interrupt):
-                        return interrupt
-        return None
+        return bool(state.next)
 
     async def _create_runtime_result(
         self,
@@ -344,13 +322,27 @@ async def _create_suspended_result(
         graph_state: StateSnapshot,
     ) -> UiPathRuntimeResult:
         """Create result for suspended execution."""
-        # Check if it's a dynamic interrupt
-        dynamic_interrupt = self._get_dynamic_interrupt(graph_state)
-
-        if dynamic_interrupt:
-            # Dynamic interrupt - should create and save resume trigger
+        interrupt_map: dict[str, Any] = {}
+
+        # Get nodes that are still scheduled to run
+        next_nodes = set(graph_state.next) if graph_state.next else set()
+
+        if graph_state.interrupts:
+            for interrupt in graph_state.interrupts:
+                if isinstance(interrupt, Interrupt):
+                    # Find which task this interrupt belongs to
+                    for task in graph_state.tasks:
+                        if task.interrupts and interrupt in task.interrupts:
+                            # Only include if this task's node is still in next
+                            if task.name in next_nodes:
+                                interrupt_map[interrupt.id] = interrupt.value
+                            break
+
+        # If we have dynamic interrupts, return suspended with interrupt map
+        # The output is used to create the resume triggers
+        if interrupt_map:
             return UiPathRuntimeResult(
-                output=dynamic_interrupt.value,
+                output=interrupt_map,
                 status=UiPathRuntimeStatus.SUSPENDED,
             )
         else:
diff --git a/src/uipath_langchain/runtime/storage.py b/src/uipath_langchain/runtime/storage.py
index e4eee386..e465712c 100644
--- a/src/uipath_langchain/runtime/storage.py
+++ b/src/uipath_langchain/runtime/storage.py
@@ -1,115 +1,222 @@
 """SQLite implementation of UiPathResumableStorageProtocol."""
 
 import json
-from typing import cast
+from typing import Any, cast
 
 from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
 from pydantic import BaseModel
-from uipath.runtime import (
-    UiPathApiTrigger,
-    UiPathResumeTrigger,
-    UiPathResumeTriggerName,
-    UiPathResumeTriggerType,
-)
+from uipath.runtime import UiPathResumeTrigger
 
 
 class SqliteResumableStorage:
-    """SQLite storage for resume triggers."""
+    """SQLite storage for resume triggers and arbitrary key-value pairs."""
 
     def __init__(
-        self, memory: AsyncSqliteSaver, table_name: str = "__uipath_resume_triggers"
+        self,
+        memory: AsyncSqliteSaver,
     ):
         self.memory = memory
-        self.table_name = table_name
+        self.rs_table_name = "__uipath_resume_triggers"
+        self.kv_table_name = "__uipath_runtime_kv"
         self._initialized = False
 
     async def _ensure_table(self) -> None:
-        """Create table if needed."""
+        """Create tables if needed."""
         if self._initialized:
             return
 
         await self.memory.setup()
         async with self.memory.lock, self.memory.conn.cursor() as cur:
-            await cur.execute(f"""
-                CREATE TABLE IF NOT EXISTS {self.table_name} (
+            # Enable WAL mode for high concurrency
+            await cur.execute("PRAGMA journal_mode=WAL")
+
+            await cur.execute(
+                f"""
+                CREATE TABLE IF NOT EXISTS {self.rs_table_name} (
                     id INTEGER PRIMARY KEY AUTOINCREMENT,
-                    type TEXT NOT NULL,
-                    name TEXT NOT NULL,
-                    key TEXT,
-                    folder_key TEXT,
-                    folder_path TEXT,
-                    payload TEXT,
+                    runtime_id TEXT NOT NULL,
+                    interrupt_id TEXT NOT NULL,
+                    data TEXT NOT NULL,
                     timestamp DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc'))
                 )
-            """)
-            await self.memory.conn.commit()
-            self._initialized = True
+                """
+            )
 
-    async def save_trigger(self, trigger: UiPathResumeTrigger) -> None:
-        """Save resume trigger to database."""
-        await self._ensure_table()
+            await cur.execute(
+                f"""
+                CREATE INDEX IF NOT EXISTS idx_{self.rs_table_name}_runtime_id
+                ON {self.rs_table_name}(runtime_id)
+                """
+            )
 
-        trigger_key = (
-            trigger.api_resume.inbox_id if trigger.api_resume else trigger.item_key
-        )
-        payload = trigger.payload
-        if payload:
-            payload = (
-                (
-                    payload.model_dump()
-                    if isinstance(payload, BaseModel)
-                    else json.dumps(payload)
+            await cur.execute(
+                f"""
+                CREATE TABLE IF NOT EXISTS {self.kv_table_name} (
+                    runtime_id TEXT NOT NULL,
+                    namespace TEXT NOT NULL,
+                    key TEXT NOT NULL,
+                    value TEXT,
+                    timestamp DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc')),
+                    PRIMARY KEY (runtime_id, namespace, key)
                 )
-                if isinstance(payload, dict)
-                else str(payload)
+                """
             )
+            await self.memory.conn.commit()
+
+        self._initialized = True
+
+    async def save_triggers(
+        self, runtime_id: str, triggers: list[UiPathResumeTrigger]
+    ) -> None:
+        """Save resume triggers to database, replacing all existing triggers for this runtime_id."""
+        await self._ensure_table()
+        async with self.memory.lock, self.memory.conn.cursor() as cur:
+            # Delete all existing triggers for this runtime_id
             await cur.execute(
-                f"INSERT INTO {self.table_name} (type, key, name, payload, folder_path, folder_key) VALUES (?, ?, ?, ?, ?, ?)",
-                (
-                    trigger.trigger_type.value,
-                    trigger_key,
-                    trigger.trigger_name.value,
-                    payload,
-                    trigger.folder_path,
-                    trigger.folder_key,
-                ),
+                f"""
+                DELETE FROM {self.rs_table_name}
+                WHERE runtime_id = ?
+ """, + (runtime_id,), ) + + # Insert new triggers + for trigger in triggers: + trigger_data = trigger.model_dump() + trigger_data["payload"] = trigger.payload + trigger_data["trigger_name"] = trigger.trigger_name + + await cur.execute( + f""" + INSERT INTO {self.rs_table_name} + (runtime_id, interrupt_id, data) + VALUES (?, ?, ?) + """, + ( + runtime_id, + trigger.interrupt_id, + json.dumps(trigger_data), + ), + ) await self.memory.conn.commit() - async def get_latest_trigger(self) -> UiPathResumeTrigger | None: - """Get most recent trigger from database.""" + async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger] | None: + """Get all triggers for runtime_id from database.""" await self._ensure_table() async with self.memory.lock, self.memory.conn.cursor() as cur: - await cur.execute(f""" - SELECT type, key, name, folder_path, folder_key, payload - FROM {self.table_name} - ORDER BY timestamp DESC - LIMIT 1 - """) - result = await cur.fetchone() + await cur.execute( + f""" + SELECT data + FROM {self.rs_table_name} + WHERE runtime_id = ? + ORDER BY timestamp ASC + """, + (runtime_id,), + ) + results = await cur.fetchall() + + if not results: + return None - if not result: - return None + triggers = [] + for result in results: + data_text = cast(str, result[0]) + trigger = UiPathResumeTrigger.model_validate_json(data_text) + triggers.append(trigger) - trigger_type, key, name, folder_path, folder_key, payload = cast( - tuple[str, str, str, str, str, str], tuple(result) + return triggers + + async def delete_trigger( + self, runtime_id: str, trigger: UiPathResumeTrigger + ) -> None: + """Delete resume trigger from storage.""" + await self._ensure_table() + + async with self.memory.lock, self.memory.conn.cursor() as cur: + await cur.execute( + f""" + DELETE FROM {self.rs_table_name} + WHERE runtime_id = ? AND interrupt_id = ? + """, + ( + runtime_id, + trigger.interrupt_id, + ), ) + await self.memory.conn.commit() + + async def set_value( + self, + runtime_id: str, + namespace: str, + key: str, + value: Any, + ) -> None: + """Save arbitrary key-value pair to database.""" + if not ( + isinstance(value, str) + or isinstance(value, dict) + or isinstance(value, BaseModel) + or value is None + ): + raise TypeError("Value must be str, dict, BaseModel or None.") + + await self._ensure_table() - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType(trigger_type), - trigger_name=UiPathResumeTriggerName(name), - item_key=key, - folder_path=folder_path, - folder_key=folder_key, - payload=payload, + value_text = self._dump_value(value) + + async with self.memory.lock, self.memory.conn.cursor() as cur: + await cur.execute( + f""" + INSERT INTO {self.kv_table_name} (runtime_id, namespace, key, value) + VALUES (?, ?, ?, ?) 
+                ON CONFLICT(runtime_id, namespace, key)
+                DO UPDATE SET
+                    value = excluded.value,
+                    timestamp = (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc'))
+                """,
+                (runtime_id, namespace, key, value_text),
             )
+            await self.memory.conn.commit()
 
-        if resume_trigger.trigger_type == UiPathResumeTriggerType.API:
-            resume_trigger.api_resume = UiPathApiTrigger(
-                inbox_id=resume_trigger.item_key, request=resume_trigger.payload
-            )
+    async def get_value(self, runtime_id: str, namespace: str, key: str) -> Any:
+        """Get arbitrary key-value pair from database (scoped by runtime_id + namespace)."""
+        await self._ensure_table()
 
-        return resume_trigger
+        async with self.memory.lock, self.memory.conn.cursor() as cur:
+            await cur.execute(
+                f"""
+                SELECT value
+                FROM {self.kv_table_name}
+                WHERE runtime_id = ? AND namespace = ? AND key = ?
+                LIMIT 1
+                """,
+                (runtime_id, namespace, key),
+            )
+            row = await cur.fetchone()
+
+            if not row:
+                return None
+
+            return self._load_value(cast(str | None, row[0]))
+
+    def _dump_value(self, value: str | dict[str, Any] | BaseModel | None) -> str | None:
+        if value is None:
+            return None
+        if isinstance(value, BaseModel):
+            return "j:" + json.dumps(value.model_dump())
+        if isinstance(value, dict):
+            return "j:" + json.dumps(value)
+        return "s:" + value
+
+    def _load_value(self, raw: str | None) -> Any:
+        if raw is None:
+            return None
+        if raw.startswith("s:"):
+            return raw[2:]
+        if raw.startswith("j:"):
+            return json.loads(raw[2:])
+        return raw
diff --git a/tests/hitl/test_hitl_api_trigger.py b/tests/hitl/test_hitl_api_trigger.py
index 73c45f15..50377ad7 100644
--- a/tests/hitl/test_hitl_api_trigger.py
+++ b/tests/hitl/test_hitl_api_trigger.py
@@ -67,7 +67,6 @@ async def test_agent(
         input=None, options=UiPathExecuteOptions(resume=False)
     )
 
-    print(context.result)
     assert context.result is not None
 
     # Verify that __uipath directory and state.db were created
@@ -89,15 +88,19 @@ async def test_agent(
 
             # Check the inserted trigger data from first execution
             cursor.execute(
-                "SELECT type, key, name, folder_path, folder_key, payload FROM __uipath_resume_triggers"
+                "SELECT runtime_id, interrupt_id, data FROM __uipath_resume_triggers"
            )
             triggers = cursor.fetchall()
             assert len(triggers) == 1
-            type, key, name, folder_path, folder_key, payload = triggers[0]
-            assert type == "Api"
-            assert name == "Api"
-            assert folder_path == folder_key is None
-            assert payload == "interrupt message"
+            runtime_id, interrupt_id, data = triggers[0]
+
+            # Parse the JSON data
+            trigger_data = json.loads(data)
+            assert trigger_data["trigger_type"] == "Api"
+            assert trigger_data["trigger_name"] == "Api"
+            assert trigger_data["folder_path"] is None
+            assert trigger_data["folder_key"] is None
+            assert trigger_data["payload"] == "interrupt message"
         finally:
             if conn:
                 conn.close()
@@ -109,7 +112,7 @@ async def test_agent(
     # Mock API response for resume scenario
     base_url = os.getenv("UIPATH_URL")
     httpx_mock.add_response(
-        url=f"{base_url}/orchestrator_/api/JobTriggers/GetPayload/{key}",
+        url=f"{base_url}/orchestrator_/api/JobTriggers/GetPayload/{trigger_data['api_resume']['inbox_id']}",
         status_code=200,
         text=json.dumps({"payload": "human response"}),
     )
diff --git a/tests/hitl/test_hitl_job_trigger.py b/tests/hitl/test_hitl_job_trigger.py
index d463fff9..92432154 100644
--- a/tests/hitl/test_hitl_job_trigger.py
+++ b/tests/hitl/test_hitl_job_trigger.py
@@ -99,17 +99,19 @@ async def test_agent_job_trigger(
 
             # Check the first job trigger data
             cursor.execute(
-                "SELECT type, key, name, folder_path, folder_key, payload FROM __uipath_resume_triggers"
+                "SELECT runtime_id, interrupt_id, data FROM __uipath_resume_triggers"
             )
             triggers = cursor.fetchall()
             assert len(triggers) == 1
-            type, key, name, folder_path, folder_key, payload = triggers[0]
-            assert type == "Job"
-            assert name == "Job"
-            assert folder_path == "process-folder-path"
-            assert folder_key is None
-            assert "input_arg_1" in payload
-            assert "value_1" in payload
+            runtime_id, interrupt_id, data = triggers[0]
+
+            trigger_data = json.loads(data)
+            assert trigger_data["trigger_type"] == "Job"
+            assert trigger_data["trigger_name"] == "Job"
+            assert trigger_data["folder_path"] == "process-folder-path"
+            assert trigger_data["folder_key"] is None
+            assert "input_arg_1" in data
+            assert "value_1" in data
         finally:
             if conn:
                 conn.close()
@@ -121,7 +123,7 @@ async def test_agent_job_trigger(
     # Mock response for first resume: job output arguments
     output_args_dict = {"output_arg_1": "response from invoke process"}
     httpx_mock.add_response(
-        url=f"{base_url}/orchestrator_/odata/Jobs/UiPath.Server.Configuration.OData.GetByKey(identifier={key})",
+        url=f"{base_url}/orchestrator_/odata/Jobs/UiPath.Server.Configuration.OData.GetByKey(identifier={trigger_data['item_key']})",
         json={
             "key": f"{job_key}",
             "id": 123,
@@ -167,18 +169,23 @@ async def test_agent_job_trigger(
             assert len(tables) == 1
 
             # Check the second job trigger data (from wait job)
-            cursor.execute("""SELECT type, key, name, folder_path, folder_key, payload FROM __uipath_resume_triggers
-                ORDER BY timestamp DESC
-            """)
+            cursor.execute("""SELECT runtime_id, interrupt_id, data FROM __uipath_resume_triggers
+                ORDER BY timestamp DESC
+            """)
             triggers = cursor.fetchall()
-            assert len(triggers) == 2
-            type, key, name, folder_key, folder_path, payload = triggers[0]
-            assert type == "Job"
-            assert name == "Job"
-            assert folder_path is None
-            assert folder_key is None
-            assert "123" in payload
-            assert key == "487d9dc7-30fe-4926-b5f0-35a956914042"
+            assert len(triggers) == 1
+            runtime_id, interrupt_id, data = triggers[0]
+
+            trigger_data = json.loads(data)
+            assert trigger_data["trigger_type"] == "Job"
+            assert trigger_data["trigger_name"] == "Job"
+            assert trigger_data["folder_path"] is None
+            assert trigger_data["folder_key"] is None
+            assert "123" in data
+            assert (
+                trigger_data["item_key"]
+                == "487d9dc7-30fe-4926-b5f0-35a956914042"
+            )
         finally:
             if conn:
                 conn.close()
@@ -191,7 +198,7 @@ async def test_agent_job_trigger(
 
     output_args_dict = {"output_arg_2": "response from wait job"}
     httpx_mock.add_response(
-        url=f"{base_url}/orchestrator_/odata/Jobs/UiPath.Server.Configuration.OData.GetByKey(identifier={key})",
+        url=f"{base_url}/orchestrator_/odata/Jobs/UiPath.Server.Configuration.OData.GetByKey(identifier={trigger_data['item_key']})",
         json={
             "key": f"{job_key}",
             "id": 123,
diff --git a/tests/hitl/test_action_trigger.py b/tests/hitl/test_hitl_task_trigger.py
similarity index 86%
rename from tests/hitl/test_action_trigger.py
rename to tests/hitl/test_hitl_task_trigger.py
index deaff824..a226b338 100644
--- a/tests/hitl/test_action_trigger.py
+++ b/tests/hitl/test_hitl_task_trigger.py
@@ -131,16 +131,18 @@ async def test_agent_action_trigger(
 
             # Check the first action trigger data
             cursor.execute(
-                "SELECT type, key, name, folder_path, folder_key, payload FROM __uipath_resume_triggers"
+                "SELECT runtime_id, interrupt_id, data FROM __uipath_resume_triggers"
             )
             triggers = cursor.fetchall()
             assert len(triggers) == 1
-            type, key, name, folder_path, folder_key, payload = triggers[0]
-            assert type == "Task"
-            assert folder_path == "app-folder-path"
-            assert folder_key is None
-            assert "agent question" in payload
-            assert "Action Required" in payload
+            runtime_id, interrupt_id, data = triggers[0]
+
+            trigger_data = json.loads(data)
+            assert trigger_data["trigger_type"] == "Task"
+            assert trigger_data["folder_path"] == "app-folder-path"
+            assert trigger_data["folder_key"] is None
+            assert "agent question" in data
+            assert "Action Required" in data
         finally:
             if conn:
                 conn.close()
@@ -151,7 +153,7 @@ async def test_agent_action_trigger(
 
     # Mock response for first resume: human response from create action
     httpx_mock.add_response(
-        url=f"{base_url}/orchestrator_/tasks/GenericTasks/GetTaskDataByKey?taskKey={key}",
+        url=f"{base_url}/orchestrator_/tasks/GenericTasks/GetTaskDataByKey?taskKey={trigger_data['item_key']}",
         json={
             "id": 1,
             "title": "Action Required: Report Review",
@@ -196,18 +198,23 @@ async def test_agent_action_trigger(
             assert len(tables) == 1
 
             # Check the second trigger data (from wait action)
-            cursor.execute("""SELECT type, key, name, folder_path, folder_key, payload FROM __uipath_resume_triggers
-                ORDER BY timestamp DESC
-            """)
+            cursor.execute("""SELECT runtime_id, interrupt_id, data FROM __uipath_resume_triggers
+                ORDER BY timestamp DESC
+            """)
             triggers = cursor.fetchall()
-            assert len(triggers) == 2
-            type, key, name, folder_key, folder_path, payload = triggers[0]
-            assert type == "Task"
-            assert name == "Task"
-            assert folder_path is None
-            assert folder_key is None
-            assert "agent question from wait action" in payload
-            assert key == "1662478a-65b4-4a09-8e22-d707e5bd64f3"
+            assert len(triggers) == 1
+            runtime_id, interrupt_id, data = triggers[0]
+
+            trigger_data = json.loads(data)
+            assert trigger_data["trigger_type"] == "Task"
+            assert trigger_data["trigger_name"] == "Task"
+            assert trigger_data["folder_path"] is None
+            assert trigger_data["folder_key"] is None
+            assert "agent question from wait action" in data
+            assert (
+                trigger_data["item_key"]
+                == "1662478a-65b4-4a09-8e22-d707e5bd64f3"
+            )
         finally:
             if conn:
                 conn.close()
@@ -218,7 +225,7 @@ async def test_agent_action_trigger(
 
     # Mock response for second resume: human response from wait action
     httpx_mock.add_response(
-        url=f"{base_url}/orchestrator_/tasks/GenericTasks/GetTaskDataByKey?taskKey={key}",
+        url=f"{base_url}/orchestrator_/tasks/GenericTasks/GetTaskDataByKey?taskKey={trigger_data['item_key']}",
         json={
             "id": 1,
             "title": "Action Required: Report Review",
diff --git a/tests/cli/test_graph.py b/tests/runtime/test_graph.py
similarity index 100%
rename from tests/cli/test_graph.py
rename to tests/runtime/test_graph.py
diff --git a/tests/runtime/test_resumable.py b/tests/runtime/test_resumable.py
new file mode 100644
index 00000000..845d1db9
--- /dev/null
+++ b/tests/runtime/test_resumable.py
@@ -0,0 +1,202 @@
+import os
+import tempfile
+from typing import Any, TypedDict
+
+import pytest
+from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
+from langgraph.graph import END, START, StateGraph
+from langgraph.types import interrupt
+from uipath.core.errors import ErrorCategory, UiPathPendingTriggerError
+from uipath.runtime import (
+    UiPathExecuteOptions,
+    UiPathResumableRuntime,
+    UiPathResumeTrigger,
+    UiPathResumeTriggerName,
+    UiPathResumeTriggerType,
+    UiPathRuntimeStatus,
+)
+
+from uipath_langchain.runtime import UiPathLangGraphRuntime
+from uipath_langchain.runtime.storage import SqliteResumableStorage
+
+
+class MockTriggerHandler:
+    """Mock implementation of UiPathResumeTriggerHandler."""
+
+    def __init__(self):
+        self.call_count = 0
+
+    async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger:
+        """Create a trigger from suspend value."""
+        trigger = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.API,
+            trigger_name=UiPathResumeTriggerName.API,
+            payload=suspend_value,
+        )
+        return trigger
+
+    async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any:
+        """Read trigger and return mock response.
+
+        1st call: success
+        2nd call: fail
+        3rd call: fail
+        4th call: success
+        5th call: fail
+        6th call: success
+        """
+        self.call_count += 1
+
+        # Succeed on calls 1, 4, and 6 (the schedule documented above)
+        if self.call_count in [1, 4, 6]:
+            assert trigger.payload is not None
+            branch_name = trigger.payload.get("message", "unknown")
+            return f"Response for {branch_name}"
+
+        # Fail otherwise
+        raise UiPathPendingTriggerError(
+            ErrorCategory.SYSTEM, f"Trigger is still pending (call #{self.call_count})"
+        )
+
+
+@pytest.mark.asyncio
+async def test_parallel_branches_with_multiple_interrupts_execution():
+    """Test graph execution with parallel branches and multiple interrupts."""
+
+    # Define state
+    class State(TypedDict, total=False):
+        branch_a_result: str | None
+        branch_b_result: str | None
+        branch_c_result: str | None
+
+    # Define nodes that interrupt
+    def branch_a(state: State) -> State:
+        result = interrupt({"message": "Branch A needs input"})
+        return {"branch_a_result": f"A completed with: {result}"}
+
+    def branch_b(state: State) -> State:
+        result = interrupt({"message": "Branch B needs input"})
+        return {"branch_b_result": f"B completed with: {result}"}
+
+    def branch_c(state: State) -> State:
+        result = interrupt({"message": "Branch C needs input"})
+        return {"branch_c_result": f"C completed with: {result}"}
+
+    # Build graph with parallel branches
+    graph = StateGraph(State)
+    graph.add_node("branch_a", branch_a)
+    graph.add_node("branch_b", branch_b)
+    graph.add_node("branch_c", branch_c)
+
+    # All branches start in parallel
+    graph.add_edge(START, "branch_a")
+    graph.add_edge(START, "branch_b")
+    graph.add_edge(START, "branch_c")
+
+    # All branches go to end
+    graph.add_edge("branch_a", END)
+    graph.add_edge("branch_b", END)
+    graph.add_edge("branch_c", END)
+
+    # Create temporary database
+    temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db")
+    temp_db.close()
+
+    try:
+        # Compile graph with checkpointer
+        async with AsyncSqliteSaver.from_conn_string(temp_db.name) as memory:
+            compiled_graph = graph.compile(checkpointer=memory)
+
+            # Create base runtime
+            base_runtime = UiPathLangGraphRuntime(
+                graph=compiled_graph,
+                runtime_id="parallel-test",
+                entrypoint="test",
+            )
+
+            # Create storage and trigger manager
+            storage = SqliteResumableStorage(memory)
+
+            # Wrap with UiPathResumableRuntime
+            runtime = UiPathResumableRuntime(
+                delegate=base_runtime,
+                storage=storage,
+                trigger_manager=MockTriggerHandler(),
+                runtime_id="parallel-test",
+            )
+
+            # First execution - should hit all 3 interrupts
+            result = await runtime.execute(
+                input={
+                    "branch_a_result": None,
+                    "branch_b_result": None,
+                    "branch_c_result": None,
+                },
+                options=UiPathExecuteOptions(resume=False),
+            )
+
+            # Should be suspended with 3 triggers
+            assert result.status == UiPathRuntimeStatus.SUSPENDED
+            assert result.triggers is not None
+            assert len(result.triggers) == 3
+
+            # Verify triggers were saved to storage
+            saved_triggers = await storage.get_triggers("parallel-test")
+            assert saved_triggers is not None
+            assert len(saved_triggers) == 3
+
+            # Resume 1: Resolve only first interrupt (no input, will restore from storage)
+            result_1 = await runtime.execute(
+                input=None,
+                options=UiPathExecuteOptions(resume=True),
+            )
+
+            # Should still be suspended with 2 remaining interrupts
+            assert result_1.status == UiPathRuntimeStatus.SUSPENDED
+            assert result_1.triggers is not None
+            assert len(result_1.triggers) == 2
+
+            # Verify only 2 triggers remain in storage
+            saved_triggers = await storage.get_triggers("parallel-test")
+            assert saved_triggers is not None
+            assert len(saved_triggers) == 2
+
+            # Resume 2: Resolve second interrupt
+            result_2 = await runtime.execute(
+                input=None,
+                options=UiPathExecuteOptions(resume=True),
+            )
+
+            # Should still be suspended with 1 remaining interrupt
+            assert result_2.status == UiPathRuntimeStatus.SUSPENDED
+            assert result_2.triggers is not None
+            assert len(result_2.triggers) == 1
+
+            # Verify only 1 trigger remains in storage
+            saved_triggers = await storage.get_triggers("parallel-test")
+            assert saved_triggers is not None
+            assert len(saved_triggers) == 1
+
+            # Resume 3: Resolve final interrupt
+            result_3 = await runtime.execute(
+                input=None,
+                options=UiPathExecuteOptions(resume=True),
+            )
+
+            # Should now be successful
+            assert result_3.status == UiPathRuntimeStatus.SUCCESSFUL
+            assert result_3.output is not None
+
+            # Verify no triggers remain
+            saved_triggers = await storage.get_triggers("parallel-test")
+            assert saved_triggers is None or len(saved_triggers) == 0
+
+            # Verify all branches completed
+            output = result_3.output
+            assert "branch_a_result" in output
+            assert "branch_b_result" in output
+            assert "branch_c_result" in output
+
+    finally:
+        if os.path.exists(temp_db.name):
+            os.remove(temp_db.name)
diff --git a/tests/cli/test_schema.py b/tests/runtime/test_schema.py
similarity index 100%
rename from tests/cli/test_schema.py
rename to tests/runtime/test_schema.py
diff --git a/tests/runtime/test_storage.py b/tests/runtime/test_storage.py
new file mode 100644
index 00000000..3fcb2f16
--- /dev/null
+++ b/tests/runtime/test_storage.py
@@ -0,0 +1,287 @@
+"""Tests for SqliteResumableStorage."""
+
+import asyncio
+import os
+import tempfile
+from typing import Any
+
+import pytest
+from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
+from uipath.runtime import (
+    UiPathResumeTrigger,
+    UiPathResumeTriggerName,
+    UiPathResumeTriggerType,
+)
+
+from uipath_langchain.runtime.storage import SqliteResumableStorage
+
+
+@pytest.fixture
+async def storage():
+    """Create a SqliteResumableStorage instance with temporary database file."""
+    temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db")
+    temp_db.close()
+
+    try:
+        async with AsyncSqliteSaver.from_conn_string(temp_db.name) as memory:
+            storage = SqliteResumableStorage(memory)
+            await storage._ensure_table()
+
+            yield storage
+    finally:
+        if os.path.exists(temp_db.name):
+            os.remove(temp_db.name)
+
+
+class TestKeyValueStorage:
+    """Tests for key-value storage functionality."""
+
+    @pytest.mark.asyncio
+    async def test_set_and_get_string_value(self, storage: SqliteResumableStorage):
+        """Test storing and retrieving a string value."""
+        await storage.set_value("runtime1", "namespace1", "key1", "test_value")
+        result = await storage.get_value("runtime1", "namespace1", "key1")
+        assert result == "test_value"
+
+    @pytest.mark.asyncio
+    async def test_set_and_get_dict_value(self, storage: SqliteResumableStorage):
+        """Test storing and retrieving a dictionary value."""
+        test_dict = {"foo": "bar", "nested": {"key": "value"}}
+        await storage.set_value("runtime1", "namespace1", "key1", test_dict)
+        result = await storage.get_value("runtime1", "namespace1", "key1")
+        assert result == test_dict
+
+    @pytest.mark.asyncio
+    async def test_set_value_overrides_existing(self, storage: SqliteResumableStorage):
+        """Test that set_value overrides existing values."""
+        await storage.set_value("runtime1", "namespace1", "key1", "first_value")
+        await storage.set_value("runtime1", "namespace1", "key1", "second_value")
+        result = await storage.get_value("runtime1", "namespace1", "key1")
+        assert result == "second_value"
+
+    @pytest.mark.asyncio
+    async def test_get_nonexistent_value_returns_none(
+        self, storage: SqliteResumableStorage
+    ):
+        """Test that getting a non-existent value returns None."""
+        result = await storage.get_value("runtime1", "namespace1", "nonexistent")
+        assert result is None
+
+    @pytest.mark.asyncio
+    async def test_values_scoped_by_runtime_and_namespace(
+        self, storage: SqliteResumableStorage
+    ):
+        """Test that values are properly scoped by runtime_id and namespace."""
+        await storage.set_value("runtime1", "namespace1", "key1", "value1")
+        await storage.set_value("runtime2", "namespace1", "key1", "value2")
+        await storage.set_value("runtime1", "namespace2", "key1", "value3")
+
+        assert await storage.get_value("runtime1", "namespace1", "key1") == "value1"
+        assert await storage.get_value("runtime2", "namespace1", "key1") == "value2"
+        assert await storage.get_value("runtime1", "namespace2", "key1") == "value3"
+
+
+class TestTriggerStorage:
+    """Tests for trigger storage functionality."""
+
+    @pytest.mark.asyncio
+    async def test_save_and_get_single_trigger(self, storage: SqliteResumableStorage):
+        """Test saving and retrieving a single trigger."""
+        trigger = UiPathResumeTrigger(
+            interrupt_id="interrupt1",
+            trigger_type=UiPathResumeTriggerType.API,
+            trigger_name=UiPathResumeTriggerName.API,
+            item_key="key1",
+            payload="test payload",
+        )
+
+        await storage.save_triggers("runtime1", [trigger])
+        triggers = await storage.get_triggers("runtime1")
+
+        assert triggers is not None
+        assert len(triggers) == 1
+        assert triggers[0].interrupt_id == "interrupt1"
+        assert triggers[0].trigger_type == UiPathResumeTriggerType.API
+        assert triggers[0].trigger_name == UiPathResumeTriggerName.API
+        assert triggers[0].item_key == "key1"
+        assert triggers[0].payload == "test payload"
+
+    @pytest.mark.asyncio
+    async def test_save_triggers_overrides_previous(
+        self, storage: SqliteResumableStorage
+    ):
+        """Test that save_triggers replaces all previous triggers."""
+        # Save first set of triggers
+        trigger1 = UiPathResumeTrigger(
+            interrupt_id="interrupt1",
+            trigger_type=UiPathResumeTriggerType.API,
+            trigger_name=UiPathResumeTriggerName.API,
+            payload="payload1",
+        )
+        trigger2 = UiPathResumeTrigger(
+            interrupt_id="interrupt2",
+            trigger_type=UiPathResumeTriggerType.TASK,
+            trigger_name=UiPathResumeTriggerName.TASK,
+            payload="payload2",
+        )
+        await storage.save_triggers("runtime1", [trigger1, trigger2])
+
+        # Verify first set
+        triggers = await storage.get_triggers("runtime1")
+        assert triggers is not None
+        assert len(triggers) == 2
+
+        # Save second set of triggers - should replace first set
+        trigger3 = UiPathResumeTrigger(
+            interrupt_id="interrupt3",
+            trigger_type=UiPathResumeTriggerType.JOB,
+            trigger_name=UiPathResumeTriggerName.JOB,
+            payload="payload3",
+        )
+        await storage.save_triggers("runtime1", [trigger3])
+
+        # Verify only second set exists
+        triggers = await storage.get_triggers("runtime1")
+        assert triggers is not None
+        assert len(triggers) == 1
+        assert triggers[0].interrupt_id == "interrupt3"
+        assert triggers[0].trigger_type == UiPathResumeTriggerType.JOB
+
+    @pytest.mark.asyncio
+    async def test_save_empty_list_deletes_triggers(
+        self, storage: SqliteResumableStorage
+    ):
+        """Test that saving an empty list clears all existing triggers."""
+        trigger = UiPathResumeTrigger(
+            interrupt_id="interrupt1",
+            trigger_type=UiPathResumeTriggerType.API,
+            trigger_name=UiPathResumeTriggerName.API,
+            payload="test",
+        )
+        await storage.save_triggers("runtime1", [trigger])
+
+        # Save empty list
+        await storage.save_triggers("runtime1", [])
+
+        # Verify trigger no longer exists
+        triggers = await storage.get_triggers("runtime1")
+        assert triggers is None
+
+    @pytest.mark.asyncio
+    async def test_delete_trigger(self, storage: SqliteResumableStorage):
+        """Test deleting a specific trigger."""
+        trigger1 = UiPathResumeTrigger(
+            interrupt_id="interrupt1",
+            trigger_type=UiPathResumeTriggerType.API,
+            trigger_name=UiPathResumeTriggerName.API,
+            payload="payload1",
+        )
+        trigger2 = UiPathResumeTrigger(
+            interrupt_id="interrupt2",
+            trigger_type=UiPathResumeTriggerType.TASK,
+            trigger_name=UiPathResumeTriggerName.TASK,
+            payload="payload2",
+        )
+        await storage.save_triggers("runtime1", [trigger1, trigger2])
+
+        # Delete first trigger
+        await storage.delete_trigger("runtime1", trigger1)
+
+        # Verify only second trigger remains
+        triggers = await storage.get_triggers("runtime1")
+        assert triggers is not None
+        assert len(triggers) == 1
+        assert triggers[0].interrupt_id == "interrupt2"
+
+    @pytest.mark.asyncio
+    async def test_get_nonexistent_triggers_returns_none(
+        self, storage: SqliteResumableStorage
+    ):
+        """Test that getting triggers for non-existent runtime returns None."""
+        triggers = await storage.get_triggers("nonexistent_runtime")
+        assert triggers is None
+
+    @pytest.mark.asyncio
+    async def test_triggers_scoped_by_runtime_id(self, storage: SqliteResumableStorage):
+        """Test that triggers are properly scoped by runtime_id."""
+        trigger1 = UiPathResumeTrigger(
+            interrupt_id="interrupt1",
+            trigger_type=UiPathResumeTriggerType.API,
+            trigger_name=UiPathResumeTriggerName.API,
+            payload="runtime1_payload",
+        )
+        trigger2 = UiPathResumeTrigger(
+            interrupt_id="interrupt2",
+            trigger_type=UiPathResumeTriggerType.TASK,
+            trigger_name=UiPathResumeTriggerName.TASK,
+            payload="runtime2_payload",
+        )
+
+        await storage.save_triggers("runtime1", [trigger1])
+        await storage.save_triggers("runtime2", [trigger2])
+
+        triggers1 = await storage.get_triggers("runtime1")
+        triggers2 = await storage.get_triggers("runtime2")
+
+        assert triggers1 is not None
+        assert len(triggers1) == 1
+        assert triggers1[0].payload == "runtime1_payload"
+
+        assert triggers2 is not None
+        assert len(triggers2) == 1
+        assert triggers2[0].payload == "runtime2_payload"
+
+    @pytest.mark.asyncio
+    async def test_multiple_parallel_workflows(self, storage: SqliteResumableStorage):
+        """Test handling multiple parallel workflows with concurrent operations."""
+        workflows: list[tuple[str, list[UiPathResumeTrigger], dict[str, Any]]] = []
+        triggers: list[UiPathResumeTrigger] | None
+        # Create multiple parallel workflows
+        for i in range(10):
+            runtime_id = f"workflow-{i}"
+            triggers = [
+                UiPathResumeTrigger(
+                    interrupt_id=f"interrupt-{i}",
+                    trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+                    trigger_name=UiPathResumeTriggerName.QUEUE_ITEM,
+                    item_key=f"queue-item-{i}",
+                    payload={"step": i % 3},
+                )
+            ]
+            context = {
+                "workflow_id": str(i),
+                "data": f"data-{i}",
+                "step": i % 3,
+            }
+            workflows.append((runtime_id, triggers, context))
+
+        # Save all workflows concurrently
+        save_tasks = []
+        for runtime_id, triggers, context in workflows:
+            save_tasks.append(storage.save_triggers(runtime_id, triggers))
+            save_tasks.append(
+                storage.set_value(
+                    runtime_id, "meta", "workflow_id", context["workflow_id"]
+                )
+            )
+            save_tasks.append(storage.set_value(runtime_id, "meta", "status", "active"))
+            save_tasks.append(
+                storage.set_value(runtime_id, "data", "payload", context["data"])
+            )
+
+        await asyncio.gather(*save_tasks)
+
+        # Verify all workflows were saved correctly
+        for runtime_id, expected_triggers, expected_context in workflows:
+            triggers = await storage.get_triggers(runtime_id)
+            workflow_id = await storage.get_value(runtime_id, "meta", "workflow_id")
+            status = await storage.get_value(runtime_id, "meta", "status")
+            data = await storage.get_value(runtime_id, "data", "payload")
+
+            assert triggers is not None
+            assert len(triggers) == 1
+            assert triggers[0].interrupt_id == expected_triggers[0].interrupt_id
+            assert triggers[0].item_key == expected_triggers[0].item_key
+            assert workflow_id == expected_context["workflow_id"]
+            assert status == "active"
+            assert data == expected_context["data"]
diff --git a/uv.lock b/uv.lock
index d68e3d19..0c5cf0bb 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1252,7 +1252,7 @@ wheels = [
 
 [[package]]
 name = "langchain-core"
-version = "1.1.3"
+version = "1.2.5"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
     { name = "jsonpatch" },
@@ -1264,9 +1264,9 @@ dependencies = [
     { name = "typing-extensions" },
     { name = "uuid-utils" },
 ]
-sdist = { url = "https://files.pythonhosted.org/packages/0b/6f/9e959821df556cf32d57d26aa7b86905a63a91e6e9e744994f8045fe2d70/langchain_core-1.1.3.tar.gz", hash = "sha256:ff0bc5e6e701c4d6fe00c73c4feae5c993a7a8e0b91f0a1d07015277d4634275", size = 801750, upload-time = "2025-12-09T14:55:55.741Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/c8/86/bd678d69341ae4178bc8dfa04024d63636e5d580ff03d4502c8bc2262917/langchain_core-1.2.5.tar.gz", hash = "sha256:d674f6df42f07e846859b9d3afe547cad333d6bf9763e92c88eb4f8aaedcd3cc", size = 820445, upload-time = "2025-12-22T23:45:32.041Z" }
 wheels = [
-    { url = "https://files.pythonhosted.org/packages/58/41/6db768d4b208a33b4f09d5415e617d489f68167bb5dd27f87c7a49d13caf/langchain_core-1.1.3-py3-none-any.whl", hash = "sha256:e06efbf55bf7c7e4fcffc2f5b0a39a855176df16b02077add063534d7dabb740", size = 475307, upload-time = "2025-12-09T14:55:54.516Z" },
+    { url = "https://files.pythonhosted.org/packages/83/bd/9df897cbc98290bf71140104ee5b9777cf5291afb80333aa7da5a497339b/langchain_core-1.2.5-py3-none-any.whl", hash = "sha256:3255944ef4e21b2551facb319bfc426057a40247c0a05de5bd6f2fc021fbfa34", size = 484851, upload-time = "2025-12-22T23:45:30.525Z" },
 ]
 
 [[package]]
@@ -3219,7 +3219,7 @@ wheels = [
 
 [[package]]
 name = "uipath"
-version = "2.2.44"
+version = "2.3.0"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
     { name = "click" },
@@ -3239,9 +3239,9 @@ dependencies = [
     { name = "uipath-core" },
     { name = "uipath-runtime" },
 ]
-sdist = { url = "https://files.pythonhosted.org/packages/1e/79/2b261df31c2265c724b94a389586250075be619c40a47982fe0683de83eb/uipath-2.2.44.tar.gz", hash = "sha256:170accf02e5d8f5c96e2a501d1a8179810d9d37296b67675d39be080de46b252", size = 3431301, upload-time = "2025-12-23T13:38:07.597Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/13/b6/10e30406786dce197d1f7235811ff19440793c835097376606c6500b3242/uipath-2.3.0.tar.gz", hash = "sha256:f46f034d5c29dd86240324118e2ebe51c48bd6bf393081e2ed3c6015963a7d2b", size = 3431649, upload-time = "2025-12-26T10:22:00.273Z" }
 wheels = [
-    { url = "https://files.pythonhosted.org/packages/85/6a/fbcd2389d0db64c0f998a73347d7ae0da7ea96525ec75f7ad31f7e8108a4/uipath-2.2.44-py3-none-any.whl", hash = "sha256:145cc0c84ccd44bac5f82ff330799556a59cdcc8854be8b2ca75497510770d98", size = 398294, upload-time = "2025-12-23T13:38:05.615Z" },
+    { url = "https://files.pythonhosted.org/packages/f6/46/b77126a490a47251ac3deff0a88cbdf8c5ac1c69ebf2c71649e25b0999c1/uipath-2.3.0-py3-none-any.whl", hash = "sha256:3df06dca5ffc6304a3a5c61d128ea3c07e39267d2dd515b68c39f145b209aa26", size = 398558, upload-time = "2025-12-26T10:21:58.154Z" },
 ]
 
 [[package]]
@@ -3260,7 +3260,7 @@ wheels = [
 
 [[package]]
 name = "uipath-langchain"
-version = "0.1.44"
+version = "0.2.0"
 source = { editable = "." }
 dependencies = [
     { name = "aiosqlite" },
@@ -3278,6 +3278,7 @@ dependencies = [
     { name = "pydantic-settings" },
     { name = "python-dotenv" },
     { name = "uipath" },
+    { name = "uipath-runtime" },
 ]
 
 [package.optional-dependencies]
@@ -3313,7 +3314,7 @@ requires-dist = [
     { name = "jsonschema-pydantic-converter", specifier = ">=0.1.6" },
     { name = "langchain", specifier = ">=1.0.0,<2.0.0" },
     { name = "langchain-aws", marker = "extra == 'bedrock'", specifier = ">=0.2.35" },
-    { name = "langchain-core", specifier = ">=1.0.0,<2.0.0" },
+    { name = "langchain-core", specifier = ">=1.2.5,<2.0.0" },
     { name = "langchain-google-genai", marker = "extra == 'vertex'", specifier = ">=2.0.0" },
     { name = "langchain-mcp-adapters", specifier = "==0.2.1" },
     { name = "langchain-openai", specifier = ">=1.0.0,<2.0.0" },
@@ -3323,7 +3324,8 @@ requires-dist = [
     { name = "openinference-instrumentation-langchain", specifier = ">=0.1.56" },
     { name = "pydantic-settings", specifier = ">=2.6.0" },
     { name = "python-dotenv", specifier = ">=1.0.1" },
-    { name = "uipath", specifier = ">=2.2.44,<2.3.0" },
+    { name = "uipath", specifier = ">=2.3.0,<2.4.0" },
+    { name = "uipath-runtime", specifier = ">=0.3.2,<0.4.0" },
 ]
 provides-extras = ["vertex", "bedrock"]
 
@@ -3342,14 +3344,14 @@ dev = [
 
 [[package]]
 name = "uipath-runtime"
-version = "0.2.7"
+version = "0.3.2"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
     { name = "uipath-core" },
 ]
-sdist = { url = "https://files.pythonhosted.org/packages/d8/90/03a8f57d64c4b25ebf2e364f2012bf97ef625a7cef9a6e2f68dfdce29188/uipath_runtime-0.2.7.tar.gz", hash = "sha256:2718a98db995a70b92f5eaa84fb315e8edd324ec406be2face978ffeaa062223", size = 95944, upload-time = "2025-12-11T11:29:25.94Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/4c/c1/3ebede98a90e5fc761bacbca65960238062a515efcdac78d7e862d5a3e07/uipath_runtime-0.3.2.tar.gz", hash = "sha256:16c07626c656f0db70dd0c4d3b9b8c58b9eda38af4d29309e5bfd11533b77b96", size = 99557, upload-time = "2025-12-26T13:37:31.061Z" }
 wheels = [
-    { url = "https://files.pythonhosted.org/packages/d3/1e/b884a6b80c3985391c2c4155498bf577839c6a85f038fcc8d019d95ed94e/uipath_runtime-0.2.7-py3-none-any.whl", hash = "sha256:c2e0176a0aebcd70d9ba8c323f14b587841fc15872176cebcd31ff61bd9e9e0d", size = 36954, upload-time = "2025-12-11T11:29:21.781Z" },
+    { url = "https://files.pythonhosted.org/packages/ea/ab/92b79fc1e2422a123ae80cced3c5f46ef9c21674498fad60fd4b284dc2d7/uipath_runtime-0.3.2-py3-none-any.whl", hash = "sha256:e30d250e80789a5f4ca62322d6d8574d0a4a4fd1610ed754a07296b0d96b7452", size = 38319, upload-time = "2025-12-26T13:37:29.432Z" },
 ]
 
 [[package]]