From 3262496139cbd7e8e00d4a6b3a7c5e19fb7c7471 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 8 Oct 2025 11:46:51 -0700 Subject: [PATCH 1/3] Eager workflow start sample --- README.md | 1 + eager_wf_start/README.md | 16 +++++++++++ eager_wf_start/__init__.py | 0 eager_wf_start/activities.py | 5 ++++ eager_wf_start/run.py | 41 +++++++++++++++++++++++++++ eager_wf_start/workflows.py | 15 ++++++++++ tests/conftest.py | 2 ++ tests/eager_wf_start/__init__.py | 0 tests/eager_wf_start/workflow_test.py | 28 ++++++++++++++++++ 9 files changed, 108 insertions(+) create mode 100644 eager_wf_start/README.md create mode 100644 eager_wf_start/__init__.py create mode 100644 eager_wf_start/activities.py create mode 100644 eager_wf_start/run.py create mode 100644 eager_wf_start/workflows.py create mode 100644 tests/eager_wf_start/__init__.py create mode 100644 tests/eager_wf_start/workflow_test.py diff --git a/README.md b/README.md index 43d2cba6..d9ae57a3 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity. * [custom_metric](custom_metric) - Custom metric to record the workflow type in the activity schedule to start latency. * [dsl](dsl) - DSL workflow that executes steps defined in a YAML file. +* [eager_wf_start](eager_wf_start) - Run a workflow using Eager Workflow Start * [encryption](encryption) - Apply end-to-end encryption for all input/output. * [env_config](env_config) - Load client configuration from TOML files with programmatic overrides. * [gevent_async](gevent_async) - Combine gevent and Temporal. diff --git a/eager_wf_start/README.md b/eager_wf_start/README.md new file mode 100644 index 00000000..12dfec73 --- /dev/null +++ b/eager_wf_start/README.md @@ -0,0 +1,16 @@ +# Eager Workflow Start + +This sample shows how to create a workflow that uses Eager Workflow Start. + +Eager Workflow Start is feature that reduces the time it takes to start a Workflow. The target use case is short-lived Workflows that interact with other services using Local Activities, as such, we'll be using Local Activities in this sample. + +You can read more about Eager Workflow Start in our: + +- [Eager Workflow Start blog](https://temporal.io/blog/improving-latency-with-eager-workflow-start) +- [Worker Performance Docs](https://docs.temporal.io/develop/worker-performance#eager-workflow-start) + +To run, first see the main [README.md](../README.md) for prerequisites. + +Then run the sample via: + + uv run eager_wf_start/run.py \ No newline at end of file diff --git a/eager_wf_start/__init__.py b/eager_wf_start/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/eager_wf_start/activities.py b/eager_wf_start/activities.py new file mode 100644 index 00000000..e8f00072 --- /dev/null +++ b/eager_wf_start/activities.py @@ -0,0 +1,5 @@ +from temporalio import activity + +@activity.defn() +async def greeting(name: str) -> str: + return f"Hello {name}!" \ No newline at end of file diff --git a/eager_wf_start/run.py b/eager_wf_start/run.py new file mode 100644 index 00000000..479cac82 --- /dev/null +++ b/eager_wf_start/run.py @@ -0,0 +1,41 @@ +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.worker import Worker + +from eager_wf_start.activities import greeting +from eager_wf_start.workflows import EagerWorkflow + +TASK_QUEUE = "eager-wf-start-task-queue" + +async def main(): + + # Note that the worker and client run in the same process and share the same client connection. + client = await Client.connect("localhost:7233") + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[EagerWorkflow], + activities=[greeting], + ) + + # Run worker in the background + async with worker: + # Start workflow(s) while worker is running + wf_handle = await client.start_workflow( + EagerWorkflow.run, + "Temporal", + id=f"eager-workflow-id-{uuid.uuid4()}", + task_queue=TASK_QUEUE, + request_eager_start=True, + ) + + # This is an internal flag not intended to be used publicly. + # It is used here purely to display that the workflow was eagerly started. + print(f"Workflow eagerly started: {wf_handle.__temporal_eagerly_started}") + print(await wf_handle.result()) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/eager_wf_start/workflows.py b/eager_wf_start/workflows.py new file mode 100644 index 00000000..9107d402 --- /dev/null +++ b/eager_wf_start/workflows.py @@ -0,0 +1,15 @@ +from datetime import timedelta + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from eager_wf_start.activities import greeting + + +@workflow.defn +class EagerWorkflow: + @workflow.run + async def run(self, name: str) -> str: + return await workflow.execute_local_activity( + greeting, name, schedule_to_close_timeout=timedelta(seconds=5) + ) diff --git a/tests/conftest.py b/tests/conftest.py index e63a059b..65de246e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -45,6 +45,8 @@ async def env(request) -> AsyncGenerator[WorkflowEnvironment, None]: dev_server_extra_args=[ "--dynamic-config-value", "frontend.enableExecuteMultiOperation=true", + "--dynamic-config-value", + "system.enableEagerWorkflowStart=true", ] ) elif env_type == "time-skipping": diff --git a/tests/eager_wf_start/__init__.py b/tests/eager_wf_start/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/eager_wf_start/workflow_test.py b/tests/eager_wf_start/workflow_test.py new file mode 100644 index 00000000..0222894c --- /dev/null +++ b/tests/eager_wf_start/workflow_test.py @@ -0,0 +1,28 @@ +import uuid + +from temporalio.client import Client +from temporalio.worker import Worker + +from eager_wf_start.workflows import EagerWorkflow +from eager_wf_start.activities import greeting + +async def test_eager_wf_start(client: Client): + task_queue_name = str(uuid.uuid4()) + + async with Worker( + client, + task_queue=task_queue_name, + workflows=[EagerWorkflow], + activities=[greeting], + ): + handle = await client.start_workflow( + EagerWorkflow.run, + "Temporal", + id=f"workflow-{uuid.uuid4()}", + task_queue=task_queue_name, + request_eager_start=True, + ) + print("HANDLE", handle.__temporal_eagerly_started) + assert handle.__temporal_eagerly_started + result = await handle.result() + assert result == "Hello Temporal!" \ No newline at end of file From 0b717224501981acdde57670f24d8a25f7befbc7 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 8 Oct 2025 16:17:28 -0700 Subject: [PATCH 2/3] Formatting --- eager_wf_start/activities.py | 3 ++- eager_wf_start/run.py | 5 +++-- tests/eager_wf_start/workflow_test.py | 5 +++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/eager_wf_start/activities.py b/eager_wf_start/activities.py index e8f00072..6533140f 100644 --- a/eager_wf_start/activities.py +++ b/eager_wf_start/activities.py @@ -1,5 +1,6 @@ from temporalio import activity + @activity.defn() async def greeting(name: str) -> str: - return f"Hello {name}!" \ No newline at end of file + return f"Hello {name}!" diff --git a/eager_wf_start/run.py b/eager_wf_start/run.py index 479cac82..c1daf82a 100644 --- a/eager_wf_start/run.py +++ b/eager_wf_start/run.py @@ -9,8 +9,9 @@ TASK_QUEUE = "eager-wf-start-task-queue" + async def main(): - + # Note that the worker and client run in the same process and share the same client connection. client = await Client.connect("localhost:7233") worker = Worker( @@ -30,7 +31,7 @@ async def main(): task_queue=TASK_QUEUE, request_eager_start=True, ) - + # This is an internal flag not intended to be used publicly. # It is used here purely to display that the workflow was eagerly started. print(f"Workflow eagerly started: {wf_handle.__temporal_eagerly_started}") diff --git a/tests/eager_wf_start/workflow_test.py b/tests/eager_wf_start/workflow_test.py index 0222894c..1080f5bc 100644 --- a/tests/eager_wf_start/workflow_test.py +++ b/tests/eager_wf_start/workflow_test.py @@ -3,8 +3,9 @@ from temporalio.client import Client from temporalio.worker import Worker -from eager_wf_start.workflows import EagerWorkflow from eager_wf_start.activities import greeting +from eager_wf_start.workflows import EagerWorkflow + async def test_eager_wf_start(client: Client): task_queue_name = str(uuid.uuid4()) @@ -25,4 +26,4 @@ async def test_eager_wf_start(client: Client): print("HANDLE", handle.__temporal_eagerly_started) assert handle.__temporal_eagerly_started result = await handle.result() - assert result == "Hello Temporal!" \ No newline at end of file + assert result == "Hello Temporal!" From fc0a7667f1478f1a5a21a5a95370d9c9d2e05b70 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 8 Oct 2025 17:27:50 -0700 Subject: [PATCH 3/3] Update README --- eager_wf_start/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eager_wf_start/README.md b/eager_wf_start/README.md index 12dfec73..834c6a55 100644 --- a/eager_wf_start/README.md +++ b/eager_wf_start/README.md @@ -2,7 +2,7 @@ This sample shows how to create a workflow that uses Eager Workflow Start. -Eager Workflow Start is feature that reduces the time it takes to start a Workflow. The target use case is short-lived Workflows that interact with other services using Local Activities, as such, we'll be using Local Activities in this sample. +The target use case is workflows whose first task needs to execute quickly (ex: payment verification in an online checkout workflow). That work typically can't be done directly in the workflow (ex: using web APIs, databases, etc.), and also needs to avoid the overhead of dispatching another task. Using a Local Activity suffices both needs, which this sample demonstrates. You can read more about Eager Workflow Start in our: