diff --git a/hello_nexus/README.md b/hello_nexus/README.md index bf26ce47..e8067ec3 100644 --- a/hello_nexus/README.md +++ b/hello_nexus/README.md @@ -12,7 +12,7 @@ call the operations from a workflow. Start a Temporal server. (See the main samples repo [README](../README.md)). -Run the following: +Run the following to create the caller and handler namespaces, and the Nexus endpoint: ``` temporal operator namespace create --namespace hello-nexus-basic-handler-namespace diff --git a/hello_nexus/service.py b/hello_nexus/service.py index 6528775d..352375ca 100644 --- a/hello_nexus/service.py +++ b/hello_nexus/service.py @@ -9,7 +9,8 @@ type-safe clients, and it is used by Nexus handlers to validate that they implement correctly-named operation handlers with the correct input and output types. -The service defined in this file features two operations: echo and hello. +The service defined in this file exposes two operations: my_sync_operation and +my_workflow_run_operation. """ from dataclasses import dataclass diff --git a/message_passing/introduction/starter.py b/message_passing/introduction/starter.py index 13a48c3a..b71dd44d 100644 --- a/message_passing/introduction/starter.py +++ b/message_passing/introduction/starter.py @@ -9,6 +9,7 @@ GetLanguagesInput, GreetingWorkflow, Language, + SetLanguageInput, ) @@ -28,20 +29,20 @@ async def main(client: Optional[Client] = None): # 👉 Execute an Update previous_language = await wf_handle.execute_update( - GreetingWorkflow.set_language, Language.CHINESE + GreetingWorkflow.set_language, SetLanguageInput(language=Language.CHINESE) ) - current_language = await wf_handle.query(GreetingWorkflow.get_language) - print(f"language changed: {previous_language.name} -> {current_language.name}") + assert await wf_handle.query(GreetingWorkflow.get_language) == Language.CHINESE + print(f"language changed: {previous_language.name} -> {Language.CHINESE.name}") # 👉 Start an Update and then wait for it to complete update_handle = await wf_handle.start_update( GreetingWorkflow.set_language_using_activity, - Language.ARABIC, + SetLanguageInput(language=Language.ARABIC), wait_for_stage=WorkflowUpdateStage.ACCEPTED, ) previous_language = await update_handle.result() - current_language = await wf_handle.query(GreetingWorkflow.get_language) - print(f"language changed: {previous_language.name} -> {current_language.name}") + assert await wf_handle.query(GreetingWorkflow.get_language) == Language.ARABIC + print(f"language changed: {previous_language.name} -> {Language.ARABIC.name}") # 👉 Send a Signal await wf_handle.signal(GreetingWorkflow.approve, ApproveInput(name="")) diff --git a/message_passing/introduction/workflows.py b/message_passing/introduction/workflows.py index 97d2b874..8988d581 100644 --- a/message_passing/introduction/workflows.py +++ b/message_passing/introduction/workflows.py @@ -16,6 +16,11 @@ class GetLanguagesInput: include_unsupported: bool +@dataclass +class SetLanguageInput: + language: Language + + @dataclass class ApproveInput: name: str @@ -74,21 +79,21 @@ def approve(self, input: ApproveInput) -> None: self.approver_name = input.name @workflow.update - def set_language(self, language: Language) -> Language: + def set_language(self, input: SetLanguageInput) -> Language: # 👉 An Update handler can mutate the Workflow state and return a value. - previous_language, self.language = self.language, language + previous_language, self.language = self.language, input.language return previous_language @set_language.validator - def validate_language(self, language: Language) -> None: - if language not in self.greetings: + def validate_language(self, input: SetLanguageInput) -> None: + if input.language not in self.greetings: # 👉 In an Update validator you raise any exception to reject the Update. - raise ValueError(f"{language.name} is not supported") + raise ValueError(f"{input.language.name} is not supported") @workflow.update - async def set_language_using_activity(self, language: Language) -> Language: + async def set_language_using_activity(self, input: SetLanguageInput) -> Language: # 👉 This update handler is async, so it can execute an activity. - if language not in self.greetings: + if input.language not in self.greetings: # 👉 We use a lock so that, if this handler is executed multiple # times, each execution can schedule the activity only when the # previously scheduled activity has completed. This ensures that @@ -96,7 +101,7 @@ async def set_language_using_activity(self, language: Language) -> Language: async with self.lock: greeting = await workflow.execute_activity( call_greeting_service, - language, + input.language, start_to_close_timeout=timedelta(seconds=10), ) # 👉 The requested language might not be supported by the remote @@ -108,10 +113,10 @@ async def set_language_using_activity(self, language: Language) -> Language: # this purpose.) if greeting is None: raise ApplicationError( - f"Greeting service does not support {language.name}" + f"Greeting service does not support {input.language.name}" ) - self.greetings[language] = greeting - previous_language, self.language = self.language, language + self.greetings[input.language] = greeting + previous_language, self.language = self.language, input.language return previous_language @workflow.query diff --git a/nexus_sync_operations/README.md b/nexus_sync_operations/README.md new file mode 100644 index 00000000..10e266ec --- /dev/null +++ b/nexus_sync_operations/README.md @@ -0,0 +1,39 @@ +This sample shows how to create a Nexus service that is backed by a long-running workflow and +exposes operations that execute updates and queries against that workflow. The long-running +workflow, and the updates/queries are private implementation detail of the nexus service: the caller +does not know how the operations are implemented. + +### Sample directory structure + +- [service.py](./service.py) - shared Nexus service definition +- [caller](./caller) - a caller workflow that executes Nexus operations, together with a worker and starter code +- [handler](./handler) - Nexus operation handlers, together with a workflow used by one of the Nexus operations, and a worker that polls for both workflow, activity, and Nexus tasks. + + +### Instructions + +Start a Temporal server. (See the main samples repo [README](../README.md)). + +Run the following to create the caller and handler namespaces, and the Nexus endpoint: + +``` +temporal operator namespace create --namespace nexus-sync-operations-handler-namespace +temporal operator namespace create --namespace nexus-sync-operations-caller-namespace + +temporal operator nexus endpoint create \ + --name nexus-sync-operations-nexus-endpoint \ + --target-namespace nexus-sync-operations-handler-namespace \ + --target-task-queue nexus-sync-operations-handler-task-queue \ + --description-file nexus_sync_operations/endpoint_description.md +``` + +In one terminal, run the Temporal worker in the handler namespace: +``` +uv run nexus_sync_operations/handler/worker.py +``` + +In another terminal, run the Temporal worker in the caller namespace and start the caller +workflow: +``` +uv run nexus_sync_operations/caller/app.py +``` diff --git a/nexus_sync_operations/__init__.py b/nexus_sync_operations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_sync_operations/caller/__init__.py b/nexus_sync_operations/caller/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_sync_operations/caller/app.py b/nexus_sync_operations/caller/app.py new file mode 100644 index 00000000..4966415c --- /dev/null +++ b/nexus_sync_operations/caller/app.py @@ -0,0 +1,41 @@ +import asyncio +import uuid +from typing import Optional + +from temporalio.client import Client +from temporalio.worker import Worker + +from nexus_sync_operations.caller.workflows import CallerWorkflow + +NAMESPACE = "nexus-sync-operations-caller-namespace" +TASK_QUEUE = "nexus-sync-operations-caller-task-queue" + + +async def execute_caller_workflow( + client: Optional[Client] = None, +) -> None: + client = client or await Client.connect( + "localhost:7233", + namespace=NAMESPACE, + ) + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[CallerWorkflow], + ): + log = await client.execute_workflow( + CallerWorkflow.run, + id=str(uuid.uuid4()), + task_queue=TASK_QUEUE, + ) + for line in log: + print(line) + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(execute_caller_workflow()) + except KeyboardInterrupt: + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/nexus_sync_operations/caller/workflows.py b/nexus_sync_operations/caller/workflows.py new file mode 100644 index 00000000..a358d764 --- /dev/null +++ b/nexus_sync_operations/caller/workflows.py @@ -0,0 +1,46 @@ +""" +This is a workflow that calls nexus operations. The caller does not have information about how these +operations are implemented by the nexus service. +""" + +from temporalio import workflow + +from message_passing.introduction import Language +from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput + +with workflow.unsafe.imports_passed_through(): + from nexus_sync_operations.service import GreetingService + +NEXUS_ENDPOINT = "nexus-sync-operations-nexus-endpoint" + + +@workflow.defn +class CallerWorkflow: + @workflow.run + async def run(self) -> list[str]: + log = [] + nexus_client = workflow.create_nexus_client( + service=GreetingService, + endpoint=NEXUS_ENDPOINT, + ) + + # Get supported languages + supported_languages = await nexus_client.execute_operation( + GreetingService.get_languages, GetLanguagesInput(include_unsupported=False) + ) + log.append(f"supported languages: {supported_languages}") + + # Set language + previous_language = await nexus_client.execute_operation( + GreetingService.set_language, + SetLanguageInput(language=Language.ARABIC), + ) + assert ( + await nexus_client.execute_operation(GreetingService.get_language, None) + == Language.ARABIC + ) + log.append( + f"language changed: {previous_language.name} -> {Language.ARABIC.name}" + ) + + return log diff --git a/nexus_sync_operations/endpoint_description.md b/nexus_sync_operations/endpoint_description.md new file mode 100644 index 00000000..a33b60cf --- /dev/null +++ b/nexus_sync_operations/endpoint_description.md @@ -0,0 +1,4 @@ +## Service: [GreetingService](https://github.com/temporalio/samples-python/blob/main/nexus_sync_operations/service.py) +- operation: `get_languages` +- operation: `get_language` +- operation: `set_language` diff --git a/nexus_sync_operations/handler/__init__.py b/nexus_sync_operations/handler/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_sync_operations/handler/service_handler.py b/nexus_sync_operations/handler/service_handler.py new file mode 100644 index 00000000..626948f0 --- /dev/null +++ b/nexus_sync_operations/handler/service_handler.py @@ -0,0 +1,83 @@ +""" +This file demonstrates how to implement a Nexus service that is backed by a long-running workflow +and exposes operations that perform updates and queries against that workflow. +""" + +from __future__ import annotations + +import nexusrpc +from temporalio import nexus +from temporalio.client import Client, WorkflowHandle +from temporalio.common import WorkflowIDConflictPolicy + +from message_passing.introduction import Language +from message_passing.introduction.workflows import ( + GetLanguagesInput, + GreetingWorkflow, + SetLanguageInput, +) +from nexus_sync_operations.service import GreetingService + + +@nexusrpc.handler.service_handler(service=GreetingService) +class GreetingServiceHandler: + def __init__(self, workflow_id: str): + self.workflow_id = workflow_id + + @classmethod + async def create( + cls, workflow_id: str, client: Client, task_queue: str + ) -> GreetingServiceHandler: + # Start the long-running "entity" workflow, if it is not already running. + await client.start_workflow( + GreetingWorkflow.run, + id=workflow_id, + task_queue=task_queue, + id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, + ) + return cls(workflow_id) + + @property + def greeting_workflow_handle(self) -> WorkflowHandle[GreetingWorkflow, str]: + # In nexus operation handler code, nexus.client() is always available, returning a client + # connected to the handler namespace (it's the same client instance that your nexus worker + # is using to poll the server for nexus tasks). This client can be used to interact with the + # handler namespace, for example to send signals, queries, or updates. Remember however, + # that a sync_operation handler must return quickly (no more than a few seconds). To do + # long-running work in a nexus operation handler, use + # temporalio.nexus.workflow_run_operation (see the hello_nexus sample). + return nexus.client().get_workflow_handle_for( + GreetingWorkflow.run, self.workflow_id + ) + + # 👉 This is a handler for a nexus operation whose internal implementation involves executing a + # query against a long-running workflow that is private to the nexus service. + @nexusrpc.handler.sync_operation + async def get_languages( + self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput + ) -> list[Language]: + return await self.greeting_workflow_handle.query( + GreetingWorkflow.get_languages, input + ) + + # 👉 This is a handler for a nexus operation whose internal implementation involves executing a + # query against a long-running workflow that is private to the nexus service. + @nexusrpc.handler.sync_operation + async def get_language( + self, ctx: nexusrpc.handler.StartOperationContext, input: None + ) -> Language: + return await self.greeting_workflow_handle.query(GreetingWorkflow.get_language) + + # 👉 This is a handler for a nexus operation whose internal implementation involves executing an + # update against a long-running workflow that is private to the nexus service. Although updates + # can run for an arbitrarily long time, when exposing an update via a nexus sync operation the + # update should execute quickly (sync operations must complete in under 10s). + @nexusrpc.handler.sync_operation + async def set_language( + self, + ctx: nexusrpc.handler.StartOperationContext, + input: SetLanguageInput, + ) -> Language: + return await self.greeting_workflow_handle.execute_update( + GreetingWorkflow.set_language_using_activity, input + ) diff --git a/nexus_sync_operations/handler/worker.py b/nexus_sync_operations/handler/worker.py new file mode 100644 index 00000000..5545adc0 --- /dev/null +++ b/nexus_sync_operations/handler/worker.py @@ -0,0 +1,50 @@ +import asyncio +import logging +from typing import Optional + +from temporalio.client import Client +from temporalio.worker import Worker + +from message_passing.introduction.activities import call_greeting_service +from message_passing.introduction.workflows import GreetingWorkflow +from nexus_sync_operations.handler.service_handler import GreetingServiceHandler + +interrupt_event = asyncio.Event() + +NAMESPACE = "nexus-sync-operations-handler-namespace" +TASK_QUEUE = "nexus-sync-operations-handler-task-queue" + + +async def main(client: Optional[Client] = None): + logging.basicConfig(level=logging.INFO) + + client = client or await Client.connect( + "localhost:7233", + namespace=NAMESPACE, + ) + + # Create the nexus service handler instance, starting the long-running entity workflow that + # backs the Nexus service + greeting_service_handler = await GreetingServiceHandler.create( + "nexus-sync-operations-greeting-workflow", client, TASK_QUEUE + ) + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[GreetingWorkflow], + activities=[call_greeting_service], + nexus_service_handlers=[greeting_service_handler], + ): + logging.info("Worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/nexus_sync_operations/service.py b/nexus_sync_operations/service.py new file mode 100644 index 00000000..3436d5f3 --- /dev/null +++ b/nexus_sync_operations/service.py @@ -0,0 +1,20 @@ +""" +This module defines a Nexus service that exposes three operations. + +It is used by the nexus service handler to validate that the operation handlers implement the +correct input and output types, and by the caller workflow to create a type-safe client. It does not +contain the implementation of the operations; see nexus_sync_operations.handler.service_handler for +that. +""" + +import nexusrpc + +from message_passing.introduction import Language +from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput + + +@nexusrpc.service +class GreetingService: + get_languages: nexusrpc.Operation[GetLanguagesInput, list[Language]] + get_language: nexusrpc.Operation[None, Language] + set_language: nexusrpc.Operation[SetLanguageInput, Language] diff --git a/tests/message_passing/introduction/test_introduction_sample.py b/tests/message_passing/introduction/test_introduction_sample.py index de981dc2..17c1b14d 100644 --- a/tests/message_passing/introduction/test_introduction_sample.py +++ b/tests/message_passing/introduction/test_introduction_sample.py @@ -10,6 +10,7 @@ GetLanguagesInput, GreetingWorkflow, Language, + SetLanguageInput, call_greeting_service, ) @@ -63,7 +64,7 @@ async def test_set_language(client: Client, env: WorkflowEnvironment): ) assert await wf_handle.query(GreetingWorkflow.get_language) == Language.ENGLISH previous_language = await wf_handle.execute_update( - GreetingWorkflow.set_language, Language.CHINESE + GreetingWorkflow.set_language, SetLanguageInput(language=Language.CHINESE) ) assert previous_language == Language.ENGLISH assert await wf_handle.query(GreetingWorkflow.get_language) == Language.CHINESE @@ -88,7 +89,8 @@ async def test_set_invalid_language(client: Client, env: WorkflowEnvironment): with pytest.raises(WorkflowUpdateFailedError): await wf_handle.execute_update( - GreetingWorkflow.set_language, Language.ARABIC + GreetingWorkflow.set_language, + SetLanguageInput(language=Language.ARABIC), ) @@ -117,7 +119,7 @@ async def test_set_language_that_is_only_available_via_remote_service( assert await wf_handle.query(GreetingWorkflow.get_language) == Language.ENGLISH previous_language = await wf_handle.execute_update( GreetingWorkflow.set_language_using_activity, - Language.ARABIC, + SetLanguageInput(language=Language.ARABIC), ) assert previous_language == Language.ENGLISH assert await wf_handle.query(GreetingWorkflow.get_language) == Language.ARABIC diff --git a/tests/nexus_sync_operations/nexus_sync_operations_test.py b/tests/nexus_sync_operations/nexus_sync_operations_test.py new file mode 100644 index 00000000..d74168cb --- /dev/null +++ b/tests/nexus_sync_operations/nexus_sync_operations_test.py @@ -0,0 +1,118 @@ +import asyncio +import uuid +from typing import Type + +import pytest +from temporalio import workflow +from temporalio.client import Client +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +import nexus_sync_operations.handler.service_handler +import nexus_sync_operations.handler.worker +from message_passing.introduction import Language +from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput +from nexus_sync_operations.caller.workflows import CallerWorkflow +from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint + +with workflow.unsafe.imports_passed_through(): + from nexus_sync_operations.service import GreetingService + + +NEXUS_ENDPOINT = "nexus-sync-operations-nexus-endpoint" + + +@workflow.defn +class TestCallerWorkflow: + """Test workflow that calls Nexus operations and makes assertions.""" + + @workflow.run + async def run(self) -> None: + nexus_client = workflow.create_nexus_client( + service=GreetingService, + endpoint=NEXUS_ENDPOINT, + ) + + supported_languages = await nexus_client.execute_operation( + GreetingService.get_languages, GetLanguagesInput(include_unsupported=False) + ) + assert supported_languages == [Language.CHINESE, Language.ENGLISH] + + initial_language = await nexus_client.execute_operation( + GreetingService.get_language, None + ) + assert initial_language == Language.ENGLISH + + previous_language = await nexus_client.execute_operation( + GreetingService.set_language, + SetLanguageInput(language=Language.CHINESE), + ) + assert previous_language == Language.ENGLISH + + current_language = await nexus_client.execute_operation( + GreetingService.get_language, None + ) + assert current_language == Language.CHINESE + + previous_language = await nexus_client.execute_operation( + GreetingService.set_language, + SetLanguageInput(language=Language.ARABIC), + ) + assert previous_language == Language.CHINESE + + current_language = await nexus_client.execute_operation( + GreetingService.get_language, None + ) + assert current_language == Language.ARABIC + + +async def test_nexus_sync_operations(client: Client, env: WorkflowEnvironment): + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work under the Java test server") + + await _run_caller_workflow(client, TestCallerWorkflow) + + +async def test_nexus_sync_operations_caller_workflow( + client: Client, env: WorkflowEnvironment +): + """ + Runs the CallerWorkflow from the sample to ensure it executes without errors. + """ + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work under the Java test server") + + await _run_caller_workflow(client, CallerWorkflow) + + +async def _run_caller_workflow(client: Client, workflow: Type): + create_response = await create_nexus_endpoint( + name=NEXUS_ENDPOINT, + task_queue=nexus_sync_operations.handler.worker.TASK_QUEUE, + client=client, + ) + try: + handler_worker_task = asyncio.create_task( + nexus_sync_operations.handler.worker.main(client) + ) + try: + async with Worker( + client, + task_queue="test-caller-task-queue", + workflows=[workflow], + ): + await client.execute_workflow( + workflow.run, + id=str(uuid.uuid4()), + task_queue="test-caller-task-queue", + ) + finally: + nexus_sync_operations.handler.worker.interrupt_event.set() + await handler_worker_task + nexus_sync_operations.handler.worker.interrupt_event.clear() + finally: + await delete_nexus_endpoint( + id=create_response.endpoint.id, + version=create_response.endpoint.version, + client=client, + )