From 8f1445c5311d9b31d92a56ef6895ceee4253fa6f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Jun 2025 20:31:13 +0000 Subject: [PATCH 1/8] Initial plan for issue From c2684fdf4dfafb80566688ff5343514d76ba4ff4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Jun 2025 20:38:56 +0000 Subject: [PATCH 2/8] Implement send_event functionality with basic tests Co-authored-by: berndverst <4535280+berndverst@users.noreply.github.com> --- durabletask/internal/helpers.py | 16 ++++++++++++++++ durabletask/task.py | 21 +++++++++++++++++++++ durabletask/worker.py | 25 +++++++++++++++++++++++-- 3 files changed, 60 insertions(+), 2 deletions(-) diff --git a/durabletask/internal/helpers.py b/durabletask/internal/helpers.py index 6b36586..30a61f5 100644 --- a/durabletask/internal/helpers.py +++ b/durabletask/internal/helpers.py @@ -124,6 +124,14 @@ def new_event_raised_event(name: str, encoded_input: Optional[str] = None) -> pb ) +def new_event_sent_event(instance_id: str, name: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent: + return pb.HistoryEvent( + eventId=-1, + timestamp=timestamp_pb2.Timestamp(), + eventSent=pb.EventSentEvent(instanceId=instance_id, name=name, input=get_string_value(encoded_input)) + ) + + def new_suspend_event() -> pb.HistoryEvent: return pb.HistoryEvent( eventId=-1, @@ -203,6 +211,14 @@ def new_create_sub_orchestration_action( )) +def new_send_event_action(id: int, instance_id: str, event_name: str, encoded_data: Optional[str]) -> pb.OrchestratorAction: + return pb.OrchestratorAction(id=id, sendEvent=pb.SendEventAction( + instance=pb.OrchestrationInstance(instanceId=instance_id), + name=event_name, + data=get_string_value(encoded_data) + )) + + def is_empty(v: wrappers_pb2.StringValue): return v is None or v.value == '' diff --git a/durabletask/task.py b/durabletask/task.py index 9e8a08a..eefea19 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -163,6 +163,27 @@ def wait_for_external_event(self, name: str) -> Task: """ pass + @abstractmethod + def send_event(self, instance_id: str, event_name: str, *, + data: Optional[Any] = None) -> Task: + """Send an event to another orchestration instance. + + Parameters + ---------- + instance_id : str + The ID of the orchestration instance to send the event to. + event_name : str + The name of the event to send. + data : Optional[Any] + The optional JSON-serializable data to include with the event. + + Returns + ------- + Task + A Durable Task that completes when the event has been sent. + """ + pass + @abstractmethod def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None: """Continue the orchestration execution as a new instance. diff --git a/durabletask/worker.py b/durabletask/worker.py index b433a83..d0e298e 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -852,6 +852,16 @@ def wait_for_external_event(self, name: str) -> task.Task: task_list.append(external_event_task) return external_event_task + def send_event(self, instance_id: str, event_name: str, *, + data: Optional[Any] = None) -> task.Task: + id = self.next_sequence_number() + encoded_data = shared.to_json(data) if data is not None else None + action = ph.new_send_event_action(id, instance_id, event_name, encoded_data) + self._pending_actions[id] = action + send_event_task = task.CompletableTask() + self._pending_tasks[id] = send_event_task + return send_event_task + def continue_as_new(self, new_input, *, save_events: bool = False) -> None: if self._is_complete: return @@ -1188,6 +1198,17 @@ def process_event( self._logger.info( f"{ctx.instance_id}: Event '{event_name}' has been buffered as there are no tasks waiting for it." ) + elif event.HasField("eventSent"): + # This history event confirms that the event was successfully sent. + # Complete the corresponding send_event task. + event_id = event.eventId + send_event_task = ctx._pending_tasks.pop(event_id, None) + if send_event_task: + # For send_event, we don't return any meaningful result, just completion + send_event_task.complete(None) + ctx.resume() + # Also remove the corresponding action from pending actions + ctx._pending_actions.pop(event_id, None) elif event.HasField("executionSuspended"): if not self._is_suspended and not ctx.is_replaying: self._logger.info(f"{ctx.instance_id}: Execution suspended.") @@ -1304,8 +1325,8 @@ def _get_method_name_for_action(action: pb.OrchestratorAction) -> str: return task.get_name(task.OrchestrationContext.create_timer) elif action_type == "createSubOrchestration": return task.get_name(task.OrchestrationContext.call_sub_orchestrator) - # elif action_type == "sendEvent": - # return task.get_name(task.OrchestrationContext.send_event) + elif action_type == "sendEvent": + return task.get_name(task.OrchestrationContext.send_event) else: raise NotImplementedError(f"Action type '{action_type}' not supported!") From 22d14597e31735c1fbcbb6123679843a775c69fc Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Jun 2025 20:42:32 +0000 Subject: [PATCH 3/8] Add comprehensive tests for send_event functionality Co-authored-by: berndverst <4535280+berndverst@users.noreply.github.com> --- tests/durabletask/test_send_event.py | 206 +++++++++++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 tests/durabletask/test_send_event.py diff --git a/tests/durabletask/test_send_event.py b/tests/durabletask/test_send_event.py new file mode 100644 index 0000000..60ee92b --- /dev/null +++ b/tests/durabletask/test_send_event.py @@ -0,0 +1,206 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import json +import logging + +import durabletask.internal.helpers as helpers +import durabletask.internal.orchestrator_service_pb2 as pb +from durabletask import task, worker + +logging.basicConfig( + format='%(asctime)s.%(msecs)03d %(name)s %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.DEBUG) +TEST_LOGGER = logging.getLogger("tests") + +TEST_INSTANCE_ID = "abc123" + + +def test_send_event_action(): + """Test that send_event creates the correct action""" + + def orchestrator(ctx: task.OrchestrationContext, _): + yield ctx.send_event("target_instance", "my_event", data="test_data") + return "completed" + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + new_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions + + # Should have one action for send_event + assert len(actions) == 1 + assert type(actions[0]) is pb.OrchestratorAction + + action = actions[0] + assert action.WhichOneof("orchestratorActionType") == "sendEvent" + assert action.id == 1 + + send_action = action.sendEvent + assert send_action.instance.instanceId == "target_instance" + assert send_action.name == "my_event" + assert send_action.data.value == json.dumps("test_data") + + +def test_send_event_completion(): + """Test that send_event can complete successfully""" + + def orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.send_event("target_instance", "my_event", data="test_data") + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + # First execution - should schedule the send_event + old_events = [] + new_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None) + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + assert len(actions) == 1 + action = actions[0] + assert action.WhichOneof("orchestratorActionType") == "sendEvent" + + # Second execution - simulate event sent completion + # The eventSent needs to have the same eventId as the action + event_sent = helpers.new_event_sent_event("target_instance", "my_event", json.dumps("test_data")) + event_sent.eventId = action.id # This is the key - the event ID must match the action ID + + old_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None) + ] + new_events = [event_sent] + + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + # Should have completion action + assert len(actions) == 1 + complete_action = actions[0] + assert complete_action.WhichOneof("orchestratorActionType") == "completeOrchestration" + assert complete_action.completeOrchestration.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED + + +def test_send_event_with_no_data(): + """Test send_event with no data parameter""" + + def orchestrator(ctx: task.OrchestrationContext, _): + yield ctx.send_event("target_instance", "my_event") + return "completed" + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + new_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions + + # Should have one action for send_event + assert len(actions) == 1 + action = actions[0] + send_action = action.sendEvent + assert send_action.instance.instanceId == "target_instance" + assert send_action.name == "my_event" + # data should be None/empty when no data is provided + assert not send_action.HasField("data") or send_action.data.value == "" + + +def test_send_event_multiple(): + """Test sending multiple events in sequence""" + + def orchestrator(ctx: task.OrchestrationContext, _): + yield ctx.send_event("target1", "event1", data="data1") + yield ctx.send_event("target2", "event2", data="data2") + return "completed" + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + new_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions + + # Should have one action for the first send_event + assert len(actions) == 1 + action = actions[0] + assert action.WhichOneof("orchestratorActionType") == "sendEvent" + assert action.sendEvent.instance.instanceId == "target1" + assert action.sendEvent.name == "event1" + assert action.sendEvent.data.value == json.dumps("data1") + + # Complete the first send_event and continue + event_sent = helpers.new_event_sent_event("target1", "event1", json.dumps("data1")) + event_sent.eventId = action.id + + old_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None) + ] + new_events = [event_sent] + + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + # Should have one action for the second send_event + assert len(actions) == 1 + action = actions[0] + assert action.WhichOneof("orchestratorActionType") == "sendEvent" + assert action.sendEvent.instance.instanceId == "target2" + assert action.sendEvent.name == "event2" + assert action.sendEvent.data.value == json.dumps("data2") + + +def test_send_event_with_various_data_types(): + """Test send_event with different data types""" + + def orchestrator(ctx: task.OrchestrationContext, _): + # Test with dict + yield ctx.send_event("target1", "event1", data={"key": "value", "number": 42}) + # Test with list + yield ctx.send_event("target2", "event2", data=[1, 2, 3]) + # Test with number + yield ctx.send_event("target3", "event3", data=123) + # Test with boolean + yield ctx.send_event("target4", "event4", data=True) + return "completed" + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + new_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions + + # Should have one action for the first send_event + assert len(actions) == 1 + action = actions[0] + assert action.WhichOneof("orchestratorActionType") == "sendEvent" + assert action.sendEvent.instance.instanceId == "target1" + assert action.sendEvent.name == "event1" + expected_data = json.dumps({"key": "value", "number": 42}) + assert action.sendEvent.data.value == expected_data From cd9d951a01ddbcc4232d545a1aedf5b022103550 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Jun 2025 20:53:31 +0000 Subject: [PATCH 4/8] Add input validation to send_event following .NET reference implementation Co-authored-by: berndverst <4535280+berndverst@users.noreply.github.com> --- durabletask/worker.py | 6 +++++ tests/durabletask/test_send_event.py | 37 ++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/durabletask/worker.py b/durabletask/worker.py index d0e298e..2d81cb6 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -854,6 +854,12 @@ def wait_for_external_event(self, name: str) -> task.Task: def send_event(self, instance_id: str, event_name: str, *, data: Optional[Any] = None) -> task.Task: + # Validate inputs similar to .NET implementation + if not instance_id: + raise ValueError("instance_id cannot be None or empty") + if not event_name: + raise ValueError("event_name cannot be None or empty") + id = self.next_sequence_number() encoded_data = shared.to_json(data) if data is not None else None action = ph.new_send_event_action(id, instance_id, event_name, encoded_data) diff --git a/tests/durabletask/test_send_event.py b/tests/durabletask/test_send_event.py index 60ee92b..93dd926 100644 --- a/tests/durabletask/test_send_event.py +++ b/tests/durabletask/test_send_event.py @@ -204,3 +204,40 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert action.sendEvent.name == "event1" expected_data = json.dumps({"key": "value", "number": 42}) assert action.sendEvent.data.value == expected_data + + +def test_send_event_validation(): + """Test send_event input validation""" + + def orchestrator_empty_instance(ctx: task.OrchestrationContext, _): + yield ctx.send_event("", "event1", data="test") + return "completed" + + registry = worker._Registry() + + # Test empty instance_id + name1 = registry.add_orchestrator(orchestrator_empty_instance) + new_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(name1, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + + # Check if the orchestration failed due to validation error + actions = result.actions + if len(actions) > 0: + action = actions[0] + if action.WhichOneof("orchestratorActionType") == "completeOrchestration": + complete_action = action.completeOrchestration + if complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED: + # The orchestration should have failed with the validation error + failure_details = complete_action.failureDetails + assert "instance_id cannot be None or empty" in failure_details.errorMessage + else: + assert False, "Expected orchestration to fail with validation error" + else: + assert False, "Expected failure completion action, got different action type" + else: + assert False, "Expected at least one action (failure completion)" From 00b49572fd6fa9d8dca86186c60a24736e03c5fe Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Jun 2025 21:08:26 +0000 Subject: [PATCH 5/8] Add advanced orchestration-to-orchestration communication test Co-authored-by: berndverst <4535280+berndverst@users.noreply.github.com> --- tests/durabletask/test_send_event.py | 91 ++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/tests/durabletask/test_send_event.py b/tests/durabletask/test_send_event.py index 93dd926..2cc54d8 100644 --- a/tests/durabletask/test_send_event.py +++ b/tests/durabletask/test_send_event.py @@ -241,3 +241,94 @@ def orchestrator_empty_instance(ctx: task.OrchestrationContext, _): assert False, "Expected failure completion action, got different action type" else: assert False, "Expected at least one action (failure completion)" + + +def test_orchestration_to_orchestration_communication(): + """Test advanced scenario: orchestration sends event to another waiting orchestration""" + + # Define the waiting orchestration that waits for an approval event + def waiting_orchestration(ctx: task.OrchestrationContext, _): + approval_data = yield ctx.wait_for_external_event("approval") + return f"Received approval: {approval_data}" + + # Define the sender orchestration that sends an event to another orchestration + def sender_orchestration(ctx: task.OrchestrationContext, target_instance_id: str): + approval_payload = {"approved": True, "approver": "manager", "timestamp": "2024-01-01T10:00:00Z"} + yield ctx.send_event(target_instance_id, "approval", data=approval_payload) + return "Event sent successfully" + + registry = worker._Registry() + waiting_name = registry.add_orchestrator(waiting_orchestration) + sender_name = registry.add_orchestrator(sender_orchestration) + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + + # Instance IDs for our orchestrations + waiting_instance_id = "waiting-instance-123" + sender_instance_id = "sender-instance-456" + + # Step 1: Start the waiting orchestration + waiting_new_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(waiting_name, waiting_instance_id, encoded_input=None), + ] + waiting_result = executor.execute(waiting_instance_id, [], waiting_new_events) + + # The waiting orchestration should produce no actions when waiting for an external event + assert len(waiting_result.actions) == 0 + + # Step 2: Start the sender orchestration with the waiting instance ID as input + sender_new_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(sender_name, sender_instance_id, + encoded_input=json.dumps(waiting_instance_id)), + ] + sender_result = executor.execute(sender_instance_id, [], sender_new_events) + + # The sender orchestration should yield a send_event action + assert len(sender_result.actions) == 1 + send_action = sender_result.actions[0] + assert send_action.WhichOneof("orchestratorActionType") == "sendEvent" + assert send_action.sendEvent.instance.instanceId == waiting_instance_id + assert send_action.sendEvent.name == "approval" + + # Verify the data payload is correct + expected_payload = {"approved": True, "approver": "manager", "timestamp": "2024-01-01T10:00:00Z"} + assert send_action.sendEvent.data.value == json.dumps(expected_payload) + + # Step 3: Complete the send_event action + event_sent = helpers.new_event_sent_event(waiting_instance_id, "approval", + json.dumps(expected_payload)) + event_sent.eventId = send_action.id + + sender_old_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(sender_name, sender_instance_id, + encoded_input=json.dumps(waiting_instance_id)) + ] + sender_completion_result = executor.execute(sender_instance_id, sender_old_events, [event_sent]) + + # The sender should complete successfully + assert len(sender_completion_result.actions) == 1 + sender_complete_action = sender_completion_result.actions[0] + assert sender_complete_action.WhichOneof("orchestratorActionType") == "completeOrchestration" + assert sender_complete_action.completeOrchestration.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert sender_complete_action.completeOrchestration.result.value == json.dumps("Event sent successfully") + + # Step 4: Simulate the event being raised to the waiting orchestration + event_raised = helpers.new_event_raised_event("approval", json.dumps(expected_payload)) + + waiting_old_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(waiting_name, waiting_instance_id, encoded_input=None) + ] + waiting_completion_result = executor.execute(waiting_instance_id, waiting_old_events, [event_raised]) + + # The waiting orchestration should complete with the received data + assert len(waiting_completion_result.actions) == 1 + waiting_complete_action = waiting_completion_result.actions[0] + assert waiting_complete_action.WhichOneof("orchestratorActionType") == "completeOrchestration" + assert waiting_complete_action.completeOrchestration.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED + + # Verify the data was passed correctly through the event + expected_result = f"Received approval: {expected_payload}" + assert waiting_complete_action.completeOrchestration.result.value == json.dumps(expected_result) From 7ba38cef96afc970cc79b6988d05b7df2294119e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Jun 2025 21:17:55 +0000 Subject: [PATCH 6/8] Fix linter error: remove whitespace from blank line in worker.py Co-authored-by: berndverst <4535280+berndverst@users.noreply.github.com> --- durabletask/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index 2d81cb6..40f9f08 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -859,7 +859,7 @@ def send_event(self, instance_id: str, event_name: str, *, raise ValueError("instance_id cannot be None or empty") if not event_name: raise ValueError("event_name cannot be None or empty") - + id = self.next_sequence_number() encoded_data = shared.to_json(data) if data is not None else None action = ph.new_send_event_action(id, instance_id, event_name, encoded_data) From ccfbefdd624f4e153a9a246a7fe6f97a299dafae Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Jun 2025 21:24:50 +0000 Subject: [PATCH 7/8] Fix flake8 linting errors with autopep8 formatting Co-authored-by: berndverst <4535280+berndverst@users.noreply.github.com> --- tests/durabletask/test_send_event.py | 50 ++++++++++++++-------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/tests/durabletask/test_send_event.py b/tests/durabletask/test_send_event.py index 2cc54d8..a675e3e 100644 --- a/tests/durabletask/test_send_event.py +++ b/tests/durabletask/test_send_event.py @@ -214,7 +214,7 @@ def orchestrator_empty_instance(ctx: task.OrchestrationContext, _): return "completed" registry = worker._Registry() - + # Test empty instance_id name1 = registry.add_orchestrator(orchestrator_empty_instance) new_events = [ @@ -222,9 +222,9 @@ def orchestrator_empty_instance(ctx: task.OrchestrationContext, _): helpers.new_execution_started_event(name1, TEST_INSTANCE_ID, encoded_input=None), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - + result = executor.execute(TEST_INSTANCE_ID, [], new_events) - + # Check if the orchestration failed due to validation error actions = result.actions if len(actions) > 0: @@ -245,90 +245,90 @@ def orchestrator_empty_instance(ctx: task.OrchestrationContext, _): def test_orchestration_to_orchestration_communication(): """Test advanced scenario: orchestration sends event to another waiting orchestration""" - + # Define the waiting orchestration that waits for an approval event def waiting_orchestration(ctx: task.OrchestrationContext, _): approval_data = yield ctx.wait_for_external_event("approval") return f"Received approval: {approval_data}" - + # Define the sender orchestration that sends an event to another orchestration def sender_orchestration(ctx: task.OrchestrationContext, target_instance_id: str): approval_payload = {"approved": True, "approver": "manager", "timestamp": "2024-01-01T10:00:00Z"} yield ctx.send_event(target_instance_id, "approval", data=approval_payload) return "Event sent successfully" - + registry = worker._Registry() waiting_name = registry.add_orchestrator(waiting_orchestration) sender_name = registry.add_orchestrator(sender_orchestration) executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - + # Instance IDs for our orchestrations waiting_instance_id = "waiting-instance-123" sender_instance_id = "sender-instance-456" - + # Step 1: Start the waiting orchestration waiting_new_events = [ helpers.new_orchestrator_started_event(), helpers.new_execution_started_event(waiting_name, waiting_instance_id, encoded_input=None), ] waiting_result = executor.execute(waiting_instance_id, [], waiting_new_events) - + # The waiting orchestration should produce no actions when waiting for an external event assert len(waiting_result.actions) == 0 - + # Step 2: Start the sender orchestration with the waiting instance ID as input sender_new_events = [ helpers.new_orchestrator_started_event(), - helpers.new_execution_started_event(sender_name, sender_instance_id, - encoded_input=json.dumps(waiting_instance_id)), + helpers.new_execution_started_event(sender_name, sender_instance_id, + encoded_input=json.dumps(waiting_instance_id)), ] sender_result = executor.execute(sender_instance_id, [], sender_new_events) - + # The sender orchestration should yield a send_event action assert len(sender_result.actions) == 1 send_action = sender_result.actions[0] assert send_action.WhichOneof("orchestratorActionType") == "sendEvent" assert send_action.sendEvent.instance.instanceId == waiting_instance_id assert send_action.sendEvent.name == "approval" - + # Verify the data payload is correct expected_payload = {"approved": True, "approver": "manager", "timestamp": "2024-01-01T10:00:00Z"} assert send_action.sendEvent.data.value == json.dumps(expected_payload) - + # Step 3: Complete the send_event action - event_sent = helpers.new_event_sent_event(waiting_instance_id, "approval", - json.dumps(expected_payload)) + event_sent = helpers.new_event_sent_event(waiting_instance_id, "approval", + json.dumps(expected_payload)) event_sent.eventId = send_action.id - + sender_old_events = [ helpers.new_orchestrator_started_event(), - helpers.new_execution_started_event(sender_name, sender_instance_id, - encoded_input=json.dumps(waiting_instance_id)) + helpers.new_execution_started_event(sender_name, sender_instance_id, + encoded_input=json.dumps(waiting_instance_id)) ] sender_completion_result = executor.execute(sender_instance_id, sender_old_events, [event_sent]) - + # The sender should complete successfully assert len(sender_completion_result.actions) == 1 sender_complete_action = sender_completion_result.actions[0] assert sender_complete_action.WhichOneof("orchestratorActionType") == "completeOrchestration" assert sender_complete_action.completeOrchestration.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED assert sender_complete_action.completeOrchestration.result.value == json.dumps("Event sent successfully") - + # Step 4: Simulate the event being raised to the waiting orchestration event_raised = helpers.new_event_raised_event("approval", json.dumps(expected_payload)) - + waiting_old_events = [ helpers.new_orchestrator_started_event(), helpers.new_execution_started_event(waiting_name, waiting_instance_id, encoded_input=None) ] waiting_completion_result = executor.execute(waiting_instance_id, waiting_old_events, [event_raised]) - + # The waiting orchestration should complete with the received data assert len(waiting_completion_result.actions) == 1 waiting_complete_action = waiting_completion_result.actions[0] assert waiting_complete_action.WhichOneof("orchestratorActionType") == "completeOrchestration" assert waiting_complete_action.completeOrchestration.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED - + # Verify the data was passed correctly through the event expected_result = f"Received approval: {expected_payload}" assert waiting_complete_action.completeOrchestration.result.value == json.dumps(expected_result) From 58d88c7c23df6d04bcf66665891cab26c3f0c8b0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Jun 2025 06:14:49 +0000 Subject: [PATCH 8/8] Make send_event a one-way operation returning None Co-authored-by: cgillum <2704139+cgillum@users.noreply.github.com> --- durabletask/task.py | 7 +- durabletask/worker.py | 27 ++-- tests/durabletask/test_send_event.py | 191 ++++++++++++--------------- 3 files changed, 92 insertions(+), 133 deletions(-) diff --git a/durabletask/task.py b/durabletask/task.py index eefea19..d8607f8 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -165,7 +165,7 @@ def wait_for_external_event(self, name: str) -> Task: @abstractmethod def send_event(self, instance_id: str, event_name: str, *, - data: Optional[Any] = None) -> Task: + data: Optional[Any] = None) -> None: """Send an event to another orchestration instance. Parameters @@ -176,11 +176,6 @@ def send_event(self, instance_id: str, event_name: str, *, The name of the event to send. data : Optional[Any] The optional JSON-serializable data to include with the event. - - Returns - ------- - Task - A Durable Task that completes when the event has been sent. """ pass diff --git a/durabletask/worker.py b/durabletask/worker.py index 40f9f08..e69db30 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -629,13 +629,16 @@ def set_complete( result: Any, status: pb.OrchestrationStatus, is_result_encoded: bool = False, + preserve_actions: bool = False, ): if self._is_complete: return self._is_complete = True self._completion_status = status - self._pending_actions.clear() # Cancel any pending actions + + if not preserve_actions: + self._pending_actions.clear() # Cancel any pending actions self._result = result result_json: Optional[str] = None @@ -853,8 +856,7 @@ def wait_for_external_event(self, name: str) -> task.Task: return external_event_task def send_event(self, instance_id: str, event_name: str, *, - data: Optional[Any] = None) -> task.Task: - # Validate inputs similar to .NET implementation + data: Optional[Any] = None) -> None: if not instance_id: raise ValueError("instance_id cannot be None or empty") if not event_name: @@ -864,9 +866,6 @@ def send_event(self, instance_id: str, event_name: str, *, encoded_data = shared.to_json(data) if data is not None else None action = ph.new_send_event_action(id, instance_id, event_name, encoded_data) self._pending_actions[id] = action - send_event_task = task.CompletableTask() - self._pending_tasks[id] = send_event_task - return send_event_task def continue_as_new(self, new_input, *, save_events: bool = False) -> None: if self._is_complete: @@ -989,8 +988,9 @@ def process_event( # Start the orchestrator's generator function ctx.run(result) else: - # This is an orchestrator that doesn't schedule any tasks - ctx.set_complete(result, pb.ORCHESTRATION_STATUS_COMPLETED) + # This is an orchestrator that doesn't use generators (async tasks) + # but it may have scheduled actions like send_event + ctx.set_complete(result, pb.ORCHESTRATION_STATUS_COMPLETED, preserve_actions=True) elif event.HasField("timerCreated"): # This history event confirms that the timer was successfully scheduled. # Remove the timerCreated event from the pending action list so we don't schedule it again. @@ -1204,17 +1204,6 @@ def process_event( self._logger.info( f"{ctx.instance_id}: Event '{event_name}' has been buffered as there are no tasks waiting for it." ) - elif event.HasField("eventSent"): - # This history event confirms that the event was successfully sent. - # Complete the corresponding send_event task. - event_id = event.eventId - send_event_task = ctx._pending_tasks.pop(event_id, None) - if send_event_task: - # For send_event, we don't return any meaningful result, just completion - send_event_task.complete(None) - ctx.resume() - # Also remove the corresponding action from pending actions - ctx._pending_actions.pop(event_id, None) elif event.HasField("executionSuspended"): if not self._is_suspended and not ctx.is_replaying: self._logger.info(f"{ctx.instance_id}: Execution suspended.") diff --git a/tests/durabletask/test_send_event.py b/tests/durabletask/test_send_event.py index a675e3e..08a8326 100644 --- a/tests/durabletask/test_send_event.py +++ b/tests/durabletask/test_send_event.py @@ -21,7 +21,7 @@ def test_send_event_action(): """Test that send_event creates the correct action""" def orchestrator(ctx: task.OrchestrationContext, _): - yield ctx.send_event("target_instance", "my_event", data="test_data") + ctx.send_event("target_instance", "my_event", data="test_data") return "completed" registry = worker._Registry() @@ -35,10 +35,12 @@ def orchestrator(ctx: task.OrchestrationContext, _): result = executor.execute(TEST_INSTANCE_ID, [], new_events) actions = result.actions - # Should have one action for send_event - assert len(actions) == 1 + # Should have two actions: send_event and completion + assert len(actions) == 2 assert type(actions[0]) is pb.OrchestratorAction + assert type(actions[1]) is pb.OrchestratorAction + # First action should be send_event action = actions[0] assert action.WhichOneof("orchestratorActionType") == "sendEvent" assert action.id == 1 @@ -48,57 +50,17 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert send_action.name == "my_event" assert send_action.data.value == json.dumps("test_data") - -def test_send_event_completion(): - """Test that send_event can complete successfully""" - - def orchestrator(ctx: task.OrchestrationContext, _): - result = yield ctx.send_event("target_instance", "my_event", data="test_data") - return result - - registry = worker._Registry() - name = registry.add_orchestrator(orchestrator) - - # First execution - should schedule the send_event - old_events = [] - new_events = [ - helpers.new_orchestrator_started_event(), - helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None) - ] - executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) - actions = result.actions - - assert len(actions) == 1 - action = actions[0] - assert action.WhichOneof("orchestratorActionType") == "sendEvent" - - # Second execution - simulate event sent completion - # The eventSent needs to have the same eventId as the action - event_sent = helpers.new_event_sent_event("target_instance", "my_event", json.dumps("test_data")) - event_sent.eventId = action.id # This is the key - the event ID must match the action ID - - old_events = [ - helpers.new_orchestrator_started_event(), - helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None) - ] - new_events = [event_sent] - - result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) - actions = result.actions - - # Should have completion action - assert len(actions) == 1 - complete_action = actions[0] - assert complete_action.WhichOneof("orchestratorActionType") == "completeOrchestration" - assert complete_action.completeOrchestration.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED + # Second action should be completion + completion_action = actions[1] + assert completion_action.WhichOneof("orchestratorActionType") == "completeOrchestration" + assert completion_action.completeOrchestration.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED def test_send_event_with_no_data(): """Test send_event with no data parameter""" def orchestrator(ctx: task.OrchestrationContext, _): - yield ctx.send_event("target_instance", "my_event") + ctx.send_event("target_instance", "my_event") return "completed" registry = worker._Registry() @@ -112,22 +74,27 @@ def orchestrator(ctx: task.OrchestrationContext, _): result = executor.execute(TEST_INSTANCE_ID, [], new_events) actions = result.actions - # Should have one action for send_event - assert len(actions) == 1 + # Should have two actions: send_event and completion + assert len(actions) == 2 action = actions[0] + assert action.WhichOneof("orchestratorActionType") == "sendEvent" send_action = action.sendEvent assert send_action.instance.instanceId == "target_instance" assert send_action.name == "my_event" # data should be None/empty when no data is provided assert not send_action.HasField("data") or send_action.data.value == "" + # Second action should be completion + completion_action = actions[1] + assert completion_action.WhichOneof("orchestratorActionType") == "completeOrchestration" + def test_send_event_multiple(): """Test sending multiple events in sequence""" def orchestrator(ctx: task.OrchestrationContext, _): - yield ctx.send_event("target1", "event1", data="data1") - yield ctx.send_event("target2", "event2", data="data2") + ctx.send_event("target1", "event1", data="data1") + ctx.send_event("target2", "event2", data="data2") return "completed" registry = worker._Registry() @@ -141,34 +108,27 @@ def orchestrator(ctx: task.OrchestrationContext, _): result = executor.execute(TEST_INSTANCE_ID, [], new_events) actions = result.actions - # Should have one action for the first send_event - assert len(actions) == 1 - action = actions[0] - assert action.WhichOneof("orchestratorActionType") == "sendEvent" - assert action.sendEvent.instance.instanceId == "target1" - assert action.sendEvent.name == "event1" - assert action.sendEvent.data.value == json.dumps("data1") - - # Complete the first send_event and continue - event_sent = helpers.new_event_sent_event("target1", "event1", json.dumps("data1")) - event_sent.eventId = action.id + # Should have two actions for both send_event calls and one completion action + assert len(actions) == 3 - old_events = [ - helpers.new_orchestrator_started_event(), - helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None) - ] - new_events = [event_sent] + # First action: send_event to target1 + action1 = actions[0] + assert action1.WhichOneof("orchestratorActionType") == "sendEvent" + assert action1.sendEvent.instance.instanceId == "target1" + assert action1.sendEvent.name == "event1" + assert action1.sendEvent.data.value == json.dumps("data1") - result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) - actions = result.actions + # Second action: send_event to target2 + action2 = actions[1] + assert action2.WhichOneof("orchestratorActionType") == "sendEvent" + assert action2.sendEvent.instance.instanceId == "target2" + assert action2.sendEvent.name == "event2" + assert action2.sendEvent.data.value == json.dumps("data2") - # Should have one action for the second send_event - assert len(actions) == 1 - action = actions[0] - assert action.WhichOneof("orchestratorActionType") == "sendEvent" - assert action.sendEvent.instance.instanceId == "target2" - assert action.sendEvent.name == "event2" - assert action.sendEvent.data.value == json.dumps("data2") + # Third action: completion + action3 = actions[2] + assert action3.WhichOneof("orchestratorActionType") == "completeOrchestration" + assert action3.completeOrchestration.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED def test_send_event_with_various_data_types(): @@ -176,13 +136,13 @@ def test_send_event_with_various_data_types(): def orchestrator(ctx: task.OrchestrationContext, _): # Test with dict - yield ctx.send_event("target1", "event1", data={"key": "value", "number": 42}) + ctx.send_event("target1", "event1", data={"key": "value", "number": 42}) # Test with list - yield ctx.send_event("target2", "event2", data=[1, 2, 3]) + ctx.send_event("target2", "event2", data=[1, 2, 3]) # Test with number - yield ctx.send_event("target3", "event3", data=123) + ctx.send_event("target3", "event3", data=123) # Test with boolean - yield ctx.send_event("target4", "event4", data=True) + ctx.send_event("target4", "event4", data=True) return "completed" registry = worker._Registry() @@ -196,21 +156,49 @@ def orchestrator(ctx: task.OrchestrationContext, _): result = executor.execute(TEST_INSTANCE_ID, [], new_events) actions = result.actions - # Should have one action for the first send_event - assert len(actions) == 1 - action = actions[0] - assert action.WhichOneof("orchestratorActionType") == "sendEvent" - assert action.sendEvent.instance.instanceId == "target1" - assert action.sendEvent.name == "event1" + # Should have four send_event actions and one completion action + assert len(actions) == 5 + + # First action: dict data + action1 = actions[0] + assert action1.WhichOneof("orchestratorActionType") == "sendEvent" + assert action1.sendEvent.instance.instanceId == "target1" + assert action1.sendEvent.name == "event1" expected_data = json.dumps({"key": "value", "number": 42}) - assert action.sendEvent.data.value == expected_data + assert action1.sendEvent.data.value == expected_data + + # Second action: list data + action2 = actions[1] + assert action2.WhichOneof("orchestratorActionType") == "sendEvent" + assert action2.sendEvent.instance.instanceId == "target2" + assert action2.sendEvent.name == "event2" + assert action2.sendEvent.data.value == json.dumps([1, 2, 3]) + + # Third action: number data + action3 = actions[2] + assert action3.WhichOneof("orchestratorActionType") == "sendEvent" + assert action3.sendEvent.instance.instanceId == "target3" + assert action3.sendEvent.name == "event3" + assert action3.sendEvent.data.value == json.dumps(123) + + # Fourth action: boolean data + action4 = actions[3] + assert action4.WhichOneof("orchestratorActionType") == "sendEvent" + assert action4.sendEvent.instance.instanceId == "target4" + assert action4.sendEvent.name == "event4" + assert action4.sendEvent.data.value == json.dumps(True) + + # Fifth action: completion + action5 = actions[4] + assert action5.WhichOneof("orchestratorActionType") == "completeOrchestration" + assert action5.completeOrchestration.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED def test_send_event_validation(): """Test send_event input validation""" def orchestrator_empty_instance(ctx: task.OrchestrationContext, _): - yield ctx.send_event("", "event1", data="test") + ctx.send_event("", "event1", data="test") return "completed" registry = worker._Registry() @@ -254,7 +242,7 @@ def waiting_orchestration(ctx: task.OrchestrationContext, _): # Define the sender orchestration that sends an event to another orchestration def sender_orchestration(ctx: task.OrchestrationContext, target_instance_id: str): approval_payload = {"approved": True, "approver": "manager", "timestamp": "2024-01-01T10:00:00Z"} - yield ctx.send_event(target_instance_id, "approval", data=approval_payload) + ctx.send_event(target_instance_id, "approval", data=approval_payload) return "Event sent successfully" registry = worker._Registry() @@ -284,8 +272,8 @@ def sender_orchestration(ctx: task.OrchestrationContext, target_instance_id: str ] sender_result = executor.execute(sender_instance_id, [], sender_new_events) - # The sender orchestration should yield a send_event action - assert len(sender_result.actions) == 1 + # The sender orchestration should produce a send_event action and complete immediately + assert len(sender_result.actions) == 2 send_action = sender_result.actions[0] assert send_action.WhichOneof("orchestratorActionType") == "sendEvent" assert send_action.sendEvent.instance.instanceId == waiting_instance_id @@ -295,26 +283,13 @@ def sender_orchestration(ctx: task.OrchestrationContext, target_instance_id: str expected_payload = {"approved": True, "approver": "manager", "timestamp": "2024-01-01T10:00:00Z"} assert send_action.sendEvent.data.value == json.dumps(expected_payload) - # Step 3: Complete the send_event action - event_sent = helpers.new_event_sent_event(waiting_instance_id, "approval", - json.dumps(expected_payload)) - event_sent.eventId = send_action.id - - sender_old_events = [ - helpers.new_orchestrator_started_event(), - helpers.new_execution_started_event(sender_name, sender_instance_id, - encoded_input=json.dumps(waiting_instance_id)) - ] - sender_completion_result = executor.execute(sender_instance_id, sender_old_events, [event_sent]) - - # The sender should complete successfully - assert len(sender_completion_result.actions) == 1 - sender_complete_action = sender_completion_result.actions[0] + # The sender should also complete successfully in the same execution + sender_complete_action = sender_result.actions[1] assert sender_complete_action.WhichOneof("orchestratorActionType") == "completeOrchestration" assert sender_complete_action.completeOrchestration.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED assert sender_complete_action.completeOrchestration.result.value == json.dumps("Event sent successfully") - # Step 4: Simulate the event being raised to the waiting orchestration + # Step 3: Simulate the event being raised to the waiting orchestration event_raised = helpers.new_event_raised_event("approval", json.dumps(expected_payload)) waiting_old_events = [