From 113f128a40815901d352e0bd839673cb0bb86819 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 15 Jan 2026 12:12:41 -0800 Subject: [PATCH 1/4] Populate Nexus request deadline into operation contexts when present on the task received from Core. --- pyproject.toml | 3 ++ temporalio/bridge/proto/nexus/nexus_pb2.py | 28 +++++----- temporalio/bridge/proto/nexus/nexus_pb2.pyi | 27 +++++++++- temporalio/bridge/sdk-core | 2 +- temporalio/worker/_nexus.py | 38 ++++++++++---- tests/nexus/test_handler.py | 54 +++++++++++++++++++ tests/nexus/test_workflow_run_operation.py | 57 +++++++++++++++++++++ uv.lock | 8 +-- 8 files changed, 186 insertions(+), 31 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d660bfb46..a26fb326c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -229,3 +229,6 @@ exclude = ["temporalio/bridge/target/**/*"] [tool.uv] # Prevent uv commands from building the package by default package = false + +[tool.uv.sources] +nexus-rpc = { git = "https://github.com/nexus-rpc/sdk-python" } diff --git a/temporalio/bridge/proto/nexus/nexus_pb2.py b/temporalio/bridge/proto/nexus/nexus_pb2.py index 4dc3bea86..cba925157 100644 --- a/temporalio/bridge/proto/nexus/nexus_pb2.py +++ b/temporalio/bridge/proto/nexus/nexus_pb2.py @@ -15,6 +15,8 @@ _sym_db = _symbol_database.Default() +from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 + from temporalio.api.common.v1 import ( message_pb2 as temporal_dot_api_dot_common_dot_v1_dot_message__pb2, ) @@ -32,7 +34,7 @@ ) DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n#temporal/sdk/core/nexus/nexus.proto\x12\rcoresdk.nexus\x1a$temporal/api/common/v1/message.proto\x1a%temporal/api/failure/v1/message.proto\x1a#temporal/api/nexus/v1/message.proto\x1a\x36temporal/api/workflowservice/v1/request_response.proto\x1a%temporal/sdk/core/common/common.proto"\xf8\x01\n\x14NexusOperationResult\x12\x34\n\tcompleted\x18\x01 \x01(\x0b\x32\x1f.temporal.api.common.v1.PayloadH\x00\x12\x32\n\x06\x66\x61iled\x18\x02 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\tcancelled\x18\x03 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\ttimed_out\x18\x04 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x42\x08\n\x06status"\xb5\x01\n\x13NexusTaskCompletion\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\tcompleted\x18\x02 \x01(\x0b\x32\x1f.temporal.api.nexus.v1.ResponseH\x00\x12\x34\n\x05\x65rror\x18\x03 \x01(\x0b\x32#.temporal.api.nexus.v1.HandlerErrorH\x00\x12\x14\n\nack_cancel\x18\x04 \x01(\x08H\x00\x42\x08\n\x06status"\x9a\x01\n\tNexusTask\x12K\n\x04task\x18\x01 \x01(\x0b\x32;.temporal.api.workflowservice.v1.PollNexusTaskQueueResponseH\x00\x12\x35\n\x0b\x63\x61ncel_task\x18\x02 \x01(\x0b\x32\x1e.coresdk.nexus.CancelNexusTaskH\x00\x42\t\n\x07variant"[\n\x0f\x43\x61ncelNexusTask\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\x06reason\x18\x02 \x01(\x0e\x32$.coresdk.nexus.NexusTaskCancelReason*;\n\x15NexusTaskCancelReason\x12\r\n\tTIMED_OUT\x10\x00\x12\x13\n\x0fWORKER_SHUTDOWN\x10\x01*\x7f\n\x1eNexusOperationCancellationType\x12\x1f\n\x1bWAIT_CANCELLATION_COMPLETED\x10\x00\x12\x0b\n\x07\x41\x42\x41NDON\x10\x01\x12\x0e\n\nTRY_CANCEL\x10\x02\x12\x1f\n\x1bWAIT_CANCELLATION_REQUESTED\x10\x03\x42+\xea\x02(Temporalio::Internal::Bridge::Api::Nexusb\x06proto3' + b'\n#temporal/sdk/core/nexus/nexus.proto\x12\rcoresdk.nexus\x1a\x1fgoogle/protobuf/timestamp.proto\x1a$temporal/api/common/v1/message.proto\x1a%temporal/api/failure/v1/message.proto\x1a#temporal/api/nexus/v1/message.proto\x1a\x36temporal/api/workflowservice/v1/request_response.proto\x1a%temporal/sdk/core/common/common.proto"\xf8\x01\n\x14NexusOperationResult\x12\x34\n\tcompleted\x18\x01 \x01(\x0b\x32\x1f.temporal.api.common.v1.PayloadH\x00\x12\x32\n\x06\x66\x61iled\x18\x02 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\tcancelled\x18\x03 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\ttimed_out\x18\x04 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x42\x08\n\x06status"\xb5\x01\n\x13NexusTaskCompletion\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\tcompleted\x18\x02 \x01(\x0b\x32\x1f.temporal.api.nexus.v1.ResponseH\x00\x12\x34\n\x05\x65rror\x18\x03 \x01(\x0b\x32#.temporal.api.nexus.v1.HandlerErrorH\x00\x12\x14\n\nack_cancel\x18\x04 \x01(\x08H\x00\x42\x08\n\x06status"\xd0\x01\n\tNexusTask\x12K\n\x04task\x18\x01 \x01(\x0b\x32;.temporal.api.workflowservice.v1.PollNexusTaskQueueResponseH\x00\x12\x35\n\x0b\x63\x61ncel_task\x18\x02 \x01(\x0b\x32\x1e.coresdk.nexus.CancelNexusTaskH\x00\x12\x34\n\x10request_deadline\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.TimestampB\t\n\x07variant"[\n\x0f\x43\x61ncelNexusTask\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\x06reason\x18\x02 \x01(\x0e\x32$.coresdk.nexus.NexusTaskCancelReason*;\n\x15NexusTaskCancelReason\x12\r\n\tTIMED_OUT\x10\x00\x12\x13\n\x0fWORKER_SHUTDOWN\x10\x01*\x7f\n\x1eNexusOperationCancellationType\x12\x1f\n\x1bWAIT_CANCELLATION_COMPLETED\x10\x00\x12\x0b\n\x07\x41\x42\x41NDON\x10\x01\x12\x0e\n\nTRY_CANCEL\x10\x02\x12\x1f\n\x1bWAIT_CANCELLATION_REQUESTED\x10\x03\x42+\xea\x02(Temporalio::Internal::Bridge::Api::Nexusb\x06proto3' ) _NEXUSTASKCANCELREASON = DESCRIPTOR.enum_types_by_name["NexusTaskCancelReason"] @@ -104,16 +106,16 @@ DESCRIPTOR._serialized_options = ( b"\352\002(Temporalio::Internal::Bridge::Api::Nexus" ) - _NEXUSTASKCANCELREASON._serialized_start = 948 - _NEXUSTASKCANCELREASON._serialized_end = 1007 - _NEXUSOPERATIONCANCELLATIONTYPE._serialized_start = 1009 - _NEXUSOPERATIONCANCELLATIONTYPE._serialized_end = 1136 - _NEXUSOPERATIONRESULT._serialized_start = 264 - _NEXUSOPERATIONRESULT._serialized_end = 512 - _NEXUSTASKCOMPLETION._serialized_start = 515 - _NEXUSTASKCOMPLETION._serialized_end = 696 - _NEXUSTASK._serialized_start = 699 - _NEXUSTASK._serialized_end = 853 - _CANCELNEXUSTASK._serialized_start = 855 - _CANCELNEXUSTASK._serialized_end = 946 + _NEXUSTASKCANCELREASON._serialized_start = 1035 + _NEXUSTASKCANCELREASON._serialized_end = 1094 + _NEXUSOPERATIONCANCELLATIONTYPE._serialized_start = 1096 + _NEXUSOPERATIONCANCELLATIONTYPE._serialized_end = 1223 + _NEXUSOPERATIONRESULT._serialized_start = 297 + _NEXUSOPERATIONRESULT._serialized_end = 545 + _NEXUSTASKCOMPLETION._serialized_start = 548 + _NEXUSTASKCOMPLETION._serialized_end = 729 + _NEXUSTASK._serialized_start = 732 + _NEXUSTASK._serialized_end = 940 + _CANCELNEXUSTASK._serialized_start = 942 + _CANCELNEXUSTASK._serialized_end = 1033 # @@protoc_insertion_point(module_scope) diff --git a/temporalio/bridge/proto/nexus/nexus_pb2.pyi b/temporalio/bridge/proto/nexus/nexus_pb2.pyi index 8dfe261b7..f29582b0e 100644 --- a/temporalio/bridge/proto/nexus/nexus_pb2.pyi +++ b/temporalio/bridge/proto/nexus/nexus_pb2.pyi @@ -10,6 +10,7 @@ import typing import google.protobuf.descriptor import google.protobuf.internal.enum_type_wrapper import google.protobuf.message +import google.protobuf.timestamp_pb2 import temporalio.api.common.v1.message_pb2 import temporalio.api.failure.v1.message_pb2 @@ -227,6 +228,7 @@ class NexusTask(google.protobuf.message.Message): TASK_FIELD_NUMBER: builtins.int CANCEL_TASK_FIELD_NUMBER: builtins.int + REQUEST_DEADLINE_FIELD_NUMBER: builtins.int @property def task( self, @@ -246,23 +248,44 @@ class NexusTask(google.protobuf.message.Message): EX: Core knows the nexus operation has timed out, and it does not make sense for the user's operation handler to continue doing work. """ + @property + def request_deadline(self) -> google.protobuf.timestamp_pb2.Timestamp: + """The deadline for this request, parsed from the "Request-Timeout" header. + Only set when variant is `task` and the header was present with a valid value. + Represented as an absolute timestamp. + """ def __init__( self, *, task: temporalio.api.workflowservice.v1.request_response_pb2.PollNexusTaskQueueResponse | None = ..., cancel_task: global___CancelNexusTask | None = ..., + request_deadline: google.protobuf.timestamp_pb2.Timestamp | None = ..., ) -> None: ... def HasField( self, field_name: typing_extensions.Literal[ - "cancel_task", b"cancel_task", "task", b"task", "variant", b"variant" + "cancel_task", + b"cancel_task", + "request_deadline", + b"request_deadline", + "task", + b"task", + "variant", + b"variant", ], ) -> builtins.bool: ... def ClearField( self, field_name: typing_extensions.Literal[ - "cancel_task", b"cancel_task", "task", b"task", "variant", b"variant" + "cancel_task", + b"cancel_task", + "request_deadline", + b"request_deadline", + "task", + b"task", + "variant", + b"variant", ], ) -> None: ... def WhichOneof( diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index d104a77d2..40cd7ec31 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit d104a77d2fe39bbba4c48dfad959fed95f1baebb +Subproject commit 40cd7ec3182e30695bc2ad3691aee23196a40103 diff --git a/temporalio/worker/_nexus.py b/temporalio/worker/_nexus.py index 8f6226ea3..e69ba2e71 100644 --- a/temporalio/worker/_nexus.py +++ b/temporalio/worker/_nexus.py @@ -9,6 +9,7 @@ import threading from collections.abc import Callable, Mapping, Sequence from dataclasses import dataclass +from datetime import datetime, timezone from functools import reduce from typing import ( Any, @@ -122,10 +123,17 @@ async def raise_from_exception_queue() -> NoReturn: task_cancellation = _NexusTaskCancellation() start_op_task = asyncio.create_task( self._handle_start_operation_task( - task.task_token, - task.request.start_operation, - dict(task.request.header), - task_cancellation, + task_token=task.task_token, + start_request=task.request.start_operation, + headers=dict(task.request.header), + task_cancellation=task_cancellation, + request_deadline=( + nexus_task.request_deadline.ToDatetime().replace( + tzinfo=timezone.utc + ) + if nexus_task.HasField("request_deadline") + else None + ), ) ) self._running_tasks[task.task_token] = _RunningNexusTask( @@ -135,10 +143,17 @@ async def raise_from_exception_queue() -> NoReturn: task_cancellation = _NexusTaskCancellation() cancel_op_task = asyncio.create_task( self._handle_cancel_operation_task( - task.task_token, - task.request.cancel_operation, - dict(task.request.header), - task_cancellation, + task_token=task.task_token, + request=task.request.cancel_operation, + headers=dict(task.request.header), + task_cancellation=task_cancellation, + request_deadline=( + nexus_task.request_deadline.ToDatetime().replace( + tzinfo=timezone.utc + ) + if nexus_task.HasField("request_deadline") + else None + ), ) ) self._running_tasks[task.task_token] = _RunningNexusTask( @@ -204,6 +219,7 @@ async def _handle_cancel_operation_task( request: temporalio.api.nexus.v1.CancelOperationRequest, headers: Mapping[str, str], task_cancellation: nexusrpc.handler.OperationTaskCancellation, + request_deadline: datetime | None, ) -> None: """Handle a cancel operation task. @@ -216,6 +232,7 @@ async def _handle_cancel_operation_task( operation=request.operation, headers=headers, task_cancellation=task_cancellation, + request_deadline=request_deadline, ) temporalio.nexus._operation_context._TemporalCancelOperationContext( info=lambda: Info(task_queue=self._task_queue), @@ -264,6 +281,7 @@ async def _handle_start_operation_task( start_request: temporalio.api.nexus.v1.StartOperationRequest, headers: Mapping[str, str], task_cancellation: nexusrpc.handler.OperationTaskCancellation, + request_deadline: datetime | None, ) -> None: """Handle a start operation task. @@ -273,7 +291,7 @@ async def _handle_start_operation_task( try: try: start_response = await self._start_operation( - start_request, headers, task_cancellation + start_request, headers, task_cancellation, request_deadline ) except asyncio.CancelledError: completion = temporalio.bridge.proto.nexus.NexusTaskCompletion( @@ -315,6 +333,7 @@ async def _start_operation( start_request: temporalio.api.nexus.v1.StartOperationRequest, headers: Mapping[str, str], cancellation: nexusrpc.handler.OperationTaskCancellation, + request_deadline: datetime | None, ) -> temporalio.api.nexus.v1.StartOperationResponse: """Invoke the Nexus handler's start_operation method and construct the StartOperationResponse. @@ -334,6 +353,7 @@ async def _start_operation( ], callback_headers=dict(start_request.callback_header), task_cancellation=cancellation, + request_deadline=request_deadline, ) temporalio.nexus._operation_context._TemporalStartOperationContext( nexus_context=ctx, diff --git a/tests/nexus/test_handler.py b/tests/nexus/test_handler.py index 407e61caf..00865205d 100644 --- a/tests/nexus/test_handler.py +++ b/tests/nexus/test_handler.py @@ -21,6 +21,7 @@ from collections.abc import Callable, Mapping from concurrent.futures.thread import ThreadPoolExecutor from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone from types import MappingProxyType from typing import Any @@ -102,6 +103,7 @@ class MyService: operation_error_failed: nexusrpc.Operation[Input, Output] idempotency_check: nexusrpc.Operation[None, Output] non_serializable_output: nexusrpc.Operation[Input, NonSerializableOutput] + check_request_deadline: nexusrpc.Operation[Input, Output] @workflow.defn @@ -277,6 +279,14 @@ async def non_serializable_output( ) -> NonSerializableOutput: return NonSerializableOutput() + @sync_operation + async def check_request_deadline( + self, ctx: StartOperationContext, _input: Input + ) -> Output: + assert ctx.request_deadline is not None, "request_deadline should be set" + # Return ISO format string so we can verify the value + return Output(value=ctx.request_deadline.isoformat()) + # Immutable dicts that can be used as dataclass field defaults @@ -985,6 +995,50 @@ async def test_request_id_is_received_by_sync_operation( assert resp.json() == {"value": f"request_id: {request_id}"} +async def test_request_deadline_is_present_in_start_operation_context( + env: WorkflowEnvironment, +): + """Test that request_deadline is populated from Request-Timeout header.""" + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work with time-skipping server") + + task_queue = str(uuid.uuid4()) + endpoint = (await create_nexus_endpoint(task_queue, env.client)).endpoint.id + service_client = ServiceClient( + server_address=ServiceClient.default_server_address(env), + endpoint=endpoint, + service=MyService.__name__, + ) + + decorator = service_handler(service=MyService) + user_service_handler = decorator(MyServiceHandler)() + + async with Worker( + env.client, + task_queue=task_queue, + nexus_service_handlers=[user_service_handler], + nexus_task_executor=concurrent.futures.ThreadPoolExecutor(), + ): + before = datetime.now(timezone.utc) + resp = await service_client.start_operation( + "check_request_deadline", + dataclass_as_dict(Input("test")), + {"Request-Timeout": "30s"}, + ) + after = datetime.now(timezone.utc) + + assert resp.status_code == 200 + deadline_str = resp.json()["value"] + deadline = datetime.fromisoformat(deadline_str) + + # Deadline should be approximately 30s from request time + expected_min = before + timedelta(seconds=29) + expected_max = after + timedelta(seconds=31) + assert ( + expected_min <= deadline <= expected_max + ), f"Deadline {deadline} not in expected range [{expected_min}, {expected_max}]" + + @workflow.defn class EchoWorkflow: @workflow.run diff --git a/tests/nexus/test_workflow_run_operation.py b/tests/nexus/test_workflow_run_operation.py index 7d284412c..d3a1d4ac7 100644 --- a/tests/nexus/test_workflow_run_operation.py +++ b/tests/nexus/test_workflow_run_operation.py @@ -64,6 +64,34 @@ def op(self) -> OperationHandler[Input, str]: return MyOperation() +class RequestDeadlineOperation(WorkflowRunOperationHandler): + """Operation that asserts request_deadline is accessible.""" + + def __init__(self): # type: ignore[reportMissingSuperCall] + pass + + async def start( + self, ctx: StartOperationContext, input: Input + ) -> StartOperationResultAsync: + assert ( + ctx.request_deadline is not None + ), "request_deadline should be set in workflow_run_operation" + tctx = WorkflowRunOperationContext._from_start_operation_context(ctx) + handle = await tctx.start_workflow( + EchoWorkflow.run, + input.value, + id=str(uuid.uuid4()), + ) + return StartOperationResultAsync(handle.to_token()) + + +@service_handler +class RequestDeadlineHandler: + @operation_handler + def op(self) -> OperationHandler[Input, str]: + return RequestDeadlineOperation() + + @service class Service: op: Operation[Input, str] @@ -117,3 +145,32 @@ async def test_workflow_run_operation( assert re.search(message, failure.message) else: assert resp.status_code == 201 + + +async def test_request_deadline_is_accessible_in_workflow_run_operation( + env: WorkflowEnvironment, +): + """Test that request_deadline is accessible in WorkflowRunOperationContext.""" + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work with time-skipping server") + + task_queue = str(uuid.uuid4()) + endpoint = (await create_nexus_endpoint(task_queue, env.client)).endpoint.id + assert (service_defn := nexusrpc.get_service_definition(RequestDeadlineHandler)) + service_client = ServiceClient( + server_address=ServiceClient.default_server_address(env), + endpoint=endpoint, + service=service_defn.name, + ) + async with Worker( + env.client, + task_queue=task_queue, + nexus_service_handlers=[RequestDeadlineHandler()], + ): + resp = await service_client.start_operation( + "op", + dataclass_as_dict(Input(value="test")), + {"Request-Timeout": "30s"}, + ) + # The assertion in the handler verified request_deadline was accessible + assert resp.status_code == 201 diff --git a/uv.lock b/uv.lock index e29cf27db..f02b90002 100644 --- a/uv.lock +++ b/uv.lock @@ -1773,14 +1773,10 @@ wheels = [ [[package]] name = "nexus-rpc" version = "1.3.0" -source = { registry = "https://pypi.org/simple" } +source = { git = "https://github.com/nexus-rpc/sdk-python#5613448d2bf578619c34d0a99a4779ed9f76de9b" } dependencies = [ { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/2e/f2/d54f5c03d8f4672ccc0875787a385f53dcb61f98a8ae594b5620e85b9cb3/nexus_rpc-1.3.0.tar.gz", hash = "sha256:e56d3b57b60d707ce7a72f83f23f106b86eca1043aa658e44582ab5ff30ab9ad", size = 75650, upload-time = "2025-12-08T22:59:13.002Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/d6/74/0afd841de3199c148146c1d43b4bfb5605b2f1dc4c9a9087fe395091ea5a/nexus_rpc-1.3.0-py3-none-any.whl", hash = "sha256:aee0707b4861b22d8124ecb3f27d62dafbe8777dc50c66c91e49c006f971b92d", size = 28873, upload-time = "2025-12-08T22:59:12.024Z" }, -] [[package]] name = "nh3" @@ -3004,7 +3000,7 @@ dev = [ requires-dist = [ { name = "grpcio", marker = "extra == 'grpc'", specifier = ">=1.48.2,<2" }, { name = "mcp", marker = "extra == 'openai-agents'", specifier = ">=1.9.4,<2" }, - { name = "nexus-rpc", specifier = "==1.3.0" }, + { name = "nexus-rpc", git = "https://github.com/nexus-rpc/sdk-python" }, { name = "openai-agents", marker = "extra == 'openai-agents'", specifier = ">=0.3,<0.7" }, { name = "opentelemetry-api", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" }, { name = "opentelemetry-sdk", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" }, From 8e9a203d7b6b0c2895aacd6bed9776dd484509cc Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 15 Jan 2026 13:09:55 -0800 Subject: [PATCH 2/4] Add test to confirm request deadline is present in cancel operation contexts --- tests/helpers/nexus.py | 2 ++ tests/nexus/test_handler.py | 59 ++++++++++++++++++++++++++++++++----- 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/tests/helpers/nexus.py b/tests/helpers/nexus.py index 904b4422a..93a769674 100644 --- a/tests/helpers/nexus.py +++ b/tests/helpers/nexus.py @@ -70,12 +70,14 @@ async def cancel_operation( self, operation: str, token: str, + headers: Mapping[str, str] = {}, ) -> httpx.Response: async with httpx.AsyncClient() as http_client: return await http_client.post( f"http://{self.server_address}/nexus/endpoints/{self.endpoint}/services/{self.service}/{operation}/cancel", # Token can also be sent as "Nexus-Operation-Token" header params={"token": token}, + headers=headers, ) @staticmethod diff --git a/tests/nexus/test_handler.py b/tests/nexus/test_handler.py index 00865205d..27da517d7 100644 --- a/tests/nexus/test_handler.py +++ b/tests/nexus/test_handler.py @@ -279,13 +279,25 @@ async def non_serializable_output( ) -> NonSerializableOutput: return NonSerializableOutput() - @sync_operation - async def check_request_deadline( - self, ctx: StartOperationContext, _input: Input - ) -> Output: - assert ctx.request_deadline is not None, "request_deadline should be set" - # Return ISO format string so we can verify the value - return Output(value=ctx.request_deadline.isoformat()) + class OperationHandlerCheckingRequestDeadline(OperationHandler[Input, Output]): + async def start( # type: ignore[override] + self, + ctx: StartOperationContext, + input: Input, + ) -> StartOperationResultSync[Output]: + assert ctx.request_deadline is not None, "request_deadline should be set" + # Return ISO format string so we can verify the value + return StartOperationResultSync( + Output(value=ctx.request_deadline.isoformat()) + ) + + async def cancel(self, ctx: CancelOperationContext, token: str) -> None: + assert ctx.request_deadline is not None, "request_deadline should be set" + return + + @operation_handler + def check_request_deadline(self) -> OperationHandler[Input, Output]: + return MyServiceHandler.OperationHandlerCheckingRequestDeadline() # Immutable dicts that can be used as dataclass field defaults @@ -1023,7 +1035,7 @@ async def test_request_deadline_is_present_in_start_operation_context( resp = await service_client.start_operation( "check_request_deadline", dataclass_as_dict(Input("test")), - {"Request-Timeout": "30s"}, + headers={"Request-Timeout": "30s"}, ) after = datetime.now(timezone.utc) @@ -1039,6 +1051,37 @@ async def test_request_deadline_is_present_in_start_operation_context( ), f"Deadline {deadline} not in expected range [{expected_min}, {expected_max}]" +async def test_request_deadline_is_present_in_cancel_operation_context( + env: WorkflowEnvironment, +): + """Test that request_deadline is populated from Request-Timeout header.""" + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work with time-skipping server") + + task_queue = str(uuid.uuid4()) + endpoint = (await create_nexus_endpoint(task_queue, env.client)).endpoint.id + service_client = ServiceClient( + server_address=ServiceClient.default_server_address(env), + endpoint=endpoint, + service=MyService.__name__, + ) + + decorator = service_handler(service=MyService) + user_service_handler = decorator(MyServiceHandler)() + + async with Worker( + env.client, + task_queue=task_queue, + nexus_service_handlers=[user_service_handler], + nexus_task_executor=concurrent.futures.ThreadPoolExecutor(), + ): + resp = await service_client.cancel_operation( + "check_request_deadline", + "test-token", + ) + assert resp.status_code == 202 + + @workflow.defn class EchoWorkflow: @workflow.run From effdde46e6e0d0fb7ea169770c2dcb34c798ac9b Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 15 Jan 2026 13:18:04 -0800 Subject: [PATCH 3/4] Update request deadline tests for workflow_run_operation to reflect how users will invoke rather than using a private api --- tests/nexus/test_workflow_run_operation.py | 30 +++++++--------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/tests/nexus/test_workflow_run_operation.py b/tests/nexus/test_workflow_run_operation.py index d3a1d4ac7..fc261d3d4 100644 --- a/tests/nexus/test_workflow_run_operation.py +++ b/tests/nexus/test_workflow_run_operation.py @@ -14,8 +14,8 @@ ) from nexusrpc.handler._decorators import operation_handler -from temporalio import workflow -from temporalio.nexus import WorkflowRunOperationContext +from temporalio import nexus, workflow +from temporalio.nexus import WorkflowRunOperationContext, workflow_run_operation from temporalio.nexus._operation_handlers import WorkflowRunOperationHandler from temporalio.testing import WorkflowEnvironment from temporalio.worker import Worker @@ -64,32 +64,20 @@ def op(self) -> OperationHandler[Input, str]: return MyOperation() -class RequestDeadlineOperation(WorkflowRunOperationHandler): - """Operation that asserts request_deadline is accessible.""" - - def __init__(self): # type: ignore[reportMissingSuperCall] - pass - - async def start( - self, ctx: StartOperationContext, input: Input - ) -> StartOperationResultAsync: +@service_handler +class RequestDeadlineHandler: + @workflow_run_operation + async def op( + self, ctx: WorkflowRunOperationContext, input: Input + ) -> nexus.WorkflowHandle[str]: assert ( ctx.request_deadline is not None ), "request_deadline should be set in workflow_run_operation" - tctx = WorkflowRunOperationContext._from_start_operation_context(ctx) - handle = await tctx.start_workflow( + return await ctx.start_workflow( EchoWorkflow.run, input.value, id=str(uuid.uuid4()), ) - return StartOperationResultAsync(handle.to_token()) - - -@service_handler -class RequestDeadlineHandler: - @operation_handler - def op(self) -> OperationHandler[Input, str]: - return RequestDeadlineOperation() @service From 5c3fff08c289450a2b9b7e8ffb90244a3dec480f Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 15 Jan 2026 13:24:11 -0800 Subject: [PATCH 4/4] refactor request deadline out of if branches in nexus worker --- temporalio/worker/_nexus.py | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/temporalio/worker/_nexus.py b/temporalio/worker/_nexus.py index e69ba2e71..8657d6ab9 100644 --- a/temporalio/worker/_nexus.py +++ b/temporalio/worker/_nexus.py @@ -119,6 +119,13 @@ async def raise_from_exception_queue() -> NoReturn: if nexus_task.HasField("task"): task = nexus_task.task + request_deadline = ( + nexus_task.request_deadline.ToDatetime().replace( + tzinfo=timezone.utc + ) + if nexus_task.HasField("request_deadline") + else None + ) if task.request.HasField("start_operation"): task_cancellation = _NexusTaskCancellation() start_op_task = asyncio.create_task( @@ -127,13 +134,7 @@ async def raise_from_exception_queue() -> NoReturn: start_request=task.request.start_operation, headers=dict(task.request.header), task_cancellation=task_cancellation, - request_deadline=( - nexus_task.request_deadline.ToDatetime().replace( - tzinfo=timezone.utc - ) - if nexus_task.HasField("request_deadline") - else None - ), + request_deadline=request_deadline, ) ) self._running_tasks[task.task_token] = _RunningNexusTask( @@ -147,13 +148,7 @@ async def raise_from_exception_queue() -> NoReturn: request=task.request.cancel_operation, headers=dict(task.request.header), task_cancellation=task_cancellation, - request_deadline=( - nexus_task.request_deadline.ToDatetime().replace( - tzinfo=timezone.utc - ) - if nexus_task.HasField("request_deadline") - else None - ), + request_deadline=request_deadline, ) ) self._running_tasks[task.task_token] = _RunningNexusTask(