From adb563e6190698663d2ba17edf3db1ccdb061a5a Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Sun, 28 Sep 2025 05:37:25 -0400 Subject: [PATCH 01/11] nexus_sync_operations --- hello_nexus/README.md | 2 +- hello_nexus/service.py | 3 +- message_passing/introduction/starter.py | 8 +- message_passing/introduction/workflows.py | 10 +++ nexus_sync_operations/README.md | 37 +++++++++ nexus_sync_operations/__init__.py | 0 nexus_sync_operations/caller/__init__.py | 0 nexus_sync_operations/caller/app.py | 39 +++++++++ nexus_sync_operations/caller/workflows.py | 56 +++++++++++++ nexus_sync_operations/endpoint_description.md | 6 ++ nexus_sync_operations/handler/__init__.py | 0 .../handler/service_handler.py | 82 +++++++++++++++++++ nexus_sync_operations/handler/worker.py | 45 ++++++++++ nexus_sync_operations/service.py | 20 +++++ .../nexus_sync_operations_test.py | 57 +++++++++++++ 15 files changed, 359 insertions(+), 6 deletions(-) create mode 100644 nexus_sync_operations/README.md create mode 100644 nexus_sync_operations/__init__.py create mode 100644 nexus_sync_operations/caller/__init__.py create mode 100644 nexus_sync_operations/caller/app.py create mode 100644 nexus_sync_operations/caller/workflows.py create mode 100644 nexus_sync_operations/endpoint_description.md create mode 100644 nexus_sync_operations/handler/__init__.py create mode 100644 nexus_sync_operations/handler/service_handler.py create mode 100644 nexus_sync_operations/handler/worker.py create mode 100644 nexus_sync_operations/service.py create mode 100644 tests/nexus_sync_operations/nexus_sync_operations_test.py diff --git a/hello_nexus/README.md b/hello_nexus/README.md index bf26ce47..e8067ec3 100644 --- a/hello_nexus/README.md +++ b/hello_nexus/README.md @@ -12,7 +12,7 @@ call the operations from a workflow. Start a Temporal server. (See the main samples repo [README](../README.md)). -Run the following: +Run the following to create the caller and handler namespaces, and the Nexus endpoint: ``` temporal operator namespace create --namespace hello-nexus-basic-handler-namespace diff --git a/hello_nexus/service.py b/hello_nexus/service.py index 6528775d..352375ca 100644 --- a/hello_nexus/service.py +++ b/hello_nexus/service.py @@ -9,7 +9,8 @@ type-safe clients, and it is used by Nexus handlers to validate that they implement correctly-named operation handlers with the correct input and output types. -The service defined in this file features two operations: echo and hello. +The service defined in this file exposes two operations: my_sync_operation and +my_workflow_run_operation. """ from dataclasses import dataclass diff --git a/message_passing/introduction/starter.py b/message_passing/introduction/starter.py index 13a48c3a..555342f9 100644 --- a/message_passing/introduction/starter.py +++ b/message_passing/introduction/starter.py @@ -30,8 +30,8 @@ async def main(client: Optional[Client] = None): previous_language = await wf_handle.execute_update( GreetingWorkflow.set_language, Language.CHINESE ) - current_language = await wf_handle.query(GreetingWorkflow.get_language) - print(f"language changed: {previous_language.name} -> {current_language.name}") + assert await wf_handle.query(GreetingWorkflow.get_language) == Language.CHINESE + print(f"language changed: {previous_language.name} -> {Language.CHINESE.name}") # 👉 Start an Update and then wait for it to complete update_handle = await wf_handle.start_update( @@ -40,8 +40,8 @@ async def main(client: Optional[Client] = None): wait_for_stage=WorkflowUpdateStage.ACCEPTED, ) previous_language = await update_handle.result() - current_language = await wf_handle.query(GreetingWorkflow.get_language) - print(f"language changed: {previous_language.name} -> {current_language.name}") + assert await wf_handle.query(GreetingWorkflow.get_language) == Language.ARABIC + print(f"language changed: {previous_language.name} -> {Language.ARABIC.name}") # 👉 Send a Signal await wf_handle.signal(GreetingWorkflow.approve, ApproveInput(name="")) diff --git a/message_passing/introduction/workflows.py b/message_passing/introduction/workflows.py index 97d2b874..12452b36 100644 --- a/message_passing/introduction/workflows.py +++ b/message_passing/introduction/workflows.py @@ -16,6 +16,16 @@ class GetLanguagesInput: include_unsupported: bool +@dataclass +class SetLanguageInput: + language: Language + + +@dataclass +class SetLanguageUsingActivityInput: + language: Language + + @dataclass class ApproveInput: name: str diff --git a/nexus_sync_operations/README.md b/nexus_sync_operations/README.md new file mode 100644 index 00000000..557745fc --- /dev/null +++ b/nexus_sync_operations/README.md @@ -0,0 +1,37 @@ +This sample shows how to create a Nexus service that is backed by a long-running workflow and +exposes operations that use signals, queries, and updates against that workflow. + +### Sample directory structure + +- [service.py](./service.py) - shared Nexus service definition +- [caller](./caller) - a caller workflow that executes Nexus operations, together with a worker and starter code +- [handler](./handler) - Nexus operation handlers, together with a workflow used by one of the Nexus operations, and a worker that polls for both workflow, activity, and Nexus tasks. + + +### Instructions + +Start a Temporal server. (See the main samples repo [README](../README.md)). + +Run the following to create the caller and handler namespaces, and the Nexus endpoint: + +``` +temporal operator namespace create --namespace nexus-sync-operations-handler-namespace +temporal operator namespace create --namespace nexus-sync-operations-caller-namespace + +temporal operator nexus endpoint create \ + --name nexus-sync-operations-nexus-endpoint \ + --target-namespace nexus-sync-operations-handler-namespace \ + --target-task-queue nexus-sync-operations-handler-task-queue \ + --description-file nexus_sync_operations/endpoint_description.md +``` + +In one terminal, run the Temporal worker in the handler namespace: +``` +uv run nexus_sync_operations/handler/worker.py +``` + +In another terminal, run the Temporal worker in the caller namespace and start the caller +workflow: +``` +uv run nexus_sync_operations/caller/app.py +``` diff --git a/nexus_sync_operations/__init__.py b/nexus_sync_operations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_sync_operations/caller/__init__.py b/nexus_sync_operations/caller/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_sync_operations/caller/app.py b/nexus_sync_operations/caller/app.py new file mode 100644 index 00000000..d79db968 --- /dev/null +++ b/nexus_sync_operations/caller/app.py @@ -0,0 +1,39 @@ +import asyncio +import uuid +from typing import Optional + +from temporalio.client import Client +from temporalio.worker import Worker + +from nexus_sync_operations.caller.workflows import CallerWorkflow + +NAMESPACE = "nexus-sync-operations-caller-namespace" +TASK_QUEUE = "nexus-sync-operations-caller-task-queue" + + +async def execute_caller_workflow( + client: Optional[Client] = None, +) -> None: + client = client or await Client.connect( + "localhost:7233", + namespace=NAMESPACE, + ) + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[CallerWorkflow], + ): + await client.execute_workflow( + CallerWorkflow.run, + id=str(uuid.uuid4()), + task_queue=TASK_QUEUE, + ) + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(execute_caller_workflow()) + except KeyboardInterrupt: + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/nexus_sync_operations/caller/workflows.py b/nexus_sync_operations/caller/workflows.py new file mode 100644 index 00000000..dba58a57 --- /dev/null +++ b/nexus_sync_operations/caller/workflows.py @@ -0,0 +1,56 @@ +from temporalio import workflow + +from message_passing.introduction import Language +from message_passing.introduction.workflows import ( + ApproveInput, + GetLanguagesInput, + SetLanguageInput, + SetLanguageUsingActivityInput, +) + +with workflow.unsafe.imports_passed_through(): + from nexus_sync_operations.service import GreetingService + +NEXUS_ENDPOINT = "nexus-sync-operations-nexus-endpoint" + + +@workflow.defn +class CallerWorkflow: + @workflow.run + async def run(self) -> None: + nexus_client = workflow.create_nexus_client( + service=GreetingService, + endpoint=NEXUS_ENDPOINT, + ) + + # Get supported languages + supported_languages = await nexus_client.execute_operation( + GreetingService.get_languages, GetLanguagesInput(include_unsupported=False) + ) + print(f"supported languages: {supported_languages}") + + # Set language + previous_language = await nexus_client.execute_operation( + GreetingService.set_language, SetLanguageInput(language=Language.CHINESE) + ) + assert ( + await nexus_client.execute_operation(GreetingService.get_language, None) + == Language.CHINESE + ) + print(f"language changed: {previous_language.name} -> {Language.CHINESE.name}") + + # Set language using remote service + previous_language = await nexus_client.execute_operation( + GreetingService.set_language_using_activity, + SetLanguageUsingActivityInput(language=Language.ARABIC), + ) + assert ( + await nexus_client.execute_operation(GreetingService.get_language, None) + == Language.ARABIC + ) + print(f"language changed: {previous_language.name} -> {Language.ARABIC.name}") + + # Approve + await nexus_client.execute_operation( + GreetingService.approve, ApproveInput(name="") + ) diff --git a/nexus_sync_operations/endpoint_description.md b/nexus_sync_operations/endpoint_description.md new file mode 100644 index 00000000..62ba33a7 --- /dev/null +++ b/nexus_sync_operations/endpoint_description.md @@ -0,0 +1,6 @@ +## Service: [GreetingService](https://github.com/temporalio/samples-python/blob/main/nexus_sync_operations/service.py) +- operation: `get_languages` +- operation: `get_language` +- operation: `set_language` +- operation: `set_language_using_activity` +- operation: `approve` diff --git a/nexus_sync_operations/handler/__init__.py b/nexus_sync_operations/handler/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_sync_operations/handler/service_handler.py b/nexus_sync_operations/handler/service_handler.py new file mode 100644 index 00000000..a357dca8 --- /dev/null +++ b/nexus_sync_operations/handler/service_handler.py @@ -0,0 +1,82 @@ +""" +This file demonstrates how to implement a Nexus service that is backed by a long-running +workflow and exposes operations that perform signals, updates, and queries against that +workflow. +""" + +from __future__ import annotations + +import nexusrpc +from temporalio.client import Client, WorkflowHandle +from temporalio.common import WorkflowIDConflictPolicy + +from message_passing.introduction import Language +from message_passing.introduction.workflows import ( + ApproveInput, + GetLanguagesInput, + GreetingWorkflow, + SetLanguageInput, + SetLanguageUsingActivityInput, +) +from nexus_sync_operations.service import GreetingService + + +@nexusrpc.handler.service_handler(service=GreetingService) +class GreetingServiceHandler: + def __init__( + self, + greeting_workflow_handle: WorkflowHandle[GreetingWorkflow, str], + ): + self.greeting_workflow_handle = greeting_workflow_handle + + @classmethod + async def create(cls, client: Client, task_queue: str) -> GreetingServiceHandler: + # Obtain a workflow handle to the long-running workflow that baxks this service, starting + # the workflow if it is not already running. + wf_handle = await client.start_workflow( + GreetingWorkflow.run, + id="nexus-sync-operations-greeting-workflow", + task_queue=task_queue, + id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, + ) + return cls(wf_handle) + + @nexusrpc.handler.sync_operation + async def get_languages( + self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput + ) -> list[Language]: + return await self.greeting_workflow_handle.query( + GreetingWorkflow.get_languages, input + ) + + @nexusrpc.handler.sync_operation + async def get_language( + self, ctx: nexusrpc.handler.StartOperationContext, input: None + ) -> Language: + return await self.greeting_workflow_handle.query(GreetingWorkflow.get_language) + + @nexusrpc.handler.sync_operation + async def set_language( + self, ctx: nexusrpc.handler.StartOperationContext, input: SetLanguageInput + ) -> Language: + return await self.greeting_workflow_handle.execute_update( + GreetingWorkflow.set_language, input.language + ) + + @nexusrpc.handler.sync_operation + async def set_language_using_activity( + self, + ctx: nexusrpc.handler.StartOperationContext, + input: SetLanguageUsingActivityInput, + ) -> Language: + return await self.greeting_workflow_handle.execute_update( + GreetingWorkflow.set_language_using_activity, input.language + ) + + @nexusrpc.handler.sync_operation + async def approve( + self, ctx: nexusrpc.handler.StartOperationContext, input: ApproveInput + ) -> None: + return await self.greeting_workflow_handle.signal( + GreetingWorkflow.approve, input + ) diff --git a/nexus_sync_operations/handler/worker.py b/nexus_sync_operations/handler/worker.py new file mode 100644 index 00000000..e5ca8fe0 --- /dev/null +++ b/nexus_sync_operations/handler/worker.py @@ -0,0 +1,45 @@ +import asyncio +import logging +from typing import Optional + +from temporalio.client import Client +from temporalio.worker import Worker + +from message_passing.introduction.activities import call_greeting_service +from message_passing.introduction.workflows import GreetingWorkflow +from nexus_sync_operations.handler.service_handler import GreetingServiceHandler + +interrupt_event = asyncio.Event() + +NAMESPACE = "nexus-sync-operations-handler-namespace" +TASK_QUEUE = "nexus-sync-operations-handler-task-queue" + + +async def main(client: Optional[Client] = None): + logging.basicConfig(level=logging.INFO) + + client = client or await Client.connect( + "localhost:7233", + namespace=NAMESPACE, + ) + greeting_service_handler = await GreetingServiceHandler.create(client, TASK_QUEUE) + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[GreetingWorkflow], + activities=[call_greeting_service], + nexus_service_handlers=[greeting_service_handler], + ): + logging.info("Worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/nexus_sync_operations/service.py b/nexus_sync_operations/service.py new file mode 100644 index 00000000..f1aa2c14 --- /dev/null +++ b/nexus_sync_operations/service.py @@ -0,0 +1,20 @@ +import nexusrpc + +from message_passing.introduction import Language +from message_passing.introduction.workflows import ( + ApproveInput, + GetLanguagesInput, + SetLanguageInput, + SetLanguageUsingActivityInput, +) + + +@nexusrpc.service +class GreetingService: + get_languages: nexusrpc.Operation[GetLanguagesInput, list[Language]] + get_language: nexusrpc.Operation[None, Language] + set_language: nexusrpc.Operation[SetLanguageInput, Language] + set_language_using_activity: nexusrpc.Operation[ + SetLanguageUsingActivityInput, Language + ] + approve: nexusrpc.Operation[ApproveInput, None] diff --git a/tests/nexus_sync_operations/nexus_sync_operations_test.py b/tests/nexus_sync_operations/nexus_sync_operations_test.py new file mode 100644 index 00000000..2d6c3959 --- /dev/null +++ b/tests/nexus_sync_operations/nexus_sync_operations_test.py @@ -0,0 +1,57 @@ +import asyncio +import sys + +import pytest +from temporalio.client import Client +from temporalio.testing import WorkflowEnvironment + +import nexus_sync_operations.caller.app +import nexus_sync_operations.caller.workflows +import nexus_sync_operations.handler.worker +from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint + + +async def test_nexus_sync_operations(client: Client, env: WorkflowEnvironment): + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work under the Java test server") + + if sys.version_info[:2] < (3, 10): + pytest.skip("Sample is written for Python >= 3.10") + + create_response = await create_nexus_endpoint( + name=nexus_sync_operations.caller.workflows.NEXUS_ENDPOINT, + task_queue=nexus_sync_operations.handler.worker.TASK_QUEUE, + client=client, + ) + try: + handler_worker_task = asyncio.create_task( + nexus_sync_operations.handler.worker.main( + client, + ) + ) + # Give worker time to start the long-running workflow + await asyncio.sleep(1) + + # Execute the caller workflow which will test all the Nexus operations + await nexus_sync_operations.caller.app.execute_caller_workflow( + client, + ) + + # Clean up the handler worker + nexus_sync_operations.handler.worker.interrupt_event.set() + await handler_worker_task + nexus_sync_operations.handler.worker.interrupt_event.clear() + + # The test passes if the caller workflow completes successfully + # The caller workflow verifies that: + # - get_languages returns supported languages + # - set_language updates the language and returns the previous one + # - get_language returns the current language + # - set_language_using_activity updates the language using an activity + # - approve sends the signal successfully + finally: + await delete_nexus_endpoint( + id=create_response.endpoint.id, + version=create_response.endpoint.version, + client=client, + ) From 95e36f20334fa66ea384e04af5e709e7ffa61b02 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Sun, 28 Sep 2025 17:12:51 -0400 Subject: [PATCH 02/11] HACK: restart workflow --- .../handler/service_handler.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/nexus_sync_operations/handler/service_handler.py b/nexus_sync_operations/handler/service_handler.py index a357dca8..952f8ff6 100644 --- a/nexus_sync_operations/handler/service_handler.py +++ b/nexus_sync_operations/handler/service_handler.py @@ -7,6 +7,7 @@ from __future__ import annotations import nexusrpc +from temporalio import nexus from temporalio.client import Client, WorkflowHandle from temporalio.common import WorkflowIDConflictPolicy @@ -31,15 +32,20 @@ def __init__( @classmethod async def create(cls, client: Client, task_queue: str) -> GreetingServiceHandler: - # Obtain a workflow handle to the long-running workflow that baxks this service, starting + # Obtain a workflow handle to the long-running workflow that backs this service, starting # the workflow if it is not already running. - wf_handle = await client.start_workflow( + return cls(await cls._get_workflow_handle(client, task_queue)) + + @staticmethod + async def _get_workflow_handle( + client: Client, task_queue: str + ) -> WorkflowHandle[GreetingWorkflow, str]: + return await client.start_workflow( GreetingWorkflow.run, id="nexus-sync-operations-greeting-workflow", task_queue=task_queue, id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, ) - return cls(wf_handle) @nexusrpc.handler.sync_operation async def get_languages( @@ -77,6 +83,7 @@ async def set_language_using_activity( async def approve( self, ctx: nexusrpc.handler.StartOperationContext, input: ApproveInput ) -> None: - return await self.greeting_workflow_handle.signal( - GreetingWorkflow.approve, input + await self.greeting_workflow_handle.signal(GreetingWorkflow.approve, input) + self.greeting_workflow_handle = await self._get_workflow_handle( + nexus.client(), nexus.info().task_queue ) From 9fe1869a2d90842841b4cfd0dfb719ccb590ed66 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Sun, 28 Sep 2025 17:14:13 -0400 Subject: [PATCH 03/11] Don't send approve signal --- nexus_sync_operations/caller/workflows.py | 6 ------ nexus_sync_operations/handler/service_handler.py | 11 ----------- nexus_sync_operations/service.py | 2 -- 3 files changed, 19 deletions(-) diff --git a/nexus_sync_operations/caller/workflows.py b/nexus_sync_operations/caller/workflows.py index dba58a57..34168e77 100644 --- a/nexus_sync_operations/caller/workflows.py +++ b/nexus_sync_operations/caller/workflows.py @@ -2,7 +2,6 @@ from message_passing.introduction import Language from message_passing.introduction.workflows import ( - ApproveInput, GetLanguagesInput, SetLanguageInput, SetLanguageUsingActivityInput, @@ -49,8 +48,3 @@ async def run(self) -> None: == Language.ARABIC ) print(f"language changed: {previous_language.name} -> {Language.ARABIC.name}") - - # Approve - await nexus_client.execute_operation( - GreetingService.approve, ApproveInput(name="") - ) diff --git a/nexus_sync_operations/handler/service_handler.py b/nexus_sync_operations/handler/service_handler.py index 952f8ff6..d375f098 100644 --- a/nexus_sync_operations/handler/service_handler.py +++ b/nexus_sync_operations/handler/service_handler.py @@ -7,13 +7,11 @@ from __future__ import annotations import nexusrpc -from temporalio import nexus from temporalio.client import Client, WorkflowHandle from temporalio.common import WorkflowIDConflictPolicy from message_passing.introduction import Language from message_passing.introduction.workflows import ( - ApproveInput, GetLanguagesInput, GreetingWorkflow, SetLanguageInput, @@ -78,12 +76,3 @@ async def set_language_using_activity( return await self.greeting_workflow_handle.execute_update( GreetingWorkflow.set_language_using_activity, input.language ) - - @nexusrpc.handler.sync_operation - async def approve( - self, ctx: nexusrpc.handler.StartOperationContext, input: ApproveInput - ) -> None: - await self.greeting_workflow_handle.signal(GreetingWorkflow.approve, input) - self.greeting_workflow_handle = await self._get_workflow_handle( - nexus.client(), nexus.info().task_queue - ) diff --git a/nexus_sync_operations/service.py b/nexus_sync_operations/service.py index f1aa2c14..76e42fd8 100644 --- a/nexus_sync_operations/service.py +++ b/nexus_sync_operations/service.py @@ -2,7 +2,6 @@ from message_passing.introduction import Language from message_passing.introduction.workflows import ( - ApproveInput, GetLanguagesInput, SetLanguageInput, SetLanguageUsingActivityInput, @@ -17,4 +16,3 @@ class GreetingService: set_language_using_activity: nexusrpc.Operation[ SetLanguageUsingActivityInput, Language ] - approve: nexusrpc.Operation[ApproveInput, None] From f4c6d32c4029135f92b310b59d89431157703ad4 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 29 Sep 2025 05:34:28 -0400 Subject: [PATCH 04/11] Introduce SetLanguageInput --- message_passing/introduction/starter.py | 5 ++-- message_passing/introduction/workflows.py | 27 ++++++++----------- .../introduction/test_introduction_sample.py | 8 +++--- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/message_passing/introduction/starter.py b/message_passing/introduction/starter.py index 555342f9..b71dd44d 100644 --- a/message_passing/introduction/starter.py +++ b/message_passing/introduction/starter.py @@ -9,6 +9,7 @@ GetLanguagesInput, GreetingWorkflow, Language, + SetLanguageInput, ) @@ -28,7 +29,7 @@ async def main(client: Optional[Client] = None): # 👉 Execute an Update previous_language = await wf_handle.execute_update( - GreetingWorkflow.set_language, Language.CHINESE + GreetingWorkflow.set_language, SetLanguageInput(language=Language.CHINESE) ) assert await wf_handle.query(GreetingWorkflow.get_language) == Language.CHINESE print(f"language changed: {previous_language.name} -> {Language.CHINESE.name}") @@ -36,7 +37,7 @@ async def main(client: Optional[Client] = None): # 👉 Start an Update and then wait for it to complete update_handle = await wf_handle.start_update( GreetingWorkflow.set_language_using_activity, - Language.ARABIC, + SetLanguageInput(language=Language.ARABIC), wait_for_stage=WorkflowUpdateStage.ACCEPTED, ) previous_language = await update_handle.result() diff --git a/message_passing/introduction/workflows.py b/message_passing/introduction/workflows.py index 12452b36..8988d581 100644 --- a/message_passing/introduction/workflows.py +++ b/message_passing/introduction/workflows.py @@ -21,11 +21,6 @@ class SetLanguageInput: language: Language -@dataclass -class SetLanguageUsingActivityInput: - language: Language - - @dataclass class ApproveInput: name: str @@ -84,21 +79,21 @@ def approve(self, input: ApproveInput) -> None: self.approver_name = input.name @workflow.update - def set_language(self, language: Language) -> Language: + def set_language(self, input: SetLanguageInput) -> Language: # 👉 An Update handler can mutate the Workflow state and return a value. - previous_language, self.language = self.language, language + previous_language, self.language = self.language, input.language return previous_language @set_language.validator - def validate_language(self, language: Language) -> None: - if language not in self.greetings: + def validate_language(self, input: SetLanguageInput) -> None: + if input.language not in self.greetings: # 👉 In an Update validator you raise any exception to reject the Update. - raise ValueError(f"{language.name} is not supported") + raise ValueError(f"{input.language.name} is not supported") @workflow.update - async def set_language_using_activity(self, language: Language) -> Language: + async def set_language_using_activity(self, input: SetLanguageInput) -> Language: # 👉 This update handler is async, so it can execute an activity. - if language not in self.greetings: + if input.language not in self.greetings: # 👉 We use a lock so that, if this handler is executed multiple # times, each execution can schedule the activity only when the # previously scheduled activity has completed. This ensures that @@ -106,7 +101,7 @@ async def set_language_using_activity(self, language: Language) -> Language: async with self.lock: greeting = await workflow.execute_activity( call_greeting_service, - language, + input.language, start_to_close_timeout=timedelta(seconds=10), ) # 👉 The requested language might not be supported by the remote @@ -118,10 +113,10 @@ async def set_language_using_activity(self, language: Language) -> Language: # this purpose.) if greeting is None: raise ApplicationError( - f"Greeting service does not support {language.name}" + f"Greeting service does not support {input.language.name}" ) - self.greetings[language] = greeting - previous_language, self.language = self.language, language + self.greetings[input.language] = greeting + previous_language, self.language = self.language, input.language return previous_language @workflow.query diff --git a/tests/message_passing/introduction/test_introduction_sample.py b/tests/message_passing/introduction/test_introduction_sample.py index de981dc2..17c1b14d 100644 --- a/tests/message_passing/introduction/test_introduction_sample.py +++ b/tests/message_passing/introduction/test_introduction_sample.py @@ -10,6 +10,7 @@ GetLanguagesInput, GreetingWorkflow, Language, + SetLanguageInput, call_greeting_service, ) @@ -63,7 +64,7 @@ async def test_set_language(client: Client, env: WorkflowEnvironment): ) assert await wf_handle.query(GreetingWorkflow.get_language) == Language.ENGLISH previous_language = await wf_handle.execute_update( - GreetingWorkflow.set_language, Language.CHINESE + GreetingWorkflow.set_language, SetLanguageInput(language=Language.CHINESE) ) assert previous_language == Language.ENGLISH assert await wf_handle.query(GreetingWorkflow.get_language) == Language.CHINESE @@ -88,7 +89,8 @@ async def test_set_invalid_language(client: Client, env: WorkflowEnvironment): with pytest.raises(WorkflowUpdateFailedError): await wf_handle.execute_update( - GreetingWorkflow.set_language, Language.ARABIC + GreetingWorkflow.set_language, + SetLanguageInput(language=Language.ARABIC), ) @@ -117,7 +119,7 @@ async def test_set_language_that_is_only_available_via_remote_service( assert await wf_handle.query(GreetingWorkflow.get_language) == Language.ENGLISH previous_language = await wf_handle.execute_update( GreetingWorkflow.set_language_using_activity, - Language.ARABIC, + SetLanguageInput(language=Language.ARABIC), ) assert previous_language == Language.ENGLISH assert await wf_handle.query(GreetingWorkflow.get_language) == Language.ARABIC From 5afcfc0462dffc1a15ffa3d507441710a2efd7cd Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 29 Sep 2025 05:34:02 -0400 Subject: [PATCH 05/11] Only call one of the set_language operations --- nexus_sync_operations/caller/workflows.py | 15 ++------------- .../handler/service_handler.py | 13 ++----------- nexus_sync_operations/service.py | 4 ---- .../nexus_sync_operations_test.py | 18 ------------------ 4 files changed, 4 insertions(+), 46 deletions(-) diff --git a/nexus_sync_operations/caller/workflows.py b/nexus_sync_operations/caller/workflows.py index 34168e77..6f4cc408 100644 --- a/nexus_sync_operations/caller/workflows.py +++ b/nexus_sync_operations/caller/workflows.py @@ -4,7 +4,6 @@ from message_passing.introduction.workflows import ( GetLanguagesInput, SetLanguageInput, - SetLanguageUsingActivityInput, ) with workflow.unsafe.imports_passed_through(): @@ -30,18 +29,8 @@ async def run(self) -> None: # Set language previous_language = await nexus_client.execute_operation( - GreetingService.set_language, SetLanguageInput(language=Language.CHINESE) - ) - assert ( - await nexus_client.execute_operation(GreetingService.get_language, None) - == Language.CHINESE - ) - print(f"language changed: {previous_language.name} -> {Language.CHINESE.name}") - - # Set language using remote service - previous_language = await nexus_client.execute_operation( - GreetingService.set_language_using_activity, - SetLanguageUsingActivityInput(language=Language.ARABIC), + GreetingService.set_language, + SetLanguageInput(language=Language.ARABIC), ) assert ( await nexus_client.execute_operation(GreetingService.get_language, None) diff --git a/nexus_sync_operations/handler/service_handler.py b/nexus_sync_operations/handler/service_handler.py index d375f098..aebd97d4 100644 --- a/nexus_sync_operations/handler/service_handler.py +++ b/nexus_sync_operations/handler/service_handler.py @@ -15,7 +15,6 @@ GetLanguagesInput, GreetingWorkflow, SetLanguageInput, - SetLanguageUsingActivityInput, ) from nexus_sync_operations.service import GreetingService @@ -61,18 +60,10 @@ async def get_language( @nexusrpc.handler.sync_operation async def set_language( - self, ctx: nexusrpc.handler.StartOperationContext, input: SetLanguageInput - ) -> Language: - return await self.greeting_workflow_handle.execute_update( - GreetingWorkflow.set_language, input.language - ) - - @nexusrpc.handler.sync_operation - async def set_language_using_activity( self, ctx: nexusrpc.handler.StartOperationContext, - input: SetLanguageUsingActivityInput, + input: SetLanguageInput, ) -> Language: return await self.greeting_workflow_handle.execute_update( - GreetingWorkflow.set_language_using_activity, input.language + GreetingWorkflow.set_language_using_activity, input ) diff --git a/nexus_sync_operations/service.py b/nexus_sync_operations/service.py index 76e42fd8..347ab6f0 100644 --- a/nexus_sync_operations/service.py +++ b/nexus_sync_operations/service.py @@ -4,7 +4,6 @@ from message_passing.introduction.workflows import ( GetLanguagesInput, SetLanguageInput, - SetLanguageUsingActivityInput, ) @@ -13,6 +12,3 @@ class GreetingService: get_languages: nexusrpc.Operation[GetLanguagesInput, list[Language]] get_language: nexusrpc.Operation[None, Language] set_language: nexusrpc.Operation[SetLanguageInput, Language] - set_language_using_activity: nexusrpc.Operation[ - SetLanguageUsingActivityInput, Language - ] diff --git a/tests/nexus_sync_operations/nexus_sync_operations_test.py b/tests/nexus_sync_operations/nexus_sync_operations_test.py index 2d6c3959..323369b9 100644 --- a/tests/nexus_sync_operations/nexus_sync_operations_test.py +++ b/tests/nexus_sync_operations/nexus_sync_operations_test.py @@ -1,5 +1,4 @@ import asyncio -import sys import pytest from temporalio.client import Client @@ -15,9 +14,6 @@ async def test_nexus_sync_operations(client: Client, env: WorkflowEnvironment): if env.supports_time_skipping: pytest.skip("Nexus tests don't work under the Java test server") - if sys.version_info[:2] < (3, 10): - pytest.skip("Sample is written for Python >= 3.10") - create_response = await create_nexus_endpoint( name=nexus_sync_operations.caller.workflows.NEXUS_ENDPOINT, task_queue=nexus_sync_operations.handler.worker.TASK_QUEUE, @@ -29,26 +25,12 @@ async def test_nexus_sync_operations(client: Client, env: WorkflowEnvironment): client, ) ) - # Give worker time to start the long-running workflow - await asyncio.sleep(1) - - # Execute the caller workflow which will test all the Nexus operations await nexus_sync_operations.caller.app.execute_caller_workflow( client, ) - - # Clean up the handler worker nexus_sync_operations.handler.worker.interrupt_event.set() await handler_worker_task nexus_sync_operations.handler.worker.interrupt_event.clear() - - # The test passes if the caller workflow completes successfully - # The caller workflow verifies that: - # - get_languages returns supported languages - # - set_language updates the language and returns the previous one - # - get_language returns the current language - # - set_language_using_activity updates the language using an activity - # - approve sends the signal successfully finally: await delete_nexus_endpoint( id=create_response.endpoint.id, From 5684514e36f5f29872c7389ae8061098b1f2af41 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 29 Sep 2025 08:45:40 -0400 Subject: [PATCH 06/11] Cleanup --- nexus_sync_operations/README.md | 4 +- nexus_sync_operations/caller/workflows.py | 10 ++-- nexus_sync_operations/endpoint_description.md | 2 - .../handler/service_handler.py | 51 ++++++++++++------- nexus_sync_operations/handler/worker.py | 3 +- nexus_sync_operations/service.py | 14 +++-- 6 files changed, 53 insertions(+), 31 deletions(-) diff --git a/nexus_sync_operations/README.md b/nexus_sync_operations/README.md index 557745fc..10e266ec 100644 --- a/nexus_sync_operations/README.md +++ b/nexus_sync_operations/README.md @@ -1,5 +1,7 @@ This sample shows how to create a Nexus service that is backed by a long-running workflow and -exposes operations that use signals, queries, and updates against that workflow. +exposes operations that execute updates and queries against that workflow. The long-running +workflow, and the updates/queries are private implementation detail of the nexus service: the caller +does not know how the operations are implemented. ### Sample directory structure diff --git a/nexus_sync_operations/caller/workflows.py b/nexus_sync_operations/caller/workflows.py index 6f4cc408..efb7064d 100644 --- a/nexus_sync_operations/caller/workflows.py +++ b/nexus_sync_operations/caller/workflows.py @@ -1,10 +1,12 @@ +""" +This is a workflow that calls nexus operations. The caller does not have information about how these +operations are implemented by the nexus service. +""" + from temporalio import workflow from message_passing.introduction import Language -from message_passing.introduction.workflows import ( - GetLanguagesInput, - SetLanguageInput, -) +from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput with workflow.unsafe.imports_passed_through(): from nexus_sync_operations.service import GreetingService diff --git a/nexus_sync_operations/endpoint_description.md b/nexus_sync_operations/endpoint_description.md index 62ba33a7..a33b60cf 100644 --- a/nexus_sync_operations/endpoint_description.md +++ b/nexus_sync_operations/endpoint_description.md @@ -2,5 +2,3 @@ - operation: `get_languages` - operation: `get_language` - operation: `set_language` -- operation: `set_language_using_activity` -- operation: `approve` diff --git a/nexus_sync_operations/handler/service_handler.py b/nexus_sync_operations/handler/service_handler.py index aebd97d4..fa040181 100644 --- a/nexus_sync_operations/handler/service_handler.py +++ b/nexus_sync_operations/handler/service_handler.py @@ -1,12 +1,12 @@ """ -This file demonstrates how to implement a Nexus service that is backed by a long-running -workflow and exposes operations that perform signals, updates, and queries against that -workflow. +This file demonstrates how to implement a Nexus service that is backed by a long-running workflow +and exposes operations that perform updates and queries against that workflow. """ from __future__ import annotations import nexusrpc +from temporalio import nexus from temporalio.client import Client, WorkflowHandle from temporalio.common import WorkflowIDConflictPolicy @@ -21,29 +21,38 @@ @nexusrpc.handler.service_handler(service=GreetingService) class GreetingServiceHandler: - def __init__( - self, - greeting_workflow_handle: WorkflowHandle[GreetingWorkflow, str], - ): - self.greeting_workflow_handle = greeting_workflow_handle + # This nexus service is backed by a long-running "entity" workflow. This means that the workflow + # is always running in the background, allowing the service to be stateful and durable. The + # service interacts with it via messages (updates and queries). All of this is implementation + # detail private to the nexus handler: the nexus caller does not know how the operations are + # implemented or what is providing the backing storage. + LONG_RUNNING_WORKFLOW_ID = "nexus-sync-operations-greeting-workflow" @classmethod - async def create(cls, client: Client, task_queue: str) -> GreetingServiceHandler: - # Obtain a workflow handle to the long-running workflow that backs this service, starting - # the workflow if it is not already running. - return cls(await cls._get_workflow_handle(client, task_queue)) - - @staticmethod - async def _get_workflow_handle( - client: Client, task_queue: str - ) -> WorkflowHandle[GreetingWorkflow, str]: - return await client.start_workflow( + async def start(cls, client: Client, task_queue: str) -> None: + # Start the long-running "entity" workflow, if it is not already running. + await client.start_workflow( GreetingWorkflow.run, - id="nexus-sync-operations-greeting-workflow", + id=cls.LONG_RUNNING_WORKFLOW_ID, task_queue=task_queue, id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, ) + @property + def greeting_workflow_handle(self) -> WorkflowHandle[GreetingWorkflow, str]: + # In nexus operation handler code, nexus.client() is always available, returning a client + # connected to the handler namespace (it's the same client instance that your nexus worker + # is using to poll the server for nexus tasks). This client can be used to interact with the + # handler namespace, for example to send signals, queries, or updates. Remember however, + # that a sync_operation handler must return quickly (no more than a few seconds). To do + # long-running work in a nexus operation handler, use + # temporalio.nexus.workflow_run_operation (see the hello_nexus sample). + return nexus.client().get_workflow_handle_for( + GreetingWorkflow.run, self.LONG_RUNNING_WORKFLOW_ID + ) + + # 👉 This is a handler for a nexus operation whose internal implementation involves executing a + # query against a long-running workflow that is private to the nexus service. @nexusrpc.handler.sync_operation async def get_languages( self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput @@ -52,12 +61,16 @@ async def get_languages( GreetingWorkflow.get_languages, input ) + # 👉 This is a handler for a nexus operation whose internal implementation involves executing a + # query against a long-running workflow that is private to the nexus service. @nexusrpc.handler.sync_operation async def get_language( self, ctx: nexusrpc.handler.StartOperationContext, input: None ) -> Language: return await self.greeting_workflow_handle.query(GreetingWorkflow.get_language) + # 👉 This is a handler for a nexus operation whose internal implementation involves executing an + # update against a long-running workflow that is private to the nexus service. @nexusrpc.handler.sync_operation async def set_language( self, diff --git a/nexus_sync_operations/handler/worker.py b/nexus_sync_operations/handler/worker.py index e5ca8fe0..4ff9dde5 100644 --- a/nexus_sync_operations/handler/worker.py +++ b/nexus_sync_operations/handler/worker.py @@ -22,7 +22,8 @@ async def main(client: Optional[Client] = None): "localhost:7233", namespace=NAMESPACE, ) - greeting_service_handler = await GreetingServiceHandler.create(client, TASK_QUEUE) + greeting_service_handler = GreetingServiceHandler() + await greeting_service_handler.start(client, TASK_QUEUE) async with Worker( client, diff --git a/nexus_sync_operations/service.py b/nexus_sync_operations/service.py index 347ab6f0..3436d5f3 100644 --- a/nexus_sync_operations/service.py +++ b/nexus_sync_operations/service.py @@ -1,10 +1,16 @@ +""" +This module defines a Nexus service that exposes three operations. + +It is used by the nexus service handler to validate that the operation handlers implement the +correct input and output types, and by the caller workflow to create a type-safe client. It does not +contain the implementation of the operations; see nexus_sync_operations.handler.service_handler for +that. +""" + import nexusrpc from message_passing.introduction import Language -from message_passing.introduction.workflows import ( - GetLanguagesInput, - SetLanguageInput, -) +from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput @nexusrpc.service From 15c2a1aa9813683305d210ae7bf8b9f59a56ae89 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 29 Sep 2025 09:08:47 -0400 Subject: [PATCH 07/11] Evolve tests --- .../nexus_sync_operations_test.py | 143 ++++++++++++++++-- 1 file changed, 134 insertions(+), 9 deletions(-) diff --git a/tests/nexus_sync_operations/nexus_sync_operations_test.py b/tests/nexus_sync_operations/nexus_sync_operations_test.py index 323369b9..01cd5fb2 100644 --- a/tests/nexus_sync_operations/nexus_sync_operations_test.py +++ b/tests/nexus_sync_operations/nexus_sync_operations_test.py @@ -1,36 +1,161 @@ import asyncio +import uuid import pytest +from temporalio import workflow from temporalio.client import Client from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker -import nexus_sync_operations.caller.app -import nexus_sync_operations.caller.workflows +import nexus_sync_operations.handler.service_handler import nexus_sync_operations.handler.worker +from message_passing.introduction import Language +from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput +from nexus_sync_operations.caller.workflows import CallerWorkflow from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint +with workflow.unsafe.imports_passed_through(): + from nexus_sync_operations.service import GreetingService + + +NEXUS_ENDPOINT = "nexus-sync-operations-nexus-endpoint" + + +@workflow.defn +class TestCallerWorkflow: + """Test workflow that calls Nexus operations and makes assertions.""" + + @workflow.run + async def run(self) -> None: + nexus_client = workflow.create_nexus_client( + service=GreetingService, + endpoint=NEXUS_ENDPOINT, + ) + + supported_languages = await nexus_client.execute_operation( + GreetingService.get_languages, GetLanguagesInput(include_unsupported=False) + ) + assert supported_languages == [Language.CHINESE, Language.ENGLISH] + + initial_language = await nexus_client.execute_operation( + GreetingService.get_language, None + ) + assert initial_language == Language.ENGLISH + + previous_language = await nexus_client.execute_operation( + GreetingService.set_language, + SetLanguageInput(language=Language.CHINESE), + ) + assert previous_language == Language.ENGLISH + + current_language = await nexus_client.execute_operation( + GreetingService.get_language, None + ) + assert current_language == Language.CHINESE + + previous_language = await nexus_client.execute_operation( + GreetingService.set_language, + SetLanguageInput(language=Language.ARABIC), + ) + assert previous_language == Language.CHINESE + + current_language = await nexus_client.execute_operation( + GreetingService.get_language, None + ) + assert current_language == Language.ARABIC + async def test_nexus_sync_operations(client: Client, env: WorkflowEnvironment): if env.supports_time_skipping: pytest.skip("Nexus tests don't work under the Java test server") create_response = await create_nexus_endpoint( - name=nexus_sync_operations.caller.workflows.NEXUS_ENDPOINT, + name=NEXUS_ENDPOINT, task_queue=nexus_sync_operations.handler.worker.TASK_QUEUE, client=client, ) try: + await ( + nexus_sync_operations.handler.service_handler.GreetingServiceHandler.start( + client, nexus_sync_operations.handler.worker.TASK_QUEUE + ) + ) handler_worker_task = asyncio.create_task( - nexus_sync_operations.handler.worker.main( + nexus_sync_operations.handler.worker.main(client) + ) + try: + async with Worker( client, + task_queue="test-caller-task-queue", + workflows=[TestCallerWorkflow], + ): + await client.execute_workflow( + TestCallerWorkflow.run, + id=str(uuid.uuid4()), + task_queue="test-caller-task-queue", + ) + finally: + nexus_sync_operations.handler.worker.interrupt_event.set() + await handler_worker_task + nexus_sync_operations.handler.worker.interrupt_event.clear() + try: + await client.get_workflow_handle( + nexus_sync_operations.handler.service_handler.GreetingServiceHandler.LONG_RUNNING_WORKFLOW_ID + ).terminate() + except Exception: + pass + finally: + await delete_nexus_endpoint( + id=create_response.endpoint.id, + version=create_response.endpoint.version, + client=client, + ) + + +async def test_nexus_sync_operations_caller_workflow( + client: Client, env: WorkflowEnvironment +): + """ + Runs the CallerWorkflow from the sample to ensure it executes without errors. + """ + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work under the Java test server") + + create_response = await create_nexus_endpoint( + name=NEXUS_ENDPOINT, + task_queue=nexus_sync_operations.handler.worker.TASK_QUEUE, + client=client, + ) + try: + await ( + nexus_sync_operations.handler.service_handler.GreetingServiceHandler.start( + client, nexus_sync_operations.handler.worker.TASK_QUEUE ) ) - await nexus_sync_operations.caller.app.execute_caller_workflow( - client, + handler_worker_task = asyncio.create_task( + nexus_sync_operations.handler.worker.main(client) ) - nexus_sync_operations.handler.worker.interrupt_event.set() - await handler_worker_task - nexus_sync_operations.handler.worker.interrupt_event.clear() + try: + async with Worker( + client, + task_queue="test-caller-task-queue", + workflows=[CallerWorkflow], + ): + await client.execute_workflow( + CallerWorkflow.run, + id=str(uuid.uuid4()), + task_queue="test-caller-task-queue", + ) + finally: + nexus_sync_operations.handler.worker.interrupt_event.set() + await handler_worker_task + nexus_sync_operations.handler.worker.interrupt_event.clear() + try: + await client.get_workflow_handle( + nexus_sync_operations.handler.service_handler.GreetingServiceHandler.LONG_RUNNING_WORKFLOW_ID + ).terminate() + except Exception: + pass finally: await delete_nexus_endpoint( id=create_response.endpoint.id, From 4f63fdbb179e71360698d91b1a37dd7ccc16dba5 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 29 Sep 2025 09:11:10 -0400 Subject: [PATCH 08/11] Refactor --- .../nexus_sync_operations_test.py | 51 +++---------------- 1 file changed, 8 insertions(+), 43 deletions(-) diff --git a/tests/nexus_sync_operations/nexus_sync_operations_test.py b/tests/nexus_sync_operations/nexus_sync_operations_test.py index 01cd5fb2..9c8204e4 100644 --- a/tests/nexus_sync_operations/nexus_sync_operations_test.py +++ b/tests/nexus_sync_operations/nexus_sync_operations_test.py @@ -1,5 +1,6 @@ import asyncio import uuid +from typing import Type import pytest from temporalio import workflow @@ -69,47 +70,7 @@ async def test_nexus_sync_operations(client: Client, env: WorkflowEnvironment): if env.supports_time_skipping: pytest.skip("Nexus tests don't work under the Java test server") - create_response = await create_nexus_endpoint( - name=NEXUS_ENDPOINT, - task_queue=nexus_sync_operations.handler.worker.TASK_QUEUE, - client=client, - ) - try: - await ( - nexus_sync_operations.handler.service_handler.GreetingServiceHandler.start( - client, nexus_sync_operations.handler.worker.TASK_QUEUE - ) - ) - handler_worker_task = asyncio.create_task( - nexus_sync_operations.handler.worker.main(client) - ) - try: - async with Worker( - client, - task_queue="test-caller-task-queue", - workflows=[TestCallerWorkflow], - ): - await client.execute_workflow( - TestCallerWorkflow.run, - id=str(uuid.uuid4()), - task_queue="test-caller-task-queue", - ) - finally: - nexus_sync_operations.handler.worker.interrupt_event.set() - await handler_worker_task - nexus_sync_operations.handler.worker.interrupt_event.clear() - try: - await client.get_workflow_handle( - nexus_sync_operations.handler.service_handler.GreetingServiceHandler.LONG_RUNNING_WORKFLOW_ID - ).terminate() - except Exception: - pass - finally: - await delete_nexus_endpoint( - id=create_response.endpoint.id, - version=create_response.endpoint.version, - client=client, - ) + await _run_caller_workflow(client, TestCallerWorkflow) async def test_nexus_sync_operations_caller_workflow( @@ -121,6 +82,10 @@ async def test_nexus_sync_operations_caller_workflow( if env.supports_time_skipping: pytest.skip("Nexus tests don't work under the Java test server") + await _run_caller_workflow(client, CallerWorkflow) + + +async def _run_caller_workflow(client: Client, workflow: Type): create_response = await create_nexus_endpoint( name=NEXUS_ENDPOINT, task_queue=nexus_sync_operations.handler.worker.TASK_QUEUE, @@ -139,10 +104,10 @@ async def test_nexus_sync_operations_caller_workflow( async with Worker( client, task_queue="test-caller-task-queue", - workflows=[CallerWorkflow], + workflows=[workflow], ): await client.execute_workflow( - CallerWorkflow.run, + workflow.run, id=str(uuid.uuid4()), task_queue="test-caller-task-queue", ) From 78b2f5226e888b613c3c4726591c75fbb17022e7 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 29 Sep 2025 09:16:08 -0400 Subject: [PATCH 09/11] Refactor --- .../handler/service_handler.py | 17 ++++++++--------- nexus_sync_operations/handler/worker.py | 8 ++++++-- .../nexus_sync_operations_test.py | 11 ----------- 3 files changed, 14 insertions(+), 22 deletions(-) diff --git a/nexus_sync_operations/handler/service_handler.py b/nexus_sync_operations/handler/service_handler.py index fa040181..1d8cc6b7 100644 --- a/nexus_sync_operations/handler/service_handler.py +++ b/nexus_sync_operations/handler/service_handler.py @@ -21,22 +21,21 @@ @nexusrpc.handler.service_handler(service=GreetingService) class GreetingServiceHandler: - # This nexus service is backed by a long-running "entity" workflow. This means that the workflow - # is always running in the background, allowing the service to be stateful and durable. The - # service interacts with it via messages (updates and queries). All of this is implementation - # detail private to the nexus handler: the nexus caller does not know how the operations are - # implemented or what is providing the backing storage. - LONG_RUNNING_WORKFLOW_ID = "nexus-sync-operations-greeting-workflow" + def __init__(self, workflow_id: str): + self.workflow_id = workflow_id @classmethod - async def start(cls, client: Client, task_queue: str) -> None: + async def create( + cls, workflow_id: str, client: Client, task_queue: str + ) -> GreetingServiceHandler: # Start the long-running "entity" workflow, if it is not already running. await client.start_workflow( GreetingWorkflow.run, - id=cls.LONG_RUNNING_WORKFLOW_ID, + id=workflow_id, task_queue=task_queue, id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, ) + return cls(workflow_id) @property def greeting_workflow_handle(self) -> WorkflowHandle[GreetingWorkflow, str]: @@ -48,7 +47,7 @@ def greeting_workflow_handle(self) -> WorkflowHandle[GreetingWorkflow, str]: # long-running work in a nexus operation handler, use # temporalio.nexus.workflow_run_operation (see the hello_nexus sample). return nexus.client().get_workflow_handle_for( - GreetingWorkflow.run, self.LONG_RUNNING_WORKFLOW_ID + GreetingWorkflow.run, self.workflow_id ) # 👉 This is a handler for a nexus operation whose internal implementation involves executing a diff --git a/nexus_sync_operations/handler/worker.py b/nexus_sync_operations/handler/worker.py index 4ff9dde5..5545adc0 100644 --- a/nexus_sync_operations/handler/worker.py +++ b/nexus_sync_operations/handler/worker.py @@ -22,8 +22,12 @@ async def main(client: Optional[Client] = None): "localhost:7233", namespace=NAMESPACE, ) - greeting_service_handler = GreetingServiceHandler() - await greeting_service_handler.start(client, TASK_QUEUE) + + # Create the nexus service handler instance, starting the long-running entity workflow that + # backs the Nexus service + greeting_service_handler = await GreetingServiceHandler.create( + "nexus-sync-operations-greeting-workflow", client, TASK_QUEUE + ) async with Worker( client, diff --git a/tests/nexus_sync_operations/nexus_sync_operations_test.py b/tests/nexus_sync_operations/nexus_sync_operations_test.py index 9c8204e4..d74168cb 100644 --- a/tests/nexus_sync_operations/nexus_sync_operations_test.py +++ b/tests/nexus_sync_operations/nexus_sync_operations_test.py @@ -92,11 +92,6 @@ async def _run_caller_workflow(client: Client, workflow: Type): client=client, ) try: - await ( - nexus_sync_operations.handler.service_handler.GreetingServiceHandler.start( - client, nexus_sync_operations.handler.worker.TASK_QUEUE - ) - ) handler_worker_task = asyncio.create_task( nexus_sync_operations.handler.worker.main(client) ) @@ -115,12 +110,6 @@ async def _run_caller_workflow(client: Client, workflow: Type): nexus_sync_operations.handler.worker.interrupt_event.set() await handler_worker_task nexus_sync_operations.handler.worker.interrupt_event.clear() - try: - await client.get_workflow_handle( - nexus_sync_operations.handler.service_handler.GreetingServiceHandler.LONG_RUNNING_WORKFLOW_ID - ).terminate() - except Exception: - pass finally: await delete_nexus_endpoint( id=create_response.endpoint.id, From 20722ef76103ad4ee00ee167551fc9b95b14bae0 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Tue, 30 Sep 2025 09:01:34 -0400 Subject: [PATCH 10/11] Don't print in workflow --- nexus_sync_operations/caller/app.py | 4 +++- nexus_sync_operations/caller/workflows.py | 11 ++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/nexus_sync_operations/caller/app.py b/nexus_sync_operations/caller/app.py index d79db968..4966415c 100644 --- a/nexus_sync_operations/caller/app.py +++ b/nexus_sync_operations/caller/app.py @@ -24,11 +24,13 @@ async def execute_caller_workflow( task_queue=TASK_QUEUE, workflows=[CallerWorkflow], ): - await client.execute_workflow( + log = await client.execute_workflow( CallerWorkflow.run, id=str(uuid.uuid4()), task_queue=TASK_QUEUE, ) + for line in log: + print(line) if __name__ == "__main__": diff --git a/nexus_sync_operations/caller/workflows.py b/nexus_sync_operations/caller/workflows.py index efb7064d..a358d764 100644 --- a/nexus_sync_operations/caller/workflows.py +++ b/nexus_sync_operations/caller/workflows.py @@ -17,7 +17,8 @@ @workflow.defn class CallerWorkflow: @workflow.run - async def run(self) -> None: + async def run(self) -> list[str]: + log = [] nexus_client = workflow.create_nexus_client( service=GreetingService, endpoint=NEXUS_ENDPOINT, @@ -27,7 +28,7 @@ async def run(self) -> None: supported_languages = await nexus_client.execute_operation( GreetingService.get_languages, GetLanguagesInput(include_unsupported=False) ) - print(f"supported languages: {supported_languages}") + log.append(f"supported languages: {supported_languages}") # Set language previous_language = await nexus_client.execute_operation( @@ -38,4 +39,8 @@ async def run(self) -> None: await nexus_client.execute_operation(GreetingService.get_language, None) == Language.ARABIC ) - print(f"language changed: {previous_language.name} -> {Language.ARABIC.name}") + log.append( + f"language changed: {previous_language.name} -> {Language.ARABIC.name}" + ) + + return log From e67474dc52d128c679a8985ad69e757b7ddbc2b6 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Tue, 30 Sep 2025 09:05:46 -0400 Subject: [PATCH 11/11] Add note regarding necessity for update to be fast --- nexus_sync_operations/handler/service_handler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nexus_sync_operations/handler/service_handler.py b/nexus_sync_operations/handler/service_handler.py index 1d8cc6b7..626948f0 100644 --- a/nexus_sync_operations/handler/service_handler.py +++ b/nexus_sync_operations/handler/service_handler.py @@ -69,7 +69,9 @@ async def get_language( return await self.greeting_workflow_handle.query(GreetingWorkflow.get_language) # 👉 This is a handler for a nexus operation whose internal implementation involves executing an - # update against a long-running workflow that is private to the nexus service. + # update against a long-running workflow that is private to the nexus service. Although updates + # can run for an arbitrarily long time, when exposing an update via a nexus sync operation the + # update should execute quickly (sync operations must complete in under 10s). @nexusrpc.handler.sync_operation async def set_language( self,