diff --git a/pyproject.toml b/pyproject.toml index ded68e5..7c6a8f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-runtime" -version = "0.2.9" +version = "0.3.0" description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/src/uipath/runtime/__init__.py b/src/uipath/runtime/__init__.py index 4778bf2..895f9a8 100644 --- a/src/uipath/runtime/__init__.py +++ b/src/uipath/runtime/__init__.py @@ -11,7 +11,7 @@ from uipath.runtime.chat.runtime import UiPathChatRuntime from uipath.runtime.context import UiPathRuntimeContext from uipath.runtime.debug.breakpoint import UiPathBreakpointResult -from uipath.runtime.debug.bridge import UiPathDebugBridgeProtocol +from uipath.runtime.debug.bridge import UiPathDebugProtocol from uipath.runtime.debug.exception import UiPathDebugQuitError from uipath.runtime.debug.runtime import ( UiPathDebugRuntime, @@ -63,7 +63,7 @@ "UiPathResumeTriggerType", "UiPathResumableRuntime", "UiPathDebugQuitError", - "UiPathDebugBridgeProtocol", + "UiPathDebugProtocol", "UiPathDebugRuntime", "UiPathBreakpointResult", "UiPathStreamNotSupportedError", diff --git a/src/uipath/runtime/chat/protocol.py b/src/uipath/runtime/chat/protocol.py index fcdb60c..cab41e4 100644 --- a/src/uipath/runtime/chat/protocol.py +++ b/src/uipath/runtime/chat/protocol.py @@ -1,8 +1,12 @@ """Abstract conversation bridge interface.""" -from typing import Protocol +from typing import Any, Protocol -from uipath.core.chat import UiPathConversationMessageEvent +from uipath.core.chat import ( + UiPathConversationMessageEvent, +) + +from uipath.runtime.result import UiPathRuntimeResult class UiPathChatProtocol(Protocol): @@ -28,3 +32,22 @@ async def emit_message_event( message_event: UiPathConversationMessageEvent to wrap and send """ ... + + async def emit_interrupt_event( + self, + interrupt_event: UiPathRuntimeResult, + ) -> None: + """Wrap and send an interrupt event. + + Args: + interrupt_event: UiPathConversationInterruptEvent to wrap and send + """ + ... + + async def emit_exchange_end_event(self) -> None: + """Send an exchange end event.""" + ... + + async def wait_for_resume(self) -> dict[str, Any]: + """Wait for the interrupt_end event to be received.""" + ... diff --git a/src/uipath/runtime/chat/runtime.py b/src/uipath/runtime/chat/runtime.py index 0436555..2e88c96 100644 --- a/src/uipath/runtime/chat/runtime.py +++ b/src/uipath/runtime/chat/runtime.py @@ -17,6 +17,7 @@ UiPathRuntimeResult, UiPathRuntimeStatus, ) +from uipath.runtime.resumable.trigger import UiPathResumeTriggerType from uipath.runtime.schema import UiPathRuntimeSchema logger = logging.getLogger(__name__) @@ -65,12 +66,44 @@ async def stream( """Stream execution events with chat support.""" await self.chat_bridge.connect() - async for event in self.delegate.stream(input, options=options): - if isinstance(event, UiPathRuntimeMessageEvent): - if event.payload: - await self.chat_bridge.emit_message_event(event.payload) + execution_completed = False + current_input = input + current_options = UiPathStreamOptions( + resume=options.resume if options else False, + breakpoints=options.breakpoints if options else None, + ) + + while not execution_completed: + async for event in self.delegate.stream( + current_input, options=current_options + ): + if isinstance(event, UiPathRuntimeMessageEvent): + if event.payload: + await self.chat_bridge.emit_message_event(event.payload) + + if isinstance(event, UiPathRuntimeResult): + runtime_result = event + + if ( + runtime_result.status == UiPathRuntimeStatus.SUSPENDED + and runtime_result.trigger + and runtime_result.trigger.trigger_type + == UiPathResumeTriggerType.API + ): + await self.chat_bridge.emit_interrupt_event(runtime_result) + resume_data = await self.chat_bridge.wait_for_resume() + + # Continue with resumed execution + current_input = resume_data + current_options.resume = True + break + else: + yield event + execution_completed = True + else: + yield event - yield event + await self.chat_bridge.emit_exchange_end_event() async def get_schema(self) -> UiPathRuntimeSchema: """Get schema from the delegate runtime.""" diff --git a/src/uipath/runtime/debug/__init__.py b/src/uipath/runtime/debug/__init__.py index 899e0ad..0a37956 100644 --- a/src/uipath/runtime/debug/__init__.py +++ b/src/uipath/runtime/debug/__init__.py @@ -1,7 +1,7 @@ """Initialization module for the debug package.""" from uipath.runtime.debug.breakpoint import UiPathBreakpointResult -from uipath.runtime.debug.bridge import UiPathDebugBridgeProtocol +from uipath.runtime.debug.bridge import UiPathDebugProtocol from uipath.runtime.debug.exception import ( UiPathDebugQuitError, ) @@ -9,7 +9,7 @@ __all__ = [ "UiPathDebugQuitError", - "UiPathDebugBridgeProtocol", + "UiPathDebugProtocol", "UiPathDebugRuntime", "UiPathBreakpointResult", ] diff --git a/src/uipath/runtime/debug/bridge.py b/src/uipath/runtime/debug/bridge.py index dc4a593..0c70556 100644 --- a/src/uipath/runtime/debug/bridge.py +++ b/src/uipath/runtime/debug/bridge.py @@ -9,7 +9,7 @@ ) -class UiPathDebugBridgeProtocol(Protocol): +class UiPathDebugProtocol(Protocol): """Abstract interface for debug communication. Implementations: SignalR, Console, WebSocket, etc. diff --git a/src/uipath/runtime/debug/runtime.py b/src/uipath/runtime/debug/runtime.py index 2eed0aa..b621c6d 100644 --- a/src/uipath/runtime/debug/runtime.py +++ b/src/uipath/runtime/debug/runtime.py @@ -14,7 +14,7 @@ ) from uipath.runtime.debug import ( UiPathBreakpointResult, - UiPathDebugBridgeProtocol, + UiPathDebugProtocol, UiPathDebugQuitError, ) from uipath.runtime.events import ( @@ -42,7 +42,7 @@ class UiPathDebugRuntime: def __init__( self, delegate: UiPathRuntimeProtocol, - debug_bridge: UiPathDebugBridgeProtocol, + debug_bridge: UiPathDebugProtocol, trigger_poll_interval: float = 5.0, ): """Initialize the UiPathDebugRuntime. @@ -54,7 +54,7 @@ def __init__( """ super().__init__() self.delegate = delegate - self.debug_bridge: UiPathDebugBridgeProtocol = debug_bridge + self.debug_bridge: UiPathDebugProtocol = debug_bridge if trigger_poll_interval < 0: raise ValueError("trigger_poll_interval must be >= 0") self.trigger_poll_interval = trigger_poll_interval diff --git a/src/uipath/runtime/result.py b/src/uipath/runtime/result.py index 9264e1a..57159c8 100644 --- a/src/uipath/runtime/result.py +++ b/src/uipath/runtime/result.py @@ -24,6 +24,7 @@ class UiPathRuntimeResult(UiPathRuntimeEvent): output: dict[str, Any] | BaseModel | str | None = None status: UiPathRuntimeStatus = UiPathRuntimeStatus.SUCCESSFUL trigger: UiPathResumeTrigger | None = None + triggers: list[UiPathResumeTrigger] | None = None error: UiPathErrorContract | None = None event_type: UiPathRuntimeEventType = Field( @@ -42,7 +43,7 @@ def to_dict(self) -> dict[str, Any]: else: output_data = self.output - result = { + result: dict[str, Any] = { "output": output_data, "status": self.status, } @@ -50,6 +51,12 @@ def to_dict(self) -> dict[str, Any]: if self.trigger: result["resume"] = self.trigger.model_dump(by_alias=True) + if self.triggers: + result["resumeTriggers"] = [ + resume_trigger.model_dump(by_alias=True) + for resume_trigger in self.triggers + ] + if self.error: result["error"] = self.error.model_dump() diff --git a/src/uipath/runtime/resumable/protocols.py b/src/uipath/runtime/resumable/protocols.py index b4c69b6..4b1ecbb 100644 --- a/src/uipath/runtime/resumable/protocols.py +++ b/src/uipath/runtime/resumable/protocols.py @@ -8,7 +8,7 @@ class UiPathResumableStorageProtocol(Protocol): """Protocol for storing and retrieving resume triggers.""" - async def save_trigger(self, trigger: UiPathResumeTrigger) -> None: + async def save_trigger(self, runtime_id: str, trigger: UiPathResumeTrigger) -> None: """Save a resume trigger to storage. Args: @@ -19,7 +19,7 @@ async def save_trigger(self, trigger: UiPathResumeTrigger) -> None: """ ... - async def get_latest_trigger(self) -> UiPathResumeTrigger | None: + async def get_latest_trigger(self, runtime_id: str) -> UiPathResumeTrigger | None: """Retrieve the most recent resume trigger from storage. Returns: @@ -30,6 +30,38 @@ async def get_latest_trigger(self) -> UiPathResumeTrigger | None: """ ... + async def set_value( + self, runtime_id: str, namespace: str, key: str, value: Any + ) -> None: + """Store values for a specific runtime. + + Args: + runtime_id: The runtime ID + namespace: The namespace of the persisted value + key: The key associated with the persisted value + value: The value to persist + + Raises: + Exception: If storage operation fails + """ + ... + + async def get_value(self, runtime_id: str, namespace: str, key: str) -> Any: + """Retrieve values for a specific runtime from storage. + + Args: + runtime_id: The runtime ID + namespace: The namespace of the persisted value + key: The key associated with the persisted value + + Returns: + The value matching the method's parameters, or None if it does not exist + + Raises: + Exception: If retrieval operation fails + """ + ... + class UiPathResumeTriggerCreatorProtocol(Protocol): """Protocol for creating resume triggers from suspend values.""" diff --git a/src/uipath/runtime/resumable/runtime.py b/src/uipath/runtime/resumable/runtime.py index b970ec7..44bb9d4 100644 --- a/src/uipath/runtime/resumable/runtime.py +++ b/src/uipath/runtime/resumable/runtime.py @@ -8,7 +8,7 @@ UiPathRuntimeProtocol, UiPathStreamOptions, ) -from uipath.runtime.debug import UiPathBreakpointResult +from uipath.runtime.debug.breakpoint import UiPathBreakpointResult from uipath.runtime.events import UiPathRuntimeEvent from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus from uipath.runtime.resumable.protocols import ( @@ -35,6 +35,7 @@ def __init__( delegate: UiPathRuntimeProtocol, storage: UiPathResumableStorageProtocol, trigger_manager: UiPathResumeTriggerProtocol, + runtime_id: str, ): """Initialize the resumable runtime wrapper. @@ -42,10 +43,12 @@ def __init__( delegate: The underlying runtime to wrap storage: Storage for persisting/retrieving resume triggers trigger_manager: Manager for creating and reading resume triggers + runtime_id: Id used for runtime orchestration """ self.delegate = delegate self.storage = storage self.trigger_manager = trigger_manager + self.runtime_id = runtime_id async def execute( self, @@ -115,7 +118,7 @@ async def _restore_resume_input( return input # Otherwise, fetch from storage - trigger = await self.storage.get_latest_trigger() + trigger = await self.storage.get_latest_trigger(self.runtime_id) if not trigger: return None @@ -141,7 +144,7 @@ async def _handle_suspension( # Check if trigger already exists in result if result.trigger: - await self.storage.save_trigger(result.trigger) + await self.storage.save_trigger(self.runtime_id, result.trigger) return result suspended_result = UiPathRuntimeResult( @@ -154,7 +157,7 @@ async def _handle_suspension( result.output ) - await self.storage.save_trigger(suspended_result.trigger) + await self.storage.save_trigger(self.runtime_id, suspended_result.trigger) return suspended_result diff --git a/tests/test_chat_runtime.py b/tests/test_chat_runtime.py index 76cd290..2101c6a 100644 --- a/tests/test_chat_runtime.py +++ b/tests/test_chat_runtime.py @@ -13,6 +13,8 @@ from uipath.runtime import ( UiPathExecuteOptions, + UiPathResumeTrigger, + UiPathResumeTriggerType, UiPathRuntimeResult, UiPathRuntimeStatus, UiPathStreamOptions, @@ -32,6 +34,8 @@ def make_chat_bridge_mock() -> UiPathChatProtocol: bridge_mock.connect = AsyncMock() bridge_mock.disconnect = AsyncMock() bridge_mock.emit_message_event = AsyncMock() + bridge_mock.emit_interrupt_event = AsyncMock() + bridge_mock.wait_for_resume = AsyncMock() return cast(UiPathChatProtocol, bridge_mock) @@ -94,6 +98,79 @@ async def get_schema(self) -> UiPathRuntimeSchema: raise NotImplementedError() +class SuspendingMockRuntime: + """Mock runtime that can suspend with API triggers.""" + + def __init__( + self, + suspend_at_message: int | None = None, + ) -> None: + self.suspend_at_message = suspend_at_message + + async def dispose(self) -> None: + pass + + async def execute( + self, + input: dict[str, Any] | None = None, + options: UiPathExecuteOptions | None = None, + ) -> UiPathRuntimeResult: + """Fallback execute path.""" + return UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUCCESSFUL, + output={"mode": "execute"}, + ) + + async def stream( + self, + input: dict[str, Any] | None = None, + options: UiPathStreamOptions | None = None, + ) -> AsyncGenerator[UiPathRuntimeEvent, None]: + """Stream events with potential API trigger suspension.""" + is_resume = options and options.resume + + if not is_resume: + # Initial execution - yield message and then suspend + message_event = UiPathConversationMessageEvent( + message_id="msg-0", + start=UiPathConversationMessageStartEvent( + role="assistant", + timestamp="2025-01-01T00:00:00.000Z", + ), + ) + yield UiPathRuntimeMessageEvent(payload=message_event) + + if self.suspend_at_message is not None: + # Suspend with API trigger + yield UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUSPENDED, + trigger=UiPathResumeTrigger( + trigger_type=UiPathResumeTriggerType.API, + payload={"action": "confirm_tool_call"}, + ), + ) + return + else: + # Resumed execution - yield another message and complete + message_event = UiPathConversationMessageEvent( + message_id="msg-1", + start=UiPathConversationMessageStartEvent( + role="assistant", + timestamp="2025-01-01T00:00:01.000Z", + ), + ) + yield UiPathRuntimeMessageEvent(payload=message_event) + + # Final successful result + yield UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUCCESSFUL, + output={"resumed": is_resume, "input": input}, + ) + + async def get_schema(self) -> UiPathRuntimeSchema: + raise NotImplementedError() + + @pytest.mark.asyncio async def test_chat_runtime_streams_and_emits_messages(): """UiPathChatRuntime should stream events and emit message events to bridge.""" @@ -221,3 +298,73 @@ async def test_chat_runtime_dispose_suppresses_disconnect_errors(): await chat_runtime.dispose() cast(AsyncMock, bridge.disconnect).assert_awaited_once() + + +@pytest.mark.asyncio +async def test_chat_runtime_handles_api_trigger_suspension(): + """UiPathChatRuntime should intercept suspensions and resume execution.""" + + runtime_impl = SuspendingMockRuntime(suspend_at_message=0) + bridge = make_chat_bridge_mock() + + cast(AsyncMock, bridge.wait_for_resume).return_value = {"approved": True} + + chat_runtime = UiPathChatRuntime( + delegate=runtime_impl, + chat_bridge=bridge, + ) + + result = await chat_runtime.execute({}) + + await chat_runtime.dispose() + + # Result should be SUCCESSFUL + assert isinstance(result, UiPathRuntimeResult) + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + assert result.output == {"resumed": True, "input": {"approved": True}} + + cast(AsyncMock, bridge.connect).assert_awaited_once() + cast(AsyncMock, bridge.disconnect).assert_awaited_once() + + cast(AsyncMock, bridge.emit_interrupt_event).assert_awaited_once() + cast(AsyncMock, bridge.wait_for_resume).assert_awaited_once() + + # Message events emitted (one before suspend, one after resume) + assert cast(AsyncMock, bridge.emit_message_event).await_count == 2 + + +@pytest.mark.asyncio +async def test_chat_runtime_yields_events_during_suspension_flow(): + """UiPathChatRuntime.stream() should not yield SUSPENDED result, only final result.""" + + runtime_impl = SuspendingMockRuntime(suspend_at_message=0) + bridge = make_chat_bridge_mock() + + # wait_for_resume returns approval data + cast(AsyncMock, bridge.wait_for_resume).return_value = {"approved": True} + + chat_runtime = UiPathChatRuntime( + delegate=runtime_impl, + chat_bridge=bridge, + ) + + events = [] + async for event in chat_runtime.stream({}): + events.append(event) + + await chat_runtime.dispose() + + # Should have 2 message events + 1 final SUCCESSFUL result + # SUSPENDED result should NOT be yielded + assert len(events) == 3 + assert isinstance(events[0], UiPathRuntimeMessageEvent) + assert events[0].payload.message_id == "msg-0" + assert isinstance(events[1], UiPathRuntimeMessageEvent) + assert events[1].payload.message_id == "msg-1" + assert isinstance(events[2], UiPathRuntimeResult) + assert events[2].status == UiPathRuntimeStatus.SUCCESSFUL + + # Verify no SUSPENDED result was yielded + for event in events: + if isinstance(event, UiPathRuntimeResult): + assert event.status != UiPathRuntimeStatus.SUSPENDED diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 89a3133..739ccbd 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -17,7 +17,7 @@ UiPathStreamOptions, ) from uipath.runtime.debug import ( - UiPathDebugBridgeProtocol, + UiPathDebugProtocol, UiPathDebugQuitError, UiPathDebugRuntime, ) @@ -25,13 +25,13 @@ from uipath.runtime.schema import UiPathRuntimeSchema -def make_debug_bridge_mock() -> UiPathDebugBridgeProtocol: +def make_debug_bridge_mock() -> UiPathDebugProtocol: """Create a debug bridge mock with all methods that UiPathDebugRuntime uses. We use `spec=UiPathDebugBridge` so invalid attributes raise at runtime, but still operate as a unittest.mock.Mock with AsyncMock methods. """ - bridge_mock: Mock = Mock(spec=UiPathDebugBridgeProtocol) + bridge_mock: Mock = Mock(spec=UiPathDebugProtocol) bridge_mock.connect = AsyncMock() bridge_mock.disconnect = AsyncMock() @@ -44,7 +44,7 @@ def make_debug_bridge_mock() -> UiPathDebugBridgeProtocol: bridge_mock.get_breakpoints = Mock(return_value=["node-1"]) - return cast(UiPathDebugBridgeProtocol, bridge_mock) + return cast(UiPathDebugProtocol, bridge_mock) class StreamingMockRuntime: diff --git a/uv.lock b/uv.lock index db55239..55f2355 100644 --- a/uv.lock +++ b/uv.lock @@ -1005,7 +1005,7 @@ wheels = [ [[package]] name = "uipath-runtime" -version = "0.2.9" +version = "0.3.0" source = { editable = "." } dependencies = [ { name = "uipath-core" },